This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 32a44bb [HUDI-2970] Add test for archiving replace commit (#4345)
32a44bb is described below
commit 32a44bbe062c997b5a41266290fbe34d6323bfa6
Author: Raymond Xu <[email protected]>
AuthorDate: Mon Dec 20 21:01:59 2021 -0800
[HUDI-2970] Add test for archiving replace commit (#4345)
---
...dieSparkCopyOnWriteTableArchiveWithReplace.java | 103 +++++++++++++++++++++
.../TestHoodieSparkMergeOnReadTableClustering.java | 12 +--
...HoodieSparkMergeOnReadTableIncrementalRead.java | 6 +-
...dieSparkMergeOnReadTableInsertUpdateDelete.java | 4 +-
.../SparkClientFunctionalTestHarness.java | 4 +-
.../common/testutils/HoodieTestDataGenerator.java | 3 +-
6 files changed, 118 insertions(+), 14 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
new file mode 100644
index 0000000..1c66023
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+import static
org.apache.hudi.testutils.HoodieClientTestUtils.countRecordsOptionallySince;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("functional")
+public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends
SparkClientFunctionalTestHarness {
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws
IOException {
+ HoodieTableMetaClient metaClient =
getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+ HoodieWriteConfig writeConfig = getConfigBuilder(true)
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2,
3).retainCommits(1).build())
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
+ .build();
+ try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
+ HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
+
+ // 1st write batch; 3 commits for 3 partitions
+ String instantTime1 = HoodieActiveTimeline.createNewInstantTime(1000);
+ client.startCommitWithTime(instantTime1);
+
client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime1,
10, DEFAULT_FIRST_PARTITION_PATH), 1), instantTime1);
+ String instantTime2 = HoodieActiveTimeline.createNewInstantTime(2000);
+ client.startCommitWithTime(instantTime2);
+
client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime2,
10, DEFAULT_SECOND_PARTITION_PATH), 1), instantTime2);
+ String instantTime3 = HoodieActiveTimeline.createNewInstantTime(3000);
+ client.startCommitWithTime(instantTime3);
+
client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime3,
1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime3);
+
+ final HoodieTimeline timeline1 =
metaClient.getCommitsTimeline().filterCompletedInstants();
+ assertEquals(21, countRecordsOptionallySince(jsc(), basePath(),
sqlContext(), timeline1, Option.empty()));
+
+ // delete the 1st and the 2nd partition; 1 replace commit
+ final String instantTime4 =
HoodieActiveTimeline.createNewInstantTime(4000);
+ client.startCommitWithTime(instantTime4,
HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
+ client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH,
DEFAULT_SECOND_PARTITION_PATH), instantTime4);
+
+ // 2nd write batch; 3 commits for the 3rd partition; the 3rd commit to
trigger archiving the replace commit
+ for (int i = 5; i < 8; i++) {
+ String instantTime = HoodieActiveTimeline.createNewInstantTime(i *
1000);
+ client.startCommitWithTime(instantTime);
+
client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime,
1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime);
+ }
+
+ // verify archived timeline
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ final HoodieTimeline archivedTimeline = metaClient.getArchivedTimeline();
+ assertTrue(archivedTimeline.containsInstant(instantTime1));
+ assertTrue(archivedTimeline.containsInstant(instantTime2));
+ assertTrue(archivedTimeline.containsInstant(instantTime3));
+ assertTrue(archivedTimeline.containsInstant(instantTime4), "should
contain the replace commit.");
+
+ // verify records
+ final HoodieTimeline timeline2 =
metaClient.getCommitTimeline().filterCompletedInstants();
+ assertEquals(4, countRecordsOptionallySince(jsc(), basePath(),
sqlContext(), timeline2, Option.empty()),
+ "should only have the 4 records from the 3rd partition.");
+ }
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java
index f0ece84..a0ec0de 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java
@@ -113,7 +113,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends
SparkClientFunctionalTes
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
- Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient,
records.subList(0, 200), client, cfg, newCommitTime);
+ Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient,
records.subList(0, 200), client, cfg, newCommitTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files
we wrote in the delta commit");
/*
@@ -122,7 +122,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends
SparkClientFunctionalTes
// we already set small file size to small number to force inserts to go
into new file.
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
- dataFiles = insertRecords(metaClient, records.subList(200, 400), client,
cfg, newCommitTime);
+ dataFiles = insertRecordsToMORTable(metaClient, records.subList(200,
400), client, cfg, newCommitTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files
we wrote in the delta commit");
if (doUpdates) {
@@ -132,7 +132,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends
SparkClientFunctionalTes
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
- updateRecords(metaClient, records, client, cfg, newCommitTime);
+ updateRecordsInMORTable(metaClient, records, client, cfg,
newCommitTime);
}
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(),
metaClient);
@@ -190,18 +190,18 @@ class TestHoodieSparkMergeOnReadTableClustering extends
SparkClientFunctionalTes
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
- Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient,
records.subList(0, 200), client, cfg, newCommitTime);
+ Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient,
records.subList(0, 200), client, cfg, newCommitTime);
assertTrue(!dataFiles.findAny().isPresent(), "should not have any base
files");
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
- dataFiles = insertRecords(metaClient, records.subList(200, 400), client,
cfg, newCommitTime);
+ dataFiles = insertRecordsToMORTable(metaClient, records.subList(200,
400), client, cfg, newCommitTime);
assertTrue(!dataFiles.findAny().isPresent(), "should not have any base
files");
// run updates
if (doUpdates) {
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
- updateRecords(metaClient, records, client, cfg, newCommitTime);
+ updateRecordsInMORTable(metaClient, records, client, cfg,
newCommitTime);
}
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(),
metaClient);
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
index db55d36..c80374b 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
@@ -96,7 +96,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead
extends SparkClientF
client.startCommitWithTime(commitTime1);
List<HoodieRecord> records001 = dataGen.generateInserts(commitTime1,
200);
- Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records001,
client, cfg, commitTime1);
+ Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient,
records001, client, cfg, commitTime1);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files
we wrote in the delta commit");
// verify only one base file shows up with commit time 001
@@ -118,7 +118,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead
extends SparkClientF
String updateTime = "004";
client.startCommitWithTime(updateTime);
List<HoodieRecord> records004 = dataGen.generateUpdates(updateTime, 100);
- updateRecords(metaClient, records004, client, cfg, updateTime);
+ updateRecordsInMORTable(metaClient, records004, client, cfg, updateTime);
// verify RO incremental reads - only one base file shows up because
updates to into log files
incrementalROFiles = getROIncrementalFiles(partitionPath, false);
@@ -145,7 +145,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead
extends SparkClientF
String insertsTime = "006";
List<HoodieRecord> records006 = dataGen.generateInserts(insertsTime,
200);
client.startCommitWithTime(insertsTime);
- dataFiles = insertRecords(metaClient, records006, client, cfg,
insertsTime);
+ dataFiles = insertRecordsToMORTable(metaClient, records006, client, cfg,
insertsTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files
we wrote in the delta commit");
// verify new write shows up in snapshot mode even though there is
pending compaction
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index 254d757..62ce007 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -95,7 +95,7 @@ public class
TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
- Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records,
client, cfg, newCommitTime);
+ Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient,
records, client, cfg, newCommitTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files
we wrote in the delta commit");
/*
@@ -104,7 +104,7 @@ public class
TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
newCommitTime = "004";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
- updateRecords(metaClient, records, client, cfg, newCommitTime);
+ updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);
String compactionCommitTime =
client.scheduleCompaction(Option.empty()).get().toString();
client.compact(compactionCommitTime);
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index 88c4c13..1ecdd33 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -213,7 +213,7 @@ public class SparkClientFunctionalTestHarness implements
SparkProvider, HoodieMe
index.updateLocation(HoodieJavaRDD.of(writeStatus), context, table));
}
- protected Stream<HoodieBaseFile> insertRecords(HoodieTableMetaClient
metaClient, List<HoodieRecord> records,
+ protected Stream<HoodieBaseFile>
insertRecordsToMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord>
records,
SparkRDDWriteClient client,
HoodieWriteConfig cfg, String commitTime) throws IOException {
HoodieTableMetaClient reloadedMetaClient =
HoodieTableMetaClient.reload(metaClient);
@@ -242,7 +242,7 @@ public class SparkClientFunctionalTestHarness implements
SparkProvider, HoodieMe
return dataFilesToRead;
}
- protected void updateRecords(HoodieTableMetaClient metaClient,
List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg,
String commitTime) throws IOException {
+ protected void updateRecordsInMORTable(HoodieTableMetaClient metaClient,
List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg,
String commitTime) throws IOException {
HoodieTableMetaClient reloadedMetaClient =
HoodieTableMetaClient.reload(metaClient);
Map<HoodieKey, HoodieRecord> recordsMap = new HashMap<>();
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 5bf629e..21816a5 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -76,7 +76,7 @@ import java.util.stream.Stream;
* <p>
* Test data uses a toy Uber trips, data model.
*/
-public class HoodieTestDataGenerator {
+public class HoodieTestDataGenerator implements AutoCloseable {
// based on examination of sample file, the schema produces the following
per record size
public static final int BYTES_PER_RECORD = (int) (1.2 * 1024);
@@ -860,6 +860,7 @@ public class HoodieTestDataGenerator {
public String partitionPath;
}
+ @Override
public void close() {
existingKeysBySchema.clear();
}