PavelPetukhov opened a new issue #2321:
URL: https://github.com/apache/hudi/issues/2321


   **Describe the problem you faced**
   
   The spark-submit job starts and runs as expected for about two minutes (Parquet files with data are written, etc.), but then it crashes (stack trace attached). The crash is triggered by a new incoming Kafka message.
   
   ```
   spark-submit \
   --conf spark.eventLog.overwrite=true \
   --conf spark.rdd.compress=true \
   --conf "spark.eventLog.enabled=true" \
   --conf "spark.eventLog.dir=hdfs://hadoop-01-dwh.dclub.cloud.devmail.ru:9000/eventLogging" \
   --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4 \
   --master yarn \
   --deploy-mode cluster \
   --num-executors 2 \
   --executor-cores 3 \
   --conf spark.task.maxFailures=10 \
   --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:InitiatingHeapOccupancyPercent=35" \
   --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:InitiatingHeapOccupancyPercent=35" \
   --conf spark.yarn.executor.memoryOverhead=5G \
   --conf spark.memory.fraction=0.4 \
   --conf spark.rdd.compress=true \
   --conf spark.kryoserializer.buffer.max=200m \
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
   --conf spark.reducer.maxReqsInFlight=1 \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer hudi-utilities-bundle_2.11-0.6.0.jar \
   --table-type MERGE_ON_READ \
   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
   --source-ordering-field ts_ms \
   --target-base-path /user/hdfs/ml_training_data.kc_ds_main.private \
   --target-table ml_training_data.kc_ds_main.private \
   --hoodie-conf hoodie.upsert.shuffle.parallelism=2 \
   --hoodie-conf hoodie.insert.shuffle.parallelism=2 \
   --hoodie-conf hoodie.delete.shuffle.parallelism=2 \
   --hoodie-conf hoodie.bulkinsert.shuffle.parallelism=2 \
   --hoodie-conf hoodie.embed.timeline.server=false \
   --hoodie-conf hoodie.filesystem.view.type=EMBEDDED_KV_STORE \
   --hoodie-conf hoodie.compact.inline=false \
   --hoodie-conf hoodie.datasource.write.recordkey.field=id \
   --hoodie-conf hoodie.datasource.write.partitionpath.field= \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=url \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=topic \
   --hoodie-conf bootstrap.servers=kafka \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf group.id=hudi_group \
   --hoodie-conf group.instance.id= \
   --hoodie-conf allow.auto.create.topics=true \
   --hoodie-conf client.id=consumer-console-consumer-1 \
   --hoodie-conf schema.registry.url=schema_registry \
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
   --continuous
   ```
   
   The stack trace is below, but it is worth noting that in the logs I can see that something similar already happened a few times without errors:
   ```
   20/12/10 17:28:39 INFO collection.RocksDBDAO: Prefix Search for (query=) on hudi_pending_compaction__user_hdfs_[my_topic_name]. Total Time Taken (msec)=3. Serialization Time taken(micro)=0, num entries=0
   20/12/10 17:28:39 INFO clean.CleanPlanner: No earliest commit to retain. No need to scan partitions !!
   20/12/10 17:28:39 INFO clean.CleanActionExecutor: Nothing to clean here. It is already clean
   20/12/10 17:28:40 INFO client.AbstractHoodieWriteClient: Committed 20201210172828
   ...
   ```
   
   **Expected behavior**
   
   The job should keep running in continuous mode without crashing.
   
   **Environment Description**
   
   * Hudi version : 0.6.0
   
   * Spark version : 2.4.4
   
   * Hive version : - 
   
   * Hadoop version : 2.8.4
   
   * Storage (HDFS/S3/GCS..) : hdfs
   
   * Running on Docker? (yes/no) : no
   
   
   **Stacktrace**
   
   ```
   20/12/10 17:29:22 INFO view.RocksDbBasedFileSystemView: Initializing pending compaction operations. Count=0
   20/12/10 17:29:22 INFO view.RocksDbBasedFileSystemView: Initializing external data file mapping. Count=0
   20/12/10 17:29:22 INFO view.RocksDbBasedFileSystemView: Created ROCKSDB based file-system view at /tmp/hoodie_timeline_rocksdb
   20/12/10 17:29:22 INFO collection.RocksDBDAO: Prefix Search for (query=) on hudi_pending_compaction__user_hdfs_[my_topic_name]. Total Time Taken (msec)=0. Serialization Time taken(micro)=0, num entries=0
   20/12/10 17:29:22 INFO compact.HoodieMergeOnReadTableCompactor: Compacting /user/hdfs/[my_topic_name] with commit 20201210172922
   20/12/10 17:29:22 INFO compact.HoodieMergeOnReadTableCompactor: Compaction looking for files to compact in [default] partitions
   20/12/10 17:29:22 ERROR deltastreamer.HoodieDeltaStreamer: Shutting down delta-sync due to exception
   org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
        at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:380)
        at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:379)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.flatMap(RDD.scala:379)
        at org.apache.spark.api.java.JavaRDDLike$class.flatMap(JavaRDDLike.scala:126)
        at org.apache.spark.api.java.AbstractJavaRDDLike.flatMap(JavaRDDLike.scala:45)
        at org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor.generateCompactionPlan(HoodieMergeOnReadTableCompactor.java:198)
        at org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.scheduleCompaction(ScheduleCompactionActionExecutor.java:78)
        at org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.execute(ScheduleCompactionActionExecutor.java:107)
        at org.apache.hudi.table.HoodieMergeOnReadTable.scheduleCompaction(HoodieMergeOnReadTable.java:123)
        at org.apache.hudi.client.HoodieWriteClient.scheduleCompactionAtInstant(HoodieWriteClient.java:629)
        at org.apache.hudi.client.HoodieWriteClient.scheduleCompaction(HoodieWriteClient.java:617)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:421)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:244)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:579)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.NotSerializableException: org.apache.hudi.common.util.collection.RocksDBDAO
   Serialization stack:
        - object not serializable (class: org.apache.hudi.common.util.collection.RocksDBDAO, value: org.apache.hudi.common.util.collection.RocksDBDAO@7f73bc0c)
        - field (class: org.apache.hudi.common.table.view.RocksDbBasedFileSystemView, name: rocksDB, type: class org.apache.hudi.common.util.collection.RocksDBDAO)
        - object (class org.apache.hudi.common.table.view.RocksDbBasedFileSystemView, org.apache.hudi.common.table.view.RocksDbBasedFileSystemView@6225959a)
        - element of array (index: 1)
        - array (class [Ljava.lang.Object;, size 4)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor, functionalInterfaceMethod=org/apache/spark/api/java/function/FlatMapFunction.call:(Ljava/lang/Object;)Ljava/util/Iterator;, implementation=invokeSpecial org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.lambda$generateCompactionPlan$6303544c$1:(Lorg/apache/hudi/common/table/view/TableFileSystemView$SliceView;Ljava/util/Set;Lorg/apache/hudi/config/HoodieWriteConfig;Ljava/lang/String;)Ljava/util/Iterator;, instantiatedMethodType=(Ljava/lang/String;)Ljava/util/Iterator;, numCaptured=4])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor$$Lambda$200/617304614, org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor$$Lambda$200/617304614@2fac0bf2)
        - field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1, name: f$3, type: interface org.apache.spark.api.java.function.FlatMapFunction)
        - object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
        ... 24 more
   20/12/10 17:29:22 INFO deltastreamer.HoodieDeltaStreamer: Delta Sync shutdown. Error ?true
   20/12/10 17:29:22 WARN deltastreamer.HoodieDeltaStreamer: Gracefully shutting down compactor
   ```
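   For context, the "Task not serializable" error above is Spark's ClosureCleaner rejecting the `flatMap` lambda because its captured arguments include the `RocksDbBasedFileSystemView`, whose `rocksDB` field (a `RocksDBDAO`) cannot be Java-serialized. The same failure mode can be reproduced in isolation; the sketch below uses a hypothetical `NativeStore` class standing in for `RocksDBDAO` (names are illustrative, not from the Hudi codebase):
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.NotSerializableException;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   import java.util.function.Function;
   
   public class ClosureCaptureDemo {
       // Hypothetical stand-in for RocksDBDAO: wraps native state, not Serializable.
       static class NativeStore {
           long handle = 42L;
       }
   
       // Returns true when Java serialization rejects the closure.
       static boolean serializationFails() {
           NativeStore store = new NativeStore();
   
           // A serializable lambda that captures the non-serializable store,
           // analogous to the compaction-plan flatMap closure capturing
           // RocksDbBasedFileSystemView in the stack trace above.
           Function<String, Long> fn =
               (Function<String, Long> & Serializable) s -> store.handle + s.length();
   
           try (ObjectOutputStream out =
                    new ObjectOutputStream(new ByteArrayOutputStream())) {
               // Spark's ClosureCleaner.ensureSerializable performs the same check.
               out.writeObject(fn);
               return false;
           } catch (NotSerializableException e) {
               // NativeStore ends up in SerializedLambda.capturedArgs, just as
               // RocksDBDAO does in the reported serialization stack.
               return true;
           } catch (IOException e) {
               return false;
           }
       }
   
       public static void main(String[] args) {
           System.out.println("closure serialization fails: " + serializationFails());
       }
   }
   ```
   
   This suggests the non-serializable view is only pulled into the closure when the RocksDB-backed file-system view is in use (the job sets `hoodie.filesystem.view.type=EMBEDDED_KV_STORE`), which would explain why compaction scheduling is the step that fails.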
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
