codope commented on code in PR #12525:
URL: https://github.com/apache/hudi/pull/12525#discussion_r1893447744
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -822,14 +823,22 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
       if (writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) {
         return BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage);
       } else if (FSUtils.isLogFile(fullFilePath)) {
-        // for logs, we only need to process log files containing deletes
-        if (writeStat.getNumDeletes() > 0) {
-          Set<String> deletedRecordKeys = getRecordKeys(fullFilePath.toString(), dataTableMetaClient,
-              finalWriterSchemaOpt, maxBufferSize, instantTime, false, true);
-          return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
-        }
-        // ignore log file data blocks.
-        return new ArrayList<HoodieRecord>().iterator();
+        checkState(writeStat instanceof HoodieDeltaWriteStat, "Log file should be associated with a delta write stat");
+        List<String> logFilePaths = ((HoodieDeltaWriteStat) writeStat).getLogFiles().stream()
+            .map(logFile -> new StoragePath(new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPartitionPath()), logFile).toString())
+            .collect(toList());
+        // For log records, we only need to process deletes. However, deletes may or may not be part of delete blocks (delete using custom merge mode).
+        // So, we need to process the log files to get the record keys that are deleted. We can then generate RLI records for those keys.
+        // 1. Get all merged record keys - any key deleted by a custom merger outside a delete block will not be present in the merged keys.
+        // 2. Get all un-merged record keys - this will contain all valid and deleted keys, irrespective of delete block or merge mode.
+        // 3. Get deleted record keys - this will be the difference of the un-merged and merged keys.
+        Set<String> mergedRecordKeys = getRecordKeys(logFilePaths, dataTableMetaClient,
+            finalWriterSchemaOpt, maxBufferSize, instantTime, true, true, true, engineType);
+        Set<String> unMergedRecordKeys = getRecordKeys(logFilePaths, dataTableMetaClient,
+            finalWriterSchemaOpt, maxBufferSize, instantTime, true, true, false, engineType);
+        Set<String> deletedRecordKeys = new HashSet<>(unMergedRecordKeys);
+        deletedRecordKeys.removeAll(mergedRecordKeys);
+        return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
Review Comment:
This is the logic to process deletes across all log files irrespective of merge mode. It is going to be somewhat costly, because we scan the log records twice (once merged, once un-merged) to figure out the deletes.
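To make the set-difference step in the diff concrete, here is a minimal standalone sketch. The `deletedRecordKeys` helper and the literal key sets are hypothetical stand-ins: in the actual patch the two sets come from the merged and un-merged `getRecordKeys` scans over the log files.

```java
import java.util.HashSet;
import java.util.Set;

public class DeletedKeyDiff {
  /**
   * Deleted keys are those present in the un-merged scan (every key ever
   * written to the log files) but absent from the merged scan (keys that
   * survive after all blocks, including deletes, are applied).
   */
  static Set<String> deletedRecordKeys(Set<String> mergedKeys, Set<String> unMergedKeys) {
    Set<String> deleted = new HashSet<>(unMergedKeys);
    deleted.removeAll(mergedKeys);
    return deleted;
  }

  public static void main(String[] args) {
    // The un-merged scan sees k1..k3; the merged scan sees only k1 and k3,
    // so k2 must have been deleted (by a delete block or a custom merger).
    Set<String> merged = Set.of("k1", "k3");
    Set<String> unMerged = Set.of("k1", "k2", "k3");
    System.out.println(deletedRecordKeys(merged, unMerged)); // prints [k2]
  }
}
```

This also illustrates why the reviewer flags the cost: both scans must read every log record, and the un-merged scan cannot be skipped because a custom merger may drop keys without emitting a delete block.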