This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 31dbcfe0896d [HUDI-9667] Incorporate completion time into restore 
logic (#13653)
31dbcfe0896d is described below

commit 31dbcfe0896d91d5e753873bb6bbb3f4169fc72f
Author: Tim Brown <[email protected]>
AuthorDate: Mon Aug 4 05:31:17 2025 -0400

    [HUDI-9667] Incorporate completion time into restore logic (#13653)
    
    * fix restore sequence to roll back instants in reverse completion-time order, while still using requested-time comparison for compaction
    * add a custom comparator for the restore instant sort
    
    ---------
    
    Co-authored-by: danny0405 <[email protected]>
---
 .../rollback/ListingBasedRollbackStrategy.java     | 109 ++---
 .../rollback/RestoreInstantComparatorFactory.java  |  61 +++
 .../action/rollback/RestorePlanActionExecutor.java |  35 +-
 .../hudi/table/action/rollback/RollbackHelper.java |   5 +-
 .../table/action/savepoint/SavepointHelpers.java   |  16 +-
 .../hudi/testutils/HoodieClientTestBase.java       |   9 +-
 .../common/table/timeline/BaseHoodieTimeline.java  |  10 +-
 .../table/timeline/CompletionTimeQueryView.java    |   4 +-
 .../hudi/common/table/timeline/HoodieTimeline.java |   8 +-
 .../common/table/timeline/InstantComparator.java   |   2 +-
 .../versioning/common/InstantComparators.java      |   9 +
 .../table/timeline/TestInstantComparators.java     |  56 +++
 .../hudi/common/testutils/HoodieTestUtils.java     |   2 +-
 .../hudi/functional/TestHoodieFileSystemViews.java |  12 +-
 .../TestSavepointRestoreMergeOnRead.java           | 481 +++++++++++++++++++--
 .../hudi/functional/TestRecordLevelIndex.scala     |  40 +-
 16 files changed, 695 insertions(+), 164 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index 28905f75f75d..9a5f2e3fc64d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
@@ -122,11 +121,10 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
       return context.flatMap(partitionPaths, partitionPath -> {
         List<HoodieRollbackRequest> hoodieRollbackRequests = new 
ArrayList<>(partitionPaths.size());
 
-        Supplier<List<StoragePathInfo>> filesToDelete = () -> {
+        Supplier<List<StoragePath>> filesToDelete = () -> {
           try {
             return fetchFilesFromInstant(instantToRollback, partitionPath, 
metaClient.getBasePath().toString(), baseFileExtension,
-                metaClient.getStorage(),
-                commitMetadataOptional, isCommitMetadataCompleted, tableType);
+                metaClient.getStorage(), commitMetadataOptional, 
isCommitMetadataCompleted, tableType, 
metaClient.getTableConfig().getTableVersion());
           } catch (IOException e) {
             throw new HoodieIOException("Fetching files to delete error", e);
           }
@@ -164,11 +162,10 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
               } else {
                 // if this is part of a restore operation, we should 
rollback/delete entire file slice.
                 // For table version 6, the files can be directly fetched from 
the instant to rollback
-                // For table version 8, the files are computed based on 
completion time. All files completed after
-                // the requested time of instant to rollback are included
-                
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath, 
isTableVersionLessThanEight ? filesToDelete.get() :
-                    listAllFilesSinceCommit(instantToRollback.requestedTime(), 
baseFileExtension, partitionPath,
-                        metaClient)));
+                // For table version 8, the log files are not directly 
associated with the base file.
+                // The rollback will iterate in reverse order based on 
completion time so the log files completed
+                // after the compaction will already be queued for removal and 
therefore, only the files from the compaction commit must be deleted.
+                
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath, 
filesToDelete.get()));
               }
               break;
             case HoodieTimeline.DELTA_COMMIT_ACTION:
@@ -280,32 +277,11 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
     return hoodieRollbackRequests;
   }
 
-  private List<StoragePathInfo> listAllFilesSinceCommit(String commit,
-                                                        String 
baseFileExtension,
-                                                        String partitionPath,
-                                                        HoodieTableMetaClient 
metaClient) throws IOException {
-    LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
-    CompletionTimeQueryView completionTimeQueryView = 
metaClient.getTableFormat().getTimelineFactory().createCompletionTimeQueryView(metaClient);
-    StoragePathFilter filter = (path) -> {
-      if (path.toString().contains(baseFileExtension)) {
-        String fileCommitTime = FSUtils.getCommitTime(path.getName());
-        return compareTimestamps(commit, LESSER_THAN_OR_EQUALS,
-            fileCommitTime);
-      } else if (FSUtils.isLogFile(path)) {
-        String fileCommitTime = FSUtils.getDeltaCommitTimeFromLogPath(path);
-        return completionTimeQueryView.isSlicedAfterOrOn(commit, 
fileCommitTime);
-      }
-      return false;
-    };
-    return metaClient.getStorage()
-        .listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(), 
partitionPath), filter);
-  }
-
   @NotNull
-  private List<HoodieRollbackRequest> getHoodieRollbackRequests(String 
partitionPath, List<StoragePathInfo> filesToDeletedStatus) {
+  private List<HoodieRollbackRequest> getHoodieRollbackRequests(String 
partitionPath, List<StoragePath> filesToDeletedStatus) {
     return filesToDeletedStatus.stream()
         .map(pathInfo -> {
-          String dataFileToBeDeleted = pathInfo.getPath().toString();
+          String dataFileToBeDeleted = pathInfo.toString();
           return formatDeletePath(dataFileToBeDeleted);
         })
         .map(s -> new HoodieRollbackRequest(partitionPath, EMPTY_STRING, 
EMPTY_STRING, Collections.singletonList(s), Collections.emptyMap()))
@@ -317,11 +293,11 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
     return path.substring(path.indexOf(":") + 1);
   }
 
-  private List<StoragePathInfo> listBaseFilesToBeDeleted(String commit,
-                                                         String 
basefileExtension,
-                                                         String partitionPath,
-                                                         HoodieStorage 
storage) throws IOException {
-    LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
+  private List<StoragePath> listBaseFilesToBeDeleted(String commit,
+                                                     String basefileExtension,
+                                                     String partitionPath,
+                                                     HoodieStorage storage) 
throws IOException {
+    LOG.info("Collecting files to be cleaned/rolledback up for path {} and 
commit {}", partitionPath, commit);
     StoragePathFilter filter = (path) -> {
       if (path.toString().contains(basefileExtension)) {
         String fileCommitTime = FSUtils.getCommitTime(path.getName());
@@ -329,44 +305,33 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
       }
       return false;
     };
-    return 
storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(), 
partitionPath), filter);
+    return 
storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(), 
partitionPath), 
filter).stream().map(StoragePathInfo::getPath).collect(Collectors.toList());
   }
 
-  private List<StoragePathInfo> fetchFilesFromInstant(HoodieInstant 
instantToRollback,
-                                                      String partitionPath, 
String basePath,
-                                                      String 
baseFileExtension, HoodieStorage storage,
-                                                      
Option<HoodieCommitMetadata> commitMetadataOptional,
-                                                      Boolean 
isCommitMetadataCompleted,
-                                                      HoodieTableType 
tableType) throws IOException {
-    // go w/ commit metadata only for COW table. for MOR, we need to get 
associated log files when commit corresponding to base file is rolledback.
-    if (isCommitMetadataCompleted && tableType == 
HoodieTableType.COPY_ON_WRITE) {
-      return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, 
basePath, commitMetadataOptional.get(),
-          baseFileExtension, storage);
+  private List<StoragePath> fetchFilesFromInstant(HoodieInstant 
instantToRollback,
+                                                  String partitionPath, String 
basePath,
+                                                  String baseFileExtension, 
HoodieStorage storage,
+                                                  Option<HoodieCommitMetadata> 
commitMetadataOptional,
+                                                  boolean 
isCommitMetadataCompleted,
+                                                  HoodieTableType tableType,
+                                                  HoodieTableVersion 
tableVersion) throws IOException {
+    // for MOR tables with version < 8, listing is required to fetch the log 
files associated with base files added by this commit.
+    if (isCommitMetadataCompleted && (tableType == 
HoodieTableType.COPY_ON_WRITE || 
tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))) {
+      return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, 
basePath, commitMetadataOptional.get(), baseFileExtension);
     } else {
       return fetchFilesFromListFiles(instantToRollback, partitionPath, 
basePath, baseFileExtension, storage);
     }
   }
 
-  private List<StoragePathInfo> fetchFilesFromCommitMetadata(HoodieInstant 
instantToRollback,
-                                                             String 
partitionPath,
-                                                             String basePath,
-                                                             
HoodieCommitMetadata commitMetadata,
-                                                             String 
baseFileExtension,
-                                                             HoodieStorage 
storage) throws IOException {
+  private List<StoragePath> fetchFilesFromCommitMetadata(HoodieInstant 
instantToRollback,
+                                                         String partitionPath,
+                                                         String basePath,
+                                                         HoodieCommitMetadata 
commitMetadata,
+                                                         String 
baseFileExtension) {
     StoragePathFilter pathFilter = getPathFilter(baseFileExtension,
         instantToRollback.requestedTime());
-    List<StoragePath> filePaths = getFilesFromCommitMetadata(basePath, 
commitMetadata, partitionPath)
-        .filter(entry -> {
-          try {
-            return storage.exists(entry);
-          } catch (Exception e) {
-            LOG.error("Exists check failed for " + entry.toString(), e);
-          }
-          // if any Exception is thrown, do not ignore. let's try to add the 
file of interest to be deleted. we can't miss any files to be rolled back.
-          return true;
-        }).collect(Collectors.toList());
-
-    return storage.listDirectEntries(filePaths, pathFilter);
+    return getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath)
+        .filter(pathFilter::accept).collect(Collectors.toList());
   }
 
   /**
@@ -379,15 +344,15 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
    * @return
    * @throws IOException
    */
-  private List<StoragePathInfo> fetchFilesFromListFiles(HoodieInstant 
instantToRollback,
-                                                        String partitionPath,
-                                                        String basePath,
-                                                        String 
baseFileExtension,
-                                                        HoodieStorage storage) 
throws IOException {
+  private List<StoragePath> fetchFilesFromListFiles(HoodieInstant 
instantToRollback,
+                                                    String partitionPath,
+                                                    String basePath,
+                                                    String baseFileExtension,
+                                                    HoodieStorage storage) 
throws IOException {
     StoragePathFilter pathFilter = getPathFilter(baseFileExtension, 
instantToRollback.requestedTime());
     List<StoragePath> filePaths = listFilesToBeDeleted(basePath, 
partitionPath);
 
-    return storage.listDirectEntries(filePaths, pathFilter);
+    return storage.listDirectEntries(filePaths, 
pathFilter).stream().map(StoragePathInfo::getPath).collect(Collectors.toList());
   }
 
   private Boolean checkCommitMetadataCompleted(HoodieInstant instantToRollback,
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestoreInstantComparatorFactory.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestoreInstantComparatorFactory.java
new file mode 100644
index 000000000000..224115bb1de0
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestoreInstantComparatorFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.InstantComparator;
+
+import java.util.Comparator;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Factory for a comparator that determines the instant ordering used when computing the instants to roll back as part of a restore operation.
+ * The order relies on the completion time of the instants, except for 
compaction instants on Merge-on-Read tables. These instants may be completed 
after a
+ * delta-commit but should still be considered earlier since the log files 
from the next delta-commit become associated with the base files from this 
compaction.
+ * For example, if we have the following sequence of commits (DC=delta-commit, C=compaction):
+ * ... DC-10 starts -> DC-10 ends -> C-1 starts -> DC-11 starts -> DC-11 ends -> C-1 ends ...
+ * If we restore to DC-11, we do not roll back C-1, even though it finished after DC-11.
+ */
+public class RestoreInstantComparatorFactory {
+  private static final Set<String> COMPACTION_ACTIONS = 
Stream.of(HoodieTimeline.COMPACTION_ACTION, 
HoodieTimeline.COMMIT_ACTION).collect(Collectors.toSet());
+
+  public static Comparator<HoodieInstant> 
createComparator(HoodieTableMetaClient metaClient) {
+    InstantComparator instantComparator = 
metaClient.getTimelineLayout().getInstantComparator();
+    HoodieTableType tableType = metaClient.getTableType();
+    if (tableType == HoodieTableType.COPY_ON_WRITE) {
+      return (o1, o2) -> 
instantComparator.completionTimeOrderedComparator().compare(o1, o2);
+    } else {
+      return (o1, o2) -> {
+        // Due to special handling of compaction instants, we need to use the requested-time-based comparator for compaction instants
+        // but completion time based comparator for others
+        if (COMPACTION_ACTIONS.contains(o1.getAction()) || 
COMPACTION_ACTIONS.contains(o2.getAction())) {
+          return 
instantComparator.requestedTimeOrderedComparator().compare(o1, o2);
+        } else {
+          return 
instantComparator.completionTimeOrderedComparator().compare(o1, o2);
+        }
+      };
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
index 42af5c2f941f..169b8849edbb 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
@@ -22,30 +22,34 @@ package org.apache.hudi.table.action.rollback;
 import org.apache.hudi.avro.model.HoodieInstantInfo;
 import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Comparator;
 import java.util.List;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
 
 /**
  * Plans the restore action and add a restore.requested meta file to timeline.
  */
 public class RestorePlanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I, K, O, Option<HoodieRestorePlan>> {
 
-
   private static final Logger LOG = 
LoggerFactory.getLogger(RestorePlanActionExecutor.class);
 
   public static final Integer RESTORE_PLAN_VERSION_1 = 1;
@@ -65,7 +69,8 @@ public class RestorePlanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T,
   @Override
   public Option<HoodieRestorePlan> execute() {
     final HoodieInstant restoreInstant = 
instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.RESTORE_ACTION, instantTime);
-    try {
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    try (CompletionTimeQueryView completionTimeQueryView = 
metaClient.getTableFormat().getTimelineFactory().createCompletionTimeQueryView(metaClient))
 {
       // Get all the commits on the timeline after the provided commit time
       // rollback pending clustering instants first before other instants (See 
HUDI-3362)
       List<HoodieInstant> pendingClusteringInstantsToRollback = 
table.getActiveTimeline().filterPendingReplaceOrClusteringTimeline()
@@ -75,10 +80,14 @@ public class RestorePlanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T,
               .filter(instant -> GREATER_THAN.test(instant.requestedTime(), 
savepointToRestoreTimestamp))
               .collect(Collectors.toList());
 
-      // Get all the commits on the timeline after the provided commit time
+      // Get all the commits on the timeline after the provided commit's completion time, unless it is the SOLO_COMMIT_TIMESTAMP, which indicates there are no commits for the table
+      String completionTime = 
savepointToRestoreTimestamp.equals(SOLO_COMMIT_TIMESTAMP) ? 
savepointToRestoreTimestamp : 
completionTimeQueryView.getCompletionTime(savepointToRestoreTimestamp)
+          .orElseThrow(() -> new HoodieException("Unable to find completion 
time for instant: " + savepointToRestoreTimestamp));
+
+      Predicate<HoodieInstant> instantFilter = 
constructInstantFilter(completionTime);
       List<HoodieInstant> commitInstantsToRollback = 
table.getActiveTimeline().getWriteTimeline()
-              .getReverseOrderedInstants()
-              .filter(instant -> GREATER_THAN.test(instant.requestedTime(), 
savepointToRestoreTimestamp))
+              .getReverseOrderedInstantsByCompletionTime()
+              .filter(instantFilter)
               .filter(instant -> 
!pendingClusteringInstantsToRollback.contains(instant))
               .collect(Collectors.toList());
 
@@ -90,11 +99,17 @@ public class RestorePlanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T,
       HoodieRestorePlan restorePlan = new 
HoodieRestorePlan(instantsToRollback, LATEST_RESTORE_PLAN_VERSION, 
savepointToRestoreTimestamp);
       table.getActiveTimeline().saveToRestoreRequested(restoreInstant, 
restorePlan);
       table.getMetaClient().reloadActiveTimeline();
-      LOG.info("Requesting Restore with instant time " + restoreInstant);
+      LOG.info("Requesting Restore with instant time {}", restoreInstant);
       return Option.of(restorePlan);
-    } catch (HoodieIOException e) {
-      LOG.error("Got exception when saving restore requested file", e);
-      throw e;
+    } catch (Exception e) {
+      throw new HoodieException("Unable to restore to instant: " + 
savepointToRestoreTimestamp, e);
     }
   }
+
+  private Predicate<HoodieInstant> constructInstantFilter(String 
completionTime) {
+    HoodieInstant instantToRestoreTo = 
table.getMetaClient().getInstantGenerator()
+        .createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.RESTORE_ACTION, savepointToRestoreTimestamp, completionTime);
+    Comparator<HoodieInstant> comparator = 
RestoreInstantComparatorFactory.createComparator(table.getMetaClient());
+    return instant -> comparator.compare(instant, instantToRestoreTo) > 0;
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
index 4f82bad0e669..94c7ac33ce99 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
@@ -46,7 +46,6 @@ import org.slf4j.LoggerFactory;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -131,9 +130,7 @@ public class RollbackHelper implements Serializable {
       List<String> filesToBeDeleted = rollbackRequest.getFilesToBeDeleted();
       if (!filesToBeDeleted.isEmpty()) {
         List<HoodieRollbackStat> rollbackStats = deleteFiles(metaClient, 
filesToBeDeleted, doDelete);
-        List<Pair<String, HoodieRollbackStat>> partitionToRollbackStats = new 
ArrayList<>();
-        rollbackStats.forEach(entry -> 
partitionToRollbackStats.add(Pair.of(entry.getPartitionPath(), entry)));
-        return partitionToRollbackStats.stream();
+        return rollbackStats.stream().map(entry -> 
Pair.of(entry.getPartitionPath(), entry));
       } else if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) {
         HoodieLogFormat.Writer writer = null;
         final StoragePath filePath;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java
index 149831f573cc..60ce9b390c98 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java
@@ -24,10 +24,13 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.rollback.RestoreInstantComparatorFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Comparator;
+
 public class SavepointHelpers {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(SavepointHelpers.class);
@@ -36,26 +39,29 @@ public class SavepointHelpers {
     HoodieInstant savePoint = 
table.getMetaClient().createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
     boolean isSavepointPresent = 
table.getCompletedSavepointTimeline().containsInstant(savePoint);
     if (!isSavepointPresent) {
-      LOG.warn("No savepoint present " + savepointTime);
+      LOG.warn("No savepoint present {}", savepointTime);
       return;
     }
 
     table.getActiveTimeline().revertToInflight(savePoint);
     
table.getActiveTimeline().deleteInflight(table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT,
 HoodieTimeline.SAVEPOINT_ACTION,
         savepointTime));
-    LOG.info("Savepoint " + savepointTime + " deleted");
+    LOG.info("Savepoint {} deleted", savepointTime);
   }
 
   public static void validateSavepointRestore(HoodieTable table, String 
savepointTime) {
     // Make sure the restore was successful
     table.getMetaClient().reloadActiveTimeline();
-    Option<HoodieInstant> lastInstant = table.getActiveTimeline()
+    // Validate that the restore has returned the timeline to the anticipated 
state
+    Comparator<HoodieInstant> instantComparator = 
RestoreInstantComparatorFactory.createComparator(table.getMetaClient());
+    Option<HoodieInstant> lastInstant = 
Option.fromJavaOptional(table.getActiveTimeline()
         .getWriteTimeline()
         .filterCompletedAndCompactionInstants()
-        .lastInstant();
+        .getInstantsAsStream()
+        .max(instantComparator));
     ValidationUtils.checkArgument(lastInstant.isPresent());
     
ValidationUtils.checkArgument(lastInstant.get().requestedTime().equals(savepointTime),
-        savepointTime + " is not the last commit after restoring to savepoint, 
last commit was "
+        () -> savepointTime + " is not the last commit after restoring to 
savepoint, last commit was "
             + lastInstant.get().requestedTime());
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 164a7e621478..f08f1d580b3a 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -23,12 +23,14 @@ import org.apache.hudi.client.WriteClientTestUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.InstantGenerator;
 import org.apache.hudi.common.table.timeline.TimelineFactory;
@@ -660,10 +662,15 @@ public class HoodieClientTestBase extends 
HoodieSparkClientTestHarness {
    * @param baseRecordsToUpdate The base records to update
    */
   @SuppressWarnings("rawtypes, unchecked")
-  protected void updateBatchWithoutCommit(String newCommitTime, 
List<HoodieRecord> baseRecordsToUpdate) throws IOException {
+  protected void updateBatchWithoutCommit(String newCommitTime, 
List<HoodieRecord> baseRecordsToUpdate, HoodieTableVersion tableVersion) throws 
IOException {
     HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
         .withRollbackUsingMarkers(true)
         .withHeartbeatTolerableMisses(0)
+        .withAutoUpgradeVersion(false)
+        .withWriteTableVersion(tableVersion.versionCode())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            
.withStreamingWriteEnabled(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))
+            .build())
         .build();
 
     try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
index 0eaedda3492d..331db8713faa 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
@@ -27,9 +27,6 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
@@ -59,8 +56,6 @@ import static 
org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
  */
 public abstract class BaseHoodieTimeline implements HoodieTimeline {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(BaseHoodieTimeline.class);
-
   private static final long serialVersionUID = 1L;
 
   private static final String HASHING_ALGORITHM = "SHA-256";
@@ -506,6 +501,11 @@ public abstract class BaseHoodieTimeline implements 
HoodieTimeline {
         .sorted(instantComparator.completionTimeOrderedComparator());
   }
 
+  @Override
+  public Stream<HoodieInstant> getReverseOrderedInstantsByCompletionTime() {
+    return 
getInstantsAsStream().sorted(instantComparator.completionTimeOrderedComparator().reversed());
+  }
+
   @Override
   public boolean isBeforeTimelineStarts(String instant) {
     Option<HoodieInstant> firstNonSavepointCommit = 
getFirstNonSavepointCommit();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
index 1b4b99146e69..0678b60aba8a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
@@ -56,11 +56,11 @@ public interface CompletionTimeQueryView extends 
AutoCloseable {
   /**
    * Queries the completion time with given instant time.
    *
-   * @param instantTime The instant time.
+   * @param requestedTime The time the commit was requested.
    *
    * @return The completion time if the instant finished or empty if it is 
still pending.
    */
-  Option<String> getCompletionTime(String beginTime);
+  Option<String> getCompletionTime(String requestedTime);
 
   /**
    * Queries the instant times with given completion time range.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index c923f697cf16..3a1b35d00d15 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -606,13 +606,19 @@ public interface HoodieTimeline extends 
HoodieInstantReader, Serializable {
    */
   Stream<HoodieInstant> getReverseOrderedInstants();
 
+  /**
+   * Get the stream of instants in reverse order by completion time. Any 
incomplete instants are returned as the first elements of the stream.
+   * @return a stream of sorted instants
+   */
+  Stream<HoodieInstant> getReverseOrderedInstantsByCompletionTime();
+
   /**
    * @return the latest completion time of the instants
    */
   Option<String> getLatestCompletionTime();
 
   /**
-   * Get the stream of instants in order by completion timestamp of actions.
+   * Get the stream of completed instants in order by completion timestamp of 
actions.
    */
   Stream<HoodieInstant> getInstantsOrderedByCompletionTime();
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/InstantComparator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/InstantComparator.java
index bd1b761150ab..d15831a9fb80 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/InstantComparator.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/InstantComparator.java
@@ -36,7 +36,7 @@ public interface InstantComparator extends Serializable {
   Comparator<HoodieInstant> requestedTimeOrderedComparator();
 
   /**
-   * Returns comparaor that orders primarily based on completion time and 
secondary ordering based on {@link #requestedTimeOrderedComparator()}.
+   * Returns comparator that orders primarily based on completion time and 
secondary ordering based on {@link #requestedTimeOrderedComparator()}.
    * @return
    */
   Comparator<HoodieInstant> completionTimeOrderedComparator();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/common/InstantComparators.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/common/InstantComparators.java
index 518b5bc93821..e1b42eeecf16 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/common/InstantComparators.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/common/InstantComparators.java
@@ -79,6 +79,15 @@ public class InstantComparators {
 
     @Override
     public int compare(HoodieInstant instant1, HoodieInstant instant2) {
+      if (instant1.getCompletionTime() == null && instant2.getCompletionTime() 
!= null) {
+        return 1; // instant1 is not completed, so it is greater than instant2
+      }
+      if (instant2.getCompletionTime() == null && instant1.getCompletionTime() 
!= null) {
+        return -1; // instant2 is not completed, so it is greater than instant1
+      }
+      if (instant1.getCompletionTime() == null && instant2.getCompletionTime() 
== null) {
+        return timestampBasedComparator.compare(instant1, instant2); // both 
are not completed, compare by requested time
+      }
       int res = 
instant1.getCompletionTime().compareTo(instant2.getCompletionTime());
       if (res == 0) {
         res = timestampBasedComparator.compare(instant1, instant2);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestInstantComparators.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestInstantComparators.java
new file mode 100644
index 000000000000..f8b51f79ff69
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestInstantComparators.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import 
org.apache.hudi.common.table.timeline.versioning.common.InstantComparators;
+import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestInstantComparators {
+  @Test
+  void testCompletionTimeOrdering() {
+    HoodieInstant instant1 = createCompletedHoodieInstant("001", "002");
+    HoodieInstant instant2 = createCompletedHoodieInstant("003", "005");
+    HoodieInstant instant3 = createCompletedHoodieInstant("002", "004");
+    HoodieInstant instant4 = createInflightHoodieInstant("004");
+    HoodieInstant instant5 = createInflightHoodieInstant("009");
+
+    Comparator<HoodieInstant> comparator = new 
InstantComparators.CompletionTimeBasedComparator(Collections.singletonMap(HoodieTimeline.COMPACTION_ACTION,
 HoodieTimeline.COMMIT_ACTION));
+    List<HoodieInstant> instants = Arrays.asList(instant5, instant3, instant1, 
instant4, instant2);
+    instants.sort(comparator);
+    assertEquals(Arrays.asList(instant1, instant3, instant2, instant4, 
instant5), instants);
+  }
+
+  private static HoodieInstant createCompletedHoodieInstant(String 
requestedTime, String completionTime) {
+    return new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, requestedTime, completionTime, 
InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+  }
+
+  private static HoodieInstant createInflightHoodieInstant(String 
requestedTime) {
+    return new HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, requestedTime, 
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR);
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 79b32c2b1544..6bd86f60a95f 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -243,7 +243,7 @@ public class HoodieTestUtils {
     String keyGen = 
properties.getProperty("hoodie.datasource.write.keygenerator.class");
     if (!Objects.equals(keyGen, 
"org.apache.hudi.keygen.NonpartitionedKeyGenerator")
         && 
!properties.containsKey("hoodie.datasource.write.partitionpath.field")) {
-      builder.setPartitionFields("some_nonexistent_field");
+      builder.setPartitionFields("partition_path");
     }
     builder.fromProperties(properties);
     return builder;
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
index 744c331b65f8..6807f6cbcbb2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
@@ -229,7 +229,7 @@ public class TestHoodieFileSystemViews extends 
HoodieClientTestBase {
     }
   }
 
-  private void assertForFSVEquality(HoodieTableFileSystemView fsv1, 
HoodieTableFileSystemView fsv2, boolean enableMdt, Option<HoodieCommitMetadata> 
commitMetadataOpt) {
+  public static void assertForFSVEquality(HoodieTableFileSystemView fsv1, 
HoodieTableFileSystemView fsv2, boolean enableMdt, Option<HoodieCommitMetadata> 
commitMetadataOpt) {
     List<String> allPartitionNames = 
Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, 
DEFAULT_THIRD_PARTITION_PATH);
     fsv1.loadPartitions(allPartitionNames);
     if (enableMdt) {
@@ -255,7 +255,7 @@ public class TestHoodieFileSystemViews extends 
HoodieClientTestBase {
     });
   }
 
-  private void assertBaseFileListEquality(List<HoodieBaseFile> baseFileList1, 
List<HoodieBaseFile> baseFileList2) {
+  static void assertBaseFileListEquality(List<HoodieBaseFile> baseFileList1, 
List<HoodieBaseFile> baseFileList2) {
     assertEquals(baseFileList1.size(), baseFileList2.size());
     Map<String, HoodieBaseFile> fileNameToBaseFileMap1 = new HashMap<>();
     baseFileList1.forEach(entry -> {
@@ -271,14 +271,14 @@ public class TestHoodieFileSystemViews extends 
HoodieClientTestBase {
     });
   }
 
-  public void assertBaseFileEquality(HoodieBaseFile baseFile1, HoodieBaseFile 
baseFile2) {
+  static void assertBaseFileEquality(HoodieBaseFile baseFile1, HoodieBaseFile 
baseFile2) {
     assertEquals(baseFile1.getFileName(), baseFile2.getFileName());
     assertEquals(baseFile1.getFileId(), baseFile2.getFileId());
     assertEquals(baseFile1.getFileLen(), baseFile2.getFileLen());
     assertEquals(baseFile1.getFileSize(), baseFile2.getFileSize());
   }
 
-  private void assertFileSliceListEquality(List<FileSlice> fileSlices1, 
List<FileSlice> fileSlices2, Option<HoodieCommitMetadata> commitMetadataOpt) {
+  static void assertFileSliceListEquality(List<FileSlice> fileSlices1, 
List<FileSlice> fileSlices2, Option<HoodieCommitMetadata> commitMetadataOpt) {
     assertEquals(fileSlices1.size(), fileSlices1.size());
     Map<Pair<String, String>, FileSlice> fileNameToFileSliceMap1 = new 
HashMap<>();
     fileSlices1.forEach(entry -> {
@@ -294,7 +294,7 @@ public class TestHoodieFileSystemViews extends 
HoodieClientTestBase {
     });
   }
 
-  private void assertFileSliceEquality(FileSlice fileSlice1, FileSlice 
fileSlice2, Option<HoodieCommitMetadata> commitMetadataOpt) {
+  static void assertFileSliceEquality(FileSlice fileSlice1, FileSlice 
fileSlice2, Option<HoodieCommitMetadata> commitMetadataOpt) {
     assertEquals(fileSlice1.getBaseFile().isPresent(), 
fileSlice2.getBaseFile().isPresent());
     if (fileSlice1.getBaseFile().isPresent()) {
       assertBaseFileEquality(fileSlice1.getBaseFile().get(), 
fileSlice2.getBaseFile().get());
@@ -324,7 +324,7 @@ public class TestHoodieFileSystemViews extends 
HoodieClientTestBase {
     }
   }
 
-  private void assertLogFileEquality(HoodieLogFile logFile1, HoodieLogFile 
logFile2) {
+  static void assertLogFileEquality(HoodieLogFile logFile1, HoodieLogFile 
logFile2) {
     assertEquals(logFile1.getFileName(), logFile2.getFileName());
     assertEquals(logFile1.getFileId(), logFile2.getFileId());
     assertEquals(logFile1.getLogVersion(), logFile2.getLogVersion());
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java
index 7e7a1d537dda..24ff620c8d08 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java
@@ -21,30 +21,54 @@ package org.apache.hudi.functional;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteClientTestUtils;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathFilter;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.functional.TestHoodieFileSystemViews.assertForFSVEquality;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -52,6 +76,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 @Tag("functional")
 public class TestSavepointRestoreMergeOnRead extends HoodieClientTestBase {
+  // Overrides setup to avoid creating a table with the default table version
+  @BeforeEach
+  public void setUp() throws Exception {
+    initPath();
+    initSparkContexts();
+    initTestDataGenerator();
+    initHoodieStorage();
+    initTimelineService();
+  }
 
   /**
   * Actions: DC1, DC2, DC3, savepoint DC3, (snapshot query) DC4, C5, DC6, DC7. 
restore to DC3.
@@ -69,15 +102,13 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
    * snapshot query: total rec matches.
    * checking the row count by updating columns in (val4,val5,val6, val7).
    */
-  @Test
-  void testCleaningDeltaCommits() throws Exception {
-    HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit 
triggers compaction
-            .withInlineCompaction(false)
-            .build())
-        .withRollbackUsingMarkers(true)
-        .build();
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "EIGHT"})
+  void testCleaningDeltaCommits(HoodieTableVersion tableVersion) throws 
Exception {
+    HoodieWriteConfig hoodieWriteConfig = 
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig.newBuilder()
+        .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit 
triggers compaction
+        .withInlineCompaction(false).build(), tableVersion);
+    Map<String, Integer> commitToRowCount = new HashMap<>();
     try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
       String savepointCommit = null;
       final int numRecords = 10;
@@ -88,6 +119,7 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
         List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 
numRecords);
         JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
         client.commit(newCommitTime, client.insert(writeRecords, 
newCommitTime));
+        commitToRowCount.put(newCommitTime, numRecords);
         if (i == 3) {
           // trigger savepoint
           savepointCommit = newCommitTime;
@@ -127,17 +159,53 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
             filter).size());
       }
     }
+    validateFilesMetadata(hoodieWriteConfig);
+    assertEquals(commitToRowCount, getRecordCountPerCommit());
   }
 
-  @Test
-  public void testRestoreWithFileGroupCreatedWithDeltaCommits() throws 
IOException {
-    HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withMaxNumDeltaCommitsBeforeCompaction(4)
-            .withInlineCompaction(true)
-            .build())
-        .withRollbackUsingMarkers(true)
-        .build();
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "EIGHT"})
+  public void testRestoreToWithInflightDeltaCommit(HoodieTableVersion 
tableVersion) throws IOException {
+    HoodieWriteConfig hoodieWriteConfig = 
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig.newBuilder()
+        .withMaxNumDeltaCommitsBeforeCompaction(4)
+        .withInlineCompaction(false)
+        .compactionSmallFileSize(0).build(), tableVersion);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      // establish base files
+      String firstCommitTime = client.startCommit();
+      int numRecords = 10;
+      List<HoodieRecord> initialRecords = 
dataGen.generateInserts(firstCommitTime, numRecords);
+      client.commit(firstCommitTime, 
client.insert(jsc.parallelize(initialRecords, 1), firstCommitTime));
+      // add updates that go to log files
+      String secondCommitTime = client.startCommit();
+      List<HoodieRecord> updatedRecords = 
dataGen.generateUniqueUpdates(secondCommitTime, numRecords);
+      client.commit(secondCommitTime, 
client.upsert(jsc.parallelize(updatedRecords, 1), secondCommitTime));
+      // create a savepoint
+      client.savepoint("user1", "Savepoint for 2nd commit");
+      // add a third delta commit
+      String thirdCommitTime = client.startCommit();
+      List<HoodieRecord> updatedRecordsToBeRolledBack = 
dataGen.generateUniqueUpdates(thirdCommitTime, numRecords);
+      client.commit(thirdCommitTime, 
client.upsert(jsc.parallelize(updatedRecordsToBeRolledBack, 1), 
thirdCommitTime));
+      // add a fourth delta commit but leave it in-flight
+      String fourthCommitTime = client.startCommit();
+      List<HoodieRecord> inFlightRecords = 
Stream.concat(dataGen.generateUniqueUpdates(fourthCommitTime, 
numRecords).stream(),
+          dataGen.generateInserts(fourthCommitTime, 
numRecords).stream()).collect(Collectors.toList());
+      // collect result to trigger file creation
+      List<WriteStatus> writes = 
client.upsert(jsc.parallelize(inFlightRecords, 1), fourthCommitTime).collect();
+      // restore to the savepoint
+      client.restoreToSavepoint(secondCommitTime);
+      // Validate metadata and records match expectations
+      validateFilesMetadata(hoodieWriteConfig);
+      assertEquals(Collections.singletonMap(secondCommitTime, numRecords), 
getRecordCountPerCommit());
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "EIGHT"})
+  public void 
testRestoreWithFileGroupCreatedWithDeltaCommits(HoodieTableVersion 
tableVersion) throws IOException {
+    HoodieWriteConfig hoodieWriteConfig = 
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig.newBuilder()
+        .withMaxNumDeltaCommitsBeforeCompaction(4)
+        .withInlineCompaction(true).build(), tableVersion);
     final int numRecords = 100;
     String firstCommit;
     String secondCommit;
@@ -178,6 +246,11 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
             .compactionSmallFileSize(0)
             .build())
         .withRollbackUsingMarkers(true)
+        .withAutoUpgradeVersion(false)
+        .withWriteTableVersion(tableVersion.versionCode())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            
.withStreamingWriteEnabled(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))
+            .build())
         .build();
 
     // add 2 more updates which will create log files.
@@ -214,6 +287,71 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
               FSUtils.constructAbsolutePath(hoodieWriteConfig.getBasePath(), 
pPath),
               filter).size());
     }
+    validateFilesMetadata(hoodieWriteConfig);
+    Map<String, Integer> actualCommitToRowCount = getRecordCountPerCommit();
+    assertEquals(Collections.singletonMap(firstCommit, numRecords), 
actualCommitToRowCount);
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "EIGHT"})
+  public void testRestoreWithUpdatesToClusteredFileGroups(HoodieTableVersion 
tableVersion) throws IOException {
+    HoodieWriteConfig hoodieWriteConfig = 
getHoodieWriteConfigAndInitializeTable(
+        
HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(4).withInlineCompaction(false).compactionSmallFileSize(0).build(),
+        
HoodieClusteringConfig.newBuilder().withAsyncClusteringMaxCommits(2).build(), 
tableVersion);
+    final int numRecords = 20;
+    String secondCommit;
+    String clusteringCommit;
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      // 1st commit insert
+      String newCommitTime = client.startCommit();
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 
numRecords);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      client.commit(newCommitTime, client.insert(writeRecords, newCommitTime));
+
+      // 2nd commit with inserts and updates which will create new file slice 
due to small file handling.
+      newCommitTime = client.startCommit();
+      List<HoodieRecord> records2 = 
dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+      JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(records2, 1);
+      List<HoodieRecord> records3 = dataGen.generateInserts(newCommitTime, 30);
+      JavaRDD<HoodieRecord> writeRecords3 = jsc.parallelize(records3, 1);
+
+      client.commit(newCommitTime, 
client.upsert(writeRecords2.union(writeRecords3), newCommitTime));
+      secondCommit = newCommitTime;
+      // add savepoint to 2nd commit
+      client.savepoint(secondCommit, "test user","test comment");
+
+      Option<String> clusteringInstant = 
client.scheduleClustering(Option.empty());
+      clusteringCommit = clusteringInstant.get();
+      client.cluster(clusteringCommit);
+
+      // add new log files on top of the clustered file group
+      newCommitTime = client.startCommit();
+      List<HoodieRecord> updatedRecords = 
dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+      writeClient.commit(newCommitTime, 
writeClient.upsert(jsc.parallelize(updatedRecords, 1), newCommitTime));
+    }
+    assertRowNumberEqualsTo(50);
+    StoragePathFilter filter = path -> 
path.toString().contains(clusteringCommit);
+    for (String pPath : dataGen.getPartitionPaths()) {
+      // verify there is 1 base file and 1 log file created matching the 
clustering timestamp if table version is 6.
+      // For table version 8 and above, the log file will not reference the 
clustering commit.
+      assertEquals(tableVersion == HoodieTableVersion.SIX ? 2 : 1, 
storage.listDirectEntries(
+              FSUtils.constructAbsolutePath(hoodieWriteConfig.getBasePath(), 
pPath), filter)
+          .size());
+    }
+
+    // restore to 2nd commit and remove the clustering instant and delta 
commit that followed it
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      client.restoreToSavepoint(secondCommit);
+    }
+    // verify that entire file slice created w/ base instant time of 
clustering instant is completely rolled back.
+    for (String pPath : dataGen.getPartitionPaths()) {
+      assertEquals(0, storage.listDirectEntries(
+              FSUtils.constructAbsolutePath(hoodieWriteConfig.getBasePath(), 
pPath), filter)
+          .size());
+    }
+    validateFilesMetadata(hoodieWriteConfig);
+    Map<String, Integer> actualCommitToRowCount = getRecordCountPerCommit();
+    assertEquals(Collections.singletonMap(secondCommit, 50), 
actualCommitToRowCount);
   }
 
   /**
@@ -223,17 +361,15 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
    * <P>Expected behaviors: pending compaction after savepoint should also be 
cleaned,
 * the latest file slice should be fully deleted, for DC4 a rollback log 
append should be made.
    */
-  @Test
-  void testCleaningPendingCompaction() throws Exception {
-    HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit 
triggers compaction
-            .withInlineCompaction(false)
-            .withScheduleInlineCompaction(false)
-            .compactionSmallFileSize(0)
-            .build())
-        .withRollbackUsingMarkers(true)
-        .build();
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "EIGHT"})
+  void testCleaningPendingCompaction(HoodieTableVersion tableVersion) throws 
Exception {
+    HoodieWriteConfig hoodieWriteConfig = 
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig.newBuilder()
+        .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit 
triggers compaction
+        .withInlineCompaction(false)
+        .withScheduleInlineCompaction(false)
+        .compactionSmallFileSize(0).build(), tableVersion);
+    Map<String, Integer> commitToRowCount = new HashMap<>();
     try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
       String savepointCommit = null;
       final int numRecords = 10;
@@ -245,6 +381,7 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
         JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
         JavaRDD<WriteStatus> writeStatusJavaRDD = client.insert(writeRecords, 
newCommitTime);
         client.commit(newCommitTime, writeStatusJavaRDD, Option.empty(), 
DELTA_COMMIT_ACTION, Collections.emptyMap());
+        commitToRowCount.put(newCommitTime, numRecords);
         if (i == 3) {
           // trigger savepoint
           savepointCommit = newCommitTime;
@@ -261,13 +398,15 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
         if (i == 1) {
           Option<String> compactionInstant = 
client.scheduleCompaction(Option.empty());
           assertTrue(compactionInstant.isPresent(), "A compaction plan should 
be scheduled");
-          compactWithoutCommit(compactionInstant.get());
+          compactWithoutCommit(compactionInstant.get(), tableVersion);
         }
       }
 
       // restore
       client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, 
"restore commit should not be null"));
-      assertRowNumberEqualsTo(30);
+      assertEquals(commitToRowCount, getRecordCountPerCommit());
+      validateFilesMetadata(hoodieWriteConfig);
+      assertEquals(tableVersion, 
HoodieTableMetaClient.reload(metaClient).getTableConfig().getTableVersion());
     }
   }
 
@@ -277,16 +416,14 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
    * <P>Expected behaviors: should roll back DC5, C6 and DC6.
    * No files will be cleaned up. Only rollback log appends.
    */
-  @Test
-  void testCleaningCompletedRollback() throws Exception {
-    HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withMaxNumDeltaCommitsBeforeCompaction(3) // the 3rd delta_commit 
triggers compaction
-            .withInlineCompaction(false)
-            .withScheduleInlineCompaction(false)
-            .build())
-        .withRollbackUsingMarkers(true)
-        .build();
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "NINE"})
+  void testCleaningCompletedRollback(HoodieTableVersion tableVersion) throws 
Exception {
+    HoodieWriteConfig hoodieWriteConfig = 
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig.newBuilder()
+        .withMaxNumDeltaCommitsBeforeCompaction(3) // the 3rd delta_commit 
triggers compaction
+        .withInlineCompaction(false)
+        .withScheduleInlineCompaction(false).build(), tableVersion);
+    Map<String, Integer> commitToRowCount = new HashMap<>();
     try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
       String savepointCommit = null;
       final int numRecords = 10;
@@ -300,11 +437,14 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
         client.commit(newCommitTime, writeStatusJavaRDD, Option.empty(), 
DELTA_COMMIT_ACTION, Collections.emptyMap());
         if (i == 2) {
           baseRecordsToUpdate = records;
+        } else {
+          commitToRowCount.put(newCommitTime, numRecords);
         }
       }
 
       // update to generate log files, then a valid compaction plan can be 
scheduled
-      upsertBatch(client, baseRecordsToUpdate);
+      String updateCommitTime = upsertBatch(client, baseRecordsToUpdate);
+      commitToRowCount.put(updateCommitTime, baseRecordsToUpdate.size());
       Option<String> compactionInstant = 
client.scheduleCompaction(Option.empty());
       assertTrue(compactionInstant.isPresent(), "A compaction plan should be 
scheduled");
       HoodieWriteMetadata result = client.compact(compactionInstant.get());
@@ -316,7 +456,7 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
       assertRowNumberEqualsTo(20);
       // write a delta_commit but does not commit
       updateBatchWithoutCommit(WriteClientTestUtils.createNewInstantTime(),
-          Objects.requireNonNull(baseRecordsToUpdate, "The records to update 
should not be null"));
+          Objects.requireNonNull(baseRecordsToUpdate, "The records to update 
should not be null"), tableVersion);
       // rollback the delta_commit
       metaClient = HoodieTableMetaClient.reload(metaClient);
       assertTrue(client.rollbackFailedWrites(metaClient), "The last 
delta_commit should be rolled back");
@@ -326,21 +466,239 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
 
       // restore
       client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, 
"restore commit should not be null"));
-      assertRowNumberEqualsTo(20);
+      assertEquals(commitToRowCount, getRecordCountPerCommit());
+      validateFilesMetadata(hoodieWriteConfig);
+      assertEquals(tableVersion, 
HoodieTableMetaClient.reload(metaClient).getTableConfig().getTableVersion());
+    }
+  }
+
+  @Test
+  void rollbackWithAsyncServices_compactionCompletesDuringCommit() {
+    HoodieWriteConfig hoodieWriteConfig = 
getHoodieWriteConfigWithCompactionAndConcurrencyControl(HoodieTableVersion.EIGHT);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      final int numRecords = 10;
+      writeInitialCommitsForAsyncServicesTests(numRecords);
+      String inflightCommit = client.startCommit();
+      List<HoodieRecord> records = 
dataGen.generateUniqueUpdates(inflightCommit, numRecords);
+      JavaRDD<WriteStatus> writeStatus = 
writeClient.upsert(jsc.parallelize(records, 1), inflightCommit);
+
+      // Run compaction while delta-commit is in-flight
+      Option<String> compactionInstant = 
client.scheduleCompaction(Option.empty());
+      HoodieWriteMetadata result = client.compact(compactionInstant.get());
+      client.commitCompaction(compactionInstant.get(), result, Option.empty());
+      // commit the inflight delta commit
+      client.commit(inflightCommit, writeStatus);
+
+      client.savepoint(inflightCommit, "user1", "Savepoint for commit that 
completed after compaction");
+
+      // write one more commit
+      String newCommitTime = client.startCommit();
+      records = dataGen.generateInserts(newCommitTime, numRecords);
+      writeStatus = client.insert(jsc.parallelize(records, 1), newCommitTime);
+      client.commit(newCommitTime, writeStatus);
+
+      // restore to savepoint
+      client.restoreToSavepoint(inflightCommit);
+      validateFilesMetadata(hoodieWriteConfig);
+      assertEquals(Collections.singletonMap(inflightCommit, numRecords), 
getRecordCountPerCommit());
+      // ensure the compaction instant is still present because it was 
completed before the target of the restore
+      
assertFalse(metaClient.reloadActiveTimeline().filterCompletedInstants().getInstantsAsStream()
+          .anyMatch(hoodieInstant -> 
hoodieInstant.requestedTime().equals(compactionInstant.get())));
     }
   }
 
-  private void upsertBatch(SparkRDDWriteClient client, List<HoodieRecord> 
baseRecordsToUpdate) throws IOException {
+  @Test
+  void rollbackWithAsyncServices_commitCompletesDuringCompaction() {
+    HoodieWriteConfig hoodieWriteConfig = 
getHoodieWriteConfigWithCompactionAndConcurrencyControl(HoodieTableVersion.EIGHT);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      final int numRecords = 10;
+      writeInitialCommitsForAsyncServicesTests(numRecords);
+      String inflightCommit = client.startCommit();
+      List<HoodieRecord> records = 
dataGen.generateUniqueUpdates(inflightCommit, numRecords);
+      JavaRDD<WriteStatus> writeStatus = 
client.upsert(jsc.parallelize(records, 1), inflightCommit);
+
+      Option<String> compactionInstant = 
client.scheduleCompaction(Option.empty());
+      HoodieWriteMetadata result = client.compact(compactionInstant.get());
+      // commit the inflight delta commit
+      client.commit(inflightCommit, writeStatus);
+      // commit the compaction instant after the delta commit
+      client.commitCompaction(compactionInstant.get(), result, Option.empty());
+
+      client.savepoint(inflightCommit, "user1", "Savepoint for commit that 
completed during compaction");
+
+      // write one more commit
+      String newCommitTime = writeClient.startCommit();
+      records = dataGen.generateInserts(newCommitTime, numRecords);
+      writeStatus = client.insert(jsc.parallelize(records, 1), newCommitTime);
+      client.commit(newCommitTime, writeStatus);
+
+      // restore to savepoint
+      client.restoreToSavepoint(inflightCommit);
+      validateFilesMetadata(hoodieWriteConfig);
+      assertEquals(Collections.singletonMap(inflightCommit, numRecords), 
getRecordCountPerCommit());
+      // ensure the compaction instant is not present because it was completed 
after the target of the restore
+      
assertFalse(metaClient.reloadActiveTimeline().filterCompletedInstants().getInstantsAsStream()
+          .anyMatch(hoodieInstant -> 
hoodieInstant.requestedTime().equals(compactionInstant.get())));
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "NINE"})
+  void 
rollbackWithAsyncServices_commitStartsAndFinishesDuringCompaction(HoodieTableVersion
 tableVersion) {
+    HoodieWriteConfig hoodieWriteConfig = 
getHoodieWriteConfigWithCompactionAndConcurrencyControl(tableVersion);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      final int numRecords = 10;
+      writeInitialCommitsForAsyncServicesTests(numRecords);
+      // Schedule a compaction
+      Option<String> compactionInstant = 
client.scheduleCompaction(Option.empty());
+      // Start a delta commit
+      String inflightCommit = client.startCommit();
+      List<HoodieRecord> records = 
dataGen.generateUniqueUpdates(inflightCommit, numRecords);
+      JavaRDD<WriteStatus> writeStatus = 
client.upsert(jsc.parallelize(records, 1), inflightCommit);
+
+      HoodieWriteMetadata result = client.compact(compactionInstant.get());
+      // commit the inflight delta commit
+      client.commit(inflightCommit, writeStatus);
+      // commit the compaction instant after the delta commit
+      client.commitCompaction(compactionInstant.get(), result, Option.empty());
+
+      client.savepoint(inflightCommit, "user1", "Savepoint for commit that 
completed during compaction");
+
+      // write one more commit
+      String newCommitTime = writeClient.startCommit();
+      records = dataGen.generateInserts(newCommitTime, numRecords);
+      writeStatus = client.insert(jsc.parallelize(records, 1), newCommitTime);
+      client.commit(newCommitTime, writeStatus);
+
+      // restore to savepoint
+      client.restoreToSavepoint(inflightCommit);
+      validateFilesMetadata(hoodieWriteConfig);
+      assertEquals(Collections.singletonMap(inflightCommit, numRecords), 
getRecordCountPerCommit());
+      
assertTrue(metaClient.reloadActiveTimeline().filterCompletedInstants().getInstantsAsStream()
+          .anyMatch(hoodieInstant -> 
hoodieInstant.requestedTime().equals(compactionInstant.get())));
+      assertEquals(tableVersion, 
HoodieTableMetaClient.reload(metaClient).getTableConfig().getTableVersion());
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "NINE"})
+  void testMissingFileDoesNotFallRestore(HoodieTableVersion tableVersion) 
throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = 
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig.newBuilder()
+        .withMaxNumDeltaCommitsBeforeCompaction(4)
+        .withInlineCompaction(false)
+        .compactionSmallFileSize(0).build(), tableVersion);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      // establish base files
+      String firstCommitTime = client.startCommit();
+      int numRecords = 10;
+      List<HoodieRecord> initialRecords = 
dataGen.generateInserts(firstCommitTime, numRecords);
+      client.commit(firstCommitTime, 
client.insert(jsc.parallelize(initialRecords, 1), firstCommitTime));
+      // add updates that go to log files
+      String secondCommitTime = client.startCommit();
+      List<HoodieRecord> updatedRecords = 
dataGen.generateUniqueUpdates(secondCommitTime, numRecords);
+      client.commit(secondCommitTime, 
client.upsert(jsc.parallelize(updatedRecords, 1), secondCommitTime));
+      client.savepoint(secondCommitTime, "user1", "Savepoint for commit that 
completed during compaction");
+
+      // add a third delta commit with log and new base files
+      String thirdCommitTime = client.startCommit();
+      List<HoodieRecord> upsertRecords = 
Stream.concat(dataGen.generateUniqueUpdates(thirdCommitTime, 
numRecords).stream(),
+            dataGen.generateInserts(thirdCommitTime, 
numRecords).stream()).collect(Collectors.toList());
+      List<WriteStatus> writeStatuses = 
client.upsert(jsc.parallelize(upsertRecords, 1), thirdCommitTime).collect();
+      client.commit(thirdCommitTime, jsc.parallelize(writeStatuses, 1));
+
+      // delete one base file and one log file to validate both cases are 
handled gracefully
+      boolean deletedLogFile = false;
+      boolean deletedBaseFile = false;
+      for (WriteStatus writeStatus : writeStatuses) {
+        StoragePath path = FSUtils.constructAbsolutePath(basePath, 
writeStatus.getStat().getPath());
+        if (deletedLogFile && deletedBaseFile) {
+          break;
+        }
+        if (FSUtils.isLogFile(path)) {
+          deletedLogFile = true;
+          storage.deleteFile(path);
+        } else {
+          deletedBaseFile = true;
+          storage.deleteFile(path);
+        }
+      }
+      client.restoreToSavepoint(secondCommitTime);
+      validateFilesMetadata(hoodieWriteConfig);
+      assertEquals(Collections.singletonMap(secondCommitTime, numRecords), 
getRecordCountPerCommit());
+    }
+  }
+
+  private Map<String, Integer> writeInitialCommitsForAsyncServicesTests(int 
numRecords) {
+    Map<String, Integer> commitToRowCount = new HashMap<>();
+    for (int i = 0; i < 3; i++) {
+      String newCommitTime = writeClient.startCommit();
+      List<HoodieRecord> records = i == 0 ? 
dataGen.generateInserts(newCommitTime, numRecords) : 
dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+      JavaRDD<WriteStatus> writeStatus = i == 0 ? 
writeClient.insert(jsc.parallelize(records, 1), newCommitTime) : 
writeClient.upsert(jsc.parallelize(records, 1), newCommitTime);
+      writeClient.commit(newCommitTime, writeStatus);
+      if (i == 2) {
+        commitToRowCount.put(newCommitTime, numRecords);
+      }
+    }
+    return commitToRowCount;
+  }
+
+  private HoodieWriteConfig 
getHoodieWriteConfigWithCompactionAndConcurrencyControl(HoodieTableVersion 
tableVersion) {
+    HoodieWriteConfig config = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(InProcessLockProvider.class)
+            .build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(2)
+            .withInlineCompaction(false)
+            .withScheduleInlineCompaction(false)
+            .build())
+        .withRollbackUsingMarkers(true)
+        .withAutoUpgradeVersion(false)
+        .withWriteTableVersion(tableVersion.versionCode())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            
.withStreamingWriteEnabled(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))
+            .build())
+        
.withProps(Collections.singletonMap(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(),
 "0"))
+        .build();
+    try {
+      initMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+      return config;
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to initialize HoodieTableMetaClient", 
e);
+    }
+  }
+
+  private void validateFilesMetadata(HoodieWriteConfig writeConfig) {
+    HoodieTableFileSystemView fileListingBasedView = 
FileSystemViewManager.createInMemoryFileSystemView(context, metaClient,
+        HoodieMetadataConfig.newBuilder().enable(false).build());
+    FileSystemViewStorageConfig viewStorageConfig = 
FileSystemViewStorageConfig.newBuilder().fromProperties(writeConfig.getProps())
+        .withStorageType(FileSystemViewStorageType.MEMORY).build();
+    HoodieTableFileSystemView metadataBasedView = (HoodieTableFileSystemView) 
FileSystemViewManager
+        .createViewManager(context, writeConfig.getMetadataConfig(), 
viewStorageConfig, writeConfig.getCommonConfig(),
+            (SerializableFunctionUnchecked<HoodieTableMetaClient, 
HoodieTableMetadata>) v1 ->
+                
metaClient.getTableFormat().getMetadataFactory().create(context, 
metaClient.getStorage(), writeConfig.getMetadataConfig(), 
writeConfig.getBasePath()))
+        .getFileSystemView(basePath);
+    assertForFSVEquality(fileListingBasedView, metadataBasedView, true, 
Option.empty());
+  }
+
+  private String upsertBatch(SparkRDDWriteClient client, List<HoodieRecord> 
baseRecordsToUpdate) throws IOException {
     String newCommitTime = client.startCommit();
     List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 
Objects.requireNonNull(baseRecordsToUpdate, "The records to update should not 
be null"));
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
     client.commit(newCommitTime, client.upsert(writeRecords, newCommitTime), 
Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
+    return newCommitTime;
   }
 
-  private void compactWithoutCommit(String compactionInstantTime) {
+  private void compactWithoutCommit(String compactionInstantTime, 
HoodieTableVersion tableVersion) {
     HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
         .withRollbackUsingMarkers(true)
         .withEmbeddedTimelineServerEnabled(false)
+        .withAutoUpgradeVersion(false)
+        .withWriteTableVersion(tableVersion.versionCode())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            
.withStreamingWriteEnabled(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))
+            .build())
         .build();
 
     try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
@@ -353,4 +711,35 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
  // Fixes the table type for this suite: all savepoint/restore scenarios here run
  // against MERGE_ON_READ tables.
  protected HoodieTableType getTableType() {
    return HoodieTableType.MERGE_ON_READ;
  }
+
+  private HoodieWriteConfig 
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig compactionConfig, 
HoodieTableVersion tableVersion) throws IOException {
+    return getHoodieWriteConfigAndInitializeTable(compactionConfig, 
HoodieClusteringConfig.newBuilder().build(), tableVersion);
+  }
+
+  private HoodieWriteConfig 
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig compactionConfig, 
HoodieClusteringConfig clusteringConfig, HoodieTableVersion tableVersion)
+      throws IOException {
+    HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+        .withCompactionConfig(compactionConfig)
+        .withClusteringConfig(clusteringConfig)
+        .withRollbackUsingMarkers(true)
+        .withAutoUpgradeVersion(false)
+        .withWriteTableVersion(tableVersion.versionCode())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            
.withStreamingWriteEnabled(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))
+            .build())
+        .build();
+    initMetaClient(HoodieTableType.MERGE_ON_READ, 
hoodieWriteConfig.getProps());
+    return hoodieWriteConfig;
+  }
+
+  private Map<String, Integer> getRecordCountPerCommit() {
+    Dataset<Row> dataset = sparkSession.read().format("hudi").load(basePath);
+    List<Row> rows = dataset.collectAsList();
+    int index = 
dataset.schema().fieldIndex(HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.getFieldName());
+    Map<String, Integer> actualCommitToRowCount = new HashMap<>();
+    rows.forEach(row -> {
+      actualCommitToRowCount.compute(row.getString(index), (k, v) -> (v == 
null) ? 1 : v + 1);
+    });
+    return actualCommitToRowCount;
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 9e2325593099..61df5fc76d17 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -20,6 +20,8 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.common.HoodieSparkEngineContext
 import 
org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model._
@@ -35,7 +37,7 @@ import org.apache.hudi.util.JavaConversions
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.lit
 import org.junit.jupiter.api._
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, 
MethodSource}
 import org.junit.jupiter.params.provider.Arguments.arguments
@@ -365,6 +367,9 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
{
 
     val lastCleanInstant = 
getLatestMetaClient(false).getActiveTimeline.getCleanerTimeline.lastInstant()
     assertTrue(lastCleanInstant.isPresent)
+    val writeConfig = getWriteConfig(hudiOpts)
+    val client = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), 
writeConfig)
+    client.savepoint("user", "note")
 
     doWriteAndValidateDataAndRecordIndex(hudiOpts,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
@@ -372,11 +377,10 @@ class TestRecordLevelIndex extends 
RecordLevelIndexTestBase {
     
assertTrue(getLatestMetaClient(false).getActiveTimeline.getCleanerTimeline.lastInstant().get().requestedTime
       .compareTo(lastCleanInstant.get().requestedTime) > 0)
 
-    var rollbackedInstant: Option[HoodieInstant] = Option.empty
-    while (rollbackedInstant.isEmpty || rollbackedInstant.get.getAction != 
ActionType.clean.name()) {
-      // rollback clean instant
-      rollbackedInstant = Option.apply(rollbackLastInstant(hudiOpts))
-    }
+    client.restoreToSavepoint()
+    client.close()
+    // last commit is no longer present so remove it from the mergedDfList
+    mergedDfList = mergedDfList.take(mergedDfList.size - 1)
     validateDataAndRecordIndices(hudiOpts)
   }
 
@@ -574,12 +578,28 @@ class TestRecordLevelIndex extends 
RecordLevelIndexTestBase {
     val function = () => doWriteAndValidateDataAndRecordIndex(hudiOpts,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append)
-    executeFunctionNTimes(function, 5)
+    executeFunctionNTimes(function, 3)
 
+    // create a savepoint on the data table before the metadata table clean 
operation
+    
assertFalse(getMetadataMetaClient(hudiOpts).getActiveTimeline.getCleanerTimeline.lastInstant().isPresent)
+    val writeConfig = getWriteConfig(hudiOpts)
+    val client = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), 
writeConfig)
+    client.savepoint("user", "note")
+
+    // validate that the clean is present in the metadata table timeline
+    var iterations = 0
+    while 
(getMetadataMetaClient(hudiOpts).getActiveTimeline.getCleanerTimeline.lastInstant().isEmpty)
 {
+      doWriteAndValidateDataAndRecordIndex(hudiOpts,
+        operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+        saveMode = SaveMode.Append)
+      iterations += 1
+    }
     
assertTrue(getMetadataMetaClient(hudiOpts).getActiveTimeline.getCleanerTimeline.lastInstant().isPresent)
-    rollbackLastInstant(hudiOpts)
-    // Rolling back clean instant from MDT
-    rollbackLastInstant(hudiOpts)
+    // restore to the savepoint to force the metadata table state to roll back 
to before the clean
+    client.restoreToSavepoint()
+    client.close()
+    // remove the commits that were created after the savepoint
+    mergedDfList = mergedDfList.take(mergedDfList.size - iterations)
     validateDataAndRecordIndices(hudiOpts)
   }
 

Reply via email to