This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 160f43c5a20 [HUDI-6480] Flink support non-blocking concurrency control (#9850)
160f43c5a20 is described below

commit 160f43c5a208f22d63fe309e00550bb223a8e018
Author: Jing Zhang <[email protected]>
AuthorDate: Mon Oct 16 17:35:57 2023 +0800

    [HUDI-6480] Flink support non-blocking concurrency control (#9850)
    
    * modify existing test cases 'testWriteMultiWriterInvolved' and 'testWriteMultiWriterPartialOverlapping'
    * Introduce two new test cases to test complex multiple writers with compaction
    * modify method names and add more cases
    * compaction scheduling can now use an arbitrary instant time
    
    ---------
    
    Co-authored-by: danny0405 <[email protected]>
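    
    For context, non-blocking concurrency control is not a standalone flag in this
    change; HoodieWriteConfig#isNonBlockingConcurrencyControl derives it from three
    table settings: merge-on-read table type, simple bucket index, and optimistic
    concurrency mode. A minimal sketch of a Flink configuration that satisfies all
    three (option names are taken from the test cases in this diff; the rest is
    illustrative, not a definitive setup):
    
        Configuration conf = new Configuration();
        // merge-on-read table + simple bucket index + optimistic concurrency mode
        conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
        conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
            WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());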
---
 .../apache/hudi/client/utils/TransactionUtils.java |   3 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  11 ++
 .../apache/hudi/index/bucket/BucketIdentifier.java |   7 +
 .../apache/hudi/client/HoodieFlinkWriteClient.java |   2 +-
 .../apache/hudi/configuration/OptionsResolver.java |  14 ++
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |   2 +-
 .../sink/bucket/BucketStreamWriteFunction.java     |   7 +-
 .../hudi/sink/compact/HoodieFlinkCompactor.java    |  15 +-
 .../org/apache/hudi/sink/meta/CkpMetadata.java     |  13 ++
 .../java/org/apache/hudi/util/CompactionUtil.java  |  29 +---
 .../java/org/apache/hudi/util/StreamerUtil.java    |   1 +
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |  35 +++--
 .../hudi/sink/TestWriteMergeOnReadWithCompact.java | 158 +++++++++++++++++++++
 .../sink/compact/ITTestHoodieFlinkCompactor.java   |  26 ++--
 .../org/apache/hudi/sink/utils/TestWriteBase.java  |  13 +-
 .../org/apache/hudi/utils/TestCompactionUtil.java  |   4 +-
 .../test/java/org/apache/hudi/utils/TestData.java  |  18 ++-
 17 files changed, 279 insertions(+), 79 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
index d162fe28a62..15f6be8f79a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
@@ -67,7 +67,8 @@ public class TransactionUtils {
       Option<HoodieInstant> lastCompletedTxnOwnerInstant,
       boolean reloadActiveTimeline,
       Set<String> pendingInstants) throws HoodieWriteConflictException {
-    if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+    // Skip conflict resolution when non-blocking concurrency control is in use
+    if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() && !config.isNonBlockingConcurrencyControl()) {
       // deal with pendingInstants
      Stream<HoodieInstant> completedInstantsDuringCurrentWriteOperation = getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(), pendingInstants);
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index dd9d53bfe89..c9e9b94b1a9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1289,6 +1289,11 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getIndexType() == HoodieIndex.IndexType.BUCKET && getBucketIndexEngineType() == HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING;
   }
 
+  public boolean isSimpleBucketIndex() {
+    return HoodieIndex.IndexType.BUCKET.equals(getIndexType())
+        && HoodieIndex.BucketIndexEngineType.SIMPLE.equals(getBucketIndexEngineType());
+  }
+
   public boolean isConsistentLogicalTimestampEnabled() {
     return getBooleanOrDefault(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
   }
@@ -2611,6 +2616,12 @@ public class HoodieWriteConfig extends HoodieConfig {
     return props.getInteger(WRITES_FILEID_ENCODING, HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_ENCODING_UUID);
   }
 
+  public boolean isNonBlockingConcurrencyControl() {
+    return getTableType().equals(HoodieTableType.MERGE_ON_READ)
+        && getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
+        && isSimpleBucketIndex();
+  }
+
   public static class Builder {
 
     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index 088e851fd8d..ff48a54366c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -33,6 +33,9 @@ public class BucketIdentifier implements Serializable {
   // Compatible with the spark bucket name
   private static final Pattern BUCKET_NAME = Pattern.compile(".*_(\\d+)(?:\\..*)?$");
 
+  // Constant suffix ensuring that the same record keys from different writers are distributed into the same bucket
+  private static final String CONSTANT_FILE_ID_SUFFIX = "-0000-0000-0000-000000000000";
+
   public static int getBucketId(HoodieRecord record, String indexKeyFields, int numBuckets) {
     return getBucketId(record.getKey(), indexKeyFields, numBuckets);
   }
@@ -99,6 +102,10 @@ public class BucketIdentifier implements Serializable {
     return FSUtils.createNewFileIdPfx().replaceFirst(".{8}", bucketId);
   }
 
+  public static String newBucketFileIdFixedSuffix(int bucketId) {
+    return bucketIdStr(bucketId) + CONSTANT_FILE_ID_SUFFIX;
+  }
+
   public static boolean isBucketFileName(String name) {
     return BUCKET_NAME.matcher(name).matches();
   }
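
A quick illustration of the new deterministic file id (a sketch; it assumes bucketIdStr() zero-pads the bucket id to eight digits, which the existing prefix replacement above implies):

    // Every writer computes the identical file id for bucket 5, unlike the
    // random UUID-based prefix from newBucketFileIdPrefix(), so concurrent
    // writers route the same keys into the same file group:
    String fileId = BucketIdentifier.newBucketFileIdFixedSuffix(5);
    // fileId == "00000005-0000-0000-0000-000000000000"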
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 8378a1db07e..e4126785b24 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -303,7 +303,7 @@ public class HoodieFlinkWriteClient<T> extends
    * should be called before the Driver starts a new transaction.
    */
   public void preTxn(HoodieTableMetaClient metaClient) {
-    if (txnManager.isLockRequired()) {
+    if (txnManager.isLockRequired() && !config.isNonBlockingConcurrencyControl()) {
       // refresh the meta client which is reused
       metaClient.reloadActiveTimeline();
       this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 934e22f1139..18cc4a23ae4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -158,6 +158,13 @@ public class OptionsResolver {
     return isBucketIndexType(conf) && getBucketEngineType(conf).equals(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING);
   }
 
+  /**
+   * Returns whether the table index is simple bucket index.
+   */
+  public static boolean isSimpleBucketIndexType(Configuration conf) {
+    return isBucketIndexType(conf) && getBucketEngineType(conf).equals(HoodieIndex.BucketIndexEngineType.SIMPLE);
+  }
+
   /**
    * Returns the default plan strategy class.
    */
@@ -360,6 +367,13 @@ public class OptionsResolver {
     return conf.getBoolean(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false);
   }
 
+  /**
+   * Returns whether this is non-blocking concurrency control.
+   */
+  public static boolean isNonBlockingConcurrencyControl(Configuration config) {
+    return isMorTable(config) && isSimpleBucketIndexType(config) && isOptimisticConcurrencyControl(config);
+  }
+
   public static boolean isLazyFailedWritesCleanPolicy(Configuration conf) {
     return conf.getString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.defaultValue())
         .equalsIgnoreCase(HoodieFailedWritesCleaningPolicy.LAZY.name());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index c580d43c351..fcb2b1dc295 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -460,7 +460,7 @@ public class StreamWriteOperatorCoordinator
   private void scheduleTableServices(Boolean committed) {
     // if compaction is on, schedule the compaction
     if (tableState.scheduleCompaction) {
-      CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
+      CompactionUtil.scheduleCompaction(writeClient, tableState.isDeltaTimeCompaction, committed);
     }
     // if clustering is on, schedule the clustering
     if (tableState.scheduleClustering) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 63b9c4b3742..0cd66460c32 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -58,6 +58,8 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
 
   private String indexKeyFields;
 
+  private boolean isNonBlockingConcurrencyControl;
+
   /**
    * BucketID to file group mapping in each partition.
    * Map(partition -> Map(bucketId, fileID)).
@@ -85,6 +87,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
     super.open(parameters);
     this.bucketNum = config.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
     this.indexKeyFields = OptionsResolver.getIndexKeyField(config);
+    this.isNonBlockingConcurrencyControl = OptionsResolver.isNonBlockingConcurrencyControl(config);
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
     this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
     this.bucketIndex = new HashMap<>();
@@ -119,7 +122,9 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
     } else if (bucketToFileId.containsKey(bucketNum)) {
       location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum));
     } else {
-      String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
+      String newFileId = isNonBlockingConcurrencyControl
+          ? BucketIdentifier.newBucketFileIdFixedSuffix(bucketNum)
+          : BucketIdentifier.newBucketFileIdPrefix(bucketNum);
       location = new HoodieRecordLocation("I", newFileId);
       bucketToFileId.put(bucketNum, newFileId);
       incBucketIndex.add(bucketId);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index fec2c07f468..57e823ab21c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -221,16 +221,13 @@ public class HoodieFlinkCompactor {
 
       // checks the compaction plan and do compaction.
       if (cfg.schedule) {
-        Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(table.getMetaClient());
-        if (compactionInstantTimeOption.isPresent()) {
-          boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
-          if (!scheduled) {
-            // do nothing.
-            LOG.info("No compaction plan for this job ");
-            return;
-          }
-          table.getMetaClient().reloadActiveTimeline();
+        boolean scheduled = writeClient.scheduleCompaction(Option.empty()).isPresent();
+        if (!scheduled) {
+          // do nothing.
+          LOG.info("No compaction plan for this job ");
+          return;
         }
+        table.getMetaClient().reloadActiveTimeline();
       }
 
       // fetch the instant based on the configured execution sequence
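
The call above replaces the old pre-computed-instant flow: scheduleCompaction generates the instant time inside the write client and returns it only when a plan was actually produced. A hedged usage sketch (the logging is illustrative):

    Option<String> compactionInstant = writeClient.scheduleCompaction(Option.empty());
    if (compactionInstant.isPresent()) {
      // a compaction plan now exists on the timeline at this generated instant
      LOG.info("Scheduled compaction at instant " + compactionInstant.get());
    }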
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index 90636bf6ac5..6d562412bb5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -200,6 +200,19 @@ public class CkpMetadata implements Serializable, AutoCloseable {
     return this.instantCache;
   }
 
+  @Nullable
+  @VisibleForTesting
+  public String lastCompleteInstant() {
+    load();
+    for (int i = this.messages.size() - 1; i >= 0; i--) {
+      CkpMessage ckpMsg = this.messages.get(i);
+      if (ckpMsg.isComplete()) {
+        return ckpMsg.getInstant();
+      }
+    }
+    return null;
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
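
The new accessor scans the cached checkpoint messages newest-first and returns the first complete one; it backs the TestWriteBase change further down. A hedged usage sketch:

    // With several write clients committing to one timeline, the timeline's last
    // complete instant may belong to another writer; the per-writer checkpoint
    // metadata does not, so multi-writer tests read it instead:
    String instant = ckpMetadata.lastCompleteInstant(); // null if none is complete yet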
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 3a6f7657d2a..7780204ae3d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -48,13 +48,11 @@ public class CompactionUtil {
   /**
    * Schedules a new compaction instant.
    *
-   * @param metaClient          The metadata client
    * @param writeClient         The write client
   * @param deltaTimeCompaction Whether the compaction is triggered by elapsed delta time
   * @param committed           Whether the last instant was committed successfully
    */
   public static void scheduleCompaction(
-      HoodieTableMetaClient metaClient,
       HoodieFlinkWriteClient<?> writeClient,
       boolean deltaTimeCompaction,
       boolean committed) {
@@ -63,32 +61,7 @@ public class CompactionUtil {
     } else if (deltaTimeCompaction) {
      // if there are no new commits and the compaction trigger strategy is based on elapsed delta time,
       // schedules the compaction anyway.
-      metaClient.reloadActiveTimeline();
-      Option<String> compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
-      if (compactionInstantTime.isPresent()) {
-        writeClient.scheduleCompactionAtInstant(compactionInstantTime.get(), Option.empty());
-      }
-    }
-  }
-
-  /**
-   * Gets compaction Instant time.
-   */
-  public static Option<String> getCompactionInstantTime(HoodieTableMetaClient metaClient) {
-    Option<HoodieInstant> firstPendingInstant = metaClient.getCommitsTimeline()
-        .filterPendingExcludingCompaction().firstInstant();
-    Option<HoodieInstant> lastCompleteInstant = metaClient.getActiveTimeline().getWriteTimeline()
-        .filterCompletedAndCompactionInstants().lastInstant();
-    if (firstPendingInstant.isPresent() && lastCompleteInstant.isPresent()) {
-      String firstPendingTimestamp = firstPendingInstant.get().getTimestamp();
-      String lastCompleteTimestamp = lastCompleteInstant.get().getTimestamp();
-      // Committed and pending compaction instants should have strictly lower timestamps
-      return StreamerUtil.medianInstantTime(firstPendingTimestamp, lastCompleteTimestamp);
-    } else if (!lastCompleteInstant.isPresent()) {
-      LOG.info("No instants to schedule the compaction plan");
-      return Option.empty();
-    } else {
-      return Option.of(metaClient.createNewInstantTime());
+      writeClient.scheduleCompaction(Option.empty());
     }
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index b4d9bd74092..077d0b52306 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -216,6 +216,7 @@ public class StreamerUtil {
         properties.put(option.key(), option.defaultValue());
       }
     }
+    properties.put(HoodieTableConfig.TYPE.key(), conf.getString(FlinkOptions.TABLE_TYPE));
     return new TypedProperties(properties);
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 90aa86cd353..da712523c16 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.sink.utils.TestWriteBase;
@@ -538,11 +539,24 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .assertNextEvent()
         .checkpointComplete(1)
         .checkWrittenData(EXPECTED3, 1);
-    // step to commit the 2nd txn, should throw exception
-    // for concurrent modification of same fileGroups
-    pipeline1.checkpoint(1)
-        .assertNextEvent()
-        .checkpointCompleteThrows(1, HoodieWriteConflictException.class, "Cannot resolve conflicts");
+    // step to commit the 2nd txn
+    validateConcurrentCommit(pipeline1);
+  }
+
+  private void validateConcurrentCommit(TestHarness pipeline) throws Exception {
+    pipeline
+        .checkpoint(1)
+        .assertNextEvent();
+    if (OptionsResolver.isNonBlockingConcurrencyControl(conf)) {
+      // NB-CC (non-blocking concurrency control) allows concurrent modification of the same fileGroup
+      pipeline
+          .checkpointComplete(1)
+          .checkWrittenData(EXPECTED3, 1);
+    } else {
+      // normal OCC (optimistic concurrency control) should throw an exception otherwise
+      pipeline
+          .checkpointCompleteThrows(1, HoodieWriteConflictException.class, "Cannot resolve conflicts");
+    }
   }
 
   // case2: txn2's time range has partial overlap with txn1
@@ -566,17 +580,14 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
 
     // step to commit the 1st txn, should succeed
     pipeline1.checkpoint(1)
-        .assertNextEvent()
-        .checkpoint(1)
         .assertNextEvent()
         .checkpointComplete(1)
         .checkWrittenData(EXPECTED3, 1);
 
-    // step to commit the 2nd txn, should throw exception
-    // for concurrent modification of same fileGroups
-    pipeline2.checkpoint(1)
-        .assertNextEvent()
-        .checkpointCompleteThrows(1, HoodieWriteConflictException.class, "Cannot resolve conflicts");
+    // step to commit the 2nd txn
+    // should succeed for concurrent modification of the same fileGroups when using non-blocking concurrency control
+    // should throw an exception otherwise
+    validateConcurrentCommit(pipeline2);
   }
 
   @Test
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index abc5679367a..79320e1549f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -18,15 +18,31 @@
 
 package org.apache.hudi.sink;
 
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.utils.TestData;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.hudi.utils.TestData.insertRow;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
 /**
  * Test cases for delta stream write with compaction.
  */
@@ -70,6 +86,148 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
     return expected;
   }
 
+  @Test
+  public void testNonBlockingConcurrencyControlWithPartialUpdatePayload() throws Exception {
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+    conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+    conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, PartialUpdateAvroPayload.class.getName());
+    // disable compaction scheduling in the writers
+    conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+
+    // start pipeline1 and insert record: [id1,Danny,null,1,par1], suspend the tx commit
+    List<RowData> dataset1 = Collections.singletonList(
+        insertRow(
+            StringData.fromString("id1"), StringData.fromString("Danny"), null,
+            TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+    TestHarness pipeline1 = preparePipeline(conf)
+        .consume(dataset1)
+        .assertEmptyDataFiles();
+
+    // start pipeline2 and insert record: [id1,null,23,2,par1], suspend the tx commit
+    Configuration conf2 = conf.clone();
+    conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+    List<RowData> dataset2 = Collections.singletonList(
+        insertRow(
+            StringData.fromString("id1"), null, 23,
+            TimestampData.fromEpochMillis(2), StringData.fromString("par1")));
+    TestHarness pipeline2 = preparePipeline(conf2)
+        .consume(dataset2)
+        .assertEmptyDataFiles();
+
+    // step to commit the 1st txn
+    pipeline1.checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1);
+
+    // step to commit the 2nd txn
+    pipeline2.checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1);
+
+    // snapshot result is [(id1,Danny,23,2,par1)] after both writers finish committing
+    Map<String, String> tmpSnapshotResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
+    pipeline2.checkWrittenData(tmpSnapshotResult, 1);
+
+    // There is no base file in partition dir because there is no compaction yet.
+    pipeline1.assertEmptyDataFiles();
+
+    // schedule compaction outside of all writers
+    try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
+      Option<String> scheduleInstant = writeClient.scheduleCompaction(Option.empty());
+      assertNotNull(scheduleInstant.get());
+    }
+
+    // step to commit the 3rd txn
+    // it also triggers inline compactor
+    List<RowData> dataset3 = Collections.singletonList(
+        insertRow(
+            StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+            TimestampData.fromEpochMillis(4), StringData.fromString("par1")));
+    pipeline1.consume(dataset3)
+        .checkpoint(2)
+        .assertNextEvent()
+        .checkpointComplete(2);
+
+    // snapshot read result is [(id1,Danny,23,2,par1), (id3,Julian,53,4,par1)] after all three writers finish committing
+    Map<String, String> finalSnapshotResult = Collections.singletonMap(
+        "par1",
+        "[id1,par1,id1,Danny,23,2,par1, id3,par1,id3,Julian,53,4,par1]");
+    pipeline1.checkWrittenData(finalSnapshotResult, 1);
+    // read optimized read result is [(id1,Danny,23,2,par1)]
+    // because the data files belonging to the 3rd commit are not included in the last compaction.
+    Map<String, String> readOptimizedResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
+    TestData.checkWrittenData(tempFile, readOptimizedResult, 1);
+  }
+
+  @Test
+  public void testNonBlockingConcurrencyControlWithInflightInstant() throws Exception {
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+    conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+    // disable compaction scheduling in the writers
+    conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+
+    // start pipeline1 and insert record: [id1,Danny,23,1,par1], suspend the tx commit
+    List<RowData> dataset1 = Collections.singletonList(
+        insertRow(
+            StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+            TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+    TestHarness pipeline1 = preparePipeline(conf)
+        .consume(dataset1)
+        .assertEmptyDataFiles();
+
+    // start pipeline2 and insert record: [id2,Stephen,34,2,par1], suspend the tx commit
+    Configuration conf2 = conf.clone();
+    conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+
+    List<RowData> dataset2 = Collections.singletonList(
+        insertRow(
+            StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
+            TimestampData.fromEpochMillis(2), StringData.fromString("par1")));
+    TestHarness pipeline2 = preparePipeline(conf2)
+        .consume(dataset2)
+        .assertEmptyDataFiles();
+
+    // step to commit the 1st txn
+    pipeline1.checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1);
+
+    // step to flush the 2nd data, but not commit yet
+    pipeline2.checkpoint(1)
+        .assertNextEvent();
+
+    // schedule compaction outside of all writers
+    try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
+      Option<String> scheduleInstant = writeClient.scheduleCompaction(Option.empty());
+      assertNotNull(scheduleInstant.get());
+    }
+
+    // step to commit the 3rd txn and insert record: [id3,Julian,53,4,par1]
+    // it also triggers inline compactor
+    List<RowData> dataset3 = Collections.singletonList(
+        insertRow(
+            StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+            TimestampData.fromEpochMillis(4), StringData.fromString("par1")));
+    pipeline1.consume(dataset3)
+        .checkpoint(2)
+        .assertNextEvent()
+        .checkpointComplete(2);
+
+    // snapshot read result is [(id1,Danny,23,1,par1), (id3,Julian,53,4,par1)] after the 1st and 3rd writers finish committing
+    // the data from the 2nd writer is not included because it is still in an inflight state
+    Map<String, String> finalSnapshotResult = Collections.singletonMap(
+        "par1",
+        "[id1,par1,id1,Danny,23,1,par1, id3,par1,id3,Julian,53,4,par1]");
+    pipeline1.checkWrittenData(finalSnapshotResult, 1);
+    // read optimized read result is [(id1,Danny,23,1,par1)]
+    // because the 2nd commit is still in inflight state and
+    // the data files belonging to the 3rd commit are not included in the last compaction.
+    Map<String, String> readOptimizedResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,1,par1]");
+    TestData.checkWrittenData(tempFile, readOptimizedResult, 1);
+  }
+
   @Override
   protected HoodieTableType getTableType() {
     return HoodieTableType.MERGE_ON_READ;
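
The partial-update test above relies on PartialUpdateAvroPayload merging the two writers' records field by field. A simplified sketch of the merge being asserted (the real payload operates on Avro GenericRecords and consults the ordering value; partialMerge is a hypothetical helper):

    // null fields in the newer record are back-filled from the older record
    static String[] partialMerge(String[] older, String[] newer) {
      String[] merged = new String[older.length];
      for (int i = 0; i < older.length; i++) {
        merged[i] = newer[i] != null ? newer[i] : older[i];
      }
      return merged;
    }
    // partialMerge({"Danny", null, "1"}, {null, "23", "2"}) -> {"Danny", "23", "2"},
    // matching the asserted snapshot row [id1,par1,id1,Danny,23,2,par1]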
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index a1608674f15..7e66ab0f59e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -155,7 +155,7 @@ public class ITTestHoodieFlinkCompactor {
     try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
       HoodieFlinkTable<?> table = writeClient.getHoodieTable();
 
-      String compactionInstantTime = scheduleCompactionPlan(table.getMetaClient(), writeClient);
+      String compactionInstantTime = scheduleCompactionPlan(writeClient);
 
       // generate compaction plan
       // should support configurable commit metadata
@@ -226,7 +226,7 @@ public class ITTestHoodieFlinkCompactor {
     try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
       HoodieFlinkTable<?> table = writeClient.getHoodieTable();
 
-      String compactionInstantTime = scheduleCompactionPlan(table.getMetaClient(), writeClient);
+      String compactionInstantTime = scheduleCompactionPlan(writeClient);
 
       // try to upgrade or downgrade
       if (upgrade) {
@@ -337,8 +337,7 @@ public class ITTestHoodieFlinkCompactor {
 
    HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf);
 
-    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
-    compactionInstantTimeList.add(scheduleCompactionPlan(table.getMetaClient(), writeClient));
+    compactionInstantTimeList.add(scheduleCompactionPlan(writeClient));
 
    // insert a new record to new partition, so that we can generate a new compaction plan
     String insertT1ForNewPartition = "insert into t1 values\n"
@@ -352,8 +351,8 @@ public class ITTestHoodieFlinkCompactor {
     // the reader metadata view is not complete
     writeClient = FlinkWriteClients.createWriteClient(conf);
 
-    table = writeClient.getHoodieTable();
-    compactionInstantTimeList.add(scheduleCompactionPlan(table.getMetaClient(), writeClient));
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+    compactionInstantTimeList.add(scheduleCompactionPlan(writeClient));
 
    List<Pair<String, HoodieCompactionPlan>> compactionPlans = new ArrayList<>(2);
     for (String compactionInstantTime : compactionInstantTimeList) {
@@ -475,7 +474,7 @@ public class ITTestHoodieFlinkCompactor {
     try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
       HoodieFlinkTable<?> table = writeClient.getHoodieTable();
 
-      String compactionInstantTime = scheduleCompactionPlan(table.getMetaClient(), writeClient);
+      String compactionInstantTime = scheduleCompactionPlan(writeClient);
 
       // generate compaction plan
       // should support configurable commit metadata
@@ -509,14 +508,9 @@ public class ITTestHoodieFlinkCompactor {
     }
   }
 
-  private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) {
-    boolean scheduled = false;
-    // judge whether there are any compaction operations.
-    Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient);
-    if (compactionInstantTimeOption.isPresent()) {
-      scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
-    }
-    assertTrue(scheduled, "The compaction plan should be scheduled");
-    return compactionInstantTimeOption.get();
+  private String scheduleCompactionPlan(HoodieFlinkWriteClient<?> writeClient) {
+    Option<String> compactionInstant = writeClient.scheduleCompaction(Option.empty());
+    assertTrue(compactionInstant.isPresent(), "The compaction plan should be scheduled");
+    return compactionInstant.get();
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index 7332a0ae61a..1b301cb3cb1 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -524,9 +524,16 @@ public class TestWriteBase {
     }
 
     protected String lastCompleteInstant() {
-      return OptionsResolver.isMorTable(conf)
-          ? TestUtils.getLastDeltaCompleteInstant(basePath)
-          : TestUtils.getLastCompleteInstant(basePath, HoodieTimeline.COMMIT_ACTION);
+      // If using optimistic concurrency control, fetch the last complete instant of the current writer
+      // from the ckp metadata, because multiple write clients commit to the same timeline.
+      if (OptionsResolver.isOptimisticConcurrencyControl(conf)) {
+        return this.ckpMetadata.lastCompleteInstant();
+      } else {
+        // fetch the instant from the timeline.
+        return OptionsResolver.isMorTable(conf)
+            ? TestUtils.getLastDeltaCompleteInstant(basePath)
+            : TestUtils.getLastCompleteInstant(basePath, HoodieTimeline.COMMIT_ACTION);
+      }
     }
 
     public TestHarness checkCompletedInstantCount(int count) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
index 1c3ca7c1ea1..b30d090072e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
@@ -140,7 +140,7 @@ public class TestCompactionUtil {
     TestData.writeDataAsBatch(TestData.DATA_SET_SINGLE_INSERT, conf);
 
    try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
-      CompactionUtil.scheduleCompaction(metaClient, writeClient, true, true);
+      CompactionUtil.scheduleCompaction(writeClient, true, true);
 
      Option<HoodieInstant> pendingCompactionInstant = metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().lastInstant();
      assertTrue(pendingCompactionInstant.isPresent(), "A compaction plan expects to be scheduled");
@@ -150,7 +150,7 @@ public class TestCompactionUtil {
      TimeUnit.SECONDS.sleep(3); // in case the instant time interval is too close
       writeClient.startCommit();
 
-      CompactionUtil.scheduleCompaction(metaClient, writeClient, true, false);
+      CompactionUtil.scheduleCompaction(writeClient, true, false);
      int numCompactionCommits = metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().countInstants();
      assertThat("Two compaction plan expects to be scheduled", numCompactionCommits, is(2));
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 65c8e82ada1..afaf3608049 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -964,16 +964,24 @@ public class TestData {
    */
   private static String filterOutVariables(GenericRecord genericRecord) {
     List<String> fields = new ArrayList<>();
-    fields.add(genericRecord.get("_hoodie_record_key").toString());
-    fields.add(genericRecord.get("_hoodie_partition_path").toString());
-    fields.add(genericRecord.get("uuid").toString());
-    fields.add(genericRecord.get("name").toString());
-    fields.add(genericRecord.get("age").toString());
+    fields.add(getFieldValue(genericRecord, "_hoodie_record_key"));
+    fields.add(getFieldValue(genericRecord, "_hoodie_partition_path"));
+    fields.add(getFieldValue(genericRecord, "uuid"));
+    fields.add(getFieldValue(genericRecord, "name"));
+    fields.add(getFieldValue(genericRecord, "age"));
     fields.add(genericRecord.get("ts").toString());
     fields.add(genericRecord.get("partition").toString());
     return String.join(",", fields);
   }
 
+  private static String getFieldValue(GenericRecord genericRecord, String fieldName) {
+    if (genericRecord.get(fieldName) != null) {
+      return genericRecord.get(fieldName).toString();
+    } else {
+      return null;
+    }
+  }
+
   public static BinaryRowData insertRow(Object... fields) {
     return insertRow(TestConfigurations.ROW_TYPE, fields);
   }

