This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new aa22da38f43 [HUDI-8603] Fixing lock provider for single writer
use-cases (#12371)
aa22da38f43 is described below
commit aa22da38f43a80ebe8e531e63a25a84fc8353718
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Nov 30 00:31:28 2024 -0800
[HUDI-8603] Fixing lock provider for single writer use-cases (#12371)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 11 ++++++
.../apache/hudi/config/TestHoodieWriteConfig.java | 43 +++++++++++++++++++++-
2 files changed, 52 insertions(+), 2 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 8af086c560d..0dda5e19abc 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -3405,6 +3405,17 @@ public class HoodieWriteConfig extends HoodieConfig {
}
private void autoAdjustConfigsForConcurrencyMode(boolean
isLockProviderPropertySet) {
+ // for a single writer scenario, with all table services inline, let's
set InProcessLockProvider
+ if (writeConfig.getWriteConcurrencyMode() ==
WriteConcurrencyMode.SINGLE_WRITER && !writeConfig.areAnyTableServicesAsync()) {
+ if (writeConfig.getLockProviderClass() != null &&
!writeConfig.getLockProviderClass().equals(InProcessLockProvider.class.getCanonicalName()))
{
+ // add logs only when explicitly overridden by the user.
+ LOG.warn(String.format("For a single writer mode, overriding lock
provider class (%s) to %s. So, user configured lock provider %s may not take
effect",
+ HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
InProcessLockProvider.class.getName(), writeConfig.getLockProviderClass()));
+ writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
+ InProcessLockProvider.class.getName());
+ }
+ }
+
if (!isLockProviderPropertySet && writeConfig.isAutoAdjustLockConfigs()
&& isLockRequiredForSingleWriter()) {
// auto adjustment is required only for deltastreamer and spark
streaming where async table services can be executed in the same JVM.
// This is targeted at Single writer with async table services
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 9be904ee925..7e1f7755479 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -314,6 +314,44 @@ public class TestHoodieWriteConfig {
inProcessLockProviderClassName);
}
+ @ParameterizedTest
+ @EnumSource(HoodieTableType.class)
+ public void testAutoAdjustLockConfigsSingleWriter(HoodieTableType tableType)
{
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(HoodieTableConfig.TYPE.key(), tableType.name());
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .withAutoAdjustLockConfigs(true)
+ .withClusteringConfig(new
HoodieClusteringConfig.Builder().withInlineClustering(true).build())
+ .withProperties(properties)
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(true).build())
+ .build();
+
+ verifyConcurrencyControlRelatedConfigs(writeConfig,
+ true, false, true,
+
WriteConcurrencyMode.valueOf(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()),
+
HoodieFailedWritesCleaningPolicy.valueOf(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+ HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
+
+ // even if the user explicitly sets a lock provider, we override it to
InProcessLockProvider.
+ writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .withAutoAdjustLockConfigs(true)
+ .withLockConfig(HoodieLockConfig.newBuilder()
+ .withLockProvider(FileSystemBasedLockProviderTestClass.class)
+ .build())
+ .withClusteringConfig(new
HoodieClusteringConfig.Builder().withInlineClustering(true).build())
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(true).build())
+ .withProperties(properties)
+ .build();
+
+ verifyConcurrencyControlRelatedConfigs(writeConfig,
+ true, false, true,
+
WriteConcurrencyMode.valueOf(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()),
+
HoodieFailedWritesCleaningPolicy.valueOf(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+ InProcessLockProvider.class.getName());
+ }
+
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testAutoAdjustLockConfigs(HoodieTableType tableType) {
@@ -357,15 +395,16 @@ public class TestHoodieWriteConfig {
.withLockConfig(HoodieLockConfig.newBuilder()
.withLockProvider(FileSystemBasedLockProviderTestClass.class)
.build())
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(true).build())
.withAutoAdjustLockConfigs(true)
.withProperties(properties)
.build();
verifyConcurrencyControlRelatedConfigs(writeConfig,
- true, tableType == HoodieTableType.MERGE_ON_READ, true,
+ true, false, true,
WriteConcurrencyMode.valueOf(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()),
HoodieFailedWritesCleaningPolicy.valueOf(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.defaultValue()),
- FileSystemBasedLockProviderTestClass.class.getName());
+ InProcessLockProvider.class.getName());
// 2. User can set the lock provider via properties
verifyConcurrencyControlRelatedConfigs(createWriteConfig(new
HashMap<String, String>() {