codope commented on code in PR #12525:
URL: https://github.com/apache/hudi/pull/12525#discussion_r1896420493


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -855,6 +859,131 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
     }
   }
 
+  /**
+   * Get the deleted keys from the merged log files. The logic is as below. Suppose:
+   * <li>A = Set of keys that are valid (not deleted) in the previous log files merged</li>
+   * <li>B = Set of keys that are valid in all log files including current log file merged</li>
+   * <li>C = Set of keys that are deleted in the current log file</li>
+   * <li>Then, D = Set of deleted keys = C - (B - A)</li>
+   *
+   * @param dataTableMetaClient  data table meta client
+   * @param instantTime          timestamp of the commit
+   * @param engineType           engine type (SPARK, FLINK, JAVA)
+   * @param logFilePaths         list of log file paths for which records need to be merged
+   * @param finalWriterSchemaOpt records schema
+   * @param fullFilePath         full path of the current log file
+   * @return set of deleted keys
+   */
+  @VisibleForTesting
+  public static Set<String> getDeletedKeysFromMergedLogs(HoodieTableMetaClient dataTableMetaClient,
+                                                         String instantTime,
+                                                         EngineType engineType,
+                                                         List<String> logFilePaths,
+                                                         Option<Schema> finalWriterSchemaOpt,
+                                                         StoragePath fullFilePath) {
+    // Separate out the current log file
+    List<String> logFilePathsWithoutCurrentLogFile = logFilePaths.stream()
+        .filter(logFilePath -> !logFilePath.equals(fullFilePath.toString()))
+        .collect(toList());
+    if (logFilePathsWithoutCurrentLogFile.isEmpty()) {
+      // Only the current log file is present, so its deleted record keys can be collected and returned directly.
+      Map<String, HoodieRecord> currentLogRecords =
+          getLogRecords(Collections.singletonList(fullFilePath.toString()), dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
+      return currentLogRecords.entrySet().stream()
+          .filter(entry -> isDeleteRecord(dataTableMetaClient, finalWriterSchemaOpt, entry.getValue()))
+          .map(Map.Entry::getKey)
+          .collect(Collectors.toSet());
+    }
+    // Fetch log records for all log files
+    Map<String, HoodieRecord> allLogRecords =
+        getLogRecords(logFilePaths, dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
+
+    // Fetch log records for previous log files (excluding the current log file)
+    Map<String, HoodieRecord> previousLogRecords =
+        getLogRecords(logFilePathsWithoutCurrentLogFile, dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
+
+    // Fetch log records for the current log file
+    Map<String, HoodieRecord> currentLogRecords =
+        getLogRecords(Collections.singletonList(fullFilePath.toString()), dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
+
+    // Calculate valid (non-deleted) keys
+    Set<String> validKeysForPreviousLogs = previousLogRecords.entrySet().stream()
+        .filter(entry -> !isDeleteRecord(dataTableMetaClient, finalWriterSchemaOpt, entry.getValue()))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toSet());
+
+    Set<String> validKeysIncludingCurrentLogs = allLogRecords.entrySet().stream()
+        .filter(entry -> !isDeleteRecord(dataTableMetaClient, finalWriterSchemaOpt, entry.getValue()))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toSet());
+
+    // Calculate deleted keys in the current log file
+    Set<String> deletedKeysInCurrentLog = currentLogRecords.entrySet().stream()

Review Comment:
   Yeah, I thought about doing it that way, but I could not convince myself that it will always be correct for log files under all kinds of merge strategies. It works perfectly well for base files.
   
   My line of thinking is as follows:
   ```
   1. Include keys that are valid in prev and deleted in current
   2. Exclude keys that are deleted in prev but revived in current
   3. Include keys that are newly deleted in current
   
   For example, let's say we have 4 keys spread across 3 log files as below.
   Here, `Prev` means the 2 previous log files and `Current` means the entire slice including the third inflight log file.
   True/False indicates `isDeleted`, and `-` means the key is not present.
   
   Key   Prev    Current
   k1    false   true
   k2    true    false
   k3      -     true
   k4    false    -
   ```
   If we go by the simpler logic, then it will only consider `[k1, k4]` (as these two are the only valid entries in the previous log files). And then finally the `deletedRecordKeyList` will contain only `k1`. However, I think the expected deleted keys should be `[k1, k3]`, right?
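   The set algebra from the javadoc (D = C - (B - A)) can be sketched on this exact example. This is a minimal standalone illustration, not the actual Hudi code: the keys and sets below mirror the k1..k4 table, and `deletedKeys` is a hypothetical helper, not a method in `HoodieTableMetadataUtil`.
   
   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.Set;
   import java.util.TreeSet;
   
   public class DeletedKeysSketch {
   
       // D = C - (B - A), where:
       //   A = keys valid (not deleted) in the previous log files
       //   B = keys valid after merging all log files, including the current one
       //   C = keys deleted in the current log file
       static Set<String> deletedKeys(Set<String> a, Set<String> b, Set<String> c) {
           Set<String> newlyValid = new HashSet<>(b);
           newlyValid.removeAll(a);        // B - A: keys revived (or newly added) by the current log file
           Set<String> d = new HashSet<>(c);
           d.removeAll(newlyValid);        // C - (B - A)
           return d;
       }
   
       public static void main(String[] args) {
           // From the table: k1 valid -> deleted, k2 deleted -> revived,
           // k3 appears deleted in current, k4 valid and untouched by the current log file.
           Set<String> a = new HashSet<>(Arrays.asList("k1", "k4")); // valid in prev
           Set<String> b = new HashSet<>(Arrays.asList("k2", "k4")); // valid after merging all
           Set<String> c = new HashSet<>(Arrays.asList("k1", "k3")); // deleted in current
           System.out.println(new TreeSet<>(deletedKeys(a, b, c)));  // prints [k1, k3]
       }
   }
   ```
   
   Note how k2 lands in B - A (revived) and is therefore excluded, while k3, which never appears in the previous files, still surfaces in D because it is in C but not in B - A.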



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to