lokeshj1703 commented on code in PR #13286:
URL: https://github.com/apache/hudi/pull/13286#discussion_r2112352171


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1094,6 +1116,137 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List<Hood
     initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
   }
 
+  public void startCommit(String instantTime) {
+    BaseHoodieWriteClient<?, I, ?, ?> writeClient = getWriteClient();
+
+    if (!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime)) {
+      // if this is a new commit being applied to metadata for the first time
+      LOG.info("New commit at {} being applied to MDT.", instantTime);
+    } else {
+      // this code path refers to a re-attempted commit that:
+      //   1. got committed to metadata table, but failed in datatable.
+      //   2. failed while committing to metadata table
+      // for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+      // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+      // are upserts to metadata table and so only a new delta commit will be created.
+      // once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
+      // already part of completed commit. So, we have to manually rollback the completed instant and proceed.
+      Option<HoodieInstant> existingInstant = metadataMetaClient.getActiveTimeline().filter(entry -> entry.requestedTime().equals(instantTime))
+          .lastInstant();
+      LOG.info("{} completed commit at {} being applied to MDT.",
+          existingInstant.isPresent() ? "Already" : "Partially", instantTime);
+
+      // Rollback the previous commit
+      if (!writeClient.rollback(instantTime)) {
+        throw new HoodieMetadataException(String.format("Failed to rollback deltacommit at %s from MDT", instantTime));
+      }
+      metadataMetaClient.reloadActiveTimeline();
+      reInitWriteClient();
+    }
+
+    getWriteClient().startCommitWithTime(instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
+  }
+
+  public void wrapUpStreamingWriteToMetadataTableAndCompleteCommit(String instantTime, HoodieEngineContext context, List<HoodieWriteStat> metadataWriteStatsSoFar, HoodieCommitMetadata metadata) {
+    List<HoodieWriteStat> allWriteStats = new ArrayList<>(metadataWriteStatsSoFar);
+    allWriteStats.addAll(prepareAndWriteToFILESPartition(context, metadata, instantTime).map(writeStatus -> writeStatus.getStat()).collectAsList());
+    // finally committing to MDT
+    getWriteClient().commitStats(instantTime, allWriteStats, Option.empty(), HoodieTimeline.DELTA_COMMIT_ACTION,
+        Collections.emptyMap(), Option.empty());
+  }
+
+  public HoodieData<WriteStatus> streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String instantTime) {
+    // Generate HoodieRecords for MDT partitions which can be generated just by using one WriteStatus
+
+    List<MetadataPartitionType> mdtPartitionsToTag = new ArrayList<>(enabledPartitionTypes);
+    mdtPartitionsToTag.remove(FILES);
+    HoodieData<Pair<String, HoodieRecord>> perWriteStatusRecords = writeStatus.flatMap(
+        new MetadataIndexGenerator.PerWriteStatsBasedIndexGenerator(mdtPartitionsToTag, dataWriteConfig, storageConf, instantTime));

Review Comment:
   It's a static access directly to the PerWriteStatsBasedIndexGenerator constructor
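
   The point above is that `PerWriteStatsBasedIndexGenerator` is a static nested class, so it is constructed via the enclosing class name without any outer instance. A minimal, self-contained sketch of that pattern (the class and method names here are illustrative stand-ins, not Hudi's actual API):

   ```java
   import java.io.Serializable;
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   public class StaticNestedFunctionSketch {

     static class IndexGenerator {
       // Static nested class: instantiable as `new IndexGenerator.PerStatGenerator(...)`
       // with no enclosing IndexGenerator instance. Because it captures no outer `this`,
       // it is safe to serialize and ship to executors as a flatMap function.
       static class PerStatGenerator implements Serializable {
         private final List<String> partitions;

         PerStatGenerator(List<String> partitions) {
           this.partitions = partitions;
         }

         // Expand one write stat into one record per target metadata partition.
         List<String> apply(String writeStat) {
           return partitions.stream()
               .map(p -> p + ":" + writeStat)
               .collect(Collectors.toList());
         }
       }
     }

     public static void main(String[] args) {
       IndexGenerator.PerStatGenerator gen =
           new IndexGenerator.PerStatGenerator(Arrays.asList("col_stats", "record_index"));
       List<String> out = Arrays.asList("stat1", "stat2").stream()
           .flatMap(s -> gen.apply(s).stream())
           .collect(Collectors.toList());
       // → [col_stats:stat1, record_index:stat1, col_stats:stat2, record_index:stat2]
       System.out.println(out);
     }
   }
   ```

   Keeping the function a static member class (rather than an inner class or lambda over instance state) avoids accidentally serializing the whole metadata writer with the task closure.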



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1094,6 +1116,137 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List<Hood
     initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
   }
 
+  public void startCommit(String instantTime) {
+    BaseHoodieWriteClient<?, I, ?, ?> writeClient = getWriteClient();
+
+    if (!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime)) {
+      // if this is a new commit being applied to metadata for the first time
+      LOG.info("New commit at {} being applied to MDT.", instantTime);
+    } else {
+      // this code path refers to a re-attempted commit that:
+      //   1. got committed to metadata table, but failed in datatable.
+      //   2. failed while committing to metadata table
+      // for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+      // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+      // are upserts to metadata table and so only a new delta commit will be created.
+      // once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
+      // already part of completed commit. So, we have to manually rollback the completed instant and proceed.
+      Option<HoodieInstant> existingInstant = metadataMetaClient.getActiveTimeline().filter(entry -> entry.requestedTime().equals(instantTime))
+          .lastInstant();
+      LOG.info("{} completed commit at {} being applied to MDT.",
+          existingInstant.isPresent() ? "Already" : "Partially", instantTime);
+
+      // Rollback the previous commit
+      if (!writeClient.rollback(instantTime)) {
+        throw new HoodieMetadataException(String.format("Failed to rollback deltacommit at %s from MDT", instantTime));
+      }
+      metadataMetaClient.reloadActiveTimeline();
+      reInitWriteClient();
+    }
+
+    getWriteClient().startCommitWithTime(instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
+  }
+
+  public void wrapUpStreamingWriteToMetadataTableAndCompleteCommit(String instantTime, HoodieEngineContext context, List<HoodieWriteStat> metadataWriteStatsSoFar, HoodieCommitMetadata metadata) {
+    List<HoodieWriteStat> allWriteStats = new ArrayList<>(metadataWriteStatsSoFar);
+    allWriteStats.addAll(prepareAndWriteToFILESPartition(context, metadata, instantTime).map(writeStatus -> writeStatus.getStat()).collectAsList());
+    // finally committing to MDT
+    getWriteClient().commitStats(instantTime, allWriteStats, Option.empty(), HoodieTimeline.DELTA_COMMIT_ACTION,
+        Collections.emptyMap(), Option.empty());
+  }
+
+  public HoodieData<WriteStatus> streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String instantTime) {
+    // Generate HoodieRecords for MDT partitions which can be generated just by using one WriteStatus
+
+    List<MetadataPartitionType> mdtPartitionsToTag = new ArrayList<>(enabledPartitionTypes);
+    mdtPartitionsToTag.remove(FILES);
+    HoodieData<Pair<String, HoodieRecord>> perWriteStatusRecords = writeStatus.flatMap(
+        new MetadataIndexGenerator.PerWriteStatsBasedIndexGenerator(mdtPartitionsToTag, dataWriteConfig, storageConf, instantTime));
+
+    // Generate HoodieRecords for MDT partitions which need per hudi partition writeStats in one spark task
+    // for eg, partition stats index
+    HoodieData<Pair<String, HoodieRecord>> perPartitionRecords = metadataIndexGenerator.get().prepareMDTRecordsGroupedByHudiPartition(writeStatus);

Review Comment:
   removed it



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1094,6 +1116,137 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List<Hood
     initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
   }
 
+  public void startCommit(String instantTime) {
+    BaseHoodieWriteClient<?, I, ?, ?> writeClient = getWriteClient();
+
+    if (!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime)) {
+      // if this is a new commit being applied to metadata for the first time
+      LOG.info("New commit at {} being applied to MDT.", instantTime);
+    } else {
+      // this code path refers to a re-attempted commit that:
+      //   1. got committed to metadata table, but failed in datatable.
+      //   2. failed while committing to metadata table
+      // for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+      // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+      // are upserts to metadata table and so only a new delta commit will be created.
+      // once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
+      // already part of completed commit. So, we have to manually rollback the completed instant and proceed.
+      Option<HoodieInstant> existingInstant = metadataMetaClient.getActiveTimeline().filter(entry -> entry.requestedTime().equals(instantTime))
+          .lastInstant();
+      LOG.info("{} completed commit at {} being applied to MDT.",
+          existingInstant.isPresent() ? "Already" : "Partially", instantTime);
+
+      // Rollback the previous commit
+      if (!writeClient.rollback(instantTime)) {

Review Comment:
   I have added a test scenario where the rollback is useful.
   If a data table commit is rolled back, then the corresponding MDT commit should also be rolled back. I feel it is better for MDT rollbacks to be tied to the data table cleaning service; we probably do not want the cleaner to roll back the DT and MDT commits independently.
   For now I have retained the rollback code for the existing commit.
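
   To make the retry scenario concrete, here is a toy, self-contained model of the retry-safe `startCommit` pattern from the diff above (the `Timeline` class and method names are illustrative stand-ins, not Hudi's actual API): if the requested instant already exists on the MDT timeline from an earlier attempt, it is rolled back before the delta commit is started again, so a retry never double-applies.

   ```java
   import java.util.LinkedHashSet;
   import java.util.Set;

   public class RetrySafeStartCommitSketch {

     // Toy stand-in for the metadata-table active timeline.
     static class Timeline {
       private final Set<String> committed = new LinkedHashSet<>();
       boolean containsInstant(String t) { return committed.contains(t); }
       boolean rollback(String t) { return committed.remove(t); }
       void commit(String t) { committed.add(t); }
     }

     static String startCommit(Timeline mdt, String instantTime) {
       if (!mdt.containsInstant(instantTime)) {
         // first attempt: nothing on the MDT timeline for this instant
         return "new commit " + instantTime;
       }
       // re-attempt: an earlier attempt already reached the MDT timeline,
       // so roll it back before starting the delta commit again
       if (!mdt.rollback(instantTime)) {
         throw new IllegalStateException("Failed to rollback " + instantTime);
       }
       return "rolled back and restarted " + instantTime;
     }

     public static void main(String[] args) {
       Timeline mdt = new Timeline();
       System.out.println(startCommit(mdt, "c1")); // first attempt
       mdt.commit("c1");                           // MDT commit landed, then DT failed
       System.out.println(startCommit(mdt, "c1")); // retry rolls back the stale MDT commit
     }
   }
   ```

   The sketch only captures the idempotency check; the real code additionally reloads the active timeline and re-initializes the write client after the rollback.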



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
