nsivabalan commented on code in PR #13402:
URL: https://github.com/apache/hudi/pull/13402#discussion_r2136891851
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1096,6 +1119,126 @@ public void buildMetadataPartitions(HoodieEngineContext
engineContext, List<Hood
initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
}
+ public void startCommit(String instantTime) {
+ ValidationUtils.checkState(streamingWritesEnabled, "Streaming writes
should be enabled for startCommit API");
+
+ if
(!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime))
{
+ // if this is a new commit being applied to metadata for the first time
+ LOG.info("New commit at {} being applied to MDT.", instantTime);
+ } else {
+ throw new HoodieMetadataException("Starting the same commit in Metadata
table more than once w/o rolling back : " + instantTime);
+ }
+
+ // this is where we might instantiate the write client to metadata table
for the first time.
+ getWriteClient().startCommitForMetadataTable(metadataMetaClient,
instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
+ }
+
+ @Override
+ public HoodieData<WriteStatus>
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String
instantTime) {
+ List<MetadataPartitionType> mdtPartitionsToTag = new
ArrayList<>(enabledPartitionTypes);
+ mdtPartitionsToTag.remove(FILES);
+ mdtPartitionsToTag.retainAll(STREAMING_WRITES_SUPPORTED_PARTITIONS);
+ if (mdtPartitionsToTag.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ HoodieData<HoodieRecord> untaggedMdtRecords = writeStatus.flatMap(
+ new
MetadataIndexGenerator.WriteStatusBasedMetadataIndexGenerator(mdtPartitionsToTag,
dataWriteConfig, storageConf, instantTime));
+
+ // tag records w/ location
+ Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>>
hoodieFileGroupsToUpdateAndTaggedMdtRecords =
tagRecordsWithLocationWithStreamingWrites(untaggedMdtRecords,
+ mdtPartitionsToTag.stream().map(mdtPartition ->
mdtPartition.getPartitionPath()).collect(
+ Collectors.toSet()));
+
+ // write partial writes to mdt table (for those partitions where streaming
writes are enabled)
+ HoodieData<WriteStatus> mdtWriteStatusHoodieData =
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(hoodieFileGroupsToUpdateAndTaggedMdtRecords,
instantTime));
+ // dag not yet de-referenced. do not invoke any action on
mdtWriteStatusHoodieData yet.
+ return mdtWriteStatusHoodieData;
+ }
+
  /**
   * Upsert the tagged records to metadata table in the streaming flow.
   *
   * <p>NOTE(review): this base implementation always throws — presumably engine-specific
   * subclasses override it with an actual write path; confirm against subclasses.
   *
   * @param hoodieFileGroupsToUpdateAndTaggedMdtRecords Pair of {@link List} of {@link HoodieFileGroupId} referring to list of all file groups ids that could receive updates
   *                                                    and {@link HoodieData} of tagged {@link HoodieRecord}s.
   * @param instantTime instant time of interest.
   * @return engine-specific write statuses ({@code O}) for the metadata table writes.
   * @throws UnsupportedOperationException always, in this base implementation.
   */
  protected O streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> hoodieFileGroupsToUpdateAndTaggedMdtRecords, String instantTime) {
    throw new UnsupportedOperationException("Not yet implemented");
  }
+
  /**
   * Writes the given prepped records to the metadata table partitions in a single batch.
   *
   * <p>NOTE(review): this base implementation always throws — presumably engine-specific
   * subclasses override it with an actual write path; confirm against subclasses.
   *
   * @param preppedRecords engine-specific collection ({@code I}) of records already prepared for the metadata table.
   * @param instantTime instant time of interest.
   * @return engine-specific write statuses ({@code O}) for the metadata table writes.
   * @throws UnsupportedOperationException always, in this base implementation.
   */
  public O batchWriteToMetadataTablePartitions(I preppedRecords, String instantTime) {
    throw new UnsupportedOperationException("Not yet implemented");
  }
+
+ @Override
+ public void completeStreamingCommit(String instantTime, HoodieEngineContext
context, List<HoodieWriteStat> metadataWriteStatsSoFar, HoodieCommitMetadata
metadata) {
+ List<HoodieWriteStat> allWriteStats = new
ArrayList<>(metadataWriteStatsSoFar);
+ // update metadata for left over partitions which does not have streaming
writes support.
+ allWriteStats.addAll(prepareAndWriteToNonStreamingPartitions(context,
metadata, instantTime).map(writeStatus ->
writeStatus.getStat()).collectAsList());
Review Comment:
Note to reviewer:
Except for RLI, all other partitions that have been enabled will be written to
the MDT here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]