PavelPetukhov opened a new issue #2317:
URL: https://github.com/apache/hudi/issues/2317


   **Describe the problem you faced**
   
   I am trying to store Kafka messages into HDFS with HoodieDeltaStreamer.
   The spark-submit job works as expected for some time, but fails after a few minutes.
   
   I designed my test as follows:
   Starting with a few records in the table, I add a Kafka Connect (KC) connector, then load the table schema into the Schema Registry.
   After that I start the spark-submit job and, while it is running, insert one new record every 10 seconds into my table.
   I can see those messages in Kafka, and HoodieDeltaStreamer works fine, storing what is expected, but then it fails.
   
   It fails with:
   20/12/09 18:50:48 ERROR deltastreamer.HoodieDeltaStreamer: Shutting down 
delta-sync due to exception
   org.apache.spark.SparkException: Task not serializable
   
   **To Reproduce**
   
   spark-submit \
   --conf spark.eventLog.overwrite=true \
   --conf spark.rdd.compress=true \
   --conf "spark.eventLog.enabled=true" \
   --conf "spark.eventLog.dir=hdfs://some_dir" \
   --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4 \
   --master yarn \
   --deploy-mode cluster \
   --num-executors 2 \
   --executor-cores 3 \
   --conf spark.task.maxFailures=10 \
   --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:InitiatingHeapOccupancyPercent=35" \
   --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:InitiatingHeapOccupancyPercent=35" \
   --conf spark.yarn.executor.memoryOverhead=5G \
   --conf spark.memory.fraction=0.4 \
   --conf spark.kryoserializer.buffer.max=200m \
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
   --conf spark.reducer.maxReqsInFlight=1 \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer hudi-utilities-bundle_2.11-0.6.0.jar \
   --table-type MERGE_ON_READ \
   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
   --source-ordering-field ts_ms \
   --target-base-path /user/hdfs/final_table \
   --target-table table_name \
   --hoodie-conf hoodie.upsert.shuffle.parallelism=2 \
   --hoodie-conf hoodie.insert.shuffle.parallelism=2 \
   --hoodie-conf hoodie.delete.shuffle.parallelism=2 \
   --hoodie-conf hoodie.bulkinsert.shuffle.parallelism=2 \
   --hoodie-conf hoodie.embed.timeline.server=false \
   --hoodie-conf hoodie.filesystem.view.type=EMBEDDED_KV_STORE \
   --hoodie-conf hoodie.compact.inline=false \
   --hoodie-conf hoodie.datasource.write.recordkey.field=id \
   --hoodie-conf hoodie.datasource.write.partitionpath.field= \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=url \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=topic_name \
   --hoodie-conf bootstrap.servers=bootstrap_servers:9092 \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf schema.registry.url=schema-registry \
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
   --continuous
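   Two of the `--hoodie-conf` flags above are directly implicated by the stack trace below: with the embedded timeline server disabled, `hoodie.filesystem.view.type=EMBEDDED_KV_STORE` selects the RocksDB-backed file-system view (`RocksDbBasedFileSystemView`), which is exactly the non-serializable object named in the exception. As an experiment (my assumption, not a confirmed fix), the view type can be switched to the in-memory implementation:
   
   ```shell
   # Flags from the command above that select the RocksDB-backed view
   # (RocksDbBasedFileSystemView, which holds the non-serializable RocksDBDAO):
   --hoodie-conf hoodie.embed.timeline.server=false
   --hoodie-conf hoodie.filesystem.view.type=EMBEDDED_KV_STORE
   
   # Possible experiment (assumption, not a verified workaround):
   # use the in-memory file-system view instead.
   --hoodie-conf hoodie.filesystem.view.type=MEMORY
   ```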
   
   **Expected behavior**
   
   I expect the spark-submit job to run without failures.
   
   **Environment Description**
   
   * Hudi version : 0.6.0
   
   * Spark version : 2.4.4
   
   * Hive version : -
   
   * Hadoop version : apache-2.8.4
   
   * Storage (HDFS/S3/GCS..) : HDFS
   
   * Running on Docker? (yes/no) : no
   
   
   
   **Stacktrace**
   
   20/12/09 18:50:48 ERROR deltastreamer.HoodieDeltaStreamer: Shutting down 
delta-sync due to exception
   org.apache.spark.SparkException: Task not serializable
        at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
        at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
        at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:380)
        at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:379)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.flatMap(RDD.scala:379)
        at 
org.apache.spark.api.java.JavaRDDLike$class.flatMap(JavaRDDLike.scala:126)
        at 
org.apache.spark.api.java.AbstractJavaRDDLike.flatMap(JavaRDDLike.scala:45)
        at 
org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor.generateCompactionPlan(HoodieMergeOnReadTableCompactor.java:198)
        at 
org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.scheduleCompaction(ScheduleCompactionActionExecutor.java:78)
        at 
org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.execute(ScheduleCompactionActionExecutor.java:107)
        at 
org.apache.hudi.table.HoodieMergeOnReadTable.scheduleCompaction(HoodieMergeOnReadTable.java:123)
        at 
org.apache.hudi.client.HoodieWriteClient.scheduleCompactionAtInstant(HoodieWriteClient.java:629)
        at 
org.apache.hudi.client.HoodieWriteClient.scheduleCompaction(HoodieWriteClient.java:617)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:421)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:244)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:579)
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.NotSerializableException: 
org.apache.hudi.common.util.collection.RocksDBDAO
   Serialization stack:
        - object not serializable (class: 
org.apache.hudi.common.util.collection.RocksDBDAO, value: 
org.apache.hudi.common.util.collection.RocksDBDAO@249bfede)
        - field (class: 
org.apache.hudi.common.table.view.RocksDbBasedFileSystemView, name: rocksDB, 
type: class org.apache.hudi.common.util.collection.RocksDBDAO)
        - object (class 
org.apache.hudi.common.table.view.RocksDbBasedFileSystemView, 
org.apache.hudi.common.table.view.RocksDbBasedFileSystemView@333a7a34)
        - element of array (index: 1)
        - array (class [Ljava.lang.Object;, size 4)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, 
type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, 
SerializedLambda[capturingClass=class 
org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor, 
functionalInterfaceMethod=org/apache/spark/api/java/function/FlatMapFunction.call:(Ljava/lang/Object;)Ljava/util/Iterator;,
 implementation=invokeSpecial 
org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.lambda$generateCompactionPlan$6303544c$1:(Lorg/apache/hudi/common/table/view/TableFileSystemView$SliceView;Ljava/util/Set;Lorg/apache/hudi/config/HoodieWriteConfig;Ljava/lang/String;)Ljava/util/Iterator;,
 instantiatedMethodType=(Ljava/lang/String;)Ljava/util/Iterator;, 
numCaptured=4])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class 
org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor$$Lambda$194/85856431,
 
org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor$$Lambda$194/85856431@913d14d)
        - field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1, 
name: f$3, type: interface org.apache.spark.api.java.function.FlatMapFunction)
        - object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1, 
<function1>)
        at 
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
        ... 24 more
   20/12/09 18:50:48 WARN deltastreamer.HoodieDeltaStreamer: Gracefully 
shutting down compactor
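   
   The `Caused by` section shows the actual failure: the compaction-plan lambda in `HoodieMergeOnReadTableCompactor.generateCompactionPlan` captures the `RocksDbBasedFileSystemView` (selected by `hoodie.filesystem.view.type=EMBEDDED_KV_STORE`), whose `rocksDB` field is the non-serializable `RocksDBDAO`. The sketch below, with made-up stand-in class names, reproduces the same `java.io.NotSerializableException` outside Spark: Java serialization of a `Serializable` object fails as soon as its object graph contains a non-serializable, non-transient field.
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.NotSerializableException;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   
   public class NotSerializableDemo {
   
       // Stand-in for RocksDBDAO: a plain class that does not implement Serializable.
       static class NativeStore {
       }
   
       // Stand-in for RocksDbBasedFileSystemView: declared Serializable, but its
       // object graph drags in the non-serializable NativeStore field.
       static class FileSystemView implements Serializable {
           private final NativeStore store = new NativeStore();
       }
   
       // Returns true when Java serialization of `obj` fails with
       // NotSerializableException -- the same root cause Spark's ClosureCleaner
       // hits when it tries to serialize the captured lambda arguments.
       static boolean serializationFails(Object obj) {
           try (ObjectOutputStream oos =
                    new ObjectOutputStream(new ByteArrayOutputStream())) {
               oos.writeObject(obj);
               return false;
           } catch (NotSerializableException e) {
               return true;
           } catch (IOException e) {
               throw new RuntimeException(e);
           }
       }
   
       public static void main(String[] args) {
           // The outer object is Serializable, but serialization still fails
           // because of the NativeStore field it references.
           System.out.println(serializationFails(new FileSystemView())); // true
       }
   }
   ```
   
   In Spark, this class of error is typically resolved by not capturing the offending object in the closure at all (e.g. marking the field `transient` or constructing it inside the task), rather than by serializer tuning.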
   
   

