qianchutao created HUDI-2487:
--------------------------------

             Summary: An empty message in Kafka causes a task exception
                 Key: HUDI-2487
                 URL: https://issues.apache.org/jira/browse/HUDI-2487
             Project: Apache Hudi
          Issue Type: Improvement
          Components: DeltaStreamer
            Reporter: qianchutao
            Assignee: qianchutao
             Fix For: 0.9.0


h1. Question:

      When I use DeltaStreamer in upsert mode to ingest JSON data from Kafka into Hudi and sync it to Hive tables, the task throws an exception if the value (message body) of a Kafka message is null.
h2. Exception description:

{code:java}
Lost task 0.1 in stage 2.0 (TID 24, node-group-1UtpO.1f562475-6982-4b16-a50d-d19b0ebff950.com, executor 6): org.apache.hudi.exception.HoodieException: The value of tmSmp can not be null
	at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:463)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$d62e16$1(DeltaSync.java:389)
	at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:196)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:58)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:413)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1551)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{code}
h1. Task settings:

 
{code:java}
hoodie.datasource.write.precombine.field=tmSmp
hoodie.datasource.write.recordkey.field=subOrderId,activityId,ticketId
hoodie.datasource.hive_sync.partition_fields=db,dt
hoodie.datasource.write.partitionpath.field=db:SIMPLE,dt:SIMPLE
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
hoodie.datasource.hive_sync.enable=true
hoodie.datasource.meta.sync.enable=true
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.support_timestamp=true
hoodie.datasource.hive_sync.auto_create_database=true
hoodie.meta.sync.client.tool.class=org.apache.hudi.hive.HiveSyncTool
hoodie.datasource.hive_sync.base_file_format=PARQUET
{code}
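For context on why a null tmSmp is fatal: with hoodie.datasource.write.precombine.field=tmSmp, an upsert deduplicates incoming records that share a record key by keeping the one with the larger ordering value, so every record is expected to carry a non-null tmSmp. The sketch below is a simplified, hypothetical model of that behaviour, not Hudi's actual implementation: plain maps stand in for Avro records, the "field:value" composite key format is an assumption, and the null check only mirrors the error message from the stack trace above.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of precombine on upsert (not Hudi code).
final class PrecombineSketch {
    // Record key and ordering fields copied from the task settings.
    static final String[] KEY_FIELDS = {"subOrderId", "activityId", "ticketId"};
    static final String ORDERING_FIELD = "tmSmp";

    // Builds an assumed composite key such as "subOrderId:1,activityId:2,ticketId:3".
    static String recordKey(Map<String, Object> row) {
        List<String> parts = new ArrayList<>();
        for (String field : KEY_FIELDS) {
            parts.add(field + ":" + row.get(field));
        }
        return String.join(",", parts);
    }

    // Reads the ordering value and rejects null, using the same message as the stack trace.
    static long orderingValue(Map<String, Object> row) {
        Object value = row.get(ORDERING_FIELD);
        if (value == null) {
            throw new IllegalStateException("The value of " + ORDERING_FIELD + " can not be null");
        }
        return ((Number) value).longValue();
    }

    // For each record key, keeps the record with the largest ordering value (ties: later record wins).
    static Map<String, Map<String, Object>> precombine(List<Map<String, Object>> rows) {
        Map<String, Map<String, Object>> latest = new LinkedHashMap<>();
        for (Map<String, Object> row : rows) {
            orderingValue(row); // fail fast on a missing ordering value, even for a unique key
            latest.merge(recordKey(row), row,
                (oldRow, newRow) -> orderingValue(newRow) >= orderingValue(oldRow) ? newRow : oldRow);
        }
        return latest;
    }
}
{code}

A record produced from a null Kafka message presumably has no tmSmp at all, which is why it fails at the ordering-value lookup before any deduplication happens.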
 

 
h1. spark-submit parameter settings:

 
{code:java}
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--source-ordering-field tmSmp \
--table-type MERGE_ON_READ \
--target-table ${TABLE_NAME} \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--enable-sync \
--op UPSERT \
--continuous
{code}
 

 

       I think this could be improved so that the task does not fail, for example by filtering out Kafka messages whose value is null before they are converted into records.
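One way the proposed filtering might look is sketched below. It is only an illustration under stated assumptions: SimpleMessage is a hypothetical stand-in for a Kafka record (it is not the kafka-clients ConsumerRecord class), and treating whitespace-only values like null values is an assumption based on the issue title ("empty message"). In a real DeltaStreamer source the equivalent filter would presumably run on the Kafka record values before JSON-to-Avro conversion.

{code:java}
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a Kafka record with string key and value.
final class SimpleMessage {
    final String key;
    final String value;

    SimpleMessage(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

final class NullValueFilter {
    // True when the message carries a usable payload: value is non-null and not blank.
    static boolean hasPayload(SimpleMessage m) {
        return m.value != null && !m.value.trim().isEmpty();
    }

    // Drops null-value (tombstone) and blank messages, returning only JSON payloads to convert.
    static List<String> jsonPayloads(List<SimpleMessage> batch) {
        return batch.stream()
            .filter(NullValueFilter::hasPayload)
            .map(m -> m.value)
            .collect(Collectors.toList());
    }
}
{code}

Silently dropping records hides data problems, so a production version would likely also count or log the skipped messages.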

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
