JosepSampe opened a new issue, #13036:
URL: https://github.com/apache/hudi/issues/13036

   **Describe the problem you faced**
   
   Currently, I am using GCP to run workloads on an Apache Hudi table stored in 
GCS. To enable distributed locking concurrency control, I found in the 
documentation that it is possible to configure a Hive Metastore for distributed 
locking ([Hive Metastore-based concurrency 
control](https://hudi.apache.org/docs/concurrency_control/#hivemetastore-based)).
 GCP provides a managed Hive Metastore called Dataproc Metastore, so I 
attempted to use it with Hudi.
   
   I realized that the Hudi Hive Metastore implementation does not 
automatically create the necessary resources, unlike DynamoDB. As a result, I 
created the database and table using SQL commands and provided the appropriate 
configuration in the hudi-defaults.conf file.
   
   At this point, I executed a workload with multiple writers, and it appears 
to complete successfully, except that I noticed a WARNING and a traceback 
during the execution of the experiment. 
   
   As shown in the following image, I can see the `lock()` activity in the 
Dataproc Metastore dashboard during the experiment:
   
   
![Image](https://github.com/user-attachments/assets/12ab3d0d-e087-4460-8dc0-bedb8976a2cb)
   
   However, I can also see this warning and traceback in one of the Spark 
executors:
   
   ```
   2025-03-18T13:42:56,296 WARN RetryHelper: Catch Exception for acquire lock, will retry after 5000 ms.
   org.apache.hudi.exception.HoodieLockException: Unable to acquire the lock. Current lock owner information : 
           at org.apache.hudi.client.transaction.lock.LockManager.lambda$lock$20c251e3$1(LockManager.java:83)
           at org.apache.hudi.common.util.RetryHelper.start(RetryHelper.java:94)
           at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:77)
           at org.apache.hudi.client.transaction.TransactionManager.beginTransaction(TransactionManager.java:58)
           at org.apache.hudi.client.timeline.versioning.v2.TimelineArchiverV2.archiveIfRequired(TimelineArchiverV2.java:98)
           at org.apache.hudi.client.BaseHoodieTableServiceClient.archive(BaseHoodieTableServiceClient.java:834)
           at org.apache.hudi.client.BaseHoodieWriteClient.archive(BaseHoodieWriteClient.java:887)
           at org.apache.hudi.client.BaseHoodieWriteClient.autoArchiveOnCommit(BaseHoodieWriteClient.java:616)
           at org.apache.hudi.client.BaseHoodieWriteClient.mayBeCleanAndArchive(BaseHoodieWriteClient.java:582)
           at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:258)
           at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:93)
           at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:63)
           at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:211)
           at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:206)
           at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
           at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
           at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
           at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
           at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
           at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
           at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
           at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
           at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
           at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
           at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
           at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
           at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
           at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
           at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
           at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
           at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
           at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
           at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
           at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
           at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
           at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
           at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
           at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
           at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
           at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:304)
           at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
   ```
   
   So my questions are:  
       1. Is it actually necessary to configure `Zookeeper` when using GCP 
Dataproc Metastore? The `lock()` appears to be working based on the dashboard 
activity, but in the Hudi docs, you mention: *"Zookeeper is required for HMS 
lock provider. Users can set the Zookeeper configs for Hive using Hudi."*
       2. Is this warning and exception important, or is it just an 
informative message indicating that another writer has the lock, so the current 
writer must wait until the lock is released? If it is just an informative 
message, is it really necessary to show a traceback in addition to the warning? 
   
   Because of the exception, I am unsure whether `lock()` is actually working 
properly, and whether Zookeeper is necessary when using Dataproc Metastore.
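
To make question 2 concrete: the `RetryHelper` log line ("will retry after 5000 ms") reads like a catch, log, sleep, retry loop. The block below is a minimal sketch of that generic pattern, not Hudi's actual `RetryHelper` code; `acquire_with_retry`, `LockNotAcquired`, and the `try_lock` callback are hypothetical names introduced only for illustration.

```python
import time


class LockNotAcquired(Exception):
    """Hypothetical stand-in for a lock attempt that ultimately failed."""


def acquire_with_retry(try_lock, max_attempts=3, wait_seconds=5.0, sleep=time.sleep):
    """Call try_lock() until it returns True or the attempts run out.

    Returns the number of attempts used; raises LockNotAcquired otherwise.
    """
    for attempt in range(1, max_attempts + 1):
        if try_lock():
            return attempt
        if attempt < max_attempts:
            # A failed attempt is only logged; the caller keeps waiting.
            print(f"WARN: lock busy, retrying in {wait_seconds} s (attempt {attempt})")
            sleep(wait_seconds)
    raise LockNotAcquired(f"gave up after {max_attempts} attempts")


# Simulate another writer holding the lock during the first two attempts.
responses = iter([False, False, True])
attempts_used = acquire_with_retry(lambda: next(responses), sleep=lambda s: None)
print(attempts_used)
```

In a loop like this, a warning on an intermediate attempt only means the lock was busy at that moment; only the final raised exception would indicate that the writer never obtained the lock. Whether Hudi behaves the same way is exactly what this issue asks.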
   
   
   **Steps to reproduce the behavior:**
   
   To create the resources in the Dataproc Metastore I used a spark-shell:
   
   ```
   spark-shell --conf spark.sql.catalogImplementation=hive \
     --conf spark.hadoop.hive.metastore.uris=thrift://10.10.10.5:9083 \
     --conf spark.hadoop.hive.metastore.warehouse.dir=gs://gcs-bucket-my-dev-metastore-04103c0c-3fe9-4e9d-8574-a9eddf2/hive-warehouse

   spark.sql("CREATE DATABASE my_testing_lock").show()
   spark.sql("SHOW DATABASES").show()
   spark.sql("USE my_testing_lock")
   spark.sql("CREATE TABLE IF NOT EXISTS my_testing_lock_table (key STRING) STORED AS PARQUET").show()
   ```
   
   Then, for this test, I used these keys in the `hudi-defaults.conf` file:
   
   ```
   hoodie.write.lock.provider                  org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider
   hoodie.write.lock.hivemetastore.uris        thrift://10.10.10.5:9083
   hoodie.write.lock.hivemetastore.database    my_testing_lock
   hoodie.write.lock.hivemetastore.table       my_testing_lock_table
   ```
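
For anyone reproducing this from PySpark instead of `hudi-defaults.conf`, the same keys can be passed as writer options. The following is a rough sketch: `hms_lock_options` is a hypothetical helper, the table name is the one created in the spark-shell step, and the two concurrency keys (`hoodie.write.concurrency.mode`, `hoodie.cleaner.policy.failed.writes`) are an assumption taken from the Hudi multi-writer documentation rather than from the configuration shown above.

```python
def hms_lock_options(metastore_uris, database, table):
    """Build Hudi writer options for the Hive Metastore based lock provider."""
    return {
        # Assumption: multi-writer settings from the Hudi concurrency docs,
        # not part of the configuration listed in this issue.
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.cleaner.policy.failed.writes": "LAZY",
        # Keys taken from the hudi-defaults.conf shown in this issue.
        "hoodie.write.lock.provider":
            "org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider",
        "hoodie.write.lock.hivemetastore.uris": metastore_uris,
        "hoodie.write.lock.hivemetastore.database": database,
        "hoodie.write.lock.hivemetastore.table": table,
    }


lock_opts = hms_lock_options(
    "thrift://10.10.10.5:9083", "my_testing_lock", "my_testing_lock_table"
)
for key, value in sorted(lock_opts.items()):
    print(f"{key}={value}")
```

With a DataFrame `df`, these could then be applied as `df.write.format("hudi").options(**lock_opts)` together with the usual table options, which are omitted here.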
   
   **Expected behavior**
   
   Execute the workload with multiple writers without any exception.
   
   **Environment Description**
   
   * Hudi version : 1.0.0
   
   * Spark version : 3.5.2
   
   * Storage (HDFS/S3/GCS..) : GCS
   
   * Running on Docker? (yes/no) : Spark workers run on GKE
   

