Davis-Zhang-Onehouse commented on code in PR #13489:
URL: https://github.com/apache/hudi/pull/13489#discussion_r2178682534


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1692,9 +1710,14 @@ protected Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>> tagRecordsWith
         hoodieFileGroupIdList.addAll(fileSlices.stream().map(fileSlice -> new HoodieFileGroupId(partitionName, fileSlice.getFileId())).collect(Collectors.toList()));
 
         List<FileSlice> finalFileSlices = fileSlices;
+        HoodieIndexVersion indexVersion = existingIndexVersionOrDefault(partitionName, metadataMetaClient);
+
+        // Determine key format once per partition to avoid repeated checks
+        boolean useSecondaryKeyForHashing = MetadataPartitionType.SECONDARY_INDEX.matchesPartitionPath(partitionName) && indexVersion.greaterThanOrEquals(HoodieIndexVersion.V2);
+        SerializableFunction<String, SerializableFunction<Integer, Integer>> mappingFunction =

Review Comment:
   check commits
   c45c2534d2d processValuesOfTheSameShards key should be generic type
   806a940f0ee processValuesOfTheSameShards value should be generic type
   c25eb092eeb index lookup APIs implmentation 3
   



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -592,9 +915,14 @@ record -> {
               return null;
             })
         .filter(Objects::nonNull)
-        .collectAsList()
-        .stream()
-        .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())));
+        .collectAsList();
+
+    Map<String, Set<String>> res = new HashMap<>();

Review Comment:
   done



##########
hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java:
##########
@@ -26,15 +26,13 @@ public class SecondaryIndexKeyUtils {
   public static String getRecordKeyFromSecondaryIndexKey(String key) {
     // the payload key is in the format of "secondaryKey$primaryKey"
     // we need to extract the primary key from the payload key
-    checkState(key.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR), "Invalid key format for secondary index payload: " + key);
     int delimiterIndex = getSecondaryIndexKeySeparatorPosition(key);

Review Comment:
   That would require every caller to add an if/else to detect "this is SI and it is of index v2".
   I simply removed this check because:
   - getSecondaryIndexKeySeparatorPosition will throw if there is no unescaped $
   - the checkState only checks for any $, not an unescaped $, which is misleading
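
   To illustrate the second point, here is a minimal sketch, not the actual Hudi implementation. It assumes `$` is the separator and `\` is the escape character; the class and method names are hypothetical. A key whose only `$` is escaped passes a plain `contains` check but has no real separator.

```java
public class SeparatorCheckSketch {
  // Assumed conventions for illustration only: '$' separates the secondary key
  // from the record key, and '\' escapes a literal '$' or '\' inside either part.
  static final char SEPARATOR = '$';
  static final char ESCAPE = '\\';

  /** Returns the index of the first unescaped separator, or throws if there is none. */
  static int unescapedSeparatorPosition(String key) {
    boolean escaped = false;
    for (int i = 0; i < key.length(); i++) {
      char c = key.charAt(i);
      if (escaped) {
        escaped = false; // this character was escaped, so it cannot be a separator
      } else if (c == ESCAPE) {
        escaped = true;
      } else if (c == SEPARATOR) {
        return i;
      }
    }
    throw new IllegalStateException("Invalid key format for secondary index payload: " + key);
  }

  public static void main(String[] args) {
    String valid = "city\\$1$uuid-42"; // escaped '$' in the secondary key, then the real separator
    String invalid = "city\\$1";       // contains '$', but only an escaped one

    System.out.println(unescapedSeparatorPosition(valid));
    System.out.println(invalid.contains("$")); // the removed check would have accepted this key
    try {
      unescapedSeparatorPosition(invalid);
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

   Under these assumptions the position lookup alone is the stricter check, so the `contains` precondition adds nothing.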



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -466,14 +419,33 @@ private void checkForSpuriousDeletes(HoodieMetadataPayload metadataPayload, Stri
     }
   }
 
+  /**
+   * Retrieves a single record from the metadata table by its key.
+   *
+   * @param key The escaped/encoded key to look up in the metadata table
+   * @param partitionName The partition name where the record is stored
+   * @return Option containing the record if found, empty Option if not found
+   */
   protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key, String partitionName);
 
-  protected abstract Map<String, HoodieRecord<HoodieMetadataPayload>> getRecordsByKeys(List<String> keys, String partitionName);
+  /**
+   * Retrieves a map of (key -> record) from the metadata table by its keys.
+   *
+   * @param keys The to look up in the metadata table
+   * @param partitionName The partition name where the records are stored
+   * @return A map of (key -> record)
+   */
+  public abstract HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> getRecordsByKeysWithMapping(

Review Comment:
   Not sure if you are aware, but a new API with almost the same signature has been introduced:
   
   HoodieData<HoodieRecordGlobalLocation> readRecordIndexResult(HoodieData<String> recordKeys)
   
   It has the same input but a different output. The names must differ, because Java cannot overload methods on return type alone and the code would not compile.
   
   I kept the old name and named the new API with a best guess.
   
   We have APIs that take a key and return a file location, APIs that take a secondary key and return a record key, and variants that return either the result only or a mapping from key -> output.
   
   Overly generic names without a clear differentiator make the functionality confusing to users.
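
   A small sketch of the naming constraint, using plain collections instead of the Hudi data types. All names here (`KeyLookup`, `lookupLocations`, `lookupLocationsWithMapping`) are hypothetical and only illustrate the "result only" versus "key -> output mapping" variants.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LookupNamingSketch {
  interface KeyLookup {
    // "Result only" variant: returns the locations of the keys that were found.
    List<String> lookupLocations(List<String> keys);

    // Declaring the mapping variant under the same name would not compile,
    // because the two methods would differ only in return type:
    //   Map<String, String> lookupLocations(List<String> keys);

    // "Mapping" variant: the name carries the differentiator.
    Map<String, String> lookupLocationsWithMapping(List<String> keys);
  }

  static final class InMemoryLookup implements KeyLookup {
    private final Map<String, String> index;

    InMemoryLookup(Map<String, String> index) {
      this.index = index;
    }

    @Override
    public List<String> lookupLocations(List<String> keys) {
      return new ArrayList<>(lookupLocationsWithMapping(keys).values());
    }

    @Override
    public Map<String, String> lookupLocationsWithMapping(List<String> keys) {
      Map<String, String> found = new LinkedHashMap<>();
      for (String key : keys) {
        String location = index.get(key);
        if (location != null) {
          found.put(key, location); // keys missing from the index are skipped
        }
      }
      return found;
    }
  }

  public static void main(String[] args) {
    Map<String, String> index = new HashMap<>();
    index.put("k1", "file-a");
    index.put("k2", "file-b");
    KeyLookup lookup = new InMemoryLookup(index);

    System.out.println(lookup.lookupLocations(List.of("k1", "missing")));
    System.out.println(lookup.lookupLocationsWithMapping(List.of("k2", "k1")));
  }
}
```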
   



##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java:
##########
@@ -153,6 +162,25 @@ <L, W> HoodiePairData<L, W> mapToPair(
    */
   List<Pair<K, V>> collectAsList();
 
+  /**
+   * WARNING: It is caller's responsibility to ensure that it is of <Integer, String> type.
+   *
+   * Repartitions the RDD based on key ranges so that:

Review Comment:
   check commits
   c45c2534d2d processValuesOfTheSameShards key should be generic type
   806a940f0ee processValuesOfTheSameShards value should be generic type
   c25eb092eeb index lookup APIs implmentation 3
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java:
##########
@@ -187,4 +198,27 @@ public int deduceNumPartitions() {
       return pairRDDData.getNumPartitions();
     }
   }
+
+  @Override
+  public HoodiePairData<Integer, String> rangeBasedRepartitionForEachKey(

Review Comment:
   check commits
   c45c2534d2d processValuesOfTheSameShards key should be generic type
   806a940f0ee processValuesOfTheSameShards value should be generic type
   c25eb092eeb index lookup APIs implmentation 3
   
   You will also read the design doc and run the toy example, and then we decide the next step.



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -455,6 +455,47 @@ public final class HoodieMetadataConfig extends HoodieConfig {
           + "The index name either starts with or matches exactly can be one of the following: "
           + StringUtils.join(Arrays.stream(MetadataPartitionType.values()).map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList()), ", "));
 
+  // Range-based repartitioning configuration for metadata table lookups

Review Comment:
   check commits
   c45c2534d2d processValuesOfTheSameShards key should be generic type
   806a940f0ee processValuesOfTheSameShards value should be generic type
   c25eb092eeb index lookup APIs implmentation 3
   



##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java:
##########
@@ -153,6 +162,25 @@ <L, W> HoodiePairData<L, W> mapToPair(
    */
   List<Pair<K, V>> collectAsList();
 
+  /**
+   * WARNING: It is caller's responsibility to ensure that it is of <Integer, String> type.
+   *
+   * Repartitions the RDD based on key ranges so that:
+   * 1. The keys are sorted within each partition.
+   * 2. There is at most only 1 key per partition.
+   * 3. For partitions containing entries of the same key, the value ranges are not overlapping.
+   * 4. Number of keys per partition is probably at most maxKeyPerBucket.
+   *
+   * @param keyRange The range of keys to partition across (0 to keyRange inclusive). It must cover all possible keys
+   *                 in the RDD. Keys covered in range but not in the RDD are ignored.
+   * @param sampleFraction Fraction of data to sample for determining value range per bucket points (between 0 and 1).
+   * @param maxKeyPerBucket Maximum number of keys allowed per partition bucket
+   * @param seed Random seed for sampling
+   * @return Repartitioned RDD

Review Comment:
   check commits
   c45c2534d2d processValuesOfTheSameShards key should be generic type
   806a940f0ee processValuesOfTheSameShards value should be generic type
   c25eb092eeb index lookup APIs implmentation 3
   



##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java:
##########
@@ -153,6 +162,25 @@ <L, W> HoodiePairData<L, W> mapToPair(
    */
   List<Pair<K, V>> collectAsList();
 
+  /**
+   * WARNING: It is caller's responsibility to ensure that it is of <Integer, String> type.
+   *
+   * Repartitions the RDD based on key ranges so that:
+   * 1. The keys are sorted within each partition.
+   * 2. There is at most only 1 key per partition.
+   * 3. For partitions containing entries of the same key, the value ranges are not overlapping.
+   * 4. Number of keys per partition is probably at most maxKeyPerBucket.
+   *
+   * @param keyRange The range of keys to partition across (0 to keyRange inclusive). It must cover all possible keys
+   *                 in the RDD. Keys covered in range but not in the RDD are ignored.
+   * @param sampleFraction Fraction of data to sample for determining value range per bucket points (between 0 and 1).
+   * @param maxKeyPerBucket Maximum number of keys allowed per partition bucket
+   * @param seed Random seed for sampling
+   * @return Repartitioned RDD
+   */
+  HoodiePairData<Integer, String> rangeBasedRepartitionForEachKey(

Review Comment:
   check commits
   c45c2534d2d processValuesOfTheSameShards key should be generic type
   806a940f0ee processValuesOfTheSameShards value should be generic type
   c25eb092eeb index lookup APIs implmentation 3
   



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -592,9 +915,14 @@ record -> {
               return null;
             })
         .filter(Objects::nonNull)
-        .collectAsList()
-        .stream()
-        .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())));
+        .collectAsList();
+
+    Map<String, Set<String>> res = new HashMap<>();

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
