This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6973f4b2e6d [HUDI-6997] A new WriteConcurrencyMode type for non-blocking concurrency control (#9933)
6973f4b2e6d is described below

commit 6973f4b2e6db9a3ba43c5e2f7497f5c380878565
Author: Jing Zhang <[email protected]>
AuthorDate: Wed Nov 1 08:11:56 2023 +0800

    [HUDI-6997] A new WriteConcurrencyMode type for non-blocking concurrency control (#9933)
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |   2 +-
 .../org/apache/hudi/config/HoodieCleanConfig.java  |   2 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  56 ++++++------
 .../table/action/index/RunIndexActionExecutor.java |   7 +-
 .../action/index/ScheduleIndexActionExecutor.java  |   7 +-
 .../org/apache/hudi/table/marker/WriteMarkers.java |   5 +-
 .../apache/hudi/config/TestHoodieWriteConfig.java  |  62 +++++++++++++
 .../hudi/common/model/WriteConcurrencyMode.java    |  20 ++++-
 .../hudi/configuration/OptionsInference.java       |   2 +-
 .../apache/hudi/configuration/OptionsResolver.java |  12 +--
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |   2 +-
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 100 ++++++++++++---------
 .../hudi/sink/TestWriteMergeOnReadWithCompact.java |  18 ++--
 .../org/apache/hudi/sink/utils/TestWriteBase.java  |   2 +-
 .../main/scala/org/apache/hudi/DefaultSource.scala |   2 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   2 +-
 16 files changed, 203 insertions(+), 98 deletions(-)
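
In short, this commit adds NON_BLOCKING_CONCURRENCY_CONTROL as a third WriteConcurrencyMode alongside SINGLE_WRITER and OPTIMISTIC_CONCURRENCY_CONTROL, and replaces the old supportsOptimisticConcurrencyControl() checks with a broader supportsMultiWriter() helper so lock and cleaner handling covers both multi-writer modes. As a rough sketch (not part of the commit), a write config could opt into the new mode the same way the new TestHoodieWriteConfig cases below do; the import paths and the "/tmp/hoodie_table" path are illustrative assumptions:

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.common.model.WriteConcurrencyMode;
    import org.apache.hudi.common.table.HoodieTableConfig;
    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.index.HoodieIndex;
    import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

    TypedProperties props = new TypedProperties();
    props.setProperty(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
    // Non-blocking concurrency control is only accepted for a MOR table with the
    // simple bucket index; the builder validates this and also flips
    // hoodie.cleaner.policy.failed.writes to LAZY automatically.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table")
        .withIndexConfig(
            HoodieIndexConfig.newBuilder()
                .fromProperties(props)
                .withIndexType(HoodieIndex.IndexType.BUCKET)
                .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
                .build())
        .withWriteConcurrencyMode(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL)
        .build();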

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 85b5bebc681..7dbd07ea1cc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -383,7 +383,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    */
   public void bootstrap(Option<Map<String, String>> extraMetadata) {
     // TODO : MULTIWRITER -> check if failed bootstrap files can be cleaned later
-    if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+    if (config.getWriteConcurrencyMode().supportsMultiWriter()) {
       throw new HoodieException("Cannot bootstrap the table in multi-writer mode");
     }
     HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS));
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
index a4114152023..f2851370acf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
@@ -143,7 +143,7 @@ public class HoodieCleanConfig extends HoodieConfig {
       .withInferFunction(cfg -> {
         Option<String> writeConcurrencyModeOpt = Option.ofNullable(cfg.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE));
         if (!writeConcurrencyModeOpt.isPresent()
-            || !writeConcurrencyModeOpt.get().equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())) {
+            || !WriteConcurrencyMode.supportsMultiWriter(writeConcurrencyModeOpt.get())) {
           return Option.empty();
         }
         return Option.of(HoodieFailedWritesCleaningPolicy.LAZY.name());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 5ae7ab25fbd..3e446db48c5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2597,7 +2597,7 @@ public class HoodieWriteConfig extends HoodieConfig {
    * Returns whether the explicit guard of lock is required.
    */
   public boolean isLockRequired() {
-    return !isDefaultLockProvider() || getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
+    return !isDefaultLockProvider() || getWriteConcurrencyMode().supportsMultiWriter();
   }
 
   /**
@@ -2638,21 +2638,20 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public boolean needResolveWriteConflict(WriteOperationType operationType) {
-    if (getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
-      // NB-CC don't need to resolve write conflict except bulk insert operation
-      return WriteOperationType.BULK_INSERT == operationType || !isNonBlockingConcurrencyControl();
-    } else {
-      // SINGLE_WRITER case don't need to resolve write conflict
-      return false;
+    WriteConcurrencyMode mode = getWriteConcurrencyMode();
+    switch (mode) {
+      case SINGLE_WRITER:
+        return false;
+      case OPTIMISTIC_CONCURRENCY_CONTROL:
+        return true;
+      case NON_BLOCKING_CONCURRENCY_CONTROL:
+        // NB-CC don't need to resolve write conflict except bulk insert operation
+        return WriteOperationType.BULK_INSERT == operationType;
+      default:
+        throw new IllegalArgumentException("Invalid WriteConcurrencyMode " + mode);
     }
   }
 
-  public boolean isNonBlockingConcurrencyControl() {
-    return getTableType().equals(HoodieTableType.MERGE_ON_READ)
-        && getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
-        && isSimpleBucketIndex();
-  }
-
   public static class Builder {
 
     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -3219,7 +3218,7 @@ public class HoodieWriteConfig extends HoodieConfig {
       // needed to guard against any concurrent table write operations. If user has
       // not configured any lock provider, let's use the InProcess lock provider.
       return writeConfig.isMetadataTableEnabled() && writeConfig.areAnyTableServicesAsync()
-          && !writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
+          && !writeConfig.getWriteConcurrencyMode().supportsMultiWriter();
     }
 
     private void autoAdjustConfigsForConcurrencyMode(boolean isLockProviderPropertySet) {
@@ -3236,16 +3235,16 @@ public class HoodieWriteConfig extends HoodieConfig {
       }
 
       // We check if "hoodie.cleaner.policy.failed.writes"
-      // is properly set to LAZY for optimistic concurrency control
-      String writeConcurrencyMode = writeConfig.getString(WRITE_CONCURRENCY_MODE);
-      if (WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()
-          .equalsIgnoreCase(writeConcurrencyMode)) {
+      // is properly set to LAZY for multi-writers
+      WriteConcurrencyMode writeConcurrencyMode = writeConfig.getWriteConcurrencyMode();
+      if (writeConcurrencyMode.supportsMultiWriter()) {
         // In this case, we assume that the user takes care of setting the lock provider used
         writeConfig.setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
             HoodieFailedWritesCleaningPolicy.LAZY.name());
-        LOG.info(String.format("Automatically set %s=%s since optimistic concurrency control is used",
+        LOG.info(String.format("Automatically set %s=%s since %s is used",
             HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
-            HoodieFailedWritesCleaningPolicy.LAZY.name()));
+            HoodieFailedWritesCleaningPolicy.LAZY.name(),
+            writeConcurrencyMode.name()));
       }
     }
 
@@ -3254,15 +3253,22 @@ public class HoodieWriteConfig extends HoodieConfig {
       // Ensure Layout Version is good
       new TimelineLayoutVersion(Integer.parseInt(layoutVersion));
       Objects.requireNonNull(writeConfig.getString(BASE_PATH));
+      WriteConcurrencyMode writeConcurrencyMode = writeConfig.getWriteConcurrencyMode();
       if (writeConfig.isEarlyConflictDetectionEnable()) {
-        checkArgument(writeConfig.getString(WRITE_CONCURRENCY_MODE)
-                .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()),
+        checkArgument(writeConcurrencyMode.isOptimisticConcurrencyControl(),
             "To use early conflict detection, set hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL");
       }
-      if (writeConfig.getString(WRITE_CONCURRENCY_MODE)
-          .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())) {
+      if (writeConcurrencyMode.supportsMultiWriter()) {
         checkArgument(!writeConfig.getString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY)
-            .equals(HoodieFailedWritesCleaningPolicy.EAGER.name()), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY");
+            .equals(HoodieFailedWritesCleaningPolicy.EAGER.name()),
+            String.format(
+                "To enable %s, set hoodie.cleaner.policy.failed.writes=LAZY",
+                writeConcurrencyMode.name()));
+      }
+      if (writeConcurrencyMode == WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL) {
+        checkArgument(
+            writeConfig.getTableType().equals(HoodieTableType.MERGE_ON_READ) && writeConfig.isSimpleBucketIndex(),
+            "Non-blocking concurrency control requires the MOR table with simple bucket index");
       }
 
       HoodieCleaningPolicy cleaningPolicy = HoodieCleaningPolicy.valueOf(writeConfig.getString(CLEANER_POLICY));
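
The rewritten needResolveWriteConflict keys conflict resolution purely off the mode: SINGLE_WRITER never resolves, OPTIMISTIC_CONCURRENCY_CONTROL always does, and NON_BLOCKING_CONCURRENCY_CONTROL only does so for bulk inserts, the one operation the commit still routes through conflict resolution. A small sketch of the expected behaviour, assuming a writeConfig built with the new mode as in the example above:

    // Under NON_BLOCKING_CONCURRENCY_CONTROL:
    writeConfig.needResolveWriteConflict(WriteOperationType.UPSERT);      // false, conflicts are handled at read/compaction time
    writeConfig.needResolveWriteConflict(WriteOperationType.BULK_INSERT); // true, bulk insert still resolves write conflicts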
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index c7e27e18de5..d4fc3ea3fb1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -59,6 +59,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.model.WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL;
 import static org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT;
@@ -246,9 +247,9 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I,
 
   private HoodieInstant validateAndGetIndexInstant() {
     // ensure lock provider configured
-    if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() || StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
-      throw new HoodieIndexException(String.format("Need to set %s as %s and configure lock provider class",
-          WRITE_CONCURRENCY_MODE.key(), OPTIMISTIC_CONCURRENCY_CONTROL.name()));
+    if (!config.getWriteConcurrencyMode().supportsMultiWriter() || StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
+      throw new HoodieIndexException(String.format("Need to set %s as %s or %s and configure lock provider class",
+          WRITE_CONCURRENCY_MODE.key(), OPTIMISTIC_CONCURRENCY_CONTROL.name(), NON_BLOCKING_CONCURRENCY_CONTROL.name()));
     }
 
     return table.getActiveTimeline()
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
index c8557cbbc4c..42964df04a4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
@@ -44,6 +44,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.model.WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL;
 import static org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
 import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
@@ -125,9 +126,9 @@ public class ScheduleIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<
       throw new HoodieIndexException("Not all index types are valid: " + partitionIndexTypes);
     }
     // ensure lock provider configured
-    if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() || StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
-      throw new HoodieIndexException(String.format("Need to set %s as %s and configure lock provider class",
-          WRITE_CONCURRENCY_MODE.key(), OPTIMISTIC_CONCURRENCY_CONTROL.name()));
+    if (!config.getWriteConcurrencyMode().supportsMultiWriter() || StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
+      throw new HoodieIndexException(String.format("Need to set %s as %s or %s and configure lock provider class",
+          WRITE_CONCURRENCY_MODE.key(), OPTIMISTIC_CONCURRENCY_CONTROL.name(), NON_BLOCKING_CONCURRENCY_CONTROL.name()));
     }
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
index fc4d3dbfdf4..19ee931b633 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
@@ -81,8 +81,7 @@ public abstract class WriteMarkers implements Serializable {
    */
   public Option<Path> create(String partitionPath, String fileName, IOType type, HoodieWriteConfig writeConfig,
                              String fileId, HoodieActiveTimeline activeTimeline) {
-    if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
-        && writeConfig.isEarlyConflictDetectionEnable()) {
+    if (writeConfig.getWriteConcurrencyMode().isOptimisticConcurrencyControl() && writeConfig.isEarlyConflictDetectionEnable()) {
       HoodieTimeline pendingCompactionTimeline = activeTimeline.filterPendingCompactionTimeline();
       HoodieTimeline pendingReplaceTimeline = activeTimeline.filterPendingReplaceTimeline();
       // TODO If current is compact or clustering then create marker directly without early conflict detection.
@@ -122,7 +121,7 @@ public abstract class WriteMarkers implements Serializable {
   public Option<Path> createIfNotExists(String partitionPath, String fileName, IOType type, HoodieWriteConfig writeConfig,
                              String fileId, HoodieActiveTimeline activeTimeline) {
     if (writeConfig.isEarlyConflictDetectionEnable()
-        && writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+        && writeConfig.getWriteConcurrencyMode().isOptimisticConcurrencyControl()) {
       HoodieTimeline pendingCompactionTimeline = activeTimeline.filterPendingCompactionTimeline();
       HoodieTimeline pendingReplaceTimeline = activeTimeline.filterPendingReplaceTimeline();
       // TODO If current is compact or clustering then create marker directly without early conflict detection.
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index f9d5d69aec0..5c93f924ece 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -500,6 +500,68 @@ public class TestHoodieWriteConfig {
     assertEquals("org.apache.hudi.table.action.commit.UpsertPartitioner", overwritePartitioner.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME));
   }
 
+  @Test
+  public void testAutoAdjustCleanPolicyForNonBlockingConcurrencyControl() {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath("/tmp")
+        .withIndexConfig(
+            HoodieIndexConfig.newBuilder()
+                .fromProperties(props)
+                .withIndexType(HoodieIndex.IndexType.BUCKET)
+                .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
+                .build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL)
+        .build();
+
+    // Verify automatically set hoodie.cleaner.policy.failed.writes=LAZY for non-blocking concurrency control
+    verifyConcurrencyControlRelatedConfigs(writeConfig,
+        true, true, true,
+        WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL, HoodieFailedWritesCleaningPolicy.LAZY,
+        HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
+  }
+
+  @Test
+  public void testNonBlockingConcurrencyControlInvalidEarlyConflictDetection() {
+    Properties props = new Properties();
+    props.put(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
+    HoodieWriteConfig.Builder writeConfigBuilder = HoodieWriteConfig.newBuilder().withPath("/tmp");
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> writeConfigBuilder.withIndexConfig(
+            HoodieIndexConfig.newBuilder()
+                .fromProperties(props)
+                .withIndexType(HoodieIndex.IndexType.BUCKET)
+                .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
+                .build())
+            .withWriteConcurrencyMode(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL)
+            .withEarlyConflictDetectionEnable(true)
+            .build(),
+        "To use early conflict detection, set hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL");
+  }
+
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testNonBlockingConcurrencyControlInvalidTableTypeOrIndexType(HoodieTableType tableType) {
+    TypedProperties props = new TypedProperties();
+    props.put(HoodieTableConfig.TYPE.key(), tableType.name());
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
+    HoodieWriteConfig.Builder writeConfigBuilder = HoodieWriteConfig.newBuilder().withPath("/tmp");
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> writeConfigBuilder.withIndexConfig(
+            HoodieIndexConfig.newBuilder()
+                .fromProperties(props)
+                .withIndexType(HoodieIndex.IndexType.SIMPLE)
+                .build())
+            .withWriteConcurrencyMode(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL)
+            .build(),
+        "Non-blocking concurrency control requires the MOR table with simple bucket index");
+  }
+
   private HoodieWriteConfig createWriteConfig(Map<String, String> configs) {
     final Properties properties = new Properties();
     configs.forEach(properties::setProperty);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
index c11f3733983..f93a6b9ef0f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
@@ -21,6 +21,8 @@ package org.apache.hudi.common.model;
 import org.apache.hudi.common.config.EnumDescription;
 import org.apache.hudi.common.config.EnumFieldDescription;
 
+import java.util.Locale;
+
 /**
  * Different concurrency modes for write operations.
  */
@@ -35,9 +37,23 @@ public enum WriteConcurrencyMode {
   @EnumFieldDescription("Multiple writers can operate on the table with lazy conflict resolution "
       + "using locks. This means that only one writer succeeds if multiple writers write to the "
      + "same file group.")
-  OPTIMISTIC_CONCURRENCY_CONTROL;
+  OPTIMISTIC_CONCURRENCY_CONTROL,
+
+  // Multiple writer can perform write ops on a MOR table with non-blocking conflict resolution
+  @EnumFieldDescription("Multiple writers can operate on the table with non-blocking conflict resolution. "
+      + "The writers can write into the same file group with the conflicts resolved automatically "
+      + "by the query reader and the compactor.")
+  NON_BLOCKING_CONCURRENCY_CONTROL;
+
+  public boolean supportsMultiWriter() {
+    return this == OPTIMISTIC_CONCURRENCY_CONTROL || this == NON_BLOCKING_CONCURRENCY_CONTROL;
+  }
+
+  public static boolean supportsMultiWriter(String name) {
+    return WriteConcurrencyMode.valueOf(name.toUpperCase(Locale.ROOT)).supportsMultiWriter();
+  }
 
-  public boolean supportsOptimisticConcurrencyControl() {
+  public boolean isOptimisticConcurrencyControl() {
     return this == OPTIMISTIC_CONCURRENCY_CONTROL;
   }
 }
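
The enum now separates "supports multiple writers" from "is OCC specifically", which is what lets the lock and cleaner plumbing treat both multi-writer modes uniformly while early conflict detection stays OCC-only. A quick sketch of the new helpers (illustrative only):

    WriteConcurrencyMode occ = WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
    WriteConcurrencyMode nbcc = WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL;

    occ.supportsMultiWriter();                                 // true
    nbcc.supportsMultiWriter();                                // true
    WriteConcurrencyMode.SINGLE_WRITER.supportsMultiWriter();  // false

    occ.isOptimisticConcurrencyControl();                      // true
    nbcc.isOptimisticConcurrencyControl();                     // false

    // The static overload parses the raw config value, case-insensitively.
    WriteConcurrencyMode.supportsMultiWriter("non_blocking_concurrency_control"); // true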
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
index 9ffa71726e4..388503b5032 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
@@ -80,7 +80,7 @@ public class OptionsInference {
    * @see ClientIds
    */
   public static void setupClientId(Configuration conf) {
-    if (OptionsResolver.isOptimisticConcurrencyControl(conf)) {
+    if (OptionsResolver.isMultiWriter(conf)) {
       // explicit client id always has higher priority
       if (!conf.contains(FlinkOptions.WRITE_CLIENT_ID)) {
         try (ClientIds clientIds = ClientIds.builder().conf(conf).build()) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 18cc4a23ae4..dded4207b8a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -306,15 +306,14 @@ public class OptionsResolver {
    * Returns whether the writer txn should be guarded by lock.
    */
   public static boolean isLockRequired(Configuration conf) {
-    return conf.getBoolean(FlinkOptions.METADATA_ENABLED) || isOptimisticConcurrencyControl(conf);
+    return conf.getBoolean(FlinkOptions.METADATA_ENABLED) || isMultiWriter(conf);
   }
 
   /**
-   * Returns whether OCC is enabled.
+   * Returns whether multi-writer is enabled.
    */
-  public static boolean isOptimisticConcurrencyControl(Configuration conf) {
-    return conf.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
-        .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+  public static boolean isMultiWriter(Configuration conf) {
+    return WriteConcurrencyMode.supportsMultiWriter(conf.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()));
   }
 
   /**
@@ -371,7 +370,8 @@ public class OptionsResolver {
    * Returns whether this is non-blocking concurrency control.
    */
   public static boolean isNonBlockingConcurrencyControl(Configuration config) {
-    return isMorTable(config) && isSimpleBucketIndexType(config) && isOptimisticConcurrencyControl(config);
+    return config.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
+        .equalsIgnoreCase(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
   }
 
   public static boolean isLazyFailedWritesCleanPolicy(Configuration conf) {
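
On the Flink side, isMultiWriter(conf) now keys purely off hoodie.write.concurrency.mode, and isNonBlockingConcurrencyControl(conf) no longer infers the mode from table type plus index type. A minimal sketch of the resolver behaviour, using the Flink Configuration the same way the tests below do:

    Configuration conf = new Configuration();  // org.apache.flink.configuration.Configuration
    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
        WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());

    OptionsResolver.isMultiWriter(conf);                   // true: OCC and NB-CC both count as multi-writer
    OptionsResolver.isNonBlockingConcurrencyControl(conf); // true: decided by the mode alone
    OptionsResolver.isLockRequired(conf);                  // true: any multi-writer mode still takes the lock guard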
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 860dffe99eb..dfc713244d5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -202,7 +202,7 @@ public class StreamWriteOperatorCoordinator
       initHiveSync();
     }
     // start client id heartbeats for optimistic concurrency control
-    if (OptionsResolver.isOptimisticConcurrencyControl(conf)) {
+    if (OptionsResolver.isMultiWriter(conf)) {
       initClientIds(conf);
     }
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index da712523c16..9364528eb33 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -41,6 +41,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -49,6 +51,7 @@ import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
  * Test cases for stream write.
@@ -520,27 +523,39 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
   // case1: txn2's time range is involved in txn1
   //      |----------- txn1 -----------|
   //              | ----- txn2 ----- |
-  @Test
-  public void testWriteMultiWriterInvolved() throws Exception {
-    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+  @ParameterizedTest
+  @EnumSource(value = WriteConcurrencyMode.class, names = {"OPTIMISTIC_CONCURRENCY_CONTROL", "NON_BLOCKING_CONCURRENCY_CONTROL"})
+  public void testWriteMultiWriterInvolved(WriteConcurrencyMode writeConcurrencyMode) throws Exception {
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), writeConcurrencyMode.name());
     conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
     conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
 
-    TestHarness pipeline1 = preparePipeline(conf)
-        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
-        .assertEmptyDataFiles();
-    // now start pipeline2 and commit the txn
-    Configuration conf2 = conf.clone();
-    conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
-    preparePipeline(conf2)
-        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
-        .assertEmptyDataFiles()
-        .checkpoint(1)
-        .assertNextEvent()
-        .checkpointComplete(1)
-        .checkWrittenData(EXPECTED3, 1);
-    // step to commit the 2nd txn
-    validateConcurrentCommit(pipeline1);
+    if (OptionsResolver.isCowTable(conf) && OptionsResolver.isNonBlockingConcurrencyControl(conf)) {
+      validateNonBlockingConcurrencyControlConditions();
+    } else {
+      TestHarness pipeline1 = preparePipeline(conf)
+          .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+          .assertEmptyDataFiles();
+      // now start pipeline2 and commit the txn
+      Configuration conf2 = conf.clone();
+      conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+      preparePipeline(conf2)
+          .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+          .assertEmptyDataFiles()
+          .checkpoint(1)
+          .assertNextEvent()
+          .checkpointComplete(1)
+          .checkWrittenData(EXPECTED3, 1);
+      // step to commit the 2nd txn
+      validateConcurrentCommit(pipeline1);
+    }
+  }
+
+  protected void validateNonBlockingConcurrencyControlConditions() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> preparePipeline(conf),
+        "Non-blocking concurrency control requires the MOR table with simple bucket index");
   }
 
   private void validateConcurrentCommit(TestHarness pipeline) throws Exception {
@@ -562,32 +577,37 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
   // case2: txn2's time range has partial overlap with txn1
   //      |----------- txn1 -----------|
   //                       | ----- txn2 ----- |
-  @Test
-  public void testWriteMultiWriterPartialOverlapping() throws Exception {
-    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+  @ParameterizedTest
+  @EnumSource(value = WriteConcurrencyMode.class, names = {"OPTIMISTIC_CONCURRENCY_CONTROL", "NON_BLOCKING_CONCURRENCY_CONTROL"})
+  public void testWriteMultiWriterPartialOverlapping(WriteConcurrencyMode writeConcurrencyMode) throws Exception {
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), writeConcurrencyMode.name());
     conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
     conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
 
-    TestHarness pipeline1 = preparePipeline(conf)
-        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
-        .assertEmptyDataFiles();
-    // now start pipeline2 and suspend the txn commit
-    Configuration conf2 = conf.clone();
-    conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
-    TestHarness pipeline2 = preparePipeline(conf2)
-        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
-        .assertEmptyDataFiles();
-
-    // step to commit the 1st txn, should succeed
-    pipeline1.checkpoint(1)
-        .assertNextEvent()
-        .checkpointComplete(1)
-        .checkWrittenData(EXPECTED3, 1);
+    if (OptionsResolver.isCowTable(conf) && OptionsResolver.isNonBlockingConcurrencyControl(conf)) {
+      validateNonBlockingConcurrencyControlConditions();
+    } else {
+      TestHarness pipeline1 = preparePipeline(conf)
+          .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+          .assertEmptyDataFiles();
+      // now start pipeline2 and suspend the txn commit
+      Configuration conf2 = conf.clone();
+      conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+      TestHarness pipeline2 = preparePipeline(conf2)
+          .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+          .assertEmptyDataFiles();
+
+      // step to commit the 1st txn, should succeed
+      pipeline1.checkpoint(1)
+          .assertNextEvent()
+          .checkpointComplete(1)
+          .checkWrittenData(EXPECTED3, 1);
 
-    // step to commit the 2nd txn
-    // should success for concurrent modification of same fileGroups if using non-blocking concurrency control
-    // should throw exception otherwise
-    validateConcurrentCommit(pipeline2);
+      // step to commit the 2nd txn
+      // should success for concurrent modification of same fileGroups if using non-blocking concurrency control
+      // should throw exception otherwise
+      validateConcurrentCommit(pipeline2);
+    }
   }
 
   @Test
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 816c9a4e655..e07118cb052 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -89,7 +89,7 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
 
   @Test
   public void testNonBlockingConcurrencyControlWithPartialUpdatePayload() throws Exception {
-    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
     conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
     conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, PartialUpdateAvroPayload.class.getName());
     // disable schedule compaction in writers
@@ -133,7 +133,7 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
     // There is no base file in partition dir because there is no compaction yet.
     pipeline1.assertEmptyDataFiles();
 
-    // schedule compaction outside of all writers
+    // schedule compaction outside all writers
     try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
       Option<String> scheduleInstant = writeClient.scheduleCompaction(Option.empty());
       assertNotNull(scheduleInstant.get());
@@ -163,7 +163,7 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
 
   @Test
   public void testNonBlockingConcurrencyControlWithInflightInstant() throws Exception {
-    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
     conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
     // disable schedule compaction in writers
     conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
@@ -199,7 +199,7 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
     pipeline2.checkpoint(1)
         .assertNextEvent();
 
-    // schedule compaction outside of all writers
+    // schedule compaction outside all writers
     try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
       Option<String> scheduleInstant = writeClient.scheduleCompaction(Option.empty());
       assertNotNull(scheduleInstant.get());
@@ -234,8 +234,8 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
   //                       |----- txn2 ------|
   // the txn2 would fail to commit caused by conflict
   @Test
-  public void testBulkInsertInMultiWriter() throws Exception {
-    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+  public void testBulkInsertWithNonBlockingConcurrencyControl() throws Exception {
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
     conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
     conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, PartialUpdateAvroPayload.class.getName());
     // disable schedule compaction in writers
@@ -276,8 +276,8 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
   //      |----------- txn2 -----------|
   // both two txn would success to commit
   @Test
-  public void testBulkInsertInSequence() throws Exception {
-    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+  public void testBulkInsertInSequenceWithNonBlockingConcurrencyControl() throws Exception {
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
     conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
     conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, PartialUpdateAvroPayload.class.getName());
     // disable schedule compaction in writers
@@ -316,7 +316,7 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
     Map<String, String> tmpSnapshotResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
     pipeline2.checkWrittenData(tmpSnapshotResult, 1);
 
-    // schedule compaction outside of all writers
+    // schedule compaction outside all writers
     try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
       Option<String> scheduleInstant = writeClient.scheduleCompaction(Option.empty());
      assertNotNull(scheduleInstant.get());
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index b2208b88b53..4d9ff3226b0 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -551,7 +551,7 @@ public class TestWriteBase {
     protected String lastCompleteInstant() {
       // If using optimistic concurrency control, fetch last complete instant of current writer from ckp metadata
       // because there are multiple write clients commit to the timeline.
-      if (OptionsResolver.isOptimisticConcurrencyControl(conf)) {
+      if (OptionsResolver.isMultiWriter(conf)) {
         return this.ckpMetadata.lastCompleteInstant();
       } else {
         // fetch the instant from timeline.
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 965340c637a..10b77d4b91c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -165,7 +165,7 @@ class DefaultSource extends RelationProvider
 
   def validateMultiWriterConfigs(options: Map[String, String]) : Unit = {
     if (ConfigUtils.resolveEnum(classOf[WriteConcurrencyMode], options.getOrDefault(WRITE_CONCURRENCY_MODE.key(),
-      WRITE_CONCURRENCY_MODE.defaultValue())) == WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) {
+      WRITE_CONCURRENCY_MODE.defaultValue())).supportsMultiWriter()) {
       // ensure some valid value is set for identifier
       checkState(options.contains(STREAMING_CHECKPOINT_IDENTIFIER.key()), "For multi-writer scenarios, please set "
         + STREAMING_CHECKPOINT_IDENTIFIER.key() + ". Each writer should set different values for this identifier")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 3889657faf2..a63f861480d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -181,7 +181,7 @@ class HoodieSparkSqlWriterInternal {
       } catch {
         case e: HoodieWriteConflictException =>
           val writeConcurrencyMode = optParams.getOrElse(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
-          if (writeConcurrencyMode.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()) && counter < maxRetry) {
+          if (WriteConcurrencyMode.supportsMultiWriter(writeConcurrencyMode) && counter < maxRetry) {
            counter += 1
            log.warn(s"Conflict found. Retrying again for attempt no $counter")
           } else {

