This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0f515d2a1ec [HUDI-6949] Support non-blocking concurrency control for spark jobs (#9921)
0f515d2a1ec is described below
commit 0f515d2a1ec7a73c5a566a89ffd2dfb30137d842
Author: Jing Zhang <[email protected]>
AuthorDate: Mon Nov 6 20:31:50 2023 +0800
[HUDI-6949] Support non-blocking concurrency control for spark jobs (#9921)
Support non-blocking concurrency control for spark jobs.
- All insert/upsert jobs always write to log files instead of base files, in order to avoid a single file slice having multiple base files
- All records with the same key are always written to the same file group (see the sketch below)
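A minimal, self-contained Java sketch of the deterministic file id naming behind the second invariant; the suffix constant and the random-prefix shape are simplified assumptions for illustration, not the exact Hudi internals:

    import java.util.UUID;

    public class NbccFileIdSketch {
      // Stand-in for BucketIdentifier.CONSTANT_FILE_ID_SUFFIX (illustrative value).
      private static final String CONSTANT_FILE_ID_SUFFIX = "-0";

      // Same zero-padded bucket id encoding as BucketIdentifier.bucketIdStr.
      static String bucketIdStr(int bucketId) {
        return String.format("%08d", bucketId);
      }

      // Mirrors the new newBucketFileIdPrefix(int, boolean) overload: with NB-CC
      // the file id is a pure function of the bucket id, so every concurrent
      // writer resolves a given bucket to the same file group.
      static String newBucketFileIdPrefix(int bucketId, boolean nonBlockingCc) {
        return nonBlockingCc
            ? bucketIdStr(bucketId) + CONSTANT_FILE_ID_SUFFIX
            : UUID.randomUUID().toString().replaceFirst(".{8}", bucketIdStr(bucketId));
      }

      public static void main(String[] args) {
        System.out.println(newBucketFileIdPrefix(3, true));  // deterministic for all writers
        System.out.println(newBucketFileIdPrefix(3, true));  // same id -> same file group
        System.out.println(newBucketFileIdPrefix(3, false)); // random tail -> new file group
      }
    }

The first invariant is enforced on the Spark write path by typing inserts as UPDATE buckets when NB-CC is enabled (see SparkBucketIndexPartitioner below), so the handles append to log files instead of creating new base files.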
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 +
.../apache/hudi/index/bucket/BucketIdentifier.java | 16 +-
.../org/apache/hudi/io/HoodieAppendHandle.java | 6 +-
.../RDDSimpleBucketBulkInsertPartitioner.java | 6 +-
.../action/commit/SparkBucketIndexPartitioner.java | 8 +-
.../TestSparkNonBlockingConcurrencyControl.java | 437 +++++++++++++++++++++
.../hudi/common/model/WriteConcurrencyMode.java | 8 +
.../apache/hudi/configuration/OptionsResolver.java | 3 +-
.../sink/bucket/BucketBulkInsertWriterHelper.java | 8 +-
.../sink/bucket/BucketStreamWriteFunction.java | 4 +-
10 files changed, 481 insertions(+), 19 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 268438c172e..cd8f9f6b629 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2663,6 +2663,10 @@ public class HoodieWriteConfig extends HoodieConfig {
}
}
+ public boolean isNonBlockingConcurrencyControl() {
+ return getWriteConcurrencyMode().isNonBlockingConcurrencyControl();
+ }
+
public static class Builder {
protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index ff48a54366c..475ee36c55e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -90,6 +90,14 @@ public class BucketIdentifier implements Serializable {
return String.format("%08d", n);
}
+ public static String newBucketFileIdPrefix(int bucketId, boolean fixed) {
+ return fixed ? newBucketFileIdFixedSuffix(bucketId) : newBucketFileIdPrefix(bucketId);
+ }
+
+ public static String newBucketFileIdPrefix(String bucketId, boolean fixed) {
+ return fixed ? newBucketFileIdFixedSuffix(bucketId) : newBucketFileIdPrefix(bucketId);
+ }
+
public static String newBucketFileIdPrefix(int bucketId) {
return newBucketFileIdPrefix(bucketIdStr(bucketId));
}
@@ -102,8 +110,12 @@ public class BucketIdentifier implements Serializable {
return FSUtils.createNewFileIdPfx().replaceFirst(".{8}", bucketId);
}
- public static String newBucketFileIdFixedSuffix(int bucketId) {
- return bucketIdStr(bucketId) + CONSTANT_FILE_ID_SUFFIX;
+ private static String newBucketFileIdFixedSuffix(String bucketId) {
+ return bucketId + CONSTANT_FILE_ID_SUFFIX;
+ }
+
+ private static String newBucketFileIdFixedSuffix(int bucketId) {
+ return newBucketFileIdFixedSuffix(bucketIdStr(bucketId));
}
public static boolean isBucketFileName(String name) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index cc1932ce27f..c00dd09353e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -528,8 +528,10 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
appendDataAndDeleteBlocks(header, true);
recordItr = null;
- writer.close();
-
+ if (writer != null) {
+ writer.close();
+ writer = null;
+ }
// update final size, once for all log files
// TODO we can actually deduce file size purely from AppendResult (based on offset and size
// of the appended block)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java
index 3fd5fd4f0a4..bbb9eaf4f4e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java
@@ -37,9 +37,12 @@ import java.util.stream.Collectors;
public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload> extends RDDBucketIndexPartitioner<T> {
+ private final boolean isNonBlockingConcurrencyControl;
+
public RDDSimpleBucketBulkInsertPartitioner(HoodieTable table) {
super(table, null, false);
ValidationUtils.checkArgument(table.getIndex() instanceof HoodieSimpleBucketIndex);
+ this.isNonBlockingConcurrencyControl = table.getConfig().isNonBlockingConcurrencyControl();
}
@Override
@@ -93,7 +96,7 @@ public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload>
// Generate a file that does not exist
for (int i = 0; i < numBuckets; i++) {
if (!existsBucketID.contains(i)) {
- String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(i);
+ String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(i, isNonBlockingConcurrencyControl);
fileIdPrefixToBucketIndex.put(fileIdPrefix, fileIdPfxList.size());
fileIdPfxList.add(fileIdPrefix);
doAppend.add(false);
@@ -104,4 +107,3 @@ public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload>
}));
}
}
-
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
index a246a7150c9..0817d8cb7b1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
@@ -68,6 +68,8 @@ public class SparkBucketIndexPartitioner<T> extends
*/
private Map<String, Set<String>> updatePartitionPathFileIds;
+ private final boolean isNonBlockingConcurrencyControl;
+
public SparkBucketIndexPartitioner(WorkloadProfile profile,
HoodieEngineContext context,
HoodieTable table,
@@ -91,6 +93,7 @@ public class SparkBucketIndexPartitioner<T> extends
assignUpdates(profile);
WriteOperationType operationType = profile.getOperationType();
this.isOverwrite = INSERT_OVERWRITE.equals(operationType) || INSERT_OVERWRITE_TABLE.equals(operationType);
+ this.isNonBlockingConcurrencyControl = config.isNonBlockingConcurrencyControl();
}
private void assignUpdates(WorkloadProfile profile) {
@@ -124,7 +127,10 @@ public class SparkBucketIndexPartitioner<T> extends
if (fileIdOption.isPresent()) {
return new BucketInfo(BucketType.UPDATE, fileIdOption.get(), partitionPath);
} else {
- return new BucketInfo(BucketType.INSERT, BucketIdentifier.newBucketFileIdPrefix(bucketId), partitionPath);
+ // Always write into log file instead of base file if using NB-CC
+ BucketType bucketType = isNonBlockingConcurrencyControl ? BucketType.UPDATE : BucketType.INSERT;
+ String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketId, isNonBlockingConcurrencyControl);
+ return new BucketInfo(bucketType, fileIdPrefix, partitionPath);
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
new file mode 100644
index 00000000000..1e113551b53
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieLayoutConfig;
+import org.apache.hudi.config.HoodieLockConfig;
+import org.apache.hudi.config.HoodiePayloadConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
+import org.apache.hudi.table.storage.HoodieStorageLayout;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.TYPE;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Tag("functional")
+public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctionalTestHarness {
+
+ String jsonSchema = "{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"_hoodie_commit_time\", \"type\": [\"null\",
\"string\"]},\n"
+ + " {\"name\": \"_hoodie_commit_seqno\", \"type\": [\"null\",
\"string\"]},\n"
+ + " {\"name\": \"_hoodie_record_key\", \"type\": [\"null\",
\"string\"]},\n"
+ + " {\"name\": \"_hoodie_partition_path\", \"type\": [\"null\",
\"string\"]},\n"
+ + " {\"name\": \"_hoodie_file_name\", \"type\": [\"null\",
\"string\"]},\n"
+ + " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n"
+ + " {\"name\": \"name\", \"type\": [\"null\", \"string\"]},\n"
+ + " {\"name\": \"age\", \"type\": [\"null\", \"int\"]},\n"
+ + " {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n"
+ + " {\"name\": \"part\", \"type\": [\"null\", \"string\"]}\n"
+ + " ]\n"
+ + "}";
+
+ private Schema schema;
+ private HoodieTableMetaClient metaClient;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ schema = new Schema.Parser().parse(jsonSchema);
+ }
+
+ @Test
+ public void testNonBlockingConcurrencyControlWithPartialUpdatePayload() throws Exception {
+ HoodieWriteConfig config = createHoodieWriteConfig();
+ metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+
+ // start the 1st txn and insert record: [id1,Danny,null,1,par1], suspend the tx commit
+ List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
+ SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+ String insertTime1 = client1.createNewInstantTime();
+ List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT);
+
+ // start the 2nd txn and insert record: [id1,null,23,2,par1], suspend the tx commit
+ SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+ List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
+ String insertTime2 = client2.createNewInstantTime();
+ List<WriteStatus> writeStatuses2 = writeData(client2, insertTime2, dataset2, false, WriteOperationType.INSERT);
+
+ // step to commit the 1st txn
+ client1.commitStats(
+ insertTime1,
+ context().parallelize(writeStatuses1, 1),
+ writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(),
+ metaClient.getCommitActionType());
+
+ // step to commit the 2nd txn
+ client2.commitStats(
+ insertTime2,
+ context().parallelize(writeStatuses2, 1),
+ writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(),
+ metaClient.getCommitActionType());
+
+ // There is no base file in partition dir because there is no compaction yet.
+ assertFalse(fileExists(), "No base data files should have been created");
+
+ // do compaction
+ String compactionTime = (String) client1.scheduleCompaction(Option.empty()).get();
+ client1.compact(compactionTime);
+
+ // result is [(id1,Danny,23,2,par1)]
+ Map<String, String> result = Collections.singletonMap("par1",
"[id1,par1,id1,Danny,23,2,par1]");
+ checkWrittenData(result, 1);
+ }
+
+ @Test
+ public void testNonBlockingConcurrencyControlWithInflightInstant() throws Exception {
+ HoodieWriteConfig config = createHoodieWriteConfig();
+ metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+
+ // start the 1st txn and insert record: [id1,Danny,null,1,par1], suspend the tx commit
+ SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+ List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
+ String insertTime1 = client1.createNewInstantTime();
+ List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1,
dataset1, false, WriteOperationType.INSERT);
+
+ // start the 2nd txn and insert record: [id1,null,23,2,par1], suspend the tx commit
+ SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+ List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
+ String insertTime2 = client2.createNewInstantTime();
+ writeData(client2, insertTime2, dataset2, false, WriteOperationType.INSERT);
+
+ // step to commit the 1st txn
+ client1.commitStats(
+ insertTime1,
+ context().parallelize(writeStatuses1, 1),
+ writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(),
+ metaClient.getCommitActionType());
+
+ // schedule compaction
+ String compactionTime = (String) client1.scheduleCompaction(Option.empty()).get();
+
+ // step to commit the 3rd txn, insert record: [id3,Julian,53,4,par1] and commit 3rd txn
+ List<String> dataset3 = Collections.singletonList("id3,Julian,53,4,par1");
+ String insertTime3 = client1.createNewInstantTime();
+ List<WriteStatus> writeStatuses3 = writeData(client1, insertTime3,
dataset3, false, WriteOperationType.INSERT);
+ client1.commitStats(
+ insertTime3,
+ context().parallelize(writeStatuses3, 1),
+ writeStatuses3.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(),
+ metaClient.getCommitActionType());
+
+ // do compaction
+ client1.compact(compactionTime);
+
+ // read optimized result is [(id1,Danny,null,1,par1)]
+ // because the 2nd commit is in inflight state and
+ // the data files belonging to the 3rd commit are not included in the last compaction.
+ Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,null,1,par1]");
+ checkWrittenData(result, 1);
+ }
+
+ // case1: txn1 is upsert writer, txn2 is bulk_insert writer.
+ // |----------- txn1 -----------|
+ // |----- txn2 ------|
+ // txn2 would fail to commit due to a write conflict
+ @Test
+ public void testBulkInsertInMultiWriter() throws Exception {
+ HoodieWriteConfig config = createHoodieWriteConfig();
+ metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+ SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+
+ // start the 1st txn and insert record: [id1,Danny,null,1,par1], suspend the tx commit
+ List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
+ String insertTime1 = client1.createNewInstantTime();
+ List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1,
dataset1, false, WriteOperationType.INSERT);
+
+ // start the 2nd txn and bulk insert record: [id1,null,23,2,par1], suspend the tx commit
+ SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+ List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
+ String insertTime2 = client2.createNewInstantTime();
+ List<WriteStatus> writeStatuses2 = writeData(client2, insertTime2,
dataset2, false, WriteOperationType.BULK_INSERT);
+
+ // step to commit the 1st txn
+ client1.commitStats(
+ insertTime1,
+ context().parallelize(writeStatuses1, 1),
+ writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(),
+ metaClient.getCommitActionType());
+
+ // step to commit the 2nd txn
+ assertThrows(HoodieWriteConflictException.class, () -> {
+ client2.commitStats(
+ insertTime2,
+ context().parallelize(writeStatuses2, 1),
+ writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(),
+ metaClient.getCommitActionType());
+ });
+ }
+
+ // case2: txn1 is bulk_insert writer, txn2 is upsert writer.
+ // |----- txn1 ------|
+ // |--- txn2 ----|
+ // both txns would commit successfully
+ @Test
+ public void testBulkInsertInSequence() throws Exception {
+ HoodieWriteConfig config = createHoodieWriteConfig();
+ metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+
+ // start the 1st txn and bulk insert record: [id1,Danny,null,1,par1], commit the 1st txn
+ SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+ List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
+ String insertTime1 = client1.createNewInstantTime();
+ writeData(client1, insertTime1, dataset1, true, WriteOperationType.BULK_INSERT);
+
+ // start the 2nd txn and insert record: [id1,null,23,2,par1], commit the 2nd txn
+ SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+ List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
+ String insertTime2 = client2.createNewInstantTime();
+ writeData(client2, insertTime2, dataset2, true, WriteOperationType.INSERT);
+
+ // do compaction
+ String compactionTime = (String) client1.scheduleCompaction(Option.empty()).get();
+ client1.compact(compactionTime);
+
+ // result is [(id1,Danny,23,2,par1)]
+ Map<String, String> result = Collections.singletonMap("par1",
"[id1,par1,id1,Danny,23,2,par1]");
+ checkWrittenData(result, 1);
+ }
+
+ private HoodieWriteConfig createHoodieWriteConfig() {
+ Properties props = getPropertiesForKeyGen(true);
+ props.put(TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+ String basePath = basePath();
+ return HoodieWriteConfig.newBuilder()
+ .forTable("test")
+ .withPath(basePath)
+ .withSchema(jsonSchema)
+ .withParallelism(2, 2)
+ .withAutoCommit(false)
+ .withPayloadConfig(
+ HoodiePayloadConfig.newBuilder()
+ .withPayloadClass(PartialUpdateAvroPayload.class.getName())
+ .withPayloadOrderingField("ts")
+ .build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withStorageConfig(HoodieStorageConfig.newBuilder()
+ .parquetMaxFileSize(1024).build())
+ .withLayoutConfig(HoodieLayoutConfig.newBuilder()
+ .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
+ .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
+ .withIndexConfig(HoodieIndexConfig.newBuilder()
+ .fromProperties(props)
+ .withIndexType(HoodieIndex.IndexType.BUCKET)
+ .withBucketNum("1")
+ .build())
+ .withPopulateMetaFields(true)
+ .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+ .withWriteConcurrencyMode(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL)
+ // Timeline-server-based markers are not used for multi-writer tests
+ .withMarkersType(MarkerType.DIRECT.name())
+ .withLockConfig(HoodieLockConfig.newBuilder()
+ .withLockProvider(InProcessLockProvider.class)
+ .withConflictResolutionStrategy(new BucketIndexConcurrentFileWritesConflictResolutionStrategy())
+ .build())
+ .build();
+ }
+
+ private void checkWrittenData(
+ Map<String, String> expected,
+ int partitions) throws IOException {
+ File baseFile = tempDir.toFile();
+ assert baseFile.isDirectory();
+ FileFilter filter = file -> !file.getName().startsWith(".");
+ File[] partitionDirs = baseFile.listFiles(filter);
+ assertNotNull(partitionDirs);
+ assertThat(partitionDirs.length, is(partitions));
+ for (File partitionDir : partitionDirs) {
+ File[] dataFiles = partitionDir.listFiles(filter);
+ assertNotNull(dataFiles);
+ File latestDataFile = Arrays.stream(dataFiles)
+ .max(Comparator.comparing(f -> FSUtils.getCommitTime(f.getName())))
+ .orElse(dataFiles[0]);
+ ParquetReader<GenericRecord> reader = AvroParquetReader
+ .<GenericRecord>builder(new
Path(latestDataFile.getAbsolutePath())).build();
+ List<String> readBuffer = new ArrayList<>();
+ GenericRecord nextRecord = reader.read();
+ while (nextRecord != null) {
+ readBuffer.add(filterOutVariables(nextRecord));
+ nextRecord = reader.read();
+ }
+ readBuffer.sort(Comparator.naturalOrder());
+ assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName())));
+ }
+ }
+
+ private static String filterOutVariables(GenericRecord genericRecord) {
+ List<String> fields = new ArrayList<>();
+ fields.add(getFieldValue(genericRecord, "_hoodie_record_key"));
+ fields.add(getFieldValue(genericRecord, "_hoodie_partition_path"));
+ fields.add(getFieldValue(genericRecord, "id"));
+ fields.add(getFieldValue(genericRecord, "name"));
+ fields.add(getFieldValue(genericRecord, "age"));
+ fields.add(genericRecord.get("ts").toString());
+ fields.add(genericRecord.get("part").toString());
+ return String.join(",", fields);
+ }
+
+ private static String getFieldValue(GenericRecord genericRecord, String fieldName) {
+ if (genericRecord.get(fieldName) != null) {
+ return genericRecord.get(fieldName).toString();
+ } else {
+ return null;
+ }
+ }
+
+ private boolean fileExists() {
+ List<File> dirsToCheck = new ArrayList<>();
+ dirsToCheck.add(tempDir.toFile());
+ while (!dirsToCheck.isEmpty()) {
+ File dir = dirsToCheck.remove(0);
+ for (File file : Objects.requireNonNull(dir.listFiles())) {
+ if (!file.getName().startsWith(".")) {
+ if (file.isDirectory()) {
+ dirsToCheck.add(file);
+ } else {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ private GenericRecord str2GenericRecord(String str) {
+ GenericRecord record = new GenericData.Record(schema);
+ String[] fieldValues = str.split(",");
+ ValidationUtils.checkArgument(fieldValues.length == 5, "Valid record must
have 5 fields");
+ record.put("id", StringUtils.isNullOrEmpty(fieldValues[0]) ? null :
fieldValues[0]);
+ record.put("name", StringUtils.isNullOrEmpty(fieldValues[1]) ? null :
fieldValues[1]);
+ record.put("age", StringUtils.isNullOrEmpty(fieldValues[2]) ? null :
Integer.parseInt(fieldValues[2]));
+ record.put("ts", StringUtils.isNullOrEmpty(fieldValues[3]) ? null :
Long.parseLong(fieldValues[3]));
+ record.put("part", StringUtils.isNullOrEmpty(fieldValues[4]) ? null :
fieldValues[4]);
+ return record;
+ }
+
+ private List<HoodieRecord> str2HoodieRecord(List<String> records) {
+ return records.stream().map(recordStr -> {
+ GenericRecord record = str2GenericRecord(recordStr);
+ PartialUpdateAvroPayload payload = new PartialUpdateAvroPayload(record, (Long) record.get("ts"));
+ return new HoodieAvroRecord<>(new HoodieKey((String) record.get("id"), (String) record.get("part")), payload);
+ }).collect(Collectors.toList());
+ }
+
+ private List<WriteStatus> writeData(
+ SparkRDDWriteClient client,
+ String instant,
+ List<String> records,
+ boolean doCommit,
+ WriteOperationType operationType) {
+ List<HoodieRecord> recordList = str2HoodieRecord(records);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(recordList, 2);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ client.startCommitWithTime(instant);
+ List<WriteStatus> writeStatuses;
+ switch (operationType) {
+ case INSERT:
+ writeStatuses = client.insert(writeRecords, instant).collect();
+ break;
+ case UPSERT:
+ writeStatuses = client.upsert(writeRecords, instant).collect();
+ break;
+ case BULK_INSERT:
+ writeStatuses = client.bulkInsert(writeRecords, instant).collect();
+ break;
+ default:
+ throw new UnsupportedOperationException(operationType + " is not
supported yet in this test!");
+ }
+ org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
+ if (doCommit) {
+ List<HoodieWriteStat> writeStats =
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+ boolean committed = client.commitStats(instant,
context().parallelize(writeStatuses, 1), writeStats, Option.empty(),
metaClient.getCommitActionType());
+ Assertions.assertTrue(committed);
+ }
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ return writeStatuses;
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
index f93a6b9ef0f..e86f179540d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
@@ -56,4 +56,12 @@ public enum WriteConcurrencyMode {
public boolean isOptimisticConcurrencyControl() {
return this == OPTIMISTIC_CONCURRENCY_CONTROL;
}
+
+ public boolean isNonBlockingConcurrencyControl() {
+ return this == NON_BLOCKING_CONCURRENCY_CONTROL;
+ }
+
+ public static boolean isNonBlockingConcurrencyControl(String name) {
+ return WriteConcurrencyMode.valueOf(name.toUpperCase()).isNonBlockingConcurrencyControl();
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 6167a35f765..59c6e79ddc1 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -377,8 +377,7 @@ public class OptionsResolver {
* Returns whether this is non-blocking concurrency control.
*/
public static boolean isNonBlockingConcurrencyControl(Configuration config) {
- return config.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
- .equalsIgnoreCase(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
+ return WriteConcurrencyMode.isNonBlockingConcurrencyControl(config.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()));
}
public static boolean isLazyFailedWritesCleanPolicy(Configuration conf) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index c25a69df2c1..1047a4f5c00 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -97,13 +97,7 @@ public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
String partition = keyGen.getPartitionPath(record);
final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeys, numBuckets);
String bucketId = partition + bucketNum;
- return bucketIdToFileId.computeIfAbsent(bucketId, k -> {
- if (needFixedFileIdSuffix) {
- return BucketIdentifier.newBucketFileIdFixedSuffix(bucketNum);
- } else {
- return BucketIdentifier.newBucketFileIdPrefix(bucketNum);
- }
- });
+ return bucketIdToFileId.computeIfAbsent(bucketId, k ->
BucketIdentifier.newBucketFileIdPrefix(bucketNum, needFixedFileIdSuffix));
}
public static RowData rowWithFileId(Map<String, String> bucketIdToFileId,
RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets, boolean
needFixedFileIdSuffix) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 0cd66460c32..0129396ea52 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -122,9 +122,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
} else if (bucketToFileId.containsKey(bucketNum)) {
location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum));
} else {
- String newFileId = isNonBlockingConcurrencyControl
- ? BucketIdentifier.newBucketFileIdFixedSuffix(bucketNum)
- : BucketIdentifier.newBucketFileIdPrefix(bucketNum);
+ String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum, isNonBlockingConcurrencyControl);
location = new HoodieRecordLocation("I", newFileId);
bucketToFileId.put(bucketNum, newFileId);
incBucketIndex.add(bucketId);