codope commented on code in PR #12525:
URL: https://github.com/apache/hudi/pull/12525#discussion_r1895973744


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -822,14 +823,22 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
             if (writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) {
               return BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage);
             } else if (FSUtils.isLogFile(fullFilePath)) {
-              // for logs, we only need to process log files containing deletes
-              if (writeStat.getNumDeletes() > 0) {
-                Set<String> deletedRecordKeys = getRecordKeys(fullFilePath.toString(), dataTableMetaClient,
-                    finalWriterSchemaOpt, maxBufferSize, instantTime, false, true);
-                return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
-              }
-              // ignore log file data blocks.
-              return new ArrayList<HoodieRecord>().iterator();
+              checkState(writeStat instanceof HoodieDeltaWriteStat, "Log file should be associated with a delta write stat");
+              List<String> logFilePaths = ((HoodieDeltaWriteStat) writeStat).getLogFiles().stream()
+                  .map(logFile -> new StoragePath(new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPartitionPath()), logFile).toString())
+                  .collect(toList());
+              // For log records, we only need to process deletes. However, deletes may or may not be part of delete blocks (delete using custom merge mode).
+              // So, we need to process the log files to get the record keys that are deleted. We can then generate RLI records for those keys.
+              // 1. Get all merged record keys - any custom merger which handles delete outside delete block should not be present in merged keys.
+              // 2. Get all un-merged record keys - this will contain all valid and deleted keys, irrespective of delete block or merge mode.
+              // 3. Get deleted record keys - this will be the difference of merged and un-merged keys.
+              Set<String> mergedRecordKeys = getRecordKeys(logFilePaths, dataTableMetaClient,
+                  finalWriterSchemaOpt, maxBufferSize, instantTime, true, true, true, engineType);
+              Set<String> unMergedRecordKeys = getRecordKeys(logFilePaths, dataTableMetaClient,
+                  finalWriterSchemaOpt, maxBufferSize, instantTime, true, true, false, engineType);
+              Set<String> deletedRecordKeys = new HashSet<>(unMergedRecordKeys);

Review Comment:
   I have changed the RLI logic as discussed. For SI, however, the logic is the
   same, i.e. we still read from SI (to figure out the deleted records), but it
   now avoids any collection on the driver. The SI update now runs entirely
   through distributed task execution.
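
For anyone following the thread, the three steps in the diff comment amount to a set difference: the deleted keys are the un-merged keys minus the merged keys. Below is a minimal, standalone sketch of that step in plain Java. It is not the PR's code: the class name `DeletedKeysSketch`, the helper `deletedKeys`, and the sample keys are hypothetical, and the two input sets simply stand in for the results of the two `getRecordKeys` calls. The `removeAll` step is an assumption about what follows the last line shown in the hunk.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class DeletedKeysSketch {

  // Hypothetical helper: a key that appears in the un-merged view of the log
  // files but is absent from the merged view is treated as deleted.
  static Set<String> deletedKeys(Set<String> unMergedRecordKeys, Set<String> mergedRecordKeys) {
    // Copy first so the caller's set is not mutated.
    Set<String> deleted = new HashSet<>(unMergedRecordKeys);
    deleted.removeAll(mergedRecordKeys);
    return deleted;
  }

  public static void main(String[] args) {
    // Sample data only: k2 was written and later deleted, so merging drops it.
    Set<String> unMerged = new HashSet<>(Arrays.asList("k1", "k2", "k3"));
    Set<String> merged = new HashSet<>(Arrays.asList("k1", "k3"));
    System.out.println(deletedKeys(unMerged, merged)); // prints [k2]
  }
}
```

The sketch holds everything in memory, so it only illustrates the key arithmetic, not how the work is distributed across tasks.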
   


