soumilshah1995 opened a new issue, #11378:
URL: https://github.com/apache/hudi/issues/11378

   Hello,
   
   We have been experimenting with a multi-writer setup and have confirmed that 
it works perfectly with two writers. The image below shows our sample setup:
   
   
![image](https://github.com/apache/hudi/assets/39345855/6b386561-a760-4d6e-8652-9a477dc3fe2d)
   
   To take this further, we wanted to run the cleaner asynchronously, in parallel with the writers. The first cleaner run succeeded, but every subsequent run fails.
   
   In our setup, we have two writer jobs: u1 and u2.
   
   u1 writes to partitions in CA (see U1.py below).
   u2 writes to partitions in NY (see u2.py below).
   Both jobs have the following common configurations:
   
   ```
   # "hoodie.clean.automatic": "false",
   # "hoodie.clean.async": "true",
   "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
   "hoodie.cleaner.policy.failed.writes": "LAZY",
   "hoodie.write.lock.provider": 
"org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
   
   ```
   
   #### Note: We also tried "hoodie.clean.automatic" set to both true and false.
   
   
   We tested the setup both with and without the following flags:
   
   ```
   # "hoodie.clean.automatic": "false",
   # "hoodie.clean.async": "true",
   
   ```
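   
   Both writer scripts assemble their Hudi options the same way: the shared keys above are merged on top of the per-job settings. A minimal sketch in plain Python ("per_job_options" is a placeholder, and the commented "hoodie.write.lock.filesystem.path" override is an optional tweak we have not set):
   
   ```
   # Sketch: shared multi-writer options merged on top of each job's own settings.
   common_options = {
       "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
       "hoodie.cleaner.policy.failed.writes": "LAZY",
       "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
       # Optional (not set in our runs): relocate the lock file, which otherwise
       # lives under the table's .hoodie directory.
       # "hoodie.write.lock.filesystem.path": "file:///tmp/hudi-locks",
   }
   
   per_job_options = {"hoodie.table.name": "orders"}  # placeholder for u1/u2 settings
   hudi_options = {**per_job_options, **common_options}
   print(hudi_options)
   ```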
   
   Here is the spark-submit invocation we use for the async cleaner:
   
   
   ```
   spark-submit \
       --class 'org.apache.hudi.utilities.HoodieCleaner' \
       --packages 'org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0' \
       --properties-file spark-config.properties \
       --master 'local[*]' \
       --executor-memory 3g \
       /Users/soumilshah/IdeaProjects/SparkProject/deltastreamerBroadcastJoins/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
       --target-base-path 'file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/' \
       --hoodie-conf 'hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS' \
       --hoodie-conf 'hoodie.cleaner.fileversions.retained=1' \
       --hoodie-conf 'hoodie.cleaner.parallelism=100' \
       --hoodie-conf 'hoodie.clean.automatic=true' \
       --hoodie-conf 'hoodie.clean.async=true' \
       --hoodie-conf 'hoodie.write.concurrency.mode=optimistic_concurrency_control' \
       --hoodie-conf 'hoodie.cleaner.policy.failed.writes=LAZY' \
       --hoodie-conf 'hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider'
   ```
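   
   Since HoodieCleaner is a one-shot utility, running it "asynchronously" in practice means re-submitting it on an interval while u1 and u2 keep writing. A rough sketch of such a driver loop (an assumed scheduling wrapper, not our exact setup; the flag list is trimmed and the 60-second interval is arbitrary):
   
   ```
   import subprocess
   import time
   
   # Re-submit the one-shot HoodieCleaner on a fixed interval.
   # Command mirrors the spark-submit above (flags trimmed for brevity).
   CLEANER_CMD = [
       "spark-submit",
       "--class", "org.apache.hudi.utilities.HoodieCleaner",
       "--packages", "org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0",
       "--master", "local[*]",
       "jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar",
       "--target-base-path", "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/",
       "--hoodie-conf", "hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS",
       "--hoodie-conf", "hoodie.cleaner.fileversions.retained=1",
   ]
   
   while True:
       result = subprocess.run(CLEANER_CMD)
       print("cleaner exited with code", result.returncode)
       time.sleep(60)  # arbitrary interval between cleaner runs
   ```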
   
   The cleaner fails when it runs while both u1 and u2 are writing.
   
   ### Logs
   ```
   
   
   retrieving :: 
org.apache.spark#spark-submit-parent-bd84a89e-a4f1-4b97-a1d5-225d51043b44
        confs: [default]
        0 artifacts copied, 1 already retrieved (0kB/3ms)
   24/06/01 10:16:11 WARN NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
   24/06/01 10:16:11 INFO SparkContext: Running Spark version 3.4.0
   24/06/01 10:16:11 INFO ResourceUtils: 
==============================================================
   24/06/01 10:16:11 INFO ResourceUtils: No custom resources configured for 
spark.driver.
   24/06/01 10:16:11 INFO ResourceUtils: 
==============================================================
   24/06/01 10:16:11 INFO SparkContext: Submitted application: 
hoodie-cleaner-table_name=orders
   24/06/01 10:16:11 INFO ResourceProfile: Default ResourceProfile created, 
executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , 
memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: 
offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: 
cpus, amount: 1.0)
   24/06/01 10:16:11 INFO ResourceProfile: Limiting resource is cpu
   24/06/01 10:16:11 INFO ResourceProfileManager: Added ResourceProfile id: 0
   24/06/01 10:16:11 INFO SecurityManager: Changing view acls to: soumilshah
   24/06/01 10:16:11 INFO SecurityManager: Changing modify acls to: soumilshah
   24/06/01 10:16:11 INFO SecurityManager: Changing view acls groups to: 
   24/06/01 10:16:11 INFO SecurityManager: Changing modify acls groups to: 
   24/06/01 10:16:11 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: soumilshah; groups 
with view permissions: EMPTY; users with modify permissions: soumilshah; groups 
with modify permissions: EMPTY
   24/06/01 10:16:11 INFO deprecation: mapred.output.compression.codec is 
deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec
   24/06/01 10:16:11 INFO deprecation: mapred.output.compress is deprecated. 
Instead, use mapreduce.output.fileoutputformat.compress
   24/06/01 10:16:11 INFO deprecation: mapred.output.compression.type is 
deprecated. Instead, use mapreduce.output.fileoutputformat.compress.type
   24/06/01 10:16:11 INFO Utils: Successfully started service 'sparkDriver' on 
port 65007.
   24/06/01 10:16:11 INFO SparkEnv: Registering MapOutputTracker
   24/06/01 10:16:11 INFO SparkEnv: Registering BlockManagerMaster
   24/06/01 10:16:11 INFO BlockManagerMasterEndpoint: Using 
org.apache.spark.storage.DefaultTopologyMapper for getting topology information
   24/06/01 10:16:11 INFO BlockManagerMasterEndpoint: 
BlockManagerMasterEndpoint up
   24/06/01 10:16:11 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
   24/06/01 10:16:11 INFO DiskBlockManager: Created local directory at 
/private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/blockmgr-2e982a52-ad15-457a-8856-7b9857ec303d
   24/06/01 10:16:11 INFO MemoryStore: MemoryStore started with capacity 434.4 
MiB
   24/06/01 10:16:11 INFO SparkEnv: Registering OutputCommitCoordinator
   24/06/01 10:16:11 INFO JettyUtils: Start Jetty 0.0.0.0:8090 for SparkUI
   24/06/01 10:16:11 INFO Utils: Successfully started service 'SparkUI' on port 
8090.
   24/06/01 10:16:11 INFO SparkContext: Added JAR 
file:///Users/soumilshah/.ivy2/jars/org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar
 at 
spark://10.14.129.125:65007/jars/org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar
 with timestamp 1717251371606
   24/06/01 10:16:11 INFO SparkContext: Added JAR 
file:/Users/soumilshah/IdeaProjects/SparkProject/deltastreamerBroadcastJoins/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar
 at spark://10.14.129.125:65007/jars/hudi-utilities-slim-bundle_2.12-0.14.0.jar 
with timestamp 1717251371606
   24/06/01 10:16:11 INFO Executor: Starting executor ID driver on host 
10.14.129.125
   24/06/01 10:16:11 INFO Executor: Starting executor with user classpath 
(userClassPathFirst = false): ''
   24/06/01 10:16:11 INFO Executor: Fetching 
spark://10.14.129.125:65007/jars/hudi-utilities-slim-bundle_2.12-0.14.0.jar 
with timestamp 1717251371606
   24/06/01 10:16:11 INFO TransportClientFactory: Successfully created 
connection to /10.14.129.125:65007 after 15 ms (0 ms spent in bootstraps)
   24/06/01 10:16:11 INFO Utils: Fetching 
spark://10.14.129.125:65007/jars/hudi-utilities-slim-bundle_2.12-0.14.0.jar to 
/private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-61fbec77-5617-4513-8b0d-a0b6101feee8/userFiles-ecf41982-1cd7-4af4-a911-e46b337eb2dc/fetchFileTemp10727026823756083903.tmp
   24/06/01 10:16:12 INFO Executor: Adding 
file:/private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-61fbec77-5617-4513-8b0d-a0b6101feee8/userFiles-ecf41982-1cd7-4af4-a911-e46b337eb2dc/hudi-utilities-slim-bundle_2.12-0.14.0.jar
 to class loader
   24/06/01 10:16:12 INFO Executor: Fetching 
spark://10.14.129.125:65007/jars/org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar
 with timestamp 1717251371606
   24/06/01 10:16:12 INFO Utils: Fetching 
spark://10.14.129.125:65007/jars/org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar
 to 
/private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-61fbec77-5617-4513-8b0d-a0b6101feee8/userFiles-ecf41982-1cd7-4af4-a911-e46b337eb2dc/fetchFileTemp13175198092609918093.tmp
   24/06/01 10:16:12 INFO Executor: Adding 
file:/private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-61fbec77-5617-4513-8b0d-a0b6101feee8/userFiles-ecf41982-1cd7-4af4-a911-e46b337eb2dc/org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar
 to class loader
   24/06/01 10:16:12 INFO Utils: Successfully started service 
'org.apache.spark.network.netty.NettyBlockTransferService' on port 65010.
   24/06/01 10:16:12 INFO NettyBlockTransferService: Server created on 
10.14.129.125:65010
   24/06/01 10:16:12 INFO BlockManager: Using 
org.apache.spark.storage.RandomBlockReplicationPolicy for block replication 
policy
   24/06/01 10:16:12 INFO BlockManagerMaster: Registering BlockManager 
BlockManagerId(driver, 10.14.129.125, 65010, None)
   24/06/01 10:16:12 INFO BlockManagerMasterEndpoint: Registering block manager 
10.14.129.125:65010 with 434.4 MiB RAM, BlockManagerId(driver, 10.14.129.125, 
65010, None)
   24/06/01 10:16:12 INFO BlockManagerMaster: Registered BlockManager 
BlockManagerId(driver, 10.14.129.125, 65010, None)
   24/06/01 10:16:12 INFO BlockManager: Initialized BlockManager: 
BlockManagerId(driver, 10.14.129.125, 65010, None)
   24/06/01 10:16:12 WARN DFSPropertiesConfiguration: Cannot find 
HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
   24/06/01 10:16:12 WARN DFSPropertiesConfiguration: Properties file 
file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
   24/06/01 10:16:12 INFO HoodieCleaner: Creating Cleaner with configs : 
{hoodie.cleaner.fileversions.retained=1, hoodie.clean.automatic=true, 
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider,
 hoodie.cleaner.policy.failed.writes=LAZY, 
hoodie.write.concurrency.mode=optimistic_concurrency_control, 
hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS, hoodie.clean.async=true, 
hoodie.cleaner.parallelism=100}
   24/06/01 10:16:12 INFO HoodieWriteConfig: Automatically set 
hoodie.cleaner.policy.failed.writes=LAZY since optimistic concurrency control 
is used
   24/06/01 10:16:12 INFO EmbeddedTimelineService: Starting Timeline service !!
   24/06/01 10:16:12 INFO EmbeddedTimelineService: Overriding hostIp to 
(10.14.129.125) found in spark-conf. It was null
   24/06/01 10:16:12 INFO FileSystemViewManager: Creating View Manager with 
storage type :MEMORY
   24/06/01 10:16:12 INFO FileSystemViewManager: Creating in-memory based Table 
View
   24/06/01 10:16:12 INFO log: Logging initialized @2012ms to 
org.apache.hudi.org.eclipse.jetty.util.log.Slf4jLog
   24/06/01 10:16:12 INFO Javalin: 
          __                      __ _            __ __
         / /____ _ _   __ ____ _ / /(_)____      / // /
    __  / // __ `/| | / // __ `// // // __ \    / // /_
   / /_/ // /_/ / | |/ // /_/ // // // / / /   /__  __/
   \____/ \__,_/  |___/ \__,_//_//_//_/ /_/      /_/
   
             https://javalin.io/documentation
   
   24/06/01 10:16:12 INFO Javalin: Starting Javalin ...
   24/06/01 10:16:12 INFO Javalin: You are running Javalin 4.6.7 (released 
October 24, 2022. Your Javalin version is 586 days old. Consider checking for a 
newer version.).
   24/06/01 10:16:12 INFO Server: jetty-9.4.48.v20220622; built: 
2022-06-21T20:42:25.880Z; git: 6b67c5719d1f4371b33655ff2d047d24e171e49a; jvm 
11.0.22+0
   24/06/01 10:16:12 INFO Server: Started @2143ms
   24/06/01 10:16:12 INFO Javalin: Listening on http://localhost:65011/
   24/06/01 10:16:12 INFO Javalin: Javalin started in 68ms \o/
   24/06/01 10:16:12 INFO TimelineService: Starting Timeline server on port 
:65011
   24/06/01 10:16:12 INFO EmbeddedTimelineService: Started embedded timeline 
server at 10.14.129.125:65011
   24/06/01 10:16:12 INFO BaseHoodieClient: Timeline Server already running. 
Not restarting the service
   24/06/01 10:16:12 INFO CleanerUtils: Cleaned failed attempts if any
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient 
from 
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
   24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from 
file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type 
COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading Active commit timeline 
for 
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
   24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : 
Option{val=[==>20240601101606185__commit__INFLIGHT__20240601101606821]}
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient 
from 
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
   24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from 
file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type 
COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient 
from 
file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
   24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from 
file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata/.hoodie/hoodie.properties
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type 
MERGE_ON_READ(version=1, baseFileFormat=HFILE) from 
file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
   24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : 
Option{val=[==>20240601101606185__deltacommit__INFLIGHT__20240601101612494]}
   24/06/01 10:16:12 INFO AbstractTableFileSystemView: Took 1 ms to read  0 
instants, 0 replaced file groups
   24/06/01 10:16:12 INFO ClusteringUtils: Found 0 files in pending clustering 
operations
   24/06/01 10:16:12 INFO FileSystemViewManager: Creating View Manager with 
storage type :REMOTE_FIRST
   24/06/01 10:16:12 INFO FileSystemViewManager: Creating remote first table 
view
   24/06/01 10:16:12 INFO HoodieHeartbeatClient: Heartbeat not found in 
internal map, falling back to reading from DFS
   24/06/01 10:16:12 INFO HoodieHeartbeatClient: Heartbeat not found in 
internal map, falling back to reading from DFS
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient 
from 
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
   24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from 
file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type 
COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading Active commit timeline 
for 
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
   24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : 
Option{val=[==>20240601101606185__commit__INFLIGHT__20240601101606821]}
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient 
from 
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
   24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from 
file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type 
COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient 
from 
file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
   24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from 
file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata/.hoodie/hoodie.properties
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type 
MERGE_ON_READ(version=1, baseFileFormat=HFILE) from 
file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
   24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : 
Option{val=[==>20240601101606185__deltacommit__INFLIGHT__20240601101612494]}
   24/06/01 10:16:12 INFO AbstractTableFileSystemView: Took 0 ms to read  0 
instants, 0 replaced file groups
   24/06/01 10:16:12 INFO ClusteringUtils: Found 0 files in pending clustering 
operations
   24/06/01 10:16:12 INFO FileSystemViewManager: Creating View Manager with 
storage type :REMOTE_FIRST
   24/06/01 10:16:12 INFO FileSystemViewManager: Creating remote first table 
view
   24/06/01 10:16:12 INFO BaseHoodieWriteClient: Cleaner started
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient 
from 
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
   24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from 
file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type 
COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading Active commit timeline 
for 
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
   24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : 
Option{val=[==>20240601101606185__commit__INFLIGHT__20240601101606821]}
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient 
from 
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
   24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from 
file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type 
COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient 
from 
file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
   24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from 
file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata/.hoodie/hoodie.properties
   24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type 
MERGE_ON_READ(version=1, baseFileFormat=HFILE) from 
file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
   24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : 
Option{val=[==>20240601101606185__deltacommit__INFLIGHT__20240601101612494]}
   24/06/01 10:16:12 INFO AbstractTableFileSystemView: Took 0 ms to read  0 
instants, 0 replaced file groups
   24/06/01 10:16:12 INFO ClusteringUtils: Found 0 files in pending clustering 
operations
   24/06/01 10:16:12 INFO FileSystemViewManager: Creating View Manager with 
storage type :REMOTE_FIRST
   24/06/01 10:16:12 INFO FileSystemViewManager: Creating remote first table 
view
   24/06/01 10:16:12 INFO BaseHoodieWriteClient: Scheduling cleaning at instant 
time :20240601101612647
   24/06/01 10:16:12 INFO BaseHoodieClient: Stopping Timeline service !!
   24/06/01 10:16:12 INFO EmbeddedTimelineService: Closing Timeline server
   24/06/01 10:16:12 INFO TimelineService: Closing Timeline Service
   24/06/01 10:16:12 INFO Javalin: Stopping Javalin ...
   24/06/01 10:16:12 INFO Javalin: Javalin has stopped
   24/06/01 10:16:12 INFO TimelineService: Closed Timeline Service
   24/06/01 10:16:12 INFO EmbeddedTimelineService: Closed Timeline server
   24/06/01 10:16:12 INFO TransactionManager: Transaction manager closed
   24/06/01 10:16:12 INFO TransactionManager: Transaction manager closed
   24/06/01 10:16:12 ERROR HoodieCleaner: Failed to run cleaning for file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
   java.lang.ExceptionInInitializerError
        at java.base/java.lang.Class.forName0(Native Method)
        at java.base/java.lang.Class.forName(Class.java:398)
        at org.apache.avro.util.ClassUtils.forName(ClassUtils.java:95)
        at org.apache.avro.util.ClassUtils.forName(ClassUtils.java:75)
        at org.apache.avro.specific.SpecificData.lambda$getClass$2(SpecificData.java:257)
        at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
        at org.apache.avro.util.MapUtil.computeIfAbsent(MapUtil.java:42)
        at org.apache.avro.specific.SpecificData.getClass(SpecificData.java:255)
        at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:488)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
        at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
        at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:355)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:186)
        at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
        at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
        at org.apache.avro.file.DataFileStream.next(DataFileStream.java:263)
        at org.apache.avro.file.DataFileStream.next(DataFileStream.java:248)
        at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata(TimelineMetadataUtils.java:209)
        at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeHoodieCleanMetadata(TimelineMetadataUtils.java:173)
        at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.getCommitsSinceLastCleaning(CleanPlanActionExecutor.java:74)
        at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.needsCleaning(CleanPlanActionExecutor.java:89)
        at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.execute(CleanPlanActionExecutor.java:173)
        at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.scheduleCleaning(HoodieSparkCopyOnWriteTable.java:217)
        at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableServiceInternal(BaseHoodieTableServiceClient.java:628)
        at org.apache.hudi.client.BaseHoodieTableServiceClient.clean(BaseHoodieTableServiceClient.java:751)
        at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:861)
        at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:834)
        at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:865)
        at org.apache.hudi.utilities.HoodieCleaner.run(HoodieCleaner.java:70)
        at org.apache.hudi.utilities.HoodieCleaner.main(HoodieCleaner.java:114)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.lang.IllegalStateException: Recursive update
        at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1760)
        at org.apache.avro.util.MapUtil.computeIfAbsent(MapUtil.java:42)
        at org.apache.avro.specific.SpecificData.getClass(SpecificData.java:255)
        at org.apache.avro.specific.SpecificData.getForSchema(SpecificData.java:162)
        at org.apache.avro.specific.SpecificDatumWriter.<init>(SpecificDatumWriter.java:47)
        at org.apache.hudi.avro.model.HoodieCleanPartitionMetadata.<clinit>(HoodieCleanPartitionMetadata.java:532)
        ... 47 more
   24/06/01 10:16:12 INFO SparkContext: SparkContext is stopping with exitCode 
0.
   24/06/01 10:16:12 INFO SparkUI: Stopped Spark web UI at 
http://10.14.129.125:8090
   24/06/01 10:16:12 INFO MapOutputTrackerMasterEndpoint: 
MapOutputTrackerMasterEndpoint stopped!
   24/06/01 10:16:12 INFO MemoryStore: MemoryStore cleared
   24/06/01 10:16:12 INFO BlockManager: BlockManager stopped
   24/06/01 10:16:12 INFO BlockManagerMaster: BlockManagerMaster stopped
   24/06/01 10:16:12 INFO 
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: 
OutputCommitCoordinator stopped!
   24/06/01 10:16:12 INFO SparkContext: Successfully stopped SparkContext
   24/06/01 10:16:12 INFO ShutdownHookManager: Shutdown hook called
   24/06/01 10:16:12 INFO ShutdownHookManager: Deleting directory 
/private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-9d9ed177-add3-4aea-bd51-af54389afe1e
   24/06/01 10:16:12 INFO ShutdownHookManager: Deleting directory 
/private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-61fbec77-5617-4513-8b0d-a0b6101feee8
   ```
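   
   The failure happens while deserializing the previous clean's Avro metadata (TimelineMetadataUtils.deserializeHoodieCleanMetadata, reached from CleanPlanActionExecutor.getCommitsSinceLastCleaning). To see what the timeline holds when this triggers, the instant files under .hoodie can be listed; a quick inspection sketch (the .clean / .clean.inflight / .clean.requested extensions are standard Hudi timeline naming):
   
   ```
   import os
   
   # List timeline instant files with their sizes, flagging clean-related ones.
   TIMELINE_DIR = "/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie"
   
   for name in sorted(os.listdir(TIMELINE_DIR)):
       full = os.path.join(TIMELINE_DIR, name)
       if os.path.isfile(full):
           marker = "  <-- clean instant" if ".clean" in name else ""
           print(f"{name} ({os.path.getsize(full)} bytes){marker}")
   ```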
   
   #### U1.py
   ```
   try:
       import os
       import sys
       import time
       from pyspark.sql import SparkSession
       from pyspark.sql.types import StructType, StructField, StringType, FloatType
   
       print("Imports loaded")
   
   except Exception as e:
       print("error", e)
   
   HUDI_VERSION = '0.14.0'
   SPARK_VERSION = '3.4'
   
   os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
   SUBMIT_ARGS = f"--packages 
org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} 
pyspark-shell"
   os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
   os.environ['PYSPARK_PYTHON'] = sys.executable
   
   spark = SparkSession.builder \
       .config('spark.serializer', 
'org.apache.spark.serializer.KryoSerializer') \
       .config('spark.sql.extensions', 
'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
       .config('className', 'org.apache.hudi') \
       .config('spark.sql.hive.convertMetastoreParquet', 'false') \
       .getOrCreate()
   
   schema = StructType([
       StructField("orderID", StringType(), True),
       StructField("productSKU", StringType(), True),
       StructField("customerID", StringType(), True),
       StructField("orderDate", StringType(), True),
       StructField("orderAmount", FloatType(), True),
       StructField("state", StringType(), True)
   ])
   
   def write_to_hudi(spark_df,
                     table_name,
                     db_name,
                     method='upsert',
                     table_type='COPY_ON_WRITE',
                     recordkey='',
                     precombine='',
                     partition_fields='',
                     index_type='BLOOM'
                     ):
       path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"
   
       hudi_options = {
           'hoodie.table.name': table_name,
           'hoodie.datasource.write.table.type': table_type,
           'hoodie.datasource.write.table.name': table_name,
           'hoodie.datasource.write.operation': method,
           'hoodie.datasource.write.recordkey.field': recordkey,
           'hoodie.datasource.write.precombine.field': precombine,
           "hoodie.datasource.write.partitionpath.field": partition_fields,
   
           # "hoodie.clean.automatic":"false",
           # " hoodie.clean.async":"true",
   
           "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
           "hoodie.cleaner.policy.failed.writes": "LAZY",
           "hoodie.write.lock.provider": 
"org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
   
   
   
       }
       print(hudi_options)
   
       print("\n")
       print(path)
       print("\n")
   
       spark_df.write.format("hudi"). \
           options(**hudi_options). \
           mode("append"). \
           save(path)
   
   # Initial data
   data = [
       ("order1", "prod001##", "cust001", "2024-01-15", 150.00, "CA"),
       ("order006", "prod006##", "cust006", "2024-01-20", 350.00, "CA"),
   ]
   
   # Loop to update productSKU and write to Hudi
   for i in range(1, 100):  # Number of iterations
       # Update productSKU
       updated_data = [(orderID, f"{productSKU[:-1]}update{i}", customerID, orderDate, orderAmount, state)
                       for (orderID, productSKU, customerID, orderDate, orderAmount, state) in data]
   
       # Create the DataFrame with updated data
       df = spark.createDataFrame(updated_data, schema)
   
       # Show the DataFrame with the updated "productSKU" column
       df.show()
   
       # Write to Hudi
       write_to_hudi(
           spark_df=df,
           method="upsert",
           db_name="default",
           table_name="orders",
           recordkey="orderID",
           precombine="orderDate",
           partition_fields="state",
           index_type="RECORD_INDEX"
       )
       time.sleep(3)
   
   spark.stop()
   
   ```
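   
   To reproduce, both writer scripts run at the same time (u2.py follows below). A trivial launcher sketch, assuming both files sit in the current directory:
   
   ```
   import subprocess
   import sys
   
   # Launch both writers concurrently; the cleaner loop is started separately.
   p1 = subprocess.Popen([sys.executable, "U1.py"])
   p2 = subprocess.Popen([sys.executable, "u2.py"])
   p1.wait()
   p2.wait()
   ```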
   
   #### u2.py
   ```
   try:
       import os
       import sys
       import time
       from pyspark.sql import SparkSession
       from pyspark.sql.types import StructType, StructField, StringType, FloatType
   
       print("Imports loaded")
   
   except Exception as e:
       print("error", e)
   
   HUDI_VERSION = '0.14.0'
   SPARK_VERSION = '3.4'
   
   os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
   SUBMIT_ARGS = f"--packages 
org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} 
pyspark-shell"
   os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
   os.environ['PYSPARK_PYTHON'] = sys.executable
   
   spark = SparkSession.builder \
       .config('spark.serializer', 
'org.apache.spark.serializer.KryoSerializer') \
       .config('spark.sql.extensions', 
'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
       .config('className', 'org.apache.hudi') \
       .config('spark.sql.hive.convertMetastoreParquet', 'false') \
       .getOrCreate()
   
   schema = StructType([
       StructField("orderID", StringType(), True),
       StructField("productSKU", StringType(), True),
       StructField("customerID", StringType(), True),
       StructField("orderDate", StringType(), True),
       StructField("orderAmount", FloatType(), True),
       StructField("state", StringType(), True)
   ])
   
   
   def write_to_hudi(spark_df,
                     table_name,
                     db_name,
                     method='upsert',
                     table_type='COPY_ON_WRITE',
                     recordkey='',
                     precombine='',
                     partition_fields='',
                     index_type='BLOOM'
                     ):
       path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"
   
       hudi_options = {
           'hoodie.table.name': table_name,
           'hoodie.datasource.write.table.type': table_type,
           'hoodie.datasource.write.table.name': table_name,
           'hoodie.datasource.write.operation': method,
           'hoodie.datasource.write.recordkey.field': recordkey,
           'hoodie.datasource.write.precombine.field': precombine,
           "hoodie.datasource.write.partitionpath.field": partition_fields,
   
           # "hoodie.clean.automatic":"false",
           # " hoodie.clean.async":"true",
   
           "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
           "hoodie.cleaner.policy.failed.writes": "LAZY",
           "hoodie.write.lock.provider": 
"org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
   
       }
       print(hudi_options)
   
       print("\n")
       print(path)
       print("\n")
   
       spark_df.write.format("hudi"). \
           options(**hudi_options). \
           mode("append"). \
           save(path)
   
   
   # Initial data
   data = [
       ("order002", "prod002", "cust002", "2024-01-16", 200.00, "NY"),
       ("order007", "prod007", "cust007", "2024-01-21", 400.00, "NY")
   ]
   
   # Loop to update productSKU and write to Hudi
   for i in range(1, 100):  # Number of iterations
       # Update productSKU
       updated_data = [(orderID, f"{productSKU[:-1]}update{i}", customerID, orderDate, orderAmount, state)
                       for (orderID, productSKU, customerID, orderDate, orderAmount, state) in data]
   
       # Create the DataFrame with updated data
       df = spark.createDataFrame(updated_data, schema)
   
       # Show the DataFrame with the updated "productSKU" column
       df.show()
   
       # Write to Hudi
       write_to_hudi(
           spark_df=df,
           method="upsert",
           db_name="default",
           table_name="orders",
           recordkey="orderID",
           precombine="orderDate",
           partition_fields="state",
           index_type="RECORD_INDEX"
       )
       time.sleep(2)
   
   spark.stop()
   
   ```
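   
   For completeness, a small read-back sketch (reusing the "spark" session and table path from either script) to confirm that both writers' partitions keep landing between cleaner runs:
   
   ```
   # Read the table back and count rows per partition.
   path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders"
   
   df = spark.read.format("hudi").load(path)
   df.groupBy("state").count().show()
   df.select("orderID", "productSKU", "state").orderBy("orderID").show(truncate=False)
   ```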
   
   Any insights or suggestions on resolving this issue would be greatly 
appreciated.
   
   

