This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d040cf4d73d8 [HUDI-9505] Bug fix for getSecondaryIndexRecords API 
(#13622)
d040cf4d73d8 is described below

commit d040cf4d73d8ce799a704d41e703b4cd77768822
Author: Davis-Zhang-Onehouse 
<[email protected]>
AuthorDate: Mon Jul 28 13:16:02 2025 -0700

    [HUDI-9505] Bug fix for getSecondaryIndexRecords API (#13622)
---
 .../apache/hudi/common/util/HoodieDataUtils.java   |  62 ++++++++--
 .../apache/hudi/metadata/BaseTableMetadata.java    |   7 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  50 ++------
 .../common/data/TestHoodieListDataPairData.java    |  62 ++++++++++
 .../sql/hudi/common/TestHoodieDataUtils.scala      | 126 +++++++++++++++++++++
 .../testHoodieBackedTableMetadataIndexLookup.scala | 118 +++++++++++--------
 .../utilities/HoodieMetadataTableValidator.java    |   2 +-
 7 files changed, 325 insertions(+), 102 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java
index 50faa20da551..b5e98fb5cb2e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java
@@ -21,8 +21,11 @@ package org.apache.hudi.common.util;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.util.collection.Pair;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Set;
 
 /**
  * Utility class for HoodieData operations.
@@ -42,14 +45,53 @@ public class HoodieDataUtils {
    * @return a Map containing the de-duplicated key-value pairs
    */
   public static <K, V> Map<K, V> dedupeAndCollectAsMap(HoodiePairData<K, V> 
pairData) {
-    // Deduplicate locally before shuffling to reduce data movement
+    // Map each pair to (Option<Pair.key>, V) to handle null keys uniformly
     // If there are multiple entries sharing the same key, use the incoming one
-    return pairData.reduceByKey((existing, incoming) -> incoming, 
pairData.deduceNumPartitions())
-            .collectAsList()
-            .stream()
-            .collect(Collectors.toMap(
-                    Pair::getKey,
-                    Pair::getValue
-            ));
+    return pairData.mapToPair(pair -> 
+        Pair.of(
+            Option.ofNullable(pair.getKey()), 
+            pair.getValue()
+        ))
+        .reduceByKey((existing, incoming) -> incoming, 
pairData.deduceNumPartitions())
+        .collectAsList()
+        .stream()
+        .collect(HashMap::new,
+            (map, pair) -> {
+              K key = pair.getKey().orElse(null);
+              map.put(key, pair.getValue());
+            },
+            HashMap::putAll);
   }
-} 
\ No newline at end of file
+
+  /**
+   * Collects results of the pair data into a {@code Map<K, Set<V>>} where 
values with the same key
+   * are grouped into a set.
+   *
+   * @param pairData Hoodie Pair Data to be collected
+   * @param <K> type of the key
+   * @param <V> type of the value
+   * @return a Map containing keys mapped to sets of values
+   */
+  public static <K, V> Map<K, Set<V>> collectPairDataAsMap(HoodiePairData<K, 
V> pairData) {
+    // Map each pair to (Option<Pair.key>, V) to handle null keys uniformly
+    // If there are multiple entries sharing the same key, combine them into a 
set
+    return pairData.mapToPair(pair -> 
+        Pair.of(
+            Option.ofNullable(pair.getKey()), 
+            Collections.singleton(pair.getValue())
+        ))
+        .reduceByKey((set1, set2) -> {
+          Set<V> combined = new HashSet<>(set1);
+          combined.addAll(set2);
+          return combined;
+        }, pairData.deduceNumPartitions())
+        .collectAsList()
+        .stream()
+        .collect(HashMap::new,
+            (map, pair) -> {
+              K key = pair.getKey().orElse(null);
+              map.put(key, pair.getValue());
+            },
+            HashMap::putAll);
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 1519e38de244..74240af5e400 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -441,13 +441,14 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
           HoodieData<String> keys, String partitionName, 
SerializableFunctionUnchecked<String, String> keyEncodingFn);
 
   /**
-   * Returns a collection of pairs (secondary-key -> set-of-record-keys) for 
the provided secondary keys.
+   * Returns a collection of pairs (secondary-key -> record-keys) for the 
provided secondary keys.
    *
    * @param keys The unescaped/decoded secondary keys to look up in the 
metadata table
    * @param partitionName The partition name where the secondary index records 
are stored
-   * @return A collection of pairs where each key is a secondary key and the 
value is a set of record keys that are indexed by that secondary key
+   * @return A collection of pairs where each key is a secondary key and the 
value is a record key that is indexed by that secondary key.
+   * If a secondary key is mapped to multiple record keys, each mapping is 
returned as a separate pair.
    */
-  public abstract HoodiePairData<String, Set<String>> 
getSecondaryIndexRecords(HoodieData<String> keys, String partitionName);
+  public abstract HoodiePairData<String, String> 
getSecondaryIndexRecords(HoodieData<String> keys, String partitionName);
 
   public HoodieMetadataConfig getMetadataConfig() {
     return metadataConfig;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 4075e352bf5f..1eeff4c6ed22 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -82,11 +82,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -223,7 +221,6 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
                                                                                
  String partitionName,
                                                                                
  boolean shouldLoadInMemory,
                                                                                
  SerializableFunctionUnchecked<String, String> keyEncodingFn) {
-    ValidationUtils.checkState(keyPrefixes instanceof HoodieListData, 
"getRecordsByKeyPrefixes only support HoodieListData at the moment");
     // Apply key encoding
     List<String> sortedKeyPrefixes = new 
ArrayList<>(keyPrefixes.map(keyEncodingFn::apply).collectAsList());
     // Sort the prefixes so that keys are looked up in order
@@ -470,7 +467,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
         
dataMetaClient.getTableConfig().getMetadataPartitions().contains(partitionName),
         () -> "Secondary index is not initialized in MDT for: " + 
partitionName);
     // Fetch secondary-index records
-    Map<String, Set<String>> secondaryKeyRecords = 
HoodieDataUtils.dedupeAndCollectAsMap(
+    Map<String, Set<String>> secondaryKeyRecords = 
HoodieDataUtils.collectPairDataAsMap(
         
getSecondaryIndexRecords(HoodieListData.eager(secondaryKeys.collectAsList()), 
partitionName));
     // Now collect the record-keys and fetch the RLI records
     List<String> recordKeys = new ArrayList<>();
@@ -820,7 +817,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   }
 
   @Override
-  public HoodiePairData<String, Set<String>> 
getSecondaryIndexRecords(HoodieData<String> secondaryKeys, String 
partitionName) {
+  public HoodiePairData<String, String> 
getSecondaryIndexRecords(HoodieData<String> secondaryKeys, String 
partitionName) {
     HoodieIndexVersion indexVersion = 
existingIndexVersionOrDefault(partitionName, dataMetaClient);
 
     if (indexVersion.equals(HoodieIndexVersion.V1)) {
@@ -832,49 +829,24 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     }
   }
 
-  private HoodiePairData<String, Set<String>> 
getSecondaryIndexRecordsV1(HoodieData<String> keys, String partitionName) {
+  private HoodiePairData<String, String> 
getSecondaryIndexRecordsV1(HoodieData<String> keys, String partitionName) {
     if (keys.isEmpty()) {
       return HoodieListPairData.eager(Collections.emptyList());
     }
 
-    Map<String, Set<String>> res = getRecordsByKeyPrefixes(keys, 
partitionName, false, SecondaryIndexKeyUtils::escapeSpecialChars)
-            .map(record -> {
-              if (!record.getData().isDeleted()) {
-                return 
SecondaryIndexKeyUtils.getSecondaryKeyRecordKeyPair(record.getRecordKey());
-              }
-              return null;
-            })
-            .filter(Objects::nonNull)
-            .collectAsList()
-            .stream()
-            .collect(HashMap::new,
-                    (map, pair) -> map.computeIfAbsent(pair.getKey(), k -> new 
HashSet<>()).add(pair.getValue()),
-                    (map1, map2) -> map2.forEach((k, v) -> 
map1.computeIfAbsent(k, key -> new HashSet<>()).addAll(v)));
-
-
-    return HoodieListPairData.eager(
-            res.entrySet()
-                    .stream()
-                    .collect(Collectors.toMap(
-                            Map.Entry::getKey,
-                            entry -> 
Collections.singletonList(entry.getValue())
-                    ))
-    );
-  }
-
-  private HoodiePairData<String, Set<String>> 
getSecondaryIndexRecordsV2(HoodieData<String> secondaryKeys, String 
partitionName) {
+    return getRecordsByKeyPrefixes(keys, partitionName, false, 
SecondaryIndexKeyUtils::escapeSpecialChars)
+        .filter(hoodieRecord -> !hoodieRecord.getData().isDeleted())
+        .mapToPair(hoodieRecord -> 
SecondaryIndexKeyUtils.getSecondaryKeyRecordKeyPair(hoodieRecord.getRecordKey()));
+  }
+
+  private HoodiePairData<String, String> 
getSecondaryIndexRecordsV2(HoodieData<String> secondaryKeys, String 
partitionName) {
     if (secondaryKeys.isEmpty()) {
       return HoodieListPairData.eager(Collections.emptyList());
     }
+
     return readIndexRecords(secondaryKeys, partitionName, 
SecondaryIndexKeyUtils::escapeSpecialChars)
         .filter(hoodieRecord -> !hoodieRecord.getData().isDeleted())
-        .mapToPair(hoodieRecord -> 
SecondaryIndexKeyUtils.getRecordKeySecondaryKeyPair(hoodieRecord.getRecordKey()))
-        .groupByKey()
-        .mapToPair(p -> {
-          HashSet<String> secKeys = new HashSet<>();
-          p.getValue().iterator().forEachRemaining(secKeys::add);
-          return Pair.of(p.getKey(), secKeys);
-        });
+        .mapToPair(hoodieRecord -> 
SecondaryIndexKeyUtils.getSecondaryKeyRecordKeyPair(hoodieRecord.getRecordKey()));
   }
 
   /**
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
index 07d463362797..90e7873a2fef 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.common.data;
 
+import org.apache.hudi.common.util.HoodieDataUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
@@ -34,9 +35,11 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -464,6 +467,65 @@ public class TestHoodieListDataPairData {
     assertEquals(Collections.emptyList(), filtered.collectAsList());
   }
 
+  @Test
+  public void testHoodieDataUtilsDedupeAndCollectAsMap() {
+    // Given: A lazy HoodieListPairData with duplicate keys and null keys
+    List<Pair<String, String>> testData = Arrays.asList(
+        Pair.of("key1", "value1"),
+        Pair.of("key1", "value2"),  // Duplicate key
+        Pair.of("key2", "value3"),
+        Pair.of(null, "nullValue1"),
+        Pair.of(null, "nullValue2"),  // Duplicate null key
+        Pair.of("key3", "value4")
+    );
+    HoodiePairData<String, String> lazyPairData = 
HoodieListPairData.lazy(testData);
+
+    Map<String, String> result = 
HoodieDataUtils.dedupeAndCollectAsMap(lazyPairData);
+
+    // hard code another map and assert equals
+    Map<String, String> expectedMap = new HashMap<>();
+    expectedMap.put("key1", "value2");  // Last value wins for duplicate keys
+    expectedMap.put("key2", "value3");
+    expectedMap.put("key3", "value4");
+    expectedMap.put(null, "nullValue2");  // Last value wins for null key too
+    
+    assertEquals(expectedMap, result);
+  }
+
+  @Test
+  public void testHoodieDataUtilsCollectPairDataAsMap() {
+    // Given: A lazy HoodieListPairData with duplicate keys and values
+    List<Pair<String, String>> testData = Arrays.asList(
+        Pair.of("key1", "value1a"),
+        Pair.of("key1", "value1b"),
+        Pair.of("key2", "value2"),
+        Pair.of(null, "nullValue1"),
+        Pair.of(null, "nullValue2"),
+        Pair.of(null, "nullValue1")  // Duplicate null value
+    );
+    HoodiePairData<String, String> lazyPairData = 
HoodieListPairData.lazy(testData);
+
+    Map<String, Set<String>> result = 
HoodieDataUtils.collectPairDataAsMap(lazyPairData);
+
+    // hard code another map and assert equals
+    Map<String, Set<String>> expectedMap = new HashMap<>();
+    Set<String> key1Values = new HashSet<>();
+    key1Values.add("value1a");
+    key1Values.add("value1b");
+    expectedMap.put("key1", key1Values);
+    
+    Set<String> key2Values = new HashSet<>();
+    key2Values.add("value2");
+    expectedMap.put("key2", key2Values);
+    
+    Set<String> nullKeyValues = new HashSet<>();
+    nullKeyValues.add("nullValue1");
+    nullKeyValues.add("nullValue2");
+    expectedMap.put(null, nullKeyValues);
+    
+    assertEquals(expectedMap, result);
+  }
+
   private static List<Pair<String, String>> constructPairs() {
     return Arrays.asList(
         ImmutablePair.of(KEY1, STRING_VALUE1),
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieDataUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieDataUtils.scala
new file mode 100644
index 000000000000..6c320561afd2
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieDataUtils.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.data.{HoodiePairData}
+import org.apache.hudi.common.util.HoodieDataUtils
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.data.HoodieJavaPairRDD
+
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+import scala.collection.JavaConverters._
+
+/**
+ * Test suite for HoodieDataUtils methods that handle null keys using RDD
+ */
+class TestHoodieDataUtils extends HoodieSparkSqlTestBase {
+
+  private var jsc: JavaSparkContext = _
+  private var context: HoodieSparkEngineContext = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    jsc = new JavaSparkContext(spark.sparkContext)
+    context = new HoodieSparkEngineContext(jsc, new SQLContext(spark))
+  }
+
+  override def afterAll(): Unit = {
+    if (jsc != null) {
+      jsc.close()
+    }
+    super.afterAll()
+  }
+
+  // Helper method to create a Pair from key and value
+  private def pair(key: String, value: String): Pair[String, String] = {
+    Pair.of(key, value)
+  }
+
+  test("Test dedupeAndCollectAsMap with null keys") {
+    // Given: A dataset with a mix of null and non-null keys, including 
duplicates
+    // When: dedupeAndCollectAsMap is called on the data
+    // Then: All non-null keys should be deduplicated normally, and null keys 
should be handled without causing reduceByKey failures
+    val testData = Seq(
+      pair("key1", "value1"),
+      pair("key2", "value2"),
+      pair(null, "nullValue1"),
+      pair("key1", "value1_dup"),  // Duplicate key
+      pair(null, "nullValue2"),    // Another null key
+      pair("key3", "value3")
+    )
+
+    val rdd = spark.sparkContext.parallelize(testData).toJavaRDD()
+    val pairRDD = rdd.mapToPair(p => (p.getKey, p.getValue))
+    val pairData: HoodiePairData[String, String] = 
HoodieJavaPairRDD.of(pairRDD)
+    val result = HoodieDataUtils.dedupeAndCollectAsMap(pairData)
+
+    // Hard code expected map and assert equals
+    val expectedMap = new java.util.HashMap[String, String]()
+    expectedMap.put("key1", "value1_dup")  // Last value wins
+    expectedMap.put("key2", "value2")
+    expectedMap.put("key3", "value3")
+    expectedMap.put(null, "nullValue2")     // Last null value wins
+
+    assert(result.equals(expectedMap), s"Expected $expectedMap but got 
$result")
+  }
+
+  test("Test CollectPairDataAsMap with null keys") {
+    // Given: A dataset with both null and non-null keys, where some keys have 
multiple values
+    // When: collectPairDataAsMap is called on the data
+    // Then: Each key (including null) should map to a set containing all its 
unique values
+    val testData = Seq(
+      pair("key1", "value1a"),
+      pair("key1", "value1b"),
+      pair(null, "nullValue1"),
+      pair("key2", "value2"),
+      pair(null, "nullValue2"),
+      pair(null, "nullValue1"),  // Duplicate null value
+      pair("key1", "value1c")
+    )
+
+    val rdd = spark.sparkContext.parallelize(testData).toJavaRDD()
+    val pairRDD = rdd.mapToPair(p => (p.getKey, p.getValue))
+    val pairData: HoodiePairData[String, String] = 
HoodieJavaPairRDD.of(pairRDD)
+    val result = HoodieDataUtils.collectPairDataAsMap(pairData)
+
+    // Hard code expected map and assert equals
+    val expectedMap = new java.util.HashMap[String, java.util.Set[String]]()
+
+    val key1Values = new java.util.HashSet[String]()
+    key1Values.add("value1a")
+    key1Values.add("value1b")
+    key1Values.add("value1c")
+    expectedMap.put("key1", key1Values)
+
+    val key2Values = new java.util.HashSet[String]()
+    key2Values.add("value2")
+    expectedMap.put("key2", key2Values)
+
+    val nullValues = new java.util.HashSet[String]()
+    nullValues.add("nullValue1")
+    nullValues.add("nullValue2")
+    expectedMap.put(null, nullValues)
+
+    assert(result.equals(expectedMap), s"Expected $expectedMap but got 
$result")
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/testHoodieBackedTableMetadataIndexLookup.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/testHoodieBackedTableMetadataIndexLookup.scala
index ad2655fa0e91..08c054e96b1b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/testHoodieBackedTableMetadataIndexLookup.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/testHoodieBackedTableMetadataIndexLookup.scala
@@ -77,6 +77,8 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase 
extends HoodieSparkS
        | )
        | location '$basePath'
        """.stripMargin
+  protected var jsc: JavaSparkContext = _
+  protected var context: HoodieSparkEngineContext = _
 
   /**
    * Get the table version for this test implementation
@@ -208,12 +210,11 @@ abstract class 
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
 
     // Verify the table version
     metaClient.reload()
-    val jsc = new JavaSparkContext(spark.sparkContext)
+    jsc = new JavaSparkContext(spark.sparkContext)
     val sqlContext = new SQLContext(spark)
-    val context = new HoodieSparkEngineContext(jsc, sqlContext)
+    context = new HoodieSparkEngineContext(jsc, sqlContext)
     hoodieBackedTableMetadata = new HoodieBackedTableMetadata(
       context, metaClient.getStorage, writeConfig.getMetadataConfig, basePath, 
true)
-
   }
 
   /**
@@ -230,6 +231,8 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase 
extends HoodieSparkS
     basePath = null
     metaClient = null
     testData = null
+    jsc = null
+    context = null
   }
 
   /**
@@ -237,14 +240,14 @@ abstract class 
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
    */
   protected def testReadRecordIndex(): Unit = {
     // Case 1: Empty input
-    val emptyResult = 
hoodieBackedTableMetadata.readRecordIndex(HoodieListData.eager(List.empty[String].asJava))
-    assert(emptyResult.collectAsList().isEmpty, "Empty input should return 
empty result")
+    val emptyResultRDD = 
hoodieBackedTableMetadata.readRecordIndex(HoodieListData.eager(List.empty[String].asJava))
+    val emptyResult = emptyResultRDD.collectAsList()
+    assert(emptyResult.isEmpty, "Empty input should return empty result")
 
     // Case 2: All existing keys including those with $ characters
     val allKeys = HoodieListData.eager(List("a1", "a2", "a$", "$a", "a$a", 
"$$").asJava)
-    val allResult = 
hoodieBackedTableMetadata.readRecordIndex(allKeys).collectAsList().asScala
-    assert(allResult.size == 6, "Should return 6 results for 6 existing keys")
-
+    val allResultRDD = hoodieBackedTableMetadata.readRecordIndex(allKeys)
+    val allResult = allResultRDD.collectAsList().asScala
     // Validate keys including special characters
     val resultKeys = allResult.map(_.getKey()).toSet
     assert(resultKeys == Set("a1", "a2", "a$", "$a", "a$a", "$$"), "Keys 
should match input keys including $ characters")
@@ -269,29 +272,31 @@ abstract class 
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
 
     // Case 3: Non-existing keys, some matches the prefix of the existing 
records.
     val nonExistKeys = HoodieListData.eager(List("", "a", "a100", "200", "$", 
"a$$", "$$a", "$a$").asJava)
-    val nonExistResult = 
hoodieBackedTableMetadata.readRecordIndex(nonExistKeys).collectAsList().asScala
-    assert(nonExistResult.isEmpty, "Non-existing keys should return empty 
result, $ should not cause partial matches")
+    val nonExistResultRDD = 
hoodieBackedTableMetadata.readRecordIndex(nonExistKeys)
+    val nonExistResult = nonExistResultRDD.collectAsList().asScala
+    assert(nonExistResult.isEmpty, "Non-existing keys should return empty 
result")
 
     // Case 4: Mix of existing and non-existing keys
     val mixedKeys = HoodieListData.eager(List("a1", "a100", "a2", 
"a200").asJava)
-    val mixedResult = 
hoodieBackedTableMetadata.readRecordIndex(mixedKeys).collectAsList().asScala
-    assert(mixedResult.size == 2, "Should return 2 results for 2 existing 
keys")
+    val mixedResultRDD = hoodieBackedTableMetadata.readRecordIndex(mixedKeys)
+    val mixedResult = mixedResultRDD.collectAsList().asScala
     val mixedResultKeys = mixedResult.map(_.getKey()).toSet
     assert(mixedResultKeys == Set("a1", "a2"), "Should only return existing 
keys")
 
     // Case 5: Duplicate keys including those with $ characters
-    val dupKeys = HoodieListData.eager(List("a1", "a1", "a2", "a2", "a$", 
"a$", "$a", "$a", "a$a", "a$a", "$$", "$$").asJava)
-    val dupResult = 
hoodieBackedTableMetadata.readRecordIndex(dupKeys).collectAsList().asScala
-    assert(dupResult.size == 6, "Should return 6 unique results for duplicate 
keys")
+    val dupKeys = HoodieListData.eager(List("a1", "a1", "a2", "a2", "a$", 
"a$", "$a", "a$a", "a$a", "$a", "$$", "$$").asJava)
+    val dupResultRDD = hoodieBackedTableMetadata.readRecordIndex(dupKeys)
+    val dupResult = dupResultRDD.collectAsList().asScala
     val dupResultKeys = dupResult.map(_.getKey()).toSet
     assert(dupResultKeys == Set("a1", "a2", "a$", "$a", "a$a", "$$"), "Should 
deduplicate keys including those with $")
 
     // Case 6: Use parallelized RDD
-    val jsc = new JavaSparkContext(spark.sparkContext)
-    val context = new HoodieSparkEngineContext(jsc, new SQLContext(spark))
+    jsc = new JavaSparkContext(spark.sparkContext)
+    context = new HoodieSparkEngineContext(jsc, new SQLContext(spark))
     val rddKeys = HoodieJavaRDD.of(List("a1", "a2", "a$").asJava, context, 2)
     val rddResult = hoodieBackedTableMetadata.readRecordIndex(rddKeys)
-    assert(rddResult.collectAsList().asScala.size == 3, "RDD input should 
return 3 results")
+    val rddResultKeys = rddResult.map(_.getKey()).collectAsList().asScala.toSet
+    assert(rddResultKeys == Set("a1", "a2", "a$"), "Should deduplicate keys 
including those with $")
   }
 
   /**
@@ -302,13 +307,15 @@ abstract class 
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
     val secondaryIndexName = "secondary_index_idx_name"
 
     // Case 1: Empty input
-    val emptyResult = hoodieBackedTableMetadata.readSecondaryIndexLocations(
+    val emptyResultRDD = hoodieBackedTableMetadata.readSecondaryIndexLocations(
       HoodieListData.eager(List.empty[String].asJava), secondaryIndexName)
-    assert(emptyResult.collectAsList().isEmpty, s"Empty input should return 
empty result for table version ${getTableVersion}")
+    val emptyResult = emptyResultRDD.collectAsList()
+    assert(emptyResult.isEmpty, s"Empty input should return empty result for 
table version ${getTableVersion}")
 
     // Case 2: All existing secondary keys including those with $ characters
     val allSecondaryKeys = HoodieListData.eager(List("b1", "b2", "b$", 
"sec$key", "$sec$", "$$").asJava)
-    val allResult = 
hoodieBackedTableMetadata.readSecondaryIndexLocations(allSecondaryKeys, 
secondaryIndexName).collectAsList().asScala
+    val allResultRDD = 
hoodieBackedTableMetadata.readSecondaryIndexLocations(allSecondaryKeys, 
secondaryIndexName)
+    val allResult = allResultRDD.collectAsList().asScala
     assert(allResult.size == 6, s"Should return 6 results for 6 existing 
secondary keys in table version ${getTableVersion}")
 
     // Validate HoodieRecordGlobalLocation structure
@@ -328,23 +335,27 @@ abstract class 
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
 
     // Case 3: Non-existing secondary keys, some matches the prefix of 
existing records
     val nonExistKeys = HoodieListData.eager(List("", "b", "non_exist_1", 
"non_exist_2").asJava)
-    val nonExistResult = 
hoodieBackedTableMetadata.readSecondaryIndexLocations(nonExistKeys, 
secondaryIndexName).collectAsList().asScala
+    val nonExistResultRDD = 
hoodieBackedTableMetadata.readSecondaryIndexLocations(nonExistKeys, 
secondaryIndexName)
+    val nonExistResult = nonExistResultRDD.collectAsList().asScala
     assert(nonExistResult.isEmpty, s"Non-existing secondary keys should return 
empty result for table version ${getTableVersion}")
 
     // Case 4: Mix of existing and non-existing secondary keys
     val mixedKeys = HoodieListData.eager(List("b1", "non_exist_1", "b2", 
"non_exist_2").asJava)
-    val mixedResult = 
hoodieBackedTableMetadata.readSecondaryIndexLocations(mixedKeys, 
secondaryIndexName).collectAsList().asScala
+    val mixedResultRDD = 
hoodieBackedTableMetadata.readSecondaryIndexLocations(mixedKeys, 
secondaryIndexName)
+    val mixedResult = mixedResultRDD.collectAsList().asScala
     assert(mixedResult.size == 2, s"Should return 2 results for 2 existing 
secondary keys in table version ${getTableVersion}")
 
     // Case 5: Duplicate secondary keys
     val dupKeys = HoodieListData.eager(List("b1", "b1", "b2", "b2", "b$", 
"b$").asJava)
-    val dupResult = 
hoodieBackedTableMetadata.readSecondaryIndexLocations(dupKeys, 
secondaryIndexName).collectAsList().asScala
+    val dupResultRDD = 
hoodieBackedTableMetadata.readSecondaryIndexLocations(dupKeys, 
secondaryIndexName)
+    val dupResult = dupResultRDD.collectAsList().asScala
     assert(dupResult.size == 3, s"Should return 3 unique results for duplicate 
secondary keys in table version ${getTableVersion}")
 
     // Case 6: Test with different secondary index (price column)
     val priceIndexName = "secondary_index_idx_price"
     val priceKeys = HoodieListData.eager(List("10.0", "20.0", "30.0").asJava)
-    val priceResult = 
hoodieBackedTableMetadata.readSecondaryIndexLocations(priceKeys, 
priceIndexName).collectAsList().asScala
+    val priceResultRDD = 
hoodieBackedTableMetadata.readSecondaryIndexLocations(priceKeys, priceIndexName)
+    val priceResult = priceResultRDD.collectAsList().asScala
     assert(priceResult.size == 3, s"Should return 3 results for price 
secondary keys in table version ${getTableVersion}")
 
     // Case 7: Test invalid secondary index partition name
@@ -360,9 +371,9 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase 
extends HoodieSparkS
     // Case 9: Test large number of keys to exercise multiple file slices path
     val largeKeyList = (1 to 100).map(i => s"b$i").asJava
     val largeKeys = HoodieListData.eager(largeKeyList)
-    val largeResult = 
hoodieBackedTableMetadata.readSecondaryIndexLocations(largeKeys, 
secondaryIndexName)
+    val largeResultRDD = 
hoodieBackedTableMetadata.readSecondaryIndexLocations(largeKeys, 
secondaryIndexName)
     // Should not throw exception, even if no results found
-    assert(largeResult.collectAsList().size() == 2, "Large key list should 
return empty result for non-existing keys")
+    assert(largeResultRDD.collectAsList().size() == 2, "Large key list should 
return empty result for non-existing keys")
   }
 
   /**
@@ -380,23 +391,34 @@ abstract class 
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
     val secondaryIndexName = "secondary_index_idx_name"
 
     // Test with existing secondary keys including those with $ characters
-    val existingKeys = HoodieListData.eager(List("b1", "b2", "b$", "sec$key", 
"$sec$", "$$").asJava)
+    val existingKeys = HoodieListData.eager(List("b1", "b2", "b$", "b$", "b1", 
"$$", "sec$key", "$sec$", "$$", null).asJava)
     val result = 
hoodieBackedTableMetadata.getSecondaryIndexRecords(existingKeys, 
secondaryIndexName)
-    val resultMap = HoodieDataUtils.dedupeAndCollectAsMap(result)
+    val resultMap = HoodieDataUtils.collectPairDataAsMap(result)
 
     assert(resultMap.size == 6, s"Should return 6 results for existing 
secondary keys in table version ${getTableVersion}")
-
-    // Validate that each secondary key maps to a set of record keys
-    resultMap.asScala.foreach { case (secondaryKey, recordKeys) =>
-      assert(recordKeys.asScala.nonEmpty, s"Secondary key $secondaryKey should 
map to at least one record key")
-      assert(recordKeys.size == 1, s"Secondary key $secondaryKey should map to 
exactly one record key in this test")
-    }
+    assert(resultMap.asScala("b$").asScala == Set("a$"))
+    assert(resultMap.asScala("b1").asScala == Set("a1"))
+    assert(resultMap.asScala("b2").asScala == Set("a2"))
+    assert(resultMap.asScala("$$").asScala == Set("$$"))
+    assert(resultMap.asScala("sec$key").asScala == Set("$a"))
+    assert(resultMap.asScala("$sec$").asScala == Set("a$a"))
 
     // Test with non-existing secondary keys
-    val nonExistingKeys = HoodieListData.eager(List("non_exist_1", 
"non_exist_2").asJava)
+    val nonExistingKeys = HoodieListData.eager(List("", "b", "$", " ", null, 
"non_exist_1", "non_exist_2").asJava)
     val nonExistingResult = 
hoodieBackedTableMetadata.getSecondaryIndexRecords(nonExistingKeys, 
secondaryIndexName)
-    val nonExistingMap = 
HoodieDataUtils.dedupeAndCollectAsMap(nonExistingResult)
+    val nonExistingMap = 
HoodieDataUtils.collectPairDataAsMap(nonExistingResult)
     assert(nonExistingMap.isEmpty, s"Should return empty result for 
non-existing secondary keys in table version ${getTableVersion}")
+
+    // Test with a mixture of existing and non-existing secondary keys
+    val rddKeys = HoodieJavaRDD.of(List("b1", "b2", "b$", null, "$").asJava, 
context, 2)
+    val rddResult = 
hoodieBackedTableMetadata.getSecondaryIndexRecords(rddKeys, secondaryIndexName)
+
+    // Collect and validate results
+    val rddResultMap = HoodieDataUtils.collectPairDataAsMap(rddResult)
+    assert(rddResultMap.size == 3, s"Should return 3 results for existing 
secondary keys with RDD input")
+    assert(resultMap.asScala("b$").asScala == Set("a$"))
+    assert(resultMap.asScala("b1").asScala == Set("a1"))
+    assert(resultMap.asScala("b2").asScala == Set("a2"))
   }
 }
 
@@ -416,8 +438,6 @@ class HoodieBackedTableMetadataIndexLookupV8TestBase 
extends HoodieBackedTableMe
   override protected def testVersionSpecificBehavior(): Unit = {
     // For version 1, test that it only supports HoodieListData
     val secondaryIndexName = "secondary_index_idx_name"
-    val jsc = new JavaSparkContext(spark.sparkContext)
-    val context = new HoodieSparkEngineContext(jsc, new SQLContext(spark))
     val rddKeys = HoodieJavaRDD.of(List("b1").asJava, context, 1)
     checkExceptionContain(() => {
       hoodieBackedTableMetadata.readSecondaryIndexLocations(rddKeys, 
secondaryIndexName)
@@ -473,12 +493,10 @@ class HoodieBackedTableMetadataIndexLookupV9TestBase 
extends HoodieBackedTableMe
   override protected def testVersionSpecificBehavior(): Unit = {
     // For version 2, test that it supports both HoodieListData and RDD
     val secondaryIndexName = "secondary_index_idx_name"
-    val jsc = new JavaSparkContext(spark.sparkContext)
-    val context = new HoodieSparkEngineContext(jsc, new SQLContext(spark))
     val rddKeys = HoodieJavaRDD.of(List("b1", "b2", "b$").asJava, context, 2)
     val rddResult = 
hoodieBackedTableMetadata.readSecondaryIndexLocations(rddKeys, 
secondaryIndexName)
-    val locations = rddResult.collectAsList()
-    assert(locations.asScala.size == 3, "Version 2 should support RDD input")
+    // Collect and validate results
+    assert(rddResult.count() == 3, "Version 2 should support RDD input")
 
     // Test case for null values in secondary index
     testNullValueInSecondaryIndex()
@@ -524,7 +542,6 @@ class HoodieBackedTableMetadataIndexLookupV9TestBase 
extends HoodieBackedTableMe
     val nullKeys = HoodieListData.eager(List(null.asInstanceOf[String]).asJava)
     val nullResult = 
hoodieBackedTableMetadata.readSecondaryIndexLocations(nullKeys, 
secondaryIndexName)
     val nullLocations = nullResult.collectAsList().asScala
-
     // Verify that null lookup returns exactly 1 result (for the null_record 
we inserted)
     assert(nullLocations.size == 1, s"Secondary index lookup should return 
exactly 1 result for null value, but found ${nullLocations.size}")
 
@@ -540,15 +557,18 @@ class HoodieBackedTableMetadataIndexLookupV9TestBase 
extends HoodieBackedTableMe
 
     // Test getSecondaryIndexRecords API with null value
     val nullRecordsResult = 
hoodieBackedTableMetadata.getSecondaryIndexRecords(nullKeys, secondaryIndexName)
-    val nullRecordsMap = 
HoodieDataUtils.dedupeAndCollectAsMap(nullRecordsResult)
-
+    val nullRecordsMap = 
HoodieDataUtils.collectPairDataAsMap(nullRecordsResult)
     // Verify that null key maps to record keys.
     // Assert it is a map with key null -> value as the set containing "null_record"
-    assert(nullRecordsMap.keySet().asScala.toList.equals(Seq("null_record")))
-    assert(nullRecordsMap.get("null_record").asScala.toList.equals(Seq(null)))
-    // Clean up the test data
+    assert(nullRecordsMap.size() == 1)
+    assert(nullRecordsMap.get(null).asScala.equals(Set("null_record")))
+    // Clean up the null record and check again, there should not be any 
lookup result.
     spark.sql(s"delete from $tableName where id = 'null_record'")
     hoodieBackedTableMetadata.reset()
+
+    val nullResult2 = 
hoodieBackedTableMetadata.readSecondaryIndexLocations(nullKeys, 
secondaryIndexName)
+    // Verify that null lookup returns no result now that the null_record 
has been deleted
+    assert(nullResult2.isEmpty, s"Secondary index lookup should return empty, 
but found ${nullLocations.size}")
   }
 }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 94c1f755fb55..859041129167 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -1136,7 +1136,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
     secondaryKeys = secondaryKeys.sortBy(x -> x, true, numPartitions);
     for (int i = 0; i < numPartitions; i++) {
       List<String> secKeys = secondaryKeys.collectPartitions(new int[] {i})[0];
-      Map<String, Set<String>> mdtSecondaryKeyToRecordKeys = 
HoodieDataUtils.dedupeAndCollectAsMap(
+      Map<String, Set<String>> mdtSecondaryKeyToRecordKeys = 
HoodieDataUtils.collectPairDataAsMap(
           ((HoodieBackedTableMetadata) 
metadataContext.tableMetadata).getSecondaryIndexRecords(
               HoodieListData.lazy(secKeys), indexDefinition.getIndexName()));
       Map<String, Set<String>> fsSecondaryKeyToRecordKeys = 
getFSSecondaryKeyToRecordKeys(engineContext, basePath, latestCompletedCommit, 
indexDefinition.getSourceFields().get(0), secKeys);


Reply via email to