codope commented on code in PR #12447:
URL: https://github.com/apache/hudi/pull/12447#discussion_r1888498250


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -919,4 +886,42 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> 
fetchBaseFileAllRecords
       return 
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
     }, record -> record));
   }
+
+  @VisibleForTesting
+  public static Map<String, String> 
reverseLookupSecondaryKeysInternal(List<String> recordKeys,
+                                                                       
Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords,
+                                                                       
HoodieMetadataLogRecordReader logRecordScanner) {
+    Map<String, String> recordKeyMap = new HashMap<>();
+    Set<String> keySet = new TreeSet<>(recordKeys);

Review Comment:
   Do we need to create a TreeSet of recordKeys? I don't think order of record 
keys matters here.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -919,4 +886,42 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> 
fetchBaseFileAllRecords
       return 
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
     }, record -> record));
   }
+
+  @VisibleForTesting
+  public static Map<String, String> 
reverseLookupSecondaryKeysInternal(List<String> recordKeys,
+                                                                       
Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords,
+                                                                       
HoodieMetadataLogRecordReader logRecordScanner) {
+    Map<String, String> recordKeyMap = new HashMap<>();
+    Set<String> keySet = new TreeSet<>(recordKeys);
+    Set<String> deletedRecordsFromLogs = new HashSet<>();
+    Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new 
HashMap<>();
+    // Note that: we read the log records from the oldest to the latest!!!
+    // If we change the read order, we need update the following logic 
accordingly.
+    logRecordScanner.getRecords().forEach(record -> {
+      String recordKey = 
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
+      HoodieMetadataPayload payload = record.getData();
+      if (!payload.isDeleted()) { // process only valid records.
+        if (keySet.contains(recordKey)) {
+          logRecordsMap.put(recordKey, record);
+        }
+      } else {
+        // When and Only when the latest log record is non-tombstone, 
logRecordMap contains its recordKey.
+        logRecordsMap.remove(recordKey);

Review Comment:
   So this is the main fix. If i understand correctly, after this step the 
logRecordsMap will not contain any tomstone record, therefore we do not need to 
do the check in `HoodieMetadataPayload.combineSecondaryIndexRecord` later right?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -919,4 +886,42 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> 
fetchBaseFileAllRecords
       return 
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
     }, record -> record));
   }
+
+  @VisibleForTesting
+  public static Map<String, String> 
reverseLookupSecondaryKeysInternal(List<String> recordKeys,
+                                                                       
Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords,
+                                                                       
HoodieMetadataLogRecordReader logRecordScanner) {
+    Map<String, String> recordKeyMap = new HashMap<>();
+    Set<String> keySet = new TreeSet<>(recordKeys);
+    Set<String> deletedRecordsFromLogs = new HashSet<>();
+    Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new 
HashMap<>();
+    // Note that: we read the log records from the oldest to the latest!!!
+    // If we change the read order, we need update the following logic 
accordingly.
+    logRecordScanner.getRecords().forEach(record -> {
+      String recordKey = 
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
+      HoodieMetadataPayload payload = record.getData();
+      if (!payload.isDeleted()) { // process only valid records.
+        if (keySet.contains(recordKey)) {
+          logRecordsMap.put(recordKey, record);
+        }
+      } else {
+        // When and Only when the latest log record is non-tombstone, 
logRecordMap contains its recordKey.
+        logRecordsMap.remove(recordKey);
+        deletedRecordsFromLogs.add(recordKey);
+      }
+    });
+
+    // Return non-tombstone records from the log files.
+    logRecordsMap.forEach((key, value) -> recordKeyMap.put(
+        key, 
SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(value.getRecordKey())));
+    // Return non-tombstone records from the base file.
+    if (baseFileRecords != null) {
+      baseFileRecords.forEach((key, value) -> {
+        if (!deletedRecordsFromLogs.contains(key)) {
+          recordKeyMap.put(key, 
SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(value.getRecordKey()));

Review Comment:
   Both are fine IMO, but maybe I am biased due to prior work. Ideally, I would 
like to retain the semantics of `combineSecondaryIndexRecord` but the current 
patch is also simple to understand. Plus, if we filter out tombstone records 
before calling `combineSecondaryIndexRecord` then there is no point in keeping 
it. Please remove this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to