vamsikarnika commented on code in PR #14260:
URL: https://github.com/apache/hudi/pull/14260#discussion_r2542596752


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataSync.java:
##########
@@ -245,90 +248,166 @@ public void run() throws Exception {
   }
 
   private void runMetadataSync(HoodieTableMetaClient sourceTableMetaClient, HoodieTableMetaClient targetTableMetaClient, Schema schema) throws Exception {
-    HoodieWriteConfig writeConfig = getWriteConfig(schema, targetTableMetaClient, cfg.sourceBasePath);
+    HoodieWriteConfig writeConfig = getWriteConfig(schema, targetTableMetaClient, cfg.sourceBasePath, cfg.boostrap);
+    HoodieSparkEngineContext hoodieSparkEngineContext = new HoodieSparkEngineContext(jsc);
+    TransactionManager txnManager = new TransactionManager(writeConfig, FSUtils.getFs(writeConfig.getBasePath(), hoodieSparkEngineContext.getHadoopConf().get()));
+
+    HoodieSparkTable sparkTable = HoodieSparkTable.create(writeConfig, hoodieSparkEngineContext, targetTableMetaClient);
+    try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(hoodieSparkEngineContext, writeConfig)) {
+      if (cfg.boostrap) {
+        runBootstrapSync(sparkTable, sourceTableMetaClient, targetTableMetaClient, writeClient, schema, txnManager);
+      } else {
+        List<HoodieInstant> instantsToSync = getInstantsToSync(cfg.sourceBasePath, targetTableMetaClient, sourceTableMetaClient);
+        for (HoodieInstant instant : instantsToSync) {
+          String commitTime = writeClient.startCommit(instant.getAction(), targetTableMetaClient); // single writer. will rollback any pending commits from previous round.
+          targetTableMetaClient
+              .reloadActiveTimeline()
+              .transitionRequestedToInflight(
+                  instant.getAction(),
+                  commitTime);
+
+          Option<byte[]> commitMetadataInBytes = Option.empty();
+          List<String> pendingInstants = getPendingInstants(sourceTableMetaClient.getActiveTimeline(), instant);
+          SyncMetadata syncMetadata = getTableSyncExtraMetadata(targetTableMetaClient.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant(),
+              targetTableMetaClient, cfg.sourceBasePath, instant.getTimestamp(), pendingInstants);
+
+          try {
+            txnManager.beginTransaction(Option.of(instant), Option.empty());
+            HoodieTableMetadataWriter hoodieTableMetadataWriter =
+                (HoodieTableMetadataWriter) sparkTable.getMetadataWriter(commitTime).get();
+            // perform table services if required on metadata table
+            hoodieTableMetadataWriter.performTableServices(Option.of(commitTime));
+            switch (instant.getAction()) {
+              case HoodieTimeline.COMMIT_ACTION:
+                HoodieCommitMetadata sourceCommitMetadata = getHoodieCommitMetadata(instant.getTimestamp(), sourceTableMetaClient);
+                HoodieCommitMetadata tgtCommitMetadata = buildHoodieCommitMetadata(sourceCommitMetadata, commitTime);
+                hoodieTableMetadataWriter.update(tgtCommitMetadata, hoodieSparkEngineContext.emptyHoodieData(), commitTime);
+
+                // add metadata sync checkpoint info
+                tgtCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA, syncMetadata.toJson());
+                commitMetadataInBytes = Option.of(tgtCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+                break;
+              case HoodieTimeline.REPLACE_COMMIT_ACTION:
+
+                HoodieReplaceCommitMetadata srcReplaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
+                    sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
+                HoodieReplaceCommitMetadata tgtReplaceCommitMetadata = buildReplaceCommitMetadata(srcReplaceCommitMetadata, commitTime);
+                hoodieTableMetadataWriter.update(tgtReplaceCommitMetadata, hoodieSparkEngineContext.emptyHoodieData(), commitTime);
+
+                // add metadata sync checkpoint info
+                tgtReplaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA, syncMetadata.toJson());
+                commitMetadataInBytes = Option.of(tgtReplaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+                break;
+              case HoodieTimeline.CLEAN_ACTION:
+                HoodieCleanMetadata srcCleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(
+                    sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get());
+                HoodieCleanMetadata tgtCleanMetadata = reconstructHoodieCleanCommitMetadata(srcCleanMetadata,
+                    writeConfig, hoodieSparkEngineContext, targetTableMetaClient);
+                //HoodieCleanMetadata tgtCleanMetadata = buildHoodieCleanMetadata(srcCleanMetadata, commitTime);
+                hoodieTableMetadataWriter.update(tgtCleanMetadata, commitTime);
+
+                commitMetadataInBytes = TimelineMetadataUtils.serializeCleanMetadata(tgtCleanMetadata);
+                break;
+            }
+
+          } finally {
+            txnManager.endTransaction(Option.of(instant));
+          }
+          targetTableMetaClient
+              .reloadActiveTimeline()
+              .saveAsComplete(new HoodieInstant(true, instant.getAction(), commitTime), commitMetadataInBytes);
+        }
+        if (cfg.performTableMaintenance) {
+          runArchiver(sparkTable, writeClient.getConfig(), hoodieSparkEngineContext);
+        }
+      }
+    }
+  }

-    if (cfg.boostrap) {
-      HoodieBootstrapMetadataSync bootstrapMetadataSync = new HoodieBootstrapMetadataSync(writeConfig, jsc, cfg.sourceBasePath, cfg.targetBasePath, schema);
-      bootstrapMetadataSync.run();
+  private void runBootstrapSync(HoodieSparkTable sparkTable, HoodieTableMetaClient sourceTableMetaClient,
+                                HoodieTableMetaClient targetTableMetaClient, SparkRDDWriteClient writeClient, Schema schema, TransactionManager txnManager) throws Exception {
+    Option<HoodieInstant> sourceLastInstant = sourceTableMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant();
+    if (!sourceLastInstant.isPresent()) {
       return;
     }

-    List<HoodieInstant> instantsToSync = getInstantsToSync(cfg.sourceBasePath, targetTableMetaClient, sourceTableMetaClient);
-    HoodieSparkEngineContext hoodieSparkEngineContext = new HoodieSparkEngineContext(jsc);
-    for (HoodieInstant instant : instantsToSync) {
-      try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(hoodieSparkEngineContext, writeConfig)) {
-        String commitTime = writeClient.startCommit(instant.getAction(), targetTableMetaClient); // single writer. will rollback any pending commits from previous round.
-        targetTableMetaClient
-            .reloadActiveTimeline()
-            .transitionRequestedToInflight(
-                instant.getAction(),
-                commitTime);
-
-        Option<byte[]> commitMetadataInBytes = Option.empty();
-        List<String> pendingInstants = getPendingInstants(sourceTableMetaClient.getActiveTimeline(), instant);
-        SyncMetadata syncMetadata = getTableSyncExtraMetadata(targetTableMetaClient.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant(),
-            targetTableMetaClient, sourceTableMetaClient.getBasePathV2().toString(), instant.getTimestamp(), pendingInstants);
-
-
-        HoodieSparkTable sparkTable = HoodieSparkTable.create(writeConfig, hoodieSparkEngineContext, targetTableMetaClient);
-        if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
-          HoodieCommitMetadata sourceCommitMetadata = getHoodieCommitMetadata(instant.getTimestamp(), sourceTableMetaClient);
-          HoodieCommitMetadata tgtCommitMetadata = buildHoodieCommitMetadata(sourceCommitMetadata, commitTime);
-
-          try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
-                   (HoodieTableMetadataWriter) sparkTable.getMetadataWriter(commitTime).get()) {
-            hoodieTableMetadataWriter.update(tgtCommitMetadata, hoodieSparkEngineContext.emptyHoodieData(), commitTime);
-          }
+    String commitTime = writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION, targetTableMetaClient); // single writer. will rollback any pending commits from previous round.
+    targetTableMetaClient
+        .reloadActiveTimeline()
+        .transitionRequestedToInflight(
+            HoodieTimeline.REPLACE_COMMIT_ACTION,
+            commitTime);

-          // add metadata sync checkpoint info
-          tgtCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA, syncMetadata.toJson());
-          commitMetadataInBytes = Option.of(tgtCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
-        } else if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+    HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime);
+    try {
+      txnManager.beginTransaction(Option.of(instant), Option.empty());
+      SparkHoodieBackedMetadataSyncMetadataWriter metadataWriter =
+          (SparkHoodieBackedMetadataSyncMetadataWriter) sparkTable.getMetadataWriter(commitTime).get();
+      metadataWriter.bootstrap(sourceLastInstant.map(HoodieInstant::getTimestamp));
+    } finally {
+      txnManager.endTransaction(Option.of(instant));
+    }
+      Option<HoodieInstant> targetTableLastInstant = targetTableMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+      SyncMetadata syncMetadata = getTableSyncExtraMetadata(targetTableLastInstant, targetTableMetaClient,
+          cfg.sourceBasePath, sourceLastInstant.get().getTimestamp(), getPendingInstants(sourceTableMetaClient.getActiveTimeline(), sourceLastInstant.get()));
+      HoodieReplaceCommitMetadata replaceCommitMetadata = buildComprehensiveReplaceCommitMetadata(sourceTableMetaClient, schema);
+      replaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA, syncMetadata.toJson());
+      targetTableMetaClient
+          .reloadActiveTimeline()
+          .saveAsComplete(new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime),
+              Option.of(replaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+  }

-          HoodieReplaceCommitMetadata srcReplaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
-              sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
-          HoodieReplaceCommitMetadata tgtReplaceCommitMetadata = buildReplaceCommitMetadata(srcReplaceCommitMetadata, commitTime);
+  private HoodieReplaceCommitMetadata buildComprehensiveReplaceCommitMetadata(HoodieTableMetaClient sourceTableMetaClient, Schema schema) {
+    List<HoodieInstant> replaceCommits = sourceTableMetaClient.getActiveTimeline().filterCompletedInstants().getInstants().stream()
+        .filter(instant -> instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).collect(Collectors.toList());
+
+    HoodieReplaceCommitMetadata replaceCommitMetadata = new HoodieReplaceCommitMetadata();
+    Map<String, List<String>> totalPartitionToReplacedFiles = new HashMap<>();
+    replaceCommits.forEach(replaceCommit -> {
+      try {
+        HoodieReplaceCommitMetadata commitMetadata = HoodieReplaceCommitMetadata.fromBytes(
+            sourceTableMetaClient.getCommitsTimeline().getInstantDetails(replaceCommit).get(), HoodieReplaceCommitMetadata.class);
+        Map<String, List<String>> partitionsToReplacedFileGroups = commitMetadata.getPartitionToReplaceFileIds();
+        for (Map.Entry<String, List<String>> entry : partitionsToReplacedFileGroups.entrySet()) {
+          String partition = entry.getKey();
+          List<String> replacedFiles = entry.getValue();
+          totalPartitionToReplacedFiles.computeIfAbsent(partition, k -> new ArrayList<>());
+          totalPartitionToReplacedFiles.get(partition).addAll(replacedFiles);
+        }
+      } catch (IOException e) {
+        throw new HoodieException("Failed to deserialize instant " + replaceCommit.getTimestamp(), e);
+      }
+    });

-          try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
-                   (HoodieTableMetadataWriter) sparkTable.getMetadataWriter(commitTime).get()) {
-            hoodieTableMetadataWriter.update(tgtReplaceCommitMetadata, hoodieSparkEngineContext.emptyHoodieData(), commitTime);
-          }
-          // add metadata sync checkpoint info
-          tgtReplaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA, syncMetadata.toJson());
-          commitMetadataInBytes = Option.of(tgtReplaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
-        } else if (instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)) {
-          HoodieCleanMetadata srcCleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(
-              sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get());
-          HoodieCleanMetadata tgtCleanMetadata = reconstructHoodieCleanCommitMetadata(srcCleanMetadata,
-              writeConfig, hoodieSparkEngineContext, targetTableMetaClient);
-          //HoodieCleanMetadata tgtCleanMetadata = buildHoodieCleanMetadata(srcCleanMetadata, commitTime);
-          try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
-                   (HoodieTableMetadataWriter) sparkTable.getMetadataWriter(commitTime).get()) {
-            hoodieTableMetadataWriter.update(tgtCleanMetadata, commitTime);
-          }
+    replaceCommitMetadata.setPartitionToReplaceFileIds(totalPartitionToReplacedFiles);

-          commitMetadataInBytes = TimelineMetadataUtils.serializeCleanMetadata(tgtCleanMetadata);
-        }
+    replaceCommitMetadata.addMetadata("schema", schema.toString());
+    replaceCommitMetadata.setOperationType(WriteOperationType.BOOTSTRAP);
+    replaceCommitMetadata.setCompacted(false);
+    return replaceCommitMetadata;
+  }

-        targetTableMetaClient
-            .reloadActiveTimeline()
-            .saveAsComplete(new HoodieInstant(true, instant.getAction(), commitTime), commitMetadataInBytes);
-      }
+  private void runArchiver(

Review Comment:
  I think the Archival Job handles locks internally? Do we need to explicitly set the lock here? I need to check the code. For now, I'll move this code into the synchronized block.
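
  Roughly what I have in mind (a sketch only; it reuses `txnManager`, `runArchiver`, `sparkTable`, `writeClient`, and `hoodieSparkEngineContext` from this diff, and assumes `TransactionManager#beginTransaction` accepts an empty owner instant for a table-level lock):

  ```java
  // Sketch: guard archival with the same transaction manager used for the
  // metadata-table updates above, in case the archiver does not lock internally.
  if (cfg.performTableMaintenance) {
    try {
      // Assumption: an empty owner instant is acceptable for a table-level lock.
      txnManager.beginTransaction(Option.empty(), Option.empty());
      runArchiver(sparkTable, writeClient.getConfig(), hoodieSparkEngineContext);
    } finally {
      txnManager.endTransaction(Option.empty());
    }
  }
  ```

  If the archiver already acquires the lock internally, this wrapping would be redundant and could even self-deadlock with a non-reentrant lock provider, so it's worth confirming in the archiver code first.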


