This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 6973f4b2e6d [HUDI-6997] A new WriteConcurrencyMode type for non-blocking concurrency control (#9933)
6973f4b2e6d is described below
commit 6973f4b2e6db9a3ba43c5e2f7497f5c380878565
Author: Jing Zhang <[email protected]>
AuthorDate: Wed Nov 1 08:11:56 2023 +0800
    [HUDI-6997] A new WriteConcurrencyMode type for non-blocking concurrency control (#9933)
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 2 +-
.../org/apache/hudi/config/HoodieCleanConfig.java | 2 +-
.../org/apache/hudi/config/HoodieWriteConfig.java | 56 ++++++------
.../table/action/index/RunIndexActionExecutor.java | 7 +-
.../action/index/ScheduleIndexActionExecutor.java | 7 +-
.../org/apache/hudi/table/marker/WriteMarkers.java | 5 +-
.../apache/hudi/config/TestHoodieWriteConfig.java | 62 +++++++++++++
.../hudi/common/model/WriteConcurrencyMode.java | 20 ++++-
.../hudi/configuration/OptionsInference.java | 2 +-
.../apache/hudi/configuration/OptionsResolver.java | 12 +--
.../hudi/sink/StreamWriteOperatorCoordinator.java | 2 +-
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 100 ++++++++++++---------
.../hudi/sink/TestWriteMergeOnReadWithCompact.java | 18 ++--
.../org/apache/hudi/sink/utils/TestWriteBase.java | 2 +-
.../main/scala/org/apache/hudi/DefaultSource.scala | 2 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 2 +-
16 files changed, 203 insertions(+), 98 deletions(-)
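For orientation before the diff (not part of the commit message): the patch adds NON_BLOCKING_CONCURRENCY_CONTROL as a third value of hoodie.write.concurrency.mode, next to SINGLE_WRITER and OPTIMISTIC_CONCURRENCY_CONTROL. A minimal sketch of opting in from a Flink job, assuming a Configuration conf like the one used in the tests below:

    // Sketch: pick the new mode by name; HoodieWriteConfig validation (see below)
    // additionally requires a MERGE_ON_READ table with a simple bucket index.
    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
        WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());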
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 85b5bebc681..7dbd07ea1cc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -383,7 +383,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    */
   public void bootstrap(Option<Map<String, String>> extraMetadata) {
     // TODO : MULTIWRITER -> check if failed bootstrap files can be cleaned later
-    if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+    if (config.getWriteConcurrencyMode().supportsMultiWriter()) {
       throw new HoodieException("Cannot bootstrap the table in multi-writer mode");
     }
     HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS));
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
index a4114152023..f2851370acf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
@@ -143,7 +143,7 @@ public class HoodieCleanConfig extends HoodieConfig {
       .withInferFunction(cfg -> {
         Option<String> writeConcurrencyModeOpt = Option.ofNullable(cfg.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE));
         if (!writeConcurrencyModeOpt.isPresent()
-            || !writeConcurrencyModeOpt.get().equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())) {
+            || !WriteConcurrencyMode.supportsMultiWriter(writeConcurrencyModeOpt.get())) {
           return Option.empty();
         }
         return Option.of(HoodieFailedWritesCleaningPolicy.LAZY.name());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 5ae7ab25fbd..3e446db48c5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2597,7 +2597,7 @@ public class HoodieWriteConfig extends HoodieConfig {
    * Returns whether the explicit guard of lock is required.
    */
   public boolean isLockRequired() {
-    return !isDefaultLockProvider() || getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
+    return !isDefaultLockProvider() || getWriteConcurrencyMode().supportsMultiWriter();
   }

   /**
@@ -2638,21 +2638,20 @@ public class HoodieWriteConfig extends HoodieConfig {
   }

   public boolean needResolveWriteConflict(WriteOperationType operationType) {
-    if (getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
-      // NB-CC don't need to resolve write conflict except bulk insert operation
-      return WriteOperationType.BULK_INSERT == operationType || !isNonBlockingConcurrencyControl();
-    } else {
-      // SINGLE_WRITER case don't need to resolve write conflict
-      return false;
+    WriteConcurrencyMode mode = getWriteConcurrencyMode();
+    switch (mode) {
+      case SINGLE_WRITER:
+        return false;
+      case OPTIMISTIC_CONCURRENCY_CONTROL:
+        return true;
+      case NON_BLOCKING_CONCURRENCY_CONTROL:
+        // NB-CC don't need to resolve write conflict except bulk insert operation
+        return WriteOperationType.BULK_INSERT == operationType;
+      default:
+        throw new IllegalArgumentException("Invalid WriteConcurrencyMode " + mode);
     }
   }

-  public boolean isNonBlockingConcurrencyControl() {
-    return getTableType().equals(HoodieTableType.MERGE_ON_READ)
-        && getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
-        && isSimpleBucketIndex();
-  }
-
   public static class Builder {

     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -3219,7 +3218,7 @@ public class HoodieWriteConfig extends HoodieConfig {
       // needed to guard against any concurrent table write operations. If user has
       // not configured any lock provider, let's use the InProcess lock provider.
       return writeConfig.isMetadataTableEnabled() && writeConfig.areAnyTableServicesAsync()
-          && !writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
+          && !writeConfig.getWriteConcurrencyMode().supportsMultiWriter();
     }

     private void autoAdjustConfigsForConcurrencyMode(boolean isLockProviderPropertySet) {
@@ -3236,16 +3235,16 @@ public class HoodieWriteConfig extends HoodieConfig {
       }

       // We check if "hoodie.cleaner.policy.failed.writes"
-      // is properly set to LAZY for optimistic concurrency control
-      String writeConcurrencyMode = writeConfig.getString(WRITE_CONCURRENCY_MODE);
-      if (WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()
-          .equalsIgnoreCase(writeConcurrencyMode)) {
+      // is properly set to LAZY for multi-writers
+      WriteConcurrencyMode writeConcurrencyMode = writeConfig.getWriteConcurrencyMode();
+      if (writeConcurrencyMode.supportsMultiWriter()) {
         // In this case, we assume that the user takes care of setting the lock provider used
         writeConfig.setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
             HoodieFailedWritesCleaningPolicy.LAZY.name());
-        LOG.info(String.format("Automatically set %s=%s since optimistic concurrency control is used",
+        LOG.info(String.format("Automatically set %s=%s since %s is used",
             HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
-            HoodieFailedWritesCleaningPolicy.LAZY.name()));
+            HoodieFailedWritesCleaningPolicy.LAZY.name(),
+            writeConcurrencyMode.name()));
       }
     }

@@ -3254,15 +3253,22 @@ public class HoodieWriteConfig extends HoodieConfig {
       // Ensure Layout Version is good
       new TimelineLayoutVersion(Integer.parseInt(layoutVersion));
       Objects.requireNonNull(writeConfig.getString(BASE_PATH));
+      WriteConcurrencyMode writeConcurrencyMode = writeConfig.getWriteConcurrencyMode();
       if (writeConfig.isEarlyConflictDetectionEnable()) {
-        checkArgument(writeConfig.getString(WRITE_CONCURRENCY_MODE)
-            .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()),
+        checkArgument(writeConcurrencyMode.isOptimisticConcurrencyControl(),
             "To use early conflict detection, set hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL");
       }
-      if (writeConfig.getString(WRITE_CONCURRENCY_MODE)
-          .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())) {
+      if (writeConcurrencyMode.supportsMultiWriter()) {
         checkArgument(!writeConfig.getString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY)
-            .equals(HoodieFailedWritesCleaningPolicy.EAGER.name()), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY");
+            .equals(HoodieFailedWritesCleaningPolicy.EAGER.name()),
+            String.format(
+                "To enable %s, set hoodie.cleaner.policy.failed.writes=LAZY",
+                writeConcurrencyMode.name()));
+      }
+      if (writeConcurrencyMode == WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL) {
+        checkArgument(
+            writeConfig.getTableType().equals(HoodieTableType.MERGE_ON_READ) && writeConfig.isSimpleBucketIndex(),
+            "Non-blocking concurrency control requires the MOR table with simple bucket index");
       }
       HoodieCleaningPolicy cleaningPolicy = HoodieCleaningPolicy.valueOf(writeConfig.getString(CLEANER_POLICY));
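For context, a minimal sketch (not part of this commit) of a write config that satisfies the new checkArgument above, using only builder calls that appear in this patch's tests; the "/tmp" path and "uuid" record key are illustrative:

    // Sketch: NON_BLOCKING_CONCURRENCY_CONTROL passes validation only for a
    // MERGE_ON_READ table with a SIMPLE bucket index; other combinations fail fast.
    TypedProperties props = new TypedProperties();
    props.setProperty(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp")
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .fromProperties(props)
            .withIndexType(HoodieIndex.IndexType.BUCKET)
            .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
            .build())
        .withWriteConcurrencyMode(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL)
        .build();
    // Since supportsMultiWriter() is true for this mode, the builder also
    // auto-adjusts hoodie.cleaner.policy.failed.writes to LAZY.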
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index c7e27e18de5..d4fc3ea3fb1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -59,6 +59,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

+import static org.apache.hudi.common.model.WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL;
 import static org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT;
@@ -246,9 +247,9 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I,
   private HoodieInstant validateAndGetIndexInstant() {
     // ensure lock provider configured
-    if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() || StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
-      throw new HoodieIndexException(String.format("Need to set %s as %s and configure lock provider class",
-          WRITE_CONCURRENCY_MODE.key(), OPTIMISTIC_CONCURRENCY_CONTROL.name()));
+    if (!config.getWriteConcurrencyMode().supportsMultiWriter() || StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
+      throw new HoodieIndexException(String.format("Need to set %s as %s or %s and configure lock provider class",
+          WRITE_CONCURRENCY_MODE.key(), OPTIMISTIC_CONCURRENCY_CONTROL.name(), NON_BLOCKING_CONCURRENCY_CONTROL.name()));
     }

     return table.getActiveTimeline()
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
index c8557cbbc4c..42964df04a4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
@@ -44,6 +44,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;

+import static org.apache.hudi.common.model.WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL;
 import static org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
 import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
@@ -125,9 +126,9 @@ public class ScheduleIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<
       throw new HoodieIndexException("Not all index types are valid: " + partitionIndexTypes);
     }
     // ensure lock provider configured
-    if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() || StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
-      throw new HoodieIndexException(String.format("Need to set %s as %s and configure lock provider class",
-          WRITE_CONCURRENCY_MODE.key(), OPTIMISTIC_CONCURRENCY_CONTROL.name()));
+    if (!config.getWriteConcurrencyMode().supportsMultiWriter() || StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
+      throw new HoodieIndexException(String.format("Need to set %s as %s or %s and configure lock provider class",
+          WRITE_CONCURRENCY_MODE.key(), OPTIMISTIC_CONCURRENCY_CONTROL.name(), NON_BLOCKING_CONCURRENCY_CONTROL.name()));
     }
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
index fc4d3dbfdf4..19ee931b633 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
@@ -81,8 +81,7 @@ public abstract class WriteMarkers implements Serializable {
    */
   public Option<Path> create(String partitionPath, String fileName, IOType type, HoodieWriteConfig writeConfig,
                              String fileId, HoodieActiveTimeline activeTimeline) {
-    if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
-        && writeConfig.isEarlyConflictDetectionEnable()) {
+    if (writeConfig.getWriteConcurrencyMode().isOptimisticConcurrencyControl() && writeConfig.isEarlyConflictDetectionEnable()) {
       HoodieTimeline pendingCompactionTimeline = activeTimeline.filterPendingCompactionTimeline();
       HoodieTimeline pendingReplaceTimeline = activeTimeline.filterPendingReplaceTimeline();
       // TODO If current is compact or clustering then create marker directly without early conflict detection.
@@ -122,7 +121,7 @@ public abstract class WriteMarkers implements Serializable {
   public Option<Path> createIfNotExists(String partitionPath, String fileName, IOType type, HoodieWriteConfig writeConfig,
                                         String fileId, HoodieActiveTimeline activeTimeline) {
     if (writeConfig.isEarlyConflictDetectionEnable()
-        && writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+        && writeConfig.getWriteConcurrencyMode().isOptimisticConcurrencyControl()) {
       HoodieTimeline pendingCompactionTimeline = activeTimeline.filterPendingCompactionTimeline();
       HoodieTimeline pendingReplaceTimeline = activeTimeline.filterPendingReplaceTimeline();
       // TODO If current is compact or clustering then create marker directly without early conflict detection.
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index f9d5d69aec0..5c93f924ece 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -500,6 +500,68 @@ public class TestHoodieWriteConfig {
     assertEquals("org.apache.hudi.table.action.commit.UpsertPartitioner", overwritePartitioner.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME));
   }

+  @Test
+  public void testAutoAdjustCleanPolicyForNonBlockingConcurrencyControl() {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath("/tmp")
+        .withIndexConfig(
+            HoodieIndexConfig.newBuilder()
+                .fromProperties(props)
+                .withIndexType(HoodieIndex.IndexType.BUCKET)
+                .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
+                .build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL)
+        .build();
+
+    // Verify automatically set hoodie.cleaner.policy.failed.writes=LAZY for non-blocking concurrency control
+    verifyConcurrencyControlRelatedConfigs(writeConfig,
+        true, true, true,
+        WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL, HoodieFailedWritesCleaningPolicy.LAZY,
+        HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
+  }
+
+  @Test
+  public void testNonBlockingConcurrencyControlInvalidEarlyConflictDetection() {
+    Properties props = new Properties();
+    props.put(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
+    HoodieWriteConfig.Builder writeConfigBuilder = HoodieWriteConfig.newBuilder().withPath("/tmp");
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> writeConfigBuilder.withIndexConfig(
+            HoodieIndexConfig.newBuilder()
+                .fromProperties(props)
+                .withIndexType(HoodieIndex.IndexType.BUCKET)
+                .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
+                .build())
+            .withWriteConcurrencyMode(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL)
+            .withEarlyConflictDetectionEnable(true)
+            .build(),
+        "To use early conflict detection, set hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL");
+  }
+
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testNonBlockingConcurrencyControlInvalidTableTypeOrIndexType(HoodieTableType tableType) {
+    TypedProperties props = new TypedProperties();
+    props.put(HoodieTableConfig.TYPE.key(), tableType.name());
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
+    HoodieWriteConfig.Builder writeConfigBuilder = HoodieWriteConfig.newBuilder().withPath("/tmp");
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> writeConfigBuilder.withIndexConfig(
+            HoodieIndexConfig.newBuilder()
+                .fromProperties(props)
+                .withIndexType(HoodieIndex.IndexType.SIMPLE)
+                .build())
+            .withWriteConcurrencyMode(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL)
+            .build(),
+        "Non-blocking concurrency control requires the MOR table with simple bucket index");
+  }
+
   private HoodieWriteConfig createWriteConfig(Map<String, String> configs) {
     final Properties properties = new Properties();
     configs.forEach(properties::setProperty);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
index c11f3733983..f93a6b9ef0f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
@@ -21,6 +21,8 @@ package org.apache.hudi.common.model;
 import org.apache.hudi.common.config.EnumDescription;
 import org.apache.hudi.common.config.EnumFieldDescription;

+import java.util.Locale;
+
 /**
  * Different concurrency modes for write operations.
  */
@@ -35,9 +37,23 @@ public enum WriteConcurrencyMode {
   @EnumFieldDescription("Multiple writers can operate on the table with lazy conflict resolution "
       + "using locks. This means that only one writer succeeds if multiple writers write to the "
       + "same file group.")
-  OPTIMISTIC_CONCURRENCY_CONTROL;
+  OPTIMISTIC_CONCURRENCY_CONTROL,
+
+  // Multiple writers can perform write ops on a MOR table with non-blocking conflict resolution
+  @EnumFieldDescription("Multiple writers can operate on the table with non-blocking conflict resolution. "
+      + "The writers can write into the same file group with the conflicts resolved automatically "
+      + "by the query reader and the compactor.")
+  NON_BLOCKING_CONCURRENCY_CONTROL;
+
+  public boolean supportsMultiWriter() {
+    return this == OPTIMISTIC_CONCURRENCY_CONTROL || this == NON_BLOCKING_CONCURRENCY_CONTROL;
+  }
+
+  public static boolean supportsMultiWriter(String name) {
+    return WriteConcurrencyMode.valueOf(name.toUpperCase(Locale.ROOT)).supportsMultiWriter();
+  }

-  public boolean supportsOptimisticConcurrencyControl() {
+  public boolean isOptimisticConcurrencyControl() {
     return this == OPTIMISTIC_CONCURRENCY_CONTROL;
   }
 }
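For context, a small sketch (not part of the commit) of the helper semantics introduced above; the asserts simply restate the method bodies:

    // supportsMultiWriter() is true for both multi-writer modes:
    assert !WriteConcurrencyMode.SINGLE_WRITER.supportsMultiWriter();
    assert WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.supportsMultiWriter();
    assert WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.supportsMultiWriter();
    // isOptimisticConcurrencyControl() is strictly OCC, replacing the old
    // supportsOptimisticConcurrencyControl() that multi-writer call sites used:
    assert !WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.isOptimisticConcurrencyControl();
    // The static String overload is case-insensitive (uppercased with Locale.ROOT):
    assert WriteConcurrencyMode.supportsMultiWriter("non_blocking_concurrency_control");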
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
index 9ffa71726e4..388503b5032 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
@@ -80,7 +80,7 @@ public class OptionsInference {
    * @see ClientIds
    */
   public static void setupClientId(Configuration conf) {
-    if (OptionsResolver.isOptimisticConcurrencyControl(conf)) {
+    if (OptionsResolver.isMultiWriter(conf)) {
       // explicit client id always has higher priority
       if (!conf.contains(FlinkOptions.WRITE_CLIENT_ID)) {
         try (ClientIds clientIds = ClientIds.builder().conf(conf).build()) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 18cc4a23ae4..dded4207b8a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -306,15 +306,14 @@ public class OptionsResolver {
    * Returns whether the writer txn should be guarded by lock.
    */
   public static boolean isLockRequired(Configuration conf) {
-    return conf.getBoolean(FlinkOptions.METADATA_ENABLED) || isOptimisticConcurrencyControl(conf);
+    return conf.getBoolean(FlinkOptions.METADATA_ENABLED) || isMultiWriter(conf);
   }

   /**
-   * Returns whether OCC is enabled.
+   * Returns whether multi-writer is enabled.
    */
-  public static boolean isOptimisticConcurrencyControl(Configuration conf) {
-    return conf.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
-        .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+  public static boolean isMultiWriter(Configuration conf) {
+    return WriteConcurrencyMode.supportsMultiWriter(conf.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()));
   }

   /**
@@ -371,7 +370,8 @@ public class OptionsResolver {
    * Returns whether this is non-blocking concurrency control.
    */
   public static boolean isNonBlockingConcurrencyControl(Configuration config) {
-    return isMorTable(config) && isSimpleBucketIndexType(config) && isOptimisticConcurrencyControl(config);
+    return config.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
+        .equalsIgnoreCase(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
   }

   public static boolean isLazyFailedWritesCleanPolicy(Configuration conf) {
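For context, a minimal Flink-side sketch (not part of the commit) of how the reworked resolvers react to the mode option; only keys and methods from this diff are used:

    Configuration conf = new Configuration();
    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
        WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
    // Both multi-writer modes now demand the lock guard:
    assert OptionsResolver.isMultiWriter(conf) && OptionsResolver.isLockRequired(conf);
    // NB-CC is now detected from the mode option alone, no longer inferred
    // from table type plus bucket index:
    assert OptionsResolver.isNonBlockingConcurrencyControl(conf);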
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 860dffe99eb..dfc713244d5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -202,7 +202,7 @@ public class StreamWriteOperatorCoordinator
       initHiveSync();
     }
     // start client id heartbeats for optimistic concurrency control
-    if (OptionsResolver.isOptimisticConcurrencyControl(conf)) {
+    if (OptionsResolver.isMultiWriter(conf)) {
       initClientIds(conf);
     }
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index da712523c16..9364528eb33 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -41,6 +41,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;

 import java.io.File;
 import java.io.IOException;
@@ -49,6 +51,7 @@ import java.util.Map;

 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;

 /**
  * Test cases for stream write.
@@ -520,27 +523,39 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
   // case1: txn2's time range is involved in txn1
   //     |----------- txn1 -----------|
   //          | ----- txn2 ----- |
-  @Test
-  public void testWriteMultiWriterInvolved() throws Exception {
-    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+  @ParameterizedTest
+  @EnumSource(value = WriteConcurrencyMode.class, names = {"OPTIMISTIC_CONCURRENCY_CONTROL", "NON_BLOCKING_CONCURRENCY_CONTROL"})
+  public void testWriteMultiWriterInvolved(WriteConcurrencyMode writeConcurrencyMode) throws Exception {
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), writeConcurrencyMode.name());
     conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
     conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
-    TestHarness pipeline1 = preparePipeline(conf)
-        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
-        .assertEmptyDataFiles();
-    // now start pipeline2 and commit the txn
-    Configuration conf2 = conf.clone();
-    conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
-    preparePipeline(conf2)
-        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
-        .assertEmptyDataFiles()
-        .checkpoint(1)
-        .assertNextEvent()
-        .checkpointComplete(1)
-        .checkWrittenData(EXPECTED3, 1);
-    // step to commit the 2nd txn
-    validateConcurrentCommit(pipeline1);
+    if (OptionsResolver.isCowTable(conf) && OptionsResolver.isNonBlockingConcurrencyControl(conf)) {
+      validateNonBlockingConcurrencyControlConditions();
+    } else {
+      TestHarness pipeline1 = preparePipeline(conf)
+          .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+          .assertEmptyDataFiles();
+      // now start pipeline2 and commit the txn
+      Configuration conf2 = conf.clone();
+      conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+      preparePipeline(conf2)
+          .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+          .assertEmptyDataFiles()
+          .checkpoint(1)
+          .assertNextEvent()
+          .checkpointComplete(1)
+          .checkWrittenData(EXPECTED3, 1);
+      // step to commit the 2nd txn
+      validateConcurrentCommit(pipeline1);
+    }
+  }
+
+  protected void validateNonBlockingConcurrencyControlConditions() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> preparePipeline(conf),
+        "Non-blocking concurrency control requires the MOR table with simple bucket index");
   }

   private void validateConcurrentCommit(TestHarness pipeline) throws Exception {
@@ -562,32 +577,37 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
   // case2: txn2's time range has partial overlap with txn1
   //     |----------- txn1 -----------|
   //                    | ----- txn2 ----- |
-  @Test
-  public void testWriteMultiWriterPartialOverlapping() throws Exception {
-    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+  @ParameterizedTest
+  @EnumSource(value = WriteConcurrencyMode.class, names = {"OPTIMISTIC_CONCURRENCY_CONTROL", "NON_BLOCKING_CONCURRENCY_CONTROL"})
+  public void testWriteMultiWriterPartialOverlapping(WriteConcurrencyMode writeConcurrencyMode) throws Exception {
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), writeConcurrencyMode.name());
     conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
     conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
-    TestHarness pipeline1 = preparePipeline(conf)
-        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
-        .assertEmptyDataFiles();
-    // now start pipeline2 and suspend the txn commit
-    Configuration conf2 = conf.clone();
-    conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
-    TestHarness pipeline2 = preparePipeline(conf2)
-        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
-        .assertEmptyDataFiles();
-
-    // step to commit the 1st txn, should succeed
-    pipeline1.checkpoint(1)
-        .assertNextEvent()
-        .checkpointComplete(1)
-        .checkWrittenData(EXPECTED3, 1);
+    if (OptionsResolver.isCowTable(conf) && OptionsResolver.isNonBlockingConcurrencyControl(conf)) {
+      validateNonBlockingConcurrencyControlConditions();
+    } else {
+      TestHarness pipeline1 = preparePipeline(conf)
+          .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+          .assertEmptyDataFiles();
+      // now start pipeline2 and suspend the txn commit
+      Configuration conf2 = conf.clone();
+      conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+      TestHarness pipeline2 = preparePipeline(conf2)
+          .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+          .assertEmptyDataFiles();
+
+      // step to commit the 1st txn, should succeed
+      pipeline1.checkpoint(1)
+          .assertNextEvent()
+          .checkpointComplete(1)
+          .checkWrittenData(EXPECTED3, 1);

-    // step to commit the 2nd txn
-    // should success for concurrent modification of same fileGroups if using non-blocking concurrency control
-    // should throw exception otherwise
-    validateConcurrentCommit(pipeline2);
+      // step to commit the 2nd txn
+      // should success for concurrent modification of same fileGroups if using non-blocking concurrency control
+      // should throw exception otherwise
+      validateConcurrentCommit(pipeline2);
+    }
   }

@Test
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 816c9a4e655..e07118cb052 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -89,7 +89,7 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {

   @Test
   public void testNonBlockingConcurrencyControlWithPartialUpdatePayload() throws Exception {
-    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
     conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
     conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, PartialUpdateAvroPayload.class.getName());
     // disable schedule compaction in writers
@@ -133,7 +133,7 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
     // There is no base file in partition dir because there is no compaction yet.
     pipeline1.assertEmptyDataFiles();

-    // schedule compaction outside of all writers
+    // schedule compaction outside all writers
     try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
       Option<String> scheduleInstant = writeClient.scheduleCompaction(Option.empty());
       assertNotNull(scheduleInstant.get());
@@ -163,7 +163,7 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {

   @Test
   public void testNonBlockingConcurrencyControlWithInflightInstant() throws Exception {
-    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
     conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
     // disable schedule compaction in writers
     conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
@@ -199,7 +199,7 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
     pipeline2.checkpoint(1)
         .assertNextEvent();

-    // schedule compaction outside of all writers
+    // schedule compaction outside all writers
     try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
       Option<String> scheduleInstant = writeClient.scheduleCompaction(Option.empty());
       assertNotNull(scheduleInstant.get());
@@ -234,8 +234,8 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
   //            |----- txn2 ------|
   // the txn2 would fail to commit caused by conflict
   @Test
-  public void testBulkInsertInMultiWriter() throws Exception {
-    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+  public void testBulkInsertWithNonBlockingConcurrencyControl() throws Exception {
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
     conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
     conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, PartialUpdateAvroPayload.class.getName());
     // disable schedule compaction in writers
@@ -276,8 +276,8 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
   //          |----------- txn2 -----------|
   // both two txn would success to commit
   @Test
-  public void testBulkInsertInSequence() throws Exception {
-    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+  public void testBulkInsertInSequenceWithNonBlockingConcurrencyControl() throws Exception {
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
     conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
     conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, PartialUpdateAvroPayload.class.getName());
     // disable schedule compaction in writers
@@ -316,7 +316,7 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
     Map<String, String> tmpSnapshotResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
     pipeline2.checkWrittenData(tmpSnapshotResult, 1);

-    // schedule compaction outside of all writers
+    // schedule compaction outside all writers
     try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
       Option<String> scheduleInstant = writeClient.scheduleCompaction(Option.empty());
       assertNotNull(scheduleInstant.get());
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index b2208b88b53..4d9ff3226b0 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -551,7 +551,7 @@ public class TestWriteBase {
     protected String lastCompleteInstant() {
       // If using optimistic concurrency control, fetch last complete instant of current writer from ckp metadata
       // because there are multiple write clients commit to the timeline.
-      if (OptionsResolver.isOptimisticConcurrencyControl(conf)) {
+      if (OptionsResolver.isMultiWriter(conf)) {
         return this.ckpMetadata.lastCompleteInstant();
       } else {
         // fetch the instant from timeline.
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 965340c637a..10b77d4b91c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -165,7 +165,7 @@ class DefaultSource extends RelationProvider
   def validateMultiWriterConfigs(options: Map[String, String]) : Unit = {
     if (ConfigUtils.resolveEnum(classOf[WriteConcurrencyMode],
       options.getOrDefault(WRITE_CONCURRENCY_MODE.key(),
-        WRITE_CONCURRENCY_MODE.defaultValue())) == WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) {
+        WRITE_CONCURRENCY_MODE.defaultValue())).supportsMultiWriter()) {
       // ensure some valid value is set for identifier
       checkState(options.contains(STREAMING_CHECKPOINT_IDENTIFIER.key()), "For multi-writer scenarios, please set "
         + STREAMING_CHECKPOINT_IDENTIFIER.key() + ". Each writer should set different values for this identifier")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 3889657faf2..a63f861480d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -181,7 +181,7 @@ class HoodieSparkSqlWriterInternal {
       } catch {
         case e: HoodieWriteConflictException =>
           val writeConcurrencyMode = optParams.getOrElse(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
-          if (writeConcurrencyMode.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()) && counter < maxRetry) {
+          if (WriteConcurrencyMode.supportsMultiWriter(writeConcurrencyMode) && counter < maxRetry) {
             counter += 1
             log.warn(s"Conflict found. Retrying again for attempt no $counter")
           } else {
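For context, the retry guard above now fires for both multi-writer modes, not just OCC; a hedged Java restatement (not part of the commit), with writeConcurrencyMode, counter, maxRetry and e standing in for the Scala locals:

    // Sketch: retry a conflicted commit only while a multi-writer mode is configured
    // and retry budget remains; otherwise rethrow the conflict exception.
    if (WriteConcurrencyMode.supportsMultiWriter(writeConcurrencyMode) && counter < maxRetry) {
      counter += 1; // and re-attempt the write
    } else {
      throw e;
    }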