ehurheap opened a new issue, #9807:
URL: https://github.com/apache/hudi/issues/9807

   **Describe the problem you faced**
   
   We are trying to delete records from a MOR table with the locking and concurrency write configs below, and are running into `java.io.NotSerializableException: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider`.
   
   write concurrency/locking configs:
   
   ```
   (hoodie.write.concurrency.mode,OPTIMISTIC_CONCURRENCY_CONTROL)
   (hoodie.write.lock.conflict.resolution.strategy,org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy)
   (hoodie.write.lock.dynamodb.endpoint_url,http://localhost:8000)
   (hoodie.write.lock.dynamodb.partition_key,users_changes-us-east-1-local)
   (hoodie.write.lock.dynamodb.region,us-east-1)
   (hoodie.write.lock.dynamodb.table,datalake-locks)
   (hoodie.write.lock.provider,org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider)
   ```
   (Note: the config above is taken from an integration test that points at a local DynamoDB, but the same error occurs against production DynamoDB.)
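
   For reference, a minimal sketch of how we wire these properties into the write config. This is illustrative only: the base path and table name below are placeholders, and it assumes `HoodieWriteConfig.Builder.withProps` accepts a `java.util.Properties`:

   ```
   import java.util.Properties
   import org.apache.hudi.config.HoodieWriteConfig

   // Lock/concurrency properties as listed above.
   val lockProps = new Properties()
   lockProps.setProperty("hoodie.write.concurrency.mode", "OPTIMISTIC_CONCURRENCY_CONTROL")
   lockProps.setProperty("hoodie.write.lock.provider",
     "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider")
   lockProps.setProperty("hoodie.write.lock.dynamodb.table", "datalake-locks")
   lockProps.setProperty("hoodie.write.lock.dynamodb.partition_key", "users_changes-us-east-1-local")
   lockProps.setProperty("hoodie.write.lock.dynamodb.region", "us-east-1")
   lockProps.setProperty("hoodie.write.lock.dynamodb.endpoint_url", "http://localhost:8000")

   // Build the write config; the path and table name here are hypothetical.
   val config: HoodieWriteConfig = HoodieWriteConfig.newBuilder()
     .withPath("s3://our-bucket/path/to/users_changes")
     .forTable("users_changes")
     .withProps(lockProps)
     .build()
   ```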
   
   delete code snippet:
   
   ```
   val writeClient: SparkRDDWriteClient[_] = buildWriteClient(config, spark.sparkContext)
   try {
     val deleteInstant = writeClient.startCommit()
     writeClient.delete(recordKeys, deleteInstant)
   ...
   
   ```
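
   `buildWriteClient` is our own helper; a minimal sketch of what it does, assuming the `HoodieWriteConfig` built above (the helper name and shape are ours, not Hudi API):

   ```
   import org.apache.spark.SparkContext
   import org.apache.spark.api.java.JavaSparkContext
   import org.apache.hudi.client.SparkRDDWriteClient
   import org.apache.hudi.client.common.HoodieSparkEngineContext
   import org.apache.hudi.config.HoodieWriteConfig

   // Wrap the SparkContext in a Hudi engine context and construct the client.
   def buildWriteClient(config: HoodieWriteConfig, sc: SparkContext): SparkRDDWriteClient[_] = {
     val engineContext = new HoodieSparkEngineContext(JavaSparkContext.fromSparkContext(sc))
     new SparkRDDWriteClient(engineContext, config)
   }
   ```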
   
   **To Reproduce**
   
   1. Build an RDD of recordKeys to delete (see the sketch after this list)
   2. Build a `SparkRDDWriteClient` with the concurrency and locking configs specified above
   3. Execute `writeClient.delete(recordKeys, deleteInstant)`
   4. Observe `java.io.NotSerializableException: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider`
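
   For step 1, a minimal sketch of how the key RDD can be built (the record keys and partition paths shown are placeholders; in Hudi 0.13, `SparkRDDWriteClient.delete` takes a `JavaRDD[HoodieKey]` and an instant time):

   ```
   import org.apache.spark.api.java.JavaRDD
   import org.apache.hudi.common.model.HoodieKey

   // Hypothetical (recordKey, partitionPath) pairs to delete.
   val keysToDelete: Seq[(String, String)] = Seq(
     ("0b7a3c2e-placeholder-uuid", "2023/09/27")
   )

   // Convert to the JavaRDD[HoodieKey] that writeClient.delete expects.
   val recordKeys: JavaRDD[HoodieKey] = JavaRDD.fromRDD(
     spark.sparkContext
       .parallelize(keysToDelete)
       .map { case (key, partition) => new HoodieKey(key, partition) }
   )
   ```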
   
   **Expected behavior**
   The delete is successful.
   
   **Environment Description**
   
   * Hudi version : 0.13.0
   * Spark version : 3.3.0
   * Hive version : n/a
   * Hadoop version : 3.2.1
   * Storage (HDFS/S3/GCS..) : S3
   * Running on Docker? (yes/no) : no
   
   **Additional context**
   
   We are using the write client to delete an RDD of recordKeys because the record keys were generated as UUIDs, which cannot be deleted through the DataFrame API. This was surfaced in [this issue](https://github.com/apache/hudi/issues/9079). It seems we did not test with full concurrency/locking configured at that time.
   
   **Stacktrace**
   
   ```
   org.apache.hudi.exception.HoodieUpsertException: Failed to delete for commit time 20230927230655498
   org.apache.hudi.exception.HoodieUpsertException: Failed to delete for commit time 20230927230655498
           at org.apache.hudi.table.action.commit.HoodieDeleteHelper.execute(HoodieDeleteHelper.java:117)
           at org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor.execute(SparkDeleteDeltaCommitActionExecutor.java:45)
           at org.apache.hudi.table.HoodieSparkMergeOnReadTable.delete(HoodieSparkMergeOnReadTable.java:105)
           at org.apache.hudi.table.HoodieSparkMergeOnReadTable.delete(HoodieSparkMergeOnReadTable.java:80)
           at org.apache.hudi.client.SparkRDDWriteClient.delete(SparkRDDWriteClient.java:243)
           at com.heap.datalake.delete.UUIDRecordKeyDeleter$.deleteRecords(UUIDRecordKeyDeleter.scala:82)
           at com.heap.datalake.delete.UUIDRecordKeyDeleter$.deleteRecords(UUIDRecordKeyDeleter.scala:210)
           at com.heap.datalake.delete.UserPropertiesDeleter$.deleteUsers(UserPropertiesDeleter.scala:48)
           at com.heap.datalake.delete.UserPropertiesDeleter$.run(UserPropertiesDeleter.scala:23)
           at com.heap.datalake.delete.DeleteUsersApp$.$anonfun$run$2(DeleteUsersApp.scala:56)
           at scala.util.Try$.apply(Try.scala:213)
           at com.heap.datalake.delete.DeleteUsersApp$.run(DeleteUsersApp.scala:56)
           at com.heap.datalake.delete.DeleteUsersApp$.$anonfun$main$2(DeleteUsersApp.scala:38)
           at com.heap.datalake.delete.DeleteUsersApp$.$anonfun$main$2$adapted(DeleteUsersApp.scala:23)
           at scala.util.Success.foreach(Try.scala:253)
           at com.heap.datalake.delete.DeleteUsersApp$.main(DeleteUsersApp.scala:23)
           at com.heap.datalake.delete.DeleteUsersApp.main(DeleteUsersApp.scala)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:742)
   Caused by: org.apache.spark.SparkException: Task not serializable
           at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:444)
           at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:416)
           at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
           at org.apache.spark.SparkContext.clean(SparkContext.scala:2493)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294)
           at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
           at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
           at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
           at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
           at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
           at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
           at org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:163)
           at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.setCommitMetadata(BaseSparkCommitActionExecutor.java:283)
           at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.autoCommit(BaseCommitActionExecutor.java:186)
           at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:174)
           at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:273)
           at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:178)
           at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:83)
           at org.apache.hudi.table.action.commit.HoodieDeleteHelper.execute(HoodieDeleteHelper.java:103)
           ... 21 more
   Caused by: java.io.NotSerializableException: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
   Serialization stack:
           - object not serializable (class: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider, value: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider@697af25)
           - field (class: org.apache.hudi.client.transaction.lock.LockManager, name: lockProvider, type: interface org.apache.hudi.common.lock.LockProvider)
           - object (class org.apache.hudi.client.transaction.lock.LockManager, org.apache.hudi.client.transaction.lock.LockManager@a109d65)
           - field (class: org.apache.hudi.client.transaction.TransactionManager, name: lockManager, type: class org.apache.hudi.client.transaction.lock.LockManager)
           - object (class org.apache.hudi.client.transaction.TransactionManager, org.apache.hudi.client.transaction.TransactionManager@157ce6e9)
           - field (class: org.apache.hudi.table.action.commit.BaseCommitActionExecutor, name: txnManager, type: class org.apache.hudi.client.transaction.TransactionManager)
           - object (class org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor, org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor@d63f085)
           - element of array (index: 0)
           - array (class [Ljava.lang.Object;, size 2)
           - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
           - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor, functionalInterfaceMethod=org/apache/spark/api/java/function/Function2.call:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1:(Lorg/apache/spark/Partitioner;Ljava/lang/Integer;Ljava/util/Iterator;)Ljava/util/Iterator;, instantiatedMethodType=(Ljava/lang/Integer;Ljava/util/Iterator;)Ljava/util/Iterator;, numCaptured=2])
           - writeReplace data (class: java.lang.invoke.SerializedLambda)
           - object (class org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor$$Lambda$5283/1488307125, org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor$$Lambda$5283/1488307125@467af2d8)
           - element of array (index: 0)
           - array (class [Ljava.lang.Object;, size 1)
           - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
           - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=interface org.apache.spark.api.java.JavaRDDLike, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/api/java/JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted:(Lorg/apache/spark/api/java/function/Function2;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=1])
           - writeReplace data (class: java.lang.invoke.SerializedLambda)
           - object (class org.apache.spark.api.java.JavaRDDLike$$Lambda$5284/280417748, org.apache.spark.api.java.JavaRDDLike$$Lambda$5284/280417748@1c7adacb)
           - element of array (index: 0)
           - array (class [Ljava.lang.Object;, size 1)
           - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
           - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, functionalInterfaceMethod=scala/Function3.apply:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/RDD.$anonfun$mapPartitionsWithIndex$2$adapted:(Lscala/Function2;Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=1])
           - writeReplace data (class: java.lang.invoke.SerializedLambda)
           - object (class org.apache.spark.rdd.RDD$$Lambda$4810/1052938329, org.apache.spark.rdd.RDD$$Lambda$4810/1052938329@3c31aa92)
           - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
           - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[146] at mapPartitionsWithIndex at BaseSparkCommitActionExecutor.java:249)
           - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
           - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@373e82e8)
           - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
           - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@44652f9f)
           - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
           - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@373e82e8))
           - field (class: org.apache.spark.rdd.RDD, name: dependencies_, type: interface scala.collection.Seq)
           - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[147] at flatMap at BaseSparkCommitActionExecutor.java:255)
           - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
           - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@66f562d4)
           - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
           - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@d993b51)
           - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
           - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@66f562d4))
           - field (class: org.apache.spark.rdd.RDD, name: dependencies_, type: interface scala.collection.Seq)
           - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[148] at map at HoodieJavaRDD.java:111)
           - element of array (index: 0)
           - array (class [Ljava.lang.Object;, size 1)
           - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
           - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/RDD.$anonfun$collect$2:(Lorg/apache/spark/rdd/RDD;Lscala/collection/Iterator;)Ljava/lang/Object;, instantiatedMethodType=(Lscala/collection/Iterator;)Ljava/lang/Object;, numCaptured=1])
           - writeReplace data (class: java.lang.invoke.SerializedLambda)
           - object (class org.apache.spark.rdd.RDD$$Lambda$3667/1073189054, org.apache.spark.rdd.RDD$$Lambda$3667/1073189054@770c17a4)
           at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
           at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:49)
           at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
           at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:441)
           ... 42 more
   23/09/27 23:07:54 INFO SparkContext: Invoking stop() from shutdown hook
   
   ```
   
   

