jiangok2006 opened a new issue #2537:
URL: https://github.com/apache/hudi/issues/2537
Lian Jiang Jan 28th at 7:07 AM
I found that the Hudi DeltaStreamer has trouble using a transformer. The
upstream data from Kafka has a Timestamp-typed time field instead of a long,
I cannot change the upstream data, and DeltaStreamer's TimestampBasedKeyGenerator
does not support the Timestamp type. So I have to use a transformer to convert
the timestamp field. But adding a transformer makes the DeltaStreamer fail with an
error like "Commit 20210128140331 failed and rolled-back !". Even a simple SQL
query like the one below throws that error:
--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
--hoodie-conf hoodie.deltastreamer.transformer.sql="SELECT * FROM <SRC>" \
Any idea? I am using 0.6.0. This blocks us from using DeltaStreamer in our
pipeline. Thanks for any quick response.
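For context, all the transformer really needs to do is cast the Timestamp column
to an epoch long so TimestampBasedKeyGenerator (UNIX_TIMESTAMP) can use it as the
partition field, roughly like this in spark-shell terms (just a sketch; `df` stands
for the incoming source dataset and the column names follow my schema):

import org.apache.spark.sql.functions.col

// Sketch only: cast the nested Timestamp field to epoch seconds
// and expose it as a top-level column for partitioning.
val converted = df.withColumn("processingTime2", col("Metadata.processingTime").cast("long"))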
Sudha 7 days ago
Hi Lian, can you paste the full stack trace of that error?
Lian Jiang 7 days ago
Thanks @Sudha. Here is the stack trace:
Exception in thread "main" org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Commit 20210128193620 failed and rolled-back !
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:152)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:147)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:464)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Commit 20210128193620 failed and rolled-back !
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.hudi.async.AbstractAsyncService.waitForShutdown(AbstractAsyncService.java:79)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:150)
    ... 15 more
Caused by: org.apache.hudi.exception.HoodieException: Commit 20210128193620 failed and rolled-back !
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:595)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: Commit 20210128193620 failed and rolled-back !
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:442)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:244)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:579)
    ... 4 more
21/01/28 19:37:05 INFO ShutdownHookManager: Shutdown hook called
21/01/28 19:37:05 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-e69f8874-bc3b-4d24-83b0-a3701131dcd2
21/01/28 19:37:05 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-c25b42a0-116e-4d0e-ae05-c9528365fead
Here is the command:
Lian Jiang 7 days ago
spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4,org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.11/0.6.0/hudi-utilities-bundle_2.11-0.6.0.jar \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --props /tmp/kafka-source.properties \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --table-type COPY_ON_WRITE \
  --target-base-path s3://mylocation \
  --target-table mytable \
  --op UPSERT \
  --continuous \
  --min-sync-interval-seconds 100 \
  --source-ordering-field Metadata.processingTime \
  --hoodie-conf hoodie.datasource.write.recordkey.field=Metadata.id \
  --hoodie-conf hoodie.datasource.write.precombine.field=Metadata.processingTime \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=processingTime2 \
  --hoodie-conf hoodie.datasource.hive_sync.partition_fields=processingTime2 \
  --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator \
  --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy-MM-dd-HH \
  --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP \
  --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
  --hoodie-conf bootstrap.servers=my_kafka_broker:6020 \
  --hoodie-conf sasl.mechanism=SCRAM-SHA-256 \
  --hoodie-conf security.protocol=SASL_SSL \
  --hoodie-conf sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";" \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=my_topic \
  --hoodie-conf schema.registry.url=https://schema-registry.net:443 \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry.net/subjects/mysubject/versions/latest \
  --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
  --hoodie-conf hoodie.deltastreamer.transformer.sql="SELECT *, cast(Metadata.processingTime as long) processingTime2 FROM <SRC>"
Let me know if you need other info. Thanks.
Sudha 6 days ago
Hi @Lian Jiang, I was expecting to see some error specific to the transformer
in the stack trace, but I am not seeing anything like that. One thing I noted in
your SQL is that you might want to rewrite it like this:
SELECT *, cast(a.Metadata.processingTime as long) processingTime2 FROM <SRC> a
Can you try that and check if it helps?
Lian Jiang 5 days ago
Thanks @Sudha. Unfortunately, I tried it and still got the same error. I found
the error below:
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
    at org.apache.hudi.AvroConversionUtils$$anonfun$1.apply(AvroConversionUtils.scala:44)
    at org.apache.hudi.AvroConversionUtils$$anonfun$1.apply(AvroConversionUtils.scala:44)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    ... 3 more
Caused by: java.lang.NegativeArraySizeException
    at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:297)
    at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1226)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.Invoke_10$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.If_17$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.If_18$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.CreateExternalRow_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.If_41$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection$NestedClass_6.createExternalRow_41_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection$NestedClass_13.createExternalRow_85_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:302)
    ... 28 more
I can use spark-shell and the Hudi library to simulate what DeltaStreamer does
(read a dataframe, use Spark SQL to cast processingTime to processingTime2, save
to Hudi), and the spark-shell version works. However, DeltaStreamer's transformer
throws a SerDe exception when handling the Kafka Avro data. Both spark-shell and
DeltaStreamer use org.apache.spark:spark-avro_2.11:2.4.4. Any idea? Thanks.
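For reference, the spark-shell simulation looks roughly like this (a sketch only;
the Avro sample path is made up, and I am using the 0.6.0 datasource option names):

import org.apache.spark.sql.functions.col
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig

// Read a local Avro sample of the Kafka payload (hypothetical path).
val df = spark.read.format("avro").load("/tmp/sample_events.avro")

// The same cast the SQL transformer is supposed to perform.
val out = df.withColumn("processingTime2", col("Metadata.processingTime").cast("long"))

// Write to Hudi with settings matching the deltastreamer command above.
out.write.format("org.apache.hudi").
  option(HoodieWriteConfig.TABLE_NAME, "mytable").
  option(RECORDKEY_FIELD_OPT_KEY, "Metadata.id").
  option(PRECOMBINE_FIELD_OPT_KEY, "Metadata.processingTime").
  option(PARTITIONPATH_FIELD_OPT_KEY, "processingTime2").
  mode("append").
  save("s3://mylocation")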
Sudha 3 days ago
@Lian Jiang Both use the same spark-avro. Could you create a GH issue with this
for more support? At this point, I am also out of ideas. Maybe Gary or Balaji can
help with this.