nsivabalan commented on code in PR #12214:
URL: https://github.com/apache/hudi/pull/12214#discussion_r1833514714


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -59,7 +74,21 @@
  * This results in two I/O passes over the log file.
  */
 @NotThreadSafe
-public class HoodieMergedLogRecordScanner extends BaseHoodieMergedLogRecordScanner<String> {
+public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader

Review Comment:
   why do we have to make this extend from AbstractHoodieLogRecordReader instead of BaseHoodieMergedLogRecordScanner?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -620,15 +622,78 @@ public static HoodieRecord<HoodieMetadataPayload> createRecordIndexUpdate(String
    * @param secondaryKey Secondary key of the record
    * @param isDeleted    true if this record is deleted
    */
-  public static HoodieRecord<HoodieMetadataPayload> createSecondaryIndex(String recordKey, String secondaryKey, String partitionPath, Boolean isDeleted) {
-
-    HoodieKey key = new HoodieKey(secondaryKey, partitionPath);
-    HoodieMetadataPayload payload = new HoodieMetadataPayload(secondaryKey, new HoodieSecondaryIndexInfo(recordKey, isDeleted));
+  public static HoodieRecord<HoodieMetadataPayload> createSecondaryIndexRecord(String recordKey, String secondaryKey, String partitionPath, Boolean isDeleted) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    HoodieKey key = new HoodieKey(constructSecondaryIndexKey(secondaryKey, recordKey), partitionPath);
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), new HoodieSecondaryIndexInfo(isDeleted));
     return new HoodieAvroRecord<>(key, payload);
   }
 
-  public String getRecordKeyFromSecondaryIndex() {
-    return secondaryIndexMetadata.getRecordKey();
+  public static String getRecordKeyFromSecondaryIndexKey(String key) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    // we need to extract the primary key from the payload key
+    checkState(nonEmpty(key) && key.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR), "Invalid key format for secondary index payload: " + key);
+    int delimiterIndex = getSecondaryIndexKeySeparatorPosition(key);
+    return unescapeSpecialChars(key.substring(delimiterIndex + 1));
+  }
+
+  public static String getSecondaryKeyFromSecondaryIndexKey(String key) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    // we need to extract the secondary key from the payload key
+    checkState(nonEmpty(key) && key.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR), "Invalid key format for secondary index payload: " + key);

Review Comment:
   do we need the nonEmpty() check for every key? we can't store empty keys into storage, right? So, why keep checking for empty on every lookup?
   in fact, here we are checking twice (once for fetching the primary key and once for fetching the secondary key).

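To make the reviewer's point about redundant validation concrete, here is a hedged sketch of parsing the key once and returning both parts, so the format is checked at a single point instead of on every extraction call. The class name and shape are hypothetical, not from the PR; unescaping of the parts is omitted for brevity.

```java
// Hypothetical helper: scan "secondaryKey$recordKey" once, validate once,
// and hand back both halves instead of re-checking per extraction call.
public final class SecondaryIndexKeyParts {
  public final String secondaryKey;
  public final String recordKey;

  private SecondaryIndexKeyParts(String secondaryKey, String recordKey) {
    this.secondaryKey = secondaryKey;
    this.recordKey = recordKey;
  }

  /** Splits at the first unescaped '$' in a single pass over the key. */
  public static SecondaryIndexKeyParts parse(String key) {
    boolean escaped = false;
    for (int i = 0; i < key.length(); i++) {
      char c = key.charAt(i);
      if (escaped) {
        escaped = false;           // Current char was escaped; skip it
      } else if (c == '\\') {
        escaped = true;            // Next char is escaped
      } else if (c == '$') {
        return new SecondaryIndexKeyParts(key.substring(0, i), key.substring(i + 1));
      }
    }
    // Single validation point: empty keys and keys without a separator fail here.
    throw new IllegalStateException("Invalid secondary index key: " + key);
  }
}
```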


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -1055,17 +914,15 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileAllRecords
       return Collections.emptyMap();
     }
 
-    ClosableIterator<HoodieRecord<?>> records = reader.getRecordIterator();
+    ClosableIterator<HoodieRecord<?>> recordIterator = reader.getRecordIterator();

Review Comment:
   can we rename fetchBaseFileAllRecordsByPayload to `fetchBaseFileAllRecordsByPayloadForSecIndex` to call out that this is special handling just for SI.
   I do not want someone else to reuse this method for something else down the line.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -620,15 +622,78 @@ public static HoodieRecord<HoodieMetadataPayload> createRecordIndexUpdate(String
    * @param secondaryKey Secondary key of the record
    * @param isDeleted    true if this record is deleted
    */
-  public static HoodieRecord<HoodieMetadataPayload> createSecondaryIndex(String recordKey, String secondaryKey, String partitionPath, Boolean isDeleted) {
-
-    HoodieKey key = new HoodieKey(secondaryKey, partitionPath);
-    HoodieMetadataPayload payload = new HoodieMetadataPayload(secondaryKey, new HoodieSecondaryIndexInfo(recordKey, isDeleted));
+  public static HoodieRecord<HoodieMetadataPayload> createSecondaryIndexRecord(String recordKey, String secondaryKey, String partitionPath, Boolean isDeleted) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    HoodieKey key = new HoodieKey(constructSecondaryIndexKey(secondaryKey, recordKey), partitionPath);
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), new HoodieSecondaryIndexInfo(isDeleted));
     return new HoodieAvroRecord<>(key, payload);
   }
 
-  public String getRecordKeyFromSecondaryIndex() {
-    return secondaryIndexMetadata.getRecordKey();
+  public static String getRecordKeyFromSecondaryIndexKey(String key) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    // we need to extract the primary key from the payload key
+    checkState(nonEmpty(key) && key.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR), "Invalid key format for secondary index payload: " + key);
+    int delimiterIndex = getSecondaryIndexKeySeparatorPosition(key);
+    return unescapeSpecialChars(key.substring(delimiterIndex + 1));
+  }
+
+  public static String getSecondaryKeyFromSecondaryIndexKey(String key) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    // we need to extract the secondary key from the payload key
+    checkState(nonEmpty(key) && key.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR), "Invalid key format for secondary index payload: " + key);
+    int delimiterIndex = getSecondaryIndexKeySeparatorPosition(key);
+    return unescapeSpecialChars(key.substring(0, delimiterIndex));
+  }
+
+  static String constructSecondaryIndexKey(String secondaryKey, String recordKey) {
+    return escapeSpecialChars(secondaryKey) + SECONDARY_INDEX_RECORD_KEY_SEPARATOR + escapeSpecialChars(recordKey);
+  }
+
+  private static String escapeSpecialChars(String str) {
+    StringBuilder escaped = new StringBuilder();
+    for (char c : str.toCharArray()) {
+      if (c == '\\' || c == '$') {
+        escaped.append('\\');  // Add escape character
+      }
+      escaped.append(c);  // Add the actual character
+    }
+    return escaped.toString();
+  }
+
+  private static int getSecondaryIndexKeySeparatorPosition(String key) {
+    int delimiterIndex = -1;
+    boolean isEscape = false;
+
+    // Find the delimiter index while skipping escaped $
+    for (int i = 0; i < key.length(); i++) {
+      char c = key.charAt(i);
+      if (c == '\\' && !isEscape) {
+        isEscape = true;
+      } else if (c == '$' && !isEscape) {
+        delimiterIndex = i;
+        break;
+      } else {
+        isEscape = false;
+      }
+    }
+    checkState(delimiterIndex != -1, "Invalid encoded key format");
+    return delimiterIndex;
+  }
+
+  private static String unescapeSpecialChars(String str) {

Review Comment:
   can we move all of the secondary index key utils to a SecondaryIndexKeyUtils class?


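To make that suggestion concrete, here is a hedged sketch of what a standalone SecondaryIndexKeyUtils could look like. The class name comes from the review suggestion; the method bodies mirror the escape/split logic in the diff but are illustrative, not Hudi's actual code.

```java
// Hypothetical extraction of the diff's key-encoding helpers into one class.
// Keys are encoded as "escapedSecondaryKey$escapedRecordKey", where '\' and '$'
// inside either part are escaped with a leading '\'.
public final class SecondaryIndexKeyUtils {
  private static final char ESCAPE = '\\';
  private static final char SEPARATOR = '$';

  public static String constructSecondaryIndexKey(String secondaryKey, String recordKey) {
    return escape(secondaryKey) + SEPARATOR + escape(recordKey);
  }

  public static String getSecondaryKeyFromSecondaryIndexKey(String key) {
    return unescape(key.substring(0, separatorPosition(key)));
  }

  public static String getRecordKeyFromSecondaryIndexKey(String key) {
    return unescape(key.substring(separatorPosition(key) + 1));
  }

  private static String escape(String s) {
    StringBuilder sb = new StringBuilder();
    for (char c : s.toCharArray()) {
      if (c == ESCAPE || c == SEPARATOR) {
        sb.append(ESCAPE);  // Prefix special characters with the escape char
      }
      sb.append(c);
    }
    return sb.toString();
  }

  private static String unescape(String s) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < s.length(); i++) {
      char c = s.charAt(i);
      if (c == ESCAPE && i + 1 < s.length()) {
        sb.append(s.charAt(++i));  // Emit the escaped character literally
      } else {
        sb.append(c);
      }
    }
    return sb.toString();
  }

  /** Finds the first unescaped '$', validating the key format in one pass. */
  private static int separatorPosition(String key) {
    boolean escaped = false;
    for (int i = 0; i < key.length(); i++) {
      char c = key.charAt(i);
      if (escaped) {
        escaped = false;
      } else if (c == ESCAPE) {
        escaped = true;
      } else if (c == SEPARATOR) {
        return i;
      }
    }
    throw new IllegalStateException("Invalid secondary index key: " + key);
  }
}
```

Round-tripping a key that contains the separator and the escape character exercises both directions of the encoding.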

##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -59,7 +74,21 @@
  * This results in two I/O passes over the log file.
  */
 @NotThreadSafe
-public class HoodieMergedLogRecordScanner extends BaseHoodieMergedLogRecordScanner<String> {
+public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader

Review Comment:
   oh I see it. you are removing BaseHoodieMergedLogRecordScanner



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2042,30 +2043,66 @@ private static ClosableIterator<HoodieRecord> createSecondaryIndexGenerator(Hood
         Option.empty());
     ClosableIterator<HoodieRecord> fileSliceIterator = ClosableIterator.wrap(fileSliceReader);
     return new ClosableIterator<HoodieRecord>() {
+      private HoodieRecord nextValidRecord;
+
       @Override
       public void close() {
         fileSliceIterator.close();
       }
 
       @Override
       public boolean hasNext() {
-        return fileSliceIterator.hasNext();
+        // As part of hasNext() we try to find the valid non-delete record that has a secondary key.
+        if (nextValidRecord != null) {
+          return true;
+        }
+
+        // Secondary key is null when there is a delete record, and we only have the record key.
+        // This can happen when the record is deleted in the log file.
+        // In this case, we need not index the record because for the given record key,
+        // we have already prepared the delete record before reaching this point.
+        // NOTE: Delete record should not happen when initializing the secondary index i.e. when called from readSecondaryKeysFromFileSlices,
+        // because from that call, we get the merged records as of some committed instant. So, delete records must have been filtered out.
+        // Loop to find the next valid record or exhaust the iterator.
+        while (fileSliceIterator.hasNext()) {
+          HoodieRecord record = fileSliceIterator.next();
+          String secondaryKey = getSecondaryKey(record);
+          if (secondaryKey != null) {
+            nextValidRecord = HoodieMetadataPayload.createSecondaryIndexRecord(
+                record.getRecordKey(tableSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD),
+                secondaryKey,
+                indexDefinition.getIndexName(),
+                false
+            );
+            return true;
+          }
+        }
+
+        // If no valid records are found
+        return false;
       }
 
       @Override
       public HoodieRecord next() {
-        HoodieRecord record = fileSliceIterator.next();
-        String recordKey = record.getRecordKey(tableSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD);
-        String secondaryKeyFields = String.join(".", indexDefinition.getSourceFields());
-        String secondaryKey;
+        if (!hasNext()) {
+          throw new NoSuchElementException("No more valid records available.");
+        }
+        HoodieRecord result = nextValidRecord;
+        nextValidRecord = null;  // Reset for the next call
+        return result;
+      }
+
+      private String getSecondaryKey(HoodieRecord record) {
         try {
-          GenericRecord genericRecord = (GenericRecord) (record.toIndexedRecord(tableSchema, CollectionUtils.emptyProps()).get()).getData();
-          secondaryKey = HoodieAvroUtils.getNestedFieldValAsString(genericRecord, secondaryKeyFields, true, false);
+          if (record.toIndexedRecord(tableSchema, CollectionUtils.emptyProps()).isPresent()) {

Review Comment:
   don't we have an easier way to figure out whether a record is deleted or not?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2042,30 +2043,66 @@ private static ClosableIterator<HoodieRecord> createSecondaryIndexGenerator(Hood
         Option.empty());
     ClosableIterator<HoodieRecord> fileSliceIterator = ClosableIterator.wrap(fileSliceReader);
     return new ClosableIterator<HoodieRecord>() {
+      private HoodieRecord nextValidRecord;
+
       @Override
       public void close() {
         fileSliceIterator.close();
       }
 
       @Override
       public boolean hasNext() {
-        return fileSliceIterator.hasNext();
+        // As part of hasNext() we try to find the valid non-delete record that has a secondary key.
+        if (nextValidRecord != null) {
+          return true;
+        }
+
+        // Secondary key is null when there is a delete record, and we only have the record key.
+        // This can happen when the record is deleted in the log file.
+        // In this case, we need not index the record because for the given record key,
+        // we have already prepared the delete record before reaching this point.
+        // NOTE: Delete record should not happen when initializing the secondary index i.e. when called from readSecondaryKeysFromFileSlices,
+        // because from that call, we get the merged records as of some committed instant. So, delete records must have been filtered out.
+        // Loop to find the next valid record or exhaust the iterator.
+        while (fileSliceIterator.hasNext()) {

Review Comment:
   are you sure we need this logic?
   So, in general, a fileSliceReader can return deleted records too, is it? What's the purpose?
   we generally don't return deleted records from, say, LogRecordReader, right.
   so, why might a fileSliceReader return a deleted record?
   can you help me understand.


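The look-ahead pattern the diff implements can be shown generically. This is an illustrative sketch (the names are not Hudi APIs); it shows why hasNext() must cache the next valid element so that repeated hasNext() calls do not consume records.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Generic look-ahead iterator: hasNext() advances the source until the mapper
// produces a non-null value, caches it, and next() hands it out. This mirrors
// the structure of the anonymous ClosableIterator in the diff.
public class FilteringMappingIterator<S, T> implements Iterator<T> {
  private final Iterator<S> source;
  private final Function<S, T> mapper;  // Returns null for elements to skip
  private T nextValid;

  public FilteringMappingIterator(Iterator<S> source, Function<S, T> mapper) {
    this.source = source;
    this.mapper = mapper;
  }

  @Override
  public boolean hasNext() {
    if (nextValid != null) {
      return true;  // Cached look-ahead makes repeated hasNext() idempotent
    }
    while (source.hasNext()) {
      T mapped = mapper.apply(source.next());
      if (mapped != null) {
        nextValid = mapped;
        return true;
      }
    }
    return false;  // Source exhausted without another valid element
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException("No more valid records available.");
    }
    T result = nextValid;
    nextValid = null;  // Reset for the next look-ahead
    return result;
  }
}
```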

##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -620,15 +622,78 @@ public static HoodieRecord<HoodieMetadataPayload> createRecordIndexUpdate(String
    * @param secondaryKey Secondary key of the record
    * @param isDeleted    true if this record is deleted
    */
-  public static HoodieRecord<HoodieMetadataPayload> createSecondaryIndex(String recordKey, String secondaryKey, String partitionPath, Boolean isDeleted) {
-
-    HoodieKey key = new HoodieKey(secondaryKey, partitionPath);
-    HoodieMetadataPayload payload = new HoodieMetadataPayload(secondaryKey, new HoodieSecondaryIndexInfo(recordKey, isDeleted));
+  public static HoodieRecord<HoodieMetadataPayload> createSecondaryIndexRecord(String recordKey, String secondaryKey, String partitionPath, Boolean isDeleted) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    HoodieKey key = new HoodieKey(constructSecondaryIndexKey(secondaryKey, recordKey), partitionPath);
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), new HoodieSecondaryIndexInfo(isDeleted));
     return new HoodieAvroRecord<>(key, payload);
   }
 
-  public String getRecordKeyFromSecondaryIndex() {
-    return secondaryIndexMetadata.getRecordKey();
+  public static String getRecordKeyFromSecondaryIndexKey(String key) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    // we need to extract the primary key from the payload key
+    checkState(nonEmpty(key) && key.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR), "Invalid key format for secondary index payload: " + key);
+    int delimiterIndex = getSecondaryIndexKeySeparatorPosition(key);
+    return unescapeSpecialChars(key.substring(delimiterIndex + 1));
+  }
+
+  public static String getSecondaryKeyFromSecondaryIndexKey(String key) {

Review Comment:
   do these need to be public?
   Or feel free to move them to a Utils class and we can write UTs directly.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -819,8 +821,8 @@ protected Map<String, String> getSecondaryKeysForRecordKeys(List<String> recordK
 
     // Parallel lookup keys from each file slice
     Map<String, String> reverseSecondaryKeyMap = new HashMap<>();
-    partitionFileSlices.parallelStream().forEach(partition -> {
-      Map<String, String> partialResult = reverseLookupSecondaryKeys(partitionName, recordKeys, partition);
+    partitionFileSlices.parallelStream().forEach(fileSlice -> {

Review Comment:
   the number of file slices in the secondary index is fixed, right? So, why can't we hash and route deterministically to one file slice?
   and so during lookup, for 1 secondary index key value, we can just look into 1 file slice rather than all file slices in the sec index?
   
   maybe you discussed this point during design. Curious to know more details.

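A minimal sketch of the deterministic routing the reviewer describes: with a fixed slice count, each secondary key hashes to exactly one slice, so a point lookup touches one slice instead of all of them. The class name and hash choice here are hypothetical, not Hudi's actual file-group mapping.

```java
// Hypothetical router: maps a secondary key deterministically to one of a
// fixed number of file slices, so lookups and writes agree on the slice.
public class SecondaryIndexSliceRouter {
  private final int numFileSlices;  // Fixed when the index is created

  public SecondaryIndexSliceRouter(int numFileSlices) {
    if (numFileSlices <= 0) {
      throw new IllegalArgumentException("numFileSlices must be positive");
    }
    this.numFileSlices = numFileSlices;
  }

  /** Deterministically maps a secondary key to a file slice index. */
  public int sliceFor(String secondaryKey) {
    // Math.floorMod keeps the result non-negative even for negative hashCodes
    return Math.floorMod(secondaryKey.hashCode(), numFileSlices);
  }
}
```

The trade-off is that every reader and writer must use the same hash over the same slice count, which constrains resizing the index partition later.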

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
