This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f34f44e0db6 [HUDI-7227] Add a test to ensure multiple writers do not write base files with bucket index in NBCC (#12039)
f34f44e0db6 is described below

commit f34f44e0db6b25d6e94021a77b6c99c5a67993b1
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Oct 11 14:24:23 2024 -0400

    [HUDI-7227] Add a test to ensure multiple writers do not write base files with bucket index in NBCC (#12039)
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../TestSparkNonBlockingConcurrencyControl.java    | 130 ++++++++++++++++++++-
 1 file changed, 126 insertions(+), 4 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
index 1e113551b53..d729ac9dc33 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
@@ -31,11 +31,13 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.PartialUpdateAvroPayload;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -48,6 +50,7 @@ import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
 import org.apache.hudi.table.storage.HoodieStorageLayout;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
@@ -63,6 +66,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.io.FileFilter;
@@ -80,9 +85,11 @@ import java.util.stream.Collectors;
 import static org.apache.hudi.common.table.HoodieTableConfig.TYPE;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Tag("functional")
 public class TestSparkNonBlockingConcurrencyControl extends 
SparkClientFunctionalTestHarness {
@@ -206,6 +213,98 @@ public class TestSparkNonBlockingConcurrencyControl 
extends SparkClientFunctiona
     checkWrittenData(result, 1);
   }
 
+  // Validate that multiple writers will only produce base files for bulk insert
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testMultiBaseFile(boolean bulkInsertFirst) throws Exception {
+    HoodieWriteConfig config = createHoodieWriteConfig(true);
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, 
config.getProps());
+    // there should only be a single filegroup, so we will verify that it is consistent
+    String fileID = null;
+
+    // if there is not a bulk insert first, then we will write to log files for a filegroup
+    // without a base file. Having a base file adds the possibility of small file handling
+    // which we want to ensure doesn't happen.
+    if (bulkInsertFirst) {
+      SparkRDDWriteClient client0 = getHoodieWriteClient(config);
+      List<String> dataset0 = Collections.singletonList("id0,Danny,0,0,par1");
+      String insertTime0 = client0.createNewInstantTime();
+      List<WriteStatus> writeStatuses0 = writeData(client0, insertTime0, 
dataset0, false, WriteOperationType.BULK_INSERT, true);
+      client0.commitStats(
+          insertTime0,
+          context().parallelize(writeStatuses0, 1),
+          
writeStatuses0.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+          Option.empty(),
+          metaClient.getCommitActionType());
+      for (WriteStatus status : writeStatuses0) {
+        if (fileID == null) {
+          fileID = status.getFileId();
+        } else {
+          assertEquals(fileID, status.getFileId());
+        }
+        assertFalse(FSUtils.isLogFile(new 
StoragePath(status.getStat().getPath()).getName()));
+      }
+      client0.close();
+    }
+
+    SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+    List<String> dataset1 = Collections.singletonList("id1,Danny,22,1,par1");
+    String insertTime1 = client1.createNewInstantTime();
+    List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1, 
dataset1, false, WriteOperationType.INSERT, true);
+    for (WriteStatus status : writeStatuses1) {
+      if (fileID == null) {
+        fileID = status.getFileId();
+      } else {
+        assertEquals(fileID, status.getFileId());
+      }
+      assertTrue(FSUtils.isLogFile(new 
StoragePath(status.getStat().getPath()).getName()));
+    }
+
+    SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+    List<String> dataset2 = Collections.singletonList("id1,Danny,23,2,par1");
+    String insertTime2 = client2.createNewInstantTime();
+    List<WriteStatus> writeStatuses2 = writeData(client2, insertTime2, 
dataset2, false, WriteOperationType.UPSERT, true);
+    for (WriteStatus status : writeStatuses2) {
+      assertEquals(fileID, status.getFileId());
+      assertTrue(FSUtils.isLogFile(new 
StoragePath(status.getStat().getPath()).getName()));
+    }
+
+    // step to commit the 1st txn
+    client1.commitStats(
+        insertTime1,
+        context().parallelize(writeStatuses1, 1),
+        
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(),
+        metaClient.getCommitActionType());
+
+    // step to commit the 2nd txn
+    client2.commitStats(
+        insertTime2,
+        context().parallelize(writeStatuses2, 1),
+        
writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(),
+        metaClient.getCommitActionType());
+
+    client1.close();
+    client2.close();
+
+    metaClient.reloadActiveTimeline();
+    List<HoodieInstant> instants = 
metaClient.getActiveTimeline().getInstants();
+    if (bulkInsertFirst) {
+      assertEquals(3, instants.size());
+      // check that bulk insert finished before the upsert started
+      assertTrue(Long.parseLong(instants.get(0).getCompletionTime()) < 
Long.parseLong(instants.get(1).getTimestamp()));
+      // check that the upserts overlapped in time
+      assertTrue(Long.parseLong(instants.get(1).getCompletionTime()) > 
Long.parseLong(instants.get(2).getTimestamp()));
+      assertTrue(Long.parseLong(instants.get(2).getCompletionTime()) > 
Long.parseLong(instants.get(1).getTimestamp()));
+    } else {
+      assertEquals(2, instants.size());
+      // check that the upserts overlapped in time
+      assertTrue(Long.parseLong(instants.get(0).getCompletionTime()) > 
Long.parseLong(instants.get(1).getTimestamp()));
+      assertTrue(Long.parseLong(instants.get(1).getCompletionTime()) > 
Long.parseLong(instants.get(0).getTimestamp()));
+    }
+  }
+
   // case1: txn1 is upsert writer, txn2 is bulk_insert writer.
   //      |----------- txn1 -----------|
   //                       |----- txn2 ------|
@@ -277,6 +376,14 @@ public class TestSparkNonBlockingConcurrencyControl 
extends SparkClientFunctiona
   }
 
   private HoodieWriteConfig createHoodieWriteConfig() {
+    return createHoodieWriteConfig(false);
+  }
+
+  private HoodieWriteConfig createHoodieWriteConfig(boolean fullUpdate) {
+    String payloadClassName = PartialUpdateAvroPayload.class.getName();
+    if (fullUpdate) {
+      payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
+    }
     Properties props = getPropertiesForKeyGen(true);
     props.put(TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
     String basePath = basePath();
@@ -288,7 +395,7 @@ public class TestSparkNonBlockingConcurrencyControl extends 
SparkClientFunctiona
         .withAutoCommit(false)
         .withPayloadConfig(
             HoodiePayloadConfig.newBuilder()
-                .withPayloadClass(PartialUpdateAvroPayload.class.getName())
+                .withPayloadClass(payloadClassName)
                 .withPayloadOrderingField("ts")
                 .build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -393,10 +500,15 @@ public class TestSparkNonBlockingConcurrencyControl 
extends SparkClientFunctiona
     return record;
   }
 
-  private List<HoodieRecord> str2HoodieRecord(List<String> records) {
+  private List<HoodieRecord> str2HoodieRecord(List<String> records, boolean 
fullUpdate) {
     return records.stream().map(recordStr -> {
       GenericRecord record = str2GenericRecord(recordStr);
-      PartialUpdateAvroPayload payload = new PartialUpdateAvroPayload(record, 
(Long) record.get("ts"));
+      OverwriteWithLatestAvroPayload payload;
+      if (fullUpdate) {
+        payload = new OverwriteWithLatestAvroPayload(record, (Long) 
record.get("ts"));
+      } else {
+        payload = new PartialUpdateAvroPayload(record, (Long) 
record.get("ts"));
+      }
       return new HoodieAvroRecord<>(new HoodieKey((String) record.get("id"), 
(String) record.get("part")), payload);
     }).collect(Collectors.toList());
   }
@@ -407,7 +519,17 @@ public class TestSparkNonBlockingConcurrencyControl 
extends SparkClientFunctiona
       List<String> records,
       boolean doCommit,
       WriteOperationType operationType) {
-    List<HoodieRecord> recordList = str2HoodieRecord(records);
+    return writeData(client, instant, records, doCommit, operationType, false);
+  }
+
+  private List<WriteStatus> writeData(
+      SparkRDDWriteClient client,
+      String instant,
+      List<String> records,
+      boolean doCommit,
+      WriteOperationType operationType,
+      boolean fullUpdate) {
+    List<HoodieRecord> recordList = str2HoodieRecord(records, fullUpdate);
     JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(recordList, 2);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     client.startCommitWithTime(instant);

Reply via email to