soumilshah1995 opened a new issue, #11378: URL: https://github.com/apache/hudi/issues/11378
Hello,

We have been experimenting with a multi-writer setup and have confirmed that it works perfectly with two writers. The image below shows our sample setup.

To take this setup further, we wanted to run the cleaner in parallel, asynchronously. The first run of the cleaner succeeded, but every subsequent run has failed.

We have two jobs, u1 and u2: u1 touches partitions in CA, and u2 touches partitions in NY. Both jobs share the following common configurations:

```
# "hoodie.clean.automatic": "false",
# "hoodie.clean.async": "true",
"hoodie.write.concurrency.mode": "optimistic_concurrency_control",
"hoodie.cleaner.policy.failed.writes": "LAZY",
"hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
```

#### Note

We also tried `hoodie.clean.automatic` set to both true and false, i.e. we tested the setup both with and without the following flags:

```
# "hoodie.clean.automatic": "false",
# "hoodie.clean.async": "true",
```

Here is our async cleaner invocation:

```
spark-submit \
  --class 'org.apache.hudi.utilities.HoodieCleaner' \
  --packages 'org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0' \
  --properties-file spark-config.properties \
  --master 'local[*]' \
  --executor-memory 3g \
  /Users/soumilshah/IdeaProjects/SparkProject/deltastreamerBroadcastJoins/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
  --target-base-path 'file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/' \
  --hoodie-conf 'hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS' \
  --hoodie-conf 'hoodie.cleaner.fileversions.retained=1' \
  --hoodie-conf 'hoodie.cleaner.parallelism=100' \
  --hoodie-conf 'hoodie.clean.automatic=true' \
  --hoodie-conf 'hoodie.clean.async=true' \
  --hoodie-conf 'hoodie.write.concurrency.mode=optimistic_concurrency_control' \
  --hoodie-conf 'hoodie.cleaner.policy.failed.writes=LAZY' \
  --hoodie-conf 'hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider'
```

The cleaner fails whenever it runs alongside both the u1 and u2 jobs.
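For reference, here is a consolidated sketch of the lock-related settings that all three processes (u1, u2, and the cleaner) are meant to share. The explicit `hoodie.write.lock.filesystem.path` key and the `locks/` directory are assumptions added for illustration; we otherwise rely on the file-system lock provider deriving the lock location from the table base path:

```
# Sketch: lock settings shared by u1, u2, and the cleaner (assumptions inline).
SHARED_LOCK_CONFIGS = {
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    "hoodie.cleaner.policy.failed.writes": "LAZY",
    "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
    # Assumption: pin every process to one explicit lock directory; by default
    # the provider derives the lock path from the table base path.
    "hoodie.write.lock.filesystem.path": "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/locks/",
}
```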
### Logs

```
retrieving :: org.apache.spark#spark-submit-parent-bd84a89e-a4f1-4b97-a1d5-225d51043b44
    confs: [default]
    0 artifacts copied, 1 already retrieved (0kB/3ms)
24/06/01 10:16:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/01 10:16:11 INFO SparkContext: Running Spark version 3.4.0
24/06/01 10:16:11 INFO ResourceUtils: ==============================================================
24/06/01 10:16:11 INFO ResourceUtils: No custom resources configured for spark.driver.
24/06/01 10:16:11 INFO ResourceUtils: ==============================================================
24/06/01 10:16:11 INFO SparkContext: Submitted application: hoodie-cleaner-table_name=orders
24/06/01 10:16:11 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
24/06/01 10:16:11 INFO ResourceProfile: Limiting resource is cpu
24/06/01 10:16:11 INFO ResourceProfileManager: Added ResourceProfile id: 0
24/06/01 10:16:11 INFO SecurityManager: Changing view acls to: soumilshah
24/06/01 10:16:11 INFO SecurityManager: Changing modify acls to: soumilshah
24/06/01 10:16:11 INFO SecurityManager: Changing view acls groups to:
24/06/01 10:16:11 INFO SecurityManager: Changing modify acls groups to:
24/06/01 10:16:11 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: soumilshah; groups with view permissions: EMPTY; users with modify permissions: soumilshah; groups with modify permissions: EMPTY
24/06/01 10:16:11 INFO deprecation: mapred.output.compression.codec is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec
24/06/01 10:16:11 INFO deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
24/06/01 10:16:11 INFO deprecation: mapred.output.compression.type is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.type
24/06/01 10:16:11 INFO Utils: Successfully started service 'sparkDriver' on port 65007.
24/06/01 10:16:11 INFO SparkEnv: Registering MapOutputTracker
24/06/01 10:16:11 INFO SparkEnv: Registering BlockManagerMaster
24/06/01 10:16:11 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
24/06/01 10:16:11 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
24/06/01 10:16:11 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/06/01 10:16:11 INFO DiskBlockManager: Created local directory at /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/blockmgr-2e982a52-ad15-457a-8856-7b9857ec303d
24/06/01 10:16:11 INFO MemoryStore: MemoryStore started with capacity 434.4 MiB
24/06/01 10:16:11 INFO SparkEnv: Registering OutputCommitCoordinator
24/06/01 10:16:11 INFO JettyUtils: Start Jetty 0.0.0.0:8090 for SparkUI
24/06/01 10:16:11 INFO Utils: Successfully started service 'SparkUI' on port 8090.
24/06/01 10:16:11 INFO SparkContext: Added JAR file:///Users/soumilshah/.ivy2/jars/org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar at spark://10.14.129.125:65007/jars/org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar with timestamp 1717251371606
24/06/01 10:16:11 INFO SparkContext: Added JAR file:/Users/soumilshah/IdeaProjects/SparkProject/deltastreamerBroadcastJoins/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar at spark://10.14.129.125:65007/jars/hudi-utilities-slim-bundle_2.12-0.14.0.jar with timestamp 1717251371606
24/06/01 10:16:11 INFO Executor: Starting executor ID driver on host 10.14.129.125
24/06/01 10:16:11 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): ''
24/06/01 10:16:11 INFO Executor: Fetching spark://10.14.129.125:65007/jars/hudi-utilities-slim-bundle_2.12-0.14.0.jar with timestamp 1717251371606
24/06/01 10:16:11 INFO TransportClientFactory: Successfully created connection to /10.14.129.125:65007 after 15 ms (0 ms spent in bootstraps)
24/06/01 10:16:11 INFO Utils: Fetching spark://10.14.129.125:65007/jars/hudi-utilities-slim-bundle_2.12-0.14.0.jar to /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-61fbec77-5617-4513-8b0d-a0b6101feee8/userFiles-ecf41982-1cd7-4af4-a911-e46b337eb2dc/fetchFileTemp10727026823756083903.tmp
24/06/01 10:16:12 INFO Executor: Adding file:/private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-61fbec77-5617-4513-8b0d-a0b6101feee8/userFiles-ecf41982-1cd7-4af4-a911-e46b337eb2dc/hudi-utilities-slim-bundle_2.12-0.14.0.jar to class loader
24/06/01 10:16:12 INFO Executor: Fetching spark://10.14.129.125:65007/jars/org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar with timestamp 1717251371606
24/06/01 10:16:12 INFO Utils: Fetching spark://10.14.129.125:65007/jars/org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar to /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-61fbec77-5617-4513-8b0d-a0b6101feee8/userFiles-ecf41982-1cd7-4af4-a911-e46b337eb2dc/fetchFileTemp13175198092609918093.tmp
24/06/01 10:16:12 INFO Executor: Adding file:/private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-61fbec77-5617-4513-8b0d-a0b6101feee8/userFiles-ecf41982-1cd7-4af4-a911-e46b337eb2dc/org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar to class loader
24/06/01 10:16:12 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 65010.
24/06/01 10:16:12 INFO NettyBlockTransferService: Server created on 10.14.129.125:65010
24/06/01 10:16:12 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
24/06/01 10:16:12 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.14.129.125, 65010, None)
24/06/01 10:16:12 INFO BlockManagerMasterEndpoint: Registering block manager 10.14.129.125:65010 with 434.4 MiB RAM, BlockManagerId(driver, 10.14.129.125, 65010, None)
24/06/01 10:16:12 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.14.129.125, 65010, None)
24/06/01 10:16:12 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.14.129.125, 65010, None)
24/06/01 10:16:12 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
24/06/01 10:16:12 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
24/06/01 10:16:12 INFO HoodieCleaner: Creating Cleaner with configs : {hoodie.cleaner.fileversions.retained=1, hoodie.clean.automatic=true, hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider, hoodie.cleaner.policy.failed.writes=LAZY, hoodie.write.concurrency.mode=optimistic_concurrency_control, hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS, hoodie.clean.async=true, hoodie.cleaner.parallelism=100}
24/06/01 10:16:12 INFO HoodieWriteConfig: Automatically set hoodie.cleaner.policy.failed.writes=LAZY since optimistic concurrency control is used
24/06/01 10:16:12 INFO EmbeddedTimelineService: Starting Timeline service !!
24/06/01 10:16:12 INFO EmbeddedTimelineService: Overriding hostIp to (10.14.129.125) found in spark-conf. It was null
24/06/01 10:16:12 INFO FileSystemViewManager: Creating View Manager with storage type :MEMORY
24/06/01 10:16:12 INFO FileSystemViewManager: Creating in-memory based Table View
24/06/01 10:16:12 INFO log: Logging initialized @2012ms to org.apache.hudi.org.eclipse.jetty.util.log.Slf4jLog
24/06/01 10:16:12 INFO Javalin: [Javalin ASCII art banner] https://javalin.io/documentation
24/06/01 10:16:12 INFO Javalin: Starting Javalin ...
24/06/01 10:16:12 INFO Javalin: You are running Javalin 4.6.7 (released October 24, 2022. Your Javalin version is 586 days old. Consider checking for a newer version.).
24/06/01 10:16:12 INFO Server: jetty-9.4.48.v20220622; built: 2022-06-21T20:42:25.880Z; git: 6b67c5719d1f4371b33655ff2d047d24e171e49a; jvm 11.0.22+0
24/06/01 10:16:12 INFO Server: Started @2143ms
24/06/01 10:16:12 INFO Javalin: Listening on http://localhost:65011/
24/06/01 10:16:12 INFO Javalin: Javalin started in 68ms \o/
24/06/01 10:16:12 INFO TimelineService: Starting Timeline server on port :65011
24/06/01 10:16:12 INFO EmbeddedTimelineService: Started embedded timeline server at 10.14.129.125:65011
24/06/01 10:16:12 INFO BaseHoodieClient: Timeline Server already running. Not restarting the service
24/06/01 10:16:12 INFO CleanerUtils: Cleaned failed attempts if any
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading Active commit timeline for file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20240601101606185__commit__INFLIGHT__20240601101606821]}
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=HFILE) from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20240601101606185__deltacommit__INFLIGHT__20240601101612494]}
24/06/01 10:16:12 INFO AbstractTableFileSystemView: Took 1 ms to read 0 instants, 0 replaced file groups
24/06/01 10:16:12 INFO ClusteringUtils: Found 0 files in pending clustering operations
24/06/01 10:16:12 INFO FileSystemViewManager: Creating View Manager with storage type :REMOTE_FIRST
24/06/01 10:16:12 INFO FileSystemViewManager: Creating remote first table view
24/06/01 10:16:12 INFO HoodieHeartbeatClient: Heartbeat not found in internal map, falling back to reading from DFS
24/06/01 10:16:12 INFO HoodieHeartbeatClient: Heartbeat not found in internal map, falling back to reading from DFS
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading Active commit timeline for file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20240601101606185__commit__INFLIGHT__20240601101606821]}
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=HFILE) from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20240601101606185__deltacommit__INFLIGHT__20240601101612494]}
24/06/01 10:16:12 INFO AbstractTableFileSystemView: Took 0 ms to read 0 instants, 0 replaced file groups
24/06/01 10:16:12 INFO ClusteringUtils: Found 0 files in pending clustering operations
24/06/01 10:16:12 INFO FileSystemViewManager: Creating View Manager with storage type :REMOTE_FIRST
24/06/01 10:16:12 INFO FileSystemViewManager: Creating remote first table view
24/06/01 10:16:12 INFO BaseHoodieWriteClient: Cleaner started
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading Active commit timeline for file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20240601101606185__commit__INFLIGHT__20240601101606821]}
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=HFILE) from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20240601101606185__deltacommit__INFLIGHT__20240601101612494]}
24/06/01 10:16:12 INFO AbstractTableFileSystemView: Took 0 ms to read 0 instants, 0 replaced file groups
24/06/01 10:16:12 INFO ClusteringUtils: Found 0 files in pending clustering operations
24/06/01 10:16:12 INFO FileSystemViewManager: Creating View Manager with storage type :REMOTE_FIRST
24/06/01 10:16:12 INFO FileSystemViewManager: Creating remote first table view
24/06/01 10:16:12 INFO BaseHoodieWriteClient: Scheduling cleaning at instant time :20240601101612647
24/06/01 10:16:12 INFO BaseHoodieClient: Stopping Timeline service !!
24/06/01 10:16:12 INFO EmbeddedTimelineService: Closing Timeline server
24/06/01 10:16:12 INFO TimelineService: Closing Timeline Service
24/06/01 10:16:12 INFO Javalin: Stopping Javalin ...
24/06/01 10:16:12 INFO Javalin: Javalin has stopped
24/06/01 10:16:12 INFO TimelineService: Closed Timeline Service
24/06/01 10:16:12 INFO EmbeddedTimelineService: Closed Timeline server
24/06/01 10:16:12 INFO TransactionManager: Transaction manager closed
24/06/01 10:16:12 INFO TransactionManager: Transaction manager closed
24/06/01 10:16:12 ERROR HoodieCleaner: Failed to run cleaning for file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
java.lang.ExceptionInInitializerError
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at org.apache.avro.util.ClassUtils.forName(ClassUtils.java:95)
    at org.apache.avro.util.ClassUtils.forName(ClassUtils.java:75)
    at org.apache.avro.specific.SpecificData.lambda$getClass$2(SpecificData.java:257)
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
    at org.apache.avro.util.MapUtil.computeIfAbsent(MapUtil.java:42)
    at org.apache.avro.specific.SpecificData.getClass(SpecificData.java:255)
    at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:488)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
    at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:355)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:186)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
    at org.apache.avro.file.DataFileStream.next(DataFileStream.java:263)
    at org.apache.avro.file.DataFileStream.next(DataFileStream.java:248)
    at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata(TimelineMetadataUtils.java:209)
    at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeHoodieCleanMetadata(TimelineMetadataUtils.java:173)
    at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.getCommitsSinceLastCleaning(CleanPlanActionExecutor.java:74)
    at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.needsCleaning(CleanPlanActionExecutor.java:89)
    at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.execute(CleanPlanActionExecutor.java:173)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.scheduleCleaning(HoodieSparkCopyOnWriteTable.java:217)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableServiceInternal(BaseHoodieTableServiceClient.java:628)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.clean(BaseHoodieTableServiceClient.java:751)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:861)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:834)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:865)
    at org.apache.hudi.utilities.HoodieCleaner.run(HoodieCleaner.java:70)
    at org.apache.hudi.utilities.HoodieCleaner.main(HoodieCleaner.java:114)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.IllegalStateException: Recursive update
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1760)
    at org.apache.avro.util.MapUtil.computeIfAbsent(MapUtil.java:42)
    at org.apache.avro.specific.SpecificData.getClass(SpecificData.java:255)
    at org.apache.avro.specific.SpecificData.getForSchema(SpecificData.java:162)
    at org.apache.avro.specific.SpecificDatumWriter.<init>(SpecificDatumWriter.java:47)
    at org.apache.hudi.avro.model.HoodieCleanPartitionMetadata.<clinit>(HoodieCleanPartitionMetadata.java:532)
    ... 47 more
24/06/01 10:16:12 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/06/01 10:16:12 INFO SparkUI: Stopped Spark web UI at http://10.14.129.125:8090
24/06/01 10:16:12 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/06/01 10:16:12 INFO MemoryStore: MemoryStore cleared
24/06/01 10:16:12 INFO BlockManager: BlockManager stopped
24/06/01 10:16:12 INFO BlockManagerMaster: BlockManagerMaster stopped
24/06/01 10:16:12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/06/01 10:16:12 INFO SparkContext: Successfully stopped SparkContext
24/06/01 10:16:12 INFO ShutdownHookManager: Shutdown hook called
24/06/01 10:16:12 INFO ShutdownHookManager: Deleting directory /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-9d9ed177-add3-4aea-bd51-af54389afe1e
24/06/01 10:16:12 INFO ShutdownHookManager: Deleting directory /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-61fbec77-5617-4513-8b0d-a0b6101feee8
(base) soumilshah@Soumils-MacBook-Pro conflictdetection %
```

#### U1.py

```
try:
    import os
    import sys
    import uuid
    import time
    import random
    from datetime import datetime

    import pyspark
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType
    from pyspark.sql.functions import col
    from faker import Faker

    print("Imports loaded")
except Exception as e:
    print("error", e)

HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

schema = StructType([
    StructField("orderID", StringType(), True),
    StructField("productSKU", StringType(), True),
    StructField("customerID", StringType(), True),
    StructField("orderDate", StringType(), True),
    StructField("orderAmount", FloatType(), True),
    StructField("state", StringType(), True)
])


def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM'):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        # "hoodie.clean.automatic": "false",
        # "hoodie.clean.async": "true",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.cleaner.policy.failed.writes": "LAZY",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
    }

    print(hudi_options)
    print("\n")
    print(path)
    print("\n")

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)


# Initial data
data = [
    ("order1", "prod001##", "cust001", "2024-01-15", 150.00, "CA"),
    ("order006", "prod006##", "cust006", "2024-01-20", 350.00, "CA"),
]

# Loop to update productSKU and write to Hudi
for i in range(1, 100):  # Number of iterations
    # Update productSKU
    updated_data = [(orderID, f"{productSKU[:-1]}update{i}", customerID, orderDate, orderAmount, state)
                    for (orderID, productSKU, customerID, orderDate, orderAmount, state) in data]

    # Create the DataFrame with updated data
    df = spark.createDataFrame(updated_data, schema)

    # Show the DataFrame with the updated "productSKU" column
    df.show()

    # Write to Hudi
    write_to_hudi(
        spark_df=df,
        method="upsert",
        db_name="default",
        table_name="orders",
        recordkey="orderID",
        precombine="orderDate",
        partition_fields="state",
        index_type="RECORD_INDEX"
    )

    time.sleep(3)

spark.stop()
```
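Since both writer scripts loop continuously against the same table, a write can occasionally lose the optimistic-concurrency race. Below is a minimal retry sketch around the `write_to_hudi` helper above; the `Py4JJavaError` message check and the backoff values are assumptions for illustration, not something Hudi prescribes:

```
import time
from py4j.protocol import Py4JJavaError

def write_with_retry(df, retries=3, backoff_seconds=5):
    # Retry the upsert when a concurrent-writer conflict surfaces through Py4J.
    # Matching on the message string is an assumption; adjust the marker to
    # whatever your Hudi version raises on write conflicts.
    for attempt in range(retries):
        try:
            write_to_hudi(
                spark_df=df, method="upsert", db_name="default",
                table_name="orders", recordkey="orderID",
                precombine="orderDate", partition_fields="state",
            )
            return
        except Py4JJavaError as e:
            if "ConcurrentModificationException" in str(e) and attempt < retries - 1:
                time.sleep(backoff_seconds * (attempt + 1))  # simple linear backoff
            else:
                raise
```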
#### u2.py

```
try:
    import os
    import sys
    import uuid
    import time
    import random
    from datetime import datetime

    import pyspark
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType
    from pyspark.sql.functions import col
    from faker import Faker

    print("Imports loaded")
except Exception as e:
    print("error", e)

HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

schema = StructType([
    StructField("orderID", StringType(), True),
    StructField("productSKU", StringType(), True),
    StructField("customerID", StringType(), True),
    StructField("orderDate", StringType(), True),
    StructField("orderAmount", FloatType(), True),
    StructField("state", StringType(), True)
])


def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM'):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        # "hoodie.clean.automatic": "false",
        # "hoodie.clean.async": "true",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.cleaner.policy.failed.writes": "LAZY",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
    }

    print(hudi_options)
    print("\n")
    print(path)
    print("\n")

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)


# Initial data
data = [
    ("order002", "prod002", "cust002", "2024-01-16", 200.00, "NY"),
    ("order007", "prod007", "cust007", "2024-01-21", 400.00, "NY")
]

# Loop to update productSKU and write to Hudi
for i in range(1, 100):  # Number of iterations
    # Update productSKU
    updated_data = [(orderID, f"{productSKU[:-1]}update{i}", customerID, orderDate, orderAmount, state)
                    for (orderID, productSKU, customerID, orderDate, orderAmount, state) in data]

    # Create the DataFrame with updated data
    df = spark.createDataFrame(updated_data, schema)

    # Show the DataFrame with the updated "productSKU" column
    df.show()

    # Write to Hudi
    write_to_hudi(
        spark_df=df,
        method="upsert",
        db_name="default",
        table_name="orders",
        recordkey="orderID",
        precombine="orderDate",
        partition_fields="state",
        index_type="RECORD_INDEX"
    )

    time.sleep(2)

spark.stop()
```
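To see why only the first clean succeeds, it can help to check which clean actions on the timeline completed versus stayed requested or inflight between runs. A small sketch that can be run between cleaner invocations (plain file listing, nothing Hudi-specific; the path is the table from above):

```
import os

# List timeline instants under .hoodie so completed (.clean), inflight
# (.clean.inflight), and requested (.clean.requested) clean actions are visible.
TABLE_PATH = "/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders"

for name in sorted(os.listdir(os.path.join(TABLE_PATH, ".hoodie"))):
    if ".clean" in name or ".commit" in name or name.endswith(".inflight"):
        print(name)
```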
Any insights or suggestions on resolving this issue would be greatly appreciated.
