linliu-code commented on code in PR #12447:
URL: https://github.com/apache/hudi/pull/12447#discussion_r1881187158


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -919,4 +886,42 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileAllRecords
       return SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
     }, record -> record));
   }
+
+  @VisibleForTesting
+  public static Map<String, String> reverseLookupSecondaryKeysInternal(List<String> recordKeys,
+                                                                       Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords,
+                                                                       HoodieMetadataLogRecordReader logRecordScanner) {
+    Map<String, String> recordKeyMap = new HashMap<>();
+    Set<String> keySet = new TreeSet<>(recordKeys);
+    Set<String> deletedRecordsFromLogs = new HashSet<>();
+    Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new HashMap<>();
+    // Note that: we read the log records from the oldest to the latest!!!
+    // If we change the read order, we need update the following logic accordingly.
+    logRecordScanner.getRecords().forEach(record -> {
+      String recordKey = SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
+      HoodieMetadataPayload payload = record.getData();
+      if (!payload.isDeleted()) { // process only valid records.
+        if (keySet.contains(recordKey)) {
+          logRecordsMap.put(recordKey, record);
+        }
+      } else {
+        // When and Only when the latest log record is non-tombstone, logRecordMap contains its recordKey.
+        logRecordsMap.remove(recordKey);
+        deletedRecordsFromLogs.add(recordKey);
+      }
+    });
+
+    // Return non-tombstone records from the log files.
+    logRecordsMap.forEach((key, value) -> recordKeyMap.put(
+        key, SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(value.getRecordKey())));
+    // Return non-tombstone records from the base file.
+    if (baseFileRecords != null) {
+      baseFileRecords.forEach((key, value) -> {
+        if (!deletedRecordsFromLogs.contains(key)) {
+          recordKeyMap.put(key, SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(value.getRecordKey()));

Review Comment:
   @nsivabalan, it is the merge that causes the bug. Suppose we have `br`
and a non-delete `lr`. In this case, after the merge, `lr` is returned. It
then goes through the `if (!deletedRecordsFromLogs.contains(key))` check, and
since `deletedRecordsFromLogs` contains its record key, `lr` is dropped.
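
   To make the scenario concrete, here is a minimal sketch using plain Java
collections rather than the Hudi classes. `LogEntry`, `lookupSeparately`, and
`mergeThenFilter` are hypothetical names made up for this illustration, and it
assumes the log holds an earlier tombstone for the same record key (which is
why `deletedRecordsFromLogs` contains it). `mergeThenFilter` is only a guess at
what merging first and filtering afterwards amounts to, not the actual code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a secondary index log record; not a Hudi class.
final class LogEntry {
  final String recordKey;
  final String secondaryKey;
  final boolean deleted;

  LogEntry(String recordKey, String secondaryKey, boolean deleted) {
    this.recordKey = recordKey;
    this.secondaryKey = secondaryKey;
    this.deleted = deleted;
  }
}

class ReverseLookupSketch {

  // Follows the visible part of the diff: log and base file records are handled separately.
  static Map<String, String> lookupSeparately(List<LogEntry> logs, Map<String, String> base) {
    Map<String, String> latestFromLogs = new HashMap<>();
    Set<String> deletedFromLogs = new HashSet<>();
    for (LogEntry e : logs) { // assumed order: oldest to latest
      if (!e.deleted) {
        latestFromLogs.put(e.recordKey, e.secondaryKey);
      } else {
        latestFromLogs.remove(e.recordKey);
        deletedFromLogs.add(e.recordKey);
      }
    }
    Map<String, String> result = new HashMap<>(latestFromLogs);
    base.forEach((key, secondaryKey) -> {
      if (!deletedFromLogs.contains(key)) {
        result.put(key, secondaryKey);
      }
    });
    return result;
  }

  // Assumed shape of the buggy path: merge base and log records first, then filter by tombstones.
  static Map<String, String> mergeThenFilter(List<LogEntry> logs, Map<String, String> base) {
    Map<String, String> merged = new HashMap<>(base);
    Set<String> deletedFromLogs = new HashSet<>();
    for (LogEntry e : logs) {
      if (e.deleted) {
        merged.remove(e.recordKey);
        deletedFromLogs.add(e.recordKey);
      } else {
        merged.put(e.recordKey, e.secondaryKey); // lr replaces br
      }
    }
    Map<String, String> result = new HashMap<>();
    merged.forEach((key, secondaryKey) -> {
      if (!deletedFromLogs.contains(key)) { // the surviving lr is filtered out here
        result.put(key, secondaryKey);
      }
    });
    return result;
  }

  public static void main(String[] args) {
    List<LogEntry> logs = new ArrayList<>();
    logs.add(new LogEntry("rk1", "sk_old", true));  // tombstone for the old secondary key
    logs.add(new LogEntry("rk1", "sk_new", false)); // non-delete lr
    Map<String, String> base = new HashMap<>();
    base.put("rk1", "sk_old");                      // br

    System.out.println(lookupSeparately(logs, base)); // prints {rk1=sk_new}
    System.out.println(mergeThenFilter(logs, base));  // prints {}
  }
}
```

   With the merge-first path, `rk1` disappears even though its latest log
record is a valid one. Handling the two sources separately keeps it.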



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
