This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new aa22da38f43 [HUDI-8603] Fixing lock provider for single writer use-cases (#12371)
aa22da38f43 is described below

commit aa22da38f43a80ebe8e531e63a25a84fc8353718
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Nov 30 00:31:28 2024 -0800

    [HUDI-8603] Fixing lock provider for single writer use-cases (#12371)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 11 ++++++
 .../apache/hudi/config/TestHoodieWriteConfig.java  | 43 +++++++++++++++++++++-
 2 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 8af086c560d..0dda5e19abc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -3405,6 +3405,17 @@ public class HoodieWriteConfig extends HoodieConfig {
     }
 
     private void autoAdjustConfigsForConcurrencyMode(boolean 
isLockProviderPropertySet) {
+      // for a single writer scenario, with all table services inline, lets 
set InProcessLockProvider
+      if (writeConfig.getWriteConcurrencyMode() == 
WriteConcurrencyMode.SINGLE_WRITER && !writeConfig.areAnyTableServicesAsync()) {
+        if (writeConfig.getLockProviderClass() != null && 
!writeConfig.getLockProviderClass().equals(InProcessLockProvider.class.getCanonicalName()))
 {
+          // add logs only when explicitly overridden by the user.
+          LOG.warn(String.format("For a single writer mode, overriding lock 
provider class (%s) to %s. So, user configured lock provider %s may not take 
effect",
+              HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), 
InProcessLockProvider.class.getName(), writeConfig.getLockProviderClass()));
+          writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
+              InProcessLockProvider.class.getName());
+        }
+      }
+
       if (!isLockProviderPropertySet && writeConfig.isAutoAdjustLockConfigs() 
&& isLockRequiredForSingleWriter()) {
         // auto adjustment is required only for deltastreamer and spark 
streaming where async table services can be executed in the same JVM.
         // This is targeted at Single writer with async table services
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 9be904ee925..7e1f7755479 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -314,6 +314,44 @@ public class TestHoodieWriteConfig {
         inProcessLockProviderClassName);
   }
 
+  @ParameterizedTest // checks the lock provider class resolved by the builder when clustering and compaction are both inline
+  @EnumSource(HoodieTableType.class) // one invocation per HoodieTableType constant
+  public void testAutoAdjustLockConfigsSingleWriter(HoodieTableType tableType) 
{
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(HoodieTableConfig.TYPE.key(), tableType.name()); // table type under test is supplied through properties
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() // case 1: no lock provider is configured by the caller
+        .withPath("/tmp")
+        .withAutoAdjustLockConfigs(true)
+        .withClusteringConfig(new 
HoodieClusteringConfig.Builder().withInlineClustering(true).build())
+        .withProperties(properties)
+        
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(true).build())
+        .build();
+
+    verifyConcurrencyControlRelatedConfigs(writeConfig,
+        true, false, true, // NOTE(review): flag meanings are defined by verifyConcurrencyControlRelatedConfigs, which is not visible here
+        
WriteConcurrencyMode.valueOf(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()),
+        
HoodieFailedWritesCleaningPolicy.valueOf(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+        HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue()); // expected lock provider: the config's default value
+
+    // even if user explicitly sets a LP, we override it to InProcess 
LockProvider.
+    writeConfig = HoodieWriteConfig.newBuilder() // case 2: same setup, but the caller explicitly configures a lock provider
+        .withPath("/tmp")
+        .withAutoAdjustLockConfigs(true)
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(FileSystemBasedLockProviderTestClass.class) // explicitly configured provider that is not InProcessLockProvider
+            .build())
+        .withClusteringConfig(new 
HoodieClusteringConfig.Builder().withInlineClustering(true).build())
+        
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(true).build())
+        .withProperties(properties)
+        .build();
+
+    verifyConcurrencyControlRelatedConfigs(writeConfig,
+        true, false, true, // same flag values as case 1
+        
WriteConcurrencyMode.valueOf(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()),
+        
HoodieFailedWritesCleaningPolicy.valueOf(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+        InProcessLockProvider.class.getName()); // expected lock provider: InProcessLockProvider, not the class configured above
+  }
+
   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
   public void testAutoAdjustLockConfigs(HoodieTableType tableType) {
@@ -357,15 +395,16 @@ public class TestHoodieWriteConfig {
         .withLockConfig(HoodieLockConfig.newBuilder()
             .withLockProvider(FileSystemBasedLockProviderTestClass.class)
             .build())
+        
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(true).build())
         .withAutoAdjustLockConfigs(true)
         .withProperties(properties)
         .build();
 
     verifyConcurrencyControlRelatedConfigs(writeConfig,
-        true, tableType == HoodieTableType.MERGE_ON_READ, true,
+        true, false, true,
         
WriteConcurrencyMode.valueOf(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()),
         
HoodieFailedWritesCleaningPolicy.valueOf(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.defaultValue()),
-        FileSystemBasedLockProviderTestClass.class.getName());
+        InProcessLockProvider.class.getName());
 
     // 2. User can set the lock provider via properties
     verifyConcurrencyControlRelatedConfigs(createWriteConfig(new 
HashMap<String, String>() {

Reply via email to