[
https://issues.apache.org/jira/browse/HUDI-2487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17420111#comment-17420111
]
qianchutao edited comment on HUDI-2487 at 9/25/21, 2:09 PM:
------------------------------------------------------------
I have modified the org.apache.hudi.utilities.sources.JsonKafkaSource#fetchNewData
code.
It now filters out Kafka messages whose value is null, so that subsequent tasks
no longer throw exceptions because of null messages.
This is my pull request:
[pull request|https://github.com/apache/hudi/pull/3715]
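A minimal sketch of the idea, not the code from the pull request: the names NullValueFilter and nonNullValues are hypothetical, and plain Java collections stand in for the records read from Kafka. It only illustrates dropping records whose value is null before they reach later stages.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class NullValueFilter {

    // Hypothetical helper: extract each message value and keep only the non-null ones.
    static List<String> nonNullValues(List<Map.Entry<String, String>> messages) {
        return messages.stream()
                .map(Map.Entry::getValue)      // take the message body
                .filter(Objects::nonNull)      // drop messages with a null body
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up sample data: key/value pairs standing in for Kafka records.
        List<Map.Entry<String, String>> messages = new ArrayList<>();
        messages.add(new SimpleEntry<String, String>("k1", "{\"tmSmp\": 1}"));
        messages.add(new SimpleEntry<String, String>("k2", null)); // null message body
        messages.add(new SimpleEntry<String, String>("k3", "{\"tmSmp\": 2}"));

        // Only the two records with a non-null body remain.
        System.out.println(nonNullValues(messages).size());
    }
}
```

In the real source the same kind of predicate would presumably be applied to the Kafka records before their values are handed on as JSON strings.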
was (Author: qianchutao):
I have modified the org.apache.hudi.utilities.sources.JsonKafkaSource#fetchNewData
code. It filters out null messages from Kafka to prevent subsequent
tasks from throwing exceptions because of null messages.
> An empty message in Kafka causes a task exception
> -------------------------------------------------
>
> Key: HUDI-2487
> URL: https://issues.apache.org/jira/browse/HUDI-2487
> Project: Apache Hudi
> Issue Type: Improvement
> Components: DeltaStreamer
> Reporter: qianchutao
> Assignee: qianchutao
> Priority: Major
> Labels: easyfix, newbie, pull-request-available
> Fix For: 0.9.0
>
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> h1. Question:
> When I use DeltaStreamer in upsert mode to write JSON data from Kafka into
> Hudi and sync it to Hive tables, the task throws an exception if the value
> (message body) of a Kafka message is null.
> h2. Exception description:
> Lost task 0.1 in stage 2.0 (TID 24, node-group-1UtpO.1f562475-6982-4b16-a50d-d19b0ebff950.com, executor 6):
> org.apache.hudi.exception.HoodieException: The value of tmSmp can not be null
> at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:463)
> at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$d62e16$1(DeltaSync.java:389)
> at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
> at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:196)
> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
> at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:58)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:123)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:413)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1551)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> h1. Task settings:
>
> {code:java}
> hoodie.datasource.write.precombine.field=tmSmp
> hoodie.datasource.write.recordkey.field=subOrderId,activityId,ticketId
> hoodie.datasource.hive_sync.partition_fields=db,dt
> hoodie.datasource.write.partitionpath.field=db:SIMPLE,dt:SIMPLE
> hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
> hoodie.datasource.hive_sync.enable=true
> hoodie.datasource.meta.sync.enable=true
> hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
> hoodie.datasource.hive_sync.support_timestamp=true
> hoodie.datasource.hive_sync.auto_create_database=true
> hoodie.meta.sync.client.tool.class=org.apache.hudi.hive.HiveSyncTool
> hoodie.datasource.hive_sync.base_file_format=PARQUET
> {code}
>
>
> h1. Spark-submit script parameters:
>
> {code:java}
> --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
> --source-ordering-field tmSmp \
> --table-type MERGE_ON_READ \
> --target-table ${TABLE_NAME} \
> --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
> --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
> --enable-sync \
> --op UPSERT \
> --continuous \
> {code}
>
>
> So I think an improvement can be made to prevent the task from throwing
> this exception, such as filtering out Kafka messages whose value is null.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)