PavelPetukhov opened a new issue #2317:
URL: https://github.com/apache/hudi/issues/2317
**Describe the problem you faced**
I am trying to store Kafka messages in HDFS with HoodieDeltaStreamer. The spark-submit job works as expected for some time but fails after a few minutes.
I designed my test as follows: starting with a few records in the table, I add a Kafka Connect connector and load the table schema into the Schema Registry. After that I start the spark-submit job and, while it is running, I add one new record to my table every 10 seconds. I see those messages in Kafka, and HoodieDeltaStreamer works fine, storing what is expected, but then it fails.
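For reference, here is a minimal sketch of the feeder side of the test, assuming a plain Kafka producer in place of the actual database plus Kafka Connect connector. The class name `TestFeeder`, the JSON-style string payload, and the string serializers are illustrative assumptions; the real pipeline publishes Avro records whose schema lives in the Schema Registry. The topic and broker names match the placeholders in the spark-submit command below.
```java
// Hypothetical feeder sketch: publishes one record every 10 seconds to the
// topic DeltaStreamer reads. In the real test the records arrive through a
// Kafka Connect connector as Avro; a plain string producer is used here
// only to illustrate the cadence.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestFeeder {
  public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "bootstrap_servers:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      for (int id = 1; ; id++) {
        producer.send(new ProducerRecord<>("topic_name", String.valueOf(id),
            "{\"id\": " + id + ", \"ts_ms\": " + System.currentTimeMillis() + "}"));
        Thread.sleep(10_000L); // one new record per 10 seconds, as in the test
      }
    }
  }
}
```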
It fails with:
```
20/12/09 18:50:48 ERROR deltastreamer.HoodieDeltaStreamer: Shutting down delta-sync due to exception
org.apache.spark.SparkException: Task not serializable
```
**To Reproduce**
```sh
spark-submit \
  --conf spark.eventLog.overwrite=true \
  --conf spark.rdd.compress=true \
  --conf "spark.eventLog.enabled=true" \
  --conf "spark.eventLog.dir=hdfs://some_dir" \
  --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4 \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 2 \
  --executor-cores 3 \
  --conf spark.task.maxFailures=10 \
  --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:InitiatingHeapOccupancyPercent=35" \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:InitiatingHeapOccupancyPercent=35" \
  --conf spark.yarn.executor.memoryOverhead=5G \
  --conf spark.memory.fraction=0.4 \
  --conf spark.rdd.compress=true \
  --conf spark.kryoserializer.buffer.max=200m \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.reducer.maxReqsInFlight=1 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle_2.11-0.6.0.jar \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --source-ordering-field ts_ms \
  --target-base-path /user/hdfs/final_table \
  --target-table table_name \
  --hoodie-conf hoodie.upsert.shuffle.parallelism=2 \
  --hoodie-conf hoodie.insert.shuffle.parallelism=2 \
  --hoodie-conf hoodie.delete.shuffle.parallelism=2 \
  --hoodie-conf hoodie.bulkinsert.shuffle.parallelism=2 \
  --hoodie-conf hoodie.embed.timeline.server=false \
  --hoodie-conf hoodie.filesystem.view.type=EMBEDDED_KV_STORE \
  --hoodie-conf hoodie.compact.inline=false \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=url \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=topic_name \
  --hoodie-conf bootstrap.servers=bootstrap_servers:9092 \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf schema.registry.url=schema-registry \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --continuous
```
**Expected behavior**
The spark-submit job is expected to run without failures.
**Environment Description**
* Hudi version : 0.6.0
* Spark version : 2.4.4
* Hive version : -
* Hadoop version : apache-2.8.4
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : no
**Stacktrace**
```
20/12/09 18:50:48 ERROR deltastreamer.HoodieDeltaStreamer: Shutting down delta-sync due to exception
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:380)
    at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:379)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.flatMap(RDD.scala:379)
    at org.apache.spark.api.java.JavaRDDLike$class.flatMap(JavaRDDLike.scala:126)
    at org.apache.spark.api.java.AbstractJavaRDDLike.flatMap(JavaRDDLike.scala:45)
    at org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor.generateCompactionPlan(HoodieMergeOnReadTableCompactor.java:198)
    at org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.scheduleCompaction(ScheduleCompactionActionExecutor.java:78)
    at org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.execute(ScheduleCompactionActionExecutor.java:107)
    at org.apache.hudi.table.HoodieMergeOnReadTable.scheduleCompaction(HoodieMergeOnReadTable.java:123)
    at org.apache.hudi.client.HoodieWriteClient.scheduleCompactionAtInstant(HoodieWriteClient.java:629)
    at org.apache.hudi.client.HoodieWriteClient.scheduleCompaction(HoodieWriteClient.java:617)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:421)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:244)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:579)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: org.apache.hudi.common.util.collection.RocksDBDAO
Serialization stack:
    - object not serializable (class: org.apache.hudi.common.util.collection.RocksDBDAO, value: org.apache.hudi.common.util.collection.RocksDBDAO@249bfede)
    - field (class: org.apache.hudi.common.table.view.RocksDbBasedFileSystemView, name: rocksDB, type: class org.apache.hudi.common.util.collection.RocksDBDAO)
    - object (class org.apache.hudi.common.table.view.RocksDbBasedFileSystemView, org.apache.hudi.common.table.view.RocksDbBasedFileSystemView@333a7a34)
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 4)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor, functionalInterfaceMethod=org/apache/spark/api/java/function/FlatMapFunction.call:(Ljava/lang/Object;)Ljava/util/Iterator;, implementation=invokeSpecial org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.lambda$generateCompactionPlan$6303544c$1:(Lorg/apache/hudi/common/table/view/TableFileSystemView$SliceView;Ljava/util/Set;Lorg/apache/hudi/config/HoodieWriteConfig;Ljava/lang/String;)Ljava/util/Iterator;, instantiatedMethodType=(Ljava/lang/String;)Ljava/util/Iterator;, numCaptured=4])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor$$Lambda$194/85856431, org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor$$Lambda$194/85856431@913d14d)
    - field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1, name: f$3, type: interface org.apache.spark.api.java.function.FlatMapFunction)
    - object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 24 more
20/12/09 18:50:48 WARN deltastreamer.HoodieDeltaStreamer: Gracefully shutting down compactor
```
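The serialization stack pinpoints the failure: with `hoodie.filesystem.view.type=EMBEDDED_KV_STORE`, the file system view is the RocksDB-backed `RocksDbBasedFileSystemView`, and the `flatMap` lambda in `HoodieMergeOnReadTableCompactor.generateCompactionPlan` captures that view (its `SliceView` appears in the lambda's captured arguments), so Spark's ClosureCleaner tries to serialize the non-serializable `RocksDBDAO`. Below is a minimal sketch of the same failure shape in plain Spark; it is not Hudi code, and `NativeStore` is a hypothetical stand-in for `RocksDBDAO`.
```java
// Minimal sketch (not Hudi code) of the failure pattern shown above: a Spark
// closure capturing an object that does not implement java.io.Serializable.
// `NativeStore` is a hypothetical stand-in for RocksDBDAO, which wraps native
// RocksDB handles and is likewise not serializable.
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class NotSerializableDemo {

  // Plays the role of RocksDBDAO: driver-side state that cannot be shipped
  // to executors.
  static class NativeStore {
    Iterator<String> lookup(String key) {
      return Collections.singletonList(key).iterator();
    }
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("demo").setMaster("local[1]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      NativeStore store = new NativeStore(); // exists on the driver only

      // The lambda captures `store`, so Spark's ClosureCleaner must serialize
      // it when flatMap is called and throws
      // SparkException: Task not serializable, caused by
      // java.io.NotSerializableException: NativeStore, the same shape as
      // the generateCompactionPlan failure above.
      jsc.parallelize(Arrays.asList("a", "b"))
         .flatMap(key -> store.lookup(key))
         .collect();
    }
  }
}
```
Run locally, this fails at the `flatMap` call, matching the stacktrace above where `SparkContext.clean` is invoked from `RDD.flatMap`.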