vinothchandar commented on code in PR #13489:
URL: https://github.com/apache/hudi/pull/13489#discussion_r2175751122


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1692,9 +1710,14 @@ protected Pair<HoodieData<HoodieRecord>, 
List<HoodieFileGroupId>> tagRecordsWith
         hoodieFileGroupIdList.addAll(fileSlices.stream().map(fileSlice -> new 
HoodieFileGroupId(partitionName, 
fileSlice.getFileId())).collect(Collectors.toList()));
 
         List<FileSlice> finalFileSlices = fileSlices;
+        HoodieIndexVersion indexVersion = 
existingIndexVersionOrDefault(partitionName, metadataMetaClient);
+
+        // Determine key format once per partition to avoid repeated checks
+        boolean useSecondaryKeyForHashing = 
MetadataPartitionType.SECONDARY_INDEX.matchesPartitionPath(partitionName) && 
indexVersion.greaterThanOrEquals(HoodieIndexVersion.V2);
+        SerializableFunction<String, SerializableFunction<Integer, Integer>> 
mappingFunction =

Review Comment:
   revisit



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java:
##########
@@ -72,6 +69,18 @@ public static void validateTableVersion(HoodieTableConfig 
tableConfig, HoodieWri
     }
   }
 
+  private static boolean 
isValidTableVersionWriteVersionPair(HoodieTableVersion tableVersion, 
HoodieTableVersion writeVersion) {

Review Comment:
   Let's add a unit test for this.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1256,11 +1258,27 @@ protected Pair<List<HoodieFileGroupId>, 
HoodieData<HoodieRecord>> tagRecordsWith
       });
     }
 
+    Map<String, HoodieIndexVersion> indexVersions = new HashMap<>();
+    partitionToLatestFileSlices.keySet().forEach(
+        mdtPartition -> indexVersions.put(mdtPartition, 
existingIndexVersionOrDefault(mdtPartition, metadataMetaClient)));
+    
+    // For each partition, determine the key format once and create the 
appropriate mapping function
+    Map<String, SerializableFunction<String, SerializableFunction<Integer, 
Integer>>> partitionMappingFunctions = new HashMap<>();

Review Comment:
   Two `forEach` calls add complexity for the reader. I combined them and 
eliminated the first map.
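
A minimal sketch of the combined loop described above. It assumes hypothetical stand-ins (`versionOf`, `mappingFor`) for `existingIndexVersionOrDefault` and the mapping-function lookup, which are not reproduced here; the `secondary_index_` prefix and the simplified `Function<String, Integer>` type are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

final class SinglePassMappingSketch {

  // Hypothetical stand-in for existingIndexVersionOrDefault(partition, metaClient).
  static int versionOf(String partition) {
    return partition.startsWith("secondary_index_") ? 2 : 1;
  }

  // Hypothetical stand-in: picks the key format once per partition.
  static Function<String, Integer> mappingFor(String partition, int version) {
    boolean useSecondaryKey = partition.startsWith("secondary_index_") && version >= 2;
    if (!useSecondaryKey) {
      return String::hashCode;
    }
    // Hash only the part before the '$' separator; fall back to the whole key.
    return key -> {
      int idx = key.indexOf('$');
      return (idx < 0 ? key : key.substring(0, idx)).hashCode();
    };
  }

  // One loop, one map: the version is a local variable, not a second map.
  static Map<String, Function<String, Integer>> build(Set<String> partitions) {
    Map<String, Function<String, Integer>> functions = new HashMap<>();
    for (String partition : partitions) {
      int version = versionOf(partition);
      functions.put(partition, mappingFor(partition, version));
    }
    return functions;
  }
}
```

The intermediate version map disappears because each version is only needed while its mapping function is being chosen.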



##########
hudi-common/src/main/java/org/apache/hudi/common/function/SerializableFunctionPairOut.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.function;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.Serializable;
+
+/**

Review Comment:
   Is this AI-generated or something? This is a lot of comments for what this 
interface does.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class NineToEightDowngradeHandler implements DowngradeHandler {
+
+  @Override
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    SecondaryIndexUpgradeDowngradeHelper.dropSecondaryIndexPartitions(

Review Comment:
   This needs to be there. Can we scope this down to dropping only index 
version 2?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/HoodieJavaPairRDDTest.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.data;
+
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTestDelayedTableMetadata;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import scala.Tuple2;
+import scala.Tuple3;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class HoodieJavaPairRDDTest {

Review Comment:
   naming



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class EightToNineUpgradeHandler implements UpgradeHandler {
+
+  @Override
+  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, 
HoodieEngineContext context,
+                                             String instantTime, 
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    SecondaryIndexUpgradeDowngradeHelper.dropSecondaryIndexPartitions(

Review Comment:
   Overall: we have now introduced versioning of the index def. So can we avoid 
dropping SI/EI during upgrade, and instead instruct the user to drop and 
recreate them for better performance?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class EightToNineUpgradeHandler implements UpgradeHandler {
+
+  @Override
+  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, 
HoodieEngineContext context,
+                                             String instantTime, 
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    SecondaryIndexUpgradeDowngradeHelper.dropSecondaryIndexPartitions(

Review Comment:
   Does this handle EI?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/partitioner/ConditionalRangePartitionerTest.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.data.partitioner;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConditionalRangePartitionerTest {

Review Comment:
   Name it like the rest of the code base? TestConditional...?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -145,20 +172,38 @@ public static Builder newBuilder() {
     return new Builder();
   }
 
+  /**
+   * Create a new Builder pre-populated with values from this instance.
+   */
+  public Builder toBuilder() {
+    Builder builder = new Builder();
+    builder.withIndexName(this.indexName)
+        .withIndexType(this.indexType)
+        .withIndexFunction(this.indexFunction)
+        .withSourceFields(new ArrayList<>(this.sourceFields))
+        .withIndexOptions(new HashMap<>(this.indexOptions))
+        .withVersion(this.version);
+    return builder;
+  }
+
   public static class Builder {
 
     private String indexName;
     private String indexType;
     private String indexFunction;
     private List<String> sourceFields;
     private Map<String, String> indexOptions;
+    private HoodieIndexVersion version;
 
     public Builder() {
       this.sourceFields = new ArrayList<>();
       this.indexOptions = new HashMap<>();
+      this.version = null;
     }
 
     public Builder withIndexName(String indexName) {
+      // Validate the index name belongs to a valid partition path. Function 
throws exception if it is a random index name.
+      MetadataPartitionType.fromPartitionPath(indexName);

Review Comment:
   If we are validating something, please explicitly call 
`ValidationUtils.checkArgument`.
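
A minimal sketch of what an explicit check could look like. The local `checkArgument` only mirrors the assumed `(boolean, String)` shape of `ValidationUtils.checkArgument`, and `KNOWN_PREFIXES` plus `isKnownPartitionPath` are hypothetical helpers, not the real `MetadataPartitionType` lookup.

```java
import java.util.Arrays;
import java.util.List;

final class IndexNameValidationSketch {

  // Hypothetical subset of metadata partition path prefixes, for illustration only.
  private static final List<String> KNOWN_PREFIXES =
      Arrays.asList("secondary_index_", "expr_index_", "record_index", "column_stats");

  // Local stand-in mirroring the assumed shape of ValidationUtils.checkArgument.
  static void checkArgument(boolean expression, String errorMessage) {
    if (!expression) {
      throw new IllegalArgumentException(errorMessage);
    }
  }

  // Hypothetical predicate replacing the "call it and let it throw" lookup.
  static boolean isKnownPartitionPath(String indexName) {
    return indexName != null && KNOWN_PREFIXES.stream().anyMatch(indexName::startsWith);
  }

  // The validation is stated explicitly instead of being a side effect of a lookup.
  static String validatedIndexName(String indexName) {
    checkArgument(isKnownPartitionPath(indexName), "Invalid index name: " + indexName);
    return indexName;
  }
}
```

With this shape the intent is visible at the call site, and the error message is controlled by the builder rather than by the lookup.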



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -455,6 +455,47 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
           + "The index name either starts with or matches exactly can be one 
of the following: "
           + 
StringUtils.join(Arrays.stream(MetadataPartitionType.values()).map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList()),
 ", "));
 
+  // Range-based repartitioning configuration for metadata table lookups

Review Comment:
   Do we need all these configs? Shouldn't we just use what's already present 
for non-range repartitioning?



##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java:
##########
@@ -153,6 +162,25 @@ <L, W> HoodiePairData<L, W> mapToPair(
    */
   List<Pair<K, V>> collectAsList();
 
+  /**
+   * WARNING: It is caller's responsibility to ensure that it is of <Integer, 
String> type.

Review Comment:
   Can we throw an exception for this case, with a `throws` clause?



##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java:
##########
@@ -153,6 +162,25 @@ <L, W> HoodiePairData<L, W> mapToPair(
    */
   List<Pair<K, V>> collectAsList();
 
+  /**
+   * WARNING: It is caller's responsibility to ensure that it is of <Integer, 
String> type.
+   *
+   * Repartitions the RDD based on key ranges so that:

Review Comment:
   We can't assume an RDD here.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java:
##########
@@ -187,4 +198,27 @@ public int deduceNumPartitions() {
       return pairRDDData.getNumPartitions();
     }
   }
+
+  @Override
+  public HoodiePairData<Integer, String> rangeBasedRepartitionForEachKey(

Review Comment:
   Why not just leverage Spark's sort, which does a bunch of this?



##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java:
##########
@@ -153,6 +162,25 @@ <L, W> HoodiePairData<L, W> mapToPair(
    */
   List<Pair<K, V>> collectAsList();
 
+  /**
+   * WARNING: It is caller's responsibility to ensure that it is of <Integer, 
String> type.
+   *
+   * Repartitions the RDD based on key ranges so that:
+   * 1. The keys are sorted within each partition.
+   * 2. There is at most only 1 key per partition.
+   * 3. For partitions containing entries of the same key, the value ranges 
are not overlapping.
+   * 4. Number of keys per partition is probably at most maxKeyPerBucket.
+   *
+   * @param keyRange The range of keys to partition across (0 to keyRange 
inclusive). It must cover all possible keys
+   *                 in the RDD. Keys covered in range but not in the RDD are 
ignored.
+   * @param sampleFraction Fraction of data to sample for determining value 
range per bucket points (between 0 and 1).
+   * @param maxKeyPerBucket Maximum number of keys allowed per partition bucket
+   * @param seed Random seed for sampling
+   * @return Repartitioned RDD
+   */
+  HoodiePairData<Integer, String> rangeBasedRepartitionForEachKey(

Review Comment:
   Can we use a single term, `partition`, instead of bucket?



##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java:
##########
@@ -153,6 +162,25 @@ <L, W> HoodiePairData<L, W> mapToPair(
    */
   List<Pair<K, V>> collectAsList();
 
+  /**
+   * WARNING: It is caller's responsibility to ensure that it is of <Integer, 
String> type.
+   *
+   * Repartitions the RDD based on key ranges so that:
+   * 1. The keys are sorted within each partition.
+   * 2. There is at most only 1 key per partition.
+   * 3. For partitions containing entries of the same key, the value ranges 
are not overlapping.
+   * 4. Number of keys per partition is probably at most maxKeyPerBucket.
+   *
+   * @param keyRange The range of keys to partition across (0 to keyRange 
inclusive). It must cover all possible keys
+   *                 in the RDD. Keys covered in range but not in the RDD are 
ignored.
+   * @param sampleFraction Fraction of data to sample for determining value 
range per bucket points (between 0 and 1).
+   * @param maxKeyPerBucket Maximum number of keys allowed per partition bucket
+   * @param seed Random seed for sampling
+   * @return Repartitioned RDD

Review Comment:
   This is the base class, so let's avoid mentions of RDD.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -360,22 +602,99 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> 
lookupKeys(String parti
         .withLength(Long.MAX_VALUE)
         .withShouldUseRecordPosition(false)
         .build();
-         ClosableIterator<IndexedRecord> it = 
fileGroupReader.getClosableIterator()) {
-      Map<String, HoodieRecord<HoodieMetadataPayload>> records = new 
HashMap<>();
-      while (it.hasNext()) {
-        GenericRecord metadataRecord = (GenericRecord) it.next();
-        HoodieMetadataPayload payload = new 
HoodieMetadataPayload(Option.of(metadataRecord));
-        String rowKey = payload.key != null ? payload.key : 
metadataRecord.get(KEY_FIELD_NAME).toString();
-        HoodieKey hoodieKey = new HoodieKey(rowKey, partitionName);
-        records.put(rowKey, new HoodieAvroRecord<>(hoodieKey, payload));
-      }
-      return records;
+  }
+
+  private HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> 
lookupRecordsWithMapping(
+      String partitionName,
+      List<String> sortedKeys,
+      FileSlice fileSlice,
+      Boolean isFullKey,
+      Option<SerializableFunction<String, String>> keyEncoder) {
+    Map<String, HoodieRecord<HoodieMetadataPayload>> map = new HashMap<>();
+    try (ClosableIterator<Pair<String, HoodieRecord<HoodieMetadataPayload>>> 
iterator =
+             lookupRecordsWithMappingIter(partitionName, sortedKeys, 
fileSlice, isFullKey, keyEncoder)) {
+      iterator.forEachRemaining(entry -> map.put(entry.getKey(), 
entry.getValue()));
+    }
+    return HoodieDataUtils.eagerMapKV(map);
+  }
+
+  private HoodieListData<HoodieRecord<HoodieMetadataPayload>> 
lookupRecordsWithoutMapping(
+      String partitionName,
+      List<String> sortedKeys,
+      FileSlice fileSlice,
+      Boolean isFullKey,
+      Option<SerializableFunction<String, String>> keyEncoder) {
+    List<HoodieRecord<HoodieMetadataPayload>> res = new ArrayList<>();
+    try (ClosableIterator<HoodieRecord<HoodieMetadataPayload>> iterator =
+             lookupRecordsWithoutMappingIter(partitionName, sortedKeys, 
fileSlice, isFullKey, keyEncoder)) {
+      iterator.forEachRemaining(entry -> res.add(entry));
+    }
+    return HoodieListData.eager(res);
+  }
+
+  private ClosableIterator<Pair<String, HoodieRecord<HoodieMetadataPayload>>> 
lookupRecordsWithMappingIter(
+      String partitionName,
+      List<String> sortedKeys,
+      FileSlice fileSlice,
+      Boolean isFullKey,
+      Option<SerializableFunction<String, String>> keyEncoder) {
+    return lookupRecords(sortedKeys, fileSlice, isFullKey, metadataRecord -> {
+      HoodieMetadataPayload payload = new 
HoodieMetadataPayload(Option.of(metadataRecord));
+      String rowKey = payload.key != null ? payload.key : 
metadataRecord.get(KEY_FIELD_NAME).toString();
+      HoodieKey hoodieKey = new HoodieKey(rowKey, partitionName);
+      return Pair.of(rowKey, new HoodieAvroRecord<>(hoodieKey, payload));
+    }, keyEncoder);
+  }
+
+  private ClosableIterator<HoodieRecord<HoodieMetadataPayload>> 
lookupRecordsWithoutMappingIter(
+      String partitionName,
+      List<String> keys,
+      FileSlice fileSlice,
+      Boolean isFullKey,
+      Option<SerializableFunction<String, String>> keyEncoder) {
+    return lookupRecords(keys, fileSlice, isFullKey,
+        metadataRecord -> {
+          HoodieMetadataPayload payload = new 
HoodieMetadataPayload(Option.of(metadataRecord));
+          return new HoodieAvroRecord<>(new HoodieKey(payload.key, 
partitionName), payload);
+        }, keyEncoder);
+  }
+
+  /**
+   * Lookup records and produce a lazy iterator of mapped HoodieRecords.
+   */
+  private <T> ClosableIterator<T> lookupRecords(
+      List<String> sortedKeys,
+      FileSlice fileSlice,
+      Boolean isFullKey,
+      RecordLookupTransformer<T> transformer,
+      Option<SerializableFunction<String, String>> keyEncoder) {
+    // If no keys to lookup, we must return early, otherwise, the hfile lookup 
will return all records.
+    if (sortedKeys.isEmpty()) {
+      return new EmptyIterator<>();
+    }
+    try {
+      HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
buildFileGroupReader(sortedKeys, fileSlice, isFullKey, keyEncoder);
+      ClosableIterator<IndexedRecord> rawIterator = 
fileGroupReader.getClosableIterator();
+
+      return new CloseableMappingIterator<>(rawIterator, record -> {
+        GenericRecord metadataRecord = (GenericRecord) record;
+        try {
+          return transformer.apply(metadataRecord);
+        } catch (IOException e) {
+          throw new HoodieIOException("Error processing record with key " + 
new HoodieMetadataPayload(Option.of(metadataRecord)).key, e);
+        }
+      });
     } catch (IOException e) {
-      throw new HoodieIOException(
-          "Error merging records from metadata table for " + keys.size() + " 
keys : ", e);
+      throw new HoodieIOException("Error merging records from metadata table 
for " + sortedKeys.size() + " keys", e);
     }
   }
 
+  // Functional interface for generic payload transformation
+  @FunctionalInterface
+  private interface RecordLookupTransformer<T> {

Review Comment:
   Can't we just use a `SerializableFunction`? For something to be its own 
functional interface, I'd like to see some extended usage.
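
A rough sketch of the lookup taking a general-purpose serializable function instead of a dedicated transformer type. `SerFunction` below is only a local stand-in with an assumed shape so the example compiles on its own; in the PR the existing `SerializableFunction` would play this role, and `mapAll` is a hypothetical helper.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class TransformerSketch {

  // Local stand-in for an existing serializable function type (assumed shape).
  @FunctionalInterface
  interface SerFunction<I, O> extends Serializable {
    O apply(I input) throws Exception;
  }

  // Applies the transformer to every record, wrapping checked failures.
  static <I, O> List<O> mapAll(List<I> records, SerFunction<I, O> transformer) {
    List<O> out = new ArrayList<>(records.size());
    for (I record : records) {
      try {
        out.add(transformer.apply(record));
      } catch (Exception e) {
        throw new IllegalStateException("Error processing record " + record, e);
      }
    }
    return out;
  }
}
```

Callers pass a lambda or method reference exactly as they would with the dedicated interface, so no single-use type is needed.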



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -277,61 +267,295 @@ private Predicate 
transformKeyPrefixesToPredicate(List<String> keyPrefixes) {
     return Predicates.startsWithAny(null, right);
   }
 
-  @Override
-  protected Map<String, HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeys(List<String> keys, String partitionName) {
-    if (keys.isEmpty()) {
-      return Collections.emptyMap();
+  private HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> 
doLookupWithMapping(
+      HoodieData<String> keys, String partitionName, List<FileSlice> 
fileSlices, boolean isSecondaryIndex,
+      Option<SerializableFunction<String, String>> keyEncoder) {
+
+    final int numFileSlices = fileSlices.size();
+    if (numFileSlices == 1) {
+      TreeSet<String> distinctSortedKeys = new TreeSet<>(keys.collectAsList());
+      return lookupRecordsWithMapping(partitionName, new 
ArrayList<>(distinctSortedKeys), fileSlices.get(0), !isSecondaryIndex, 
keyEncoder);
     }
 
-    Map<String, HoodieRecord<HoodieMetadataPayload>> result;
+    HoodieIndexVersion indexVersion = 
existingIndexVersionOrDefault(partitionName, metadataMetaClient);
+    SerializableFunction<String, SerializableFunction<Integer, Integer>> 
mappingFunction =
+        
HoodieTableMetadataUtil.getRecordKeyToFileGroupIndexFunction(partitionName, 
indexVersion, false);
+    keys = repartitioningIfNeeded(keys, partitionName, numFileSlices, 
indexVersion);
+    if (keys instanceof HoodieListData) {

Review Comment:
   As far as I can tell, there is no special `instanceof HoodieListData` 
handling in existing code. Can we avoid this?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -508,6 +827,10 @@ public HoodieTableFileSystemView 
getMetadataFileSystemView() {
     return metadataFileSystemView;
   }
 
+  public HoodieMetadataConfig getMetadataConfig() {

Review Comment:
   Do we need this?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java:
##########
@@ -26,15 +26,13 @@ public class SecondaryIndexKeyUtils {
   public static String getRecordKeyFromSecondaryIndexKey(String key) {
     // the payload key is in the format of "secondaryKey$primaryKey"
     // we need to extract the primary key from the payload key
-    checkState(key.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR), "Invalid 
key format for secondary index payload: " + key);
     int delimiterIndex = getSecondaryIndexKeySeparatorPosition(key);

Review Comment:
   Is there something we can add that validates both v1 and v2?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -466,14 +419,33 @@ private void 
checkForSpuriousDeletes(HoodieMetadataPayload metadataPayload, Stri
     }
   }
 
+  /**
+   * Retrieves a single record from the metadata table by its key.
+   *
+   * @param key The escaped/encoded key to look up in the metadata table
+   * @param partitionName The partition name where the record is stored
+   * @return Option containing the record if found, empty Option if not found
+   */
   protected abstract Option<HoodieRecord<HoodieMetadataPayload>> 
getRecordByKey(String key, String partitionName);
 
-  protected abstract Map<String, HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeys(List<String> keys, String partitionName);
+  /**
+   * Retrieves a map of (key -> record) from the metadata table by its keys.
+   *
+   * @param keys The to look up in the metadata table
+   * @param partitionName The partition name where the records are stored
+   * @return A map of (key -> record)
+   */
+  public abstract HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeysWithMapping(

Review Comment:
   +1. Let's fix the javadocs to reflect the new return type. I think it's okay 
to keep the name; the differences you are highlighting are impl details.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java:
##########
@@ -309,18 +311,21 @@ public Map<Pair<String, String>, 
List<HoodieMetadataColumnStats>> getColumnStats
   }
 
   @Override
-  public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(List<String> keyPrefixes, String partitionName, boolean 
shouldLoadInMemory) {
+  public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
+                                                                               
  String partitionName,
+                                                                               
  boolean shouldLoadInMemory,
+                                                                               
  Option<SerializableFunction<String, String>> keyEncoder) {
     throw new HoodieMetadataException("Unsupported operation: 
getRecordsByKeyPrefixes!");
   }
 
   @Override
-  public Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String> 
recordKeys) {
-    throw new HoodieMetadataException("Unsupported operation: 
readRecordIndex!");
+  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readRecordIndexWithMapping(HoodieData<String> recordKeys) {

Review Comment:
   Please revert these name changes, and the one below.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -287,21 +290,44 @@ public Option<HoodieIndexMetadata> getIndexMetadata() {
     if (indexMetadataOpt.isPresent() && 
!indexMetadataOpt.get().getIndexDefinitions().isEmpty()) {
       return indexMetadataOpt;
     }
+    Option<HoodieIndexMetadata> indexDefOption = Option.empty();
     if (tableConfig.getRelativeIndexDefinitionPath().isPresent() && 
StringUtils.nonEmpty(tableConfig.getRelativeIndexDefinitionPath().get())) {
-      StoragePath indexDefinitionPath =
-          new StoragePath(basePath, 
tableConfig.getRelativeIndexDefinitionPath().get());
-      try {
-        Option<byte[]> bytesOpt = FileIOUtils.readDataFromPath(storage, 
indexDefinitionPath, true);
-        if (bytesOpt.isPresent()) {
-          return Option.of(HoodieIndexMetadata.fromJson(new 
String(bytesOpt.get())));
-        } else {
-          return Option.of(new HoodieIndexMetadata());
-        }
-      } catch (IOException e) {
-        throw new HoodieIOException("Could not load index definition at path: 
" + tableConfig.getRelativeIndexDefinitionPath().get(), e);
+      indexDefOption = loadIndexDefFromStorage(basePath, 
tableConfig.getRelativeIndexDefinitionPath().get(), storage);
+    }
+    populateIndexVersionIfMissing(tableConfig.getTableVersion(), 
indexDefOption);
+    return indexDefOption;
+  }
+
+  public static void populateIndexVersionIfMissing(HoodieTableVersion 
tableVersion, Option<HoodieIndexMetadata> indexDefOption) {
+    indexDefOption.ifPresent(idxDefs ->
+        idxDefs.getIndexDefinitions().replaceAll((indexName, idxDef) -> {
+          
ValidationUtils.checkArgument(HoodieIndexVersion.isValidIndexDefinition(tableVersion,
 idxDef),
+              String.format("Table version %s, index definition %s", 
tableVersion, idxDef));
+          if (idxDef.getVersion() == null) {
+            // If version field is missing, it implies either of the cases 
(validated by isValidIndexDefinition):
+            // - It is table version 8, because we don't write version 
attributes in some hudi releases
+            // - It is table version 9, and it is not secondary index. Since 
we drop SI on upgrade and we always write version attributes.
+            return 
idxDef.toBuilder().withVersion(getCurrentVersion(tableVersion, 
idxDef.getIndexName())).build();

Review Comment:
   Let's add tests to validate that the index defs are written to storage with 
valid index versions (if we don't have them). 
   
   Instead of all this, can we just write the index version as part of the 
upgrade to table version 9? I.e., read and do a one-time rewrite of the index 
defs.
   
   Then the regular path becomes easy: if there is no index def version, i.e. 
`null` (only possible when a table version < 9 is read by a table version >= 9 
process), then we assume v1.
   
   This vastly simplifies ongoing maintenance.
   



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -592,9 +915,14 @@ record -> {
               return null;
             })
         .filter(Objects::nonNull)
-        .collectAsList()
-        .stream()
-        .collect(Collectors.groupingBy(Pair::getKey, 
Collectors.mapping(Pair::getValue, Collectors.toSet())));
+        .collectAsList();
+
+    Map<String, Set<String>> res = new HashMap<>();

Review Comment:
   Is there a way to write this more nicely with Java streams, using 
map/Option.map.orElse?
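
One possible stream-based shape, sketched with `Map.Entry` standing in for Hudi's `Pair` and with made-up keys. The loop body this would replace is not visible in the hunk, so this is an assumption about its intent (group values into a set per key, skipping nulls), not a drop-in replacement.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

final class GroupingSketch {

  // Groups (key, value) pairs into key -> set of values, skipping null pairs.
  static Map<String, Set<String>> group(List<Map.Entry<String, String>> pairs) {
    return pairs.stream()
        .filter(Objects::nonNull)
        .collect(Collectors.groupingBy(
            Map.Entry::getKey,
            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
  }
}
```

This keeps the whole transformation in one expression and avoids a mutable `HashMap` populated by hand.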



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -287,21 +290,44 @@ public Option<HoodieIndexMetadata> getIndexMetadata() {
     if (indexMetadataOpt.isPresent() && 
!indexMetadataOpt.get().getIndexDefinitions().isEmpty()) {
       return indexMetadataOpt;
     }
+    Option<HoodieIndexMetadata> indexDefOption = Option.empty();
     if (tableConfig.getRelativeIndexDefinitionPath().isPresent() && 
StringUtils.nonEmpty(tableConfig.getRelativeIndexDefinitionPath().get())) {
-      StoragePath indexDefinitionPath =
-          new StoragePath(basePath, 
tableConfig.getRelativeIndexDefinitionPath().get());
-      try {
-        Option<byte[]> bytesOpt = FileIOUtils.readDataFromPath(storage, 
indexDefinitionPath, true);
-        if (bytesOpt.isPresent()) {
-          return Option.of(HoodieIndexMetadata.fromJson(new 
String(bytesOpt.get())));
-        } else {
-          return Option.of(new HoodieIndexMetadata());
-        }
-      } catch (IOException e) {
-        throw new HoodieIOException("Could not load index definition at path: 
" + tableConfig.getRelativeIndexDefinitionPath().get(), e);
+      indexDefOption = loadIndexDefFromStorage(basePath, 
tableConfig.getRelativeIndexDefinitionPath().get(), storage);
+    }
+    populateIndexVersionIfMissing(tableConfig.getTableVersion(), 
indexDefOption);
+    return indexDefOption;
+  }
+
+  public static void populateIndexVersionIfMissing(HoodieTableVersion 
tableVersion, Option<HoodieIndexMetadata> indexDefOption) {
+    indexDefOption.ifPresent(idxDefs ->
+        idxDefs.getIndexDefinitions().replaceAll((indexName, idxDef) -> {
+          
ValidationUtils.checkArgument(HoodieIndexVersion.isValidIndexDefinition(tableVersion,
 idxDef),
+              String.format("Table version %s, index definition %s", 
tableVersion, idxDef));
+          if (idxDef.getVersion() == null) {
+            // If version field is missing, it implies either of the cases 
(validated by isValidIndexDefinition):
+            // - It is table version 8, because we don't write version 
attributes in some hudi releases
+            // - It is table version 9, and it is not secondary index. Since 
we drop SI on upgrade and we always write version attributes.

Review Comment:
   Yes. We can simplify and assume that if it's `null`, we just return v1. 
Going forward we assert/enforce that the version is always written. I had a 
comment on this in the other PR as well.
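
The simplified rule can be sketched as follows. `IndexVersion` is a hypothetical stand-in for `HoodieIndexVersion`, and `versionOrDefault` is an illustrative name rather than a method from the PR.

```java
import java.util.Optional;

final class IndexVersionDefaultSketch {

  // Hypothetical stand-in for HoodieIndexVersion.
  enum IndexVersion { V1, V2 }

  // A missing version is read as V1; writers are expected to always set it.
  static IndexVersion versionOrDefault(IndexVersion stored) {
    return Optional.ofNullable(stored).orElse(IndexVersion.V1);
  }
}
```

The read path then needs no table-version-specific branching; enforcement that the version is always written lives on the write path.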



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

