codope commented on code in PR #12525:
URL: https://github.com/apache/hudi/pull/12525#discussion_r1895973744
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -822,14 +823,22 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
       if (writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) {
         return BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage);
       } else if (FSUtils.isLogFile(fullFilePath)) {
-        // for logs, we only need to process log files containing deletes
-        if (writeStat.getNumDeletes() > 0) {
-          Set<String> deletedRecordKeys = getRecordKeys(fullFilePath.toString(), dataTableMetaClient,
-              finalWriterSchemaOpt, maxBufferSize, instantTime, false, true);
-          return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
-        }
-        // ignore log file data blocks.
-        return new ArrayList<HoodieRecord>().iterator();
+        checkState(writeStat instanceof HoodieDeltaWriteStat, "Log file should be associated with a delta write stat");
+        List<String> logFilePaths = ((HoodieDeltaWriteStat) writeStat).getLogFiles().stream()
+            .map(logFile -> new StoragePath(new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPartitionPath()), logFile).toString())
+            .collect(toList());
+        // For log records, we only need to process deletes. However, deletes may or may not be part of delete blocks (delete using custom merge mode).
+        // So, we need to process the log files to get the record keys that are deleted. We can then generate RLI records for those keys.
+        // 1. Get all merged record keys - any key deleted by a custom merger outside a delete block will not be present in the merged keys.
+        // 2. Get all un-merged record keys - this will contain all valid and deleted keys, irrespective of delete block or merge mode.
+        // 3. Get deleted record keys - this will be the difference of the un-merged and merged keys.
+        Set<String> mergedRecordKeys = getRecordKeys(logFilePaths, dataTableMetaClient,
+            finalWriterSchemaOpt, maxBufferSize, instantTime, true, true, true, engineType);
+        Set<String> unMergedRecordKeys = getRecordKeys(logFilePaths, dataTableMetaClient,
+            finalWriterSchemaOpt, maxBufferSize, instantTime, true, true, false, engineType);
+        Set<String> deletedRecordKeys = new HashSet<>(unMergedRecordKeys);
Review Comment:
I have changed the RLI logic as discussed. For SI, however, the logic is the same, i.e. we still read from SI (to figure out the deleted records). But it avoids any collection on the driver: the SI update now happens entirely through distributed task execution.
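The three-step key-diffing described in the diff above boils down to a set difference: keys present in the un-merged view but absent from the merged view are exactly the deleted keys. A minimal standalone sketch (hypothetical class and method names, not Hudi APIs):

```java
import java.util.HashSet;
import java.util.Set;

public class DeletedKeysSketch {

    // Step 3 of the diff's comment: deleted keys = un-merged keys minus merged keys.
    // mergedKeys: keys still valid after applying the merge (deletes resolved away).
    // unMergedKeys: every key seen in the log files, valid or deleted.
    static Set<String> deletedRecordKeys(Set<String> mergedKeys, Set<String> unMergedKeys) {
        Set<String> deleted = new HashSet<>(unMergedKeys);
        deleted.removeAll(mergedKeys); // whatever did not survive merging was deleted
        return deleted;
    }

    public static void main(String[] args) {
        // k2 and k4 were written and later deleted (possibly via a custom merger),
        // so they appear un-merged but not merged.
        Set<String> merged = Set.of("k1", "k3");
        Set<String> unMerged = Set.of("k1", "k2", "k3", "k4");
        System.out.println(deletedRecordKeys(merged, unMerged));
    }
}
```

This works regardless of whether a delete came from a delete block or from a custom merge mode, since the un-merged scan sees every key and the merged scan only sees survivors.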
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]