PavelPetukhov opened a new issue #2321:
URL: https://github.com/apache/hudi/issues/2321
**Describe the problem you faced**
The started spark-submit job works fine and as expected (parquet
files with data are stored, etc.) for two minutes, but then it crashes (stack
trace attached). The crash happens on a new incoming Kafka message.
```
spark-submit \
  --conf spark.eventLog.overwrite=true \
  --conf spark.rdd.compress=true \
  --conf "spark.eventLog.enabled=true" \
  --conf "spark.eventLog.dir=hdfs://hadoop-01-dwh.dclub.cloud.devmail.ru:9000/eventLogging" \
  --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4 \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 2 \
  --executor-cores 3 \
  --conf spark.task.maxFailures=10 \
  --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:InitiatingHeapOccupancyPercent=35" \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:InitiatingHeapOccupancyPercent=35" \
  --conf spark.yarn.executor.memoryOverhead=5G \
  --conf spark.memory.fraction=0.4 \
  --conf spark.rdd.compress=true \
  --conf spark.kryoserializer.buffer.max=200m \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.reducer.maxReqsInFlight=1 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle_2.11-0.6.0.jar \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --source-ordering-field ts_ms \
  --target-base-path /user/hdfs/ml_training_data.kc_ds_main.private \
  --target-table ml_training_data.kc_ds_main.private \
  --hoodie-conf hoodie.upsert.shuffle.parallelism=2 \
  --hoodie-conf hoodie.insert.shuffle.parallelism=2 \
  --hoodie-conf hoodie.delete.shuffle.parallelism=2 \
  --hoodie-conf hoodie.bulkinsert.shuffle.parallelism=2 \
  --hoodie-conf hoodie.embed.timeline.server=false \
  --hoodie-conf hoodie.filesystem.view.type=EMBEDDED_KV_STORE \
  --hoodie-conf hoodie.compact.inline=false \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=url \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=topic \
  --hoodie-conf bootstrap.servers=kafka \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf group.id=hudi_group \
  --hoodie-conf group.instance.id= \
  --hoodie-conf allow.auto.create.topics=true \
  --hoodie-conf client.id=consumer-console-consumer-1 \
  --hoodie-conf schema.registry.url=schema_registry \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --continuous
```
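As a side note on the command itself: the same `--hoodie-conf` key=value pairs can also be collected into a properties file and passed via DeltaStreamer's `--props` option, which keeps the spark-submit line short. A minimal sketch, assuming an illustrative file name (`delta-streamer.properties` is my own choice, not part of the original job):
```
# delta-streamer.properties (illustrative name) -- pass with: --props <path-to-file>
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.embed.timeline.server=false
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false
hoodie.datasource.write.recordkey.field=id
hoodie.deltastreamer.schemaprovider.registry.url=url
hoodie.deltastreamer.source.kafka.topic=topic
bootstrap.servers=kafka
auto.offset.reset=earliest
group.id=hudi_group
schema.registry.url=schema_registry
```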
The stack trace is below, but it is worth noting that in the logs I can see
something similar happening a few times earlier without errors:
```
20/12/10 17:28:39 INFO collection.RocksDBDAO: Prefix Search for (query=) on hudi_pending_compaction__user_hdfs_[my_topic_name]. Total Time Taken (msec)=3. Serialization Time taken(micro)=0, num entries=0
20/12/10 17:28:39 INFO clean.CleanPlanner: No earliest commit to retain. No need to scan partitions !!
20/12/10 17:28:39 INFO clean.CleanActionExecutor: Nothing to clean here. It is already clean
20/12/10 17:28:40 INFO client.AbstractHoodieWriteClient: Committed 20201210172828
...
```
**Expected behavior**
The job should not crash and should keep running in `--continuous` mode.
**Environment Description**
* Hudi version : 0.6.0
* Spark version : 2.4.4
* Hive version : -
* Hadoop version : 2.8.4
* Storage (HDFS/S3/GCS..) : hdfs
* Running on Docker? (yes/no) : no
**Stacktrace**
```
20/12/10 17:29:22 INFO view.RocksDbBasedFileSystemView: Initializing pending compaction operations. Count=0
20/12/10 17:29:22 INFO view.RocksDbBasedFileSystemView: Initializing external data file mapping. Count=0
20/12/10 17:29:22 INFO view.RocksDbBasedFileSystemView: Created ROCKSDB based file-system view at /tmp/hoodie_timeline_rocksdb
20/12/10 17:29:22 INFO collection.RocksDBDAO: Prefix Search for (query=) on hudi_pending_compaction__user_hdfs_[my_topic_name]. Total Time Taken (msec)=0. Serialization Time taken(micro)=0, num entries=0
20/12/10 17:29:22 INFO compact.HoodieMergeOnReadTableCompactor: Compacting /user/hdfs/[my_topic_name] with commit 20201210172922
20/12/10 17:29:22 INFO compact.HoodieMergeOnReadTableCompactor: Compaction looking for files to compact in [default] partitions
20/12/10 17:29:22 ERROR deltastreamer.HoodieDeltaStreamer: Shutting down delta-sync due to exception
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
	at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:380)
	at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:379)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.flatMap(RDD.scala:379)
	at org.apache.spark.api.java.JavaRDDLike$class.flatMap(JavaRDDLike.scala:126)
	at org.apache.spark.api.java.AbstractJavaRDDLike.flatMap(JavaRDDLike.scala:45)
	at org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor.generateCompactionPlan(HoodieMergeOnReadTableCompactor.java:198)
	at org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.scheduleCompaction(ScheduleCompactionActionExecutor.java:78)
	at org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.execute(ScheduleCompactionActionExecutor.java:107)
	at org.apache.hudi.table.HoodieMergeOnReadTable.scheduleCompaction(HoodieMergeOnReadTable.java:123)
	at org.apache.hudi.client.HoodieWriteClient.scheduleCompactionAtInstant(HoodieWriteClient.java:629)
	at org.apache.hudi.client.HoodieWriteClient.scheduleCompaction(HoodieWriteClient.java:617)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:421)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:244)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:579)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: org.apache.hudi.common.util.collection.RocksDBDAO
Serialization stack:
	- object not serializable (class: org.apache.hudi.common.util.collection.RocksDBDAO, value: org.apache.hudi.common.util.collection.RocksDBDAO@7f73bc0c)
	- field (class: org.apache.hudi.common.table.view.RocksDbBasedFileSystemView, name: rocksDB, type: class org.apache.hudi.common.util.collection.RocksDBDAO)
	- object (class org.apache.hudi.common.table.view.RocksDbBasedFileSystemView, org.apache.hudi.common.table.view.RocksDbBasedFileSystemView@6225959a)
	- element of array (index: 1)
	- array (class [Ljava.lang.Object;, size 4)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor, functionalInterfaceMethod=org/apache/spark/api/java/function/FlatMapFunction.call:(Ljava/lang/Object;)Ljava/util/Iterator;, implementation=invokeSpecial org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.lambda$generateCompactionPlan$6303544c$1:(Lorg/apache/hudi/common/table/view/TableFileSystemView$SliceView;Ljava/util/Set;Lorg/apache/hudi/config/HoodieWriteConfig;Ljava/lang/String;)Ljava/util/Iterator;, instantiatedMethodType=(Ljava/lang/String;)Ljava/util/Iterator;, numCaptured=4])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor$$Lambda$200/617304614, org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor$$Lambda$200/617304614@2fac0bf2)
	- field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1, name: f$3, type: interface org.apache.spark.api.java.function.FlatMapFunction)
	- object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
	... 24 more
20/12/10 17:29:22 INFO deltastreamer.HoodieDeltaStreamer: Delta Sync shutdown. Error ?true
20/12/10 17:29:22 WARN deltastreamer.HoodieDeltaStreamer: Gracefully shutting down compactor
```
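Reading the `Serialization stack` above: the flatMap lambda built in `HoodieMergeOnReadTableCompactor.generateCompactionPlan` captures a `RocksDbBasedFileSystemView` (visible as the `TableFileSystemView$SliceView` argument in the lambda's implementation signature), whose `rocksDB` field is a non-serializable `RocksDBDAO`, so Spark's ClosureCleaner cannot ship the task to executors. A minimal, self-contained sketch of the same failure pattern for illustration only (the `NativeHandle` class and all names here are mine, not Hudi code):
```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class TaskNotSerializableRepro {

    // Deliberately NOT Serializable, standing in for RocksDBDAO /
    // RocksDbBasedFileSystemView from the stack trace above.
    static class NativeHandle {
        long lookup(String key) { return key.hashCode(); }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("repro").setMaster("local[2]");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            NativeHandle handle = new NativeHandle(); // lives only on the driver

            // The lambda captures `handle` in its SerializedLambda.capturedArgs;
            // ClosureCleaner.ensureSerializable then throws
            // "SparkException: Task not serializable", as in the trace.
            jsc.parallelize(Arrays.asList("a", "b", "c"))
               .flatMap(s -> Arrays.asList(handle.lookup(s)).iterator())
               .collect();
        }
    }
}
```
This matches the configuration in the job: with `hoodie.filesystem.view.type=EMBEDDED_KV_STORE` (and the embedded timeline server disabled), the compactor's file-system view is RocksDB-backed, which is exactly the object the serializer rejects.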