This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 54276a957b82 refactor: modularize long test methods in TestHoodieClientOnCopyOnWriteStorage (#18377)
54276a957b82 is described below

commit 54276a957b8254510b118ddeee29dfd1f149560b
Author: yaojiejia <[email protected]>
AuthorDate: Mon Mar 30 04:36:29 2026 -0400

    refactor: modularize long test methods in TestHoodieClientOnCopyOnWriteStorage (#18377)
    
    * init changes
    
    * minor fix
---
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 496 ++++++++++-----------
 1 file changed, 241 insertions(+), 255 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index fcb08a2e2706..6f873d2d5059 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -60,7 +60,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.InstantGenerator;
 import org.apache.hudi.common.table.timeline.TimelineFactory;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
-import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.testutils.FileCreateUtilsLegacy;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
@@ -470,61 +469,52 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
 
   @Override
   protected void testMergeHandle(HoodieWriteConfig config) throws IOException {
-    final String instantTime = "007";
     HoodieTableMetaClient metaClient = 
HoodieClientTestUtils.createMetaClient(jsc, basePath);
     HoodieTable table = getHoodieTable(metaClient, config);
     Pair<String, String> partitionAndBaseFilePaths = 
getPartitionAndBaseFilePathsFromLatestCommitMetadata(metaClient);
     String partitionPath = partitionAndBaseFilePaths.getLeft();
     String baseFilePath = partitionAndBaseFilePaths.getRight();
     jsc.parallelize(Arrays.asList(1)).map(e -> {
-
       HoodieBaseFile baseFile = new HoodieBaseFile(baseFilePath);
+      performMergeValidationCheck(config, "007", table, partitionPath, 
baseFile, false);
 
-      HoodieWriteMergeHandle handle = null;
-      try {
-        handle = new HoodieWriteMergeHandle(config, instantTime, table, new 
HashMap<>(),
-            partitionPath, FSUtils.getFileId(baseFile.getFileName()), 
baseFile, new SparkTaskContextSupplier(),
-            config.populateMetaFields() ? Option.empty() :
-                Option.of((BaseKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(config.getProps())));
-        WriteStatus writeStatus = new WriteStatus(false, 0.0);
-        writeStatus.setStat(new HoodieWriteStat());
-        writeStatus.getStat().setNumWrites(0);
-        handle.performMergeDataValidationCheck(writeStatus);
-      } catch (HoodieCorruptedDataException e1) {
+      config.getProps().setProperty("hoodie.merge.data.validation.enabled", 
"true");
+      HoodieWriteConfig cfg2 = 
HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
+      performMergeValidationCheck(cfg2, "006", table, partitionPath, baseFile, 
true);
+      return true;
+    }).collect();
+  }
+
+  private static void performMergeValidationCheck(HoodieWriteConfig cfg, 
String instantTime, HoodieTable table,
+                                           String partitionPath, 
HoodieBaseFile baseFile,
+                                           boolean expectCorruptedException) 
throws Exception {
+    HoodieWriteMergeHandle handle = null;
+    try {
+      Option<BaseKeyGenerator> keyGenOption = cfg.populateMetaFields() ? 
Option.empty()
+          : Option.of((BaseKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(cfg.getProps()));
+      handle = new HoodieWriteMergeHandle(cfg, instantTime, table, new 
HashMap<>(),
+          partitionPath, FSUtils.getFileId(baseFile.getFileName()), baseFile,
+          new SparkTaskContextSupplier(), keyGenOption);
+      WriteStatus writeStatus = new WriteStatus(false, 0.0);
+      writeStatus.setStat(new HoodieWriteStat());
+      writeStatus.getStat().setNumWrites(0);
+      handle.performMergeDataValidationCheck(writeStatus);
+      if (expectCorruptedException) {
+        fail("Expected HoodieCorruptedDataException was not thrown");
+      }
+    } catch (HoodieCorruptedDataException e) {
+      if (!expectCorruptedException) {
         fail("Exception not expected because merge validation check is 
disabled");
-      } finally {
-        if (handle != null) {
-          handle.close();
-        }
       }
-
-      handle = null;
-      try {
-        final String newInstantTime = "006";
-        config.getProps().setProperty("hoodie.merge.data.validation.enabled", 
"true");
-        HoodieWriteConfig cfg2 = 
HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
-        handle = new HoodieWriteMergeHandle(cfg2, newInstantTime, table, new 
HashMap<>(),
-            partitionPath, FSUtils.getFileId(baseFile.getFileName()), 
baseFile, new SparkTaskContextSupplier(),
-            config.populateMetaFields() ? Option.empty() :
-                Option.of((BaseKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(config.getProps())));
-        WriteStatus writeStatus = new WriteStatus(false, 0.0);
-        writeStatus.setStat(new HoodieWriteStat());
-        writeStatus.getStat().setNumWrites(0);
-        handle.performMergeDataValidationCheck(writeStatus);
-        fail("The above line should have thrown an exception");
-      } catch (HoodieCorruptedDataException e2) {
-        // expected
-      } finally {
-        if (handle != null) {
-          try {
-            handle.close();
-          } catch (Exception ex) {
-            // ignore exception from validation check
-          }
+    } finally {
+      if (handle != null) {
+        try {
+          handle.close();
+        } catch (Exception ex) {
+          // ignore exception from validation check
         }
       }
-      return true;
-    }).collect();
+    }
   }
 
   @Test
@@ -719,6 +709,114 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     return Pair.of(recreatedStatuses, inserts);
   }
 
+  private List<WriteStatus> upsertBatchRecords(SparkRDDWriteClient client, 
String commitTime,
+                                               List<HoodieRecord> records, int 
numSlices) throws IOException {
+    WriteClientTestUtils.startCommitWithTime(client, commitTime);
+    List<WriteStatus> statusList = client.upsert(jsc.parallelize(records, 
numSlices), commitTime).collect();
+    client.commit(commitTime, jsc.parallelize(statusList), Option.empty(), 
COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+    assertNoWriteErrors(statusList);
+    return statusList;
+  }
+
+  private void verifyExpandedFile(FileFormatUtils fileUtils, List<WriteStatus> 
statusList,
+                                  String expectedFileId, String 
prevCommitTime, int expectedRecordCount,
+                                  Set<String> keys1, Set<String> keys2, String 
commitTime) throws IOException {
+    assertEquals(1, statusList.size(), "Just 1 file needs to be updated.");
+    assertEquals(expectedFileId, statusList.get(0).getFileId(), "Existing file 
should be expanded");
+    assertEquals(prevCommitTime, statusList.get(0).getStat().getPrevCommit(), 
"Existing file should be expanded");
+    StoragePath newFile = new StoragePath(basePath, 
statusList.get(0).getStat().getPath());
+    assertEquals(expectedRecordCount, fileUtils.readRowKeys(storage, 
newFile).size(),
+        "file should contain " + expectedRecordCount + " records");
+    for (GenericRecord record : fileUtils.readAvroRecords(storage, newFile)) {
+      String recordKey = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+      assertEquals(commitTime, 
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(),
+          "only expect " + commitTime);
+      assertTrue(keys2.contains(recordKey) || keys1.contains(recordKey),
+          "key expected to be part of " + commitTime);
+    }
+  }
+
+  private void verifyTwoFileCommitDistribution(FileFormatUtils fileUtils, 
List<HoodieBaseFile> files,
+                                               String file1Id, Set<String> 
keys2, Set<String> keys3,
+                                               String commitTime3, int 
expectedUpdates) throws IOException {
+    int numTotalInsertsInCommit3 = 0;
+    int numTotalUpdatesInCommit3 = 0;
+    for (HoodieBaseFile file : files) {
+      if (file.getFileName().contains(file1Id)) {
+        assertEquals(commitTime3, file.getCommitTime(), "Existing file should 
be expanded");
+        List<GenericRecord> records = fileUtils.readAvroRecords(storage, new 
StoragePath(file.getPath()));
+        for (GenericRecord record : records) {
+          String recordKey = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+          String recordCommitTime = 
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+          if (recordCommitTime.equals(commitTime3)) {
+            if (keys2.contains(recordKey)) {
+              keys2.remove(recordKey);
+              numTotalUpdatesInCommit3++;
+            } else {
+              numTotalInsertsInCommit3++;
+            }
+          }
+        }
+        assertEquals(0, keys2.size(), "All keys added in commit 2 must be 
updated in commit3 correctly");
+      } else {
+        assertEquals(commitTime3, file.getCommitTime(), "New file must be 
written for commit 3");
+        List<GenericRecord> records = fileUtils.readAvroRecords(storage, new 
StoragePath(file.getPath()));
+        for (GenericRecord record : records) {
+          String recordKey = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+          assertEquals(commitTime3, 
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(),
+              "only expect commit3");
+          assertTrue(keys3.contains(recordKey), "key expected to be part of 
commit3");
+        }
+        numTotalInsertsInCommit3 += records.size();
+      }
+    }
+    assertEquals(expectedUpdates, numTotalUpdatesInCommit3, "Total updates in 
commit3 must add up");
+    assertEquals(keys3.size(), numTotalInsertsInCommit3, "Total inserts in 
commit3 must add up");
+  }
+
+  private List<String> getFileGroupIds(HoodieSparkCopyOnWriteTable table, 
String partitionPath) {
+    return table.getFileSystemView().getAllFileGroups(partitionPath)
+        .map(fileGroup -> 
fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+  }
+
+  private void verifyInsertExpandedFile(FileFormatUtils fileUtils, 
List<WriteStatus> statuses,
+                                        String expectedFileId, String 
prevCommitTime, int expectedRecordCount,
+                                        Set<String> keys1, Set<String> keys2,
+                                        String commitTime1, String 
commitTime2) throws IOException {
+    assertEquals(expectedFileId, statuses.get(0).getFileId(), "Existing file 
should be expanded");
+    assertEquals(prevCommitTime, statuses.get(0).getStat().getPrevCommit(), 
"Existing file should be expanded");
+    StoragePath newFile = new StoragePath(basePath, 
statuses.get(0).getStat().getPath());
+    assertEquals(expectedRecordCount, fileUtils.readRowKeys(storage, 
newFile).size(),
+        "file should contain " + expectedRecordCount + " records");
+    for (GenericRecord record : fileUtils.readAvroRecords(storage, newFile)) {
+      String recordKey = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+      String recCommitTime = 
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+      assertTrue(commitTime1.equals(recCommitTime) || 
commitTime2.equals(recCommitTime),
+          "Record expected to be part of commit 1 or commit2");
+      assertTrue(keys2.contains(recordKey) || keys1.contains(recordKey),
+          "key expected to be part of commit 1 or commit2");
+    }
+  }
+
+  private void verifyAllFilesAtCommit(FileFormatUtils fileUtils, 
List<HoodieBaseFile> files,
+                                      String expectedCommitTime, int 
expectedTotalRecords) throws IOException {
+    int totalRecords = 0;
+    for (HoodieBaseFile file : files) {
+      assertEquals(expectedCommitTime, file.getCommitTime(), "All files must 
be at " + expectedCommitTime);
+      totalRecords += fileUtils.readAvroRecords(storage, new 
StoragePath(file.getPath())).size();
+    }
+    assertEquals(expectedTotalRecords, totalRecords, "Total number of records 
must add up");
+  }
+
+  private void insertCommitWithSchema(SparkRDDWriteClient client, 
HoodieTestDataGenerator gen,
+                                      int count, String schema) {
+    String commitTime = client.startCommit();
+    List<HoodieRecord> batch = gen.generateInsertsAsPerSchema(commitTime, 
count, schema);
+    JavaRDD<HoodieRecord> records = 
context.getJavaSparkContext().parallelize(batch, 1);
+    JavaRDD<WriteStatus> writeStatuses = client.insert(records, commitTime);
+    client.commit(commitTime, writeStatuses);
+  }
+
   @Test
   public void testUpdateRejectForClustering() throws IOException {
     final String testPartitionPath = "2016/09/26";
@@ -734,8 +832,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     //1. insert to generate 2 file group
     Pair<JavaRDD<WriteStatus>, List<HoodieRecord>> upsertResult = 
insertBatchRecords(client, "001", 600, 2, 1, SparkRDDWriteClient::upsert);
     List<HoodieRecord> inserts1 = upsertResult.getValue();
-    List<String> fileGroupIds1 = 
table.getFileSystemView().getAllFileGroups(testPartitionPath)
-        .map(fileGroup -> 
fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+    List<String> fileGroupIds1 = getFileGroupIds(table, testPartitionPath);
     assertEquals(2, fileGroupIds1.size());
 
     // 2. generate clustering plan for fileGroupIds1 file groups
@@ -746,8 +843,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
 
     // 3. insert one record with no updating reject exception, and not merge 
the small file, just generate a new file group
     insertBatchRecords(client, "003", 1, 1, 1, SparkRDDWriteClient::upsert);
-    List<String> fileGroupIds2 = 
table.getFileSystemView().getAllFileGroups(testPartitionPath)
-        .map(fileGroup -> 
fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+    List<String> fileGroupIds2 = getFileGroupIds(table, testPartitionPath);
     assertEquals(3, fileGroupIds2.size());
 
     // 4. update one record for the clustering two file groups, throw reject 
update exception
@@ -765,9 +861,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         insertBatchRecords(client, "005", 1, 1, 1, 
SparkRDDWriteClient::upsert).getKey();
     fileGroupIds2.removeAll(fileGroupIds1);
     assertEquals(fileGroupIds2.get(0), statuses.collect().get(0).getFileId());
-    List<String> firstInsertFileGroupIds4 = 
table.getFileSystemView().getAllFileGroups(testPartitionPath)
-        .map(fileGroup -> 
fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
-    assertEquals(3, firstInsertFileGroupIds4.size());
+    assertEquals(3, getFileGroupIds(table, testPartitionPath).size());
   }
 
   /**
@@ -777,112 +871,43 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   public void testSmallInsertHandlingForUpserts() throws Exception {
     final String testPartitionPath = "2016/09/26";
     final int insertSplitLimit = 100;
-    // setup the small file handling params
     // hold upto 200 records max
     HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit,
         TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150));
-
     dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
     SparkRDDWriteClient client = getHoodieWriteClient(config);
     FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);
 
     // Inserts => will write file1
     String commitTime1 = "001";
-    WriteClientTestUtils.startCommitWithTime(client, commitTime1);
     List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, 
insertSplitLimit); // this writes ~500kb
     Set<String> keys1 = recordsToRecordKeySet(inserts1);
-
-    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
-    List<WriteStatus> statusList = client.upsert(insertRecordsRDD1, 
commitTime1).collect();
-    writeClient.commit(commitTime1, jsc.parallelize(statusList), 
Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());
-    assertNoWriteErrors(statusList);
-
-    assertEquals(1, statusList.size(), "Just 1 file needs to be added.");
-    String file1 = statusList.get(0).getFileId();
+    List<WriteStatus> statusList1 = upsertBatchRecords(client, commitTime1, 
inserts1, 1);
+    assertEquals(1, statusList1.size(), "Just 1 file needs to be added.");
+    String file1 = statusList1.get(0).getFileId();
     assertEquals(100,
-        fileUtils.readRowKeys(storage, new StoragePath(basePath, 
statusList.get(0).getStat().getPath()))
+        fileUtils.readRowKeys(storage, new StoragePath(basePath, 
statusList1.get(0).getStat().getPath()))
             .size(), "file should contain 100 records");
 
     // Update + Inserts such that they just expand file1
     String commitTime2 = "002";
-    WriteClientTestUtils.startCommitWithTime(client, commitTime2);
     List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40);
     Set<String> keys2 = recordsToRecordKeySet(inserts2);
-    List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
-    insertsAndUpdates2.addAll(inserts2);
+    List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>(inserts2);
     insertsAndUpdates2.addAll(dataGen.generateUpdates(commitTime2, inserts1));
-
-    JavaRDD<HoodieRecord> insertAndUpdatesRDD2 = 
jsc.parallelize(insertsAndUpdates2, 1);
-    statusList = client.upsert(insertAndUpdatesRDD2, commitTime2).collect();
-    client.commit(commitTime2, jsc.parallelize(statusList), Option.empty(), 
COMMIT_ACTION, Collections.emptyMap(), Option.empty());
-    assertNoWriteErrors(statusList);
-
-    assertEquals(1, statusList.size(), "Just 1 file needs to be updated.");
-    assertEquals(file1, statusList.get(0).getFileId(), "Existing file should 
be expanded");
-    assertEquals(commitTime1, statusList.get(0).getStat().getPrevCommit(), 
"Existing file should be expanded");
-    StoragePath newFile = new StoragePath(basePath, 
statusList.get(0).getStat().getPath());
-    assertEquals(140, fileUtils.readRowKeys(storage, newFile).size(),
-        "file should contain 140 records");
-
-    List<GenericRecord> records = fileUtils.readAvroRecords(storage, newFile);
-    for (GenericRecord record : records) {
-      String recordKey = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-      assertEquals(commitTime2, 
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), "only expect 
commit2");
-      assertTrue(keys2.contains(recordKey) || keys1.contains(recordKey), "key 
expected to be part of commit2");
-    }
+    List<WriteStatus> statusList2 = upsertBatchRecords(client, commitTime2, 
insertsAndUpdates2, 1);
+    verifyExpandedFile(fileUtils, statusList2, file1, commitTime1, 140, keys1, 
keys2, commitTime2);
 
     // update + inserts such that file1 is updated and expanded, a new file2 
is created.
     String commitTime3 = "003";
-    WriteClientTestUtils.startCommitWithTime(client, commitTime3);
-    List<HoodieRecord> insertsAndUpdates3 = 
dataGen.generateInserts(commitTime3, 200);
+    List<HoodieRecord> insertsAndUpdates3 = new 
ArrayList<>(dataGen.generateInserts(commitTime3, 200));
     Set<String> keys3 = recordsToRecordKeySet(insertsAndUpdates3);
-    List<HoodieRecord> updates3 = dataGen.generateUpdates(commitTime3, 
inserts2);
-    insertsAndUpdates3.addAll(updates3);
-
-    JavaRDD<HoodieRecord> insertAndUpdatesRDD3 = 
jsc.parallelize(insertsAndUpdates3, 1);
-    statusList = client.upsert(insertAndUpdatesRDD3, commitTime3).collect();
-    client.commit(commitTime3, jsc.parallelize(statusList), Option.empty(), 
COMMIT_ACTION, Collections.emptyMap(), Option.empty());
-    assertNoWriteErrors(statusList);
-    assertEquals(2, statusList.size(), "2 files needs to be committed.");
-    HoodieTableMetaClient metadata = createMetaClient();
-
-    HoodieTable table = getHoodieTable(metadata, config);
-    BaseFileOnlyView fileSystemView = table.getBaseFileOnlyView();
-    List<HoodieBaseFile> files =
-        fileSystemView.getLatestBaseFilesBeforeOrOn(testPartitionPath, 
commitTime3).collect(Collectors.toList());
-    int numTotalInsertsInCommit3 = 0;
-    int numTotalUpdatesInCommit3 = 0;
-    for (HoodieBaseFile file : files) {
-      if (file.getFileName().contains(file1)) {
-        assertEquals(commitTime3, file.getCommitTime(), "Existing file should 
be expanded");
-        records = fileUtils.readAvroRecords(storage, new 
StoragePath(file.getPath()));
-        for (GenericRecord record : records) {
-          String recordKey = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-          String recordCommitTime = 
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
-          if (recordCommitTime.equals(commitTime3)) {
-            if (keys2.contains(recordKey)) {
-              keys2.remove(recordKey);
-              numTotalUpdatesInCommit3++;
-            } else {
-              numTotalInsertsInCommit3++;
-            }
-          }
-        }
-        assertEquals(0, keys2.size(), "All keys added in commit 2 must be 
updated in commit3 correctly");
-      } else {
-        assertEquals(commitTime3, file.getCommitTime(), "New file must be 
written for commit 3");
-        records = fileUtils.readAvroRecords(storage, new 
StoragePath(file.getPath()));
-        for (GenericRecord record : records) {
-          String recordKey = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-          assertEquals(commitTime3, 
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(),
-              "only expect commit3");
-          assertTrue(keys3.contains(recordKey), "key expected to be part of 
commit3");
-        }
-        numTotalInsertsInCommit3 += records.size();
-      }
-    }
-    assertEquals(numTotalUpdatesInCommit3, inserts2.size(), "Total updates in 
commit3 must add up");
-    assertEquals(numTotalInsertsInCommit3, keys3.size(), "Total inserts in 
commit3 must add up");
+    insertsAndUpdates3.addAll(dataGen.generateUpdates(commitTime3, inserts2));
+    List<WriteStatus> statusList3 = upsertBatchRecords(client, commitTime3, 
insertsAndUpdates3, 1);
+    assertEquals(2, statusList3.size(), "2 files needs to be committed.");
+    List<HoodieBaseFile> files = getHoodieTable(createMetaClient(), 
config).getBaseFileOnlyView()
+        .getLatestBaseFilesBeforeOrOn(testPartitionPath, 
commitTime3).collect(Collectors.toList());
+    verifyTwoFileCommitDistribution(fileUtils, files, file1, keys2, keys3, 
commitTime3, inserts2.size());
   }
 
   /**
@@ -916,21 +941,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     insertResult = insertBatchRecords(client, commitTime2, 40, 1, 1, 
SparkRDDWriteClient::insert);
     Set<String> keys2 = recordsToRecordKeySet(insertResult.getRight());
     statuses = insertResult.getLeft().collect();
-    assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be 
expanded");
-    assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), 
"Existing file should be expanded");
-
-    StoragePath newFile = new StoragePath(basePath, 
statuses.get(0).getStat().getPath());
-    assertEquals(140, fileUtils.readRowKeys(storage, newFile).size(),
-        "file should contain 140 records");
-    List<GenericRecord> records = fileUtils.readAvroRecords(storage, newFile);
-    for (GenericRecord record : records) {
-      String recordKey = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-      String recCommitTime = 
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
-      assertTrue(commitTime1.equals(recCommitTime) || 
commitTime2.equals(recCommitTime),
-          "Record expected to be part of commit 1 or commit2");
-      assertTrue(keys2.contains(recordKey) || keys1.contains(recordKey),
-          "key expected to be part of commit 1 or commit2");
-    }
+    verifyInsertExpandedFile(fileUtils, statuses, file1, commitTime1, 140, 
keys1, keys2, commitTime1, commitTime2);
 
     // Lots of inserts such that file1 is updated and expanded, a new file2 is 
created.
     String commitTime3 = "003";
@@ -940,19 +951,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         fileUtils.readRowKeys(storage, new StoragePath(basePath, 
statuses.get(0).getStat().getPath())).size()
             + fileUtils.readRowKeys(storage, new StoragePath(basePath, 
statuses.get(1).getStat().getPath())).size(),
         "file should contain 340 records");
-
-    HoodieTableMetaClient metaClient = createMetaClient();
-    HoodieTable table = getHoodieTable(metaClient, config);
-    List<HoodieBaseFile> files = table.getBaseFileOnlyView()
+    List<HoodieBaseFile> files = getHoodieTable(createMetaClient(), 
config).getBaseFileOnlyView()
         .getLatestBaseFilesBeforeOrOn(testPartitionPath, 
commitTime3).collect(Collectors.toList());
     assertEquals(2, files.size(), "Total of 2 valid data files");
-
-    int totalInserts = 0;
-    for (HoodieBaseFile file : files) {
-      assertEquals(commitTime3, file.getCommitTime(), "All files must be at 
commit 3");
-      totalInserts += fileUtils.readAvroRecords(storage, new 
StoragePath(file.getPath())).size();
-    }
-    assertEquals(340, totalInserts, "Total number of records must add up");
+    verifyAllFilesAtCommit(fileUtils, files, commitTime3, 340);
   }
 
   /**
@@ -972,19 +974,14 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
 
     // Inserts => will write file1
     String commitTime1 = "001";
-    WriteClientTestUtils.startCommitWithTime(client, commitTime1);
     List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, 
insertSplitLimit); // this writes ~500kb
     Set<String> keys1 = recordsToRecordKeySet(inserts1);
     List<String> keysSoFar = new ArrayList<>(keys1);
-    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
-    List<WriteStatus> statusList = client.upsert(insertRecordsRDD1, 
commitTime1).collect();
-    client.commit(commitTime1, jsc.parallelize(statusList), Option.empty(), 
COMMIT_ACTION, Collections.emptyMap(), Option.empty());
-    assertNoWriteErrors(statusList);
-
-    assertEquals(1, statusList.size(), "Just 1 file needs to be added.");
-    String file1 = statusList.get(0).getFileId();
+    List<WriteStatus> statusList1 = upsertBatchRecords(client, commitTime1, 
inserts1, 1);
+    assertEquals(1, statusList1.size(), "Just 1 file needs to be added.");
+    String file1 = statusList1.get(0).getFileId();
     assertEquals(100, getFileUtilsInstance(metaClient).readRowKeys(
-        storage, new StoragePath(basePath, 
statusList.get(0).getStat().getPath())).size(), "file should contain 100 
records");
+        storage, new StoragePath(basePath, 
statusList1.get(0).getStat().getPath())).size(), "file should contain 100 
records");
 
     // Delete 20 among 100 inserted
     testDeletes(client, inserts1, 20, file1, "002", 80, keysSoFar);
@@ -1007,10 +1004,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     List<HoodieRecord> dummyInserts3 = dataGen.generateInserts(commitTime6, 
20);
     List<HoodieKey> hoodieKeysToDelete3 = 
randomSelectAsHoodieKeys(dummyInserts3, 20);
     JavaRDD<HoodieKey> deleteKeys3 = jsc.parallelize(hoodieKeysToDelete3, 1);
-    statusList = client.delete(deleteKeys3, commitTime6).collect();
-    client.commit(commitTime6, jsc.parallelize(statusList), Option.empty(), 
COMMIT_ACTION, Collections.emptyMap(), Option.empty());
-    assertNoWriteErrors(statusList);
-    assertEquals(0, statusList.size(), "Just 0 write status for delete.");
+    List<WriteStatus> nonExistentDeleteStatuses = client.delete(deleteKeys3, 
commitTime6).collect();
+    client.commit(commitTime6, jsc.parallelize(nonExistentDeleteStatuses), 
Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+    assertNoWriteErrors(nonExistentDeleteStatuses);
+    assertEquals(0, nonExistentDeleteStatuses.size(), "Just 0 write status for 
delete.");
 
     assertTheEntireDatasetHasAllRecordsStill(150);
 
@@ -1211,20 +1208,11 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
     HoodieTestDataGenerator dataGen =
         new HoodieTestDataGenerator(new 
String[]{DEFAULT_FIRST_PARTITION_PATH});
-    String firstCommit = client.startCommit();
-    List<HoodieRecord> firstBatch = 
dataGen.generateInsertsAsPerSchema(firstCommit, 10, schemaStr);
-    JavaRDD<HoodieRecord> records = 
context.getJavaSparkContext().parallelize(firstBatch, 1);
-    JavaRDD<WriteStatus> writeStatuses = client.insert(records, firstCommit);
-    client.commit(firstCommit, writeStatuses);
-
-    // Not create another commit on DEFAULT_SECOND_PARTITION_PATH partition
-    // with schema that contains new columns.
-    String secondCommit = client.startCommit();
+    insertCommitWithSchema(client, dataGen, 10, schemaStr);
+
+    // create another commit with schema that contains new columns.
     String latestSchemaStr = TRIP_EXAMPLE_SCHEMA_EVOLVED_1;
-    List<HoodieRecord> secondBatch = 
dataGen.generateInsertsAsPerSchema(secondCommit, 10, latestSchemaStr);
-    records = context.getJavaSparkContext().parallelize(secondBatch, 1);
-    writeStatuses = client.insert(records, secondCommit);
-    client.commit(secondCommit, writeStatuses);
+    insertCommitWithSchema(client, dataGen, 10, latestSchemaStr);
 
     // Create cluster commit on DEFAULT_FIRST_PARTITION_PATH partition
     // Here pass in precommit validator as SqlQueryEqualityPreCommitValidator 
and check that trip_id is not null.
@@ -1479,15 +1467,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     Set<String> deletePartitionReplaceFileIds1 =
         deletePartitionWithCommit(client, commitTime4, 
Arrays.asList(DEFAULT_FIRST_PARTITION_PATH));
     assertEquals(batch1Buckets, deletePartitionReplaceFileIds1);
-    List<HoodieBaseFile> baseFiles = 
HoodieClientTestUtils.getLatestBaseFiles(basePath, storage,
-        String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH));
-    assertEquals(0, baseFiles.size());
-    baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, storage,
-        String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH));
-    assertTrue(baseFiles.size() > 0);
-    baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, storage,
-        String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
-    assertTrue(baseFiles.size() > 0);
+    assertEquals(0, 
getLatestBaseFilesForPartition(DEFAULT_FIRST_PARTITION_PATH).size());
+    
assertTrue(getLatestBaseFilesForPartition(DEFAULT_SECOND_PARTITION_PATH).size() 
> 0);
+    
assertTrue(getLatestBaseFilesForPartition(DEFAULT_THIRD_PARTITION_PATH).size() 
> 0);
 
     // delete DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH
     String commitTime5 = "005";
@@ -1498,11 +1480,15 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
     expectedFileId.addAll(batch3Buckets);
     assertEquals(expectedFileId, deletePartitionReplaceFileIds2);
 
-    baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, storage,
+    assertEquals(0, HoodieClientTestUtils.getLatestBaseFiles(basePath, storage,
         String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH),
         String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH),
-        String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
-    assertEquals(0, baseFiles.size());
+        String.format("%s/%s/*", basePath, 
DEFAULT_THIRD_PARTITION_PATH)).size());
+  }
+
+  private List<HoodieBaseFile> getLatestBaseFilesForPartition(String 
partitionPath) {
+    return HoodieClientTestUtils.getLatestBaseFiles(basePath, storage,
+        String.format("%s/%s/*", basePath, partitionPath));
   }
 
   private Pair<Set<String>, List<HoodieRecord>> testUpdates(String 
instantTime, SparkRDDWriteClient client,
@@ -1664,28 +1650,46 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
         HoodieClientTestUtils.read(jsc, basePath, sqlContext, storage, 
fullPartitionPaths).count(), "Must contain " + totalRecords + " records");
   }
 
-  @Test
-  public void testClusteringCommitInPresenceOfInflightCommit() throws 
Exception {
+  private Properties createOccProperties() {
     Properties properties = getDisabledRowWriterProperties();
     properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + 
"/.hoodie/.locks");
-    HoodieLockConfig lockConfig = createLockConfig(new 
PreferWriterConflictResolutionStrategy());
-    HoodieCleanConfig cleanConfig = 
createCleanConfig(HoodieFailedWritesCleaningPolicy.LAZY, false);
-    HoodieWriteConfig insertWriteConfig = getConfigBuilder()
+    return properties;
+  }
+
+  private HoodieWriteConfig buildOccWriteConfig(Properties properties, 
HoodieCleanConfig cleanConfig,
+                                                ConflictResolutionStrategy 
strategy) {
+    return getConfigBuilder()
         .withCleanConfig(cleanConfig)
-        .withLockConfig(lockConfig)
+        .withLockConfig(createLockConfig(strategy))
         
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withProperties(properties)
         .build();
-    SparkRDDWriteClient client = getHoodieWriteClient(insertWriteConfig);
+  }
 
-    // Create a base commit on a file.
-    int numRecords = 200;
+  private Pair<String, HoodieTestDataGenerator> 
seedTableWithFirstCommit(SparkRDDWriteClient client,
+                                                                          int 
numRecords) throws Exception {
     String firstCommit = WriteClientTestUtils.createNewInstantTime();
-    String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionStr});
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(
+        new String[]{HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH});
     writeBatch(client, firstCommit, "000", Option.of(Arrays.asList("000")), 
"000",
         numRecords, dataGenerator::generateInserts, 
SparkRDDWriteClient::insert, true, numRecords, numRecords,
         1, INSTANT_GENERATOR);
+    return Pair.of(firstCommit, dataGenerator);
+  }
+
+  @Test
+  public void testClusteringCommitInPresenceOfInflightCommit() throws 
Exception {
+    Properties properties = createOccProperties();
+    HoodieLockConfig lockConfig = createLockConfig(new 
PreferWriterConflictResolutionStrategy());
+    HoodieCleanConfig cleanConfig = 
createCleanConfig(HoodieFailedWritesCleaningPolicy.LAZY, false);
+    HoodieWriteConfig insertWriteConfig = buildOccWriteConfig(properties, 
cleanConfig,
+        new PreferWriterConflictResolutionStrategy());
+    SparkRDDWriteClient client = getHoodieWriteClient(insertWriteConfig);
+
+    // Create a base commit on a file.
+    Pair<String, HoodieTestDataGenerator> seed = 
seedTableWithFirstCommit(client, 200);
+    String firstCommit = seed.getLeft();
+    HoodieTestDataGenerator dataGenerator = seed.getRight();
 
     // Do an upsert operation without autocommit.
     String inflightCommit = WriteClientTestUtils.createNewInstantTime();
@@ -1722,25 +1726,16 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
 
   @Test
   public void testIngestionCommitInPresenceOfCompletedClusteringCommit() 
throws Exception {
-    Properties properties = getDisabledRowWriterProperties();
-    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + 
"/.hoodie/.locks");
+    Properties properties = createOccProperties();
     HoodieCleanConfig cleanConfig = 
createCleanConfig(HoodieFailedWritesCleaningPolicy.LAZY, false);
-    HoodieWriteConfig insertWriteConfig = getConfigBuilder()
-        .withCleanConfig(cleanConfig)
-        .withLockConfig(createLockConfig(new 
PreferWriterConflictResolutionStrategy()))
-        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
-        .withProperties(properties)
-        .build();
+    HoodieWriteConfig insertWriteConfig = buildOccWriteConfig(properties, 
cleanConfig,
+        new PreferWriterConflictResolutionStrategy());
     SparkRDDWriteClient client = getHoodieWriteClient(insertWriteConfig);
 
     // Create a base commit on a file.
-    int numRecords = 200;
-    String firstCommit = WriteClientTestUtils.createNewInstantTime();
-    String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionStr});
-    writeBatch(client, firstCommit, "000", Option.of(Arrays.asList("000")), 
"000",
-        numRecords, dataGenerator::generateInserts, 
SparkRDDWriteClient::insert, true, numRecords, numRecords,
-        1, INSTANT_GENERATOR);
+    Pair<String, HoodieTestDataGenerator> seed = 
seedTableWithFirstCommit(client, 200);
+    String firstCommit = seed.getLeft();
+    HoodieTestDataGenerator dataGenerator = seed.getRight();
 
     // Create and temporarily block a lower timestamp for ingestion.
     String inflightCommit = WriteClientTestUtils.createNewInstantTime();
@@ -1795,24 +1790,16 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
             .filterCompletedInstants().lastInstant().get().requestedTime());
 
     // Do insert-overwrite operation on the existing partitions, without 
committing the data.
-    String secondCommit = WriteClientTestUtils.createNewInstantTime();
-    WriteClientTestUtils.startCommitWithTime(client, secondCommit, 
REPLACE_COMMIT_ACTION);
-    List<HoodieRecord> records1 = dataGen.generateInserts(secondCommit, 10);
-    JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(records1, 1);
-    HoodieWriteResult result1 = client.insertOverwrite(writeRecords1, 
secondCommit);
-    assertEquals(secondCommit, metaClient.reloadActiveTimeline()
-        .filterInflightsAndRequested().lastInstant().get().requestedTime());
+    Pair<String, HoodieWriteResult> inflightOverwrite1 = 
startInflightInsertOverwrite(client, 10);
+    String secondCommit = inflightOverwrite1.getLeft();
+    HoodieWriteResult result1 = inflightOverwrite1.getRight();
 
     // Create second writer and do another insert-overwrite operation on the 
existing partitions,
     // without committing the data.
     SparkRDDWriteClient client2 = new SparkRDDWriteClient(context, 
insertWriteConfig);
-    String thirdCommit = WriteClientTestUtils.createNewInstantTime();
-    WriteClientTestUtils.startCommitWithTime(client2, thirdCommit, 
REPLACE_COMMIT_ACTION);
-    List<HoodieRecord> records2 = dataGen.generateInserts(thirdCommit, 10);
-    JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(records2, 1);
-    HoodieWriteResult result2 = client2.insertOverwrite(writeRecords2, 
thirdCommit);
-    assertEquals(thirdCommit, metaClient.reloadActiveTimeline()
-        .filterInflightsAndRequested().lastInstant().get().requestedTime());
+    Pair<String, HoodieWriteResult> inflightOverwrite2 = 
startInflightInsertOverwrite(client2, 10);
+    String thirdCommit = inflightOverwrite2.getLeft();
+    HoodieWriteResult result2 = inflightOverwrite2.getRight();
 
     // Complete first insert-overwrite operation.
     client.commit(secondCommit, result1.getWriteStatuses(),
@@ -1884,6 +1871,17 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
     }
   }
 
+  private Pair<String, HoodieWriteResult> 
startInflightInsertOverwrite(SparkRDDWriteClient client,
+                                                                        int 
numRecords) {
+    String commitTime = WriteClientTestUtils.createNewInstantTime();
+    WriteClientTestUtils.startCommitWithTime(client, commitTime, 
REPLACE_COMMIT_ACTION);
+    List<HoodieRecord> records = dataGen.generateInserts(commitTime, 
numRecords);
+    HoodieWriteResult result = client.insertOverwrite(jsc.parallelize(records, 
1), commitTime);
+    assertEquals(commitTime, metaClient.reloadActiveTimeline()
+        .filterInflightsAndRequested().lastInstant().get().requestedTime());
+    return Pair.of(commitTime, result);
+  }
+
   protected HoodieInstant createRequestedClusterInstant(HoodieTableMetaClient 
metaClient, String clusterTime, List<FileSlice>[] fileSlices) throws 
IOException {
     HoodieClusteringPlan clusteringPlan =
         
ClusteringUtils.createClusteringPlan(EXECUTION_STRATEGY_CLASS_NAME.defaultValue(),
 STRATEGY_PARAMS, fileSlices, Collections.emptyMap());
@@ -1902,26 +1900,14 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
    */
   @Test
   public void testClusteringFailsOnPendingIngestionRequestedInstant() throws 
Exception {
-    Properties properties = getDisabledRowWriterProperties();
-    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + 
"/.hoodie/.locks");
+    Properties properties = createOccProperties();
     HoodieCleanConfig cleanConfig = 
createCleanConfig(HoodieFailedWritesCleaningPolicy.LAZY, false);
+    int numRecords = 200;
 
     // Insert base data with a regular ingestion writer
-    HoodieWriteConfig insertWriteConfig = getConfigBuilder()
-        .withCleanConfig(cleanConfig)
-        .withLockConfig(createLockConfig(new 
PreferWriterConflictResolutionStrategy()))
-        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
-        .withProperties(properties)
-        .build();
-    SparkRDDWriteClient client = getHoodieWriteClient(insertWriteConfig);
-
-    int numRecords = 200;
-    String firstCommit = WriteClientTestUtils.createNewInstantTime();
-    String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionStr});
-    writeBatch(client, firstCommit, "000", Option.of(Arrays.asList("000")), 
"000",
-        numRecords, dataGenerator::generateInserts, 
SparkRDDWriteClient::insert, true, numRecords, numRecords,
-        1, INSTANT_GENERATOR);
+    SparkRDDWriteClient client = getHoodieWriteClient(
+        buildOccWriteConfig(properties, cleanConfig, new 
PreferWriterConflictResolutionStrategy()));
+    seedTableWithFirstCommit(client, numRecords);
 
     // Simulate an ingestion writer that has created a .requested commit with 
an active heartbeat
     String ingestionRequestedTime = 
WriteClientTestUtils.createNewInstantTime();


Reply via email to