PRASHANT BHOSALE created HUDI-1641:
--------------------------------------
Summary: Issue for Integrating Hudi with Kafka using Avro Schema
Key: HUDI-1641
URL: https://issues.apache.org/jira/browse/HUDI-1641
Project: Apache Hudi
Issue Type: Bug
Components: DeltaStreamer, Spark Integration, Utilities
Reporter: PRASHANT BHOSALE
Fix For: 0.7.0
I am trying to integrate Hudi with a Kafka topic.
Steps followed:
# Created a Kafka topic in Confluent with a schema registered for it.
# Produced data to the topic using kafka-avro-console-producer.
# Ran the Hudi DeltaStreamer in continuous mode against that topic (a sketch of the commands I used follows this list).
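For reference, this is roughly how I am producing records and launching the DeltaStreamer. The topic name, schema, registry URL, and paths below are placeholders rather than my exact values:
{code:bash}
# Produce Avro records to the topic (the value schema is registered in the
# Confluent Schema Registry as part of this step)
kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic test_topic \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema='{"type":"record","name":"TestRecord","fields":[{"name":"id","type":"string"},{"name":"ts","type":"long"}]}'

# Launch HoodieDeltaStreamer in continuous mode against the same topic
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-ordering-field ts \
  --target-base-path /tmp/hudi/test_table \
  --target-table test_table \
  --props kafka-source.properties \
  --continuous
{code}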
With this setup, I am getting the error below:
{code:java}
21/02/25 13:48:14 ERROR TaskResultGetter: Exception while getting task result
21/02/25 13:48:14 ERROR TaskResultGetter: Exception while getting task result
org.apache.spark.SparkException: Error reading attempting to read avro data -- encountered an unknown fingerprint: 103427103938146401, not sure what schema to use. This could happen if you registered additional schemas after starting your spark context.
    at org.apache.spark.serializer.GenericAvroSerializer$$anonfun$4.apply(GenericAvroSerializer.scala:141)
    at org.apache.spark.serializer.GenericAvroSerializer$$anonfun$4.apply(GenericAvroSerializer.scala:138)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
    at org.apache.spark.serializer.GenericAvroSerializer.deserializeDatum(GenericAvroSerializer.scala:137)
    at org.apache.spark.serializer.GenericAvroSerializer.read(GenericAvroSerializer.scala:162)
    at org.apache.spark.serializer.GenericAvroSerializer.read(GenericAvroSerializer.scala:47)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:391)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:371)
    at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:88)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:72)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:63)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:63)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:62)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
21/02/25 13:48:14 INFO YarnScheduler: Removed TaskSet 14.0, whose tasks have all completed, from pool
21/02/25 13:48:14 INFO YarnScheduler: Cancelling stage 14
21/02/25 13:48:14 INFO YarnScheduler: Killing all running tasks in stage 14: Stage cancelled
21/02/25 13:48:14 INFO DAGScheduler: ResultStage 14 (isEmpty at DeltaSync.java:380) failed in 0.696 s due to Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error reading attempting to read avro data -- encountered an unknown fingerprint: 103427103938146401, not sure what schema to use. This could happen if you registered additional schemas after starting your spark context.
21/02/25 13:48:14 INFO DAGScheduler: Job 8 failed: isEmpty at DeltaSync.java:380, took 0.704193 s
21/02/25 13:48:14 ERROR HoodieDeltaStreamer: Shutting down delta-sync due to exception
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error reading attempting to read avro data -- encountered an unknown fingerprint: 103427103938146401, not sure what schema to use. This could happen if you registered additional schemas after starting your spark context.
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1364)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
    at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1472)
    at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1472)
    at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1472)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1471)
    at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
    at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:380)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:255)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:587)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}
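From the message, it appears the driver-side Kryo GenericAvroSerializer receives a schema fingerprint it cannot resolve. My understanding is that Spark resolves such fingerprints against schemas registered on the SparkConf via registerAvroSchemas before the context starts; below is a minimal sketch of what that registration looks like (the record schema here is a placeholder, not my actual topic schema):
{code:scala}
import org.apache.avro.Schema
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Placeholder schema; in my setup the real one lives in the Confluent Schema Registry.
val schemaJson =
  """{"type":"record","name":"TestRecord","fields":[
    |  {"name":"id","type":"string"},
    |  {"name":"ts","type":"long"}
    |]}""".stripMargin
val schema = new Schema.Parser().parse(schemaJson)

// Register the Avro schema with Kryo *before* the SparkContext is created,
// so driver and executors agree on the fingerprint-to-schema mapping.
val conf = new SparkConf().registerAvroSchemas(schema)
val spark = SparkSession.builder().config(conf).getOrCreate()
{code}
I am not registering anything like this myself; I rely on whatever the DeltaStreamer does internally, so I am unsure whether this is the right knob.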
Can someone please have a look at this and let me know whether I need to change any of my configuration to fix it?