This is an automated email from the ASF dual-hosted git repository.
satish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9cb6cb8 [HUDI-1266] Add unit test for validating replacecommit rollback (#2418)
9cb6cb8 is described below
commit 9cb6cb818968d3ef10b3264714e30b8df957ec9d
Author: satishkotha <[email protected]>
AuthorDate: Fri Jan 29 10:28:08 2021 -0800
[HUDI-1266] Add unit test for validating replacecommit rollback (#2418)
---
.../rollback/HoodieClientRollbackTestBase.java | 62 +++++++++++++++++++++-
.../TestCopyOnWriteRollbackActionExecutor.java | 22 +++++++-
2 files changed, 81 insertions(+), 3 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
index eb0e871..3b0829b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
@@ -18,23 +18,26 @@
package org.apache.hudi.table.action.rollback;
+import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestBase;
-
import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
@@ -96,4 +99,61 @@ public class HoodieClientRollbackTestBase extends HoodieClientTestBase {
assertEquals(1, secondPartitionCommit2FileSlices.size());
}
}
+
+ protected void insertOverwriteCommitDataWithTwoPartitions(List<FileSlice>
firstPartitionCommit2FileSlices,
+ List<FileSlice>
secondPartitionCommit2FileSlices,
+ HoodieWriteConfig
cfg,
+ boolean
commitSecondInsertOverwrite) throws IOException {
+ // Writes commit "001" (upsert) and commit "002" (insert_overwrite) over exactly two partitions, then appends the slices of the file groups new in "002" to the two output lists; each list must end with exactly one slice, so callers should pass empty lists.
+ dataGen = new HoodieTestDataGenerator(new
String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
+ HoodieTestDataGenerator.writePartitionMetadata(fs, new
String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH},
basePath);
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+ /**
+ * Write 1 (upsert): commit "001" over records from generateInsertsContainsAllPartitions, committed explicitly via client.commit.
+ */
+ String newCommitTime = "001";
+ List<HoodieRecord> records =
dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ client.startCommitWithTime(newCommitTime);
+ JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
+ Assertions.assertNoWriteErrors(statuses.collect());
+ client.commit(newCommitTime, statuses);
+
+ // Record the fileIds written by commit "001"; exactly one file group per partition is asserted.
+ HoodieTable table = this.getHoodieTable(metaClient, cfg);
+ SyncableFileSystemView fsView =
getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
+ List<HoodieFileGroup> firstPartitionCommit1FileGroups =
fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
+ assertEquals(1, firstPartitionCommit1FileGroups.size());
+ Set<String> partition1Commit1FileIds =
firstPartitionCommit1FileGroups.get(0).getAllFileSlices().map(FileSlice::getFileId).collect(Collectors.toSet());
+ List<HoodieFileGroup> secondPartitionCommit1FileGroups =
fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
+ assertEquals(1, secondPartitionCommit1FileGroups.size());
+ Set<String> partition2Commit1FileIds =
secondPartitionCommit1FileGroups.get(0).getAllFileSlices().map(FileSlice::getFileId).collect(Collectors.toSet());
+
+ /**
+ * Write 2 (insert_overwrite): commit "002" started with REPLACE_COMMIT_ACTION; it is completed only when commitSecondInsertOverwrite is true, otherwise it is left uncommitted.
+ */
+ String commitActionType = HoodieTimeline.REPLACE_COMMIT_ACTION;
+ newCommitTime = "002";
+ records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
+ writeRecords = jsc.parallelize(records, 1);
+ client.startCommitWithTime(newCommitTime, commitActionType);
+ HoodieWriteResult result = client.insertOverwrite(writeRecords,
newCommitTime);
+ statuses = result.getWriteStatuses();
+ Assertions.assertNoWriteErrors(statuses.collect());
+ if (commitSecondInsertOverwrite) {
+ client.commit(newCommitTime, statuses, Option.empty(), commitActionType,
result.getPartitionToReplaceFileIds());
+ }
+ metaClient.reloadActiveTimeline();
+ // Collect file groups whose fileId was absent after commit "001" (i.e. written by insert_overwrite). NOTE(review): get(0) below throws IndexOutOfBoundsException if none are found; asserting the group count first (as done for commit "001") would give a clearer failure.
+ fsView = getFileSystemViewWithUnCommittedSlices(metaClient);
+ List<HoodieFileGroup> firstPartitionCommit2FileGroups =
fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH)
+ .filter(fg ->
!partition1Commit1FileIds.contains(fg.getFileGroupId().getFileId())).collect(Collectors.toList());
+
firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
+ List<HoodieFileGroup> secondPartitionCommit2FileGroups =
fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH)
+ .filter(fg ->
!partition2Commit1FileIds.contains(fg.getFileGroupId().getFileId())).collect(Collectors.toList());
+
secondPartitionCommit2FileSlices.addAll(secondPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
+
+ assertEquals(1, firstPartitionCommit2FileSlices.size());
+ assertEquals(1, secondPartitionCommit2FileSlices.size());
+ }
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index e14dbf9..030cc3e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
-
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -125,6 +124,19 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
assertFalse(testTable.baseFileExists(p2, "002", "id22"));
}
+ // Verifies that a replacecommit (insert_overwrite) can be rolled back, both with marker-based rollback and without it.
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testCopyOnWriteRollbackWithReplaceCommits(boolean
isUsingMarkers) throws IOException {
+ // 1. Prepare data: the insert_overwrite commit "002" is completed only when markers are not used (!isUsingMarkers); the helper asserts one new file slice per partition.
+ List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
+ List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
+ HoodieWriteConfig cfg =
getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
+
this.insertOverwriteCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices,
secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
+ HoodieTable table = this.getHoodieTable(metaClient, cfg);
+ performRollbackAndValidate(isUsingMarkers, cfg, table,
firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCopyOnWriteRollbackActionExecutor(boolean isUsingMarkers)
throws IOException {
@@ -133,8 +145,14 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
HoodieWriteConfig cfg =
getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
this.twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices,
secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
+ metaClient.reloadActiveTimeline();
HoodieTable table = this.getHoodieTable(metaClient, cfg);
-
+ performRollbackAndValidate(isUsingMarkers, cfg, table,
firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
+ }
+
+ private void performRollbackAndValidate(boolean isUsingMarkers,
HoodieWriteConfig cfg, HoodieTable table,
+ List<FileSlice>
firstPartitionCommit2FileSlices,
+ List<FileSlice>
secondPartitionCommit2FileSlices) throws IOException {
//2. rollback
HoodieInstant commitInstant;
if (isUsingMarkers) {