nsivabalan commented on code in PR #12984:
URL: https://github.com/apache/hudi/pull/12984#discussion_r1997495050
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -839,43 +838,62 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
       return baseFileWriteStats.stream()
           .flatMap(writeStat -> {
             HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration);
-            return CollectionUtils.toStream(BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage));
+            return CollectionUtils.toStream(RecordIndexRecordKeyParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage));
           })
           .iterator();
     }
     // Process log file write stats
     if (!logFileWriteStats.isEmpty()) {
-      String partitionPath = logFileWriteStats.get(0).getPartitionPath();
-      List<String> currentLogFilePaths = logFileWriteStats.stream()
-          .map(writeStat -> new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath()).toString())
-          .collect(Collectors.toList());
-      List<String> allLogFilePaths = logFileWriteStats.stream()
-          .flatMap(writeStat -> {
-            checkState(writeStat instanceof HoodieDeltaWriteStat, "Log file should be associated with a delta write stat");
-            List<String> currentLogFiles = ((HoodieDeltaWriteStat) writeStat).getLogFiles().stream()
-                .map(logFile -> new StoragePath(new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPartitionPath()), logFile).toString())
-                .collect(Collectors.toList());
-            return currentLogFiles.stream();
-          })
-          .collect(Collectors.toList());
-      // Extract revived and deleted keys
-      Pair<Set<String>, Set<String>> revivedAndDeletedKeys =
-          getRevivedAndDeletedKeysFromMergedLogs(dataTableMetaClient, instantTime, engineType, allLogFilePaths, finalWriterSchemaOpt, currentLogFilePaths);
-      Set<String> revivedKeys = revivedAndDeletedKeys.getLeft();
-      Set<String> deletedKeys = revivedAndDeletedKeys.getRight();
-      // Process revived keys to create updates
-      List<HoodieRecord> revivedRecords = revivedKeys.stream()
-          .map(recordKey -> HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partitionPath, fileId, instantTime, writesFileIdEncoding))
-          .collect(Collectors.toList());
-      // Process deleted keys to create deletes
-      List<HoodieRecord> deletedRecords = deletedKeys.stream()
-          .map(HoodieMetadataPayload::createRecordIndexDelete)
-          .collect(Collectors.toList());
-      // Combine all records into one list
+      FileSlice previousFileSliceForFileId = fsView.getLatestFileSlice(partition, fileId).orElse(null);
+      FileSlice latestFileSlicesIncludingInflight = new FileSlice(previousFileSliceForFileId);
Review Comment:
Note to Reviewer: I attempted to use FileGroupReader to fetch these values, but after spending more than two hours I gave up due to scope creep. Because of the broadcast manager, and because this class lives in the hudi-common module while even HoodieBackedTableMetadataWriter is in the client-common module, I would have had to make a lot of changes to bring the FileGroupReader into this layer via a callback or some other contrived mechanism. It definitely needs some brainstorming w/ @yihua, who is taking up the fix for metadata record generation. So for now, to unblock 1.0.2, I am going ahead w/ the file slice reader.
We can take up cleaning up all usages of the file slice reader in the MDT writer code path together.
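For context on the bookkeeping the removed block performed: keys from the merged logs were split into "revived" keys (turned into record index updates) and "deleted" keys (turned into record index deletes). The standalone sketch below illustrates that split as a plain set difference between the keys valid before the commit and the keys valid after merging the new log files. This is a hypothetical illustration under that set-difference assumption, not Hudi's actual `getRevivedAndDeletedKeysFromMergedLogs` implementation; the class and method names are invented.

```java
import java.util.AbstractMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the revived/deleted key split that feeds the
// record-level index (RLI): revived keys become RLI updates, deleted
// keys become RLI deletes.
public class RliKeySketch {

  /**
   * @param keysBefore record keys valid before the commit
   * @param keysAfter  record keys valid after merging the new log files
   * @return pair of (revived keys, deleted keys)
   */
  public static Map.Entry<Set<String>, Set<String>> revivedAndDeleted(
      Set<String> keysBefore, Set<String> keysAfter) {
    // Keys that are valid now but were not valid before -> RLI updates.
    Set<String> revived = new HashSet<>(keysAfter);
    revived.removeAll(keysBefore);
    // Keys that were valid before but are no longer valid -> RLI deletes.
    Set<String> deleted = new HashSet<>(keysBefore);
    deleted.removeAll(keysAfter);
    return new AbstractMap.SimpleEntry<>(revived, deleted);
  }
}
```

For example, with keys {a, b} before and {b, c} after, "c" is revived and "a" is deleted, while "b" needs no RLI change.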
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]