nsivabalan commented on code in PR #14260:
URL: https://github.com/apache/hudi/pull/14260#discussion_r2540499042


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends 
SparkHoodieBackedTableMetadataWriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+  private final String sourceBasePath;
+  private String inflightInstantTimestamp;
+
+  public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf, 
HoodieWriteConfig writeConfig,
+                                              HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy, HoodieEngineContext engineContext,
+                                              String inflightInstantTimestamp, 
String sourceBasePath) {
+    super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
Option.of(inflightInstantTimestamp));
+    this.sourceBasePath = sourceBasePath;
+    this.inflightInstantTimestamp = inflightInstantTimestamp;
+  }
+
+  public static HoodieTableMetadataWriter create(Configuration conf,
+                                                 HoodieWriteConfig writeConfig,
+                                                 
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                 HoodieEngineContext context,
+                                                 String 
inflightInstantTimestamp) {
+    return new SparkHoodieBackedMetadataSyncMetadataWriter(
+        conf, writeConfig, failedWritesCleaningPolicy, context, 
inflightInstantTimestamp, 
writeConfig.getMetadataConfig().getBasePathOverride());
+  }
+
+  protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                       Option<String> 
inflightInstantTimestamp) throws IOException {
+    // Do not initialize the metadata table during metadata sync
+    metadataMetaClient = initializeMetaClient();
+    this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
+    return true;
+  }
+
+  public void bootstrap(Option<String> boostrapUntilInstantOpt) throws 
IOException {
+    if (!boostrapUntilInstantOpt.isPresent()) {
+      return;
+    }
+
+    String lastInstantTimestamp = boostrapUntilInstantOpt.get();
+    boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
+
+    // Check and then open the metadata table reader so FILES partition can be 
read during initialization of other partitions
+    // initMetadataReader();
+    // Load the metadata table metaclient if required
+    if (dataMetaClient == null) {
+      dataMetaClient = 
HoodieTableMetaClient.builder().setConf(engineContext.getHadoopConf().get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+    }
+
+    // initialize metadata writer
+    List<DirectoryInfo> partitionInfoList = 
listAllPartitionsFromFilesystem(lastInstantTimestamp, sourceBasePath);
+    // Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = 
initializeFilesPartition(partitionInfoList);
+    Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = 
initializeFilesPartition2(lastInstantTimestamp, partitionInfoList);
+
+    try {
+      if (!filesPartitionAvailable) {
+        initializeFileGroups(metadataMetaClient, MetadataPartitionType.FILES, 
inflightInstantTimestamp, fileGroupCountAndRecordsPair.getKey());
+      }
+    } catch (IOException e) {
+      throw new HoodieException("Failed to bootstrap table " + sourceBasePath, 
e);
+    }
+
+    // Perform the commit using bulkCommit
+    HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
+    // perform tables services on metadata table
+    performTableServices(Option.of(inflightInstantTimestamp));
+    if (!filesPartitionAvailable) {
+      bulkCommit(inflightInstantTimestamp, MetadataPartitionType.FILES, 
records, fileGroupCountAndRecordsPair.getKey());
+      dataMetaClient.reloadActiveTimeline();
+      
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, 
MetadataPartitionType.FILES, true);
+    } else {
+      commit(inflightInstantTimestamp, 
Collections.singletonMap(MetadataPartitionType.FILES, 
fileGroupCountAndRecordsPair.getValue()));
+    }
+  }
+
+  protected Pair<Integer, HoodieData<HoodieRecord>> 
initializeFilesPartition2(String lastInstantTimestamp, List<DirectoryInfo> 
partitionInfoList) {

Review Comment:
   fix the naming. 



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -253,7 +253,7 @@ private Map<PartitionPath, List<FileSlice>> 
loadFileSlicesForPartitions(List<Par
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
 
     HoodieTableFileSystemView fileSystemView =
-        new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
+        new HoodieTableFileSystemView(metaClient, activeTimeline, 
Option.of(metadataConfig.getNumPartitionLevels()), allFiles);

Review Comment:
   this may not work. 
   we have to do it differently



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends 
SparkHoodieBackedTableMetadataWriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+  private final String sourceBasePath;
+  private String inflightInstantTimestamp;
+
+  public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf, 
HoodieWriteConfig writeConfig,
+                                              HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy, HoodieEngineContext engineContext,
+                                              String inflightInstantTimestamp, 
String sourceBasePath) {
+    super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
Option.of(inflightInstantTimestamp));
+    this.sourceBasePath = sourceBasePath;
+    this.inflightInstantTimestamp = inflightInstantTimestamp;
+  }
+
+  public static HoodieTableMetadataWriter create(Configuration conf,
+                                                 HoodieWriteConfig writeConfig,
+                                                 
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                 HoodieEngineContext context,
+                                                 String 
inflightInstantTimestamp) {
+    return new SparkHoodieBackedMetadataSyncMetadataWriter(
+        conf, writeConfig, failedWritesCleaningPolicy, context, 
inflightInstantTimestamp, 
writeConfig.getMetadataConfig().getBasePathOverride());
+  }
+
+  protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                       Option<String> 
inflightInstantTimestamp) throws IOException {
+    // Do not initialize the metadata table during metadata sync
+    metadataMetaClient = initializeMetaClient();
+    this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);

Review Comment:
   don't we need to call `initMetadataReader()` if the FILES partition is 
already bootstrapped? 
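   rough sketch of what I'm suggesting (illustrative only, assuming the base writer's `initMetadataReader()` is the right way to open the reader here):
   ```java
   // Sketch only: reuse the base-class reader when FILES is already bootstrapped,
   // instead of always constructing a HoodieBackedTableMetadata directly.
   protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
                                        Option<String> inflightInstantTimestamp) throws IOException {
     metadataMetaClient = initializeMetaClient();
     if (dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES)) {
       initMetadataReader();
     } else {
       this.metadata = new HoodieBackedTableMetadata(engineContext,
           dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
     }
     return true;
   }
   ```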



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends 
SparkHoodieBackedTableMetadataWriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+  private final String sourceBasePath;
+  private String inflightInstantTimestamp;
+
+  public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf, 
HoodieWriteConfig writeConfig,
+                                              HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy, HoodieEngineContext engineContext,
+                                              String inflightInstantTimestamp, 
String sourceBasePath) {
+    super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
Option.of(inflightInstantTimestamp));
+    this.sourceBasePath = sourceBasePath;
+    this.inflightInstantTimestamp = inflightInstantTimestamp;
+  }
+
+  public static HoodieTableMetadataWriter create(Configuration conf,
+                                                 HoodieWriteConfig writeConfig,
+                                                 
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                 HoodieEngineContext context,
+                                                 String 
inflightInstantTimestamp) {
+    return new SparkHoodieBackedMetadataSyncMetadataWriter(
+        conf, writeConfig, failedWritesCleaningPolicy, context, 
inflightInstantTimestamp, 
writeConfig.getMetadataConfig().getBasePathOverride());
+  }
+
+  protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                       Option<String> 
inflightInstantTimestamp) throws IOException {
+    // Do not initialize the metadata table during metadata sync
+    metadataMetaClient = initializeMetaClient();
+    this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
+    return true;
+  }
+
+  public void bootstrap(Option<String> boostrapUntilInstantOpt) throws 
IOException {
+    if (!boostrapUntilInstantOpt.isPresent()) {
+      return;
+    }
+
+    String lastInstantTimestamp = boostrapUntilInstantOpt.get();
+    boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
+
+    // Check and then open the metadata table reader so FILES partition can be 
read during initialization of other partitions
+    // initMetadataReader();
+    // Load the metadata table metaclient if required
+    if (dataMetaClient == null) {
+      dataMetaClient = 
HoodieTableMetaClient.builder().setConf(engineContext.getHadoopConf().get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+    }
+
+    // initialize metadata writer
+    List<DirectoryInfo> partitionInfoList = 
listAllPartitionsFromFilesystem(lastInstantTimestamp, sourceBasePath);
+    // Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = 
initializeFilesPartition(partitionInfoList);
+    Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = 
initializeFilesPartition2(lastInstantTimestamp, partitionInfoList);
+
+    try {
+      if (!filesPartitionAvailable) {
+        initializeFileGroups(metadataMetaClient, MetadataPartitionType.FILES, 
inflightInstantTimestamp, fileGroupCountAndRecordsPair.getKey());
+      }
+    } catch (IOException e) {
+      throw new HoodieException("Failed to bootstrap table " + sourceBasePath, 
e);
+    }
+
+    // Perform the commit using bulkCommit

Review Comment:
   fix java docs. we may not do bulk commit for every bootstrap flow.
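   e.g. the comment could reflect both paths, something like (sketch):
   ```java
   // Commit the FILES partition records: bulkCommit only on first-time
   // initialization of the FILES partition; otherwise apply a regular commit.
   ```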
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends 
SparkHoodieBackedTableMetadataWriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+  private final String sourceBasePath;
+  private String inflightInstantTimestamp;
+
+  public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf, 
HoodieWriteConfig writeConfig,
+                                              HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy, HoodieEngineContext engineContext,
+                                              String inflightInstantTimestamp, 
String sourceBasePath) {
+    super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
Option.of(inflightInstantTimestamp));
+    this.sourceBasePath = sourceBasePath;
+    this.inflightInstantTimestamp = inflightInstantTimestamp;
+  }
+
+  public static HoodieTableMetadataWriter create(Configuration conf,
+                                                 HoodieWriteConfig writeConfig,
+                                                 
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                 HoodieEngineContext context,
+                                                 String 
inflightInstantTimestamp) {
+    return new SparkHoodieBackedMetadataSyncMetadataWriter(
+        conf, writeConfig, failedWritesCleaningPolicy, context, 
inflightInstantTimestamp, 
writeConfig.getMetadataConfig().getBasePathOverride());
+  }
+
+  protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                       Option<String> 
inflightInstantTimestamp) throws IOException {
+    // Do not initialize the metadata table during metadata sync
+    metadataMetaClient = initializeMetaClient();
+    this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
+    return true;
+  }
+
+  public void bootstrap(Option<String> boostrapUntilInstantOpt) throws 
IOException {

Review Comment:
   java docs please
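   e.g. (sketch; note the param name is misspelled `boostrapUntilInstantOpt` in the code):
   ```java
   /**
    * Bootstraps the FILES partition of the metadata table from the source table,
    * up to the given source instant.
    *
    * @param boostrapUntilInstantOpt last completed instant on the source table to
    *                                bootstrap up to; this is a no-op if empty.
    */
   ```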



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends 
SparkHoodieBackedTableMetadataWriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+  private final String sourceBasePath;
+  private String inflightInstantTimestamp;
+
+  public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf, 
HoodieWriteConfig writeConfig,
+                                              HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy, HoodieEngineContext engineContext,
+                                              String inflightInstantTimestamp, 
String sourceBasePath) {
+    super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
Option.of(inflightInstantTimestamp));
+    this.sourceBasePath = sourceBasePath;
+    this.inflightInstantTimestamp = inflightInstantTimestamp;
+  }
+
+  public static HoodieTableMetadataWriter create(Configuration conf,
+                                                 HoodieWriteConfig writeConfig,
+                                                 
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                 HoodieEngineContext context,
+                                                 String 
inflightInstantTimestamp) {
+    return new SparkHoodieBackedMetadataSyncMetadataWriter(
+        conf, writeConfig, failedWritesCleaningPolicy, context, 
inflightInstantTimestamp, 
writeConfig.getMetadataConfig().getBasePathOverride());
+  }
+
+  protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                       Option<String> 
inflightInstantTimestamp) throws IOException {
+    // Do not initialize the metadata table during metadata sync
+    metadataMetaClient = initializeMetaClient();
+    this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
+    return true;
+  }
+
+  public void bootstrap(Option<String> boostrapUntilInstantOpt) throws 
IOException {
+    if (!boostrapUntilInstantOpt.isPresent()) {
+      return;
+    }
+
+    String lastInstantTimestamp = boostrapUntilInstantOpt.get();
+    boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
+
+    // Check and then open the metadata table reader so FILES partition can be 
read during initialization of other partitions
+    // initMetadataReader();
+    // Load the metadata table metaclient if required
+    if (dataMetaClient == null) {
+      dataMetaClient = 
HoodieTableMetaClient.builder().setConf(engineContext.getHadoopConf().get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+    }
+
+    // initialize metadata writer
+    List<DirectoryInfo> partitionInfoList = 
listAllPartitionsFromFilesystem(lastInstantTimestamp, sourceBasePath);
+    // Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = 
initializeFilesPartition(partitionInfoList);

Review Comment:
   let's clean up the temp changes. 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataSync.java:
##########
@@ -245,90 +248,166 @@ public void run() throws Exception {
   }
 
   private void runMetadataSync(HoodieTableMetaClient sourceTableMetaClient, 
HoodieTableMetaClient targetTableMetaClient, Schema schema) throws Exception {
-    HoodieWriteConfig writeConfig = getWriteConfig(schema, 
targetTableMetaClient, cfg.sourceBasePath);
+    HoodieWriteConfig writeConfig = getWriteConfig(schema, 
targetTableMetaClient, cfg.sourceBasePath, cfg.boostrap);
+    HoodieSparkEngineContext hoodieSparkEngineContext = new 
HoodieSparkEngineContext(jsc);
+    TransactionManager txnManager = new TransactionManager(writeConfig, 
FSUtils.getFs(writeConfig.getBasePath(), 
hoodieSparkEngineContext.getHadoopConf().get()));
+
+    HoodieSparkTable sparkTable = HoodieSparkTable.create(writeConfig, 
hoodieSparkEngineContext, targetTableMetaClient);
+    try (SparkRDDWriteClient writeClient = new 
SparkRDDWriteClient(hoodieSparkEngineContext, writeConfig)) {
+      if (cfg.boostrap) {
+        runBootstrapSync(sparkTable, sourceTableMetaClient, 
targetTableMetaClient, writeClient, schema, txnManager);

Review Comment:
   we are missing the lock for bootstrap. 
   can we begin the txn at the beginning (before L 257), 
   and then end the txn within a finally block or at the end?
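   rough sketch of the restructuring I have in mind (illustrative only, same names as in this method):
   ```java
   // Sketch: take the lock up front and release it in a finally block,
   // so both the bootstrap and the incremental sync paths run under it.
   txnManager.beginTransaction(Option.empty(), Option.empty());
   try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(hoodieSparkEngineContext, writeConfig)) {
     if (cfg.boostrap) {
       runBootstrapSync(sparkTable, sourceTableMetaClient, targetTableMetaClient, writeClient, schema, txnManager);
     } else {
       // ... per-instant sync loop as below ...
     }
   } finally {
     txnManager.endTransaction(Option.empty());
   }
   ```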



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataSync.java:
##########
@@ -245,90 +248,166 @@ public void run() throws Exception {
   }
 
   private void runMetadataSync(HoodieTableMetaClient sourceTableMetaClient, 
HoodieTableMetaClient targetTableMetaClient, Schema schema) throws Exception {
-    HoodieWriteConfig writeConfig = getWriteConfig(schema, 
targetTableMetaClient, cfg.sourceBasePath);
+    HoodieWriteConfig writeConfig = getWriteConfig(schema, 
targetTableMetaClient, cfg.sourceBasePath, cfg.boostrap);
+    HoodieSparkEngineContext hoodieSparkEngineContext = new 
HoodieSparkEngineContext(jsc);
+    TransactionManager txnManager = new TransactionManager(writeConfig, 
FSUtils.getFs(writeConfig.getBasePath(), 
hoodieSparkEngineContext.getHadoopConf().get()));
+
+    HoodieSparkTable sparkTable = HoodieSparkTable.create(writeConfig, 
hoodieSparkEngineContext, targetTableMetaClient);
+    try (SparkRDDWriteClient writeClient = new 
SparkRDDWriteClient(hoodieSparkEngineContext, writeConfig)) {
+      if (cfg.boostrap) {
+        runBootstrapSync(sparkTable, sourceTableMetaClient, 
targetTableMetaClient, writeClient, schema, txnManager);
+      } else {
+        List<HoodieInstant> instantsToSync = 
getInstantsToSync(cfg.sourceBasePath, targetTableMetaClient, 
sourceTableMetaClient);
+        for (HoodieInstant instant : instantsToSync) {
+          String commitTime = writeClient.startCommit(instant.getAction(), 
targetTableMetaClient); // single writer. will rollback any pending commits 
from previous round.
+          targetTableMetaClient
+              .reloadActiveTimeline()
+              .transitionRequestedToInflight(
+                  instant.getAction(),
+                  commitTime);
+
+          Option<byte[]> commitMetadataInBytes = Option.empty();
+          List<String> pendingInstants = 
getPendingInstants(sourceTableMetaClient.getActiveTimeline(), instant);
+          SyncMetadata syncMetadata = 
getTableSyncExtraMetadata(targetTableMetaClient.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant(),
+              targetTableMetaClient, cfg.sourceBasePath, 
instant.getTimestamp(), pendingInstants);
+
+          try {
+            txnManager.beginTransaction(Option.of(instant), Option.empty());
+            HoodieTableMetadataWriter hoodieTableMetadataWriter =
+                (HoodieTableMetadataWriter) 
sparkTable.getMetadataWriter(commitTime).get();
+            // perform table services if required on metadata table
+            
hoodieTableMetadataWriter.performTableServices(Option.of(commitTime));
+            switch (instant.getAction()) {
+              case HoodieTimeline.COMMIT_ACTION:
+                HoodieCommitMetadata sourceCommitMetadata = 
getHoodieCommitMetadata(instant.getTimestamp(), sourceTableMetaClient);
+                HoodieCommitMetadata tgtCommitMetadata = 
buildHoodieCommitMetadata(sourceCommitMetadata, commitTime);
+                hoodieTableMetadataWriter.update(tgtCommitMetadata, 
hoodieSparkEngineContext.emptyHoodieData(), commitTime);
+
+                // add metadata sync checkpoint info
+                
tgtCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA, 
syncMetadata.toJson());
+                commitMetadataInBytes = 
Option.of(tgtCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+                break;
+              case HoodieTimeline.REPLACE_COMMIT_ACTION:
+
+                HoodieReplaceCommitMetadata srcReplaceCommitMetadata = 
HoodieReplaceCommitMetadata.fromBytes(
+                    
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get(), 
HoodieReplaceCommitMetadata.class);
+                HoodieReplaceCommitMetadata tgtReplaceCommitMetadata = 
buildReplaceCommitMetadata(srcReplaceCommitMetadata, commitTime);
+                hoodieTableMetadataWriter.update(tgtReplaceCommitMetadata, 
hoodieSparkEngineContext.emptyHoodieData(), commitTime);
+
+                // add metadata sync checkpoint info
+                
tgtReplaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA, 
syncMetadata.toJson());
+                commitMetadataInBytes = 
Option.of(tgtReplaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+                break;
+              case HoodieTimeline.CLEAN_ACTION:
+                HoodieCleanMetadata srcCleanMetadata = 
TimelineMetadataUtils.deserializeHoodieCleanMetadata(
+                    
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get());
+                HoodieCleanMetadata tgtCleanMetadata = 
reconstructHoodieCleanCommitMetadata(srcCleanMetadata,
+                    writeConfig, hoodieSparkEngineContext, 
targetTableMetaClient);
+                //HoodieCleanMetadata tgtCleanMetadata = 
buildHoodieCleanMetadata(srcCleanMetadata, commitTime);
+                hoodieTableMetadataWriter.update(tgtCleanMetadata, commitTime);
+
+                commitMetadataInBytes = 
TimelineMetadataUtils.serializeCleanMetadata(tgtCleanMetadata);
+                break;
+            }
+
+          } finally {
+            txnManager.endTransaction(Option.of(instant));
+          }
+          targetTableMetaClient
+              .reloadActiveTimeline()
+              .saveAsComplete(new HoodieInstant(true, instant.getAction(), 
commitTime), commitMetadataInBytes);
+        }
+        if (cfg.performTableMaintenance) {
+          runArchiver(sparkTable, writeClient.getConfig(), 
hoodieSparkEngineContext);
+        }
+      }
+    }
+  }
 
-    if (cfg.boostrap) {
-      HoodieBootstrapMetadataSync bootstrapMetadataSync = new 
HoodieBootstrapMetadataSync(writeConfig, jsc, cfg.sourceBasePath, 
cfg.targetBasePath, schema);
-      bootstrapMetadataSync.run();
+  private void runBootstrapSync(HoodieSparkTable sparkTable, 
HoodieTableMetaClient sourceTableMetaClient,
+                                HoodieTableMetaClient targetTableMetaClient, 
SparkRDDWriteClient writeClient, Schema schema, TransactionManager txnManager) 
throws Exception {
+    Option<HoodieInstant> sourceLastInstant = 
sourceTableMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant();
+    if (!sourceLastInstant.isPresent()) {
       return;
     }
 
-    List<HoodieInstant> instantsToSync = getInstantsToSync(cfg.sourceBasePath, 
targetTableMetaClient, sourceTableMetaClient);
-    HoodieSparkEngineContext hoodieSparkEngineContext = new 
HoodieSparkEngineContext(jsc);
-    for (HoodieInstant instant : instantsToSync) {
-      try (SparkRDDWriteClient writeClient = new 
SparkRDDWriteClient(hoodieSparkEngineContext, writeConfig)) {
-        String commitTime = writeClient.startCommit(instant.getAction(), 
targetTableMetaClient); // single writer. will rollback any pending commits 
from previous round.
-        targetTableMetaClient
-            .reloadActiveTimeline()
-            .transitionRequestedToInflight(
-                instant.getAction(),
-                commitTime);
-
-        Option<byte[]> commitMetadataInBytes = Option.empty();
-        List<String> pendingInstants = 
getPendingInstants(sourceTableMetaClient.getActiveTimeline(), instant);
-        SyncMetadata syncMetadata = 
getTableSyncExtraMetadata(targetTableMetaClient.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant(),
-            targetTableMetaClient, 
sourceTableMetaClient.getBasePathV2().toString(), instant.getTimestamp(), 
pendingInstants);
-
-
-        HoodieSparkTable sparkTable = HoodieSparkTable.create(writeConfig, 
hoodieSparkEngineContext, targetTableMetaClient);
-        if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
-          HoodieCommitMetadata sourceCommitMetadata = 
getHoodieCommitMetadata(instant.getTimestamp(), sourceTableMetaClient);
-          HoodieCommitMetadata tgtCommitMetadata = 
buildHoodieCommitMetadata(sourceCommitMetadata, commitTime);
-
-          try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
-                   (HoodieTableMetadataWriter) 
sparkTable.getMetadataWriter(commitTime).get()) {
-            hoodieTableMetadataWriter.update(tgtCommitMetadata, 
hoodieSparkEngineContext.emptyHoodieData(), commitTime);
-          }
+    String commitTime = 
writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION, 
targetTableMetaClient); // single writer. will rollback any pending commits 
from previous round.
+    targetTableMetaClient
+        .reloadActiveTimeline()
+        .transitionRequestedToInflight(
+            HoodieTimeline.REPLACE_COMMIT_ACTION,
+            commitTime);
 
-          // add metadata sync checkpoint info
-          tgtCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA, 
syncMetadata.toJson());
-          commitMetadataInBytes = 
Option.of(tgtCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
-        } else if 
(instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+    HoodieInstant instant = new HoodieInstant(true, 
HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime);
+    try {
+      txnManager.beginTransaction(Option.of(instant), Option.empty());
+      SparkHoodieBackedMetadataSyncMetadataWriter metadataWriter =
+          (SparkHoodieBackedMetadataSyncMetadataWriter) 
sparkTable.getMetadataWriter(commitTime).get();
+      
metadataWriter.bootstrap(sourceLastInstant.map(HoodieInstant::getTimestamp));
+    } finally {
+      txnManager.endTransaction(Option.of(instant));
+    }
+      Option<HoodieInstant> targetTableLastInstant = 
targetTableMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+      SyncMetadata syncMetadata = 
getTableSyncExtraMetadata(targetTableLastInstant, targetTableMetaClient,
+          cfg.sourceBasePath, sourceLastInstant.get().getTimestamp(), 
getPendingInstants(sourceTableMetaClient.getActiveTimeline(), 
sourceLastInstant.get()));
+      HoodieReplaceCommitMetadata replaceCommitMetadata = 
buildComprehensiveReplaceCommitMetadata(sourceTableMetaClient, schema);
+      replaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA, 
syncMetadata.toJson());
+      targetTableMetaClient
+          .reloadActiveTimeline()
+          .saveAsComplete(new HoodieInstant(true, 
HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime),
+              
Option.of(replaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+  }
 
-          HoodieReplaceCommitMetadata srcReplaceCommitMetadata = 
HoodieReplaceCommitMetadata.fromBytes(
-              
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get(), 
HoodieReplaceCommitMetadata.class);
-          HoodieReplaceCommitMetadata tgtReplaceCommitMetadata = 
buildReplaceCommitMetadata(srcReplaceCommitMetadata, commitTime);
+  private HoodieReplaceCommitMetadata 
buildComprehensiveReplaceCommitMetadata(HoodieTableMetaClient 
sourceTableMetaClient, Schema schema) {
+    List<HoodieInstant> replaceCommits =  
sourceTableMetaClient.getActiveTimeline().filterCompletedInstants().getInstants().stream()
+        .filter(instant -> 
instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).collect(Collectors.toList());
+
+    HoodieReplaceCommitMetadata replaceCommitMetadata = new 
HoodieReplaceCommitMetadata();
+    Map<String, List<String>> totalPartitionToReplacedFiles = new HashMap<>();
+    replaceCommits.forEach(replaceCommit -> {
+      try {
+        HoodieReplaceCommitMetadata commitMetadata = 
HoodieReplaceCommitMetadata.fromBytes(
+            
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(replaceCommit).get(),
 HoodieReplaceCommitMetadata.class);
+        Map<String, List<String>> partitionsToReplacedFileGroups = 
commitMetadata.getPartitionToReplaceFileIds();
+        for (Map.Entry<String, List<String>> entry : 
partitionsToReplacedFileGroups.entrySet()) {
+          String partition = entry.getKey();
+          List<String> replacedFiles = entry.getValue();
+          totalPartitionToReplacedFiles.computeIfAbsent(partition, k -> new 
ArrayList<>());
+          totalPartitionToReplacedFiles.get(partition).addAll(replacedFiles);
+        }
+      } catch (IOException e) {
+        throw new HoodieException("Failed to deserialize instant " + 
replaceCommit.getTimestamp(), e);
+      }
+    });
 
-          try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
-                   (HoodieTableMetadataWriter) 
sparkTable.getMetadataWriter(commitTime).get()) {
-            hoodieTableMetadataWriter.update(tgtReplaceCommitMetadata, 
hoodieSparkEngineContext.emptyHoodieData(), commitTime);
-          }
-          // add metadata sync checkpoint info
-          
tgtReplaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA, 
syncMetadata.toJson());
-          commitMetadataInBytes = 
Option.of(tgtReplaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
-        } else if (instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)) {
-          HoodieCleanMetadata srcCleanMetadata = 
TimelineMetadataUtils.deserializeHoodieCleanMetadata(
-              
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get());
-          HoodieCleanMetadata tgtCleanMetadata = 
reconstructHoodieCleanCommitMetadata(srcCleanMetadata,
-              writeConfig, hoodieSparkEngineContext, targetTableMetaClient);
-          //HoodieCleanMetadata tgtCleanMetadata = 
buildHoodieCleanMetadata(srcCleanMetadata, commitTime);
-          try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
-                   (HoodieTableMetadataWriter) 
sparkTable.getMetadataWriter(commitTime).get()) {
-            hoodieTableMetadataWriter.update(tgtCleanMetadata, commitTime);
-          }
+    
replaceCommitMetadata.setPartitionToReplaceFileIds(totalPartitionToReplacedFiles);
 
-          commitMetadataInBytes = 
TimelineMetadataUtils.serializeCleanMetadata(tgtCleanMetadata);
-        }
+    replaceCommitMetadata.addMetadata("schema", schema.toString());
+    replaceCommitMetadata.setOperationType(WriteOperationType.BOOTSTRAP);
+    replaceCommitMetadata.setCompacted(false);
+    return replaceCommitMetadata;
+  }
 
-        targetTableMetaClient
-            .reloadActiveTimeline()
-            .saveAsComplete(new HoodieInstant(true, instant.getAction(), 
commitTime), commitMetadataInBytes);
-      }
+  private void runArchiver(

Review Comment:
   java docs
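   e.g. (sketch):
   ```java
   /**
    * Runs the timeline archiver on the target table as part of optional
    * table maintenance after a sync round.
    */
   ```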



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends 
SparkHoodieBackedTableMetadataWriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+  private final String sourceBasePath;
+  private String inflightInstantTimestamp;
+
+  public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf, 
HoodieWriteConfig writeConfig,
+                                              HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy, HoodieEngineContext engineContext,
+                                              String inflightInstantTimestamp, 
String sourceBasePath) {
+    super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
Option.of(inflightInstantTimestamp));
+    this.sourceBasePath = sourceBasePath;
+    this.inflightInstantTimestamp = inflightInstantTimestamp;
+  }
+
+  public static HoodieTableMetadataWriter create(Configuration conf,
+                                                 HoodieWriteConfig writeConfig,
+                                                 
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                 HoodieEngineContext context,
+                                                 String 
inflightInstantTimestamp) {
+    return new SparkHoodieBackedMetadataSyncMetadataWriter(
+        conf, writeConfig, failedWritesCleaningPolicy, context, 
inflightInstantTimestamp, 
writeConfig.getMetadataConfig().getBasePathOverride());
+  }
+
+  protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                       Option<String> 
inflightInstantTimestamp) throws IOException {
+    // Do not initialize the metadata table during metadata sync
+    metadataMetaClient = initializeMetaClient();
+    this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
+    return true;
+  }
+
+  public void bootstrap(Option<String> boostrapUntilInstantOpt) throws 
IOException {
+    if (!boostrapUntilInstantOpt.isPresent()) {
+      return;
+    }
+
+    String lastInstantTimestamp = boostrapUntilInstantOpt.get();
+    boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
+
+    // Check and then open the metadata table reader so FILES partition can be 
read during initialization of other partitions
+    // initMetadataReader();
+    // Load the metadata table metaclient if required
+    if (dataMetaClient == null) {
+      dataMetaClient = 
HoodieTableMetaClient.builder().setConf(engineContext.getHadoopConf().get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+    }
+
+    // initialize metadata writer
+    List<DirectoryInfo> partitionInfoList = 
listAllPartitionsFromFilesystem(lastInstantTimestamp, sourceBasePath);

Review Comment:
   if the metadata table is enabled in the source table, shouldn't we be calling 
listAllPartitionsFromMDT(...)?
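   i.e. something like this (sketch; assuming `listAllPartitionsFromMDT(...)` takes the same args as the filesystem variant):
   ```java
   // Sketch: use the source table's MDT for the listing when FILES is available there,
   // falling back to a filesystem listing otherwise.
   HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
       .setConf(engineContext.getHadoopConf().get()).setBasePath(sourceBasePath).build();
   List<DirectoryInfo> partitionInfoList =
       sourceMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES)
           ? listAllPartitionsFromMDT(lastInstantTimestamp, sourceBasePath)
           : listAllPartitionsFromFilesystem(lastInstantTimestamp, sourceBasePath);
   ```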



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends 
SparkHoodieBackedTableMetadataWriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+  private final String sourceBasePath;
+  private String inflightInstantTimestamp;
+
+  public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf, 
HoodieWriteConfig writeConfig,
+                                              HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy, HoodieEngineContext engineContext,
+                                              String inflightInstantTimestamp, 
String sourceBasePath) {
+    super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
Option.of(inflightInstantTimestamp));
+    this.sourceBasePath = sourceBasePath;
+    this.inflightInstantTimestamp = inflightInstantTimestamp;
+  }
+
+  public static HoodieTableMetadataWriter create(Configuration conf,
+                                                 HoodieWriteConfig writeConfig,
+                                                 
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                 HoodieEngineContext context,
+                                                 String 
inflightInstantTimestamp) {
+    return new SparkHoodieBackedMetadataSyncMetadataWriter(
+        conf, writeConfig, failedWritesCleaningPolicy, context, 
inflightInstantTimestamp, 
writeConfig.getMetadataConfig().getBasePathOverride());
+  }
+
+  protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                       Option<String> 
inflightInstantTimestamp) throws IOException {
+    // Do not initialize the metadata table during metadata sync
+    metadataMetaClient = initializeMetaClient();
+    this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
+    return true;
+  }
+
+  public void bootstrap(Option<String> boostrapUntilInstantOpt) throws 
IOException {
+    if (!boostrapUntilInstantOpt.isPresent()) {
+      return;
+    }
+
+    String lastInstantTimestamp = boostrapUntilInstantOpt.get();
+    boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
+
+    // Check and then open the metadata table reader so FILES partition can be 
read during initialization of other partitions
+    // initMetadataReader();
+    // Load the metadata table metaclient if required
+    if (dataMetaClient == null) {
+      dataMetaClient = 
HoodieTableMetaClient.builder().setConf(engineContext.getHadoopConf().get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+    }
+
+    // initialize metadata writer
+    List<DirectoryInfo> partitionInfoList = 
listAllPartitionsFromFilesystem(lastInstantTimestamp, sourceBasePath);
+    // Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = 
initializeFilesPartition(partitionInfoList);
+    Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = 
initializeFilesPartition2(lastInstantTimestamp, partitionInfoList);
+
+    try {
+      if (!filesPartitionAvailable) {
+        initializeFileGroups(metadataMetaClient, MetadataPartitionType.FILES, 
inflightInstantTimestamp, fileGroupCountAndRecordsPair.getKey());
+      }
+    } catch (IOException e) {
+      throw new HoodieException("Failed to bootstrap table " + sourceBasePath, 
e);
+    }
+
+    // Perform the commit using bulkCommit
+    HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
+    // perform tables services on metadata table
+    performTableServices(Option.of(inflightInstantTimestamp));
+    if (!filesPartitionAvailable) {
+      bulkCommit(inflightInstantTimestamp, MetadataPartitionType.FILES, 
records, fileGroupCountAndRecordsPair.getKey());
+      dataMetaClient.reloadActiveTimeline();
+      
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, 
MetadataPartitionType.FILES, true);
+    } else {
+      commit(inflightInstantTimestamp, 
Collections.singletonMap(MetadataPartitionType.FILES, 
fileGroupCountAndRecordsPair.getValue()));
+    }
+  }
+
+  protected Pair<Integer, HoodieData<HoodieRecord>> 
initializeFilesPartition2(String lastInstantTimestamp, List<DirectoryInfo> 
partitionInfoList) {

Review Comment:
   java docs. how is this different from the impl we have in the base class?
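   from the code it looks like this lists the *source* table via a file system view rather than the data table the base impl reads from; if that's the intent, the javadoc could say something like (sketch):
   ```java
   /**
    * Builds the FILES partition records by listing the source table (via a
    * spillable file system view over sourceBasePath), rather than the data
    * table that the base class implementation reads from.
    */
   ```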



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedMetadataSyncMetadataWriter.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SparkHoodieBackedMetadataSyncMetadataWriter extends 
SparkHoodieBackedTableMetadataWriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkHoodieBackedMetadataSyncMetadataWriter.class);
+  private final String sourceBasePath;
+  private String inflightInstantTimestamp;
+
+  public SparkHoodieBackedMetadataSyncMetadataWriter(Configuration hadoopConf, 
HoodieWriteConfig writeConfig,
+                                              HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy, HoodieEngineContext engineContext,
+                                              String inflightInstantTimestamp, 
String sourceBasePath) {
+    super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
Option.of(inflightInstantTimestamp));
+    this.sourceBasePath = sourceBasePath;
+    this.inflightInstantTimestamp = inflightInstantTimestamp;
+  }
+
+  public static HoodieTableMetadataWriter create(Configuration conf,
+                                                 HoodieWriteConfig writeConfig,
+                                                 
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                 HoodieEngineContext context,
+                                                 String 
inflightInstantTimestamp) {
+    return new SparkHoodieBackedMetadataSyncMetadataWriter(
+        conf, writeConfig, failedWritesCleaningPolicy, context, 
inflightInstantTimestamp, 
writeConfig.getMetadataConfig().getBasePathOverride());
+  }
+
+  protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                       Option<String> 
inflightInstantTimestamp) throws IOException {
+    // Do not initialize the metadata table during metadata sync
+    metadataMetaClient = initializeMetaClient();
+    this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getBasePath(), true);
+    return true;
+  }
+
+  public void bootstrap(Option<String> bootstrapUntilInstantOpt) throws IOException {
+    if (!bootstrapUntilInstantOpt.isPresent()) {
+      return;
+    }
+
+    String lastInstantTimestamp = bootstrapUntilInstantOpt.get();
+    boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
+
+    // Note: initMetadataReader() is intentionally skipped; the metadata table reader is not opened during metadata sync
+    // initMetadataReader();
+    // Load the metadata table metaclient if required
+    if (dataMetaClient == null) {
+      dataMetaClient = HoodieTableMetaClient.builder().setConf(engineContext.getHadoopConf().get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+    }
+
+    // List all partitions of the source table from the filesystem
+    List<DirectoryInfo> partitionInfoList = listAllPartitionsFromFilesystem(lastInstantTimestamp, sourceBasePath);
+    // Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = initializeFilesPartition(partitionInfoList);
+    Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = initializeFilesPartition2(lastInstantTimestamp, partitionInfoList);
+
+    try {
+      if (!filesPartitionAvailable) {
+        initializeFileGroups(metadataMetaClient, MetadataPartitionType.FILES, inflightInstantTimestamp, fileGroupCountAndRecordsPair.getKey());
+      }
+    } catch (IOException e) {
+      throw new HoodieException("Failed to bootstrap table " + sourceBasePath, e);
+    }
+
+    HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
+    // Perform table services on the metadata table before committing
+    performTableServices(Option.of(inflightInstantTimestamp));
+    if (!filesPartitionAvailable) {
+      // First-time initialization of the FILES partition: perform the commit using bulkCommit
+      bulkCommit(inflightInstantTimestamp, MetadataPartitionType.FILES, records, fileGroupCountAndRecordsPair.getKey());
+      dataMetaClient.reloadActiveTimeline();
+      dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, MetadataPartitionType.FILES, true);
+    } else {
+      commit(inflightInstantTimestamp, Collections.singletonMap(MetadataPartitionType.FILES, records));
+    }
+  }
+
+  protected Pair<Integer, HoodieData<HoodieRecord>> initializeFilesPartition2(String lastInstantTimestamp, List<DirectoryInfo> partitionInfoList) {
+    // FILES partition uses a single file group
+    final int fileGroupCount = 1;
+
+    List<String> partitions = partitionInfoList.stream().map(p -> HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()))
+        .collect(Collectors.toList());
+    final int totalDataFilesCount = partitionInfoList.stream().mapToInt(DirectoryInfo::getTotalFiles).sum();
+    LOG.info("Committing a total of {} partitions and {} files to metadata", partitions.size(), totalDataFilesCount);
+
+    // Record which saves the list of all partitions
+    HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(partitions);
+    HoodieData<HoodieRecord> allPartitionsRecord = engineContext.parallelize(Collections.singletonList(record), 1);
+    if (partitionInfoList.isEmpty()) {
+      return Pair.of(fileGroupCount, allPartitionsRecord);
+    }
+
+    HoodieTableMetaClient sourceTableMetaClient = HoodieTableMetaClient.builder().setBasePath(sourceBasePath).setConf(hadoopConf.get()).build();
+    FileSystemViewStorageConfig.Builder spillableConfBuilder = FileSystemViewStorageConfig.newBuilder();
+    spillableConfBuilder.withStorageType(FileSystemViewStorageType.SPILLABLE_DISK);
+    //.withBaseStoreDir(FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
+    //.withMemFractionForPendingCompaction(config.memFractionForCompactionPerTable);
+    HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().build();
+    HoodieTimeline timeline = sourceTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
+    SpillableMapBasedFileSystemView fileSystemView = new SpillableMapBasedFileSystemView(sourceTableMetaClient, timeline, spillableConfBuilder.build(), commonConfig);

Review Comment:
   we should close the FSV at the end
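   a rough sketch of the shape I mean, reusing the locals from the diff above (assuming `SpillableMapBasedFileSystemView` inherits the usual `close()` from the file system view hierarchy):

   ```java
   SpillableMapBasedFileSystemView fileSystemView = new SpillableMapBasedFileSystemView(
       sourceTableMetaClient, timeline, spillableConfBuilder.build(), commonConfig);
   try {
     // ... derive the FILES partition records from the view, as the method does today ...
   } finally {
     // release in-memory and spilled state even if record generation throws
     fileSystemView.close();
   }
   ```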



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataSync.java:
##########
@@ -245,90 +248,166 @@ public void run() throws Exception {
   }
 
   private void runMetadataSync(HoodieTableMetaClient sourceTableMetaClient, HoodieTableMetaClient targetTableMetaClient, Schema schema) throws Exception {
-    HoodieWriteConfig writeConfig = getWriteConfig(schema, targetTableMetaClient, cfg.sourceBasePath);
+    HoodieWriteConfig writeConfig = getWriteConfig(schema, targetTableMetaClient, cfg.sourceBasePath, cfg.boostrap);
+    HoodieSparkEngineContext hoodieSparkEngineContext = new HoodieSparkEngineContext(jsc);
+    TransactionManager txnManager = new TransactionManager(writeConfig, FSUtils.getFs(writeConfig.getBasePath(), hoodieSparkEngineContext.getHadoopConf().get()));
+
+    HoodieSparkTable sparkTable = HoodieSparkTable.create(writeConfig, hoodieSparkEngineContext, targetTableMetaClient);
+    try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(hoodieSparkEngineContext, writeConfig)) {
+      if (cfg.boostrap) {
+        runBootstrapSync(sparkTable, sourceTableMetaClient, targetTableMetaClient, writeClient, schema, txnManager);
+      } else {
+        List<HoodieInstant> instantsToSync = getInstantsToSync(cfg.sourceBasePath, targetTableMetaClient, sourceTableMetaClient);
+        for (HoodieInstant instant : instantsToSync) {
+          String commitTime = writeClient.startCommit(instant.getAction(), targetTableMetaClient); // single writer; will roll back any pending commits from the previous round
+          targetTableMetaClient
+              .reloadActiveTimeline()
+              .transitionRequestedToInflight(
+                  instant.getAction(),
+                  commitTime);
+
+          Option<byte[]> commitMetadataInBytes = Option.empty();
+          List<String> pendingInstants = getPendingInstants(sourceTableMetaClient.getActiveTimeline(), instant);
+          SyncMetadata syncMetadata = getTableSyncExtraMetadata(targetTableMetaClient.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant(),
+              targetTableMetaClient, cfg.sourceBasePath, instant.getTimestamp(), pendingInstants);
+
+          try {
+            txnManager.beginTransaction(Option.of(instant), Option.empty());
+            HoodieTableMetadataWriter hoodieTableMetadataWriter =
+                (HoodieTableMetadataWriter) sparkTable.getMetadataWriter(commitTime).get();
+            // perform table services if required on the metadata table
+            hoodieTableMetadataWriter.performTableServices(Option.of(commitTime));
+            switch (instant.getAction()) {
+              case HoodieTimeline.COMMIT_ACTION:
+                HoodieCommitMetadata sourceCommitMetadata = getHoodieCommitMetadata(instant.getTimestamp(), sourceTableMetaClient);
+                HoodieCommitMetadata tgtCommitMetadata = buildHoodieCommitMetadata(sourceCommitMetadata, commitTime);
+                hoodieTableMetadataWriter.update(tgtCommitMetadata, hoodieSparkEngineContext.emptyHoodieData(), commitTime);
+
+                // add metadata sync checkpoint info
+                tgtCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA, syncMetadata.toJson());
+                commitMetadataInBytes = Option.of(tgtCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+                break;
+              case HoodieTimeline.REPLACE_COMMIT_ACTION:
+                HoodieReplaceCommitMetadata srcReplaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
+                    sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
+                HoodieReplaceCommitMetadata tgtReplaceCommitMetadata = buildReplaceCommitMetadata(srcReplaceCommitMetadata, commitTime);
+                hoodieTableMetadataWriter.update(tgtReplaceCommitMetadata, hoodieSparkEngineContext.emptyHoodieData(), commitTime);
+
+                // add metadata sync checkpoint info
+                tgtReplaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA, syncMetadata.toJson());
+                commitMetadataInBytes = Option.of(tgtReplaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+                break;
+              case HoodieTimeline.CLEAN_ACTION:
+                HoodieCleanMetadata srcCleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(
+                    sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get());
+                HoodieCleanMetadata tgtCleanMetadata = reconstructHoodieCleanCommitMetadata(srcCleanMetadata,
+                    writeConfig, hoodieSparkEngineContext, targetTableMetaClient);
+                // HoodieCleanMetadata tgtCleanMetadata = buildHoodieCleanMetadata(srcCleanMetadata, commitTime);
+                hoodieTableMetadataWriter.update(tgtCleanMetadata, commitTime);
+
+                commitMetadataInBytes = TimelineMetadataUtils.serializeCleanMetadata(tgtCleanMetadata);
+                break;
+              default:
+                throw new HoodieException("Unexpected action to sync: " + instant.getAction());
+            }
+          } finally {
+            txnManager.endTransaction(Option.of(instant));
+          }
+          targetTableMetaClient
+              .reloadActiveTimeline()
+              .saveAsComplete(new HoodieInstant(true, instant.getAction(), commitTime), commitMetadataInBytes);
+        }
+        if (cfg.performTableMaintenance) {
+          runArchiver(sparkTable, writeClient.getConfig(), hoodieSparkEngineContext);
+        }
+      }
+    }
+  }
 
-    if (cfg.boostrap) {
-      HoodieBootstrapMetadataSync bootstrapMetadataSync = new HoodieBootstrapMetadataSync(writeConfig, jsc, cfg.sourceBasePath, cfg.targetBasePath, schema);
-      bootstrapMetadataSync.run();
+  private void runBootstrapSync(HoodieSparkTable sparkTable, HoodieTableMetaClient sourceTableMetaClient,
+                                HoodieTableMetaClient targetTableMetaClient, SparkRDDWriteClient writeClient, Schema schema, TransactionManager txnManager) throws Exception {
+    Option<HoodieInstant> sourceLastInstant = sourceTableMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant();
+    if (!sourceLastInstant.isPresent()) {
       return;
     }
 
-    List<HoodieInstant> instantsToSync = getInstantsToSync(cfg.sourceBasePath, targetTableMetaClient, sourceTableMetaClient);
-    HoodieSparkEngineContext hoodieSparkEngineContext = new HoodieSparkEngineContext(jsc);
-    for (HoodieInstant instant : instantsToSync) {
-      try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(hoodieSparkEngineContext, writeConfig)) {
-        String commitTime = writeClient.startCommit(instant.getAction(), targetTableMetaClient); // single writer. will rollback any pending commits from previous round.
-        targetTableMetaClient
-            .reloadActiveTimeline()
-            .transitionRequestedToInflight(
-                instant.getAction(),
-                commitTime);
-
-        Option<byte[]> commitMetadataInBytes = Option.empty();
-        List<String> pendingInstants = getPendingInstants(sourceTableMetaClient.getActiveTimeline(), instant);
-        SyncMetadata syncMetadata = getTableSyncExtraMetadata(targetTableMetaClient.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant(),
-            targetTableMetaClient, sourceTableMetaClient.getBasePathV2().toString(), instant.getTimestamp(), pendingInstants);
-
-
-        HoodieSparkTable sparkTable = HoodieSparkTable.create(writeConfig, hoodieSparkEngineContext, targetTableMetaClient);
-        if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
-          HoodieCommitMetadata sourceCommitMetadata = getHoodieCommitMetadata(instant.getTimestamp(), sourceTableMetaClient);
-          HoodieCommitMetadata tgtCommitMetadata = buildHoodieCommitMetadata(sourceCommitMetadata, commitTime);
-
-          try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
-                   (HoodieTableMetadataWriter) sparkTable.getMetadataWriter(commitTime).get()) {
-            hoodieTableMetadataWriter.update(tgtCommitMetadata, hoodieSparkEngineContext.emptyHoodieData(), commitTime);
-          }
+    String commitTime = writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION, targetTableMetaClient); // single writer; will roll back any pending commits from the previous round
+    targetTableMetaClient
+        .reloadActiveTimeline()
+        .transitionRequestedToInflight(
+            HoodieTimeline.REPLACE_COMMIT_ACTION,
+            commitTime);
 
-          // add metadata sync checkpoint info
-          tgtCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA, syncMetadata.toJson());
-          commitMetadataInBytes = Option.of(tgtCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
-        } else if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+    HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime);
+    try {
+      txnManager.beginTransaction(Option.of(instant), Option.empty());
+      SparkHoodieBackedMetadataSyncMetadataWriter metadataWriter =
+          (SparkHoodieBackedMetadataSyncMetadataWriter) sparkTable.getMetadataWriter(commitTime).get();
+      metadataWriter.bootstrap(sourceLastInstant.map(HoodieInstant::getTimestamp));
+    } finally {
+      txnManager.endTransaction(Option.of(instant));
+    }
+    Option<HoodieInstant> targetTableLastInstant = targetTableMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+    SyncMetadata syncMetadata = getTableSyncExtraMetadata(targetTableLastInstant, targetTableMetaClient,
+        cfg.sourceBasePath, sourceLastInstant.get().getTimestamp(), getPendingInstants(sourceTableMetaClient.getActiveTimeline(), sourceLastInstant.get()));
+    HoodieReplaceCommitMetadata replaceCommitMetadata = buildComprehensiveReplaceCommitMetadata(sourceTableMetaClient, schema);
+    replaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA, syncMetadata.toJson());
+    targetTableMetaClient
+        .reloadActiveTimeline()
+        .saveAsComplete(new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime),
+            Option.of(replaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+  }
 
-          HoodieReplaceCommitMetadata srcReplaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
-              sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
-          HoodieReplaceCommitMetadata tgtReplaceCommitMetadata = buildReplaceCommitMetadata(srcReplaceCommitMetadata, commitTime);
+  private HoodieReplaceCommitMetadata buildComprehensiveReplaceCommitMetadata(HoodieTableMetaClient sourceTableMetaClient, Schema schema) {
+    List<HoodieInstant> replaceCommits = sourceTableMetaClient.getActiveTimeline().filterCompletedInstants().getInstants().stream()
+        .filter(instant -> instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).collect(Collectors.toList());
+
+    HoodieReplaceCommitMetadata replaceCommitMetadata = new HoodieReplaceCommitMetadata();
+    Map<String, List<String>> totalPartitionToReplacedFiles = new HashMap<>();
+    replaceCommits.forEach(replaceCommit -> {
+      try {
+        HoodieReplaceCommitMetadata commitMetadata = HoodieReplaceCommitMetadata.fromBytes(
+            sourceTableMetaClient.getCommitsTimeline().getInstantDetails(replaceCommit).get(), HoodieReplaceCommitMetadata.class);
+        Map<String, List<String>> partitionsToReplacedFileGroups = commitMetadata.getPartitionToReplaceFileIds();
+        for (Map.Entry<String, List<String>> entry : partitionsToReplacedFileGroups.entrySet()) {
+          String partition = entry.getKey();
+          List<String> replacedFiles = entry.getValue();
+          totalPartitionToReplacedFiles.computeIfAbsent(partition, k -> new ArrayList<>()).addAll(replacedFiles);
+        }
+      } catch (IOException e) {
+        throw new HoodieException("Failed to deserialize instant " + replaceCommit.getTimestamp(), e);
+      }
+    });
 
-          try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
-                   (HoodieTableMetadataWriter) sparkTable.getMetadataWriter(commitTime).get()) {
-            hoodieTableMetadataWriter.update(tgtReplaceCommitMetadata, hoodieSparkEngineContext.emptyHoodieData(), commitTime);
-          }
-          // add metadata sync checkpoint info
-          tgtReplaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA, syncMetadata.toJson());
-          commitMetadataInBytes = Option.of(tgtReplaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
-        } else if (instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)) {
-          HoodieCleanMetadata srcCleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(
-              sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get());
-          HoodieCleanMetadata tgtCleanMetadata = reconstructHoodieCleanCommitMetadata(srcCleanMetadata,
-              writeConfig, hoodieSparkEngineContext, targetTableMetaClient);
-          //HoodieCleanMetadata tgtCleanMetadata = buildHoodieCleanMetadata(srcCleanMetadata, commitTime);
-          try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
-                   (HoodieTableMetadataWriter) sparkTable.getMetadataWriter(commitTime).get()) {
-            hoodieTableMetadataWriter.update(tgtCleanMetadata, commitTime);
-          }
+    replaceCommitMetadata.setPartitionToReplaceFileIds(totalPartitionToReplacedFiles);
 
-          commitMetadataInBytes = TimelineMetadataUtils.serializeCleanMetadata(tgtCleanMetadata);
-        }
+    replaceCommitMetadata.addMetadata("schema", schema.toString());
+    replaceCommitMetadata.setOperationType(WriteOperationType.BOOTSTRAP);
+    replaceCommitMetadata.setCompacted(false);
+    return replaceCommitMetadata;
+  }
 
-        targetTableMetaClient
-            .reloadActiveTimeline()
-            .saveAsComplete(new HoodieInstant(true, instant.getAction(), commitTime), commitMetadataInBytes);
-      }
+  private void runArchiver(

Review Comment:
   don't we need a lock here?
   what if two concurrent writers try to archive the target table concurrently?
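   e.g. a rough sketch, reusing the `txnManager` already built in `runMetadataSync` (the exact lock scope is open for discussion):

   ```java
   if (cfg.performTableMaintenance) {
     try {
       // hold the target-table lock so two writers cannot archive concurrently
       txnManager.beginTransaction(Option.empty(), Option.empty());
       runArchiver(sparkTable, writeClient.getConfig(), hoodieSparkEngineContext);
     } finally {
       txnManager.endTransaction(Option.empty());
     }
   }
   ```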


