codope commented on code in PR #12447:
URL: https://github.com/apache/hudi/pull/12447#discussion_r1888498250
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -919,4 +886,42 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>>
fetchBaseFileAllRecords
return
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
}, record -> record));
}
+
+ @VisibleForTesting
+ public static Map<String, String>
reverseLookupSecondaryKeysInternal(List<String> recordKeys,
+
Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords,
+
HoodieMetadataLogRecordReader logRecordScanner) {
+ Map<String, String> recordKeyMap = new HashMap<>();
+ Set<String> keySet = new TreeSet<>(recordKeys);
Review Comment:
Do we need to create a TreeSet of recordKeys? I don't think order of record
keys matters here.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -919,4 +886,42 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>>
fetchBaseFileAllRecords
return
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
}, record -> record));
}
+
+ @VisibleForTesting
+ public static Map<String, String>
reverseLookupSecondaryKeysInternal(List<String> recordKeys,
+
Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords,
+
HoodieMetadataLogRecordReader logRecordScanner) {
+ Map<String, String> recordKeyMap = new HashMap<>();
+ Set<String> keySet = new TreeSet<>(recordKeys);
+ Set<String> deletedRecordsFromLogs = new HashSet<>();
+ Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new
HashMap<>();
+ // Note that: we read the log records from the oldest to the latest!!!
+ // If we change the read order, we need update the following logic
accordingly.
+ logRecordScanner.getRecords().forEach(record -> {
+ String recordKey =
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
+ HoodieMetadataPayload payload = record.getData();
+ if (!payload.isDeleted()) { // process only valid records.
+ if (keySet.contains(recordKey)) {
+ logRecordsMap.put(recordKey, record);
+ }
+ } else {
+ // When and Only when the latest log record is non-tombstone,
logRecordMap contains its recordKey.
+ logRecordsMap.remove(recordKey);
Review Comment:
So this is the main fix. If i understand correctly, after this step the
logRecordsMap will not contain any tomstone record, therefore we do not need to
do the check in `HoodieMetadataPayload.combineSecondaryIndexRecord` later right?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -919,4 +886,42 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>>
fetchBaseFileAllRecords
return
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
}, record -> record));
}
+
+ @VisibleForTesting
+ public static Map<String, String>
reverseLookupSecondaryKeysInternal(List<String> recordKeys,
+
Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords,
+
HoodieMetadataLogRecordReader logRecordScanner) {
+ Map<String, String> recordKeyMap = new HashMap<>();
+ Set<String> keySet = new TreeSet<>(recordKeys);
+ Set<String> deletedRecordsFromLogs = new HashSet<>();
+ Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new
HashMap<>();
+ // Note that: we read the log records from the oldest to the latest!!!
+ // If we change the read order, we need update the following logic
accordingly.
+ logRecordScanner.getRecords().forEach(record -> {
+ String recordKey =
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
+ HoodieMetadataPayload payload = record.getData();
+ if (!payload.isDeleted()) { // process only valid records.
+ if (keySet.contains(recordKey)) {
+ logRecordsMap.put(recordKey, record);
+ }
+ } else {
+ // When and Only when the latest log record is non-tombstone,
logRecordMap contains its recordKey.
+ logRecordsMap.remove(recordKey);
+ deletedRecordsFromLogs.add(recordKey);
+ }
+ });
+
+ // Return non-tombstone records from the log files.
+ logRecordsMap.forEach((key, value) -> recordKeyMap.put(
+ key,
SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(value.getRecordKey())));
+ // Return non-tombstone records from the base file.
+ if (baseFileRecords != null) {
+ baseFileRecords.forEach((key, value) -> {
+ if (!deletedRecordsFromLogs.contains(key)) {
+ recordKeyMap.put(key,
SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(value.getRecordKey()));
Review Comment:
Both are fine IMO, but maybe I am biased due to prior work. Ideally, I would
like to retain the semantics of `combineSecondaryIndexRecord` but the current
patch is also simple to understand. Plus, if we filter out tombstone records
before calling `combineSecondaryIndexRecord` then there is no point in keeping
it. Please remove this method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]