nsivabalan commented on code in PR #13402:
URL: https://github.com/apache/hudi/pull/13402#discussion_r2136891851


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1096,6 +1119,126 @@ public void buildMetadataPartitions(HoodieEngineContext 
engineContext, List<Hood
     initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
   }
 
+  public void startCommit(String instantTime) {
+    ValidationUtils.checkState(streamingWritesEnabled, "Streaming writes 
should be enabled for startCommit API");
+
+    if 
(!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime))
 {
+      // if this is a new commit being applied to metadata for the first time
+      LOG.info("New commit at {} being applied to MDT.", instantTime);
+    } else {
+      throw new HoodieMetadataException("Starting the same commit in Metadata 
table more than once w/o rolling back : " + instantTime);
+    }
+
+    // this is where we might instantiate the write client to metadata table 
for the first time.
+    getWriteClient().startCommitForMetadataTable(metadataMetaClient, 
instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> 
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String 
instantTime) {
+    List<MetadataPartitionType> mdtPartitionsToTag = new 
ArrayList<>(enabledPartitionTypes);
+    mdtPartitionsToTag.remove(FILES);
+    mdtPartitionsToTag.retainAll(STREAMING_WRITES_SUPPORTED_PARTITIONS);
+    if (mdtPartitionsToTag.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    HoodieData<HoodieRecord> untaggedMdtRecords = writeStatus.flatMap(
+        new 
MetadataIndexGenerator.WriteStatusBasedMetadataIndexGenerator(mdtPartitionsToTag,
 dataWriteConfig, storageConf, instantTime));
+
+    // tag records w/ location
+    Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> 
hoodieFileGroupsToUpdateAndTaggedMdtRecords = 
tagRecordsWithLocationWithStreamingWrites(untaggedMdtRecords,
+        mdtPartitionsToTag.stream().map(mdtPartition -> 
mdtPartition.getPartitionPath()).collect(
+            Collectors.toSet()));
+
+    // write partial writes to mdt table (for those partitions where streaming 
writes are enabled)
+    HoodieData<WriteStatus> mdtWriteStatusHoodieData = 
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(hoodieFileGroupsToUpdateAndTaggedMdtRecords,
 instantTime));
+    // dag not yet de-referenced. do not invoke any action on 
mdtWriteStatusHoodieData yet.
+    return mdtWriteStatusHoodieData;
+  }
+
+  /**
+   * Upsert the tagged records to metadata table in the streaming flow.
+   * @param hoodieFileGroupsToUpdateAndTaggedMdtRecords Pair of {@link List} 
of {@link HoodieFileGroupId} referring to list of all file groups ids that 
could receive updates
+   *                                                    and {@link HoodieData} 
of tagged {@link HoodieRecord}s.
+   * @param instantTime instant time of interest.
+   * @return
+   */
+  protected O streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>, 
HoodieData<HoodieRecord>> hoodieFileGroupsToUpdateAndTaggedMdtRecords, String 
instantTime) {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  public O batchWriteToMetadataTablePartitions(I preppedRecords, String 
instantTime) {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  @Override
+  public void completeStreamingCommit(String instantTime, HoodieEngineContext 
context, List<HoodieWriteStat> metadataWriteStatsSoFar, HoodieCommitMetadata 
metadata) {
+    List<HoodieWriteStat> allWriteStats = new 
ArrayList<>(metadataWriteStatsSoFar);
+    // update metadata for left over partitions which does not have streaming 
writes support.
+    allWriteStats.addAll(prepareAndWriteToNonStreamingPartitions(context, 
metadata, instantTime).map(writeStatus -> 
writeStatus.getStat()).collectAsList());

Review Comment:
   Note to reviewer:
   Except for RLI, all other partitions that have been enabled will be written
to MDT here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to