This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f515d2a1ec [HUDI-6949] Support non-blocking concurrency control for spark jobs (#9921)
0f515d2a1ec is described below

commit 0f515d2a1ec7a73c5a566a89ffd2dfb30137d842
Author: Jing Zhang <[email protected]>
AuthorDate: Mon Nov 6 20:31:50 2023 +0800

    [HUDI-6949] Support non-blocking concurrency control for spark jobs (#9921)
    
    Support non-blocking concurrency control for Spark jobs.
    - All insert/upsert jobs always write to log files instead of base files,
      in order to avoid one file slice having multiple base files
    - All records with the same key always write to the same file group
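    
    For reference, a minimal sketch of a Spark write config that enables NB-CC on a
    bucket-index MOR table, mirroring the createHoodieWriteConfig() helper in the new
    TestSparkNonBlockingConcurrencyControl test; the table name, base path and bucket
    number are illustrative, and imports are as in that test:
    
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
            .forTable("test")                                  // illustrative table name
            .withPath(basePath)                                // illustrative base path
            // bucket index + bucket layout partitioner, as used by the new test
            .withIndexConfig(HoodieIndexConfig.newBuilder()
                .withIndexType(HoodieIndex.IndexType.BUCKET)
                .withBucketNum("1")                            // illustrative bucket count
                .build())
            .withLayoutConfig(HoodieLayoutConfig.newBuilder()
                .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
                .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName())
                .build())
            // enable non-blocking concurrency control
            .withWriteConcurrencyMode(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL)
            .withLockConfig(HoodieLockConfig.newBuilder()
                .withLockProvider(InProcessLockProvider.class)
                .withConflictResolutionStrategy(
                    new BucketIndexConcurrentFileWritesConflictResolutionStrategy())
                .build())
            .build();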
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +
 .../apache/hudi/index/bucket/BucketIdentifier.java |  16 +-
 .../org/apache/hudi/io/HoodieAppendHandle.java     |   6 +-
 .../RDDSimpleBucketBulkInsertPartitioner.java      |   6 +-
 .../action/commit/SparkBucketIndexPartitioner.java |   8 +-
 .../TestSparkNonBlockingConcurrencyControl.java    | 437 +++++++++++++++++++++
 .../hudi/common/model/WriteConcurrencyMode.java    |   8 +
 .../apache/hudi/configuration/OptionsResolver.java |   3 +-
 .../sink/bucket/BucketBulkInsertWriterHelper.java  |   8 +-
 .../sink/bucket/BucketStreamWriteFunction.java     |   4 +-
 10 files changed, 481 insertions(+), 19 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 268438c172e..cd8f9f6b629 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2663,6 +2663,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     }
   }
 
+  public boolean isNonBlockingConcurrencyControl() {
+    return getWriteConcurrencyMode().isNonBlockingConcurrencyControl();
+  }
+
   public static class Builder {
 
     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index ff48a54366c..475ee36c55e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -90,6 +90,14 @@ public class BucketIdentifier implements Serializable {
     return String.format("%08d", n);
   }
 
+  public static String newBucketFileIdPrefix(int bucketId, boolean fixed) {
+    return fixed ? newBucketFileIdFixedSuffix(bucketId) : newBucketFileIdPrefix(bucketId);
+  }
+
+  public static String newBucketFileIdPrefix(String bucketId, boolean fixed) {
+    return fixed ? newBucketFileIdFixedSuffix(bucketId) : newBucketFileIdPrefix(bucketId);
+  }
+
   public static String newBucketFileIdPrefix(int bucketId) {
     return newBucketFileIdPrefix(bucketIdStr(bucketId));
   }
@@ -102,8 +110,12 @@ public class BucketIdentifier implements Serializable {
     return FSUtils.createNewFileIdPfx().replaceFirst(".{8}", bucketId);
   }
 
-  public static String newBucketFileIdFixedSuffix(int bucketId) {
-    return bucketIdStr(bucketId) + CONSTANT_FILE_ID_SUFFIX;
+  private static String newBucketFileIdFixedSuffix(String bucketId) {
+    return bucketId + CONSTANT_FILE_ID_SUFFIX;
+  }
+
+  private static String newBucketFileIdFixedSuffix(int bucketId) {
+    return newBucketFileIdFixedSuffix(bucketIdStr(bucketId));
   }
 
   public static boolean isBucketFileName(String name) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index cc1932ce27f..c00dd09353e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -528,8 +528,10 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
       appendDataAndDeleteBlocks(header, true);
       recordItr = null;
 
-      writer.close();
-
+      if (writer != null) {
+        writer.close();
+        writer = null;
+      }
       // update final size, once for all log files
       // TODO we can actually deduce file size purely from AppendResult (based on offset and size
       //      of the appended block)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java
index 3fd5fd4f0a4..bbb9eaf4f4e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java
@@ -37,9 +37,12 @@ import java.util.stream.Collectors;
 
 public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload> extends RDDBucketIndexPartitioner<T> {
 
+  private final boolean isNonBlockingConcurrencyControl;
+
   public RDDSimpleBucketBulkInsertPartitioner(HoodieTable table) {
     super(table, null, false);
     ValidationUtils.checkArgument(table.getIndex() instanceof HoodieSimpleBucketIndex);
+    this.isNonBlockingConcurrencyControl = table.getConfig().isNonBlockingConcurrencyControl();
   }
 
   @Override
@@ -93,7 +96,7 @@ public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload>
           // Generate a file that does not exist
           for (int i = 0; i < numBuckets; i++) {
             if (!existsBucketID.contains(i)) {
-              String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(i);
+              String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(i, isNonBlockingConcurrencyControl);
               fileIdPrefixToBucketIndex.put(fileIdPrefix, fileIdPfxList.size());
               fileIdPfxList.add(fileIdPrefix);
               doAppend.add(false);
@@ -104,4 +107,3 @@ public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload>
         }));
   }
 }
-
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
index a246a7150c9..0817d8cb7b1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
@@ -68,6 +68,8 @@ public class SparkBucketIndexPartitioner<T> extends
    */
   private Map<String, Set<String>> updatePartitionPathFileIds;
 
+  private final boolean isNonBlockingConcurrencyControl;
+
   public SparkBucketIndexPartitioner(WorkloadProfile profile,
                                      HoodieEngineContext context,
                                      HoodieTable table,
@@ -91,6 +93,7 @@ public class SparkBucketIndexPartitioner<T> extends
     assignUpdates(profile);
     WriteOperationType operationType = profile.getOperationType();
     this.isOverwrite = INSERT_OVERWRITE.equals(operationType) || INSERT_OVERWRITE_TABLE.equals(operationType);
+    this.isNonBlockingConcurrencyControl = config.isNonBlockingConcurrencyControl();
   }
 
   private void assignUpdates(WorkloadProfile profile) {
@@ -124,7 +127,10 @@ public class SparkBucketIndexPartitioner<T> extends
     if (fileIdOption.isPresent()) {
       return new BucketInfo(BucketType.UPDATE, fileIdOption.get(), partitionPath);
     } else {
-      return new BucketInfo(BucketType.INSERT, BucketIdentifier.newBucketFileIdPrefix(bucketId), partitionPath);
+      // Always write into log file instead of base file if using NB-CC
+      BucketType bucketType = isNonBlockingConcurrencyControl ? BucketType.UPDATE : BucketType.INSERT;
+      String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketId, isNonBlockingConcurrencyControl);
+      return new BucketInfo(bucketType, fileIdPrefix, partitionPath);
     }
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
new file mode 100644
index 00000000000..1e113551b53
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieLayoutConfig;
+import org.apache.hudi.config.HoodieLockConfig;
+import org.apache.hudi.config.HoodiePayloadConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
+import org.apache.hudi.table.storage.HoodieStorageLayout;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.TYPE;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Tag("functional")
+public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctionalTestHarness {
+
+  String jsonSchema = "{\n"
+      + "  \"type\": \"record\",\n"
+      + "  \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n"
+      + "  \"fields\": [\n"
+      + "    {\"name\": \"_hoodie_commit_time\", \"type\": [\"null\", \"string\"]},\n"
+      + "    {\"name\": \"_hoodie_commit_seqno\", \"type\": [\"null\", \"string\"]},\n"
+      + "    {\"name\": \"_hoodie_record_key\", \"type\": [\"null\", \"string\"]},\n"
+      + "    {\"name\": \"_hoodie_partition_path\", \"type\": [\"null\", \"string\"]},\n"
+      + "    {\"name\": \"_hoodie_file_name\", \"type\": [\"null\", \"string\"]},\n"
+      + "    {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n"
+      + "    {\"name\": \"name\", \"type\": [\"null\", \"string\"]},\n"
+      + "    {\"name\": \"age\", \"type\": [\"null\", \"int\"]},\n"
+      + "    {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n"
+      + "    {\"name\": \"part\", \"type\": [\"null\", \"string\"]}\n"
+      + "  ]\n"
+      + "}";
+
+  private Schema schema;
+  private HoodieTableMetaClient metaClient;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    schema = new Schema.Parser().parse(jsonSchema);
+  }
+
+  @Test
+  public void testNonBlockingConcurrencyControlWithPartialUpdatePayload() throws Exception {
+    HoodieWriteConfig config = createHoodieWriteConfig();
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+
+    // start the 1st txn and insert record: [id1,Danny,null,1,par1], suspend the tx commit
+    List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
+    SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+    String insertTime1 = client1.createNewInstantTime();
+    List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT);
+
+    // start the 2nd txn and insert record: [id1,null,23,2,par1], suspend the tx commit
+    SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+    List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
+    String insertTime2 = client2.createNewInstantTime();
+    List<WriteStatus> writeStatuses2 = writeData(client2, insertTime2, dataset2, false, WriteOperationType.INSERT);
+
+    // step to commit the 1st txn
+    client1.commitStats(
+        insertTime1,
+        context().parallelize(writeStatuses1, 1),
+        writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(),
+        metaClient.getCommitActionType());
+
+    // step to commit the 2nd txn
+    client2.commitStats(
+        insertTime2,
+        context().parallelize(writeStatuses2, 1),
+        writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(),
+        metaClient.getCommitActionType());
+
+    // There is no base file in partition dir because there is no compaction yet.
+    assertFalse(fileExists(), "No base data files should have been created");
+
+    // do compaction
+    String compactionTime = (String) client1.scheduleCompaction(Option.empty()).get();
+    client1.compact(compactionTime);
+
+    // result is [(id1,Danny,23,2,par1)]
+    Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
+    checkWrittenData(result, 1);
+  }
+
+  @Test
+  public void testNonBlockingConcurrencyControlWithInflightInstant() throws Exception {
+    HoodieWriteConfig config = createHoodieWriteConfig();
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+
+    // start the 1st txn and insert record: [id1,Danny,null,1,par1], suspend the tx commit
+    SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+    List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
+    String insertTime1 = client1.createNewInstantTime();
+    List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT);
+
+    // start the 2nd txn and insert record: [id1,null,23,2,par1], suspend the tx commit
+    SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+    List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
+    String insertTime2 = client2.createNewInstantTime();
+    writeData(client2, insertTime2, dataset2, false, WriteOperationType.INSERT);
+
+    // step to commit the 1st txn
+    client1.commitStats(
+        insertTime1,
+        context().parallelize(writeStatuses1, 1),
+        writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(),
+        metaClient.getCommitActionType());
+
+    // schedule compaction
+    String compactionTime = (String) client1.scheduleCompaction(Option.empty()).get();
+
+    // step to commit the 3rd txn, insert record: [id3,Julian,53,4,par1] and commit 3rd txn
+    List<String> dataset3 = Collections.singletonList("id3,Julian,53,4,par1");
+    String insertTime3 = client1.createNewInstantTime();
+    List<WriteStatus> writeStatuses3 = writeData(client1, insertTime3, dataset3, false, WriteOperationType.INSERT);
+    client1.commitStats(
+        insertTime3,
+        context().parallelize(writeStatuses3, 1),
+        writeStatuses3.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(),
+        metaClient.getCommitActionType());
+
+    // do compaction
+    client1.compact(compactionTime);
+
+    // read optimized result is [(id1,Danny,null,1,par1)]
+    // because the 2nd commit is still in inflight state and
+    // the data files belonging to the 3rd commit are not included in the last compaction.
+    Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,null,1,par1]");
+    checkWrittenData(result, 1);
+  }
+
+  // case 1: txn1 is an upsert writer, txn2 is a bulk_insert writer.
+  //      |----------- txn1 -----------|
+  //                       |----- txn2 ------|
+  // txn2 would fail to commit because of the write conflict
+  @Test
+  public void testBulkInsertInMultiWriter() throws Exception {
+    HoodieWriteConfig config = createHoodieWriteConfig();
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+    SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+
+    // start the 1st txn and insert record: [id1,Danny,null,1,par1], suspend the tx commit
+    List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
+    String insertTime1 = client1.createNewInstantTime();
+    List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT);
+
+    // start the 2nd txn and bulk insert record: [id1,null,23,2,par1], suspend the tx commit
+    SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+    List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
+    String insertTime2 = client2.createNewInstantTime();
+    List<WriteStatus> writeStatuses2 = writeData(client2, insertTime2, dataset2, false, WriteOperationType.BULK_INSERT);
+
+    // step to commit the 1st txn
+    client1.commitStats(
+        insertTime1,
+        context().parallelize(writeStatuses1, 1),
+        writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(),
+        metaClient.getCommitActionType());
+
+    // step to commit the 2nd txn
+    assertThrows(HoodieWriteConflictException.class, () -> {
+      client2.commitStats(
+          insertTime2,
+          context().parallelize(writeStatuses2, 1),
+          writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+          Option.empty(),
+          metaClient.getCommitActionType());
+    });
+  }
+
+  // case 2: txn1 is an upsert writer, txn2 is a bulk_insert writer.
+  //                       |----- txn1 ------|
+  //      |--- txn2 ----|
+  // both txns would succeed to commit
+  @Test
+  public void testBulkInsertInSequence() throws Exception {
+    HoodieWriteConfig config = createHoodieWriteConfig();
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+
+    // start the 1st txn and bulk insert record: [id1,Danny,null,1,par1], commit the 1st txn
+    SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+    List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
+    String insertTime1 = client1.createNewInstantTime();
+    writeData(client1, insertTime1, dataset1, true, WriteOperationType.BULK_INSERT);
+
+    // start the 2nd txn and insert record: [id1,null,23,2,par1], commit the 2nd txn
+    SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+    List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
+    String insertTime2 = client2.createNewInstantTime();
+    writeData(client2, insertTime2, dataset2, true, WriteOperationType.INSERT);
+
+    // do compaction
+    String compactionTime = (String) client1.scheduleCompaction(Option.empty()).get();
+    client1.compact(compactionTime);
+
+    // result is [(id1,Danny,23,2,par1)]
+    Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
+    checkWrittenData(result, 1);
+  }
+
+  private HoodieWriteConfig createHoodieWriteConfig() {
+    Properties props = getPropertiesForKeyGen(true);
+    props.put(TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+    String basePath = basePath();
+    return HoodieWriteConfig.newBuilder()
+        .forTable("test")
+        .withPath(basePath)
+        .withSchema(jsonSchema)
+        .withParallelism(2, 2)
+        .withAutoCommit(false)
+        .withPayloadConfig(
+            HoodiePayloadConfig.newBuilder()
+                .withPayloadClass(PartialUpdateAvroPayload.class.getName())
+                .withPayloadOrderingField("ts")
+                .build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            .parquetMaxFileSize(1024).build())
+        .withLayoutConfig(HoodieLayoutConfig.newBuilder()
+            .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
+            .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .fromProperties(props)
+            .withIndexType(HoodieIndex.IndexType.BUCKET)
+            .withBucketNum("1")
+            .build())
+        .withPopulateMetaFields(true)
+        .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL)
+        // Timeline-server-based markers are not used for multi-writer tests
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(InProcessLockProvider.class)
+            .withConflictResolutionStrategy(new BucketIndexConcurrentFileWritesConflictResolutionStrategy())
+            .build())
+        .build();
+  }
+
+  private void checkWrittenData(
+      Map<String, String> expected,
+      int partitions) throws IOException {
+    File baseFile = tempDir.toFile();
+    assert baseFile.isDirectory();
+    FileFilter filter = file -> !file.getName().startsWith(".");
+    File[] partitionDirs = baseFile.listFiles(filter);
+    assertNotNull(partitionDirs);
+    assertThat(partitionDirs.length, is(partitions));
+    for (File partitionDir : partitionDirs) {
+      File[] dataFiles = partitionDir.listFiles(filter);
+      assertNotNull(dataFiles);
+      File latestDataFile = Arrays.stream(dataFiles)
+          .max(Comparator.comparing(f -> FSUtils.getCommitTime(f.getName())))
+          .orElse(dataFiles[0]);
+      ParquetReader<GenericRecord> reader = AvroParquetReader
+          .<GenericRecord>builder(new Path(latestDataFile.getAbsolutePath())).build();
+      List<String> readBuffer = new ArrayList<>();
+      GenericRecord nextRecord = reader.read();
+      while (nextRecord != null) {
+        readBuffer.add(filterOutVariables(nextRecord));
+        nextRecord = reader.read();
+      }
+      readBuffer.sort(Comparator.naturalOrder());
+      assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName())));
+    }
+  }
+
+  private static String filterOutVariables(GenericRecord genericRecord) {
+    List<String> fields = new ArrayList<>();
+    fields.add(getFieldValue(genericRecord, "_hoodie_record_key"));
+    fields.add(getFieldValue(genericRecord, "_hoodie_partition_path"));
+    fields.add(getFieldValue(genericRecord, "id"));
+    fields.add(getFieldValue(genericRecord, "name"));
+    fields.add(getFieldValue(genericRecord, "age"));
+    fields.add(genericRecord.get("ts").toString());
+    fields.add(genericRecord.get("part").toString());
+    return String.join(",", fields);
+  }
+
+  private static String getFieldValue(GenericRecord genericRecord, String fieldName) {
+    if (genericRecord.get(fieldName) != null) {
+      return genericRecord.get(fieldName).toString();
+    } else {
+      return null;
+    }
+  }
+
+  private boolean fileExists() {
+    List<File> dirsToCheck = new ArrayList<>();
+    dirsToCheck.add(tempDir.toFile());
+    while (!dirsToCheck.isEmpty()) {
+      File dir = dirsToCheck.remove(0);
+      for (File file : Objects.requireNonNull(dir.listFiles())) {
+        if (!file.getName().startsWith(".")) {
+          if (file.isDirectory()) {
+            dirsToCheck.add(file);
+          } else {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  private GenericRecord str2GenericRecord(String str) {
+    GenericRecord record = new GenericData.Record(schema);
+    String[] fieldValues = str.split(",");
+    ValidationUtils.checkArgument(fieldValues.length == 5, "Valid record must have 5 fields");
+    record.put("id", StringUtils.isNullOrEmpty(fieldValues[0]) ? null : fieldValues[0]);
+    record.put("name", StringUtils.isNullOrEmpty(fieldValues[1]) ? null : fieldValues[1]);
+    record.put("age", StringUtils.isNullOrEmpty(fieldValues[2]) ? null : Integer.parseInt(fieldValues[2]));
+    record.put("ts", StringUtils.isNullOrEmpty(fieldValues[3]) ? null : Long.parseLong(fieldValues[3]));
+    record.put("part", StringUtils.isNullOrEmpty(fieldValues[4]) ? null : fieldValues[4]);
+    return record;
+  }
+
+  private List<HoodieRecord> str2HoodieRecord(List<String> records) {
+    return records.stream().map(recordStr -> {
+      GenericRecord record = str2GenericRecord(recordStr);
+      PartialUpdateAvroPayload payload = new PartialUpdateAvroPayload(record, (Long) record.get("ts"));
+      return new HoodieAvroRecord<>(new HoodieKey((String) record.get("id"), (String) record.get("part")), payload);
+    }).collect(Collectors.toList());
+  }
+
+  private List<WriteStatus> writeData(
+      SparkRDDWriteClient client,
+      String instant,
+      List<String> records,
+      boolean doCommit,
+      WriteOperationType operationType) {
+    List<HoodieRecord> recordList = str2HoodieRecord(records);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(recordList, 2);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    client.startCommitWithTime(instant);
+    List<WriteStatus> writeStatuses;
+    switch (operationType) {
+      case INSERT:
+        writeStatuses = client.insert(writeRecords, instant).collect();
+        break;
+      case UPSERT:
+        writeStatuses = client.upsert(writeRecords, instant).collect();
+        break;
+      case BULK_INSERT:
+        writeStatuses = client.bulkInsert(writeRecords, instant).collect();
+        break;
+      default:
+        throw new UnsupportedOperationException(operationType + " is not supported yet in this test!");
+    }
+    org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
+    if (doCommit) {
+      List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+      boolean committed = client.commitStats(instant, context().parallelize(writeStatuses, 1), writeStats, Option.empty(), metaClient.getCommitActionType());
+      Assertions.assertTrue(committed);
+    }
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    return writeStatuses;
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
index f93a6b9ef0f..e86f179540d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
@@ -56,4 +56,12 @@ public enum WriteConcurrencyMode {
   public boolean isOptimisticConcurrencyControl() {
     return this == OPTIMISTIC_CONCURRENCY_CONTROL;
   }
+
+  public boolean isNonBlockingConcurrencyControl() {
+    return this == NON_BLOCKING_CONCURRENCY_CONTROL;
+  }
+
+  public static boolean isNonBlockingConcurrencyControl(String name) {
+    return WriteConcurrencyMode.valueOf(name.toUpperCase()).isNonBlockingConcurrencyControl();
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 6167a35f765..59c6e79ddc1 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -377,8 +377,7 @@ public class OptionsResolver {
    * Returns whether this is non-blocking concurrency control.
    */
   public static boolean isNonBlockingConcurrencyControl(Configuration config) {
-    return config.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
-        .equalsIgnoreCase(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
+    return WriteConcurrencyMode.isNonBlockingConcurrencyControl(config.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()));
   }
 
   public static boolean isLazyFailedWritesCleanPolicy(Configuration conf) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index c25a69df2c1..1047a4f5c00 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -97,13 +97,7 @@ public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
     String partition = keyGen.getPartitionPath(record);
     final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeys, numBuckets);
     String bucketId = partition + bucketNum;
-    return bucketIdToFileId.computeIfAbsent(bucketId, k -> {
-      if (needFixedFileIdSuffix) {
-        return BucketIdentifier.newBucketFileIdFixedSuffix(bucketNum);
-      } else {
-        return BucketIdentifier.newBucketFileIdPrefix(bucketNum);
-      }
-    });
+    return bucketIdToFileId.computeIfAbsent(bucketId, k -> BucketIdentifier.newBucketFileIdPrefix(bucketNum, needFixedFileIdSuffix));
   }
 
   public static RowData rowWithFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets, boolean needFixedFileIdSuffix) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 0cd66460c32..0129396ea52 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -122,9 +122,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
     } else if (bucketToFileId.containsKey(bucketNum)) {
       location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum));
     } else {
-      String newFileId = isNonBlockingConcurrencyControl
-          ? BucketIdentifier.newBucketFileIdFixedSuffix(bucketNum)
-          : BucketIdentifier.newBucketFileIdPrefix(bucketNum);
+      String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum, isNonBlockingConcurrencyControl);
       location = new HoodieRecordLocation("I", newFileId);
       bucketToFileId.put(bucketNum, newFileId);
       incBucketIndex.add(bucketId);
