This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 77786d28627 [HUDI-6153] Changed the rollback mechanism for MDT to 
actual rollbacks rather than appending revert blocks. (#8837)
77786d28627 is described below

commit 77786d28627cafc2982e20df89c2617d5c940811
Author: Prashant Wason <[email protected]>
AuthorDate: Thu Jul 20 05:19:58 2023 -0700

    [HUDI-6153] Changed the rollback mechanism for MDT to actual rollbacks 
rather than appending revert blocks. (#8837)
    
    Prior to this patch, a rollback in the data table (DT) was applied as yet
    another delta commit in the metadata table (MDT), and we marked all files
    as invalid in that delta commit.
    But as we are adding more high-scale partitions like the record index, we
    can't afford to mark 100k entries as invalid. So, we wanted to trigger an
    actual rollback in MDT of the delta commit that corresponds to the commit
    being rolled back in DT (similar to how we do rollback in DT).
    
    ---------
    
    Co-authored-by: Sagar Sumit <[email protected]>
    Co-authored-by: sivabalan <[email protected]>
---
 .../hudi/cli/integ/ITTestSavepointsCommand.java    |  20 +-
 .../hudi/client/BaseHoodieTableServiceClient.java  |  16 +-
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  51 ++-
 .../metadata/HoodieBackedTableMetadataWriter.java  | 166 ++++++++--
 .../hudi/metadata/HoodieMetadataWriteUtils.java    |   2 +-
 .../hudi/metadata/HoodieTableMetadataWriter.java   |   6 -
 .../hudi/table/upgrade/UpgradeDowngrade.java       |   3 +-
 .../common/testutils/HoodieMetadataTestTable.java  |   6 +-
 .../hudi/utils/TestMetadataConversionUtils.java    |   4 +-
 .../SparkHoodieBackedTableMetadataWriter.java      |   9 +-
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |  11 +
 .../hudi/table/HoodieSparkMergeOnReadTable.java    |  17 +
 .../org/apache/hudi/client/TestClientRollback.java |   3 +-
 .../functional/TestHoodieBackedMetadata.java       |  71 ++--
 .../apache/hudi/io/TestHoodieTimelineArchiver.java |  87 ++---
 .../TestHoodieSparkMergeOnReadTableRollback.java   |  19 +-
 .../hudi/table/upgrade/TestUpgradeDowngrade.java   |  34 +-
 .../hudi/common/config/HoodieMetadataConfig.java   |   2 +-
 .../table/timeline/HoodieDefaultTimeline.java      |   6 +
 .../hudi/common/table/timeline/HoodieTimeline.java |   9 +
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 358 ++++++++++-----------
 .../hudi/common/testutils/HoodieTestTable.java     |  58 +++-
 .../sink/TestStreamWriteOperatorCoordinator.java   |   7 +-
 .../org/apache/hudi/functional/TestBootstrap.java  |   1 +
 .../apache/hudi/utilities/TestHoodieIndexer.java   |   3 +-
 25 files changed, 601 insertions(+), 368 deletions(-)

diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java 
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
index 9bc368e9522..7bf38338a5d 100644
--- 
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
+++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
@@ -36,6 +36,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
@@ -100,9 +101,15 @@ public class ITTestSavepointsCommand extends 
HoodieCLIIntegrationTestBase {
   /**
    * Test case of command 'savepoint rollback'.
    */
-  @Test
+  @Disabled("HUDI-6571") // TODO: Fix this test. Probably need to fix 
HoodieTestDataGenerator to create non-empty commit metadata.
   public void testRollbackToSavepoint() throws IOException {
-    // generate four savepoints
+    // disable metadata table.
+    Object result = shell.evaluate(() ->
+        String.format("metadata delete"));
+
+    assertTrue(ShellEvaluationResultUtil.isSuccess(result));
+
+    // generate four commits
     for (int i = 100; i < 104; i++) {
       String instantTime = String.valueOf(i);
       HoodieTestDataGenerator.createCommitFile(tablePath, instantTime, 
jsc.hadoopConfiguration());
@@ -112,13 +119,14 @@ public class ITTestSavepointsCommand extends 
HoodieCLIIntegrationTestBase {
     String savepoint = "102";
     HoodieTestDataGenerator.createSavepointFile(tablePath, savepoint, 
jsc.hadoopConfiguration());
 
-    Object result = shell.evaluate(() ->
+    result = shell.evaluate(() ->
             String.format("savepoint rollback --savepoint %s --sparkMaster 
%s", savepoint, "local"));
 
+    Object finalResult = result;
     assertAll("Command run failed",
-        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(finalResult)),
         () -> assertEquals(
-            String.format("Savepoint \"%s\" rolled back", savepoint), 
result.toString()));
+            String.format("Savepoint \"%s\" rolled back", savepoint), 
finalResult.toString()));
 
     // there is 1 restore instant
     HoodieActiveTimeline timeline = 
HoodieCLI.getTableMetaClient().getActiveTimeline();
@@ -132,7 +140,7 @@ public class ITTestSavepointsCommand extends 
HoodieCLIIntegrationTestBase {
   /**
    * Test case of command 'savepoint rollback' with metadata table bootstrap.
    */
-  @Test
+  @Disabled("HUDI-6571")
   public void testRollbackToSavepointWithMetadataTableEnable() throws 
IOException {
     // generate for savepoints
     for (int i = 101; i < 105; i++) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 5eb2a8f9ff3..05944e71711 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -861,8 +861,22 @@ public abstract class BaseHoodieTableServiceClient<O> 
extends BaseHoodieClient i
    */
   @Deprecated
   public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking) 
throws HoodieRollbackException {
-    LOG.info("Begin rollback of instant " + commitInstantTime);
     final String rollbackInstantTime = pendingRollbackInfo.map(entry -> 
entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
+    return rollback(commitInstantTime, pendingRollbackInfo, 
rollbackInstantTime, skipLocking);
+  }
+
+  /**
+   * @param commitInstantTime   Instant time of the commit
+   * @param pendingRollbackInfo pending rollback instant and plan if rollback 
failed from previous attempt.
+   * @param skipLocking         if this is triggered by another parent 
transaction, locking can be skipped.
+   * @throws HoodieRollbackException if rollback cannot be performed 
successfully
+   * @Deprecated Rollback the inflight record changes with the given commit 
time. This
+   * will be removed in future in favor of {@link 
BaseHoodieWriteClient#restoreToInstant(String, boolean)
+   */
+  @Deprecated
+  public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRollbackInfo> pendingRollbackInfo, String 
rollbackInstantTime,
+                          boolean skipLocking) throws HoodieRollbackException {
+    LOG.info("Begin rollback of instant " + commitInstantTime);
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, hadoopConf);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 48aae4ff135..050ad0070da 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -74,6 +74,7 @@ import 
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
 import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
 import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.metrics.HoodieMetrics;
@@ -107,7 +108,6 @@ import java.util.stream.Collectors;
 import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
 import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
 
 /**
  * Abstract Write Client providing functionality for performing commit, index 
updates and rollback
@@ -690,36 +690,52 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
    * @param savepointTime Savepoint time to rollback to
    */
   public void restoreToSavepoint(String savepointTime) {
-    boolean initialMetadataTableIfNecessary = config.isMetadataTableEnabled();
-    if (initialMetadataTableIfNecessary) {
+    boolean initializeMetadataTableIfNecessary = 
config.isMetadataTableEnabled();
+    if (initializeMetadataTableIfNecessary) {
       try {
-        // Delete metadata table directly when users trigger savepoint 
rollback if mdt existed and beforeTimelineStarts
+        // Delete metadata table directly when users trigger savepoint 
rollback if mdt existed and if the savePointTime is beforeTimelineStarts
+        // or before the oldest compaction on MDT.
+        // We cannot restore to before the oldest compaction on MDT as we 
don't have the basefiles before that time.
         HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
             .setConf(hadoopConf)
             
.setBasePath(getMetadataTableBasePath(config.getBasePath())).build();
-        // Same as HoodieTableMetadataUtil#processRollbackMetadata
-        HoodieInstant syncedInstant = new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, savepointTime);
+        Option<HoodieInstant> oldestMdtCompaction = 
mdtMetaClient.getCommitTimeline().filterCompletedInstants().firstInstant();
+        boolean deleteMDT = false;
+        if (oldestMdtCompaction.isPresent()) {
+          if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(savepointTime, 
oldestMdtCompaction.get().getTimestamp())) {
+            LOG.warn(String.format("Deleting MDT during restore to %s as the 
savepoint is older than oldest compaction %s on MDT",
+                savepointTime, oldestMdtCompaction.get().getTimestamp()));
+            deleteMDT = true;
+          }
+        }
+
         // The instant required to sync rollback to MDT has been archived and 
the mdt syncing will be failed
         // So that we need to delete the whole MDT here.
-        if 
(mdtMetaClient.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp()))
 {
-          HoodieTableMetaClient dataMetaClient = 
HoodieTableMetaClient.builder()
-              .setConf(hadoopConf)
-              .setBasePath(config.getBasePath()).build();
-          deleteMetadataTable(dataMetaClient, getEngineContext(), false);
+        if (!deleteMDT) {
+          HoodieInstant syncedInstant = new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, savepointTime);
+          if 
(mdtMetaClient.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp()))
 {
+            LOG.warn(String.format("Deleting MDT during restore to %s as the 
savepoint is older than the MDT timeline %s",
+                savepointTime, 
mdtMetaClient.getCommitsTimeline().firstInstant().get().getTimestamp()));
+            deleteMDT = true;
+          }
+        }
+
+        if (deleteMDT) {
+          HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), 
context);
           // rollbackToSavepoint action will try to bootstrap MDT at first but 
sync to MDT will fail at the current scenario.
           // so that we need to disable metadata initialized here.
-          initialMetadataTableIfNecessary = false;
+          initializeMetadataTableIfNecessary = false;
         }
       } catch (Exception e) {
         // Metadata directory does not exist
       }
     }
 
-    HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, 
Option.empty(), initialMetadataTableIfNecessary);
+    HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, 
Option.empty(), initializeMetadataTableIfNecessary);
     SavepointHelpers.validateSavepointPresence(table, savepointTime);
     ValidationUtils.checkArgument(!config.shouldArchiveBeyondSavepoint(), 
"Restore is not supported when " + 
HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT.key()
         + " is enabled");
-    restoreToInstant(savepointTime, initialMetadataTableIfNecessary);
+    restoreToInstant(savepointTime, initializeMetadataTableIfNecessary);
     SavepointHelpers.validateSavepointRestore(table, savepointTime);
   }
 
@@ -730,6 +746,13 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     return tableServiceClient.rollback(commitInstantTime, pendingRollbackInfo, 
false);
   }
 
+  @Deprecated
+  public boolean rollback(final String commitInstantTime, String 
rollbackInstantTimestamp) throws HoodieRollbackException {
+    HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, 
Option.empty());
+    Option<HoodiePendingRollbackInfo> pendingRollbackInfo = 
tableServiceClient.getPendingRollbackInfo(table.getMetaClient(), 
commitInstantTime);
+    return tableServiceClient.rollback(commitInstantTime, pendingRollbackInfo, 
rollbackInstantTimestamp, false);
+  }
+
   /**
    * NOTE : This action requires all writers (ingest and compact) to a table 
to be stopped before proceeding. Revert
    * the (inflight/committed) record changes for all commits after the 
provided instant time.
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index df73145a1bb..b63c6a5c649 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
 import org.apache.hudi.avro.model.HoodieIndexPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -37,6 +38,7 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -52,9 +54,12 @@ import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
@@ -90,6 +95,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -97,13 +103,13 @@ import java.util.stream.Stream;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
 import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
-import static 
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInflightInstant;
 import static 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeIndexPlan;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.METADATA_INDEXER_TIME_SUFFIX;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.createRollbackTimestamp;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
 
@@ -433,7 +439,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
    */
   private String generateUniqueCommitInstantTime(String initializationTime) {
     // if its initialized via Async indexer, we don't need to alter the init 
time
-    if (initializationTime.length() == MILLIS_INSTANT_ID_LENGTH + 
METADATA_INDEXER_TIME_SUFFIX.length()) {
+    if (HoodieTableMetadataUtil.isIndexingCommit(initializationTime)) {
       return initializationTime;
     }
     // Add suffix to initializationTime to find an unused instant time for the 
next index initialization.
@@ -859,7 +865,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     
dataMetaClient.getTableConfig().setMetadataPartitionsInflight(dataMetaClient, 
partitionTypes);
 
     // initialize partitions
-    initializeFromFilesystem(indexUptoInstantTime + 
METADATA_INDEXER_TIME_SUFFIX, partitionTypes, Option.empty());
+    
initializeFromFilesystem(HoodieTableMetadataUtil.createAsyncIndexerTimestamp(indexUptoInstantTime),
 partitionTypes, Option.empty());
   }
 
   /**
@@ -906,10 +912,59 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
    */
   @Override
   public void update(HoodieRestoreMetadata restoreMetadata, String 
instantTime) {
-    processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
-        metadataMetaClient.getActiveTimeline(), restoreMetadata, 
getRecordsGenerationParams(), instantTime,
-        metadata.getSyncedInstantTime()));
-    closeInternal();
+    dataMetaClient.reloadActiveTimeline();
+
+    // Fetch the commit to restore to (savepointed commit time)
+    HoodieInstant restoreInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION, 
instantTime);
+    HoodieInstant requested = 
HoodieTimeline.getRestoreRequestedInstant(restoreInstant);
+    HoodieRestorePlan restorePlan = null;
+    try {
+      restorePlan = TimelineMetadataUtils.deserializeAvroMetadata(
+          
dataMetaClient.getActiveTimeline().readRestoreInfoAsBytes(requested).get(), 
HoodieRestorePlan.class);
+    } catch (IOException e) {
+      throw new HoodieIOException("Deserialization of restore plan failed 
whose restore instant time is " + instantTime + " in data table", e);
+    }
+    final String restoreToInstantTime = 
restorePlan.getSavepointToRestoreTimestamp();
+    LOG.info("Triggering restore to " + restoreToInstantTime + " in metadata 
table");
+
+    // fetch the earliest commit to retain and ensure the base file prior to 
the time to restore is present
+    List<HoodieFileGroup> filesGroups = 
metadata.getMetadataFileSystemView().getAllFileGroups(MetadataPartitionType.FILES.getPartitionPath()).collect(Collectors.toList());
+
+    boolean cannotRestore = filesGroups.stream().map(fileGroup -> 
fileGroup.getAllFileSlices().map(fileSlice -> 
fileSlice.getBaseInstantTime()).anyMatch(
+        instantTime1 -> HoodieTimeline.compareTimestamps(instantTime1, 
LESSER_THAN_OR_EQUALS, restoreToInstantTime))).anyMatch(canRestore -> 
!canRestore);
+    if (cannotRestore) {
+      throw new HoodieMetadataException(String.format("Can't restore to %s 
since there is no base file in MDT lesser than the commit to restore to. "
+          + "Please delete metadata table and retry", restoreToInstantTime));
+    }
+
+    // Restore requires the existing pipelines to be shutdown. So we can 
safely scan the dataset to find the current
+    // list of files in the filesystem.
+    List<DirectoryInfo> dirInfoList = 
listAllPartitionsFromFilesystem(instantTime);
+    Map<String, DirectoryInfo> dirInfoMap = 
dirInfoList.stream().collect(Collectors.toMap(DirectoryInfo::getRelativePath, 
Function.identity()));
+    dirInfoList.clear();
+
+    getWriteClient().restoreToInstant(restoreToInstantTime, false);
+
+    // At this point we have also reverted the cleans which have occurred 
after the restoreToInstantTime. Hence, a sync
+    // is required to bring back those cleans.
+    try {
+      initMetadataReader();
+      Map<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
+      Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
+      List<String> partitionsToDelete = new ArrayList<>();
+      fetchOutofSyncFilesRecordsFromMetadataTable(dirInfoMap, 
partitionFilesToAdd, partitionFilesToDelete, partitionsToDelete);
+
+      // Even if we don't have any deleted files to sync, we still create an 
empty commit so that we can track the restore has completed.
+      // We cannot create a deltaCommit at instantTime now because a future 
(rollback) block has already been written to the logFiles.
+      // We need to choose a timestamp which would be a validInstantTime for 
MDT. This is either a commit timestamp completed on the dataset
+      // or a timestamp with suffix which we use for MDT clean, compaction etc.
+      String syncCommitTime = 
HoodieTableMetadataUtil.createRestoreTimestamp(HoodieActiveTimeline.createNewInstantTime());
+      processAndCommit(syncCommitTime, () -> 
HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
+          partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete, 
syncCommitTime));
+      closeInternal();
+    } catch (IOException e) {
+      throw new HoodieMetadataException("IOException during MDT restore sync", 
e);
+    }
   }
 
   /**
@@ -921,23 +976,47 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
   @Override
   public void update(HoodieRollbackMetadata rollbackMetadata, String 
instantTime) {
     if (initialized && metadata != null) {
-      // Is this rollback of an instant that has been synced to the metadata 
table?
-      String rollbackInstant = rollbackMetadata.getCommitsRollback().get(0);
-      boolean wasSynced = 
metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, rollbackInstant));
-      if (!wasSynced) {
-        // A compaction may have taken place on metadata table which would 
have included this instant being rolled back.
-        // Revisit this logic to relax the compaction fencing : 
https://issues.apache.org/jira/browse/HUDI-2458
-        Option<String> latestCompaction = metadata.getLatestCompactionTime();
-        if (latestCompaction.isPresent()) {
-          wasSynced = HoodieTimeline.compareTimestamps(rollbackInstant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCompaction.get());
+      // The commit which is being rolled back on the dataset
+      final String commitToRollbackInstantTime = 
rollbackMetadata.getCommitsRollback().get(0);
+      // Find the deltacommits since the last compaction
+      Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+          
CompactionUtils.getDeltaCommitsSinceLatestCompaction(metadataMetaClient.getActiveTimeline());
+
+      // This could be a compaction or deltacommit instant (See 
CompactionUtils.getDeltaCommitsSinceLatestCompaction)
+      HoodieInstant compactionInstant = deltaCommitsInfo.get().getValue();
+      HoodieTimeline deltacommitsSinceCompaction = 
deltaCommitsInfo.get().getKey();
+
+      // The deltacommit that will be rolled back
+      HoodieInstant deltaCommitInstant = new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, commitToRollbackInstantTime);
+
+      // The commit being rolled back should not be earlier than the latest 
compaction on the MDT. Compaction on MDT only occurs when all actions
+      // are completed on the dataset. Hence, this case implies a rollback of 
completed commit which should actually be handled using restore.
+      if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
+        final String compactionInstantTime = compactionInstant.getTimestamp();
+        if 
(HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitToRollbackInstantTime, 
compactionInstantTime)) {
+          throw new HoodieMetadataException(String.format("Commit being rolled 
back %s is earlier than the latest compaction %s. "
+                  + "There are %d deltacommits after this compaction: %s", 
commitToRollbackInstantTime, compactionInstantTime,
+              deltacommitsSinceCompaction.countInstants(), 
deltacommitsSinceCompaction.getInstants()));
         }
       }
 
-      Map<MetadataPartitionType, HoodieData<HoodieRecord>> records =
-          HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, 
metadataMetaClient.getActiveTimeline(),
-              rollbackMetadata, getRecordsGenerationParams(), instantTime,
-              metadata.getSyncedInstantTime(), wasSynced);
-      commit(instantTime, records);
+      // lets apply a delta commit with DT's rb instant(with special suffix) 
containing following records:
+      // a. any log files as part of RB commit metadata that was added
+      // b. log files added by the commit in DT being rolled back. By rolled 
back, we mean, a rollback block will be added and does not mean it will be 
deleted.
+      // both above list should only be added to FILES partition.
+
+      String rollbackInstantTime = createRollbackTimestamp(instantTime);
+      processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, dataMetaClient, 
rollbackMetadata, instantTime));
+
+      if (deltacommitsSinceCompaction.containsInstant(deltaCommitInstant)) {
+        LOG.info("Rolling back MDT deltacommit " + 
commitToRollbackInstantTime);
+        if (!getWriteClient().rollback(commitToRollbackInstantTime, 
rollbackInstantTime)) {
+          throw new HoodieMetadataException("Failed to rollback deltacommit at 
" + commitToRollbackInstantTime);
+        }
+      } else {
+        LOG.info(String.format("Ignoring rollback of instant %s at %s. The 
commit to rollback is not found in MDT",
+            commitToRollbackInstantTime, instantTime));
+      }
       closeInternal();
     }
   }
@@ -1164,6 +1243,47 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     return true;
   }
 
+  private void fetchOutofSyncFilesRecordsFromMetadataTable(Map<String, 
DirectoryInfo> dirInfoMap, Map<String, Map<String, Long>> partitionFilesToAdd,
+                                                           Map<String, 
List<String>> partitionFilesToDelete, List<String> partitionsToDelete) throws 
IOException {
+
+    for (String partition : metadata.fetchAllPartitionPaths()) {
+      Path partitionPath = null;
+      if (StringUtils.isNullOrEmpty(partition) && 
!dataMetaClient.getTableConfig().isTablePartitioned()) {
+        partitionPath = new Path(dataWriteConfig.getBasePath());
+      } else {
+        partitionPath = new Path(dataWriteConfig.getBasePath(), partition);
+      }
+      final String partitionId = 
HoodieTableMetadataUtil.getPartitionIdentifier(partition);
+      FileStatus[] metadataFiles = 
metadata.getAllFilesInPartition(partitionPath);
+      if (!dirInfoMap.containsKey(partition)) {
+        // Entire partition has been deleted
+        partitionsToDelete.add(partitionId);
+        if (metadataFiles != null && metadataFiles.length > 0) {
+          partitionFilesToDelete.put(partitionId, 
Arrays.stream(metadataFiles).map(f -> 
f.getPath().getName()).collect(Collectors.toList()));
+        }
+      } else {
+        // Some files need to be cleaned and some to be added in the partition
+        Map<String, Long> fsFiles = 
dirInfoMap.get(partition).getFileNameToSizeMap();
+        List<String> mdtFiles = Arrays.stream(metadataFiles).map(mdtFile -> 
mdtFile.getPath().getName()).collect(Collectors.toList());
+        List<String> filesDeleted = Arrays.stream(metadataFiles).map(f -> 
f.getPath().getName())
+            .filter(n -> !fsFiles.containsKey(n)).collect(Collectors.toList());
+        Map<String, Long> filesToAdd = new HashMap<>();
+        // new files could be added to DT due to restore that just happened 
which may not be tracked in RestoreMetadata.
+        dirInfoMap.get(partition).getFileNameToSizeMap().forEach((k,v) -> {
+          if (!mdtFiles.contains(k)) {
+            filesToAdd.put(k,v);
+          }
+        });
+        if (!filesToAdd.isEmpty()) {
+          partitionFilesToAdd.put(partitionId, filesToAdd);
+        }
+        if (!filesDeleted.isEmpty()) {
+          partitionFilesToDelete.put(partitionId, filesDeleted);
+        }
+      }
+    }
+  }
+
   /**
    * Return records that represent update to the record index due to write 
operation on the dataset.
    *
@@ -1282,6 +1402,8 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     return initialized;
   }
 
+  protected abstract BaseHoodieWriteClient getWriteClient();
+
   /**
    * A class which represents a directory and the files and directories inside 
it.
    * <p>
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index f431283ac7a..9d7ecc8be0f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -94,7 +94,7 @@ public class HoodieMetadataWriteUtils {
             .withCleanerParallelism(parallelism)
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
             .withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
-            .retainCommits(Math.min(writeConfig.getCleanerCommitsRetained(), 
DEFAULT_METADATA_CLEANER_COMMITS_RETAINED))
+            .retainCommits(DEFAULT_METADATA_CLEANER_COMMITS_RETAINED)
             .build())
         // we will trigger archive manually, to ensure only regular writer 
invokes it
         .withArchivalConfig(HoodieArchivalConfig.newBuilder()
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
index b809a4999e4..395749657f9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
@@ -22,7 +22,6 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -94,11 +93,6 @@ public interface HoodieTableMetadataWriter extends 
Serializable, AutoCloseable {
    */
   void deletePartitions(String instantTime, List<MetadataPartitionType> 
partitions);
 
-  /**
-   * It returns write client for metadata table.
-   */
-  BaseHoodieWriteClient getWriteClient();
-
   /**
    * Returns true if the metadata table is initialized.
    */
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
index b7636a25e7c..a19e067aae1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -151,7 +151,8 @@ public class UpgradeDowngrade {
         fromVersion = prevVersion;
       }
     }
-
+    // Reload the meta client to get the latest table config (which could have 
been updated due to metadata table)
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     // Write out the current version in hoodie.properties.updated file
     for (Map.Entry<ConfigProperty, String> entry : tableProps.entrySet()) {
       metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue());
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
index 7297759f21c..6e6d609c848 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
@@ -23,6 +23,7 @@ import 
org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -129,11 +130,12 @@ public class HoodieMetadataTestTable extends 
HoodieTestTable {
   }
 
   @Override
-  public HoodieTestTable addRollback(String instantTime, 
HoodieRollbackMetadata rollbackMetadata) throws IOException {
-    super.addRollback(instantTime, rollbackMetadata);
+  public HoodieTestTable addRollback(String instantTime, 
HoodieRollbackMetadata rollbackMetadata, HoodieRollbackPlan rollbackPlan) 
throws IOException {
+    super.addRollback(instantTime, rollbackMetadata, rollbackPlan);
     if (writer != null) {
       writer.update(rollbackMetadata, instantTime);
     }
+    super.addRollbackCompleted(instantTime, rollbackMetadata, false);
     return this;
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
index 1bbe10db0f5..3938df3f3af 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
@@ -194,7 +194,9 @@ public class TestMetadataConversionUtils extends 
HoodieCommonTestHarness {
     rollbackMetadata.setPartitionMetadata(partitionMetadataMap);
     rollbackMetadata.setInstantsRollback(Arrays.asList(new 
HoodieInstantInfo("1", HoodieTimeline.COMMIT_ACTION)));
     HoodieTestTable.of(metaClient)
-        .addRollback(instantTime, rollbackMetadata);
+        .addRollback(instantTime, rollbackMetadata, null);
+    HoodieTestTable.of(metaClient)
+        .addRollbackCompleted(instantTime, rollbackMetadata, false);
   }
 
   private void createCommitMetadata(String instantTime) throws Exception {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 9a254409b8d..2346831162b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -37,8 +37,9 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.metrics.DistributedRegistry;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.table.BulkInsertPartitioner;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -122,8 +123,8 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
   }
 
   protected void bulkCommit(
-          String instantTime, MetadataPartitionType partitionType, 
HoodieData<HoodieRecord> records,
-          int fileGroupCount) {
+      String instantTime, MetadataPartitionType partitionType, 
HoodieData<HoodieRecord> records,
+      int fileGroupCount) {
     SparkHoodieMetadataBulkInsertPartitioner partitioner = new 
SparkHoodieMetadataBulkInsertPartitioner(fileGroupCount);
     commitInternal(instantTime, Collections.singletonMap(partitionType, 
records), Option.of(partitioner));
   }
@@ -138,7 +139,7 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     try (SparkRDDWriteClient writeClient = (SparkRDDWriteClient) 
getWriteClient()) {
       // rollback partially failed writes if any.
       if (dataWriteConfig.getFailedWritesCleanPolicy().isEager()
-              && writeClient.rollbackFailedWrites()) {
+          && writeClient.rollbackFailedWrites()) {
         metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
       }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 7cb37c898db..e9d21350c21 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -43,7 +43,9 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.io.HoodieCreateHandle;
@@ -88,6 +90,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
+
 /**
  * Implementation of a very heavily read-optimized Hoodie Table where, all 
data is stored in base files, with
  * zero read amplification.
@@ -197,6 +201,13 @@ public class HoodieSparkCopyOnWriteTable<T>
 
   @Override
   public void rollbackBootstrap(HoodieEngineContext context, String 
instantTime) {
+    // Delete the metadata table to roll back a failed bootstrap. A re-attempt 
of bootstrap will re-initialize the MDT.
+    try {
+      LOG.info("Deleting metadata table because we are rolling back failed 
bootstrap. ");
+      deleteMetadataTable(config.getBasePath(), context);
+    } catch (HoodieMetadataException e) {
+      throw new HoodieException("Failed to delete metadata table.", e);
+    }
     new RestorePlanActionExecutor<>(context, config, this, instantTime, 
HoodieTimeline.INIT_INSTANT_TS).execute();
     new CopyOnWriteRestoreActionExecutor<>(context, config, this, instantTime, 
HoodieTimeline.INIT_INSTANT_TS).execute();
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index efe0a2cff53..f5929fdc667 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -36,7 +36,9 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.io.HoodieAppendHandle;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
@@ -57,11 +59,16 @@ import 
org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
 import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
+
 /**
  * Implementation of a more real-time Hoodie Table the provides tradeoffs on 
read and write cost/amplification.
  *
@@ -80,6 +87,8 @@ import java.util.Map;
  */
 public class HoodieSparkMergeOnReadTable<T> extends 
HoodieSparkCopyOnWriteTable<T> implements HoodieCompactionHandler<T> {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieSparkMergeOnReadTable.class);
+
   HoodieSparkMergeOnReadTable(HoodieWriteConfig config, HoodieEngineContext 
context, HoodieTableMetaClient metaClient) {
     super(config, context, metaClient);
   }
@@ -168,6 +177,14 @@ public class HoodieSparkMergeOnReadTable<T> extends 
HoodieSparkCopyOnWriteTable<
 
   @Override
   public void rollbackBootstrap(HoodieEngineContext context, String 
instantTime) {
+    // Delete the metadata table to roll back a failed bootstrap. A re-attempt 
of bootstrap will re-initialize the MDT.
+    try {
+      LOG.info("Deleting metadata table because we are rolling back failed 
bootstrap. ");
+      deleteMetadataTable(config.getBasePath(), context);
+    } catch (HoodieMetadataException e) {
+      throw new HoodieException("Failed to delete metadata table.", e);
+    }
+
     new RestorePlanActionExecutor<>(context, config, this, instantTime, 
HoodieTimeline.INIT_INSTANT_TS).execute();
     new MergeOnReadRestoreActionExecutor<>(context, config, this, instantTime, 
HoodieTimeline.INIT_INSTANT_TS).execute();
   }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index 166f6daf74a..a8b6f77a6a6 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -228,7 +228,8 @@ public class TestClientRollback extends 
HoodieClientTestBase {
   @Test
   public void testGetSavepointOldSchema() throws Exception {
     HoodieWriteConfig cfg = 
getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder()
-        
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build();
+        
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
       HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, 
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 5580d07663e..56fabb362b6 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -671,6 +671,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
             .withMetadataIndexBloomFilterFileGroups(4)
             .withMetadataIndexColumnStats(true)
             .withMetadataIndexBloomFilterFileGroups(2)
+            .withMaxNumDeltaCommitsBeforeCompaction(12) // cannot restore to 
before the oldest compaction on MDT as there are no base files before that time
             .build())
         .build();
     init(tableType, writeConfig);
@@ -1332,16 +1333,13 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
         validateMetadata(testTable);
         ++numRollbacks;
       } catch (HoodieMetadataException e) {
-        exceptionRaised = true;
+        // This is expected since we are rolling back commits that are older 
than the latest compaction on the MDT
         break;
       }
     }
-
-    assertFalse(exceptionRaised, "Metadata table should not archive instants 
that are in dataset active timeline");
     // Since each rollback also creates a deltacommit, we can only support 
rolling back of half of the original
     // instants present before rollback started.
-    assertTrue(numRollbacks >= minArchiveCommitsDataset / 2,
-        "Rollbacks of non archived instants should work");
+    assertTrue(numRollbacks >= minArchiveCommitsDataset / 2, "Rollbacks of non 
archived instants should work");
   }
 
   /**
@@ -1591,20 +1589,30 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     extraFiles.put("p1", Collections.singletonList("f10"));
     extraFiles.put("p2", Collections.singletonList("f12"));
     testTable.doRollbackWithExtraFiles("0000004", "0000005", extraFiles);
-    if (!ignoreSpuriousDeletes) {
-      assertThrows(HoodieMetadataException.class, () -> 
validateMetadata(testTable));
-    } else {
-      validateMetadata(testTable);
-    }
+    validateMetadata(testTable);
   }
 
   /**
    * Test several table operations with restore. This test uses 
SparkRDDWriteClient.
    * Once the restore support is ready in HoodieTestTable, then rewrite this 
test.
    */
-  @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testTableOperationsWithRestore(HoodieTableType tableType) throws 
Exception {
+  @Test
+  public void testTableOperationsWithRestore() throws Exception {
+    HoodieTableType tableType = COPY_ON_WRITE;
+    init(tableType);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withRollbackUsingMarkers(false).build();
+    testTableOperationsImpl(engineContext, writeConfig);
+  }
+
+  /**
+   * Test several table operations with restore on a MERGE_ON_READ table. This test uses 
SparkRDDWriteClient.
+   * Once the restore support is ready in HoodieTestTable, then rewrite this 
test.
+   */
+  @Test
+  public void testTableOperationsWithRestoreforMOR() throws Exception {
+    HoodieTableType tableType = MERGE_ON_READ;
     init(tableType);
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
     HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
@@ -1802,12 +1810,17 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
    * @param writeConfig   - Write config
    */
   private void testTableOperationsImpl(HoodieSparkEngineContext engineContext, 
HoodieWriteConfig writeConfig) throws IOException {
+
+    String newCommitTime = null;
+    List<HoodieRecord> records = new ArrayList<>();
+    List<WriteStatus> writeStatuses = null;
+
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
       // Write 1 (Bulk insert)
-      String newCommitTime = "20210101000100000";
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      newCommitTime = "20210101000100000";
+      records = dataGen.generateInserts(newCommitTime, 20);
       client.startCommitWithTime(newCommitTime);
-      List<WriteStatus> writeStatuses = 
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
       validateMetadata(client);
 
@@ -1852,6 +1865,10 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       assertNoWriteErrors(writeStatuses);
       validateMetadata(client);
 
+    }
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
+
       // Compaction
       if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
         newCommitTime = "20210101000700000";
@@ -1860,15 +1877,15 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
         validateMetadata(client);
       }
 
-      // Deletes
+      // upserts
       newCommitTime = "20210101000900000";
-      records = dataGen.generateDeletes(newCommitTime, 10);
-      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> 
r.getKey());
       client.startCommitWithTime(newCommitTime);
-      client.delete(deleteKeys, newCommitTime);
+      records = dataGen.generateUpdates(newCommitTime, 5);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
 
       // Clean
-      newCommitTime = "20210101000900000";
+      newCommitTime = "20210101001000000";
       client.clean(newCommitTime);
       validateMetadata(client);
 
@@ -2353,10 +2370,9 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     assertTrue(oldStatus.getModificationTime() < 
newStatus.getModificationTime());
 
     // Test downgrade by running the downgrader
-    new UpgradeDowngrade(metaClient, writeConfig, context, 
SparkUpgradeDowngradeHelper.getInstance())
-        .run(HoodieTableVersion.TWO, null);
-
-    assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), 
HoodieTableVersion.TWO.versionCode());
+    new UpgradeDowngrade(metaClient, writeConfig, context, 
SparkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.TWO, null);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    assertEquals(HoodieTableVersion.TWO.versionCode(), 
metaClient.getTableConfig().getTableVersion().versionCode());
     assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table 
should not exist");
   }
 
@@ -2522,6 +2538,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     assertFalse(fs.exists(new Path(getMetadataTableBasePath(basePath)
         + Path.SEPARATOR + 
MetadataPartitionType.RECORD_INDEX.getPartitionPath())));
 
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     // Insert/upsert third batch of records
     client = getHoodieWriteClient(cfg);
     commitTime = HoodieActiveTimeline.createNewInstantTime();
@@ -2544,7 +2561,6 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
         + Path.SEPARATOR + FILES.getPartitionPath())));
     assertTrue(fs.exists(new Path(getMetadataTableBasePath(basePath)
         + Path.SEPARATOR + 
MetadataPartitionType.RECORD_INDEX.getPartitionPath())));
-
   }
 
   /**
@@ -2828,7 +2844,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
   /**
    * Test duplicate operation with same instant timestamp.
-   *
+   * <p>
    * This can happen if the commit on the MDT succeeds but fails on the 
dataset. For some table services like clean,
    * compaction, replace commit, the operation will be retried with the same 
timestamp (taken from inflight). Hence,
    * metadata table will see an additional commit with the same timestamp as a 
previously completed deltacommit.
@@ -3390,6 +3406,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
   }
 
   private void changeTableVersion(HoodieTableVersion version) throws 
IOException {
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     metaClient.getTableConfig().setTableVersion(version);
     Path propertyFile = new Path(metaClient.getMetaPath() + "/" + 
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     try (FSDataOutputStream os = metaClient.getFs().create(propertyFile)) {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index fe89fb927ae..5b8f7cd20c7 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -1613,7 +1613,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieClientTestHarness {
         .setBasePath(HoodieTableMetadata.getMetadataTableBasePath(basePath))
         .setLoadActiveTimelineOnLoad(true).build();
 
-    for (int i = 1; i <= 17; i++) {
+    for (int i = 1; i <= 18; i++) {
       if (i != 2) {
         testTable.doWriteOperation("000000" + String.format("%02d", i), 
WriteOperationType.UPSERT,
             i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), 
Arrays.asList("p1", "p2"), 2);
@@ -1630,77 +1630,77 @@ public class TestHoodieTimelineArchiver extends 
HoodieClientTestHarness {
       List<HoodieInstant> metadataTableInstants = 
metadataTableMetaClient.getActiveTimeline()
           .getCommitsTimeline().filterCompletedInstants().getInstants();
 
-      if (i <= 7) {
+      if (i == 1) {
+        // In the metadata table timeline, the first delta commit is 
"00000000000000"
+        assertEquals(i + 1, metadataTableInstants.size());
+        assertTrue(metadataTableInstants.contains(
+            new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, SOLO_COMMIT_TIMESTAMP + "010")));
+        assertTrue(metadataTableInstants.contains(
+            new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "00000001")));
+      } else if (i <= 8) {
         // In the metadata table timeline, the first delta commit is 
"00000000000000"
         // from metadata table init, delta commits "00000001" till "00000007" 
are added
         // later on without archival or compaction
-        assertEquals(i + 1, metadataTableInstants.size());
+        // rollback in DT will also trigger rollback in MDT
+        assertEquals(i, metadataTableInstants.size());
         assertTrue(metadataTableInstants.contains(
             new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, SOLO_COMMIT_TIMESTAMP + "010")));
-        IntStream.range(1, i + 1).forEach(j ->
+        // rolled back commits may not be present in MDT timeline (00000001)
+        IntStream.range(2, i).forEach(j ->
             assertTrue(metadataTableInstants.contains(
                 new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
-      } else if (i == 8) {
-        // i == 8
-        // The instant "00000000000000" was archived since it's less than
+      } else if (i == 9) {
+        // i == 9
+        // The instant "00000000000010" was archived since it's less than
         // the earliest commit on the dataset active timeline,
         // the dataset active timeline has instants:
         //   00000002.rollback, 00000007.commit, 00000008.commit
         assertEquals(9, metadataTableInstants.size());
-        assertTrue(metadataTableInstants.contains(
-            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, 
"00000007001")));
-        IntStream.range(1, i + 1).forEach(j ->
+        // mdt timeline 00000002, 00000003,..., 00000008, 
00000008001(compaction), 00000009
+        IntStream.range(2, i).forEach(j ->
             assertTrue(metadataTableInstants.contains(
                 new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
-      } else if (i <= 11) {
-        // In the metadata table timeline, the first delta commit is "00000005"
+      } else if (i <= 12) {
+        // In the metadata table timeline, the first delta commit is "00000006"
         // because it equals with the earliest commit on the dataset timeline, 
after archival,
-        // delta commits "00000006" till "00000011" are added later on without 
archival or compaction
-        assertEquals(i - 3, metadataTableInstants.size());
-        assertTrue(metadataTableInstants.contains(
-            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, 
"00000007001")));
-        IntStream.range(5, i + 1).forEach(j ->
-            assertTrue(metadataTableInstants.contains(
-                new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION,
-                    "000000" + String.format("%02d", j)))));
-      } else if (i <= 14) {
-        // In the metadata table timeline, the first delta commit is 
"00000007001"
-        // from metadata table compaction, after archival, delta commits 
"00000008"
-        // till "00000014" are added later on without archival or compaction
-        assertEquals(i - 6, metadataTableInstants.size());
+        // delta commits "00000006" till "00000010" are added later on without 
archival or compaction
+        // mdt timeline 00000006, 00000007, 00000008, 00000008.compact, 
00000009, 00000010 for i = 10
+        assertEquals(i - 4, metadataTableInstants.size());
         assertTrue(metadataTableInstants.contains(
-            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, 
"00000007001")));
-        IntStream.range(8, i + 1).forEach(j ->
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, 
"00000008001")));
+        IntStream.range(6, i).forEach(j ->
             assertTrue(metadataTableInstants.contains(
                 new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION,
                     "000000" + String.format("%02d", j)))));
-      } else if (i == 15) {
-        // Only delta commits "00000008" till "00000015" are in the active 
timeline + 007001 for compaction.
-        assertEquals(9, metadataTableInstants.size());
+      } else if (i <= 16) {
+        // In the metadata table timeline, the first delta commit is 
"00000008001"
+        // from metadata table compaction, after archival, delta commits 
"00000009"
+        // till "00000016" are added later on without archival or compaction
+        // mdt timeline 00000008001, 00000009, 00000010, 00000011, 00000012, 
00000013
+        assertEquals(i - 7, metadataTableInstants.size());
         assertTrue(metadataTableInstants.contains(
-            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, 
"00000007001")));
-        IntStream.range(8, 16).forEach(j ->
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, 
"00000008001")));
+        IntStream.range(9, i).forEach(j ->
             assertTrue(metadataTableInstants.contains(
                 new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION,
                     "000000" + String.format("%02d", j)))));
-      } else if (i == 16) {
-        // i == 16
-        // dataset timeline has commits "00000015" and "00000016",
-        // the metadata timeline has commits [00000008, 00000016] and 
"00000015001"
+      } else if (i == 17) {
+        // i == 17
+        // commits in MDT [00000009, .... 00000016, 00000016001.compaction, 
00000017]
         assertEquals(10, metadataTableInstants.size());
         assertTrue(metadataTableInstants.contains(
-            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, 
"00000015001")));
-        IntStream.range(8, 17).forEach(j ->
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, 
"00000016001")));
+        IntStream.range(9, i).forEach(j ->
             assertTrue(metadataTableInstants.contains(
                 new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION,
                     "000000" + String.format("%02d", j)))));
       } else {
-        // i == 17
-        // Only commits [00000013, 00000017] and "00000015001" are on the 
metadata timeline
+        // i == 18
+        // commits in MDT [00000014, .... 00000016, 00000016001.compaction, 
00000017, 00000018]
         assertEquals(6, metadataTableInstants.size());
         assertTrue(metadataTableInstants.contains(
-            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, 
"00000015001")));
-        IntStream.range(13, 18).forEach(j ->
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, 
"00000016001")));
+        IntStream.range(14, i).forEach(j ->
             assertTrue(metadataTableInstants.contains(
                 new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION,
                     "000000" + String.format("%02d", j)))));
@@ -1829,7 +1829,8 @@ public class TestHoodieTimelineArchiver extends 
HoodieClientTestHarness {
           .setPartitionMetadata(Collections.emptyMap())
           .setInstantsRollback(Collections.emptyList())
           .build();
-      HoodieTestTable.of(metaClient).addRollback(rollbackTime, 
hoodieRollbackMetadata, isEmpty);
+      HoodieTestTable.of(metaClient).addRollback(rollbackTime, 
hoodieRollbackMetadata, isEmpty, null);
+      HoodieTestTable.of(metaClient).addRollbackCompleted(rollbackTime, 
hoodieRollbackMetadata, isEmpty);
     }
     return new HoodieInstant(inflight, "rollback", rollbackTime);
   }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 46ce709751b..fbc88bb2718 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -482,24 +482,21 @@ public class TestHoodieSparkMergeOnReadTableRollback 
extends SparkClientFunction
 
       copyOfRecords.clear();
 
-      // Rollback latest commit first
-      client.restoreToInstant("000", cfg.isMetadataTableEnabled());
+      // Restore to 3rd commit.
+      client.restoreToInstant("003", cfg.isMetadataTableEnabled());
 
-      metaClient = HoodieTableMetaClient.reload(metaClient);
+      metaClient.reloadActiveTimeline();
       allFiles = listAllBaseFilesInPath(hoodieTable);
-      tableView = getHoodieTableFileSystemView(metaClient, 
metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      tableView = getHoodieTableFileSystemView(metaClient, 
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
 allFiles);
       dataFilesToRead = tableView.getLatestBaseFiles();
-      assertFalse(dataFilesToRead.findAny().isPresent());
+      assertFalse(dataFilesToRead.filter(hoodieBaseFile -> 
hoodieBaseFile.getCommitTime().compareTo("003") > 0).findAny().isPresent());
       TableFileSystemView.SliceView rtView = 
getHoodieTableFileSystemView(metaClient, 
metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       List<HoodieFileGroup> fileGroups =
           ((HoodieTableFileSystemView) 
rtView).getAllFileGroups().collect(Collectors.toList());
-      assertTrue(fileGroups.isEmpty());
-
-      // make sure there are no log files remaining
-      assertEquals(0L, ((HoodieTableFileSystemView) rtView).getAllFileGroups()
-          .filter(fileGroup -> fileGroup.getAllRawFileSlices().noneMatch(f -> 
f.getLogFiles().count() == 0))
-          .count());
+      assertFalse(fileGroups.isEmpty());
 
+      assertFalse(fileGroups.stream().anyMatch(fileGroup -> 
fileGroup.getAllFileSlices()
+          .anyMatch(fileSlice -> 
fileSlice.getBaseInstantTime().compareTo("003") > 0)));
     }
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 60b87020c11..7d8416c368a 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -337,6 +337,7 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
 
     // downgrade to version 3 and check TABLE_CHECKSUM is still present
     new UpgradeDowngrade(metaClient, cfg, context, 
SparkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.THREE, null);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     assertTableVersionOnDataAndMetadataTable(metaClient, 
HoodieTableVersion.THREE);
     
assertTrue(metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.TABLE_CHECKSUM.key()));
     assertEquals(checksum, 
metaClient.getTableConfig().getProps().getString(HoodieTableConfig.TABLE_CHECKSUM.key()));
@@ -389,7 +390,7 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
     Map<String, String> params = new HashMap<>();
     addNewTableParamsToProps(params, tableName);
     Properties properties = new Properties();
-    params.forEach((k,v) -> properties.setProperty(k, v));
+    params.forEach((k, v) -> properties.setProperty(k, v));
 
     initMetaClient(getTableType(), properties);
     // init config, table and client.
@@ -453,7 +454,7 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
 
   private void doInsertWithDefaultPartition(SparkRDDWriteClient client) {
     // Write 1 (only inserts)
-    dataGen = new HoodieTestDataGenerator(new 
String[]{DEPRECATED_DEFAULT_PARTITION_PATH});
+    dataGen = new HoodieTestDataGenerator(new String[] 
{DEPRECATED_DEFAULT_PARTITION_PATH});
     String commit1 = "005";
     client.startCommitWithTime(commit1);
     List<HoodieRecord> records = dataGen.generateInserts(commit1, 100);
@@ -463,7 +464,7 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
 
   private void doInsertWithDefaultHiveStylePartition(SparkRDDWriteClient 
client) {
     // Write 1 (only inserts)
-    dataGen = new HoodieTestDataGenerator(new String[]{"partition_path=" + 
DEPRECATED_DEFAULT_PARTITION_PATH});
+    dataGen = new HoodieTestDataGenerator(new String[] {"partition_path=" + 
DEPRECATED_DEFAULT_PARTITION_PATH});
     String commit1 = "005";
     client.startCommitWithTime(commit1);
     List<HoodieRecord> records = dataGen.generateInserts(commit1, 100);
@@ -536,7 +537,6 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
             .withMetadataIndexBloomFilter(true)
             .withEnableRecordIndex(true).build())
         .build();
-    HoodieTable table = getHoodieTable(metaClient, config);
     for (MetadataPartitionType partitionType : MetadataPartitionType.values()) 
{
       metaClient.getTableConfig().setMetadataPartitionState(metaClient, 
partitionType, true);
     }
@@ -571,7 +571,7 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
     prepForDowngradeFromVersion(HoodieTableVersion.SIX);
     new UpgradeDowngrade(metaClient, config, context, 
SparkUpgradeDowngradeHelper.getInstance())
         .run(HoodieTableVersion.FIVE, null);
-
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     // validate the relevant table states after downgrade
     assertFalse(Files.exists(recordIndexPartitionPath), "record index 
partition should be deleted.");
     assertEquals(allPartitionsExceptRecordIndex, 
metaClient.getTableConfig().getMetadataPartitions(),
@@ -638,7 +638,7 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
       // assert marker files
       assertMarkerFilesForDowngrade(table, commitInstant, toVersion == 
HoodieTableVersion.ONE);
     }
-    
+
     // verify hoodie.table.version got downgraded
     metaClient = 
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath())
         .setLayoutVersion(Option.of(new 
TimelineLayoutVersion(cfg.getTimelineLayoutVersion()))).build();
@@ -790,17 +790,17 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
   /**
    * Create two commits and may or may not commit 2nd commit.
    *
-   * @param firstPartitionCommit2FileSlices list to hold file slices in first 
partition.
+   * @param firstPartitionCommit2FileSlices  list to hold file slices in first 
partition.
    * @param secondPartitionCommit2FileSlices list of hold file slices from 
second partition.
-   * @param cfg instance of {@link HoodieWriteConfig}
-   * @param client instance of {@link SparkRDDWriteClient} to use.
-   * @param commitSecondUpsert true if 2nd commit needs to be committed. false 
otherwise.
+   * @param cfg                              instance of {@link 
HoodieWriteConfig}
+   * @param client                           instance of {@link 
SparkRDDWriteClient} to use.
+   * @param commitSecondUpsert               true if 2nd commit needs to be 
committed. false otherwise.
    * @return a pair of list of records from 1st and 2nd batch.
    */
   private Pair<List<HoodieRecord>, List<HoodieRecord>> 
twoUpsertCommitDataWithTwoPartitions(List<FileSlice> 
firstPartitionCommit2FileSlices,
-      List<FileSlice> secondPartitionCommit2FileSlices,
-      HoodieWriteConfig cfg, SparkRDDWriteClient client,
-      boolean commitSecondUpsert) throws IOException {
+                                                                               
             List<FileSlice> secondPartitionCommit2FileSlices,
+                                                                               
             HoodieWriteConfig cfg, SparkRDDWriteClient client,
+                                                                               
             boolean commitSecondUpsert) throws IOException {
     //just generate two partitions
     dataGen = new HoodieTestDataGenerator(new String[] 
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
     //1. prepare data
@@ -857,11 +857,11 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
    * @param table instance of {@link HoodieTable}
    */
   private void prepForUpgradeFromZeroToOne(HoodieTable table) throws 
IOException {
-    List<HoodieInstant> instantsToBeParsed  =
+    List<HoodieInstant> instantsToBeParsed =
         metaClient.getActiveTimeline()
-        .getCommitsTimeline()
-        .getInstantsAsStream()
-        .collect(Collectors.toList());
+            .getCommitsTimeline()
+            .getInstantsAsStream()
+            .collect(Collectors.toList());
     for (HoodieInstant instant : instantsToBeParsed) {
       WriteMarkers writeMarkers =
           WriteMarkersFactory.get(table.getConfig().getMarkersType(), table, 
instant.getTimestamp());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 062f50c21e9..6a1f1845816 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -49,7 +49,7 @@ public final class HoodieMetadataConfig extends HoodieConfig {
   // Meta fields are not populated by default for metadata table
   public static final boolean DEFAULT_METADATA_POPULATE_META_FIELDS = false;
   // Default number of commits to retain, without cleaning, on metadata table
-  public static final int DEFAULT_METADATA_CLEANER_COMMITS_RETAINED = 3;
+  public static final int DEFAULT_METADATA_CLEANER_COMMITS_RETAINED = 10;
 
   public static final String METADATA_PREFIX = "hoodie.metadata";
   public static final String OPTIMIZED_LOG_BLOCKS_SCAN = 
".optimized.log.blocks.scan.enable";
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 6182bc4d4eb..a5d56c91d5e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -168,6 +168,12 @@ public class HoodieDefaultTimeline implements 
HoodieTimeline {
         s -> s.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) && 
!s.isCompleted()), details);
   }
 
+  @Override
+  public HoodieTimeline filterRequestedRollbackTimeline() {
+    return new HoodieDefaultTimeline(getInstantsAsStream().filter(
+        s -> s.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) && 
s.isRequested()), details);
+  }
+
   @Override
   public HoodieTimeline filterPendingCompactionTimeline() {
     return new HoodieDefaultTimeline(
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 72a6e910b76..51fe34badd2 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -226,6 +226,11 @@ public interface HoodieTimeline extends Serializable {
    */
   HoodieTimeline filterPendingRollbackTimeline();
 
+  /**
+   * Filter this timeline for requested rollbacks.
+   */
+  HoodieTimeline filterRequestedRollbackTimeline();
+
   /**
    * Create a new Timeline with all the instants after startTs.
    */
@@ -454,6 +459,10 @@ public interface HoodieTimeline extends Serializable {
     return instant.isRequested() ? instant : 
HoodieTimeline.getRequestedInstant(instant);
   }
 
+  static HoodieInstant getRestoreRequestedInstant(HoodieInstant instant) {
+    return instant.isRequested() ? instant : 
HoodieTimeline.getRequestedInstant(instant);
+  }
+
   static HoodieInstant getIndexRequestedInstant(final String timestamp) {
     return new HoodieInstant(State.REQUESTED, INDEXING_ACTION, timestamp);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 56f478e781c..384c96a664e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -78,6 +78,7 @@ import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -118,24 +119,35 @@ public class HoodieTableMetadataUtil {
   public static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
   public static final String PARTITION_NAME_RECORD_INDEX = "record_index";
 
-  // Suffix to use for compaction
-  private static final String COMPACTION_TIMESTAMP_SUFFIX = "001";
+  // Suffix to use for various operations on MDT
+  private enum OperationSuffix {
+    COMPACTION("001"),
+    CLEAN("002"),
+    RESTORE("003"),
+    METADATA_INDEXER("004"),
+    LOG_COMPACTION("005"),
+    ROLLBACK("006");
 
+    static final Set<String> ALL_SUFFIXES = 
Arrays.stream(OperationSuffix.values()).map(o -> 
o.getSuffix()).collect(Collectors.toSet());
 
-  // Suffix to use for clean
-  private static final String CLEAN_TIMESTAMP_SUFFIX = "002";
+    private final String suffix;
 
-  // This suffix used by the delta commits from async indexer 
(`HoodieIndexer`),
-  // when the `indexUptoInstantTime` already exists in the metadata table,
-  // to avoid collision.
-  public static final String METADATA_INDEXER_TIME_SUFFIX = "004";
+    OperationSuffix(String suffix) {
+      this.suffix = suffix;
+    }
 
-  // Suffix to use for log compaction
-  private static final String LOG_COMPACTION_TIMESTAMP_SUFFIX = "005";
+    String getSuffix() {
+      return suffix;
+    }
+
+    static boolean isValidSuffix(String suffix) {
+      return ALL_SUFFIXES.contains(suffix);
+    }
+  }
 
   // This suffix and all after that are used for initialization of the various 
partitions. The unused suffixes lower than this value
   // are reserved for future operations on the MDT.
-  public static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; // 
corresponds to "010";
+  private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; // 
corresponds to "010";
 
   /**
    * Returns whether the files partition of metadata table is ready for read.
@@ -497,7 +509,6 @@ public class HoodieTableMetadataUtil {
       List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
       HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
           Option.of(new ArrayList<>(deletedFiles)));
-
       records.add(record);
       fileDeleteCount[0] += deletedFiles.size();
       boolean isPartitionDeleted = partitionMetadata.getIsPartitionDeleted();
@@ -515,6 +526,46 @@ public class HoodieTableMetadataUtil {
     return records;
   }
 
+  public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
convertMissingPartitionRecords(HoodieEngineContext engineContext,
+                                                                        
List<String> deletedPartitions, Map<String, Map<String, Long>> filesAdded,
+                                                                        
Map<String, List<String>> filesDeleted, String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    int[] fileDeleteCount = {0};
+    int[] filesAddedCount = {0};
+
+    filesAdded.forEach((k, v) -> {
+      String partition = k;
+      Map<String, Long> filesToAdd = v;
+      filesAddedCount[0] += filesToAdd.size();
+      Option<List<String>> filesToDelete = filesDeleted.containsKey(k) ? 
Option.of(filesDeleted.get(k)) : Option.empty();
+      if (filesToDelete.isPresent()) {
+        fileDeleteCount[0] += filesToDelete.get().size();
+      }
+      HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partition, 
Option.of(filesToAdd), filesToDelete);
+      records.add(record);
+    });
+
+    // partitions that only have deleted files (no added files) were not 
visited above; emit delete-only records for them.
+    filesDeleted.forEach((k, v) -> {
+      String partition = k;
+      Option<List<String>> filesToDelete = Option.of(v);
+      if (!filesAdded.containsKey(partition)) {
+        fileDeleteCount[0] += filesToDelete.get().size();
+        HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), 
filesToDelete);
+        records.add(record);
+      }
+    });
+
+    if (!deletedPartitions.isEmpty()) {
+      // if there are partitions to be deleted, add a single partition-list 
record marking them as deleted
+      
records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, 
true));
+    }
+
+    LOG.info("Re-adding missing records at " + instantTime + " during Restore. 
#partitions_updated=" + records.size()
+        + ", #files_added=" + filesAddedCount[0] + ", #files_deleted=" + 
fileDeleteCount[0] + ", #partitions_deleted=" + deletedPartitions.size());
+    return Collections.singletonMap(MetadataPartitionType.FILES, 
engineContext.parallelize(records, 1));
+  }
+
   /**
    * Convert clean metadata to bloom filter index records.
    *
@@ -589,99 +640,61 @@ public class HoodieTableMetadataUtil {
         });
   }
 
-  /**
-   * Convert restore action metadata to metadata table records.
-   */
-  public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
convertMetadataToRecords(
-      HoodieEngineContext engineContext, HoodieActiveTimeline 
metadataTableTimeline, HoodieRestoreMetadata restoreMetadata,
-      MetadataRecordsGenerationParams recordsGenerationParams, String 
instantTime, Option<String> lastSyncTs) {
-    final Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordsMap = new HashMap<>();
-    final Map<String, Map<String, Long>> partitionToAppendedFiles = new 
HashMap<>();
-    final Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
-
-    processRestoreMetadata(metadataTableTimeline, restoreMetadata, 
partitionToAppendedFiles, partitionToDeletedFiles, lastSyncTs);
-    final HoodieData<HoodieRecord> filesPartitionRecordsRDD =
-        
engineContext.parallelize(convertFilesToFilesPartitionRecords(partitionToDeletedFiles,
 partitionToAppendedFiles, instantTime, "Restore"), 1);
-    partitionToRecordsMap.put(MetadataPartitionType.FILES, 
filesPartitionRecordsRDD);
-
-    if 
(recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS))
 {
-      final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD =
-          convertFilesToBloomFilterRecords(engineContext, 
partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams, 
instantTime);
-      partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, 
metadataBloomFilterRecordsRDD);
-    }
-
-    if 
(recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS))
 {
-      final HoodieData<HoodieRecord> metadataColumnStatsRDD =
-          convertFilesToColumnStatsRecords(engineContext, 
partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams);
-      partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, 
metadataColumnStatsRDD);
-    }
-    return partitionToRecordsMap;
-  }
-
-  /**
-   * Aggregates all files deleted and appended to from all rollbacks 
associated with a restore operation then
-   * creates metadata table records for them.
-   *
-   * @param restoreMetadata - Restore action metadata
-   * @return a list of metadata table records
-   */
-  private static void processRestoreMetadata(HoodieActiveTimeline 
metadataTableTimeline,
-                                             HoodieRestoreMetadata 
restoreMetadata,
-                                             Map<String, Map<String, Long>> 
partitionToAppendedFiles,
-                                             Map<String, List<String>> 
partitionToDeletedFiles,
-                                             Option<String> lastSyncTs) {
-    restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> 
rms.forEach(rm -> processRollbackMetadata(metadataTableTimeline, rm,
-        partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs)));
-  }
-
   /**
    * Convert rollback action metadata to metadata table records.
+   * <p>
+   * We only need to handle FILES partition here as HUDI rollbacks on MOR 
table may end up adding a new log file. All other partitions
+   * are handled by actual rollback of the deltacommit which added records to 
those partitions.
    */
   public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
convertMetadataToRecords(
-      HoodieEngineContext engineContext, HoodieActiveTimeline 
metadataTableTimeline,
-      HoodieRollbackMetadata rollbackMetadata, MetadataRecordsGenerationParams 
recordsGenerationParams,
-      String instantTime, Option<String> lastSyncTs, boolean wasSynced) {
-    final Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordsMap = new HashMap<>();
-    Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
-    Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
+      HoodieEngineContext engineContext, HoodieTableMetaClient 
dataTableMetaClient, HoodieRollbackMetadata rollbackMetadata, String 
instantTime) {
 
-    List<HoodieRecord> filesPartitionRecords =
-        convertMetadataToRollbackRecords(metadataTableTimeline, 
rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles, 
instantTime, lastSyncTs, wasSynced);
-    final HoodieData<HoodieRecord> rollbackRecordsRDD = 
engineContext.parallelize(filesPartitionRecords, 1);
-    partitionToRecordsMap.put(MetadataPartitionType.FILES, rollbackRecordsRDD);
+    List<HoodieRecord> filesPartitionRecords = 
convertMetadataToRollbackRecords(rollbackMetadata, instantTime, 
dataTableMetaClient);
+    final HoodieData<HoodieRecord> rollbackRecordsRDD = 
filesPartitionRecords.isEmpty() ? engineContext.emptyHoodieData()
+        : engineContext.parallelize(filesPartitionRecords, 
filesPartitionRecords.size());
 
-    if 
(recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS))
 {
-      final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD =
-          convertFilesToBloomFilterRecords(engineContext, 
partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams, 
instantTime);
-      partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, 
metadataBloomFilterRecordsRDD);
-    }
+    return Collections.singletonMap(MetadataPartitionType.FILES, 
rollbackRecordsRDD);
+  }
 
-    if 
(recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS))
 {
-      final HoodieData<HoodieRecord> metadataColumnStatsRDD =
-          convertFilesToColumnStatsRecords(engineContext, 
partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams);
-      partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, 
metadataColumnStatsRDD);
+  private static void reAddLogFilesFromRollbackPlan(HoodieTableMetaClient 
dataTableMetaClient, String instantTime,
+                                                    Map<String, Map<String, 
Long>> partitionToFilesMap) {
+    HoodieInstant rollbackInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, 
instantTime);
+    HoodieInstant requested = 
HoodieTimeline.getRollbackRequestedInstant(rollbackInstant);
+    try {
+      HoodieRollbackPlan rollbackPlan = 
TimelineMetadataUtils.deserializeAvroMetadata(
+          
dataTableMetaClient.getActiveTimeline().readRollbackInfoAsBytes(requested).get(),
 HoodieRollbackPlan.class);
+
+      rollbackPlan.getRollbackRequests().forEach(rollbackRequest -> {
+        final String partitionId = 
getPartitionIdentifier(rollbackRequest.getPartitionPath());
+        partitionToFilesMap.computeIfAbsent(partitionId, s -> new HashMap<>());
+        // fetch only log files that are expected to be rolled back in the 
data table as part of this rollback. these log files will not be deleted, but 
rendered
+        // invalid once rollback is complete.
+        if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) {
+          Map<String, Long> logFiles = new HashMap<>();
+          rollbackRequest.getLogBlocksToBeDeleted().forEach((k, v) -> {
+            String fileName = k.substring(k.lastIndexOf("/") + 1);
+            // rollback plan may not have size for log files to be rolled 
back, so a placeholder size of 1 is used; while merging w/ original commits, 
the size will get adjusted.
+            logFiles.put(fileName, 1L);
+          });
+          partitionToFilesMap.get(partitionId).putAll(logFiles);
+        }
+      });
+    } catch (IOException e) {
+      throw new HoodieMetadataException("Parsing rollback plan for " + 
rollbackInstant + " failed ", e);
     }
-
-    return partitionToRecordsMap;
   }
 
   /**
    * Convert rollback action metadata to files partition records.
+   * Consider only new log files added.
    */
-  private static List<HoodieRecord> 
convertMetadataToRollbackRecords(HoodieActiveTimeline metadataTableTimeline,
-                                                                     
HoodieRollbackMetadata rollbackMetadata,
-                                                                     
Map<String, List<String>> partitionToDeletedFiles,
-                                                                     
Map<String, Map<String, Long>> partitionToAppendedFiles,
+  private static List<HoodieRecord> 
convertMetadataToRollbackRecords(HoodieRollbackMetadata rollbackMetadata,
                                                                      String 
instantTime,
-                                                                     
Option<String> lastSyncTs, boolean wasSynced) {
-    processRollbackMetadata(metadataTableTimeline, rollbackMetadata, 
partitionToDeletedFiles,
-        partitionToAppendedFiles, lastSyncTs);
-    if (!wasSynced) {
-      // Since the instant-being-rolled-back was never committed to the 
metadata table, the files added there
-      // need not be deleted. For MOR Table, the rollback appends logBlocks so 
we need to keep the appended files.
-      partitionToDeletedFiles.clear();
-    }
-    return convertFilesToFilesPartitionRecords(partitionToDeletedFiles, 
partitionToAppendedFiles, instantTime, "Rollback");
+                                                                     
HoodieTableMetaClient dataTableMetaClient) {
+    Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
+    processRollbackMetadata(rollbackMetadata, partitionToAppendedFiles);
+    reAddLogFilesFromRollbackPlan(dataTableMetaClient, instantTime, 
partitionToAppendedFiles);
+    return convertFilesToFilesPartitionRecords(Collections.emptyMap(), 
partitionToAppendedFiles, instantTime, "Rollback");
   }
 
   /**
@@ -690,86 +703,16 @@ public class HoodieTableMetadataUtil {
    * During a rollback files may be deleted (COW, MOR) or rollback blocks be 
appended (MOR only) to files. This
    * function will extract this change file for each partition.
    *
-   * @param metadataTableTimeline    Current timeline of the Metadata Table
    * @param rollbackMetadata         {@code HoodieRollbackMetadata}
-   * @param partitionToDeletedFiles  The {@code Map} to fill with files 
deleted per partition.
    * @param partitionToAppendedFiles The {@code Map} to fill with files 
appended per partition and their sizes.
    */
-  private static void processRollbackMetadata(HoodieActiveTimeline 
metadataTableTimeline,
-                                              HoodieRollbackMetadata 
rollbackMetadata,
-                                              Map<String, List<String>> 
partitionToDeletedFiles,
-                                              Map<String, Map<String, Long>> 
partitionToAppendedFiles,
-                                              Option<String> lastSyncTs) {
+  private static void processRollbackMetadata(HoodieRollbackMetadata 
rollbackMetadata,
+                                              Map<String, Map<String, Long>> 
partitionToAppendedFiles) {
     rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
-      final String instantToRollback = 
rollbackMetadata.getCommitsRollback().get(0);
       // Has this rollback produced new files?
       boolean hasRollbackLogFiles = pm.getRollbackLogFiles() != null && 
!pm.getRollbackLogFiles().isEmpty();
-      boolean hasNonZeroRollbackLogFiles = hasRollbackLogFiles && 
pm.getRollbackLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
-
-      // If instant-to-rollback has not been synced to metadata table yet then 
there is no need to update metadata
-      // This can happen in two cases:
-      //  Case 1: Metadata Table timeline is behind the instant-to-rollback.
-      boolean shouldSkip = lastSyncTs.isPresent()
-          && HoodieTimeline.compareTimestamps(instantToRollback, 
HoodieTimeline.GREATER_THAN, lastSyncTs.get());
-
-      if (!hasNonZeroRollbackLogFiles && shouldSkip) {
-        LOG.info(String.format("Skipping syncing of rollbackMetadata at %s, 
given metadata table is already synced upto to %s",
-            instantToRollback, lastSyncTs.get()));
-        return;
-      }
-
-      // Case 2: The instant-to-rollback was never committed to Metadata 
Table. This can happen if the instant-to-rollback
-      // was a failed commit (never completed).
-      //
-      // There are two cases for failed commit that we need to take care of:
-      //   1) The commit was synced to metadata table successfully but the 
dataset meta file switches state failed
-      //   (from INFLIGHT to COMPLETED), the committed files should be rolled 
back thus the rollback metadata
-      //   can not be skipped, usually a failover should be triggered and the 
metadata active timeline expects
-      //   to contain the commit, we could check whether the commit was synced 
to metadata table
-      //   through HoodieActiveTimeline#containsInstant.
-      //
-      //   2) The commit synced to metadata table failed or was never synced 
to metadata table,
-      //      in this case, the rollback metadata should be skipped.
-      //
-      // And in which case,
-      // 
metadataTableTimeline.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())
-      // returns true ?
-      // It is most probably because of compaction rollback, we schedule a 
compaction plan early in the timeline (say t1)
-      // then after a long time schedule and execute the plan then try to 
rollback it.
-      //
-      //     scheduled   execution rollback            compaction actions
-      // -----  t1  -----  t3  ----- t4 -----          dataset timeline
-      //
-      // ----------  t2 (archive) -----------          metadata timeline
-      //
-      // when at time t4, we commit the compaction rollback,the above check 
returns true.
-      HoodieInstant syncedInstant = new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, instantToRollback);
-      if 
(metadataTableTimeline.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp()))
 {
-        throw new HoodieMetadataException(String.format("The instant %s 
required to sync rollback of %s has been archived",
-            syncedInstant, instantToRollback));
-      }
-      shouldSkip = !metadataTableTimeline.containsInstant(syncedInstant);
-      if (!hasNonZeroRollbackLogFiles && shouldSkip) {
-        LOG.info(String.format("Skipping syncing of rollbackMetadata at %s, 
since this instant was never committed to Metadata Table",
-            instantToRollback));
-        return;
-      }
-
       final String partition = pm.getPartitionPath();
-      if ((!pm.getSuccessDeleteFiles().isEmpty() || 
!pm.getFailedDeleteFiles().isEmpty()) && !shouldSkip) {
-        if (!partitionToDeletedFiles.containsKey(partition)) {
-          partitionToDeletedFiles.put(partition, new ArrayList<>());
-        }
-
-        // Extract deleted file name from the absolute paths saved in 
getSuccessDeleteFiles()
-        List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p 
-> new Path(p).getName())
-            .collect(Collectors.toList());
-        if (!pm.getFailedDeleteFiles().isEmpty()) {
-          deletedFiles.addAll(pm.getFailedDeleteFiles().stream().map(p -> new 
Path(p).getName())
-              .collect(Collectors.toList()));
-        }
-        partitionToDeletedFiles.get(partition).addAll(deletedFiles);
-      }
+      final String partitionId = getPartitionIdentifier(partition);
 
       BiFunction<Long, Long, Long> fileMergeFn = (oldSize, newSizeCopy) -> {
         // if a file exists in both written log files and rollback log files, 
we want to pick the one that is higher
@@ -778,13 +721,14 @@ public class HoodieTableMetadataUtil {
       };
 
       if (hasRollbackLogFiles) {
-        if (!partitionToAppendedFiles.containsKey(partition)) {
-          partitionToAppendedFiles.put(partition, new HashMap<>());
+        if (!partitionToAppendedFiles.containsKey(partitionId)) {
+          partitionToAppendedFiles.put(partitionId, new HashMap<>());
         }
 
         // Extract appended file name from the absolute paths saved in 
getRollbackLogFiles()
         pm.getRollbackLogFiles().forEach((path, size) -> {
-          partitionToAppendedFiles.get(partition).merge(new 
Path(path).getName(), size, fileMergeFn);
+          String fileName = new Path(path).getName();
+          partitionToAppendedFiles.get(partitionId).merge(fileName, size, 
fileMergeFn);
         });
       }
     });
@@ -793,9 +737,9 @@ public class HoodieTableMetadataUtil {
   /**
    * Convert rollback action metadata to files partition records.
    */
-  private static List<HoodieRecord> 
convertFilesToFilesPartitionRecords(Map<String, List<String>> 
partitionToDeletedFiles,
-                                                                        
Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                                                        String 
instantTime, String operation) {
+  protected static List<HoodieRecord> 
convertFilesToFilesPartitionRecords(Map<String, List<String>> 
partitionToDeletedFiles,
+                                                                          
Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                          
String instantTime, String operation) {
     List<HoodieRecord> records = new LinkedList<>();
     int[] fileChangeCount = {0, 0}; // deletes, appends
 
@@ -1344,8 +1288,7 @@ public class HoodieTableMetadataUtil {
     // have corresponding completed instant in the data table
     validInstantTimestamps.addAll(
         metadataMetaClient.getActiveTimeline()
-            .filter(instant -> instant.isCompleted()
-                && (isIndexingCommit(instant.getTimestamp()) || 
isLogCompactionInstant(instant)))
+            .filter(instant -> instant.isCompleted() && 
isValidInstant(instant))
             .getInstantsAsStream()
             .map(HoodieInstant::getTimestamp)
             .collect(Collectors.toList()));
@@ -1360,11 +1303,48 @@ public class HoodieTableMetadataUtil {
           validInstantTimestamps.addAll(getRollbackedCommits(instant, 
datasetTimeline));
         });
 
+    // add restore instants from MDT.
+    
metadataMetaClient.getActiveTimeline().getRollbackAndRestoreTimeline().filterCompletedInstants()
+        .filter(instant -> 
instant.getAction().equals(HoodieTimeline.RESTORE_ACTION))
+        .getInstants().forEach(instant -> 
validInstantTimestamps.add(instant.getTimestamp()));
+
     // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid 
timestamp
     validInstantTimestamps.add(createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP, 
PARTITION_INITIALIZATION_TIME_SUFFIX));
     return validInstantTimestamps;
   }
 
+  /**
+   * Checks if the Instant is a delta commit and has a valid suffix for 
operations on MDT.
+   *
+   * @param instant {@code HoodieInstant} to check.
+   * @return {@code true} if the instant is valid.
+   */
+  public static boolean isValidInstant(HoodieInstant instant) {
+    // Should be a deltacommit
+    if (!instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
+      return false;
+    }
+
+    // Check correct length. The timestamp should have a suffix over the 
timeline's timestamp format.
+    final String instantTime = instant.getTimestamp();
+    if (!(instantTime.length() == MILLIS_INSTANT_ID_LENGTH + 
OperationSuffix.METADATA_INDEXER.getSuffix().length())) {
+      return false;
+    }
+
+    // Is this a fixed operations suffix
+    final String suffix = instantTime.substring(instantTime.length() - 3);
+    if (OperationSuffix.isValidSuffix(suffix)) {
+      return true;
+    }
+
+    // Is this an index init suffix?
+    if (suffix.compareTo(String.format("%03d", 
PARTITION_INITIALIZATION_TIME_SUFFIX)) >= 0) {
+      return true;
+    }
+
+    return false;
+  }
+
   /**
    * Checks if a delta commit in metadata table is written by async indexer.
    * <p>
@@ -1375,20 +1355,8 @@ public class HoodieTableMetadataUtil {
    * @return {@code true} if from async indexer; {@code false} otherwise.
    */
   public static boolean isIndexingCommit(String instantTime) {
-    return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + 
METADATA_INDEXER_TIME_SUFFIX.length()
-            && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
-  }
-
-  /**
-   * This method returns true if the instant provided belongs to Log 
compaction instant.
-   * For metadata table, log compaction instant are created with Suffix "004" 
provided in LOG_COMPACTION_TIMESTAMP_SUFFIX.
-   * @param instant Hoodie completed instant.
-   * @return true for logcompaction instants flase otherwise.
-   */
-  public static boolean isLogCompactionInstant(HoodieInstant instant) {
-    return instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)
-        && instant.getTimestamp().length() == MILLIS_INSTANT_ID_LENGTH + 
LOG_COMPACTION_TIMESTAMP_SUFFIX.length()
-        && instant.getTimestamp().endsWith(LOG_COMPACTION_TIMESTAMP_SUFFIX);
+    return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + 
OperationSuffix.METADATA_INDEXER.getSuffix().length()
+            && 
instantTime.endsWith(OperationSuffix.METADATA_INDEXER.getSuffix());
   }
 
   /**
@@ -1590,14 +1558,26 @@ public class HoodieTableMetadataUtil {
    * Create the timestamp for a clean operation on the metadata table.
    */
   public static String createCleanTimestamp(String timestamp) {
-    return timestamp + CLEAN_TIMESTAMP_SUFFIX;
+    return timestamp + OperationSuffix.CLEAN.getSuffix();
+  }
+
+  public static String createRollbackTimestamp(String timestamp) {
+    return timestamp + OperationSuffix.ROLLBACK.getSuffix();
+  }
+
+  public static String createRestoreTimestamp(String timestamp) {
+    return timestamp + OperationSuffix.RESTORE.getSuffix();
+  }
+
+  public static String createAsyncIndexerTimestamp(String timestamp) {
+    return timestamp + OperationSuffix.METADATA_INDEXER.getSuffix();
   }
 
   /**
    * Create the timestamp for a compaction operation on the metadata table.
    */
   public static String createCompactionTimestamp(String timestamp) {
-    return timestamp + COMPACTION_TIMESTAMP_SUFFIX;
+    return timestamp + OperationSuffix.COMPACTION.getSuffix();
   }
 
   /**
@@ -1614,7 +1594,7 @@ public class HoodieTableMetadataUtil {
    * Create the timestamp for a log compaction operation on the metadata table.
    */
   public static String createLogCompactionTimestamp(String timestamp) {
-    return timestamp + LOG_COMPACTION_TIMESTAMP_SUFFIX;
+    return timestamp + OperationSuffix.LOG_COMPACTION.getSuffix();
   }
 
   /**
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 37bd983f17b..49095683a2b 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -32,6 +32,7 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
 import org.apache.hudi.avro.model.HoodieSliceInfo;
@@ -368,13 +369,23 @@ public class HoodieTestTable {
     return this;
   }
 
-  public HoodieTestTable addRollback(String instantTime, 
HoodieRollbackMetadata rollbackMetadata) throws IOException {
-    return addRollback(instantTime, rollbackMetadata, false);
+  public HoodieTestTable addRollback(String instantTime, 
HoodieRollbackMetadata rollbackMetadata,HoodieRollbackPlan rollbackPlan) throws 
IOException {
+    return addRollback(instantTime, rollbackMetadata, false, rollbackPlan);
   }
 
-  public HoodieTestTable addRollback(String instantTime, 
HoodieRollbackMetadata rollbackMetadata, boolean isEmpty) throws IOException {
-    createRequestedRollbackFile(basePath, instantTime);
+  public HoodieTestTable addRollback(String instantTime, 
HoodieRollbackMetadata rollbackMetadata, boolean isEmpty, HoodieRollbackPlan 
rollbackPlan) throws IOException {
+    if (rollbackPlan != null) {
+      createRequestedRollbackFile(basePath, instantTime, rollbackPlan);
+    } else {
+      createRequestedRollbackFile(basePath, instantTime);
+    }
     createInflightRollbackFile(basePath, instantTime);
+    // createRollbackFile(basePath, instantTime, rollbackMetadata, isEmpty);
+    currentInstantTime = instantTime;
+    return this;
+  }
+
+  public HoodieTestTable addRollbackCompleted(String instantTime, 
HoodieRollbackMetadata rollbackMetadata, boolean isEmpty) throws IOException {
     createRollbackFile(basePath, instantTime, rollbackMetadata, isEmpty);
     currentInstantTime = instantTime;
     return this;
@@ -386,7 +397,7 @@ public class HoodieTestTable {
     return this;
   }
 
-  public HoodieRollbackMetadata getRollbackMetadata(String 
instantTimeToDelete, Map<String, List<String>> partitionToFilesMeta) throws 
Exception {
+  public HoodieRollbackMetadata getRollbackMetadata(String 
instantTimeToDelete, Map<String, List<String>> partitionToFilesMeta, boolean 
shouldAddRollbackLogFile) throws Exception {
     HoodieRollbackMetadata rollbackMetadata = new HoodieRollbackMetadata();
     
rollbackMetadata.setCommitsRollback(Collections.singletonList(instantTimeToDelete));
     rollbackMetadata.setStartRollbackTime(instantTimeToDelete);
@@ -396,11 +407,13 @@ public class HoodieTestTable {
       rollbackPartitionMetadata.setPartitionPath(entry.getKey());
       rollbackPartitionMetadata.setSuccessDeleteFiles(entry.getValue());
       rollbackPartitionMetadata.setFailedDeleteFiles(new ArrayList<>());
-      long rollbackLogFileSize = 50 + RANDOM.nextInt(500);
-      String fileId = UUID.randomUUID().toString();
-      String logFileName = logFileName(instantTimeToDelete, fileId, 0);
-      FileCreateUtils.createLogFile(basePath, entry.getKey(), 
instantTimeToDelete, fileId, 0, (int) rollbackLogFileSize);
-      rollbackPartitionMetadata.setRollbackLogFiles(singletonMap(logFileName, 
rollbackLogFileSize));
+      if (shouldAddRollbackLogFile) {
+        long rollbackLogFileSize = 50 + RANDOM.nextInt(500);
+        String fileId = UUID.randomUUID().toString();
+        String logFileName = logFileName(instantTimeToDelete, fileId, 0);
+        FileCreateUtils.createLogFile(basePath, entry.getKey(), 
instantTimeToDelete, fileId, 0, (int) rollbackLogFileSize);
+        
rollbackPartitionMetadata.setRollbackLogFiles(singletonMap(logFileName, 
rollbackLogFileSize));
+      }
       partitionMetadataMap.put(entry.getKey(), rollbackPartitionMetadata);
     }
     rollbackMetadata.setPartitionMetadata(partitionMetadataMap);
@@ -819,11 +832,24 @@ public class HoodieTestTable {
       throw new IllegalArgumentException("Instant to rollback not present in 
timeline: " + commitTimeToRollback);
     }
     Map<String, List<String>> partitionFiles = 
getPartitionFiles(commitMetadata.get());
-    HoodieRollbackMetadata rollbackMetadata = 
getRollbackMetadata(commitTimeToRollback, partitionFiles);
+    HoodieRollbackMetadata rollbackMetadata = 
getRollbackMetadata(commitTimeToRollback, partitionFiles, false);
     for (Map.Entry<String, List<String>> entry : partitionFiles.entrySet()) {
       deleteFilesInPartition(entry.getKey(), entry.getValue());
     }
-    return addRollback(commitTime, rollbackMetadata);
+    HoodieRollbackPlan rollbackPlan = getHoodieRollbackPlan(commitTime, 
partitionFiles);
+    HoodieTestTable testTable = addRollback(commitTime, rollbackMetadata, 
rollbackPlan);
+    return testTable.addRollbackCompleted(commitTime, rollbackMetadata, false);
+  }
+
+  private HoodieRollbackPlan getHoodieRollbackPlan(String commitTime, 
Map<String, List<String>> partitionFiles) {
+    HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan();
+    List<HoodieRollbackRequest> rollbackRequestList = 
partitionFiles.keySet().stream()
+        .map(partition -> new HoodieRollbackRequest(partition, EMPTY_STRING, 
EMPTY_STRING,
+                partitionFiles.get(partition), Collections.emptyMap()))
+        .collect(Collectors.toList());
+    rollbackPlan.setRollbackRequests(rollbackRequestList);
+    rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitTime, 
HoodieTimeline.COMMIT_ACTION));
+    return rollbackPlan;
   }
 
   public HoodieTestTable doRollbackWithExtraFiles(String commitTimeToRollback, 
String commitTime, Map<String, List<String>> extraFiles) throws Exception {
@@ -841,8 +867,10 @@ public class HoodieTestTable {
         partitionFiles.get(entry.getKey()).addAll(entry.getValue());
       }
     }
-    HoodieRollbackMetadata rollbackMetadata = 
getRollbackMetadata(commitTimeToRollback, partitionFiles);
-    return addRollback(commitTime, rollbackMetadata);
+    HoodieRollbackMetadata rollbackMetadata = 
getRollbackMetadata(commitTimeToRollback, partitionFiles, false);
+    HoodieRollbackPlan rollbackPlan = getHoodieRollbackPlan(commitTime, 
partitionFiles);
+    HoodieTestTable testTable = addRollback(commitTime, rollbackMetadata, 
rollbackPlan);
+    return testTable.addRollbackCompleted(commitTime, rollbackMetadata, false);
   }
 
   public HoodieTestTable doRestore(String commitToRestoreTo, String 
restoreTime) throws Exception {
@@ -857,7 +885,7 @@ public class HoodieTestTable {
       }
       Map<String, List<String>> partitionFiles = 
getPartitionFiles(commitMetadata.get());
       rollbackMetadataMap.put(commitInstantToRollback.getTimestamp(),
-          
Collections.singletonList(getRollbackMetadata(commitInstantToRollback.getTimestamp(),
 partitionFiles)));
+          
Collections.singletonList(getRollbackMetadata(commitInstantToRollback.getTimestamp(),
 partitionFiles, false)));
       for (Map.Entry<String, List<String>> entry : partitionFiles.entrySet()) {
         deleteFilesInPartition(entry.getKey(), entry.getValue());
       }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index c4b8f9eb204..ee2f50cb20c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -278,19 +278,18 @@ public class TestStreamWriteOperatorCoordinator {
     instant = mockWriteWithMetadata();
     metadataTableMetaClient.reloadActiveTimeline();
     completedTimeline = 
metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
-    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(10));
-    assertThat(completedTimeline.lastInstant().get().getTimestamp(), 
is(instant + "002"));
-    assertThat(completedTimeline.lastInstant().get().getAction(), 
is(HoodieTimeline.CLEAN_ACTION));
+    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(9));
 
     // write another commit
     mockWriteWithMetadata();
+
     // write another commit
     instant = mockWriteWithMetadata();
     // write another commit to trigger compaction
     mockWriteWithMetadata();
     metadataTableMetaClient.reloadActiveTimeline();
     completedTimeline = 
metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
-    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(14));
+    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(13));
     assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), 
is(instant + "001"));
     assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), 
is(HoodieTimeline.COMMIT_ACTION));
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index 7f624033469..d94f065ee0a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -256,6 +256,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
             .withBootstrapParallelism(3)
             .withBootstrapModeSelector(bootstrapModeSelectorClass)
             .withBootstrapModeForRegexMatch(modeForRegexMatch).build())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMaxNumDeltaCommitsBeforeCompaction(3).build())
         .build();
 
     SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index 6730ba591d6..e1acde4fcd6 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -65,7 +65,6 @@ import static 
org.apache.hudi.common.table.HoodieTableMetaClient.reload;
 import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
 import static 
org.apache.hudi.config.HoodieWriteConfig.CLIENT_HEARTBEAT_INTERVAL_IN_MS;
 import static 
org.apache.hudi.config.HoodieWriteConfig.CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES;
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.METADATA_INDEXER_TIME_SUFFIX;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileSystemView;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
@@ -186,7 +185,7 @@ public class TestHoodieIndexer extends 
SparkClientFunctionalTestHarness implemen
     HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(
         context(), metadataConfig, metaClient.getBasePathV2().toString());
     HoodieTableMetaClient metadataMetaClient = 
metadata.getMetadataMetaClient();
-    String mdtCommitTime = indexUptoInstantTime + METADATA_INDEXER_TIME_SUFFIX;
+    String mdtCommitTime = 
HoodieTableMetadataUtil.createAsyncIndexerTimestamp(indexUptoInstantTime);
     
assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(mdtCommitTime));
 
     // Reverts both instants to inflight state, to simulate inflight indexing 
instants

Reply via email to