This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 32a44bb  [HUDI-2970] Add test for archiving replace commit (#4345)
32a44bb is described below

commit 32a44bbe062c997b5a41266290fbe34d6323bfa6
Author: Raymond Xu <[email protected]>
AuthorDate: Mon Dec 20 21:01:59 2021 -0800

    [HUDI-2970] Add test for archiving replace commit (#4345)
---
 ...dieSparkCopyOnWriteTableArchiveWithReplace.java | 103 +++++++++++++++++++++
 .../TestHoodieSparkMergeOnReadTableClustering.java |  12 +--
 ...HoodieSparkMergeOnReadTableIncrementalRead.java |   6 +-
 ...dieSparkMergeOnReadTableInsertUpdateDelete.java |   4 +-
 .../SparkClientFunctionalTestHarness.java          |   4 +-
 .../common/testutils/HoodieTestDataGenerator.java  |   3 +-
 6 files changed, 118 insertions(+), 14 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
new file mode 100644
index 0000000..1c66023
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieClientTestUtils.countRecordsOptionallySince;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("functional")
+public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClientFunctionalTestHarness {
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOException {
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
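+    // archive aggressively: keep between 2 and 3 completed instants on the active timeline and
+    // retain only 1 commit for cleaning, so archival kicks in after just a few more writes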
+    HoodieWriteConfig writeConfig = getConfigBuilder(true)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).retainCommits(1).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
+         HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
+
+      // 1st write batch; 3 commits for 3 partitions
+      String instantTime1 = HoodieActiveTimeline.createNewInstantTime(1000);
+      client.startCommitWithTime(instantTime1);
+      client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime1, 10, DEFAULT_FIRST_PARTITION_PATH), 1), instantTime1);
+      String instantTime2 = HoodieActiveTimeline.createNewInstantTime(2000);
+      client.startCommitWithTime(instantTime2);
+      client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime2, 10, DEFAULT_SECOND_PARTITION_PATH), 1), instantTime2);
+      String instantTime3 = HoodieActiveTimeline.createNewInstantTime(3000);
+      client.startCommitWithTime(instantTime3);
+      client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime3, 1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime3);
+
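+      // expect 21 records in total: 10 + 10 + 1 across the three partitions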
+      final HoodieTimeline timeline1 = metaClient.getCommitsTimeline().filterCompletedInstants();
+      assertEquals(21, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline1, Option.empty()));
+
+      // delete the 1st and the 2nd partition; 1 replace commit
+      final String instantTime4 = HoodieActiveTimeline.createNewInstantTime(4000);
+      client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
+      client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);
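+      // deletePartitions issues a replace commit that marks the file groups in those partitions
+      // as replaced, without rewriting any data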
+
+      // 2nd write batch; 3 commits for the 3rd partition; the 3rd commit triggers archiving of the replace commit
+      for (int i = 5; i < 8; i++) {
+        String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000);
+        client.startCommitWithTime(instantTime);
+        client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime, 1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime);
+      }
+
+      // verify archived timeline
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      final HoodieTimeline archivedTimeline = metaClient.getArchivedTimeline();
+      assertTrue(archivedTimeline.containsInstant(instantTime1));
+      assertTrue(archivedTimeline.containsInstant(instantTime2));
+      assertTrue(archivedTimeline.containsInstant(instantTime3));
+      assertTrue(archivedTimeline.containsInstant(instantTime4), "should contain the replace commit.");
+
+      // verify records
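+      // 4 = 1 record from the 3rd commit + 3 records from the 2nd write batch; the deleted partitions contribute none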
+      final HoodieTimeline timeline2 = metaClient.getCommitTimeline().filterCompletedInstants();
+      assertEquals(4, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline2, Option.empty()),
+          "should only have the 4 records from the 3rd partition.");
+    }
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java
index f0ece84..a0ec0de 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java
@@ -113,7 +113,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
       client.startCommitWithTime(newCommitTime);
 
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
-      Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
+      Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
       assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
 
       /*
@@ -122,7 +122,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
       // we already set the small file size limit to a small number to force inserts to go into a new file.
       newCommitTime = "002";
       client.startCommitWithTime(newCommitTime);
-      dataFiles = insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
+      dataFiles = insertRecordsToMORTable(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
       assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
 
       if (doUpdates) {
@@ -132,7 +132,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
         newCommitTime = "003";
         client.startCommitWithTime(newCommitTime);
         records = dataGen.generateUpdates(newCommitTime, 100);
-        updateRecords(metaClient, records, client, cfg, newCommitTime);
+        updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);
       }
 
       HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
@@ -190,18 +190,18 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
       String newCommitTime = "001";
       client.startCommitWithTime(newCommitTime);
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
-      Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
+      Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
       assertTrue(!dataFiles.findAny().isPresent(), "should not have any base files");
       newCommitTime = "002";
       client.startCommitWithTime(newCommitTime);
-      dataFiles = insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
+      dataFiles = insertRecordsToMORTable(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
       assertTrue(!dataFiles.findAny().isPresent(), "should not have any base files");
       // run updates
       if (doUpdates) {
         newCommitTime = "003";
         client.startCommitWithTime(newCommitTime);
         records = dataGen.generateUpdates(newCommitTime, 100);
-        updateRecords(metaClient, records, client, cfg, newCommitTime);
+        updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);
       }
 
       HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
index db55d36..c80374b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
@@ -96,7 +96,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
       client.startCommitWithTime(commitTime1);
 
       List<HoodieRecord> records001 = dataGen.generateInserts(commitTime1, 200);
-      Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records001, client, cfg, commitTime1);
+      Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records001, client, cfg, commitTime1);
       assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
 
       // verify only one base file shows up with commit time 001
@@ -118,7 +118,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
       String updateTime = "004";
       client.startCommitWithTime(updateTime);
       List<HoodieRecord> records004 = dataGen.generateUpdates(updateTime, 100);
-      updateRecords(metaClient, records004, client, cfg, updateTime);
+      updateRecordsInMORTable(metaClient, records004, client, cfg, updateTime);
 
       // verify RO incremental reads - only one base file shows up because updates go into log files
       incrementalROFiles = getROIncrementalFiles(partitionPath, false);
@@ -145,7 +145,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
       String insertsTime = "006";
       List<HoodieRecord> records006 = dataGen.generateInserts(insertsTime, 200);
       client.startCommitWithTime(insertsTime);
-      dataFiles = insertRecords(metaClient, records006, client, cfg, insertsTime);
+      dataFiles = insertRecordsToMORTable(metaClient, records006, client, cfg, insertsTime);
       assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
 
       // verify new write shows up in snapshot mode even though there is pending compaction
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index 254d757..62ce007 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -95,7 +95,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
       client.startCommitWithTime(newCommitTime);
 
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
-      Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records, client, cfg, newCommitTime);
+      Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records, client, cfg, newCommitTime);
       assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
 
       /*
@@ -104,7 +104,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
       newCommitTime = "004";
       client.startCommitWithTime(newCommitTime);
       records = dataGen.generateUpdates(newCommitTime, 100);
-      updateRecords(metaClient, records, client, cfg, newCommitTime);
+      updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);
 
       String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
       client.compact(compactionCommitTime);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index 88c4c13..1ecdd33 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -213,7 +213,7 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
         index.updateLocation(HoodieJavaRDD.of(writeStatus), context, table));
   }
 
-  protected Stream<HoodieBaseFile> insertRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records,
+  protected Stream<HoodieBaseFile> insertRecordsToMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records,
                                                  SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
     HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
 
@@ -242,7 +242,7 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
     return dataFilesToRead;
   }
 
-  protected void updateRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
+  protected void updateRecordsInMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
     HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
 
     Map<HoodieKey, HoodieRecord> recordsMap = new HashMap<>();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 5bf629e..21816a5 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -76,7 +76,7 @@ import java.util.stream.Stream;
  * <p>
  * Test data uses a toy Uber trips data model.
  */
-public class HoodieTestDataGenerator {
+public class HoodieTestDataGenerator implements AutoCloseable {
 
   // based on examination of a sample file, the schema produces the following per-record size
   public static final int BYTES_PER_RECORD = (int) (1.2 * 1024);
@@ -860,6 +860,7 @@ public class HoodieTestDataGenerator {
     public String partitionPath;
   }
 
+  @Override
   public void close() {
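+    // clears the cached record keys; with AutoCloseable, try-with-resources now invokes this
+    // automatically, as the new archive test above does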
     existingKeysBySchema.clear();
   }
