nsivabalan commented on code in PR #12214:
URL: https://github.com/apache/hudi/pull/12214#discussion_r1832064458


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -620,15 +621,25 @@ public static HoodieRecord<HoodieMetadataPayload> 
createRecordIndexUpdate(String
    * @param secondaryKey Secondary key of the record
    * @param isDeleted    true if this record is deleted
    */
-  public static HoodieRecord<HoodieMetadataPayload> 
createSecondaryIndex(String recordKey, String secondaryKey, String 
partitionPath, Boolean isDeleted) {
-
-    HoodieKey key = new HoodieKey(secondaryKey, partitionPath);
-    HoodieMetadataPayload payload = new HoodieMetadataPayload(secondaryKey, 
new HoodieSecondaryIndexInfo(recordKey, isDeleted));
+  public static HoodieRecord<HoodieMetadataPayload> 
createSecondaryIndexRecord(String recordKey, String secondaryKey, String 
partitionPath, Boolean isDeleted) {
+    // the payload key is in the format of "secondaryKey_primaryKey"
+    HoodieKey key = new HoodieKey(String.format("%s%s%s",secondaryKey, 
SECONDARY_INDEX_RECORD_KEY_SEPARATOR, recordKey), partitionPath);
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), new 
HoodieSecondaryIndexInfo(isDeleted));
     return new HoodieAvroRecord<>(key, payload);
   }
 
-  public String getRecordKeyFromSecondaryIndex() {
-    return secondaryIndexMetadata.getRecordKey();
+  public static String getRecordKeyFromSecondaryIndex(String key) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    // we need to extract the primary key from the payload key
+    checkState(nonEmpty(key) && 
key.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR), "Invalid key format for 
secondary index payload: " + key);
+    return key.substring(key.lastIndexOf(SECONDARY_INDEX_RECORD_KEY_SEPARATOR) 
+ 1);
+  }
+
+  public static String getSecondaryKeyFromSecondaryIndex(String key) {

Review Comment:
   Let's name this `getSecondaryKeyFromSecondaryIndexKey`.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -841,31 +843,41 @@ private Map<String, String> 
reverseLookupSecondaryKeys(String partitionName, Lis
 
       Set<String> keySet = new TreeSet<>(recordKeys);
       Set<String> deletedRecordsFromLogs = new HashSet<>();
+      // Map of recordKey (primaryKey) -> log record that is not deleted for 
all input recordKeys
       Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new 
HashMap<>();
       logRecordScanner.getRecords().forEach(record -> {
+        String recordKey = 
getRecordKeyFromSecondaryIndex(record.getRecordKey());
         HoodieMetadataPayload payload = record.getData();
-        if (!payload.isDeleted()) { // process only valid records.
-          String recordKey = payload.getRecordKeyFromSecondaryIndex();
-          if (keySet.contains(recordKey)) {
-            logRecordsMap.put(recordKey, record);
-          }
+        if (!payload.isDeleted() && keySet.contains(recordKey)) { // process 
only valid records.
+          logRecordsMap.put(recordKey, record);
         } else {
-          deletedRecordsFromLogs.add(record.getRecordKey());
+          deletedRecordsFromLogs.add(recordKey);
         }
       });
 
       // Map of (record-key, secondary-index-record)
       Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords = 
fetchBaseFileAllRecordsByPayload(baseFileReader, keySet, partitionName);
-      // Iterate over all provided log-records, merging them into existing 
records
-      logRecordsMap.forEach((key1, value1) -> baseFileRecords.merge(key1, 
value1, (oldRecord, newRecord) -> {
-        Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord = 
HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord, newRecord);
-        return mergedRecord.orElseGet(null);
-      }));
-      baseFileRecords.forEach((key, value) -> {
-        if (!deletedRecordsFromLogs.contains(key)) {
-          recordKeyMap.put(key, value.getRecordKey());
-        }
-      });
+      if (baseFileRecords.isEmpty()) {

Review Comment:
   Are there chances that baseFileRecords itself is null? If so, should we also 
check for null here?
   For example, if the slice contains no base file.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -841,31 +843,41 @@ private Map<String, String> 
reverseLookupSecondaryKeys(String partitionName, Lis
 
       Set<String> keySet = new TreeSet<>(recordKeys);
       Set<String> deletedRecordsFromLogs = new HashSet<>();
+      // Map of recordKey (primaryKey) -> log record that is not deleted for 
all input recordKeys
       Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new 
HashMap<>();
       logRecordScanner.getRecords().forEach(record -> {
+        String recordKey = 
getRecordKeyFromSecondaryIndex(record.getRecordKey());
         HoodieMetadataPayload payload = record.getData();
-        if (!payload.isDeleted()) { // process only valid records.
-          String recordKey = payload.getRecordKeyFromSecondaryIndex();
-          if (keySet.contains(recordKey)) {
-            logRecordsMap.put(recordKey, record);
-          }
+        if (!payload.isDeleted() && keySet.contains(recordKey)) { // process 
only valid records.
+          logRecordsMap.put(recordKey, record);
         } else {
-          deletedRecordsFromLogs.add(record.getRecordKey());
+          deletedRecordsFromLogs.add(recordKey);
         }
       });
 
       // Map of (record-key, secondary-index-record)
       Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords = 
fetchBaseFileAllRecordsByPayload(baseFileReader, keySet, partitionName);
-      // Iterate over all provided log-records, merging them into existing 
records
-      logRecordsMap.forEach((key1, value1) -> baseFileRecords.merge(key1, 
value1, (oldRecord, newRecord) -> {
-        Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord = 
HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord, newRecord);
-        return mergedRecord.orElseGet(null);
-      }));
-      baseFileRecords.forEach((key, value) -> {
-        if (!deletedRecordsFromLogs.contains(key)) {
-          recordKeyMap.put(key, value.getRecordKey());
-        }
-      });
+      if (baseFileRecords.isEmpty()) {
+        logRecordsMap.forEach((key1, value1) -> {
+          if (!value1.getData().isDeleted()) {
+            recordKeyMap.put(key1, 
getSecondaryKeyFromSecondaryIndex(value1.getRecordKey()));
+          }
+        });
+      } else {
+        // Iterate over all provided log-records, merging them into existing 
records
+        logRecordsMap.forEach((key1, value1) -> baseFileRecords.merge(key1, 
value1, (oldRecord, newRecord) -> {
+          Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord = 
HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord, newRecord);
+          if (!mergedRecord.isPresent() || 
mergedRecord.get().getData().isDeleted()) {
+            System.out.println(">>> mergedRecord is not present or deleted " + 
mergedRecord);

Review Comment:
   Can you remove the SOPs (System.out.println debug statements)?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMetadataMergedLogRecordScanner.java:
##########
@@ -78,24 +78,27 @@ public Map<String, HoodieRecord> getRecords() {
 
   @Override
   public <T> void processNextRecord(HoodieRecord<T> newRecord) throws 
IOException {
-    // Merge the new record with the existing record in the map
-    HoodieRecord<T> oldRecord = (HoodieRecord<T>) 
records.get(newRecord.getRecordKey());
-    if (oldRecord != null) {
-      LOG.debug("Merging new record with existing record in the map. Key: {}", 
newRecord.getRecordKey());
-      recordMerger.fullOuterMerge(oldRecord, readerSchema, newRecord, 
readerSchema, this.getPayloadProps()).forEach(
-          mergedRecord -> {
-            HoodieRecord<T> combinedRecord = mergedRecord.getLeft();
-            if (combinedRecord.getData() != oldRecord.getData()) {
-              HoodieRecord latestHoodieRecord = 
getLatestHoodieRecord(newRecord, combinedRecord, newRecord.getRecordKey());
-              records.put(newRecord.getRecordKey(), latestHoodieRecord.copy());
-            }
-          });
+    String key = newRecord.getRecordKey();

Review Comment:
   Why a separate patch? Can we clean it up in this patch itself?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -841,31 +843,41 @@ private Map<String, String> 
reverseLookupSecondaryKeys(String partitionName, Lis
 
       Set<String> keySet = new TreeSet<>(recordKeys);
       Set<String> deletedRecordsFromLogs = new HashSet<>();
+      // Map of recordKey (primaryKey) -> log record that is not deleted for 
all input recordKeys
       Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new 
HashMap<>();
       logRecordScanner.getRecords().forEach(record -> {
+        String recordKey = 
getRecordKeyFromSecondaryIndex(record.getRecordKey());
         HoodieMetadataPayload payload = record.getData();
-        if (!payload.isDeleted()) { // process only valid records.
-          String recordKey = payload.getRecordKeyFromSecondaryIndex();
-          if (keySet.contains(recordKey)) {
-            logRecordsMap.put(recordKey, record);
-          }
+        if (!payload.isDeleted() && keySet.contains(recordKey)) { // process 
only valid records.
+          logRecordsMap.put(recordKey, record);
         } else {
-          deletedRecordsFromLogs.add(record.getRecordKey());
+          deletedRecordsFromLogs.add(recordKey);

Review Comment:
   We could go into this else block under 2 conditions; 
   it is not a delete record every time. 
   Can you fix that? 
   



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -620,15 +621,25 @@ public static HoodieRecord<HoodieMetadataPayload> 
createRecordIndexUpdate(String
    * @param secondaryKey Secondary key of the record
    * @param isDeleted    true if this record is deleted
    */
-  public static HoodieRecord<HoodieMetadataPayload> 
createSecondaryIndex(String recordKey, String secondaryKey, String 
partitionPath, Boolean isDeleted) {
-
-    HoodieKey key = new HoodieKey(secondaryKey, partitionPath);
-    HoodieMetadataPayload payload = new HoodieMetadataPayload(secondaryKey, 
new HoodieSecondaryIndexInfo(recordKey, isDeleted));
+  public static HoodieRecord<HoodieMetadataPayload> 
createSecondaryIndexRecord(String recordKey, String secondaryKey, String 
partitionPath, Boolean isDeleted) {
+    // the payload key is in the format of "secondaryKey_primaryKey"
+    HoodieKey key = new HoodieKey(String.format("%s%s%s",secondaryKey, 
SECONDARY_INDEX_RECORD_KEY_SEPARATOR, recordKey), partitionPath);
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), new 
HoodieSecondaryIndexInfo(isDeleted));
     return new HoodieAvroRecord<>(key, payload);
   }
 
-  public String getRecordKeyFromSecondaryIndex() {
-    return secondaryIndexMetadata.getRecordKey();
+  public static String getRecordKeyFromSecondaryIndex(String key) {

Review Comment:
   Let's name this `getRecordKeyFromSecondaryIndexKey`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to