nsivabalan commented on code in PR #7605:
URL: https://github.com/apache/hudi/pull/7605#discussion_r1063633199
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java:
##########
@@ -740,4 +741,117 @@ public void
testFallbackToListingBasedRollbackForCompletedInstant() throws Excep
assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2));
}
}
+
+ @Test
+ public void testSavepointAndRollbackFailThenSucceed() throws Exception {
+ HoodieWriteConfig cfg =
getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder()
+
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build();
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs,
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
+
+ /**
+ * Write 1 (only inserts)
+ */
+ String newCommitTime = "001";
+ client.startCommitWithTime(newCommitTime);
+
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+ List<WriteStatus> statuses = client.upsert(writeRecords,
newCommitTime).collect();
+ assertNoWriteErrors(statuses);
+
+ /**
+ * Write 2 (updates)
+ */
+ newCommitTime = "002";
+ client.startCommitWithTime(newCommitTime);
+
+ records = dataGen.generateUpdates(newCommitTime, records);
+ statuses = client.upsert(jsc.parallelize(records, 1),
newCommitTime).collect();
+ // Verify there are no errors
+ assertNoWriteErrors(statuses);
+
+ client.savepoint("hoodie-unit-test", "test");
+
+ /**
+ * Write 3 (updates)
+ */
+ newCommitTime = "003";
+ client.startCommitWithTime(newCommitTime);
+
+ records = dataGen.generateUpdates(newCommitTime, records);
+ statuses = client.upsert(jsc.parallelize(records, 1),
newCommitTime).collect();
+ // Verify there are no errors
+ assertNoWriteErrors(statuses);
+ HoodieWriteConfig config = getConfig();
+ List<String> partitionPaths =
+ FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(),
cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context,
metaClient);
+ final BaseFileOnlyView view1 = table.getBaseFileOnlyView();
+
+ List<HoodieBaseFile> dataFiles = partitionPaths.stream().flatMap(s -> {
+ return view1.getAllBaseFiles(s).filter(f ->
f.getCommitTime().equals("003"));
+ }).collect(Collectors.toList());
+ assertEquals(3, dataFiles.size(), "The data files for commit 003 should
be present");
+
+ dataFiles = partitionPaths.stream().flatMap(s -> {
+ return view1.getAllBaseFiles(s).filter(f ->
f.getCommitTime().equals("002"));
+ }).collect(Collectors.toList());
+ assertEquals(3, dataFiles.size(), "The data files for commit 002 should
be present");
+
+ /**
+ * Write 4 (updates)
+ */
+ newCommitTime = "004";
+ client.startCommitWithTime(newCommitTime);
+
+ records = dataGen.generateUpdates(newCommitTime, records);
+ statuses = client.upsert(jsc.parallelize(records, 1),
newCommitTime).collect();
+ // Verify there are no errors
+ assertNoWriteErrors(statuses);
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieSparkTable.create(getConfig(), context, metaClient);
+ final BaseFileOnlyView view2 = table.getBaseFileOnlyView();
+
+ dataFiles = partitionPaths.stream().flatMap(s ->
view2.getAllBaseFiles(s).filter(f ->
f.getCommitTime().equals("004"))).collect(Collectors.toList());
+ assertEquals(3, dataFiles.size(), "The data files for commit 004 should
be present");
+
+ // rolling back to a non existent savepoint must not succeed
+ assertThrows(HoodieRollbackException.class, () -> {
+ client.restoreToSavepoint("001");
+ }, "Rolling back to non-existent savepoint should not be allowed");
+
+ // rollback to savepoint 002
+ HoodieInstant savepoint =
table.getCompletedSavepointTimeline().getInstantsAsStream().findFirst().get();
+ client.restoreToSavepoint(savepoint.getTimestamp());
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieSparkTable.create(getConfig(), context, metaClient);
+ final BaseFileOnlyView view3 = table.getBaseFileOnlyView();
+ dataFiles = partitionPaths.stream().flatMap(s ->
view3.getAllBaseFiles(s).filter(f ->
f.getCommitTime().equals("002"))).collect(Collectors.toList());
+ assertEquals(3, dataFiles.size(), "The data files for commit 002 be
available");
+
+ dataFiles = partitionPaths.stream().flatMap(s ->
view3.getAllBaseFiles(s).filter(f ->
f.getCommitTime().equals("003"))).collect(Collectors.toList());
+ assertEquals(0, dataFiles.size(), "The data files for commit 003 should
be rolled back");
+
+ dataFiles = partitionPaths.stream().flatMap(s ->
view3.getAllBaseFiles(s).filter(f ->
f.getCommitTime().equals("004"))).collect(Collectors.toList());
+ assertEquals(0, dataFiles.size(), "The data files for commit 004 should
be rolled back");
+
+ //Delete restore commit from timeline
+ HoodieInstant inst =
table.getActiveTimeline().getRestoreTimeline().getInstants().get(0);
+ File instFile = new
File(table.getMetaClient().getBasePathV2().toString() + "/.hoodie/" +
inst.getFileName());
+ instFile.delete();
+
Review Comment:
Reload the meta client, and then instantiate a new SparkRDDWriteClient.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]