This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f34f44e0db6 [HUDI-7227] Add a test to ensure multiple writers do not write base files with bucket index in NBCC (#12039)
f34f44e0db6 is described below
commit f34f44e0db6b25d6e94021a77b6c99c5a67993b1
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Oct 11 14:24:23 2024 -0400
[HUDI-7227] Add a test to ensure multiple writers do not write base files with bucket index in NBCC (#12039)
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../TestSparkNonBlockingConcurrencyControl.java | 130 ++++++++++++++++++++-
1 file changed, 126 insertions(+), 4 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
index 1e113551b53..d729ac9dc33 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
@@ -31,11 +31,13 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
@@ -48,6 +50,7 @@ import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
@@ -63,6 +66,8 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.FileFilter;
@@ -80,9 +85,11 @@ import java.util.stream.Collectors;
import static org.apache.hudi.common.table.HoodieTableConfig.TYPE;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("functional")
public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctionalTestHarness {
@@ -206,6 +213,98 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona
checkWrittenData(result, 1);
}
+ // Validate that with multiple concurrent writers, only bulk insert produces base files
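+ // With the bucket index under NBCC, all writers hash records to the same file group,
+ // so only the bulk insert should create a base file; every other write must go to log files.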
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testMultiBaseFile(boolean bulkInsertFirst) throws Exception {
+ HoodieWriteConfig config = createHoodieWriteConfig(true);
+ metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+ // there should only be a single file group, so we will verify that its file ID stays consistent
+ String fileID = null;
+
+ // if there is no bulk insert first, then we will write log files for a file group
+ // without a base file. Having a base file adds the possibility of small file handling,
+ // which we want to ensure doesn't happen.
+ if (bulkInsertFirst) {
+ SparkRDDWriteClient client0 = getHoodieWriteClient(config);
+ List<String> dataset0 = Collections.singletonList("id0,Danny,0,0,par1");
+ String insertTime0 = client0.createNewInstantTime();
+ List<WriteStatus> writeStatuses0 = writeData(client0, insertTime0, dataset0, false, WriteOperationType.BULK_INSERT, true);
+ client0.commitStats(
+ insertTime0,
+ context().parallelize(writeStatuses0, 1),
+ writeStatuses0.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(),
+ metaClient.getCommitActionType());
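+ // the bulk insert should produce only base files, never log files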
+ for (WriteStatus status : writeStatuses0) {
+ if (fileID == null) {
+ fileID = status.getFileId();
+ } else {
+ assertEquals(fileID, status.getFileId());
+ }
+ assertFalse(FSUtils.isLogFile(new StoragePath(status.getStat().getPath()).getName()));
+ }
+ client0.close();
+ }
+
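+ // the INSERT from the second writer should append only log files to the same file group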
+ SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+ List<String> dataset1 = Collections.singletonList("id1,Danny,22,1,par1");
+ String insertTime1 = client1.createNewInstantTime();
+ List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT, true);
+ for (WriteStatus status : writeStatuses1) {
+ if (fileID == null) {
+ fileID = status.getFileId();
+ } else {
+ assertEquals(fileID, status.getFileId());
+ }
+ assertTrue(FSUtils.isLogFile(new StoragePath(status.getStat().getPath()).getName()));
+ }
+
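+ // the concurrent UPSERT from the third writer should also write only log files to that file group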
+ SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+ List<String> dataset2 = Collections.singletonList("id1,Danny,23,2,par1");
+ String insertTime2 = client2.createNewInstantTime();
+ List<WriteStatus> writeStatuses2 = writeData(client2, insertTime2, dataset2, false, WriteOperationType.UPSERT, true);
+ for (WriteStatus status : writeStatuses2) {
+ assertEquals(fileID, status.getFileId());
+ assertTrue(FSUtils.isLogFile(new StoragePath(status.getStat().getPath()).getName()));
+ }
+
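+ // neither txn commits until both have finished writing, so the two transactions overlap in time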
+ // step to commit the 1st txn
+ client1.commitStats(
+ insertTime1,
+ context().parallelize(writeStatuses1, 1),
+ writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(),
+ metaClient.getCommitActionType());
+
+ // step to commit the 2nd txn
+ client2.commitStats(
+ insertTime2,
+ context().parallelize(writeStatuses2, 1),
+ writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(),
+ metaClient.getCommitActionType());
+
+ client1.close();
+ client2.close();
+
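+ // verify the expected transaction ordering and overlap using instant start and completion times from the timeline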
+ metaClient.reloadActiveTimeline();
+ List<HoodieInstant> instants = metaClient.getActiveTimeline().getInstants();
+ if (bulkInsertFirst) {
+ assertEquals(3, instants.size());
+ // check that the bulk insert finished before the insert started
+ assertTrue(Long.parseLong(instants.get(0).getCompletionTime()) < Long.parseLong(instants.get(1).getTimestamp()));
+ // check that the insert and upsert overlapped in time
+ assertTrue(Long.parseLong(instants.get(1).getCompletionTime()) > Long.parseLong(instants.get(2).getTimestamp()));
+ assertTrue(Long.parseLong(instants.get(2).getCompletionTime()) > Long.parseLong(instants.get(1).getTimestamp()));
+ } else {
+ assertEquals(2, instants.size());
+ // check that the insert and upsert overlapped in time
+ assertTrue(Long.parseLong(instants.get(0).getCompletionTime()) > Long.parseLong(instants.get(1).getTimestamp()));
+ assertTrue(Long.parseLong(instants.get(1).getCompletionTime()) > Long.parseLong(instants.get(0).getTimestamp()));
+ }
+ }
+
// case1: txn1 is upsert writer, txn2 is bulk_insert writer.
// |----------- txn1 -----------|
// |----- txn2 ------|
@@ -277,6 +376,14 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona
}
private HoodieWriteConfig createHoodieWriteConfig() {
+ return createHoodieWriteConfig(false);
+ }
+
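+ // fullUpdate selects OverwriteWithLatestAvroPayload (latest record wins wholesale)
+ // instead of the default PartialUpdateAvroPayload (field-level partial merge)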
+ private HoodieWriteConfig createHoodieWriteConfig(boolean fullUpdate) {
+ String payloadClassName = PartialUpdateAvroPayload.class.getName();
+ if (fullUpdate) {
+ payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
+ }
Properties props = getPropertiesForKeyGen(true);
props.put(TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
String basePath = basePath();
@@ -288,7 +395,7 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona
.withAutoCommit(false)
.withPayloadConfig(
HoodiePayloadConfig.newBuilder()
- .withPayloadClass(PartialUpdateAvroPayload.class.getName())
+ .withPayloadClass(payloadClassName)
.withPayloadOrderingField("ts")
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -393,10 +500,15 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona
return record;
}
- private List<HoodieRecord> str2HoodieRecord(List<String> records) {
+ private List<HoodieRecord> str2HoodieRecord(List<String> records, boolean fullUpdate) {
return records.stream().map(recordStr -> {
GenericRecord record = str2GenericRecord(recordStr);
- PartialUpdateAvroPayload payload = new PartialUpdateAvroPayload(record, (Long) record.get("ts"));
+ OverwriteWithLatestAvroPayload payload;
+ if (fullUpdate) {
+ payload = new OverwriteWithLatestAvroPayload(record, (Long) record.get("ts"));
+ } else {
+ payload = new PartialUpdateAvroPayload(record, (Long) record.get("ts"));
+ }
return new HoodieAvroRecord<>(new HoodieKey((String) record.get("id"), (String) record.get("part")), payload);
}).collect(Collectors.toList());
}
@@ -407,7 +519,17 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona
List<String> records,
boolean doCommit,
WriteOperationType operationType) {
- List<HoodieRecord> recordList = str2HoodieRecord(records);
+ return writeData(client, instant, records, doCommit, operationType, false);
+ }
+
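+ // overload of writeData that threads the fullUpdate flag through to payload construction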
+ private List<WriteStatus> writeData(
+ SparkRDDWriteClient client,
+ String instant,
+ List<String> records,
+ boolean doCommit,
+ WriteOperationType operationType,
+ boolean fullUpdate) {
+ List<HoodieRecord> recordList = str2HoodieRecord(records, fullUpdate);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(recordList, 2);
metaClient = HoodieTableMetaClient.reload(metaClient);
client.startCommitWithTime(instant);