bradleyhurley opened a new issue #2146:
URL: https://github.com/apache/hudi/issues/2146
**Describe the problem you faced**

When attempting to run the DeltaStreamer in BULK_INSERT mode, we are experiencing a `java.io.NotSerializableException: org.apache.hudi.common.util.RocksDBDAO` exception.
**Spark Submit**
```
spark-submit
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
--jars s3://data-lake-adp-qa/hudi/libraries/hudi-ext-1.0-SNAPSHOT.jar
--master yarn
--deploy-mode client
--num-executors 200
--executor-cores 3
--executor-memory 20G
--driver-memory 6g
--op BULK_INSERT
--filter-dupes
--conf "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps -XX:InitiatingHeapOccupancyPercent=35"
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps -XX:InitiatingHeapOccupancyPercent=35"
--conf spark.yarn.executor.memoryOverhead=5G
--conf spark.task.maxFailures=10
--conf spark.memory.fraction=0.4
--conf spark.rdd.compress=true
--conf spark.kryoserializer.buffer.max=200m
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.dynamicAllocation.enabled=True
--conf spark.reducer.maxReqsInFlight=1
--conf spark.shuffle.io.retryWait=60s
--conf spark.shuffle.io.maxRetries=10
--conf spark.port.maxRetries=100
/usr/lib/hudi/hudi-utilities-bundle.jar
--table-type COPY_ON_WRITE
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource
--source-ordering-field {{ ordering_field }}
--target-base-path {{ s3_path }}
--target-table {{ table_name }}
--props s3://data-lake-adp-qa//hudi/config/hudi.properties
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
--hoodie-conf hoodie.embed.timeline.server=true
--hoodie-conf hoodie.filesystem.view.type=EMBEDDED_KV_STORE
--hoodie-conf hoodie.compact.inline=false
--hoodie-conf hoodie.datasource.write.recordkey.field={{ record_key }}
--hoodie-conf hoodie.datasource.write.partitionpath.field={{ partition_path }}
--hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url={{ table_schema_url }}
--hoodie-conf hoodie.deltastreamer.schemaprovider.registry.targetUrl={{ table_schema_url }}
--hoodie-conf schema.registry.url={{ schema_registry_url }}
--hoodie-conf hoodie.deltastreamer.source.kafka.topic={{ kafka_topic }}
--hoodie-conf group.id={{ group_id }}
--hoodie-conf enable.auto.commit=false
--hoodie-conf bootstrap.servers={{ brokers }}
--hoodie-conf auto.offset.reset=earliest
--hoodie-conf hoodie.consistency.check.enabled=true
--hoodie-conf security.protocol=SSL
--hoodie-conf ssl.keystore.location=/usr/lib/jvm/jre/lib/security/cacerts
--hoodie-conf ssl.keystore.password={{ cert password }}
--hoodie-conf hoodie.deltastreamer.kafka.source.maxEvents=10000000
--hoodie-conf hoodie.insert.shuffle.parallelism=600
--hoodie-conf hoodie.upsert.shuffle.parallelism=600
--enable-hive-sync
--hoodie-conf hoodie.datasource.hive_sync.table={{ table_name }}
--hoodie-conf hoodie.datasource.hive_sync.database={{ database_name }}
--hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
--hoodie-conf hoodie.bulkinsert.shuffle.parallelism=1205
--hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenDecimalSupport
--hoodie-conf hoodie.datasource.hive_sync.partition_fields={{ partition_field }}
```
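
Possibly relevant: as far as we can tell, `hoodie.filesystem.view.type=EMBEDDED_KV_STORE` is the setting that selects the RocksDB-backed file system view (`RocksDbBasedFileSystemView`) that shows up as non-serializable in the stack trace below. We have not yet verified whether switching that property to one of the other view types (e.g. `MEMORY`) avoids the error; noting it here in case it helps narrow down where the view is being captured in the closure.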
**To Reproduce**

Steps to reproduce the behavior:

1. Use the provided configuration to read data out of Kafka with the `--op` flag set to `BULK_INSERT`.
**Expected behavior**

I would expect the DeltaStreamer job to complete successfully and the table to be registered with the Glue catalog and queryable in Athena.
**Environment Description**

AWS EMR 5.30; Spark jobs are submitted via the EMR Step API.

* Hudi version : 0.5.2-inc
* Spark version : 2.4.5
* Hive version : 2.3.6
* Hadoop version : 2.8.5
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : No
**Additional context**

When running the same configuration but without the `op` flag set to `BULK_INSERT` (i.e. letting it default to `UPSERT`), the data is processed without issue.

The `SimpleKeyGenDecimalSupport` is a very slightly modified version of the `SimpleKeyGen` class, enhanced to support decimal values for the record key and partition path fields.
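
For reference, the modification is conceptually along the lines of the sketch below. This is not the class we actually run: the `decimalSafeValue` helper is illustrative, union/nullable schemas are not handled, and package locations (e.g. `TypedProperties` under `common.util` in 0.5.x) may differ by Hudi version. It also extends the base `KeyGenerator` directly rather than patching the stock simple key generator.

```java
package org.apache.hudi.keygen;

import java.math.BigDecimal;
import java.nio.ByteBuffer;

import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.TypedProperties;

// Simplified sketch only, not the exact class referenced in the spark-submit above.
public class SimpleKeyGenDecimalSupport extends KeyGenerator {

  private final String recordKeyField;
  private final String partitionPathField;

  public SimpleKeyGenDecimalSupport(TypedProperties props) {
    super(props);
    this.recordKeyField = props.getString("hoodie.datasource.write.recordkey.field");
    this.partitionPathField = props.getString("hoodie.datasource.write.partitionpath.field");
  }

  @Override
  public HoodieKey getKey(GenericRecord record) {
    return new HoodieKey(decimalSafeValue(record, recordKeyField),
        decimalSafeValue(record, partitionPathField));
  }

  // Render Avro decimal logical types as plain strings; fall back to String.valueOf otherwise.
  private static String decimalSafeValue(GenericRecord record, String field) {
    Object value = record.get(field);
    if (value instanceof ByteBuffer) {
      // Decimal logical types arrive as bytes; union/nullable handling omitted for brevity.
      Schema fieldSchema = record.getSchema().getField(field).schema();
      BigDecimal decimal = new Conversions.DecimalConversion()
          .fromBytes(((ByteBuffer) value).duplicate(), fieldSchema, fieldSchema.getLogicalType());
      return decimal.toPlainString();
    }
    if (value instanceof BigDecimal) {
      return ((BigDecimal) value).toPlainString();
    }
    return String.valueOf(value);
  }
}
```

The only intent is that decimal record key / partition path values end up as plain strings rather than raw Avro `ByteBuffer`s.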
I am still able to manually register the table by running the `/usr/lib/hudi/bin/run_sync_tool.sh` tool:
```
/usr/lib/hudi/bin/run_sync_tool.sh --jdbc-url jdbc:hive2://localhost:10000 \
--user hive \
--pass {{ hive_password }} \
--base-path {{ s3_path }} \
--database {{ database_name }} \
--table {{ table_name }} \
--partitioned-by {{ partition_path }} \
--partition-value-extractor org.apache.hudi.hive.MultiPartKeysValueExtractor
```
**Stacktrace**
```
ERROR HoodieDeltaStreamer: Got error running delta sync once. Shutting down
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2327)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:393)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:392)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
    at org.apache.spark.rdd.RDD.map(RDD.scala:392)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:93)
    at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45)
    at org.apache.hudi.table.HoodieCopyOnWriteTable.scheduleClean(HoodieCopyOnWriteTable.java:304)
    at org.apache.hudi.client.HoodieCleanClient.scheduleClean(HoodieCleanClient.java:114)
    at org.apache.hudi.client.HoodieCleanClient.clean(HoodieCleanClient.java:91)
    at org.apache.hudi.client.HoodieWriteClient.clean(HoodieWriteClient.java:835)
    at org.apache.hudi.client.HoodieWriteClient.postCommit(HoodieWriteClient.java:512)
    at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:157)
    at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:101)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:395)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:238)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:121)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:294)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.hudi.common.util.RocksDBDAO
Serialization stack:
    - object not serializable (class: org.apache.hudi.common.util.RocksDBDAO, value: org.apache.hudi.common.util.RocksDBDAO@57e9cd2)
    - field (class: org.apache.hudi.common.table.view.RocksDbBasedFileSystemView, name: rocksDB, type: class org.apache.hudi.common.util.RocksDBDAO)
    - object (class org.apache.hudi.common.table.view.RocksDbBasedFileSystemView, org.apache.hudi.common.table.view.RocksDbBasedFileSystemView@306a9cd8)
    - field (class: org.apache.hudi.table.CleanHelper, name: fileSystemView, type: interface org.apache.hudi.common.table.SyncableFileSystemView)
    - object (class org.apache.hudi.table.CleanHelper, org.apache.hudi.table.CleanHelper@651a399)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.hudi.table.HoodieCopyOnWriteTable, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/hudi/table/HoodieCopyOnWriteTable.lambda$scheduleClean$d4c53b8f$1:(Lorg/apache/hudi/table/CleanHelper;Ljava/lang/String;)Lorg/apache/hudi/common/util/collection/Pair;, instantiatedMethodType=(Ljava/lang/String;)Lorg/apache/hudi/common/util/collection/Pair;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.hudi.table.HoodieCopyOnWriteTable$$Lambda$234/745138736, org.apache.hudi.table.HoodieCopyOnWriteTable$$Lambda$234/745138736@46e32574)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
```