jiangok2006 opened a new issue #2537:
URL: https://github.com/apache/hudi/issues/2537


   Lian Jiang Jan 28th at 7:07 AM
   I found that the Hudi DeltaStreamer has trouble using a transformer. The 
upstream data from Kafka has a time field of Timestamp type instead of long. 
I cannot change the upstream data, and DeltaStreamer does not support the 
Timestamp type in TimestampBasedKeyGenerator, so I have to use a transformer 
to convert the timestamp. But with a transformer, DeltaStreamer fails with an 
error like "Commit 20210128140331 failed and rolled-back !". Even a simple SQL 
query like the one below triggers that error:
   
   --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
   --hoodie-conf  hoodie.deltastreamer.transformer.sql="SELECT * FROM <SRC>" \
   
   Any idea? I am using 0.6.0. This blocks us from using DeltaStreamer in our 
pipeline. Thanks for any quick response.
   
   
   Sudha  7 days ago
   Hi Lian can you paste the full stack trace of that error?
   
   
   Lian Jiang  7 days ago
   Thanks @Sudha. Here is the stack trace:
   
   Exception in thread "main" org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Commit 20210128193620 failed and rolled-back !
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:152)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:147)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:464)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Commit 20210128193620 failed and rolled-back !
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.hudi.async.AbstractAsyncService.waitForShutdown(AbstractAsyncService.java:79)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:150)
        ... 15 more
   Caused by: org.apache.hudi.exception.HoodieException: Commit 20210128193620 failed and rolled-back !
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:595)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.hudi.exception.HoodieException: Commit 20210128193620 failed and rolled-back !
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:442)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:244)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:579)
        ... 4 more
   21/01/28 19:37:05 INFO ShutdownHookManager: Shutdown hook called
   21/01/28 19:37:05 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-e69f8874-bc3b-4d24-83b0-a3701131dcd2
   21/01/28 19:37:05 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-c25b42a0-116e-4d0e-ae05-c9528365fead
   
   Here is the command:
   Lian Jiang  7 days ago
   
   spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4,org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0 \
                   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
                   https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.11/0.6.0/hudi-utilities-bundle_2.11-0.6.0.jar \
                   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
                   --props /tmp/kafka-source.properties \
                   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
                   --table-type COPY_ON_WRITE \
                   --target-base-path s3://mylocation \
                   --target-table mytable \
                   --op UPSERT \
                   --continuous \
                   --min-sync-interval-seconds 100 \
                   --source-ordering-field Metadata.processingTime \
                   --hoodie-conf  hoodie.datasource.write.recordkey.field=Metadata.id \
                   --hoodie-conf  hoodie.datasource.write.precombine.field=Metadata.processingTime \
                   --hoodie-conf  hoodie.datasource.write.partitionpath.field=processingTime2 \
                   --hoodie-conf  hoodie.datasource.hive_sync.partition_fields=processingTime2 \
                   --hoodie-conf  hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor \
                   --hoodie-conf  hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator \
                   --hoodie-conf  hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy-MM-dd-HH \
                   --hoodie-conf  hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP \
                   --hoodie-conf  hoodie.datasource.write.hive_style_partitioning=true \
                   --hoodie-conf  bootstrap.servers=my_kafka_broker:6020 \
                   --hoodie-conf  sasl.mechanism=SCRAM-SHA-256 \
                   --hoodie-conf  security.protocol=SASL_SSL \
                   --hoodie-conf  sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";" \
                   --hoodie-conf  hoodie.deltastreamer.source.kafka.topic=my_topic \
                   --hoodie-conf  schema.registry.url=https://schema-registry.net:443 \
                   --hoodie-conf  hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry.net/subjects/mysubject/versions/latest \
                   --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
                   --hoodie-conf  hoodie.deltastreamer.transformer.sql="SELECT *, cast(Metadata.processingTime as long) processingTime2 FROM <SRC>"
   
   Let me know if you need other info. Thanks.
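
   For reference, the values the configuration above is meant to produce can be sketched in plain Python. This is only an illustration of the intended arithmetic, not Hudi or Spark code: the helper names to_epoch_seconds and partition_path are hypothetical, and the use of UTC is an assumption (the key generator's time zone is configurable and is not shown in the command). The cast in the SQL is taken to yield whole seconds since the Unix epoch, and the yyyy-MM-dd-HH output format is mapped to the strftime pattern %Y-%m-%d-%H.

```python
import math
from datetime import datetime, timezone


def to_epoch_seconds(ts: datetime) -> int:
    """Approximate CAST(timestamp AS long): whole seconds since the Unix epoch."""
    if ts.tzinfo is None:
        # Assumption: naive timestamps are interpreted as UTC in this sketch.
        ts = ts.replace(tzinfo=timezone.utc)
    return math.floor(ts.timestamp())


def partition_path(epoch_seconds: int,
                   field: str = "processingTime2",
                   hive_style: bool = True) -> str:
    """Format epoch seconds as yyyy-MM-dd-HH, optionally as a hive-style path."""
    # Assumption: UTC is used for formatting; a real deployment may differ.
    value = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).strftime("%Y-%m-%d-%H")
    return f"{field}={value}" if hive_style else value


if __name__ == "__main__":
    processing_time = datetime(2021, 1, 28, 19, 36, 20, tzinfo=timezone.utc)
    seconds = to_epoch_seconds(processing_time)
    print(seconds)                  # 1611862580
    print(partition_path(seconds))  # processingTime2=2021-01-28-19
```

   If the partition paths written by a working pipeline differ from this shape, the time zone or timestamp unit assumed here is the first thing to check.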
   
   
   Sudha  6 days ago
   Hi @Lian Jiang, I was expecting to see an error specific to the transformer 
in the stack trace, but I am not seeing anything like that. One thing I noticed 
in your SQL is that you might want to rewrite it like this:
   
   SELECT *, cast(a.Metadata.processingTime as long) processingTime2 FROM <SRC> a
   
   Can you try that and check if it helps?
   
   
   Lian Jiang  5 days ago
   Thanks @Sudha. Unfortunately, I tried it and still got the same error. I 
found the error below:
   
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
        at org.apache.hudi.AvroConversionUtils$$anonfun$1.apply(AvroConversionUtils.scala:44)
        at org.apache.hudi.AvroConversionUtils$$anonfun$1.apply(AvroConversionUtils.scala:44)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        ... 3 more
   Caused by: java.lang.NegativeArraySizeException
        at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:297)
        at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1226)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.Invoke_10$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_1$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.If_17$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.If_18$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.CreateExternalRow_1$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.If_41$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection$NestedClass_6.createExternalRow_41_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection$NestedClass_13.createExternalRow_85_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:302)
        ... 28 more
   
   I can use spark-shell and the Hudi library to simulate what DeltaStreamer 
does (read a dataframe, use Spark SQL to cast processingTime to 
processingTime2, save to Hudi), and that works in spark-shell. However, 
DeltaStreamer's transformer throws a SerDe exception when handling the Kafka 
Avro data. Both spark-shell and DeltaStreamer use 
org.apache.spark:spark-avro_2.11:2.4.4. Any idea? Thanks.
   
    
   Sudha  3 days ago
   @Lian Jiang Both use the same. Could you create a GH issue for this to get 
more support? At this point, I am also out of ideas. Maybe Gary or Balaji can 
help with this.



