nsivabalan commented on code in PR #12447:
URL: https://github.com/apache/hudi/pull/12447#discussion_r1881148569
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -919,4 +886,42 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>>
fetchBaseFileAllRecords
return
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
}, record -> record));
}
+
+ @VisibleForTesting
+ public static Map<String, String>
reverseLookupSecondaryKeysInternal(List<String> recordKeys,
+
Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords,
+
HoodieMetadataLogRecordReader logRecordScanner) {
+ Map<String, String> recordKeyMap = new HashMap<>();
+ Set<String> keySet = new TreeSet<>(recordKeys);
+ Set<String> deletedRecordsFromLogs = new HashSet<>();
+ Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new
HashMap<>();
+ // Note that: we read the log records from the oldest to the latest!!!
+ // If we change the read order, we need update the following logic
accordingly.
+ logRecordScanner.getRecords().forEach(record -> {
+ String recordKey =
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
+ HoodieMetadataPayload payload = record.getData();
+ if (!payload.isDeleted()) { // process only valid records.
+ if (keySet.contains(recordKey)) {
+ logRecordsMap.put(recordKey, record);
+ }
+ } else {
+ // When and Only when the latest log record is non-tombstone,
logRecordMap contains its recordKey.
+ logRecordsMap.remove(recordKey);
+ deletedRecordsFromLogs.add(recordKey);
+ }
+ });
+
+ // Return non-tombstone records from the log files.
+ logRecordsMap.forEach((key, value) -> recordKeyMap.put(
+ key,
SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(value.getRecordKey())));
+ // Return non-tombstone records from the base file.
+ if (baseFileRecords != null) {
+ baseFileRecords.forEach((key, value) -> {
+ if (!deletedRecordsFromLogs.contains(key)) {
+ recordKeyMap.put(key,
SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(value.getRecordKey()));
Review Comment:
For example, I went to master and made the edits below. All the tests you added
in this patch passed.
```
+ @VisibleForTesting
+ public static Map<String, String>
reverseLookupSecondaryKeysInternal(List<String> keySet, Map<String,
HoodieRecord<HoodieMetadataPayload>> baseFileRecords,
+
HoodieMetadataLogRecordReader logRecordScanner) {
+ Set<String> deletedRecordsFromLogs = new HashSet<>();
+ // Map of recordKey (primaryKey) -> log record that is not deleted for
all input recordKeys
+ Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new
HashMap<>();
+ logRecordScanner.getRecords().forEach(record -> {
+ String recordKey =
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
+ HoodieMetadataPayload payload = record.getData();
+ if (!payload.isDeleted()) { // process only valid records.
+ if (keySet.contains(recordKey)) {
+ logRecordsMap.put(recordKey, record);
+ deletedRecordsFromLogs.remove(recordKey); // we can check if its
present and then remove if need be
+ }
+ } else {
+ deletedRecordsFromLogs.add(recordKey);
+ logRecordsMap.remove(recordKey);
+ }
+ });
+
+ Map<String, String> recordKeyMap = new HashMap<>();
+ if (baseFileRecords == null || baseFileRecords.isEmpty()) {
+ logRecordsMap.forEach((key1, value1) -> {
+ if (!value1.getData().isDeleted() &&
!deletedRecordsFromLogs.contains(key1)) {
+ recordKeyMap.put(key1,
SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(value1.getRecordKey()));
+ }
+ });
+ } else {
+ // Iterate over all provided log-records, merging them into existing
records
+ logRecordsMap.forEach((key1, value1) -> baseFileRecords.merge(key1,
value1, (oldRecord, newRecord) -> {
+ Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord =
HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord, newRecord);
+ return mergedRecord.orElse(null);
+ }));
+ baseFileRecords.forEach((key, value) -> {
+ if (!deletedRecordsFromLogs.contains(key)) {
+ recordKeyMap.put(key,
SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(value.getRecordKey()));
+ }
+ });
+ }
return recordKeyMap;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]