bradleyhurley opened a new issue #2146:
URL: https://github.com/apache/hudi/issues/2146


   **Describe the problem you faced**
   
   When attempting to run the DeltaStreamer in BULK_INSERT mode we hit a `java.io.NotSerializableException: org.apache.hudi.common.util.RocksDBDAO`.
   
   **Spark Submit**
   ```
   spark-submit
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
   --jars s3://data-lake-adp-qa/hudi/libraries/hudi-ext-1.0-SNAPSHOT.jar
   --master yarn
   --deploy-mode client
   --num-executors 200
   --executor-cores 3
   --executor-memory 20G
   --driver-memory 6g
   --op BULK_INSERT
   --filter-dupes
   --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails 
-XX:+PrintGCTimeStamps -XX:InitiatingHeapOccupancyPercent=35"
   --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails 
-XX:+PrintGCTimeStamps -XX:InitiatingHeapOccupancyPercent=35"
   --conf spark.yarn.executor.memoryOverhead=5G
   --conf spark.task.maxFailures=10
   --conf spark.memory.fraction=0.4
   --conf spark.rdd.compress=true
   --conf spark.kryoserializer.buffer.max=200m
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
   --conf spark.dynamicAllocation.enabled=True
   --conf spark.reducer.maxReqsInFlight=1
   --conf spark.shuffle.io.retryWait=60s
   --conf spark.shuffle.io.maxRetries=10
   --conf spark.port.maxRetries=100 
   /usr/lib/hudi/hudi-utilities-bundle.jar
   --table-type COPY_ON_WRITE
   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource
   --source-ordering-field {{ ordering_field }}
   --target-base-path {{ s3_path }}
   --target-table {{ table_name }}
   --props s3://data-lake-adp-qa//hudi/config/hudi.properties
   --schemaprovider-class 
org.apache.hudi.utilities.schema.SchemaRegistryProvider
   --hoodie-conf hoodie.embed.timeline.server=true
   --hoodie-conf hoodie.filesystem.view.type=EMBEDDED_KV_STORE
   --hoodie-conf hoodie.compact.inline=false
   --hoodie-conf hoodie.datasource.write.recordkey.field={{ record_key }}
   --hoodie-conf hoodie.datasource.write.partitionpath.field={{ partition_path 
}}
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url={{ table_schema_url }}
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.targetUrl={{ 
table_schema_url }}
   --hoodie-conf schema.registry.url={{ schema_registry_url }}
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic={{ kafka_topic }}
   --hoodie-conf group.id={{ group_id }}
   --hoodie-conf enable.auto.commit=false
   --hoodie-conf bootstrap.servers={{ brokers }}
   --hoodie-conf auto.offset.reset=earliest
   --hoodie-conf hoodie.consistency.check.enabled=true
   --hoodie-conf security.protocol=SSL
   --hoodie-conf ssl.keystore.location=/usr/lib/jvm/jre/lib/security/cacerts
   --hoodie-conf ssl.keystore.password={{ cert password }}
   --hoodie-conf hoodie.deltastreamer.kafka.source.maxEvents=10000000
   --hoodie-conf hoodie.insert.shuffle.parallelism=600
   --hoodie-conf hoodie.upsert.shuffle.parallelism=600
   --enable-hive-sync
   --hoodie-conf hoodie.datasource.hive_sync.table={{ table_name }}
   --hoodie-conf hoodie.datasource.hive_sync.database={{ database_name }}
   --hoodie-conf 
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
   --hoodie-conf hoodie.bulkinsert.shuffle.parallelism=1205
   --hoodie-conf 
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
   --hoodie-conf 
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenDecimalSupport
   --hoodie-conf hoodie.datasource.hive_sync.partition_fields={{ 
partition_field }}
   ```
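   The submit above sets `hoodie.filesystem.view.type=EMBEDDED_KV_STORE`, which backs the timeline server's file-system view with RocksDB, the class named in the exception. As a diagnostic, one could retry with the in-memory view instead (assuming `MEMORY` is an accepted value, as in stock Hudi builds):
   
   ```shell
   # Hypothetical diagnostic, not a confirmed fix: swap the RocksDB-backed
   # view for the in-memory one to take RocksDBDAO out of the object graph.
   --hoodie-conf hoodie.filesystem.view.type=MEMORY
   ```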
   
   
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Use the configuration above to read data from Kafka with the `--op` flag set to `BULK_INSERT`
   
   **Expected behavior**
   
   I would expect the DeltaStreamer job to complete successfully and the table to be registered with the Glue catalog and queryable in Athena.
   
   **Environment Description**
   
   AWS EMR 5.30. Spark jobs are submitted via the EMR Step API.
   
   * Hudi version : 0.5.2-inc
   * Spark version : 2.4.5
   * Hive version : 2.3.6
   * Hadoop version : 2.8.5
   * Storage (HDFS/S3/GCS..) : S3
   * Running on Docker? (yes/no) : No
   
   
   **Additional context**
   
   When running the same configuration without the `op` flag, so that the default `UPSERT` behavior is used, the data is processed without issue.
   
   `SimpleKeyGenDecimalSupport` is a lightly modified version of the `SimpleKeyGen` class, enhanced to support decimal values for the record key and partition path.
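   For context, a hypothetical sketch of the kind of conversion such a key generator needs (the class and method names here are illustrative, not the actual `SimpleKeyGenDecimalSupport` code): Avro typically encodes a decimal logical type as unscaled big-endian bytes plus a scale taken from the schema, which must be rendered as a stable string before it can serve as a record key or partition path.
   
   ```java
   import java.math.BigDecimal;
   import java.math.BigInteger;
   
   // Illustrative helper only: turns Avro-style decimal bytes (unscaled,
   // big-endian two's complement) plus a schema scale into a key string.
   public class DecimalKeySupport {
       public static String decimalFieldToKey(byte[] unscaled, int scale) {
           return new BigDecimal(new BigInteger(unscaled), scale).toPlainString();
       }
   
       public static void main(String[] args) {
           // 0x3039 is the unscaled value 12345; with scale 2 -> "123.45"
           System.out.println(decimalFieldToKey(new byte[]{0x30, 0x39}, 2));
       }
   }
   ```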
   
   I am still able to register the table manually by running the `/usr/lib/hudi/bin/run_sync_tool.sh` tool.
   
   ```
   /usr/lib/hudi/bin/run_sync_tool.sh  --jdbc-url jdbc:hive2://localhost:10000 \
     --user hive \
     --pass {{ hive_password }} \
     --base-path  {{ s3_path}}  \
     --database {{ database_name }}  \
     --table {{ table_name }} \
     --partitioned-by {{ partition_path }} \
     --partition-value-extractor 
org.apache.hudi.hive.MultiPartKeysValueExtractor
   ```
   
   **Stacktrace**
   
   ```
   ERROR HoodieDeltaStreamer: Got error running delta sync once. Shutting down
   org.apache.spark.SparkException: Task not serializable
        at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
        at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2327)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:393)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:392)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.map(RDD.scala:392)
        at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:93)
        at 
org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45)
        at 
org.apache.hudi.table.HoodieCopyOnWriteTable.scheduleClean(HoodieCopyOnWriteTable.java:304)
        at 
org.apache.hudi.client.HoodieCleanClient.scheduleClean(HoodieCleanClient.java:114)
        at 
org.apache.hudi.client.HoodieCleanClient.clean(HoodieCleanClient.java:91)
        at 
org.apache.hudi.client.HoodieWriteClient.clean(HoodieWriteClient.java:835)
        at 
org.apache.hudi.client.HoodieWriteClient.postCommit(HoodieWriteClient.java:512)
        at 
org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:157)
        at 
org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:101)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:395)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:238)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:121)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:294)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.io.NotSerializableException: 
org.apache.hudi.common.util.RocksDBDAO
   Serialization stack:
        - object not serializable (class: 
org.apache.hudi.common.util.RocksDBDAO, value: 
org.apache.hudi.common.util.RocksDBDAO@57e9cd2)
        - field (class: 
org.apache.hudi.common.table.view.RocksDbBasedFileSystemView, name: rocksDB, 
type: class org.apache.hudi.common.util.RocksDBDAO)
        - object (class 
org.apache.hudi.common.table.view.RocksDbBasedFileSystemView, 
org.apache.hudi.common.table.view.RocksDbBasedFileSystemView@306a9cd8)
        - field (class: org.apache.hudi.table.CleanHelper, name: 
fileSystemView, type: interface 
org.apache.hudi.common.table.SyncableFileSystemView)
        - object (class org.apache.hudi.table.CleanHelper, 
org.apache.hudi.table.CleanHelper@651a399)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, 
type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, 
SerializedLambda[capturingClass=class 
org.apache.hudi.table.HoodieCopyOnWriteTable, 
functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;,
 implementation=invokeStatic 
org/apache/hudi/table/HoodieCopyOnWriteTable.lambda$scheduleClean$d4c53b8f$1:(Lorg/apache/hudi/table/CleanHelper;Ljava/lang/String;)Lorg/apache/hudi/common/util/collection/Pair;,
 
instantiatedMethodType=(Ljava/lang/String;)Lorg/apache/hudi/common/util/collection/Pair;,
 numCaptured=1])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class 
org.apache.hudi.table.HoodieCopyOnWriteTable$$Lambda$234/745138736, 
org.apache.hudi.table.HoodieCopyOnWriteTable$$Lambda$234/745138736@46e32574)
        - field (class: 
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, 
type: interface org.apache.spark.api.java.function.Function)
        - object (class 
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
        at 
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
   ```
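   Reading the serialization stack, the pattern is a Spark lambda capturing `CleanHelper`, whose `fileSystemView` field ultimately holds the non-serializable `RocksDBDAO`. A minimal standalone Java sketch of that failure mode (stand-in names, not Hudi's actual classes):
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.NotSerializableException;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   
   // Stands in for RocksDBDAO: deliberately does NOT implement Serializable.
   class RocksHandle { }
   
   // Stands in for CleanHelper: Serializable itself, but its field is not,
   // so serializing it fails. Marking the field `transient` (or not capturing
   // the helper in the Spark closure at all) is the usual way out.
   class CleanHelperLike implements Serializable {
       final RocksHandle fileSystemView = new RocksHandle();
   }
   
   public class SerializationDemo {
       static String trySerialize(Object o) {
           try (ObjectOutputStream out =
                    new ObjectOutputStream(new ByteArrayOutputStream())) {
               out.writeObject(o);
               return "ok";
           } catch (NotSerializableException e) {
               return "NotSerializableException: " + e.getMessage();
           } catch (IOException e) {
               return e.toString();
           }
       }
   
       public static void main(String[] args) {
           System.out.println(trySerialize(new CleanHelperLike()));
       }
   }
   ```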
   
   

