This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch partition-bucket-index
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 3e1687728fece9f93b8c86d75b43a39ca4620e13
Author: Lokesh Jain <[email protected]>
AuthorDate: Thu Mar 20 07:50:59 2025 +0530

    [HUDI-9013] Add backwards compatible MDT writer support and reader support with tbl v6 (#12948)
---
 .../apache/hudi/cli/commands/MetadataCommand.java  |  10 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |  67 ++--
 ...ieBackedTableMetadataWriterTableVersionSix.java | 337 +++++++++++++++++++++
 .../org/apache/hudi/metrics/HoodieMetrics.java     |   9 +-
 .../FlinkHoodieBackedTableMetadataWriter.java      |   4 +-
 .../apache/hudi/client/SparkRDDWriteClient.java    |   6 +-
 ...ieBackedTableMetadataWriterTableVersionSix.java | 164 ++++++++++
 .../hudi/metadata/SparkMetadataWriterFactory.java  |  55 ++++
 .../org/apache/hudi/table/HoodieSparkTable.java    |   6 +-
 .../functional/TestHoodieBackedTableMetadata.java  |  83 +++--
 .../client/functional/TestHoodieMetadataBase.java  |   3 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  16 +
 .../hudi/common/testutils/HoodieTestUtils.java     |   4 +
 .../procedures/CreateMetadataTableProcedure.scala  |   4 +-
 .../procedures/InitMetadataTableProcedure.scala    |   4 +-
 .../TestHoodieSparkMergeOnReadTableRollback.java   |  11 +-
 .../hudi/functional/ColumnStatIndexTestBase.scala  |  37 ++-
 .../hudi/functional/RecordLevelIndexTestBase.scala |   4 +-
 .../hudi/functional/TestColumnStatsIndex.scala     | 115 ++++---
 .../hudi/functional/TestRecordLevelIndex.scala     |  17 +-
 .../TestRecordLevelIndexTableVersionSix.scala      |  29 ++
 21 files changed, 856 insertions(+), 129 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
index cabd9fdcaf6..5b98700d04f 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -37,7 +37,7 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
-import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 
@@ -116,7 +116,7 @@ public class MetadataCommand {
   public String create(
       @ShellOption(value = "--sparkMaster", defaultValue = 
SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master
   ) throws Exception {
-    HoodieCLI.getTableMetaClient();
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
     StoragePath metadataPath = new 
StoragePath(getMetadataTableBasePath(HoodieCLI.basePath));
     try {
       List<StoragePathInfo> pathInfoList = 
HoodieCLI.storage.listDirectEntries(metadataPath);
@@ -131,7 +131,7 @@ public class MetadataCommand {
     HoodieTimer timer = HoodieTimer.start();
     HoodieWriteConfig writeConfig = getWriteConfig();
     initJavaSparkContext(Option.of(master));
-    try (HoodieTableMetadataWriter writer = 
SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new 
HoodieSparkEngineContext(jsc))) {
+    try (HoodieTableMetadataWriter writer = 
SparkMetadataWriterFactory.create(HoodieCLI.conf, writeConfig, new 
HoodieSparkEngineContext(jsc), metaClient.getTableConfig())) {
       return String.format("Created Metadata Table in %s (duration=%.2f 
secs)", metadataPath, timer.endTimer() / 1000.0);
     }
   }
@@ -163,7 +163,7 @@ public class MetadataCommand {
   public String init(@ShellOption(value = "--sparkMaster", defaultValue = 
SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master,
                      @ShellOption(value = {"--readonly"}, defaultValue = 
"false",
                          help = "Open in read-only mode") final boolean 
readOnly) throws Exception {
-    HoodieCLI.getTableMetaClient();
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
     StoragePath metadataPath = new 
StoragePath(getMetadataTableBasePath(HoodieCLI.basePath));
     try {
       HoodieCLI.storage.listDirectEntries(metadataPath);
@@ -176,7 +176,7 @@ public class MetadataCommand {
     if (!readOnly) {
       HoodieWriteConfig writeConfig = getWriteConfig();
       initJavaSparkContext(Option.of(master));
-      try (HoodieTableMetadataWriter writer = 
SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new 
HoodieSparkEngineContext(jsc))) {
+      try (HoodieTableMetadataWriter writer = 
SparkMetadataWriterFactory.create(HoodieCLI.conf, writeConfig, new 
HoodieSparkEngineContext(jsc), metaClient.getTableConfig())) {
         // Empty
       }
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index b550b166cf1..d7327734dc9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -120,7 +121,6 @@ import static 
org.apache.hudi.metadata.MetadataPartitionType.FILES;
 import static org.apache.hudi.metadata.MetadataPartitionType.PARTITION_STATS;
 import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
 import static org.apache.hudi.metadata.MetadataPartitionType.fromPartitionPath;
-import static 
org.apache.hudi.metadata.MetadataPartitionType.getEnabledPartitions;
 import static 
org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.convertWriteStatsToSecondaryIndexRecords;
 import static 
org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices;
 
@@ -133,7 +133,7 @@ import static 
org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readS
  */
 public abstract class HoodieBackedTableMetadataWriter<I> implements 
HoodieTableMetadataWriter {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieBackedTableMetadataWriter.class);
+  static final Logger LOG = 
LoggerFactory.getLogger(HoodieBackedTableMetadataWriter.class);
 
   // Virtual keys support for metadata table. This Field is
   // from the metadata payload schema.
@@ -143,7 +143,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
   // Record index has a fixed size schema. This has been calculated based on 
experiments with default settings
   // for block size (1MB), compression (GZ) and disabling the hudi metadata 
fields.
   private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48;
-  private transient BaseHoodieWriteClient<?, I, ?, ?> writeClient;
+  protected transient BaseHoodieWriteClient<?, I, ?, ?> writeClient;
 
   protected HoodieWriteConfig metadataWriteConfig;
   protected HoodieWriteConfig dataWriteConfig;
@@ -155,8 +155,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
   protected StorageConfiguration<?> storageConf;
   protected final transient HoodieEngineContext engineContext;
   protected final List<MetadataPartitionType> enabledPartitionTypes;
+
   // Is the MDT bootstrapped and ready to be read from
-  private boolean initialized = false;
+  boolean initialized = false;
   private HoodieTableFileSystemView metadataView;
 
   /**
@@ -193,6 +194,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     ValidationUtils.checkArgument(!initialized || this.metadata != null, "MDT 
Reader should have been opened post initialization");
   }
 
+  List<MetadataPartitionType> getEnabledPartitions(TypedProperties 
writeConfigProps, HoodieTableMetaClient metaClient) {
+    return MetadataPartitionType.getEnabledPartitions(writeConfigProps, 
metaClient);
+  }
+
   abstract HoodieTable getTable(HoodieWriteConfig writeConfig, 
HoodieTableMetaClient metaClient);
 
   private void mayBeReinitMetadataReader() {
@@ -276,7 +281,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       // If there is no commit on the dataset yet, use the 
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
       // Otherwise, we use the timestamp of the latest completed action.
       String initializationTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP);
-      initializeFromFilesystem(initializationTime, metadataPartitionsToInit);
+      if (!initializeFromFilesystem(initializationTime, 
metadataPartitionsToInit, inflightInstantTimestamp)) {
+        LOG.error("Failed to initialize MDT from filesystem");
+        return false;
+      }
       metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
       return true;
     } catch (IOException e) {
@@ -339,14 +347,22 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     return false;
   }
 
+  boolean shouldInitializeFromFilesystem(Set<String> pendingDataInstants, 
Option<String> inflightInstantTimestamp) {
+    return true;
+  }
+
   /**
    * Initialize the Metadata Table by listing files and partitions from the 
file system.
    *
    * @param initializationTime       - Timestamp to use for the commit
    * @param partitionsToInit         - List of MDT partitions to initialize
+   * @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
    */
-  private void initializeFromFilesystem(String initializationTime, 
List<MetadataPartitionType> partitionsToInit) throws IOException {
+  private boolean initializeFromFilesystem(String initializationTime, 
List<MetadataPartitionType> partitionsToInit, Option<String> 
inflightInstantTimestamp) throws IOException {
     Set<String> pendingDataInstants = getPendingDataInstants(dataMetaClient);
+    if (!shouldInitializeFromFilesystem(pendingDataInstants, 
inflightInstantTimestamp)) {
+      return false;
+    }
 
     // FILES partition is always required and is initialized first
     boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(FILES);
@@ -496,6 +512,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       long totalInitTime = partitionInitTimer.endTimer();
       LOG.info("Initializing {} index in metadata table took {} in ms", 
partitionTypeName, totalInitTime);
     }
+    return true;
   }
 
   /**
@@ -513,7 +530,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
    * @param initializationTime Timestamp from dataset to use for initialization
    * @return a unique timestamp for MDT
    */
-  private String generateUniqueInstantTime(String initializationTime) {
+  String generateUniqueInstantTime(String initializationTime) {
     // If it's initialized via Async indexer, we don't need to alter the init 
time.
     // otherwise yields the timestamp on the fly.
     // This function would be called multiple times in a single application if 
multiple indexes are being
@@ -799,13 +816,17 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
         .collect(Collectors.toSet());
   }
 
+  String getTimelineHistoryPath() {
+    return TIMELINE_HISTORY_PATH.defaultValue();
+  }
+
   private HoodieTableMetaClient initializeMetaClient() throws IOException {
     HoodieTableMetaClient.newTableBuilder()
         .setTableType(HoodieTableType.MERGE_ON_READ)
         .setTableName(dataWriteConfig.getTableName() + 
METADATA_TABLE_NAME_SUFFIX)
         // MT version should match DT, such that same readers can read both.
         .setTableVersion(dataWriteConfig.getWriteVersion())
-        .setArchiveLogFolder(TIMELINE_HISTORY_PATH.defaultValue())
+        .setArchiveLogFolder(getTimelineHistoryPath())
         .setPayloadClassName(HoodieMetadataPayload.class.getName())
         .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
         .setRecordKeyFields(RECORD_KEY_FIELD_NAME)
@@ -1005,7 +1026,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
    * Interface to assist in converting commit metadata to List of 
HoodieRecords to be written to metadata table.
    * Updates of different commit metadata uses the same method to convert to 
HoodieRecords and hence.
    */
-  private interface ConvertMetadataFunction {
+  interface ConvertMetadataFunction {
     Map<String, HoodieData<HoodieRecord>> convertMetadata();
   }
 
@@ -1015,7 +1036,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
    * @param instantTime             instant time of interest.
    * @param convertMetadataFunction converter function to convert the 
respective metadata to List of HoodieRecords to be written to metadata table.
    */
-  private void processAndCommit(String instantTime, ConvertMetadataFunction 
convertMetadataFunction) {
+  void processAndCommit(String instantTime, ConvertMetadataFunction 
convertMetadataFunction) {
     Set<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
     if (initialized && metadata != null) {
       // convert metadata and filter only the entries whose partition path are 
in partitionsToUpdate
@@ -1067,7 +1088,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     
dataMetaClient.getTableConfig().setMetadataPartitionsInflight(dataMetaClient, 
partitionPaths);
 
     // initialize partitions
-    initializeFromFilesystem(instantTime, partitionTypes);
+    initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
   }
 
   /**
@@ -1248,7 +1269,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       // We cannot create a deltaCommit at instantTime now because a future 
(rollback) block has already been written to the logFiles.
       // We need to choose a timestamp which would be a validInstantTime for 
MDT. This is either a commit timestamp completed on the dataset
       // or a new timestamp which we use for MDT clean, compaction etc.
-      String syncCommitTime = writeClient.createNewInstantTime(false);
+      String syncCommitTime = createRestoreInstantTime();
       processAndCommit(syncCommitTime, () -> 
HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
           partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete, 
syncCommitTime));
       closeInternal();
@@ -1257,6 +1278,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     }
   }
 
+  String createRestoreInstantTime() {
+    return writeClient.createNewInstantTime(false);
+  }
+
   /**
    * Update from {@code HoodieRollbackMetadata}.
    *
@@ -1493,12 +1518,12 @@ public abstract class 
HoodieBackedTableMetadataWriter<I> implements HoodieTableM
         return;
       }
       // Check and run clean operations.
-      cleanIfNecessary(writeClient);
+      cleanIfNecessary(writeClient, lastInstant.get().requestedTime());
       // Do timeline validation before scheduling compaction/logCompaction 
operations.
-      if (validateCompactionScheduling()) {
+      if (validateCompactionScheduling(inFlightInstantTimestamp, 
lastInstant.get().requestedTime())) {
         String latestDeltacommitTime = lastInstant.get().requestedTime();
         LOG.info("Latest deltacommit time found is {}, running compaction 
operations.", latestDeltacommitTime);
-        compactIfNecessary(writeClient);
+        compactIfNecessary(writeClient, Option.of(latestDeltacommitTime));
       }
       writeClient.archive();
       LOG.info("All the table services operations on MDT completed 
successfully");
@@ -1552,7 +1577,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
    * 2. In multi-writer scenario, a parallel operation with a greater 
instantTime may have completed creating a
    * deltacommit.
    */
-  protected void compactIfNecessary(BaseHoodieWriteClient writeClient) {
+  void compactIfNecessary(BaseHoodieWriteClient<?,I,?,?> writeClient, 
Option<String> latestDeltaCommitTimeOpt) {
     // IMPORTANT: Trigger compaction with max instant time that is smaller 
than(or equals) the earliest pending instant from DT.
     // The compaction planner will manage to filter out the log files that 
finished with greater completion time.
     // see BaseHoodieCompactionPlanGenerator.generateCompactionPlan for more 
details.
@@ -1586,7 +1611,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     }
   }
 
-  protected void cleanIfNecessary(BaseHoodieWriteClient writeClient) {
+  protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String 
instantTime) {
     Option<HoodieInstant> lastCompletedCompactionInstant = 
metadataMetaClient.getActiveTimeline()
         .getCommitAndReplaceTimeline().filterCompletedInstants().lastInstant();
     if (lastCompletedCompactionInstant.isPresent()
@@ -1603,14 +1628,18 @@ public abstract class 
HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     // Trigger cleaning with suffixes based on the same instant time. This 
ensures that any future
     // delta commits synced over will not have an instant time lesser than the 
last completed instant on the
     // metadata table.
-    writeClient.clean(metadataMetaClient.createNewInstantTime(false));
+    writeClient.clean(createCleanInstantTime(instantTime));
     writeClient.lazyRollbackFailedIndexing();
   }
 
+  String createCleanInstantTime(String instantTime) {
+    return metadataMetaClient.createNewInstantTime(false);
+  }
+
   /**
    * Validates the timeline for both main and metadata tables to ensure 
compaction on MDT can be scheduled.
    */
-  protected boolean validateCompactionScheduling() {
+  boolean validateCompactionScheduling(Option<String> 
inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
     // Under the log compaction scope, the sequence of the log-compaction and 
compaction needs to be ensured because metadata items such as RLI
     // only has proc-time ordering semantics. For "ensured", it means the 
completion sequence of the log-compaction/compaction is the same as the start 
sequence.
     if (metadataWriteConfig.isLogCompactionEnabled()) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java
new file mode 100644
index 00000000000..33e5070b8fb
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.v1.InstantComparatorV1;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
+import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
+
+/**
+ * HoodieBackedTableMetadataWriter for tables with version 6. The class 
derives most of the functionality from HoodieBackedTableMetadataWriter
+ * and overrides some behaviour to make it compatible for version 6.
+ */
+public abstract class HoodieBackedTableMetadataWriterTableVersionSix<I> 
extends HoodieBackedTableMetadataWriter<I> {
+
+  private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10;
+
+  /**
+   * Hudi backed table metadata writer.
+   *
+   * @param storageConf                Storage configuration to use for the 
metadata writer
+   * @param writeConfig                Writer config
+   * @param failedWritesCleaningPolicy Cleaning policy on failed writes
+   * @param engineContext              Engine context
+   * @param inflightInstantTimestamp   Timestamp of any instant in progress
+   */
+  protected 
HoodieBackedTableMetadataWriterTableVersionSix(StorageConfiguration<?> 
storageConf,
+                                                           HoodieWriteConfig 
writeConfig,
+                                                           
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                           HoodieEngineContext 
engineContext,
+                                                           Option<String> 
inflightInstantTimestamp) {
+    super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
inflightInstantTimestamp);
+  }
+
+  @Override
+  List<MetadataPartitionType> getEnabledPartitions(TypedProperties 
writeConfigProps, HoodieTableMetaClient metaClient) {
+    return MetadataPartitionType.getEnabledPartitions(writeConfigProps, 
metaClient).stream()
+        .filter(partition -> 
!partition.equals(MetadataPartitionType.SECONDARY_INDEX))
+        .filter(partition -> 
!partition.equals(MetadataPartitionType.EXPRESSION_INDEX))
+        .filter(partition -> 
!partition.equals(MetadataPartitionType.PARTITION_STATS))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  boolean shouldInitializeFromFilesystem(Set<String> pendingDataInstants, 
Option<String> inflightInstantTimestamp) {
+    if (pendingDataInstants.stream()
+        .anyMatch(i -> !inflightInstantTimestamp.isPresent() || 
!i.equals(inflightInstantTimestamp.get()))) {
+      metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
+      LOG.warn("Cannot initialize metadata table as operation(s) are in 
progress on the dataset: {}",
+          Arrays.toString(pendingDataInstants.toArray()));
+      return false;
+    } else {
+      return true;
+    }
+  }
+
+  @Override
+  String generateUniqueInstantTime(String initializationTime) {
+    // if its initialized via Async indexer, we don't need to alter the init 
time
+    HoodieTimeline dataIndexTimeline = 
dataMetaClient.getActiveTimeline().filter(instant -> 
instant.getAction().equals(HoodieTimeline.INDEXING_ACTION));
+    if (HoodieTableMetadataUtil.isIndexingCommit(dataIndexTimeline, 
initializationTime)) {
+      return initializationTime;
+    }
+    // Add suffix to initializationTime to find an unused instant time for the 
next index initialization.
+    // This function would be called multiple times in a single application if 
multiple indexes are being
+    // initialized one after the other.
+    for (int offset = 0; ; ++offset) {
+      final String commitInstantTime = 
createIndexInitTimestamp(initializationTime, offset);
+      if 
(!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
+        return commitInstantTime;
+      }
+    }
+  }
+
+  /**
+   * Create the timestamp for an index initialization operation on the 
metadata table.
+   * <p>
+   * Since many MDT partitions can be initialized one after other the offset 
parameter controls generating a
+   * unique timestamp.
+   */
+  private String createIndexInitTimestamp(String timestamp, int offset) {
+    return String.format("%s%03d", timestamp, 
PARTITION_INITIALIZATION_TIME_SUFFIX + offset);
+  }
+
+  @Override
+  String getTimelineHistoryPath() {
+    return ARCHIVELOG_FOLDER.defaultValue();
+  }
+
+  /**
+   * Validates the timeline for both main and metadata tables to ensure 
compaction on MDT can be scheduled.
+   */
+  @Override
+  boolean validateCompactionScheduling(Option<String> 
inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
+    // we need to find if there are any inflights in data table timeline 
before or equal to the latest delta commit in metadata table.
+    // Whenever you want to change this logic, please ensure all below 
scenarios are considered.
+    // a. There could be a chance that latest delta commit in MDT is committed 
in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed
+    // b. There could be DT inflights after latest delta commit in MDT and we 
are ok with it. bcoz, the contract is, the latest compaction instant time in 
MDT represents
+    // any instants before that is already synced with metadata table.
+    // c. Do consider out of order commits. For eg, c4 from DT could complete 
before c3. and we can't trigger compaction in MDT with c4 as base instant time, 
until every
+    // instant before c4 is synced with metadata table.
+    List<HoodieInstant> pendingInstants = 
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+        
.findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
+
+    if (!pendingInstants.isEmpty()) {
+      checkNumDeltaCommits(metadataMetaClient, 
dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending());
+      LOG.info(String.format(
+          "Cannot compact metadata table as there are %d inflight instants in 
data table before latest deltacommit in metadata table: %s. Inflight instants 
in data table: %s",
+          pendingInstants.size(), latestDeltaCommitTimeInMetadataTable, 
Arrays.toString(pendingInstants.toArray())));
+      return false;
+    }
+
+    // Check if there are any pending compaction or log compaction instants in 
the timeline.
+    // If pending compact/logCompaction operations are found abort scheduling 
new compaction/logCompaction operations.
+    Option<HoodieInstant> pendingLogCompactionInstant =
+        
metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+    Option<HoodieInstant> pendingCompactionInstant =
+        
metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+    if (pendingLogCompactionInstant.isPresent() || 
pendingCompactionInstant.isPresent()) {
+      LOG.warn(String.format("Not scheduling compaction or logCompaction, 
since a pending compaction instant %s or logCompaction %s instant is present",
+          pendingCompactionInstant, pendingLogCompactionInstant));
+      return false;
+    }
+    return true;
+  }
+
+  private void checkNumDeltaCommits(HoodieTableMetaClient metaClient, int 
maxNumDeltaCommitsWhenPending) {
+    final HoodieActiveTimeline activeTimeline = 
metaClient.reloadActiveTimeline();
+    Option<HoodieInstant> lastCompaction = 
activeTimeline.filterCompletedInstants()
+        .filter(s -> s.getAction().equals(COMMIT_ACTION)).lastInstant();
+    int numDeltaCommits = lastCompaction.isPresent()
+        ? 
activeTimeline.getDeltaCommitTimeline().findInstantsAfter(lastCompaction.get().requestedTime()).countInstants()
+        : activeTimeline.getDeltaCommitTimeline().countInstants();
+    if (numDeltaCommits > maxNumDeltaCommitsWhenPending) {
+      throw new HoodieMetadataException(String.format("Metadata table's 
deltacommits exceeded %d: "
+              + "this is likely caused by a pending instant in the data table. 
Resolve the pending instant "
+              + "or adjust `%s`, then restart the pipeline.",
+          maxNumDeltaCommitsWhenPending, 
HoodieMetadataConfig.METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING.key()));
+    }
+  }
+
+  /**
+   * Update from {@code HoodieRollbackMetadata}.
+   *
+   * @param rollbackMetadata {@code HoodieRollbackMetadata}
+   * @param instantTime      Timestamp at which the rollback was performed
+   */
+  @Override
+  public void update(HoodieRollbackMetadata rollbackMetadata, String 
instantTime) {
+    if (initialized && metadata != null) {
+      // The commit which is being rolled back on the dataset
+      final String commitToRollbackInstantTime = 
rollbackMetadata.getCommitsRollback().get(0);
+      // Find the deltacommits since the last compaction
+      Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+          
CompactionUtils.getDeltaCommitsSinceLatestCompaction(metadataMetaClient.getActiveTimeline());
+
+      // This could be a compaction or deltacommit instant (See 
CompactionUtils.getDeltaCommitsSinceLatestCompaction)
+      HoodieInstant compactionInstant = deltaCommitsInfo.get().getValue();
+      HoodieTimeline deltacommitsSinceCompaction = 
deltaCommitsInfo.get().getKey();
+
+      // The deltacommit that will be rolled back
+      HoodieInstant deltaCommitInstant = new 
HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, commitToRollbackInstantTime,
+          InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
+
+      validateRollbackVersionSix(commitToRollbackInstantTime, 
compactionInstant, deltacommitsSinceCompaction);
+
+      // lets apply a delta commit with DT's rb instant containing following 
records:
+      // a. any log files as part of RB commit metadata that was added
+      // b. log files added by the commit in DT being rolled back. By rolled 
back, we mean, a rollback block will be added and does not mean it will be 
deleted.
+      // both above list should only be added to FILES partition.
+      processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, dataMetaClient, 
rollbackMetadata, instantTime));
+
+      String rollbackInstantTime = createRollbackTimestamp(instantTime);
+      if (deltacommitsSinceCompaction.containsInstant(deltaCommitInstant)) {
+        LOG.info("Rolling back MDT deltacommit " + 
commitToRollbackInstantTime);
+        if (!getWriteClient().rollback(commitToRollbackInstantTime, 
rollbackInstantTime)) {
+          throw new HoodieMetadataException("Failed to rollback deltacommit at 
" + commitToRollbackInstantTime);
+        }
+      } else {
+        LOG.info("Ignoring rollback of instant {} at {}. The commit to 
rollback is not found in MDT",
+            commitToRollbackInstantTime, instantTime);
+      }
+      closeInternal();
+    }
+  }
+
+  private void validateRollbackVersionSix(
+      String commitToRollbackInstantTime,
+      HoodieInstant compactionInstant,
+      HoodieTimeline deltacommitsSinceCompaction) {
+    // The commit being rolled back should not be earlier than the latest 
compaction on the MDT. Compaction on MDT only occurs when all actions
+    // are completed on the dataset. Hence, this case implies a rollback of 
completed commit which should actually be handled using restore.
+    if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
+      final String compactionInstantTime = compactionInstant.requestedTime();
+      if (compareTimestamps(commitToRollbackInstantTime, 
LESSER_THAN_OR_EQUALS, compactionInstantTime)) {
+        throw new HoodieMetadataException(String.format("Commit being rolled 
back %s is earlier than the latest compaction %s. "
+                + "There are %d deltacommits after this compaction: %s", 
commitToRollbackInstantTime, compactionInstantTime,
+            deltacommitsSinceCompaction.countInstants(), 
deltacommitsSinceCompaction.getInstants()));
+      }
+    }
+  }
+
+  /**
+   * Perform a compaction on the Metadata Table.
+   * <p>
+   * Cases to be handled:
+   * 1. We cannot perform compaction if there are previous inflight operations 
on the dataset. This is because
+   * a compacted metadata base file at time Tx should represent all the 
actions on the dataset till time Tx.
+   * <p>
+   * 2. In multi-writer scenario, a parallel operation with a greater 
instantTime may have completed creating a
+   * deltacommit.
+   */
+  @Override
+  void compactIfNecessary(BaseHoodieWriteClient<?,I,?,?> writeClient, 
Option<String> latestDeltaCommitTimeOpt) {
+    // Trigger compaction with suffixes based on the same instant time. This 
ensures that any future
+    // delta commits synced over will not have an instant time lesser than the 
last completed instant on the
+    // metadata table.
+    final String compactionInstantTime = 
createCompactionTimestamp(latestDeltaCommitTimeOpt.get());
+
+    // we need to avoid checking compaction w/ same instant again.
+    // let's say we trigger compaction after C5 in MDT and so compaction 
completes with C4001. but C5 crashed before completing in MDT.
+    // and again w/ C6, we will re-attempt compaction at which point latest 
delta commit is C4 in MDT.
+    // and so we try compaction w/ instant C4001. So, we can avoid compaction 
if we already have compaction w/ same instant time.
+    if 
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime))
 {
+      LOG.info("Compaction with same {} time is already present in the 
timeline.", compactionInstantTime);
+    } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, 
Option.empty())) {
+      LOG.info("Compaction is scheduled for timestamp {}", 
compactionInstantTime);
+      writeClient.compact(compactionInstantTime);
+    } else if (metadataWriteConfig.isLogCompactionEnabled()) {
+      // Schedule and execute log compaction with suffixes based on the same 
instant time. This ensures that any future
+      // delta commits synced over will not have an instant time lesser than 
the last completed instant on the
+      // metadata table.
+      final String logCompactionInstantTime = 
createLogCompactionTimestamp(latestDeltaCommitTimeOpt.get());
+      if 
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime))
 {
+        LOG.info("Log compaction with same {} time is already present in the 
timeline.", logCompactionInstantTime);
+      } else if 
(writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime, 
Option.empty())) {
+        LOG.info("Log compaction is scheduled for timestamp {}", 
logCompactionInstantTime);
+        writeClient.logCompact(logCompactionInstantTime);
+      }
+    }
+  }
+
+  @Override
+  String createCleanInstantTime(String instantTime) {
+    return createCleanTimestamp(instantTime);
+  }
+
+  @Override
+  String createRestoreInstantTime() {
+    return createRestoreTimestamp(writeClient.createNewInstantTime(false));
+  }
+
+  /**
+   * Create the timestamp for a compaction operation on the metadata table.
+   */
+  private String createCompactionTimestamp(String timestamp) {
+    return timestamp + getCompactionOperationSuffix();
+  }
+
+  /**
+   * Create the timestamp for a log compaction operation on the metadata table.
+   */
+  private String createLogCompactionTimestamp(String timestamp) {
+    return timestamp + getLogCompactionOperationSuffix();
+  }
+
+  private String createRollbackTimestamp(String timestamp) {
+    return timestamp + getRollbackOperationSuffix();
+  }
+
+  private String createCleanTimestamp(String timestamp) {
+    return timestamp + getCleanOperationSuffix();
+  }
+
+  private String createRestoreTimestamp(String timestamp) {
+    return timestamp + getRestoreOperationSuffix();
+  }
+
+  private String getCompactionOperationSuffix() {
+    return "001";
+  }
+
+  private String getLogCompactionOperationSuffix() {
+    return "005";
+  }
+
+  private String getRollbackOperationSuffix() {
+    return "006";
+  }
+
+  private String getCleanOperationSuffix() {
+    return "002";
+  }
+
+  private String getRestoreOperationSuffix() {
+    return "003";
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 025a0edf9fa..0146a2baf1b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Set;
 
+import static 
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH;
+
 /**
  * Wrapper for metrics-related operations.
  */
@@ -454,7 +456,12 @@ public class HoodieMetrics {
     HoodieTimeline filteredInstants = 
activeTimeline.filterCompletedInstants().filter(instant -> 
validActions.contains(instant.getAction()));
     Option<HoodieInstant> hoodieInstantOption = filteredInstants.lastInstant();
     if (hoodieInstantOption.isPresent()) {
-      updateMetric(action, metricName, 
Long.parseLong(hoodieInstantOption.get().requestedTime()));
+      String requestedTime = hoodieInstantOption.get().requestedTime();
+      if (hoodieInstantOption.get().requestedTime().length() > 
MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH) {
+        // If requested instant is in MDT with table version six, it can 
contain suffix
+        requestedTime = requestedTime.substring(0, requestedTime.length() - 3);
+      }
+      updateMetric(action, metricName, Long.parseLong(requestedTime));
     }
   }
 
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index ad7aca9ec6a..a88a8fa1073 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -133,7 +133,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
       metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
     }
 
-    compactIfNecessary(writeClient);
+    compactIfNecessary(writeClient, Option.empty());
 
     if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
       // if this is a new commit being applied to metadata for the first time
@@ -170,7 +170,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
 
     // reload timeline
     metadataMetaClient.reloadActiveTimeline();
-    cleanIfNecessary(writeClient);
+    cleanIfNecessary(writeClient, "");
     writeClient.archive();
 
     // Update total size of the metadata and count of base/log files
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 58b2d98ffd7..2331596b0eb 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -42,7 +42,7 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
-import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
 import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.BulkInsertPartitioner;
@@ -300,8 +300,8 @@ public class SparkRDDWriteClient<T> extends
       metrics.emitMetadataEnablementMetrics(true, isMetadataColStatsAvailable, 
isMetadataBloomFilterAvailable, isMetadataRliAvailable);
     }
 
-    try (HoodieTableMetadataWriter writer = 
SparkHoodieBackedTableMetadataWriter.create(
-        context.getStorageConf(), config, context, inFlightInstantTimestamp)) {
+    try (HoodieTableMetadataWriter writer = SparkMetadataWriterFactory.create(
+        context.getStorageConf(), config, context, inFlightInstantTimestamp, 
tableConfig)) {
       if (writer.isInitialized()) {
         writer.performTableServices(inFlightInstantTimestamp);
       }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
new file mode 100644
index 00000000000..81df778d2dd
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.index.HoodieSparkIndexClient;
+import org.apache.hudi.metrics.DistributedRegistry;
+import org.apache.hudi.metrics.MetricsReporterType;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
+
+public class SparkHoodieBackedTableMetadataWriterTableVersionSix extends 
HoodieBackedTableMetadataWriterTableVersionSix<JavaRDD<HoodieRecord>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkHoodieBackedTableMetadataWriterTableVersionSix.class);
+
+  public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf,
+                                                 HoodieWriteConfig writeConfig,
+                                                 HoodieEngineContext context,
+                                                 Option<String> 
inflightInstantTimestamp) {
+    return new SparkHoodieBackedTableMetadataWriterTableVersionSix(
+        conf, writeConfig, EAGER, context, inflightInstantTimestamp);
+  }
+
+  public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf,
+                                                 HoodieWriteConfig writeConfig,
+                                                 
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                 HoodieEngineContext context,
+                                                 Option<String> 
inflightInstantTimestamp) {
+    return new SparkHoodieBackedTableMetadataWriterTableVersionSix(
+        conf, writeConfig, failedWritesCleaningPolicy, context, 
inflightInstantTimestamp);
+  }
+
+  public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, 
HoodieWriteConfig writeConfig,
+                                                 HoodieEngineContext context) {
+    return create(conf, writeConfig, context, Option.empty());
+  }
+
+  SparkHoodieBackedTableMetadataWriterTableVersionSix(StorageConfiguration<?> 
hadoopConf,
+                                       HoodieWriteConfig writeConfig,
+                                       HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy,
+                                       HoodieEngineContext engineContext,
+                                       Option<String> 
inflightInstantTimestamp) {
+    super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
inflightInstantTimestamp);
+  }
+
+  @Override
+  protected void initRegistry() {
+    if (metadataWriteConfig.isMetricsOn()) {
+      Registry registry;
+      if (metadataWriteConfig.isExecutorMetricsEnabled() && 
metadataWriteConfig.getMetricsReporterType() != MetricsReporterType.INMEMORY) {
+        registry = Registry.getRegistry("HoodieMetadata", 
DistributedRegistry.class.getName());
+        HoodieSparkEngineContext sparkEngineContext = 
(HoodieSparkEngineContext) engineContext;
+        ((DistributedRegistry) 
registry).register(sparkEngineContext.getJavaSparkContext());
+      } else {
+        registry = Registry.getRegistry("HoodieMetadata");
+      }
+      this.metrics = Option.of(new 
HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig(), 
dataMetaClient.getStorage()));
+    } else {
+      this.metrics = Option.empty();
+    }
+  }
+
+  @Override
+  protected void commit(String instantTime, Map<String, 
HoodieData<HoodieRecord>> partitionRecordsMap) {
+    commitInternal(instantTime, partitionRecordsMap, false, Option.empty());
+  }
+
+  @Override
+  protected JavaRDD<HoodieRecord> 
convertHoodieDataToEngineSpecificData(HoodieData<HoodieRecord> records) {
+    return HoodieJavaRDD.getJavaRDD(records);
+  }
+
+  @Override
+  protected void bulkCommit(
+      String instantTime, String partitionName, HoodieData<HoodieRecord> 
records,
+      int fileGroupCount) {
+    SparkHoodieMetadataBulkInsertPartitioner partitioner = new 
SparkHoodieMetadataBulkInsertPartitioner(fileGroupCount);
+    commitInternal(instantTime, Collections.singletonMap(partitionName, 
records), true, Option.of(partitioner));
+  }
+
+  @Override
+  public void deletePartitions(String instantTime, List<MetadataPartitionType> 
partitions) {
+    List<String> partitionsToDrop = 
partitions.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList());
+    LOG.info("Deleting Metadata Table partitions: {}", partitionsToDrop);
+
+    SparkRDDWriteClient writeClient = (SparkRDDWriteClient) getWriteClient();
+    String actionType = 
CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, 
HoodieTableType.MERGE_ON_READ);
+    writeClient.startCommitWithTime(instantTime, actionType);
+    writeClient.deletePartitions(partitionsToDrop, instantTime);
+  }
+
+  @Override
+  protected HoodieTable getTable(HoodieWriteConfig writeConfig, 
HoodieTableMetaClient metaClient) {
+    return HoodieSparkTable.create(writeConfig, engineContext, metaClient);
+  }
+
+  @Override
+  public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, ?> 
initializeWriteClient() {
+    return new SparkRDDWriteClient(engineContext, metadataWriteConfig, 
Option.empty());
+  }
+
+  @Override
+  protected EngineType getEngineType() {
+    return EngineType.SPARK;
+  }
+
+  @Override
+  protected void updateColumnsToIndexWithColStats(List<String> columnsToIndex) 
{
+    new HoodieSparkIndexClient(dataWriteConfig, 
engineContext).createOrUpdateColumnStatsIndexDefinition(dataMetaClient, 
columnsToIndex);
+  }
+
+  @Override
+  protected HoodieData<HoodieRecord> getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+                                                               HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, StorageConfiguration<?> storageConf, String instantTime) {
+    throw new HoodieNotSupportedException("Expression index not supported for Spark table version six metadata table writer yet.");
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java
new file mode 100644
index 00000000000..3c6c9c4c1c3
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.StorageConfiguration;
+
+/**
+ * Factory class for generating SparkHoodieBackedTableMetadataWriter based on 
the table version.
+ */
+public class SparkMetadataWriterFactory {
+
+  public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, 
HoodieWriteConfig writeConfig, HoodieEngineContext context,
+                                                 Option<String> 
inflightInstantTimestamp, HoodieTableConfig tableConfig) {
+    if (tableConfig.getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) {
+      return SparkHoodieBackedTableMetadataWriterTableVersionSix.create(conf, 
writeConfig, context, inflightInstantTimestamp);
+    } else {
+      return SparkHoodieBackedTableMetadataWriter.create(conf, writeConfig, 
context, inflightInstantTimestamp);
+    }
+  }
+
+  public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, 
HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy,
+                                                 HoodieEngineContext context, 
Option<String> inflightInstantTimestamp, HoodieTableConfig tableConfig) {
+    if (tableConfig.getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) {
+      return new SparkHoodieBackedTableMetadataWriterTableVersionSix(conf, 
writeConfig, failedWritesCleaningPolicy, context, inflightInstantTimestamp);
+    } else {
+      return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, 
failedWritesCleaningPolicy, context, inflightInstantTimestamp);
+    }
+  }
+
+  public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, 
HoodieWriteConfig writeConfig, HoodieEngineContext context, HoodieTableConfig 
tableConfig) {
+    return create(conf, writeConfig, context, Option.empty(), tableConfig);
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index 0ff6d8ab551..ac6428350dc 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -36,7 +36,7 @@ import org.apache.hudi.index.SparkHoodieIndexFactory;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
-import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
 import org.apache.hudi.table.action.commit.HoodieMergeHelper;
 
 import org.apache.hadoop.conf.Configuration;
@@ -105,9 +105,9 @@ public abstract class HoodieSparkTable<T>
       // Create the metadata table writer. First time after the upgrade this 
creation might trigger
       // metadata table bootstrapping. Bootstrapping process could fail and 
checking the table
       // existence after the creation is needed.
-      HoodieTableMetadataWriter metadataWriter = 
SparkHoodieBackedTableMetadataWriter.create(
+      HoodieTableMetadataWriter metadataWriter = 
SparkMetadataWriterFactory.create(
           getContext().getStorageConf(), config, failedWritesCleaningPolicy, 
getContext(),
-          Option.of(triggeringInstantTimestamp));
+          Option.of(triggeringInstantTimestamp), metaClient.getTableConfig());
       try {
         if (isMetadataTableExists || metaClient.getStorage().exists(
             
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))) {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 0fabbe27e15..67d633f8fa8 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -27,11 +27,13 @@ import 
org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -59,8 +61,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,15 +102,22 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
   private static final Logger LOG = 
LoggerFactory.getLogger(TestHoodieBackedTableMetadata.class);
 
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testTableOperations(boolean reuseReaders) throws Exception {
+  @CsvSource({"true,6", "true,8", "false,6", "false,8"})
+  public void testTableOperations(boolean reuseReaders, int tableVersion) 
throws Exception {
     HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
-    init(tableType);
+    initPath();
+    HoodieWriteConfig config = 
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, 
false, true, false)
+        .build();
+    config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, 
String.valueOf(tableVersion));
+    init(tableType, config);
     doWriteInsertAndUpsert(testTable);
 
     // trigger an upsert
     doWriteOperation(testTable, "0000003");
     verifyBaseMetadataTable(reuseReaders);
+    HoodieTableVersion finalTableVersion = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
+        .getTableConfig().getTableVersion();
+    assertEquals(tableVersion, finalTableVersion.versionCode());
   }
 
   /**
@@ -119,11 +127,15 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
    * @throws Exception
    */
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testMultiReaderForHoodieBackedTableMetadata(boolean reuse) 
throws Exception {
+  @CsvSource({"true,6", "true,8", "false,6", "false,8"})
+  public void testMultiReaderForHoodieBackedTableMetadata(boolean reuse, int 
tableVersion) throws Exception {
     final int taskNumber = 3;
     HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
-    init(tableType);
+    initPath();
+    HoodieWriteConfig config = 
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, 
false, true, false)
+        .build();
+    config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, 
String.valueOf(tableVersion));
+    init(tableType, config);
     testTable.doWriteOperation("000001", INSERT, emptyList(), asList("p1"), 1);
     HoodieBackedTableMetadata tableMetadata = new 
HoodieBackedTableMetadata(context, storage, writeConfig.getMetadataConfig(), 
writeConfig.getBasePath(), reuse);
     assertTrue(tableMetadata.enabled());
@@ -161,6 +173,11 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
     executors.awaitTermination(5, TimeUnit.MINUTES);
     assertFalse(flag.get());
     assertEquals(filesNumber.get(), taskNumber);
+
+    // validate table version
+    HoodieTableVersion finalTableVersion = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
+        .getTableConfig().getTableVersion();
+    assertEquals(tableVersion, finalTableVersion.versionCode());
   }
 
   private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws 
Exception {
@@ -207,29 +224,47 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
    * the right key generator class name.
    */
   @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testMetadataTableKeyGenerator(final HoodieTableType tableType) 
throws Exception {
-    init(tableType);
+  @CsvSource({"COPY_ON_WRITE,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,6", 
"MERGE_ON_READ,8"})
+  public void testMetadataTableKeyGenerator(final HoodieTableType tableType, 
int tableVersion) throws Exception {
+    initPath();
+    HoodieWriteConfig config = 
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, 
false, true, false)
+        .build();
+    config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, 
String.valueOf(tableVersion));
+    init(tableType, config);
 
     HoodieBackedTableMetadata tableMetadata = new 
HoodieBackedTableMetadata(context,
         storage, writeConfig.getMetadataConfig(), writeConfig.getBasePath(), 
false);
 
     assertEquals(HoodieTableMetadataKeyGenerator.class.getCanonicalName(),
         
tableMetadata.getMetadataMetaClient().getTableConfig().getKeyGeneratorClassName());
+
+    // validate table version
+    HoodieTableVersion finalTableVersion = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
+        .getTableConfig().getTableVersion();
+    assertEquals(tableVersion, finalTableVersion.versionCode());
   }
 
   /**
    * [HUDI-2852] Table metadata returns empty for non-exist partition.
    */
   @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testNotExistPartition(final HoodieTableType tableType) throws 
Exception {
-    init(tableType);
+  @CsvSource({"COPY_ON_WRITE,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,6", 
"MERGE_ON_READ,8"})
+  public void testNotExistPartition(final HoodieTableType tableType, int 
tableVersion) throws Exception {
+    initPath();
+    HoodieWriteConfig config = 
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, 
false, true, false)
+        .build();
+    config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, 
String.valueOf(tableVersion));
+    init(tableType, config);
     HoodieBackedTableMetadata tableMetadata = new 
HoodieBackedTableMetadata(context,
         storage, writeConfig.getMetadataConfig(), writeConfig.getBasePath(), 
false);
     List<StoragePathInfo> allFilesInPartition = 
tableMetadata.getAllFilesInPartition(
         new StoragePath(writeConfig.getBasePath() + "dummy"));
     assertEquals(allFilesInPartition.size(), 0);
+
+    // validate table version
+    HoodieTableVersion finalTableVersion = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
+        .getTableConfig().getTableVersion();
+    assertEquals(tableVersion, finalTableVersion.versionCode());
   }
 
   /**
@@ -240,8 +275,8 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
    * 3. Verify table services like compaction benefit from record key 
deduplication feature.
    */
   @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testMetadataRecordKeyExcludeFromPayload(final HoodieTableType 
tableType) throws Exception {
+  @CsvSource({"COPY_ON_WRITE,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,6", 
"MERGE_ON_READ,8"})
+  public void testMetadataRecordKeyExcludeFromPayload(final HoodieTableType 
tableType, int tableVersion) throws Exception {
     initPath();
     writeConfig = getWriteConfigBuilder(true, true, false)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder()
@@ -249,6 +284,7 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
             .withMaxNumDeltaCommitsBeforeCompaction(3)
             .build())
         .build();
+    writeConfig.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, 
String.valueOf(tableVersion));
     init(tableType, writeConfig);
 
     // 2nd commit
@@ -296,6 +332,11 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
     }, "Metadata table should have a valid base file!");
 
     validateMetadata(testTable);
+
+    // validate table version
+    HoodieTableVersion finalTableVersion = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
+        .getTableConfig().getTableVersion();
+    assertEquals(tableVersion, finalTableVersion.versionCode());
   }
 
   /**
@@ -305,8 +346,8 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
    * plan has not been successfully executed before the new one is scheduled.
    */
   @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testRepeatedCleanActionsWithMetadataTableEnabled(final 
HoodieTableType tableType) throws Exception {
+  @CsvSource({"COPY_ON_WRITE,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,6", 
"MERGE_ON_READ,8"})
+  public void testRepeatedCleanActionsWithMetadataTableEnabled(final 
HoodieTableType tableType, int tableVersion) throws Exception {
     initPath();
     writeConfig = getWriteConfigBuilder(true, true, false)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder()
@@ -314,6 +355,7 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
             .withMaxNumDeltaCommitsBeforeCompaction(4)
             .build())
         .build();
+    writeConfig.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, 
String.valueOf(tableVersion));
     init(tableType, writeConfig);
     String partition = "p1";
     // Simulate two bulk insert operations adding two data files in partition 
"p1"
@@ -362,6 +404,11 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
     assertEquals(1, getNumCompactions(metadataMetaClient));
     Set<String> fileSetAfterSecondCleaning = 
getFilePathsInPartition(partition);
     validateFilesAfterCleaning(deleteFileList, fileSetBeforeCleaning, 
fileSetAfterSecondCleaning);
+
+    // validate table version
+    HoodieTableVersion finalTableVersion = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
+        .getTableConfig().getTableVersion();
+    assertEquals(tableVersion, finalTableVersion.versionCode());
   }
 
   private int getNumCompactions(HoodieTableMetaClient metaClient) {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index 1c7a929ee91..c15950ae22d 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
 import org.apache.hudi.client.timeline.versioning.v2.TimelineArchiverV2;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -103,7 +104,7 @@ public class TestHoodieMetadataBase extends 
HoodieSparkClientTestHarness {
     initHoodieStorage();
     storage.createDirectory(new StoragePath(basePath));
     initTimelineService();
-    initMetaClient(tableType);
+    initMetaClient(tableType, writeConfig.map(conf -> 
conf.getProps()).orElse(new TypedProperties()));
     initTestDataGenerator();
     metadataTableBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(basePath);
     this.writeConfig = writeConfig.isPresent()
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index ead5f77518f..cc8aad17c61 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1118,6 +1118,22 @@ public class HoodieTableMetadataUtil {
     }
   }
 
+  /**
+   * Convert rollback action metadata to metadata table records.
+   * <p>
+   * Only the FILES partition is handled here, since a Hudi rollback on a MOR 
table may end up adding a new log file. All other partitions
+   * are handled by the actual rollback of the deltacommit that added records 
to those partitions.
+   */
+  public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords(
+      HoodieEngineContext engineContext, HoodieTableMetaClient 
dataTableMetaClient, HoodieRollbackMetadata rollbackMetadata, String 
instantTime) {
+
+    List<HoodieRecord> filesPartitionRecords = 
HoodieTableMetadataUtil.convertMetadataToRollbackRecords(rollbackMetadata, 
instantTime, dataTableMetaClient);
+    final HoodieData<HoodieRecord> rollbackRecordsRDD = 
filesPartitionRecords.isEmpty() ? engineContext.emptyHoodieData()
+        : engineContext.parallelize(filesPartitionRecords, 
filesPartitionRecords.size());
+
+    return 
Collections.singletonMap(MetadataPartitionType.FILES.getPartitionPath(), 
rollbackRecordsRDD);
+  }
+
   /**
    * Convert rollback action metadata to files partition records.
    * Consider only new log files added.
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 580362350d9..e55e8278708 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -233,6 +233,10 @@ public class HoodieTestUtils {
       
builder.setKeyGeneratorType(properties.getProperty(HoodieTableConfig.KEY_GENERATOR_TYPE.key()));
     }
 
+    if (properties.containsKey("hoodie.write.table.version")) {
+      
builder.setTableVersion(Integer.parseInt(properties.getProperty("hoodie.write.table.version")));
+    }
+
     String keyGen = 
properties.getProperty("hoodie.datasource.write.keygenerator.class");
     if (!Objects.equals(keyGen, 
"org.apache.hudi.keygen.NonpartitionedKeyGenerator")
         && 
!properties.containsKey("hoodie.datasource.write.partitionpath.field")) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala
index 4b81abe0d70..9f3b421d68e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi.command.procedures
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.util.HoodieTimer
-import org.apache.hudi.metadata.{HoodieTableMetadata, 
SparkHoodieBackedTableMetadataWriter}
+import org.apache.hudi.metadata.{HoodieTableMetadata, 
SparkMetadataWriterFactory}
 import org.apache.hudi.storage.StoragePath
 
 import org.apache.spark.sql.Row
@@ -63,7 +63,7 @@ class CreateMetadataTableProcedure extends BaseProcedure with 
ProcedureBuilder w
     }
     val timer = HoodieTimer.start
     val writeConfig = getWriteConfig(basePath)
-    SparkHoodieBackedTableMetadataWriter.create(metaClient.getStorageConf, 
writeConfig, new HoodieSparkEngineContext(jsc))
+    SparkMetadataWriterFactory.create(metaClient.getStorageConf, writeConfig, 
new HoodieSparkEngineContext(jsc), metaClient.getTableConfig)
     Seq(Row("Created Metadata Table in " + metadataPath + " (duration=" + 
timer.endTimer / 1000.0 + "secs)"))
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala
index 4864a70a9ad..2687f50471d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi.command.procedures
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.util.HoodieTimer
-import org.apache.hudi.metadata.{HoodieTableMetadata, 
SparkHoodieBackedTableMetadataWriter}
+import org.apache.hudi.metadata.{HoodieTableMetadata, 
SparkMetadataWriterFactory}
 import org.apache.hudi.storage.StoragePath
 
 import org.apache.spark.internal.Logging
@@ -64,7 +64,7 @@ class InitMetadataTableProcedure extends BaseProcedure with 
ProcedureBuilder wit
     val timer = HoodieTimer.start
     if (!readOnly) {
       val writeConfig = getWriteConfig(basePath)
-      SparkHoodieBackedTableMetadataWriter.create(metaClient.getStorageConf, 
writeConfig, new HoodieSparkEngineContext(jsc))
+      SparkMetadataWriterFactory.create(metaClient.getStorageConf, 
writeConfig, new HoodieSparkEngineContext(jsc), metaClient.getTableConfig)
     }
 
     val action = if (readOnly) "Opened" else "Initialized"
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 44d08f6fb4d..191ad1abf91 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -94,12 +94,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class TestHoodieSparkMergeOnReadTableRollback extends 
SparkClientFunctionalTestHarness {
 
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  void testCOWToMORConvertedTableRollback(boolean rollbackUsingMarkers) throws 
Exception {
+  @CsvSource({"true,6", "true,8", "false,6", "false,8"})
+  void testCOWToMORConvertedTableRollback(boolean rollbackUsingMarkers, int 
tableVersion) throws Exception {
     // Set TableType to COW
-    HoodieTableMetaClient metaClient = 
getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+    Properties properties = new Properties();
+    properties.put(HoodieTableConfig.VERSION.key(), 
String.valueOf(tableVersion));
+    properties.put(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), 
String.valueOf(tableVersion));
+    HoodieTableMetaClient metaClient = 
getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE, properties);
 
     HoodieWriteConfig cfg = getConfig(false, rollbackUsingMarkers);
+    cfg.setValue(HoodieTableConfig.VERSION, String.valueOf(tableVersion));
+    cfg.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, 
String.valueOf(tableVersion));
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
 
       HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
index e1c4775f35a..17449e8d098 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
@@ -125,7 +125,7 @@ class ColumnStatIndexTestBase extends 
HoodieSparkClientTestBase {
       .save(basePath)
     dfList = dfList :+ inputDF
 
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
 
     if (params.shouldValidateColStats) {
       // Currently, routine manually validating the column stats (by actually 
reading every column of every file)
@@ -292,7 +292,7 @@ class ColumnStatIndexTestBase extends 
HoodieSparkClientTestBase {
     val metadataConfig = HoodieMetadataConfig.newBuilder()
       .fromProperties(toProperties(metadataOpts))
       .build()
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
     val schemaUtil = new TableSchemaResolver(metaClient)
     val tableSchema = schemaUtil.getTableAvroSchema(false)
     val localSourceTableSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
@@ -442,27 +442,32 @@ class ColumnStatIndexTestBase extends 
HoodieSparkClientTestBase {
 
 object ColumnStatIndexTestBase {
 
-  case class ColumnStatsTestCase(tableType: HoodieTableType, 
shouldReadInMemory: Boolean)
+  case class ColumnStatsTestCase(tableType: HoodieTableType, 
shouldReadInMemory: Boolean, tableVersion: Int)
 
   def testMetadataColumnStatsIndexParams: java.util.stream.Stream[Arguments] = 
{
     
java.util.stream.Stream.of(HoodieTableType.values().toStream.flatMap(tableType 
=>
-      Seq(Arguments.arguments(ColumnStatsTestCase(tableType, 
shouldReadInMemory = true)),
-        Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory 
= false))
+      Seq(Arguments.arguments(ColumnStatsTestCase(tableType, 
shouldReadInMemory = true, tableVersion = 6)),
+        Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory 
= false, tableVersion = 6)),
+        Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory 
= true, tableVersion = 8)),
+        Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory 
= false, tableVersion = 8))
       )
     ): _*)
   }
 
   def testMetadataColumnStatsIndexParamsInMemory: 
java.util.stream.Stream[Arguments] = {
     
java.util.stream.Stream.of(HoodieTableType.values().toStream.flatMap(tableType 
=>
-      Seq(Arguments.arguments(ColumnStatsTestCase(tableType, 
shouldReadInMemory = true))
+      Seq(Arguments.arguments(ColumnStatsTestCase(tableType, 
shouldReadInMemory = true, tableVersion = 6)),
+        Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory 
= true, tableVersion = 8))
       )
     ): _*)
   }
 
   def testMetadataColumnStatsIndexParamsForMOR: 
java.util.stream.Stream[Arguments] = {
     java.util.stream.Stream.of(
-      
Seq(Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, 
shouldReadInMemory = true)),
-        Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, 
shouldReadInMemory = false))
+      
Seq(Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, 
shouldReadInMemory = true, tableVersion = 6)),
+        Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, 
shouldReadInMemory = false, tableVersion = 6)),
+        Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, 
shouldReadInMemory = true, tableVersion = 8)),
+        Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, 
shouldReadInMemory = false, tableVersion = 8))
       )
         : _*)
   }
@@ -470,11 +475,19 @@ object ColumnStatIndexTestBase {
   def testTableTypePartitionTypeParams: java.util.stream.Stream[Arguments] = {
     java.util.stream.Stream.of(
       Seq(
-        Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "c8"),
+        // Table version 6
+        Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "c8", "6"),
         // empty partition col represents non-partitioned table.
-        Arguments.arguments(HoodieTableType.COPY_ON_WRITE, ""),
-        Arguments.arguments(HoodieTableType.MERGE_ON_READ, "c8"),
-        Arguments.arguments(HoodieTableType.MERGE_ON_READ, "")
+        Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "", "6"),
+        Arguments.arguments(HoodieTableType.MERGE_ON_READ, "c8", "6"),
+        Arguments.arguments(HoodieTableType.MERGE_ON_READ, "", "6"),
+
+        // Table version 8
+        Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "c8", "8"),
+        // empty partition col represents non-partitioned table.
+        Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "", "8"),
+        Arguments.arguments(HoodieTableType.MERGE_ON_READ, "c8", "8"),
+        Arguments.arguments(HoodieTableType.MERGE_ON_READ, "", "8")
       )
         : _*)
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index 35fdc8b041e..872bca957a7 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -37,7 +37,7 @@ class RecordLevelIndexTestBase extends 
HoodieStatsIndexTestBase {
     HoodieMetadataConfig.ENABLE.key -> "true",
     HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true"
   )
-  val commonOpts: Map[String, String] = Map(
+  def commonOpts: Map[String, String] = Map(
     PARTITIONPATH_FIELD.key -> "partition",
     HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
     HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "15"
@@ -94,7 +94,7 @@ class RecordLevelIndexTestBase extends 
HoodieStatsIndexTestBase {
     if (!metaClientReloaded) {
       // initialization of meta client is required again after writing data so 
that
       // latest table configs are picked up
-      metaClient = HoodieTableMetaClient.reload(metaClient)
+      metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
       metaClientReloaded = true
     }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 086443e0685..44ae5d05447 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -26,7 +26,8 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.{HoodieCommonConfig, 
HoodieMetadataConfig, HoodieStorageConfig}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
HoodieTableVersion}
+import 
org.apache.hudi.common.table.timeline.versioning.v1.InstantFileNameGeneratorV1
 import org.apache.hudi.common.table.view.FileSystemViewManager
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR
@@ -50,7 +51,7 @@ import org.apache.spark.sql.types._
 import org.junit.jupiter.api._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, 
assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{EnumSource, MethodSource, 
ValueSource}
+import org.junit.jupiter.params.provider.{CsvSource, MethodSource}
 
 import java.util.Collections
 import java.util.stream.Collectors
@@ -80,7 +81,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       RECORDKEY_FIELD.key -> "c1",
       PRECOMBINE_FIELD.key -> "c1",
       HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
-      "hoodie.compact.inline.max.delta.commits" -> "10"
+      "hoodie.compact.inline.max.delta.commits" -> "10",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> 
testCase.tableVersion.toString
     ) ++ metadataOpts
 
     doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, 
metadataOpts, commonOpts,
@@ -171,7 +173,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       shouldValidateColStats = false,
       shouldValidateManually = false))
 
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
     validateNonExistantColumnsToIndexDefn(metaClient)
   }
 
@@ -194,7 +196,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
       RECORDKEY_FIELD.key -> "c1",
       PRECOMBINE_FIELD.key -> "c1",
-      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> 
testCase.tableVersion.toString
     ) ++ metadataOpts
 
     var expectedColStatsSourcePath = if (testCase.tableType == 
HoodieTableType.COPY_ON_WRITE) {
@@ -239,8 +242,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
 
   @ParameterizedTest
   @MethodSource(Array("testTableTypePartitionTypeParams"))
-  def testMetadataColumnStatsIndexInitializationWithUpserts(tableType: 
HoodieTableType, partitionCol : String): Unit = {
-    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+  def testMetadataColumnStatsIndexInitializationWithUpserts(tableType: 
HoodieTableType, partitionCol : String, tableVersion: Int): Unit = {
+    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true, 
tableVersion)
     val metadataOpts = Map(
       HoodieMetadataConfig.ENABLE.key -> "true",
       HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false"
@@ -255,7 +258,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       PRECOMBINE_FIELD.key -> "c1",
       PARTITIONPATH_FIELD.key -> partitionCol,
       HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
-      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "5"
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "5",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> 
testCase.tableVersion.toString
     ) ++ metadataOpts
 
     // inserts
@@ -305,7 +309,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       "index/colstats/mor-bootstrap1-column-stats-index-table.json"
     }
 
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
     val latestCompletedCommit = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().requestedTime
 
     // lets validate that we have log files generated in case of MOR table
@@ -355,8 +359,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
 
   @ParameterizedTest
   @MethodSource(Array("testTableTypePartitionTypeParams"))
-  def testMetadataColumnStatsIndexInitializationWithRollbacks(tableType: 
HoodieTableType, partitionCol : String): Unit = {
-    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+  def testMetadataColumnStatsIndexInitializationWithRollbacks(tableType: 
HoodieTableType, partitionCol : String, tableVersion: Int): Unit = {
+    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true, 
tableVersion)
     val metadataOpts = Map(
       HoodieMetadataConfig.ENABLE.key -> "true",
       HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false"
@@ -371,7 +375,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       PRECOMBINE_FIELD.key -> "c1",
       PARTITIONPATH_FIELD.key() -> partitionCol,
       "hoodie.write.markers.type" -> "DIRECT",
-      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> 
testCase.tableVersion.toString
     ) ++ metadataOpts
 
     // inserts
@@ -412,7 +417,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       "index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json"
     }
 
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
     val latestCompletedCommit = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().requestedTime
 
     // updates a subset which are not deleted and enable col stats and 
validate bootstrap
@@ -426,7 +431,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       parquetMaxFileSize = 100 * 1024 * 1024,
       smallFileLimit = 0))
 
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
     
assertTrue(metaClient.getActiveTimeline.getRollbackTimeline.countInstants() > 0)
 
     validateColumnsToIndex(metaClient, DEFAULT_COLUMNS_TO_INDEX)
@@ -434,7 +439,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
 
   def simulateFailureForLatestCommit(tableType: HoodieTableType, partitionCol: 
String) : Unit = {
     // simulate failure for latest commit.
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
     var baseFileName : String = null
     var logFileName : String = null
     val lastCompletedCommit = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
@@ -452,12 +457,18 @@ class TestColumnStatsIndex extends 
ColumnStatIndexTestBase {
       } else {
         metaClient.getStorage.listFiles(new 
StoragePath(metaClient.getBasePath,  "9"))
       }
-      val baseFileFileStatus = dataFiles.stream().filter(fileStatus => 
fileStatus.getPath.getName.contains(lastCompletedCommit.requestedTime)).findFirst().get()
+      val baseFileFileStatus = dataFiles.stream().filter(fileStatus => 
fileStatus.getPath.getName.contains(lastCompletedCommit.requestedTime)
+        && fileStatus.getPath.getName.contains("parquet")).findFirst().get()
       baseFileName = baseFileFileStatus.getPath.getName
     }
 
-    val latestCompletedFileName = 
INSTANT_FILE_NAME_GENERATOR.getFileName(lastCompletedCommit)
-    metaClient.getStorage.deleteFile(new 
StoragePath(metaClient.getBasePath.toString + "/.hoodie/timeline/" + 
latestCompletedFileName))
+    if 
(metaClient.getTableConfig.getTableVersion.lesserThan(HoodieTableVersion.EIGHT))
 {
+      val latestCompletedFileName = new 
InstantFileNameGeneratorV1().getFileName(lastCompletedCommit)
+      metaClient.getStorage.deleteFile(new 
StoragePath(metaClient.getBasePath.toString + "/.hoodie/" + 
latestCompletedFileName))
+    } else {
+      val latestCompletedFileName = 
INSTANT_FILE_NAME_GENERATOR.getFileName(lastCompletedCommit)
+      metaClient.getStorage.deleteFile(new 
StoragePath(metaClient.getBasePath.toString + "/.hoodie/timeline/" + 
latestCompletedFileName))
+    }
 
     // re-create marker for the deleted file.
     if (tableType == HoodieTableType.MERGE_ON_READ) {
@@ -475,11 +486,12 @@ class TestColumnStatsIndex extends 
ColumnStatIndexTestBase {
     }
   }
 
-  @Test
-  def testMORDeleteBlocks(): Unit = {
+  @ParameterizedTest
+  @CsvSource(value = Array("6", "8"))
+  def testMORDeleteBlocks(tableVersion: Int): Unit = {
     val tableType: HoodieTableType = HoodieTableType.MERGE_ON_READ
     val partitionCol = "c8"
-    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true, 
tableVersion)
     val metadataOpts = Map(
       HoodieMetadataConfig.ENABLE.key -> "true",
       HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
@@ -494,7 +506,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       PRECOMBINE_FIELD.key -> "c1",
       PARTITIONPATH_FIELD.key() -> partitionCol,
       HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
-      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5"
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> 
testCase.tableVersion.toString
     ) ++ metadataOpts
 
     // inserts
@@ -533,10 +546,10 @@ class TestColumnStatsIndex extends 
ColumnStatIndexTestBase {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("", "c8"))
-  def testColStatsWithCleanCOW(partitionCol: String): Unit = {
+  @CsvSource(value = Array("'',6", "'',8", "c8,6", "c8,8"))
+  def testColStatsWithCleanCOW(partitionCol: String, tableVersion: Int): Unit 
= {
     val tableType: HoodieTableType = HoodieTableType.COPY_ON_WRITE
-    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true, 
tableVersion)
     val metadataOpts = Map(
       HoodieMetadataConfig.ENABLE.key -> "true"
     )
@@ -550,7 +563,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       PRECOMBINE_FIELD.key -> "c1",
       PARTITIONPATH_FIELD.key() -> partitionCol,
       HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
-      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1"
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> 
testCase.tableVersion.toString
     ) ++ metadataOpts
 
     // inserts
@@ -598,10 +612,10 @@ class TestColumnStatsIndex extends 
ColumnStatIndexTestBase {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("", "c8"))
-  def testColStatsWithCleanMOR(partitionCol: String): Unit = {
+  @CsvSource(value = Array("'',6", "'',8", "c8,6", "c8,8"))
+  def testColStatsWithCleanMOR(partitionCol: String, tableVersion: Int): Unit 
= {
     val tableType: HoodieTableType = HoodieTableType.MERGE_ON_READ
-    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true, 
tableVersion)
     val metadataOpts = Map(
       HoodieMetadataConfig.ENABLE.key -> "true"
     )
@@ -616,7 +630,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       PARTITIONPATH_FIELD.key() -> partitionCol,
       HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
       HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1",
-      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2"
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> 
testCase.tableVersion.toString
     ) ++ metadataOpts
 
     // inserts
@@ -662,13 +677,13 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       parquetMaxFileSize = 100 * 1024 * 1024,
       smallFileLimit = 0))
 
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
     assertTrue(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() 
> 0)
   }
 
   @ParameterizedTest
-  @EnumSource(classOf[HoodieTableType])
-  def testMetadataColumnStatsIndexValueCount(tableType: HoodieTableType): Unit 
= {
+  @CsvSource(value = Array("COPY_ON_WRITE,6", "MERGE_ON_READ,6", 
"COPY_ON_WRITE,8", "MERGE_ON_READ,8"))
+  def testMetadataColumnStatsIndexValueCount(tableType: HoodieTableType, 
tableVersion: Int): Unit = {
     val metadataOpts = Map(
       HoodieMetadataConfig.ENABLE.key -> "true",
       HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
@@ -681,7 +696,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
       RECORDKEY_FIELD.key -> "c1",
       PRECOMBINE_FIELD.key -> "c1",
-      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString
     ) ++ metadataOpts
 
     val schema = StructType(StructField("c1", IntegerType, false) :: 
StructField("c2", StringType, true) :: Nil)
@@ -698,7 +714,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
 
     val metadataConfig = HoodieMetadataConfig.newBuilder()
       .fromProperties(toProperties(metadataOpts))
@@ -715,8 +731,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testMetadataColumnStatsWithFilesFilter(shouldReadInMemory: Boolean): 
Unit = {
+  @CsvSource(value = Array("true,6", "false,6", "true,8", "false,8"))
+  def testMetadataColumnStatsWithFilesFilter(shouldReadInMemory: Boolean, 
tableVersion: Int): Unit = {
     val targetColumnsToIndex = Seq("c1", "c2", "c3")
 
     val metadataOpts = Map(
@@ -733,7 +749,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       PRECOMBINE_FIELD.key -> "c1",
       PARTITIONPATH_FIELD.key() -> "c8",
       HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
-      HoodieCommonConfig.RECONCILE_SCHEMA.key -> "true"
+      HoodieCommonConfig.RECONCILE_SCHEMA.key -> "true",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString
     ) ++ metadataOpts
 
     val sourceJSONTablePath = 
getClass.getClassLoader.getResource("index/colstats/input-table-json-partition-pruning").toString
@@ -752,7 +769,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
 
     val metadataConfig = HoodieMetadataConfig.newBuilder()
       .fromProperties(toProperties(metadataOpts))
@@ -811,8 +828,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testMetadataColumnStatsIndexPartialProjection(shouldReadInMemory: 
Boolean): Unit = {
+  @CsvSource(value = Array("true,6", "false,6", "true,8", "false,8"))
+  def testMetadataColumnStatsIndexPartialProjection(shouldReadInMemory: 
Boolean, tableVersion: Int): Unit = {
     var targetColumnsToIndex = Seq("c1", "c2", "c3")
 
     val metadataOpts = Map(
@@ -828,7 +845,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       RECORDKEY_FIELD.key -> "c1",
       PRECOMBINE_FIELD.key -> "c1",
       HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
-      HoodieCommonConfig.RECONCILE_SCHEMA.key -> "true"
+      HoodieCommonConfig.RECONCILE_SCHEMA.key -> "true",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString
     ) ++ metadataOpts
 
     val sourceJSONTablePath = 
getClass.getClassLoader.getResource("index/colstats/input-table-json").toString
@@ -847,7 +865,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
 
     val metadataConfig = HoodieMetadataConfig.newBuilder()
       .fromProperties(toProperties(metadataOpts))
@@ -896,7 +914,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
         .mode(SaveMode.Append)
         .save(basePath)
 
-      metaClient = HoodieTableMetaClient.reload(metaClient)
+      metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
 
       val requestedColumns = 
metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS)
         .getSourceFields.toSeq.filterNot(colName => 
colName.startsWith("_hoodie")).sorted.toSeq
@@ -925,8 +943,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def 
testTranslateQueryFiltersIntoColumnStatsIndexFilterExpr(shouldReadInMemory: 
Boolean): Unit = {
+  @CsvSource(value = Array("true,6", "false,6", "true,8", "false,8"))
+  def 
testTranslateQueryFiltersIntoColumnStatsIndexFilterExpr(shouldReadInMemory: 
Boolean, tableVersion: Int): Unit = {
     val targetColumnsToIndex = Seq("c1", "c2", "c3")
 
     val metadataOpts = Map(
@@ -941,7 +959,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
       RECORDKEY_FIELD.key -> "c1",
       PRECOMBINE_FIELD.key -> "c1",
-      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString
     ) ++ metadataOpts
 
     val sourceJSONTablePath = 
getClass.getClassLoader.getResource("index/colstats/input-table-json").toString
@@ -960,7 +979,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
 
     val metadataConfig = HoodieMetadataConfig.newBuilder()
       .fromProperties(toProperties(metadataOpts))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index e91c75f6e32..5411dc8c6c3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -123,6 +123,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
       .save(basePath)
     val deletedDf3 = calculateMergedDf(latestBatchDf, operation, true)
     deletedDf3.cache()
+    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
     validateDataAndRecordIndices(hudiOpts, deletedDf3)
   }
 
@@ -419,14 +420,10 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
   }
 
   @ParameterizedTest
-  @CsvSource(value = Array(
-    "COPY_ON_WRITE,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,6", "MERGE_ON_READ,8"
-  ))
-  def testRLIWithDTClustering(tableType: String, tableVersion: Int): Unit = {
+  @EnumSource(classOf[HoodieTableType])
+  def testRLIWithDTClustering(tableType: HoodieTableType): Unit = {
     val hudiOpts = commonOpts ++ Map(
-      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
-      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString,
-      HoodieTableConfig.VERSION.key() -> tableVersion.toString,
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
       HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true",
       HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2"
     )
@@ -434,7 +431,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
     for ((k, v) <- hudiOpts) {
       props.put(k, v)
     }
-    initMetaClient(HoodieTableType.valueOf(tableType), props)
+    initMetaClient(tableType, props)
 
     doWriteAndValidateDataAndRecordIndex(hudiOpts,
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
@@ -455,6 +452,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
 
     
assertTrue(getLatestClusteringInstant().get().requestedTime.compareTo(lastClusteringInstant.get().requestedTime)
 > 0)
     assertEquals(getLatestClusteringInstant(), 
metaClient.getActiveTimeline.lastInstant())
+    validateDataAndRecordIndices(hudiOpts)
     // We are validating rollback of a DT clustering instant here
     rollbackLastInstant(hudiOpts)
     validateDataAndRecordIndices(hudiOpts)
@@ -545,6 +543,9 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
     doWriteAndValidateDataAndRecordIndex(hudiOpts,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append)
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
     val metadataTableFSView = getHoodieTable(metaClient, 
getWriteConfig(hudiOpts)).getMetadataTable
       .asInstanceOf[HoodieBackedTableMetadata].getMetadataFileSystemView
     val compactionTimeline = 
metadataTableFSView.getVisibleCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexTableVersionSix.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexTableVersionSix.scala
new file mode 100644
index 00000000000..71e31057b7c
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexTableVersionSix.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+
+class TestRecordLevelIndexTableVersionSix extends TestRecordLevelIndex {
+  override def commonOpts: Map[String, String] = super.commonOpts ++ Map(
+    HoodieTableConfig.VERSION.key() -> "6",
+    HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> "6"
+  )
+}

Reply via email to