ehurheap opened a new issue, #9807: URL: https://github.com/apache/hudi/issues/9807
**Describe the problem you faced**

We are trying to delete records from a MOR table with the write configs below for locking and concurrency, and the delete fails with `java.io.NotSerializableException: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider`.

Write concurrency/locking configs:

```
(hoodie.write.concurrency.mode,OPTIMISTIC_CONCURRENCY_CONTROL)
(hoodie.write.lock.conflict.resolution.strategy,org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy)
(hoodie.write.lock.dynamodb.endpoint_url,http://localhost:8000)
(hoodie.write.lock.dynamodb.partition_key,users_changes-us-east-1-local)
(hoodie.write.lock.dynamodb.region,us-east-1)
(hoodie.write.lock.dynamodb.table,datalake-locks)
(hoodie.write.lock.provider,org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider)
```

(Note: the config above is taken from an integration test that points at a local DynamoDB, but the same error happens against production DynamoDB.)

Delete code snippet:

```scala
val writeClient: SparkRDDWriteClient[_] = buildWriteClient(config, spark.sparkContext)
try {
  val deleteInstant = writeClient.startCommit()
  writeClient.delete(recordKeys, deleteInstant)
  ...
```

**To Reproduce**

1. Build an RDD of record keys to delete.
2. Build a `SparkRDDWriteClient` with the concurrency and locking configs specified above (a self-contained sketch follows the additional context below).
3. Execute `writeClient.delete(recordKeys, deleteInstant)`.
4. Observe `java.io.NotSerializableException: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider`.

**Expected behavior**

The delete succeeds.

**Environment Description**

* Hudi version : 0.13.0
* Spark version : 3.3.0
* Hive version : n/a
* Hadoop version : 3.2.1
* Storage (HDFS/S3/GCS..) : s3
* Running on Docker? (yes/no) : no

**Additional context**

We are using the write client to delete an RDD of record keys because the record keys were generated as UUIDs, which cannot be deleted through the DataFrame API. This was surfaced in [this issue](https://github.com/apache/hudi/issues/9079); it seems we did not test with full concurrency/locking configured at that time.
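For completeness, here is a minimal, self-contained sketch of what the repro boils down to, inlining roughly what our internal `buildWriteClient` helper does. The base path, table name, and sample `HoodieKey` values are placeholders, and the property wiring is a simplification, so treat this as a sketch rather than our exact production code (Hudi 0.13.0 / Spark 3.3.0 as above):

```scala
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieKey}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession

object DeleteRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hudi-delete-repro").getOrCreate()
    val jsc = new JavaSparkContext(spark.sparkContext)

    // Locking/concurrency configs exactly as listed above.
    val props = new TypedProperties()
    props.setProperty("hoodie.write.concurrency.mode", "OPTIMISTIC_CONCURRENCY_CONTROL")
    props.setProperty("hoodie.write.lock.conflict.resolution.strategy",
      "org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy")
    props.setProperty("hoodie.write.lock.dynamodb.endpoint_url", "http://localhost:8000")
    props.setProperty("hoodie.write.lock.dynamodb.partition_key", "users_changes-us-east-1-local")
    props.setProperty("hoodie.write.lock.dynamodb.region", "us-east-1")
    props.setProperty("hoodie.write.lock.dynamodb.table", "datalake-locks")
    props.setProperty("hoodie.write.lock.provider",
      "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider")

    val writeConfig = HoodieWriteConfig.newBuilder()
      .withPath("s3://bucket/path/to/table") // placeholder base path
      .forTable("users_changes")             // placeholder table name
      .withProperties(props)
      .build()

    // Equivalent of buildWriteClient(config, spark.sparkContext) in the snippet above.
    val writeClient =
      new SparkRDDWriteClient[HoodieAvroPayload](new HoodieSparkEngineContext(jsc), writeConfig)

    // RDD of HoodieKeys (record key + partition path) to delete; values are placeholders.
    val recordKeys = jsc.parallelize(java.util.Arrays.asList(
      new HoodieKey("00000000-0000-0000-0000-000000000000", "partition=2023-09-27")))

    try {
      val deleteInstant = writeClient.startCommit()
      writeClient.delete(recordKeys, deleteInstant) // fails with NotSerializableException
    } finally {
      writeClient.close()
    }
  }
}
```

Per the stacktrace below, the failure happens inside the delete call during auto-commit, when Spark tries to serialize a closure that captures the commit action executor (and, through its transaction manager, the DynamoDB lock provider).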
**Stacktrace**

```
org.apache.hudi.exception.HoodieUpsertException: Failed to delete for commit time 20230927230655498
  at org.apache.hudi.table.action.commit.HoodieDeleteHelper.execute(HoodieDeleteHelper.java:117)
  at org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor.execute(SparkDeleteDeltaCommitActionExecutor.java:45)
  at org.apache.hudi.table.HoodieSparkMergeOnReadTable.delete(HoodieSparkMergeOnReadTable.java:105)
  at org.apache.hudi.table.HoodieSparkMergeOnReadTable.delete(HoodieSparkMergeOnReadTable.java:80)
  at org.apache.hudi.client.SparkRDDWriteClient.delete(SparkRDDWriteClient.java:243)
  at com.heap.datalake.delete.UUIDRecordKeyDeleter$.deleteRecords(UUIDRecordKeyDeleter.scala:82)
  at com.heap.datalake.delete.UUIDRecordKeyDeleter$.deleteRecords(UUIDRecordKeyDeleter.scala:210)
  at com.heap.datalake.delete.UserPropertiesDeleter$.deleteUsers(UserPropertiesDeleter.scala:48)
  at com.heap.datalake.delete.UserPropertiesDeleter$.run(UserPropertiesDeleter.scala:23)
  at com.heap.datalake.delete.DeleteUsersApp$.$anonfun$run$2(DeleteUsersApp.scala:56)
  at scala.util.Try$.apply(Try.scala:213)
  at com.heap.datalake.delete.DeleteUsersApp$.run(DeleteUsersApp.scala:56)
  at com.heap.datalake.delete.DeleteUsersApp$.$anonfun$main$2(DeleteUsersApp.scala:38)
  at com.heap.datalake.delete.DeleteUsersApp$.$anonfun$main$2$adapted(DeleteUsersApp.scala:23)
  at scala.util.Success.foreach(Try.scala:253)
  at com.heap.datalake.delete.DeleteUsersApp$.main(DeleteUsersApp.scala:23)
  at com.heap.datalake.delete.DeleteUsersApp.main(DeleteUsersApp.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:742)
Caused by: org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:444)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:416)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2493)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294)
  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
  at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
  at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
  at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
  at org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:163)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.setCommitMetadata(BaseSparkCommitActionExecutor.java:283)
  at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.autoCommit(BaseCommitActionExecutor.java:186)
  at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:174)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:273)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:178)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:83)
  at org.apache.hudi.table.action.commit.HoodieDeleteHelper.execute(HoodieDeleteHelper.java:103)
  ... 21 more
Caused by: java.io.NotSerializableException: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
Serialization stack:
  - object not serializable (class: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider, value: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider@697af25)
  - field (class: org.apache.hudi.client.transaction.lock.LockManager, name: lockProvider, type: interface org.apache.hudi.common.lock.LockProvider)
  - object (class org.apache.hudi.client.transaction.lock.LockManager, org.apache.hudi.client.transaction.lock.LockManager@a109d65)
  - field (class: org.apache.hudi.client.transaction.TransactionManager, name: lockManager, type: class org.apache.hudi.client.transaction.lock.LockManager)
  - object (class org.apache.hudi.client.transaction.TransactionManager, org.apache.hudi.client.transaction.TransactionManager@157ce6e9)
  - field (class: org.apache.hudi.table.action.commit.BaseCommitActionExecutor, name: txnManager, type: class org.apache.hudi.client.transaction.TransactionManager)
  - object (class org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor, org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor@d63f085)
  - element of array (index: 0)
  - array (class [Ljava.lang.Object;, size 2)
  - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
  - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor, functionalInterfaceMethod=org/apache/spark/api/java/function/Function2.call:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1:(Lorg/apache/spark/Partitioner;Ljava/lang/Integer;Ljava/util/Iterator;)Ljava/util/Iterator;, instantiatedMethodType=(Ljava/lang/Integer;Ljava/util/Iterator;)Ljava/util/Iterator;, numCaptured=2])
  - writeReplace data (class: java.lang.invoke.SerializedLambda)
  - object (class org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor$$Lambda$5283/1488307125, org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor$$Lambda$5283/1488307125@467af2d8)
  - element of array (index: 0)
  - array (class [Ljava.lang.Object;, size 1)
  - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
  - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=interface org.apache.spark.api.java.JavaRDDLike, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/api/java/JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted:(Lorg/apache/spark/api/java/function/Function2;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=1])
  - writeReplace data (class: java.lang.invoke.SerializedLambda)
  - object (class org.apache.spark.api.java.JavaRDDLike$$Lambda$5284/280417748, org.apache.spark.api.java.JavaRDDLike$$Lambda$5284/280417748@1c7adacb)
  - element of array (index: 0)
  - array (class [Ljava.lang.Object;, size 1)
  - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
  - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, functionalInterfaceMethod=scala/Function3.apply:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/RDD.$anonfun$mapPartitionsWithIndex$2$adapted:(Lscala/Function2;Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=1])
  - writeReplace data (class: java.lang.invoke.SerializedLambda)
  - object (class org.apache.spark.rdd.RDD$$Lambda$4810/1052938329, org.apache.spark.rdd.RDD$$Lambda$4810/1052938329@3c31aa92)
  - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
  - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[146] at mapPartitionsWithIndex at BaseSparkCommitActionExecutor.java:249)
  - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
  - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@373e82e8)
  - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
  - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@44652f9f)
  - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
  - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@373e82e8))
  - field (class: org.apache.spark.rdd.RDD, name: dependencies_, type: interface scala.collection.Seq)
  - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[147] at flatMap at BaseSparkCommitActionExecutor.java:255)
  - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
  - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@66f562d4)
  - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
  - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@d993b51)
  - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
  - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@66f562d4))
  - field (class: org.apache.spark.rdd.RDD, name: dependencies_, type: interface scala.collection.Seq)
  - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[148] at map at HoodieJavaRDD.java:111)
  - element of array (index: 0)
  - array (class [Ljava.lang.Object;, size 1)
  - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
  - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/RDD.$anonfun$collect$2:(Lorg/apache/spark/rdd/RDD;Lscala/collection/Iterator;)Ljava/lang/Object;, instantiatedMethodType=(Lscala/collection/Iterator;)Ljava/lang/Object;, numCaptured=1])
  - writeReplace data (class: java.lang.invoke.SerializedLambda)
  - object (class org.apache.spark.rdd.RDD$$Lambda$3667/1073189054, org.apache.spark.rdd.RDD$$Lambda$3667/1073189054@770c17a4)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:49)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:441)
  ... 42 more
23/09/27 23:07:54 INFO SparkContext: Invoking stop() from shutdown hook
```
