This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a93c6eec678 [HUDI-8866] Unified the file naming rules in NBCC mode (#12627)
a93c6eec678 is described below
commit a93c6eec6788c4a984eb93a875d97f25211e4509
Author: TheR1sing3un <[email protected]>
AuthorDate: Fri Jan 17 11:44:54 2025 +0800
[HUDI-8866] Unified the file naming rules in NBCC mode (#12627)
---
.../apache/hudi/index/bucket/BucketIdentifier.java | 7 +
.../BucketBulkInsertDataInternalWriterHelper.java | 4 +-
.../action/commit/SparkBucketIndexPartitioner.java | 12 +-
.../sink/bucket/BucketBulkInsertWriterHelper.java | 2 +-
.../sink/bucket/BucketStreamWriteFunction.java | 2 +-
.../TestSparkNonBlockingConcurrencyControl.java | 229 +++++++++++++++++++--
6 files changed, 238 insertions(+), 18 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index ea98ecc009d..3c433b7760a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -115,6 +115,13 @@ public class BucketIdentifier implements Serializable {
return newBucketFileIdFixedSuffix(bucketIdStr(bucketId));
}
+ /**
+ * Generates a new file id for NBCC mode; the file id is fixed for each bucket, with format: "{bucket_id}-0000-0000-0000-000000000000-0"
+ */
+ public static String newBucketFileIdForNBCC(int bucketId) {
+ return FSUtils.createNewFileId(newBucketFileIdFixedSuffix(bucketId), 0);
+ }
+
public static boolean isBucketFileName(String name) {
return BUCKET_NAME.matcher(name).matches();
}
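
The javadoc above pins down the NBCC naming rule. A minimal usage sketch, assuming (as the documented format implies) that newBucketFileIdFixedSuffix zero-pads the bucket id to eight digits and FSUtils.createNewFileId appends the trailing "-0" token:

    // Hypothetical illustration; the value follows the javadoc format above.
    String fileId = BucketIdentifier.newBucketFileIdForNBCC(5);
    // fileId == "00000005-0000-0000-0000-000000000000-0"
    // The same bucket always yields the same file id, so every NBCC writer
    // appends to one file group per bucket instead of minting new ones.
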
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
index b4b1f03473f..68123c02b79 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
@@ -50,6 +50,7 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte
private final Map<Pair<UTF8String, Integer>, HoodieRowCreateHandle> handles;
protected final String indexKeyFields;
protected final int bucketNum;
+ private final boolean isNonBlockingConcurrencyControl;
public BucketBulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
                                                String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType,
@@ -64,6 +65,7 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte
this.indexKeyFields = writeConfig.getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD, writeConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()));
this.bucketNum = writeConfig.getInt(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS);
this.handles = new HashMap<>();
+ this.isNonBlockingConcurrencyControl = writeConfig.isNonBlockingConcurrencyControl();
}
public void write(InternalRow row) throws IOException {
@@ -126,6 +128,6 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte
}
protected String getNextBucketFileId(int bucketInt) {
- return BucketIdentifier.newBucketFileIdPrefix(getNextFileId(), bucketInt);
+ return BucketIdentifier.newBucketFileIdPrefix(bucketInt, isNonBlockingConcurrencyControl);
}
}
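
The key change above: getNextBucketFileId no longer mixes in the per-task getNextFileId(), so the file id prefix becomes a pure function of the bucket. A sketch of the invariant this establishes, under the assumption that newBucketFileIdPrefix(int, boolean) returns the fixed-suffix id when the NBCC flag is set:

    // Illustrative: two Spark tasks bulk-inserting into the same bucket
    // under NBCC now derive identical file id prefixes.
    String fromTaskA = BucketIdentifier.newBucketFileIdPrefix(5, true);
    String fromTaskB = BucketIdentifier.newBucketFileIdPrefix(5, true);
    assert fromTaskA.equals(fromTaskB); // deterministic per bucket
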
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
index 3b3b6667b9c..95fc1e5d951 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -118,6 +119,8 @@ public class SparkBucketIndexPartitioner<T> extends
String bucketId = BucketIdentifier.bucketIdStr(bucketNumber % numBuckets);
// Insert overwrite always generates new bucket file id
if (isOverwrite) {
+ ValidationUtils.checkArgument(!isNonBlockingConcurrencyControl,
+     "Insert overwrite is not supported with non-blocking concurrency control");
return new BucketInfo(BucketType.INSERT, BucketIdentifier.newBucketFileIdPrefix(bucketId), partitionPath);
}
Option<String> fileIdOption = Option.fromJavaOptional(updatePartitionPathFileIds
@@ -128,9 +131,12 @@ public class SparkBucketIndexPartitioner<T> extends
return new BucketInfo(BucketType.UPDATE, fileIdOption.get(), partitionPath);
} else {
// Always write into log file instead of base file if using NB-CC
- BucketType bucketType = isNonBlockingConcurrencyControl ? BucketType.UPDATE : BucketType.INSERT;
- String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketId, isNonBlockingConcurrencyControl);
- return new BucketInfo(bucketType, fileIdPrefix, partitionPath);
+ if (isNonBlockingConcurrencyControl) {
+ String fileId = BucketIdentifier.newBucketFileIdForNBCC(bucketNumber);
+ return new BucketInfo(BucketType.UPDATE, fileId, partitionPath);
+ }
+ String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketId);
+ return new BucketInfo(BucketType.INSERT, fileIdPrefix, partitionPath);
}
}
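
Two behaviors fall out of this hunk: insert overwrite is rejected up front under NBCC, and NBCC writes are always tagged as updates against the fixed per-bucket file id so they land in log files rather than seeding new base files. A hedged sketch of the rejection path (ValidationUtils.checkArgument throws IllegalArgumentException when the condition is false):

    // Hypothetical scenario: an insert_overwrite issued while
    // hoodie.write.concurrency.mode = NON_BLOCKING_CONCURRENCY_CONTROL
    // now fails fast in the partitioner with:
    //   IllegalArgumentException: Insert overwrite is not supported with
    //   non-blocking concurrency control
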
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index b84c44af832..7685bffb468 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -97,7 +97,7 @@ public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
String partition = keyGen.getPartitionPath(record);
final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeys, numBuckets);
String bucketId = partition + bucketNum;
- return bucketIdToFileId.computeIfAbsent(bucketId, k -> BucketIdentifier.newBucketFileIdPrefix(bucketNum, needFixedFileIdSuffix));
+ return bucketIdToFileId.computeIfAbsent(bucketId, k -> needFixedFileIdSuffix ? BucketIdentifier.newBucketFileIdForNBCC(bucketNum) : BucketIdentifier.newBucketFileIdPrefix(bucketNum));
}
public static RowData rowWithFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets, boolean needFixedFileIdSuffix) {
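
On the Flink bulk-insert path the resolved id is cached per partition/bucket, so the NBCC id is computed once and reused for every subsequent record. A minimal sketch under the same assumptions as the diff (bucketIdToFileId keyed by partition path concatenated with the bucket number):

    // Illustrative only: repeated records for par1 / bucket 5 hit the cache.
    Map<String, String> bucketIdToFileId = new HashMap<>();
    boolean needFixedFileIdSuffix = true; // i.e. NBCC enabled
    String fileId = bucketIdToFileId.computeIfAbsent("par1" + 5, k ->
        needFixedFileIdSuffix
            ? BucketIdentifier.newBucketFileIdForNBCC(5)
            : BucketIdentifier.newBucketFileIdPrefix(5));
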
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 6c31f37cc91..b0461969d97 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -141,7 +141,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
} else if (bucketToFileId.containsKey(bucketNum)) {
location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum));
} else {
- String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum, isNonBlockingConcurrencyControl);
+ String newFileId = isNonBlockingConcurrencyControl ? BucketIdentifier.newBucketFileIdForNBCC(bucketNum) : BucketIdentifier.newBucketFileIdPrefix(bucketNum);
location = new HoodieRecordLocation("I", newFileId);
bucketToFileId.put(bucketNum, newFileId);
incBucketIndex.add(bucketId);
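
The streaming function applies the same rule the first time it sees a bucket: the record is tagged "I" (new file group) with either the fixed NBCC id or a random-prefixed one, and the choice is memoized in bucketToFileId. A condensed view of the tagging outcomes (names from the diff, not new API):

    // known bucket   -> new HoodieRecordLocation("U", bucketToFileId.get(bucketNum))
    // unknown, NBCC  -> new HoodieRecordLocation("I", BucketIdentifier.newBucketFileIdForNBCC(bucketNum))
    // unknown, other -> new HoodieRecordLocation("I", BucketIdentifier.newBucketFileIdPrefix(bucketNum))
    // Under NBCC every parallel subtask computes the same fixed id, which is
    // what unifies file naming across concurrent writers.
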
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
index 704d6e8420b..b215fd5b9d4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -299,12 +300,58 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona
}
}
- // case1: txn1 is upsert writer, txn2 is bulk_insert writer.
- // |----------- txn1 -----------|
- // |----- txn2 ------|
- // the txn2 would fail to commit caused by conflict
+ /**
+ * case1:
+ * 1. insert start
+ * 2. insert commit
+ * 3. bulk_insert start
+ * 4. bulk_insert commit
+ *
+ * |------ txn1: insert ------|
+ *                               |------ txn2: bulk_insert ------|
+ *
+ * both txns should commit successfully
+ */
@Test
- public void testBulkInsertInMultiWriter() throws Exception {
+ public void testBulkInsertAndInsertConcurrentCase1() throws Exception {
+ HoodieWriteConfig config = createHoodieWriteConfig();
+ metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+
+ // start the 1st txn and insert record: [id1,Danny,null,1,par1], commit the 1st txn
+ SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+ List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
+ String insertTime1 = client1.createNewInstantTime();
+ writeData(client1, insertTime1, dataset1, true, WriteOperationType.INSERT);
+
+ // start the 2nd txn and bulk_insert record: [id1,null,23,2,par1], commit the 2nd txn
+ SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+ List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
+ String insertTime2 = client2.createNewInstantTime();
+ writeData(client2, insertTime2, dataset2, true, WriteOperationType.BULK_INSERT);
+
+ // do compaction
+ String compactionTime = (String) client1.scheduleCompaction(Option.empty()).get();
+ client1.compact(compactionTime);
+
+ // result is [(id1,Danny,23,2,par1)]
+ Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
+ checkWrittenData(result, 1);
+ }
+
+ /**
+ * case2:
+ * 1. insert start
+ * 2. bulk_insert start
+ * 3. insert commit
+ * 4. bulk_insert commit
+ *
+ * |------ txn1: insert ------|
+ *         |------ txn2: bulk_insert ------|
+ *
+ * txn2 (the bulk_insert) should fail to commit due to a write conflict
+ */
+ @Test
+ public void testBulkInsertAndInsertConcurrentCase2() throws Exception {
HoodieWriteConfig config = createHoodieWriteConfig();
metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
SparkRDDWriteClient client1 = getHoodieWriteClient(config);
@@ -337,12 +384,170 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona
});
}
- // case1: txn1 is upsert writer, txn2 is bulk_insert writer.
- // |----- txn1 ------|
- // |--- txn2 ----|
- // both two txn would success to commit
+ /**
+ * case3:
+ * 1. bulk_insert start
+ * 2. insert start
+ * 3. insert commit
+ * 4. bulk_insert commit
+ *
+ *       |------ txn2: insert ------|
+ * |---------- txn1: bulk_insert ----------|
+ *
+ * txn1 (the bulk_insert) should fail to commit due to a write conflict
+ */
+ @Test
+ public void testBulkInsertAndInsertConcurrentCase3() throws Exception {
+ HoodieWriteConfig config = createHoodieWriteConfig();
+ metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+ SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+
+ // start the 1st txn and bulk insert record: [id1,null,23,2,par1], suspend the tx commit
+ SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+ List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
+ String insertTime2 = client2.createNewInstantTime();
+ List<WriteStatus> writeStatuses2 = writeData(client2, insertTime2, dataset2, false, WriteOperationType.BULK_INSERT);
+
+ // start the 2nd txn and insert record: [id1,Danny,null,1,par1], suspend the tx commit
+ List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
+ String insertTime1 = client1.createNewInstantTime();
+ List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT);
+
+ // step to commit the 2nd txn
+ client1.commitStats(
+     insertTime1,
+     writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+     Option.empty(),
+     metaClient.getCommitActionType());
+
+ // step to commit the 1st txn
+ assertThrows(HoodieWriteConflictException.class, () -> {
+   client2.commitStats(
+       insertTime2,
+       writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(),
+ metaClient.getCommitActionType());
+ });
+ }
+
+ /**
+ * case4:
+ * 1. insert start
+ * 2. bulk_insert start
+ * 3. bulk_insert commit
+ * 4. insert commit
+ *
+ * |------------ txn1: insert ------------|
+ *     |------ txn2: bulk_insert ------|
+ *
+ * both txns should commit successfully
+ */
+ @Test
+ public void testBulkInsertAndInsertConcurrentCase4() throws Exception {
+ HoodieWriteConfig config = createHoodieWriteConfig();
+ metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+ SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+
+ // start the 1st txn and insert record: [id1,Danny,null,1,par1], suspend the tx commit
+ List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
+ String insertTime1 = client1.createNewInstantTime();
+ List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT);
+
+ // start the 2nd txn and bulk insert record: [id1,null,23,2,par1], suspend the tx commit
+ SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+ List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
+ String insertTime2 = client2.createNewInstantTime();
+ List<WriteStatus> writeStatuses2 = writeData(client2, insertTime2, dataset2, false, WriteOperationType.BULK_INSERT);
+
+ // step to commit the 2nd txn
+ client2.commitStats(
+     insertTime2,
+     writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+     Option.empty(),
+     metaClient.getCommitActionType());
+
+ // step to commit the 1st txn
+ client1.commitStats(
+     insertTime1,
+     writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+     Option.empty(),
+     metaClient.getCommitActionType());
+
+ // do compaction
+ String compactionTime = (String) client1.scheduleCompaction(Option.empty()).get();
+ client1.compact(compactionTime);
+
+ // result is [(id1,Danny,23,2,par1)]
+ Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
+ checkWrittenData(result, 1);
+ }
+
+ /**
+ * case5:
+ * 1. bulk_insert start
+ * 2. insert start
+ * 3. bulk_insert commit
+ * 4. insert commit
+ *
+ *                 |------ txn2: insert ------|
+ * |---------- txn1: bulk_insert ----------|
+ *
+ * both txns should commit successfully
+ */
+ @Test
+ public void testBulkInsertAndInsertConcurrentCase5() throws Exception {
+ HoodieWriteConfig config = createHoodieWriteConfig();
+ metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+ SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+
+ // start the 1st txn and bulk insert record: [id1,null,23,2,par1], suspend the tx commit
+ SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+ List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
+ String insertTime2 = client2.createNewInstantTime();
+ List<WriteStatus> writeStatuses2 = writeData(client2, insertTime2, dataset2, false, WriteOperationType.BULK_INSERT);
+
+ // start the 2nd txn and insert record: [id1,Danny,null,1,par1], suspend the tx commit
+ List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
+ String insertTime1 = client1.createNewInstantTime();
+ List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT);
+
+ // step to commit the 1st txn
+ client2.commitStats(
+     insertTime2,
+     writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+     Option.empty(),
+     metaClient.getCommitActionType());
+
+ // step to commit the 2nd txn
+ client1.commitStats(
+     insertTime1,
+     writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+     Option.empty(),
+     metaClient.getCommitActionType());
+
+ // do compaction
+ String compactionTime = (String) client1.scheduleCompaction(Option.empty()).get();
+ client1.compact(compactionTime);
+
+ // result is [(id1,Danny,23,2,par1)]
+ Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
+ checkWrittenData(result, 1);
+ }
+
+ /**
+ * case6:
+ * 1. bulk_insert start
+ * 2. bulk_insert commit
+ * 3. insert start
+ * 4. insert commit
+ *
+ *                                    |------ txn2: insert ------|
+ * |------ txn1: bulk_insert ------|
+ *
+ * both txns should commit successfully
+ */
@Test
- public void testBulkInsertInSequence() throws Exception {
+ public void testBulkInsertAndInsertConcurrentCase6() throws Exception {
HoodieWriteConfig config = createHoodieWriteConfig();
metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
@@ -352,7 +557,7 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona
String insertTime1 = client1.createNewInstantTime();
writeData(client1, insertTime1, dataset1, true, WriteOperationType.BULK_INSERT);
- // start the 1st txn and insert record: [id1,null,23,2,par1], commit the 2nd txn
+ // start the 2nd txn and insert record: [id1,null,23,2,par1], commit the 2nd txn
SparkRDDWriteClient client2 = getHoodieWriteClient(config);
List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
String insertTime2 = client2.createNewInstantTime();
@@ -380,6 +585,7 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona
props.put(TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
String basePath = basePath();
return HoodieWriteConfig.newBuilder()
+ .withProps(Collections.singletonMap(HoodieTableConfig.PRECOMBINE_FIELD.key(), "ts"))
.forTable("test")
.withPath(basePath)
.withSchema(jsonSchema)
@@ -389,7 +595,6 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona
.withPayloadConfig(
HoodiePayloadConfig.newBuilder()
.withPayloadClass(payloadClassName)
- .withPayloadOrderingField("ts")
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1).build())