nsivabalan commented on code in PR #13653:
URL: https://github.com/apache/hudi/pull/13653#discussion_r2248186882


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java:
##########
@@ -49,13 +53,15 @@ public static void deleteSavepoint(HoodieTable table, 
String savepointTime) {
   public static void validateSavepointRestore(HoodieTable table, String 
savepointTime) {
     // Make sure the restore was successful
     table.getMetaClient().reloadActiveTimeline();
-    Option<HoodieInstant> lastInstant = table.getActiveTimeline()
+    Option<HoodieInstant> lastInstant = 
Option.fromJavaOptional(table.getActiveTimeline()

Review Comment:
   minor. can we add java docs to L52 to call out what we are looking to 
validate



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/common/InstantComparators.java:
##########
@@ -79,6 +79,15 @@ public CompletionTimeBasedComparator(Map<String, String> 
comparableActions) {
 
     @Override
     public int compare(HoodieInstant instant1, HoodieInstant instant2) {
+      if (instant1.getCompletionTime() == null && instant2.getCompletionTime() 
!= null) {

Review Comment:
   did we add UTs for this?



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java:
##########
@@ -326,21 +466,246 @@ void testCleaningCompletedRollback() throws Exception {
 
       // restore
       client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, 
"restore commit should not be null"));
-      assertRowNumberEqualsTo(20);
+      assertEquals(commitToRowCount, getRecordCountPerCommit());
+      validateFilesMetadata(hoodieWriteConfig);
+      assertEquals(tableVersion, 
HoodieTableMetaClient.reload(metaClient).getTableConfig().getTableVersion());
     }
   }
 
-  private void upsertBatch(SparkRDDWriteClient client, List<HoodieRecord> 
baseRecordsToUpdate) throws IOException {
+  @Test
+  void rollbackWithAsyncServices_compactionCompletesDuringCommit() {
+    HoodieWriteConfig hoodieWriteConfig = 
getHoodieWriteConfigWithCompactionAndConcurrencyControl(HoodieTableVersion.EIGHT);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      final int numRecords = 10;
+      writeInitialCommitsForAsyncServicesTests(numRecords);
+      String inflightCommit = client.startCommit();
+      List<HoodieRecord> records = 
dataGen.generateUniqueUpdates(inflightCommit, numRecords);
+      JavaRDD<WriteStatus> writeStatus = 
writeClient.upsert(jsc.parallelize(records, 1), inflightCommit);
+
+      // Run compaction while delta-commit is in-flight
+      Option<String> compactionInstant = 
client.scheduleCompaction(Option.empty());
+      HoodieWriteMetadata result = client.compact(compactionInstant.get());
+      client.commitCompaction(compactionInstant.get(), result, Option.empty());
+      // commit the inflight delta commit
+      client.commit(inflightCommit, writeStatus);
+
+      client.savepoint(inflightCommit, "user1", "Savepoint for commit that 
completed after compaction");
+
+      // write one more commit
+      String newCommitTime = client.startCommit();
+      records = dataGen.generateInserts(newCommitTime, numRecords);
+      writeStatus = client.insert(jsc.parallelize(records, 1), newCommitTime);
+      client.commit(newCommitTime, writeStatus);
+
+      // restore to savepoint
+      client.restoreToSavepoint(inflightCommit);
+      validateFilesMetadata(hoodieWriteConfig);
+      assertEquals(Collections.singletonMap(inflightCommit, numRecords), 
getRecordCountPerCommit());
+      // ensure the compaction instant is still present because it was 
completed before the target of the restore
+      
assertTrue(metaClient.reloadActiveTimeline().filterCompletedInstants().getInstantsAsStream()
+          .anyMatch(hoodieInstant -> 
hoodieInstant.requestedTime().equals(compactionInstant.get())));
+    }
+  }
+
+  @Test
+  void rollbackWithAsyncServices_commitCompletesDuringCompaction() {

Review Comment:
   same here. parametrize w/ both 6 and 8 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java:
##########
@@ -66,4 +72,28 @@ public static void validateSavepointPresence(HoodieTable 
table, String savepoint
       throw new HoodieRollbackException("No savepoint for instantTime " + 
savepointTime);
     }
   }
+
+  private static class SavepointInstantComparator implements 
Comparator<HoodieInstant> {
+    private final boolean tableVersion8OrLater;
+    private final InstantComparator instantComparator;
+
+    public SavepointInstantComparator(boolean tableVersion8OrLater, 
InstantComparator instantComparator) {
+      this.tableVersion8OrLater = tableVersion8OrLater;
+      this.instantComparator = instantComparator;
+    }
+
+    @Override
+    public int compare(HoodieInstant o1, HoodieInstant o2) {
+      if (tableVersion8OrLater) {
+        return instantComparator.completionTimeOrderedComparator().compare(o1, 
o2);
+      } else {
+        // Due to special handling of compaction instants, we need to use 
requested time based comparator for compaction instants but completion time 
based comparator for others
+        if (o1.getAction().equals(HoodieTimeline.COMMIT_ACTION) || 
o2.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {

Review Comment:
   yes. this is not intuitive to understand. can we add some simple 
illustration. 
   t1.dc,.... t2.dc, t11.compaction.req, t12.dc, t11.commit, t13.dc ... dc15. 
   If we are looking to restore to t12, and since we are ordering the commits to 
rollback based on completion time, we would roll back the t11 compaction as well 
(since t11 completed after t12 completed). But we can't do that, and hence the 
special handling. 
   
   
   But I am trying to understand why this special handling is not required for 
table version 8 and above. 
   How are we handling this case for a v8 table without the special handling? 
   From https://github.com/apache/hudi/pull/13653/files#r2246930654, I only see 
we account for completed instant time or requested instant time. 
   
   
   
   



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java:
##########
@@ -326,21 +466,246 @@ void testCleaningCompletedRollback() throws Exception {
 
       // restore
       client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, 
"restore commit should not be null"));
-      assertRowNumberEqualsTo(20);
+      assertEquals(commitToRowCount, getRecordCountPerCommit());
+      validateFilesMetadata(hoodieWriteConfig);
+      assertEquals(tableVersion, 
HoodieTableMetaClient.reload(metaClient).getTableConfig().getTableVersion());
     }
   }
 
-  private void upsertBatch(SparkRDDWriteClient client, List<HoodieRecord> 
baseRecordsToUpdate) throws IOException {
+  @Test
+  void rollbackWithAsyncServices_compactionCompletesDuringCommit() {

Review Comment:
   should we parametrize this for versions 6 and 8? 
   w/ all the special handling we are doing in restore, it's worth adding tests 
for both table versions. 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java:
##########
@@ -317,56 +293,45 @@ private static String formatDeletePath(String path) {
     return path.substring(path.indexOf(":") + 1);
   }
 
-  private List<StoragePathInfo> listBaseFilesToBeDeleted(String commit,
-                                                         String 
basefileExtension,
-                                                         String partitionPath,
-                                                         HoodieStorage 
storage) throws IOException {
-    LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
+  private List<StoragePath> listBaseFilesToBeDeleted(String commit,
+                                                     String basefileExtension,
+                                                     String partitionPath,
+                                                     HoodieStorage storage) 
throws IOException {
+    LOG.info("Collecting files to be cleaned/rolledback up for path {} and 
commit {}", partitionPath, commit);
     StoragePathFilter filter = (path) -> {
       if (path.toString().contains(basefileExtension)) {
         String fileCommitTime = FSUtils.getCommitTime(path.getName());
         return commit.equals(fileCommitTime);
       }
       return false;
     };
-    return 
storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(), 
partitionPath), filter);
+    return 
storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(), 
partitionPath), 
filter).stream().map(StoragePathInfo::getPath).collect(Collectors.toList());
   }
 
-  private List<StoragePathInfo> fetchFilesFromInstant(HoodieInstant 
instantToRollback,
-                                                      String partitionPath, 
String basePath,
-                                                      String 
baseFileExtension, HoodieStorage storage,
-                                                      
Option<HoodieCommitMetadata> commitMetadataOptional,
-                                                      Boolean 
isCommitMetadataCompleted,
-                                                      HoodieTableType 
tableType) throws IOException {
-    // go w/ commit metadata only for COW table. for MOR, we need to get 
associated log files when commit corresponding to base file is rolledback.
-    if (isCommitMetadataCompleted && tableType == 
HoodieTableType.COPY_ON_WRITE) {
-      return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, 
basePath, commitMetadataOptional.get(),
-          baseFileExtension, storage);
+  private List<StoragePath> fetchFilesFromInstant(HoodieInstant 
instantToRollback,
+                                                  String partitionPath, String 
basePath,
+                                                  String baseFileExtension, 
HoodieStorage storage,
+                                                  Option<HoodieCommitMetadata> 
commitMetadataOptional,
+                                                  boolean 
isCommitMetadataCompleted,
+                                                  HoodieTableType tableType,
+                                                  HoodieTableVersion 
tableVersion) throws IOException {
+    // for MOR tables with version < 8, listing is required to fetch the log 
files associated with base files added by this commit.
+    if (isCommitMetadataCompleted && (tableType == 
HoodieTableType.COPY_ON_WRITE || 
tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))) {

Review Comment:
   good catch. this can only work in table version 8 and above, if we are 
ordering the commits based on completion time. 
   guess that fix simplified this. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java:
##########
@@ -317,56 +293,45 @@ private static String formatDeletePath(String path) {
     return path.substring(path.indexOf(":") + 1);
   }
 
-  private List<StoragePathInfo> listBaseFilesToBeDeleted(String commit,
-                                                         String 
basefileExtension,
-                                                         String partitionPath,
-                                                         HoodieStorage 
storage) throws IOException {
-    LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
+  private List<StoragePath> listBaseFilesToBeDeleted(String commit,
+                                                     String basefileExtension,
+                                                     String partitionPath,
+                                                     HoodieStorage storage) 
throws IOException {
+    LOG.info("Collecting files to be cleaned/rolledback up for path {} and 
commit {}", partitionPath, commit);
     StoragePathFilter filter = (path) -> {
       if (path.toString().contains(basefileExtension)) {
         String fileCommitTime = FSUtils.getCommitTime(path.getName());
         return commit.equals(fileCommitTime);
       }
       return false;
     };
-    return 
storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(), 
partitionPath), filter);
+    return 
storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(), 
partitionPath), 
filter).stream().map(StoragePathInfo::getPath).collect(Collectors.toList());
   }
 
-  private List<StoragePathInfo> fetchFilesFromInstant(HoodieInstant 
instantToRollback,
-                                                      String partitionPath, 
String basePath,
-                                                      String 
baseFileExtension, HoodieStorage storage,
-                                                      
Option<HoodieCommitMetadata> commitMetadataOptional,
-                                                      Boolean 
isCommitMetadataCompleted,
-                                                      HoodieTableType 
tableType) throws IOException {
-    // go w/ commit metadata only for COW table. for MOR, we need to get 
associated log files when commit corresponding to base file is rolledback.
-    if (isCommitMetadataCompleted && tableType == 
HoodieTableType.COPY_ON_WRITE) {
-      return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, 
basePath, commitMetadataOptional.get(),
-          baseFileExtension, storage);
+  private List<StoragePath> fetchFilesFromInstant(HoodieInstant 
instantToRollback,
+                                                  String partitionPath, String 
basePath,
+                                                  String baseFileExtension, 
HoodieStorage storage,
+                                                  Option<HoodieCommitMetadata> 
commitMetadataOptional,
+                                                  boolean 
isCommitMetadataCompleted,
+                                                  HoodieTableType tableType,
+                                                  HoodieTableVersion 
tableVersion) throws IOException {
+    // for MOR tables with version < 8, listing is required to fetch the log 
files associated with base files added by this commit.
+    if (isCommitMetadataCompleted && (tableType == 
HoodieTableType.COPY_ON_WRITE || 
tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))) {
+      return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, 
basePath, commitMetadataOptional.get(), baseFileExtension);
     } else {
       return fetchFilesFromListFiles(instantToRollback, partitionPath, 
basePath, baseFileExtension, storage);
     }
   }
 
-  private List<StoragePathInfo> fetchFilesFromCommitMetadata(HoodieInstant 
instantToRollback,
-                                                             String 
partitionPath,
-                                                             String basePath,
-                                                             
HoodieCommitMetadata commitMetadata,
-                                                             String 
baseFileExtension,
-                                                             HoodieStorage 
storage) throws IOException {
+  private List<StoragePath> fetchFilesFromCommitMetadata(HoodieInstant 
instantToRollback,
+                                                         String partitionPath,
+                                                         String basePath,
+                                                         HoodieCommitMetadata 
commitMetadata,
+                                                         String 
baseFileExtension) {
     StoragePathFilter pathFilter = getPathFilter(baseFileExtension,
         instantToRollback.requestedTime());
-    List<StoragePath> filePaths = getFilesFromCommitMetadata(basePath, 
commitMetadata, partitionPath)
-        .filter(entry -> {
-          try {
-            return storage.exists(entry);
-          } catch (Exception e) {
-            LOG.error("Exists check failed for " + entry.toString(), e);
-          }
-          // if any Exception is thrown, do not ignore. let's try to add the 
file of interest to be deleted. we can't miss any files to be rolled back.
-          return true;
-        }).collect(Collectors.toList());
-
-    return storage.listDirectEntries(filePaths, pathFilter);
+    return getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath)

Review Comment:
   yeah. our rollback execution should be fine even if the file does not exist. 
we are good to remove this.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java:
##########
@@ -326,21 +466,246 @@ void testCleaningCompletedRollback() throws Exception {
 
       // restore
       client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, 
"restore commit should not be null"));
-      assertRowNumberEqualsTo(20);
+      assertEquals(commitToRowCount, getRecordCountPerCommit());
+      validateFilesMetadata(hoodieWriteConfig);
+      assertEquals(tableVersion, 
HoodieTableMetaClient.reload(metaClient).getTableConfig().getTableVersion());
     }
   }
 
-  private void upsertBatch(SparkRDDWriteClient client, List<HoodieRecord> 
baseRecordsToUpdate) throws IOException {
+  @Test
+  void rollbackWithAsyncServices_compactionCompletesDuringCommit() {
+    HoodieWriteConfig hoodieWriteConfig = 
getHoodieWriteConfigWithCompactionAndConcurrencyControl(HoodieTableVersion.EIGHT);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      final int numRecords = 10;
+      writeInitialCommitsForAsyncServicesTests(numRecords);
+      String inflightCommit = client.startCommit();
+      List<HoodieRecord> records = 
dataGen.generateUniqueUpdates(inflightCommit, numRecords);
+      JavaRDD<WriteStatus> writeStatus = 
writeClient.upsert(jsc.parallelize(records, 1), inflightCommit);
+
+      // Run compaction while delta-commit is in-flight
+      Option<String> compactionInstant = 
client.scheduleCompaction(Option.empty());
+      HoodieWriteMetadata result = client.compact(compactionInstant.get());
+      client.commitCompaction(compactionInstant.get(), result, Option.empty());
+      // commit the inflight delta commit
+      client.commit(inflightCommit, writeStatus);
+
+      client.savepoint(inflightCommit, "user1", "Savepoint for commit that 
completed after compaction");
+
+      // write one more commit
+      String newCommitTime = client.startCommit();
+      records = dataGen.generateInserts(newCommitTime, numRecords);
+      writeStatus = client.insert(jsc.parallelize(records, 1), newCommitTime);
+      client.commit(newCommitTime, writeStatus);
+
+      // restore to savepoint
+      client.restoreToSavepoint(inflightCommit);
+      validateFilesMetadata(hoodieWriteConfig);
+      assertEquals(Collections.singletonMap(inflightCommit, numRecords), 
getRecordCountPerCommit());
+      // ensure the compaction instant is still present because it was 
completed before the target of the restore
+      
assertTrue(metaClient.reloadActiveTimeline().filterCompletedInstants().getInstantsAsStream()
+          .anyMatch(hoodieInstant -> 
hoodieInstant.requestedTime().equals(compactionInstant.get())));
+    }
+  }
+
+  @Test
+  void rollbackWithAsyncServices_commitCompletesDuringCompaction() {
+    HoodieWriteConfig hoodieWriteConfig = 
getHoodieWriteConfigWithCompactionAndConcurrencyControl(HoodieTableVersion.EIGHT);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      final int numRecords = 10;
+      writeInitialCommitsForAsyncServicesTests(numRecords);
+      String inflightCommit = client.startCommit();
+      List<HoodieRecord> records = 
dataGen.generateUniqueUpdates(inflightCommit, numRecords);
+      JavaRDD<WriteStatus> writeStatus = 
client.upsert(jsc.parallelize(records, 1), inflightCommit);
+
+      Option<String> compactionInstant = 
client.scheduleCompaction(Option.empty());
+      HoodieWriteMetadata result = client.compact(compactionInstant.get());
+      // commit the inflight delta commit
+      client.commit(inflightCommit, writeStatus);
+      // commit the compaction instant after the delta commit
+      client.commitCompaction(compactionInstant.get(), result, Option.empty());
+
+      client.savepoint(inflightCommit, "user1", "Savepoint for commit that 
completed during compaction");
+
+      // write one more commit
+      String newCommitTime = writeClient.startCommit();
+      records = dataGen.generateInserts(newCommitTime, numRecords);
+      writeStatus = client.insert(jsc.parallelize(records, 1), newCommitTime);
+      client.commit(newCommitTime, writeStatus);
+
+      // restore to savepoint
+      client.restoreToSavepoint(inflightCommit);
+      validateFilesMetadata(hoodieWriteConfig);
+      assertEquals(Collections.singletonMap(inflightCommit, numRecords), 
getRecordCountPerCommit());
+      // ensure the compaction instant is not present because it was completed 
after the target of the restore
+      
assertFalse(metaClient.reloadActiveTimeline().filterCompletedInstants().getInstantsAsStream()
+          .anyMatch(hoodieInstant -> 
hoodieInstant.requestedTime().equals(compactionInstant.get())));
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "NINE"})
+  void 
rollbackWithAsyncServices_commitStartsAndFinishesDuringCompaction(HoodieTableVersion
 tableVersion) {
+    HoodieWriteConfig hoodieWriteConfig = 
getHoodieWriteConfigWithCompactionAndConcurrencyControl(tableVersion);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      final int numRecords = 10;
+      writeInitialCommitsForAsyncServicesTests(numRecords);
+      // Schedule a compaction
+      Option<String> compactionInstant = 
client.scheduleCompaction(Option.empty());
+      // Start a delta commit
+      String inflightCommit = client.startCommit();
+      List<HoodieRecord> records = 
dataGen.generateUniqueUpdates(inflightCommit, numRecords);
+      JavaRDD<WriteStatus> writeStatus = 
client.upsert(jsc.parallelize(records, 1), inflightCommit);
+
+      HoodieWriteMetadata result = client.compact(compactionInstant.get());
+      // commit the inflight delta commit
+      client.commit(inflightCommit, writeStatus);
+      // commit the compaction instant after the delta commit
+      client.commitCompaction(compactionInstant.get(), result, Option.empty());
+
+      client.savepoint(inflightCommit, "user1", "Savepoint for commit that 
completed during compaction");
+
+      // write one more commit
+      String newCommitTime = writeClient.startCommit();
+      records = dataGen.generateInserts(newCommitTime, numRecords);
+      writeStatus = client.insert(jsc.parallelize(records, 1), newCommitTime);
+      client.commit(newCommitTime, writeStatus);
+
+      // restore to savepoint
+      client.restoreToSavepoint(inflightCommit);
+      validateFilesMetadata(hoodieWriteConfig);
+      assertEquals(Collections.singletonMap(inflightCommit, numRecords), 
getRecordCountPerCommit());
+      boolean compactionIsPresent = 
metaClient.reloadActiveTimeline().filterCompletedInstants().getInstantsAsStream()
+          .anyMatch(hoodieInstant -> 
hoodieInstant.requestedTime().equals(compactionInstant.get()));
+      if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+        // ensure the compaction instant is not present because it was 
completed after the target of the restore
+        assertFalse(compactionIsPresent);
+      } else {
+        // for versions before 8, the compaction instant is not removed 
because the log files of the savepoint instant are associated with this 
compaction instant
+        assertTrue(compactionIsPresent);
+      }
+      assertEquals(tableVersion, 
HoodieTableMetaClient.reload(metaClient).getTableConfig().getTableVersion());
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "NINE"})
+  void testMissingFileDoesNotFallRestore(HoodieTableVersion tableVersion) 
throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = 
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig.newBuilder()
+        .withMaxNumDeltaCommitsBeforeCompaction(4)
+        .withInlineCompaction(false)
+        .compactionSmallFileSize(0).build(), tableVersion);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      // establish base files
+      String firstCommitTime = client.startCommit();
+      int numRecords = 10;
+      List<HoodieRecord> initialRecords = 
dataGen.generateInserts(firstCommitTime, numRecords);
+      client.commit(firstCommitTime, 
client.insert(jsc.parallelize(initialRecords, 1), firstCommitTime));
+      // add updates that go to log files
+      String secondCommitTime = client.startCommit();
+      List<HoodieRecord> updatedRecords = 
dataGen.generateUniqueUpdates(secondCommitTime, numRecords);
+      client.commit(secondCommitTime, 
client.upsert(jsc.parallelize(updatedRecords, 1), secondCommitTime));
+      client.savepoint(secondCommitTime, "user1", "Savepoint for commit that 
completed during compaction");
+
+      // add a third delta commit with log and new base files
+      String thirdCommitTime = client.startCommit();
+      List<HoodieRecord> upsertRecords = 
Stream.concat(dataGen.generateUniqueUpdates(thirdCommitTime, 
numRecords).stream(),
+            dataGen.generateInserts(thirdCommitTime, 
numRecords).stream()).collect(Collectors.toList());
+      List<WriteStatus> writeStatuses = 
client.upsert(jsc.parallelize(upsertRecords, 1), thirdCommitTime).collect();
+      client.commit(thirdCommitTime, jsc.parallelize(writeStatuses, 1));
+
+      // delete one base file and one log file to validate both cases are 
handled gracefully
+      boolean deletedLogFile = false;
+      boolean deletedBaseFile = false;
+      for (WriteStatus writeStatus : writeStatuses) {
+        StoragePath path = FSUtils.constructAbsolutePath(basePath, 
writeStatus.getStat().getPath());
+        if (deletedLogFile && deletedBaseFile) {
+          break;
+        }
+        if (FSUtils.isLogFile(path)) {
+          deletedLogFile = true;
+          storage.deleteFile(path);
+        } else {
+          deletedBaseFile = true;
+          storage.deleteFile(path);
+        }
+      }
+      client.restoreToSavepoint(secondCommitTime);
+      validateFilesMetadata(hoodieWriteConfig);
+      assertEquals(Collections.singletonMap(secondCommitTime, numRecords), 
getRecordCountPerCommit());
+    }
+  }
+
+  private Map<String, Integer> writeInitialCommitsForAsyncServicesTests(int 
numRecords) {
+    Map<String, Integer> commitToRowCount = new HashMap<>();
+    for (int i = 0; i < 3; i++) {
+      String newCommitTime = writeClient.startCommit();
+      List<HoodieRecord> records = i == 0 ? 
dataGen.generateInserts(newCommitTime, numRecords) : 
dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+      JavaRDD<WriteStatus> writeStatus = i == 0 ? 
writeClient.insert(jsc.parallelize(records, 1), newCommitTime) : 
writeClient.upsert(jsc.parallelize(records, 1), newCommitTime);
+      writeClient.commit(newCommitTime, writeStatus);
+      if (i == 2) {
+        commitToRowCount.put(newCommitTime, numRecords);
+      }
+    }
+    return commitToRowCount;
+  }
+
+  private HoodieWriteConfig 
getHoodieWriteConfigWithCompactionAndConcurrencyControl(HoodieTableVersion 
tableVersion) {
+    HoodieWriteConfig config = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(InProcessLockProvider.class)
+            .build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(2)
+            .withInlineCompaction(false)
+            .withScheduleInlineCompaction(false)
+            .build())
+        .withRollbackUsingMarkers(true)
+        .withAutoUpgradeVersion(false)
+        .withWriteTableVersion(tableVersion.versionCode())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            
.withStreamingWriteEnabled(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))
+            .build())
+        
.withProps(Collections.singletonMap(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(),
 "0"))
+        .build();
+    try {
+      initMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+      return config;
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to initialize HoodieTableMetaClient", 
e);
+    }
+  }
+
+  private void validateFilesMetadata(HoodieWriteConfig writeConfig) {

Review Comment:
   this validation def gives us good confidence now. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to