nsivabalan commented on code in PR #13286:
URL: https://github.com/apache/hudi/pull/13286#discussion_r2140668976
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1096,6 +1119,126 @@ public void buildMetadataPartitions(HoodieEngineContext
engineContext, List<Hood
initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
}
+ public void startCommit(String instantTime) {
+ ValidationUtils.checkState(streamingWritesEnabled, "Streaming writes
should be enabled for startCommit API");
+
+ if
(!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime))
{
+ // if this is a new commit being applied to metadata for the first time
+ LOG.info("New commit at {} being applied to MDT.", instantTime);
+ } else {
+ throw new HoodieMetadataException("Starting the same commit in Metadata
table more than once w/o rolling back : " + instantTime);
+ }
+
+ // this is where we might instantiate the write client to metadata table
for the first time.
+ getWriteClient().startCommitForMetadataTable(metadataMetaClient,
instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
+ }
+
+ @Override
+ public HoodieData<WriteStatus>
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String
instantTime) {
+ List<MetadataPartitionType> mdtPartitionsToTag = new
ArrayList<>(enabledPartitionTypes);
+ mdtPartitionsToTag.remove(FILES);
+ mdtPartitionsToTag.retainAll(STREAMING_WRITES_SUPPORTED_PARTITIONS);
+ if (mdtPartitionsToTag.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ HoodieData<HoodieRecord> untaggedMdtRecords = writeStatus.flatMap(
+ new
MetadataIndexGenerator.WriteStatusBasedMetadataIndexGenerator(mdtPartitionsToTag,
dataWriteConfig, storageConf, instantTime));
+
+ // tag records w/ location
+ Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>>
hoodieFileGroupsToUpdateAndTaggedMdtRecords =
tagRecordsWithLocationWithStreamingWrites(untaggedMdtRecords,
+ mdtPartitionsToTag.stream().map(mdtPartition ->
mdtPartition.getPartitionPath()).collect(
+ Collectors.toSet()));
+
+ // write partial writes to mdt table (for those partitions where streaming
writes are enabled)
+ HoodieData<WriteStatus> mdtWriteStatusHoodieData =
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(hoodieFileGroupsToUpdateAndTaggedMdtRecords,
instantTime));
+ // dag not yet de-referenced. do not invoke any action on
mdtWriteStatusHoodieData yet.
+ return mdtWriteStatusHoodieData;
+ }
+
/**
 * Upserts the tagged records to the metadata table in the streaming flow.
 *
 * <p>Engine-specific subclasses are expected to override this with the actual write
 * implementation; the base class does not support streaming writes.
 *
 * @param hoodieFileGroupsToUpdateAndTaggedMdtRecords Pair of {@link List} of {@link HoodieFileGroupId} referring to list of all file group ids that could receive updates
 *                                                    and {@link HoodieData} of tagged {@link HoodieRecord}s.
 * @param instantTime instant time of interest.
 * @return engine-specific write result data for the metadata table writes.
 * @throws UnsupportedOperationException always, unless overridden by an engine-specific subclass.
 */
protected O streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> hoodieFileGroupsToUpdateAndTaggedMdtRecords, String instantTime) {
  throw new UnsupportedOperationException("Not yet implemented");
}
+
+ @Override
+ public void completeStreamingCommit(String instantTime, HoodieEngineContext
context, List<HoodieWriteStat> metadataWriteStatsSoFar, HoodieCommitMetadata
metadata) {
+ List<HoodieWriteStat> allWriteStats = new
ArrayList<>(metadataWriteStatsSoFar);
+ // update metadata for left over partitions which does not have streaming
writes support.
+ allWriteStats.addAll(prepareAndWriteToNonStreamingPartitions(context,
metadata, instantTime).map(writeStatus ->
writeStatus.getStat()).collectAsList());
+ getWriteClient().commitStats(instantTime, allWriteStats, Option.empty(),
HoodieTimeline.DELTA_COMMIT_ACTION,
+ Collections.emptyMap(), Option.empty());
+ }
+
+ private HoodieData<WriteStatus>
prepareAndWriteToNonStreamingPartitions(HoodieEngineContext context,
HoodieCommitMetadata commitMetadata, String instantTime) {
+ Set<String> mdtPartitionsToUpdate =
getNonStreamingMetadataPartitionsToUpdate();
+ Map<String, HoodieData<HoodieRecord>> mdtPartitionsAndUnTaggedRecords =
new MdtPartitionRecordGeneratorBatchMode(instantTime, commitMetadata,
mdtPartitionsToUpdate)
+ .convertMetadata();
+
+ HoodieData<HoodieRecord> untaggedMdtRecords = context.emptyHoodieData();
+ for (Map.Entry<String, HoodieData<HoodieRecord>> entry:
mdtPartitionsAndUnTaggedRecords.entrySet()) {
+ untaggedMdtRecords = untaggedMdtRecords.union(entry.getValue());
+ }
+
+ // write to mdt table for non streaming mdt partitions
+ Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> taggedRecords =
tagRecordsWithLocationWithStreamingWrites(untaggedMdtRecords,
mdtPartitionsToUpdate);
+ HoodieData<WriteStatus> mdtWriteStatusHoodieData =
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(taggedRecords,
instantTime));
+ return mdtWriteStatusHoodieData;
+ }
+
+ private Set<String> getNonStreamingMetadataPartitionsToUpdate() {
+ Set<String> toReturn =
enabledPartitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
+ STREAMING_WRITES_SUPPORTED_PARTITIONS.forEach(metadataPartitionType ->
toReturn.remove(metadataPartitionType.getPartitionPath()));
+ return toReturn;
+ }
+
+ /**
+ * Returns List of pair of partition name and MDT fileId updated in the
partition along with the tagged MDT records.
+ * @param untaggedMdtRecords Untagged MDT records
+ */
+ protected Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>>
tagRecordsWithLocationWithStreamingWrites(HoodieData<HoodieRecord>
untaggedMdtRecords,
+
Set<String> enabledMetadataPartitions) {
+ List<HoodieFileGroupId> updatedMDTFileGroupIds = new ArrayList<>();
+ // Fetch latest file slices for all enabled MDT partitions
+ Map<String, List<FileSlice>> mdtPartitionLatestFileSlices = new
HashMap<>();
+ try (HoodieTableFileSystemView fsView =
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+ enabledMetadataPartitions.forEach(partitionName -> {
+ List<FileSlice> fileSlices =
+
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient,
Option.ofNullable(fsView), partitionName);
+ if (fileSlices.isEmpty()) {
+ // scheduling of INDEX only initializes the file group and not add
commit
Review Comment:
Yes, when a partition is going through async indexing, ingestion writers are
still expected to write log files to these partitions. In such cases, the latest
committed file slice might be empty, so we poll for inflight file slices as
well.
So this is generally not applicable — only in the case where async indexing is in flight.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]