nsivabalan commented on code in PR #7605:
URL: https://github.com/apache/hudi/pull/7605#discussion_r1063633199


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java:
##########
@@ -740,4 +741,117 @@ public void 
testFallbackToListingBasedRollbackForCompletedInstant() throws Excep
       assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2));
     }
   }
+
+  @Test
+  public void testSavepointAndRollbackFailThenSucceed() throws Exception {
+    HoodieWriteConfig cfg = 
getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder()
+        
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, 
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
+
+      /**
+       * Write 1 (only inserts)
+       */
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+      List<WriteStatus> statuses = client.upsert(writeRecords, 
newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+
+      /**
+       * Write 2 (updates)
+       */
+      newCommitTime = "002";
+      client.startCommitWithTime(newCommitTime);
+
+      records = dataGen.generateUpdates(newCommitTime, records);
+      statuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
+      // Verify there are no errors
+      assertNoWriteErrors(statuses);
+
+      client.savepoint("hoodie-unit-test", "test");
+
+      /**
+       * Write 3 (updates)
+       */
+      newCommitTime = "003";
+      client.startCommitWithTime(newCommitTime);
+
+      records = dataGen.generateUpdates(newCommitTime, records);
+      statuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
+      // Verify there are no errors
+      assertNoWriteErrors(statuses);
+      HoodieWriteConfig config = getConfig();
+      List<String> partitionPaths =
+          FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), 
cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context, 
metaClient);
+      final BaseFileOnlyView view1 = table.getBaseFileOnlyView();
+
+      List<HoodieBaseFile> dataFiles = partitionPaths.stream().flatMap(s -> {
+        return view1.getAllBaseFiles(s).filter(f -> 
f.getCommitTime().equals("003"));
+      }).collect(Collectors.toList());
+      assertEquals(3, dataFiles.size(), "The data files for commit 003 should 
be present");
+
+      dataFiles = partitionPaths.stream().flatMap(s -> {
+        return view1.getAllBaseFiles(s).filter(f -> 
f.getCommitTime().equals("002"));
+      }).collect(Collectors.toList());
+      assertEquals(3, dataFiles.size(), "The data files for commit 002 should 
be present");
+
+      /**
+       * Write 4 (updates)
+       */
+      newCommitTime = "004";
+      client.startCommitWithTime(newCommitTime);
+
+      records = dataGen.generateUpdates(newCommitTime, records);
+      statuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
+      // Verify there are no errors
+      assertNoWriteErrors(statuses);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      table = HoodieSparkTable.create(getConfig(), context, metaClient);
+      final BaseFileOnlyView view2 = table.getBaseFileOnlyView();
+
+      dataFiles = partitionPaths.stream().flatMap(s -> 
view2.getAllBaseFiles(s).filter(f -> 
f.getCommitTime().equals("004"))).collect(Collectors.toList());
+      assertEquals(3, dataFiles.size(), "The data files for commit 004 should 
be present");
+
+      // rolling back to a non existent savepoint must not succeed
+      assertThrows(HoodieRollbackException.class, () -> {
+        client.restoreToSavepoint("001");
+      }, "Rolling back to non-existent savepoint should not be allowed");
+
+      // rollback to savepoint 002
+      HoodieInstant savepoint = 
table.getCompletedSavepointTimeline().getInstantsAsStream().findFirst().get();
+      client.restoreToSavepoint(savepoint.getTimestamp());
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      table = HoodieSparkTable.create(getConfig(), context, metaClient);
+      final BaseFileOnlyView view3 = table.getBaseFileOnlyView();
+      dataFiles = partitionPaths.stream().flatMap(s -> 
view3.getAllBaseFiles(s).filter(f -> 
f.getCommitTime().equals("002"))).collect(Collectors.toList());
+      assertEquals(3, dataFiles.size(), "The data files for commit 002 be 
available");
+
+      dataFiles = partitionPaths.stream().flatMap(s -> 
view3.getAllBaseFiles(s).filter(f -> 
f.getCommitTime().equals("003"))).collect(Collectors.toList());
+      assertEquals(0, dataFiles.size(), "The data files for commit 003 should 
be rolled back");
+
+      dataFiles = partitionPaths.stream().flatMap(s -> 
view3.getAllBaseFiles(s).filter(f -> 
f.getCommitTime().equals("004"))).collect(Collectors.toList());
+      assertEquals(0, dataFiles.size(), "The data files for commit 004 should 
be rolled back");
+
+      //Delete restore commit from timeline
+      HoodieInstant inst =  
table.getActiveTimeline().getRestoreTimeline().getInstants().get(0);
+      File instFile = new 
File(table.getMetaClient().getBasePathV2().toString() + "/.hoodie/" +  
inst.getFileName());
+      instFile.delete();
+

Review Comment:
   Reload the `HoodieTableMetaClient` here, and then re-instantiate a new `SparkRDDWriteClient` before continuing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to