codope commented on code in PR #12525:
URL: https://github.com/apache/hudi/pull/12525#discussion_r1896420493
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -855,6 +859,131 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
}
}
+ /**
+ * Get the deleted keys from the merged log files. The logic is as below. Suppose:
+ * <li>A = Set of keys that are valid (not deleted) in the previous log files merged</li>
+ * <li>B = Set of keys that are valid in all log files including current log file merged</li>
+ * <li>C = Set of keys that are deleted in the current log file</li>
+ * <li>Then, D = Set of deleted keys = C - (B - A)</li>
+ *
+ * @param dataTableMetaClient data table meta client
+ * @param instantTime timestamp of the commit
+ * @param engineType engine type (SPARK, FLINK, JAVA)
+ * @param logFilePaths list of log file paths for which records need to be merged
+ * @param finalWriterSchemaOpt records schema
+ * @param fullFilePath full path of the current log file
+ * @return set of deleted keys
+ */
+ @VisibleForTesting
+ public static Set<String> getDeletedKeysFromMergedLogs(HoodieTableMetaClient dataTableMetaClient,
+ String instantTime,
+ EngineType engineType,
+ List<String> logFilePaths,
+ Option<Schema> finalWriterSchemaOpt,
+ StoragePath fullFilePath) {
+ // Separate out the current log file
+ List<String> logFilePathsWithoutCurrentLogFile = logFilePaths.stream()
+ .filter(logFilePath -> !logFilePath.equals(fullFilePath.toString()))
+ .collect(toList());
+ if (logFilePathsWithoutCurrentLogFile.isEmpty()) {
+ // Only current log file is present, so we can directly get the deleted record keys from it and return the RLI records.
+ Map<String, HoodieRecord> currentLogRecords =
+ getLogRecords(Collections.singletonList(fullFilePath.toString()), dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
+ return currentLogRecords.entrySet().stream()
+ .filter(entry -> isDeleteRecord(dataTableMetaClient, finalWriterSchemaOpt, entry.getValue()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+ }
+ // Fetch log records for all log files
+ Map<String, HoodieRecord> allLogRecords =
+ getLogRecords(logFilePaths, dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
+
+ // Fetch log records for previous log files (excluding the current log file)
+ Map<String, HoodieRecord> previousLogRecords =
+ getLogRecords(logFilePathsWithoutCurrentLogFile, dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
+
+ // Fetch log records for the current log file
+ Map<String, HoodieRecord> currentLogRecords =
+ getLogRecords(Collections.singletonList(fullFilePath.toString()), dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
+
+ // Calculate valid (non-deleted) keys
+ Set<String> validKeysForPreviousLogs = previousLogRecords.entrySet().stream()
+ .filter(entry -> !isDeleteRecord(dataTableMetaClient, finalWriterSchemaOpt, entry.getValue()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+
+ Set<String> validKeysIncludingCurrentLogs = allLogRecords.entrySet().stream()
+ .filter(entry -> !isDeleteRecord(dataTableMetaClient, finalWriterSchemaOpt, entry.getValue()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+
+ // Calculate deleted keys in the current log file
+ Set<String> deletedKeysInCurrentLog = currentLogRecords.entrySet().stream()
Review Comment:
Yeah, I thought about doing it that way, but I could not convince myself that it would always be correct for log files under all kinds of merge strategies. It works perfectly well for base files.
My line of thinking is as follows:
1. Include keys that are valid in prev and deleted in current.
2. Exclude keys that are deleted in prev but revived in current.
3. Include keys that are newly deleted in current.

For example, let's say we have 4 keys spread across 3 log files as below. Here, `Prev` means the 2 previous log files merged, and `Current` means the entire slice merged, including the third (inflight) log file. True/False indicates `isDeleted`, and `-` means the key is not present.

```
Key  Prev   Current
k1   false  true
k2   true   false
k3   -      true
k4   false  -
```
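To make rules 1-3 concrete, here is a minimal, hypothetical sketch (not Hudi code) that encodes the table above as two maps from key to `isDeleted` and applies the rules key by key. The class and method names are made up for illustration, and plain booleans stand in for Hudi's `HoodieRecord` and the `isDeleteRecord` check.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class DeletedKeyRules {

  /**
   * Hypothetical helper: applies rules 1-3 per key. A value of TRUE means the key
   * is deleted, FALSE means it is valid, and a missing entry means "not present".
   */
  static Set<String> deletedKeys(Map<String, Boolean> prev, Map<String, Boolean> current) {
    Set<String> deleted = new TreeSet<>();
    for (Map.Entry<String, Boolean> e : current.entrySet()) {
      String key = e.getKey();
      boolean deletedNow = e.getValue();
      Boolean before = prev.get(key); // null when the key is absent from prev
      if (deletedNow && Boolean.FALSE.equals(before)) {
        deleted.add(key); // rule 1: valid in prev, deleted in current
      } else if (deletedNow && before == null) {
        deleted.add(key); // rule 3: newly deleted in current
      }
      // Rule 2: deleted in prev but valid (revived) in current is never added.
      // Assumption: keys deleted in both prev and current match none of the
      // three rules, so this sketch leaves them out.
    }
    return deleted;
  }

  public static void main(String[] args) {
    Map<String, Boolean> prev = new LinkedHashMap<>();
    prev.put("k1", false);
    prev.put("k2", true);
    prev.put("k4", false); // k3 is absent from prev

    Map<String, Boolean> current = new LinkedHashMap<>();
    current.put("k1", true);
    current.put("k2", false);
    current.put("k3", true); // k4 is absent from current

    System.out.println(deletedKeys(prev, current)); // [k1, k3]
  }
}
```

Iterating only over `current` is enough here because every rule requires the key to be deleted in `current`; `k4` therefore never qualifies.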
If we go by the simpler logic, it will only consider `[k1, k4]` (the only valid entries in the previous log files), so `deletedRecordKeyList` will end up containing only `k1`. However, I think the expected deleted keys should be `[k1, k3]`, right?
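For comparison, here is a hypothetical sketch of the simpler check versus the Javadoc formula D = C - (B - A), using plain string sets in place of merged `HoodieRecord` maps. It assumes A = `{k1, k4}` and B = `{k2}` as read from the table, that the current log file holds delete records for exactly `k1` and `k3` (C), and it reads the "simpler logic" as "keys valid in prev that are deleted in the current log file". None of these names are Hudi APIs.

```java
import java.util.Set;
import java.util.TreeSet;

public class DeletedKeysFormula {

  /** Returns a new sorted set with the elements of left that are not in right. */
  static Set<String> minus(Set<String> left, Set<String> right) {
    Set<String> result = new TreeSet<>(left);
    result.removeAll(right);
    return result;
  }

  /** D = C - (B - A): deleted keys in the current log, minus keys revived by it. */
  static Set<String> deletedKeys(Set<String> a, Set<String> b, Set<String> c) {
    return minus(c, minus(b, a));
  }

  /** Assumed reading of the simpler logic: A intersected with C. */
  static Set<String> simplerDeletedKeys(Set<String> a, Set<String> c) {
    Set<String> result = new TreeSet<>(a);
    result.retainAll(c);
    return result;
  }

  public static void main(String[] args) {
    Set<String> a = Set.of("k1", "k4"); // valid after merging the previous log files
    Set<String> b = Set.of("k2");       // valid after merging the whole slice
    Set<String> c = Set.of("k1", "k3"); // assumed delete records in the current log file

    System.out.println(simplerDeletedKeys(a, c)); // [k1]
    System.out.println(deletedKeys(a, b, c));     // [k1, k3]
  }
}
```

The simpler check misses `k3` because it never looks at keys absent from the previous log files. When only the current log file exists, A is empty and B and C come from the same merged records, so they are disjoint and D reduces to C, which is what the early-return branch in the diff computes.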
--
This is an automated message from the Apache Git Service.