This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d040cf4d73d8 [HUDI-9505] Bug fix for getSecondaryIndexRecords API
(#13622)
d040cf4d73d8 is described below
commit d040cf4d73d8ce799a704d41e703b4cd77768822
Author: Davis-Zhang-Onehouse
<[email protected]>
AuthorDate: Mon Jul 28 13:16:02 2025 -0700
[HUDI-9505] Bug fix for getSecondaryIndexRecords API (#13622)
---
.../apache/hudi/common/util/HoodieDataUtils.java | 62 ++++++++--
.../apache/hudi/metadata/BaseTableMetadata.java | 7 +-
.../hudi/metadata/HoodieBackedTableMetadata.java | 50 ++------
.../common/data/TestHoodieListDataPairData.java | 62 ++++++++++
.../sql/hudi/common/TestHoodieDataUtils.scala | 126 +++++++++++++++++++++
.../testHoodieBackedTableMetadataIndexLookup.scala | 118 +++++++++++--------
.../utilities/HoodieMetadataTableValidator.java | 2 +-
7 files changed, 325 insertions(+), 102 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java
index 50faa20da551..b5e98fb5cb2e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java
@@ -21,8 +21,11 @@ package org.apache.hudi.common.util;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.util.collection.Pair;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Set;
/**
* Utility class for HoodieData operations.
@@ -42,14 +45,53 @@ public class HoodieDataUtils {
* @return a Map containing the de-duplicated key-value pairs
*/
public static <K, V> Map<K, V> dedupeAndCollectAsMap(HoodiePairData<K, V>
pairData) {
- // Deduplicate locally before shuffling to reduce data movement
+ // Map each pair to (Option<Pair.key>, V) to handle null keys uniformly
// If there are multiple entries sharing the same key, use the incoming one
- return pairData.reduceByKey((existing, incoming) -> incoming,
pairData.deduceNumPartitions())
- .collectAsList()
- .stream()
- .collect(Collectors.toMap(
- Pair::getKey,
- Pair::getValue
- ));
+ return pairData.mapToPair(pair ->
+ Pair.of(
+ Option.ofNullable(pair.getKey()),
+ pair.getValue()
+ ))
+ .reduceByKey((existing, incoming) -> incoming,
pairData.deduceNumPartitions())
+ .collectAsList()
+ .stream()
+ .collect(HashMap::new,
+ (map, pair) -> {
+ K key = pair.getKey().orElse(null);
+ map.put(key, pair.getValue());
+ },
+ HashMap::putAll);
}
-}
\ No newline at end of file
+
+ /**
+ * Collects results of the pair data into a {@code Map<K, Set<V>>} where
values with the same key
+ * are grouped into a set.
+ *
+ * @param pairData Hoodie Pair Data to be collected
+ * @param <K> type of the key
+ * @param <V> type of the value
+ * @return a Map containing keys mapped to sets of values
+ */
+ public static <K, V> Map<K, Set<V>> collectPairDataAsMap(HoodiePairData<K,
V> pairData) {
+ // Map each pair to (Option<Pair.key>, V) to handle null keys uniformly
+ // If there are multiple entries sharing the same key, combine them into a
set
+ return pairData.mapToPair(pair ->
+ Pair.of(
+ Option.ofNullable(pair.getKey()),
+ Collections.singleton(pair.getValue())
+ ))
+ .reduceByKey((set1, set2) -> {
+ Set<V> combined = new HashSet<>(set1);
+ combined.addAll(set2);
+ return combined;
+ }, pairData.deduceNumPartitions())
+ .collectAsList()
+ .stream()
+ .collect(HashMap::new,
+ (map, pair) -> {
+ K key = pair.getKey().orElse(null);
+ map.put(key, pair.getValue());
+ },
+ HashMap::putAll);
+ }
+}
\ No newline at end of file
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 1519e38de244..74240af5e400 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -441,13 +441,14 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
HoodieData<String> keys, String partitionName,
SerializableFunctionUnchecked<String, String> keyEncodingFn);
/**
- * Returns a collection of pairs (secondary-key -> set-of-record-keys) for
the provided secondary keys.
+ * Returns a collection of pairs (secondary-key -> record-keys) for the
provided secondary keys.
*
* @param keys The unescaped/decoded secondary keys to look up in the
metadata table
* @param partitionName The partition name where the secondary index records
are stored
- * @return A collection of pairs where each key is a secondary key and the
value is a set of record keys that are indexed by that secondary key
+ * @return A collection of pairs where each key is a secondary key and the
value is a record key that is indexed by that secondary key.
+ * If a secondary key maps to multiple record keys, each mapping is
tracked as a separate pair.
*/
- public abstract HoodiePairData<String, Set<String>>
getSecondaryIndexRecords(HoodieData<String> keys, String partitionName);
+ public abstract HoodiePairData<String, String>
getSecondaryIndexRecords(HoodieData<String> keys, String partitionName);
public HoodieMetadataConfig getMetadataConfig() {
return metadataConfig;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 4075e352bf5f..1eeff4c6ed22 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -82,11 +82,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
@@ -223,7 +221,6 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
String partitionName,
boolean shouldLoadInMemory,
SerializableFunctionUnchecked<String, String> keyEncodingFn) {
- ValidationUtils.checkState(keyPrefixes instanceof HoodieListData,
"getRecordsByKeyPrefixes only support HoodieListData at the moment");
// Apply key encoding
List<String> sortedKeyPrefixes = new
ArrayList<>(keyPrefixes.map(keyEncodingFn::apply).collectAsList());
// Sort the prefixes so that keys are looked up in order
@@ -470,7 +467,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
dataMetaClient.getTableConfig().getMetadataPartitions().contains(partitionName),
() -> "Secondary index is not initialized in MDT for: " +
partitionName);
// Fetch secondary-index records
- Map<String, Set<String>> secondaryKeyRecords =
HoodieDataUtils.dedupeAndCollectAsMap(
+ Map<String, Set<String>> secondaryKeyRecords =
HoodieDataUtils.collectPairDataAsMap(
getSecondaryIndexRecords(HoodieListData.eager(secondaryKeys.collectAsList()),
partitionName));
// Now collect the record-keys and fetch the RLI records
List<String> recordKeys = new ArrayList<>();
@@ -820,7 +817,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
}
@Override
- public HoodiePairData<String, Set<String>>
getSecondaryIndexRecords(HoodieData<String> secondaryKeys, String
partitionName) {
+ public HoodiePairData<String, String>
getSecondaryIndexRecords(HoodieData<String> secondaryKeys, String
partitionName) {
HoodieIndexVersion indexVersion =
existingIndexVersionOrDefault(partitionName, dataMetaClient);
if (indexVersion.equals(HoodieIndexVersion.V1)) {
@@ -832,49 +829,24 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
}
}
- private HoodiePairData<String, Set<String>>
getSecondaryIndexRecordsV1(HoodieData<String> keys, String partitionName) {
+ private HoodiePairData<String, String>
getSecondaryIndexRecordsV1(HoodieData<String> keys, String partitionName) {
if (keys.isEmpty()) {
return HoodieListPairData.eager(Collections.emptyList());
}
- Map<String, Set<String>> res = getRecordsByKeyPrefixes(keys,
partitionName, false, SecondaryIndexKeyUtils::escapeSpecialChars)
- .map(record -> {
- if (!record.getData().isDeleted()) {
- return
SecondaryIndexKeyUtils.getSecondaryKeyRecordKeyPair(record.getRecordKey());
- }
- return null;
- })
- .filter(Objects::nonNull)
- .collectAsList()
- .stream()
- .collect(HashMap::new,
- (map, pair) -> map.computeIfAbsent(pair.getKey(), k -> new
HashSet<>()).add(pair.getValue()),
- (map1, map2) -> map2.forEach((k, v) ->
map1.computeIfAbsent(k, key -> new HashSet<>()).addAll(v)));
-
-
- return HoodieListPairData.eager(
- res.entrySet()
- .stream()
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- entry ->
Collections.singletonList(entry.getValue())
- ))
- );
- }
-
- private HoodiePairData<String, Set<String>>
getSecondaryIndexRecordsV2(HoodieData<String> secondaryKeys, String
partitionName) {
+ return getRecordsByKeyPrefixes(keys, partitionName, false,
SecondaryIndexKeyUtils::escapeSpecialChars)
+ .filter(hoodieRecord -> !hoodieRecord.getData().isDeleted())
+ .mapToPair(hoodieRecord ->
SecondaryIndexKeyUtils.getSecondaryKeyRecordKeyPair(hoodieRecord.getRecordKey()));
+ }
+
+ private HoodiePairData<String, String>
getSecondaryIndexRecordsV2(HoodieData<String> secondaryKeys, String
partitionName) {
if (secondaryKeys.isEmpty()) {
return HoodieListPairData.eager(Collections.emptyList());
}
+
return readIndexRecords(secondaryKeys, partitionName,
SecondaryIndexKeyUtils::escapeSpecialChars)
.filter(hoodieRecord -> !hoodieRecord.getData().isDeleted())
- .mapToPair(hoodieRecord ->
SecondaryIndexKeyUtils.getRecordKeySecondaryKeyPair(hoodieRecord.getRecordKey()))
- .groupByKey()
- .mapToPair(p -> {
- HashSet<String> secKeys = new HashSet<>();
- p.getValue().iterator().forEachRemaining(secKeys::add);
- return Pair.of(p.getKey(), secKeys);
- });
+ .mapToPair(hoodieRecord ->
SecondaryIndexKeyUtils.getSecondaryKeyRecordKeyPair(hoodieRecord.getRecordKey()));
}
/**
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
index 07d463362797..90e7873a2fef 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.data;
+import org.apache.hudi.common.util.HoodieDataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
@@ -34,9 +35,11 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@@ -464,6 +467,65 @@ public class TestHoodieListDataPairData {
assertEquals(Collections.emptyList(), filtered.collectAsList());
}
+ @Test
+ public void testHoodieDataUtilsDedupeAndCollectAsMap() {
+ // Given: A lazy HoodieListPairData with duplicate keys and null keys
+ List<Pair<String, String>> testData = Arrays.asList(
+ Pair.of("key1", "value1"),
+ Pair.of("key1", "value2"), // Duplicate key
+ Pair.of("key2", "value3"),
+ Pair.of(null, "nullValue1"),
+ Pair.of(null, "nullValue2"), // Duplicate null key
+ Pair.of("key3", "value4")
+ );
+ HoodiePairData<String, String> lazyPairData =
HoodieListPairData.lazy(testData);
+
+ Map<String, String> result =
HoodieDataUtils.dedupeAndCollectAsMap(lazyPairData);
+
+ // hard code another map and assert equals
+ Map<String, String> expectedMap = new HashMap<>();
+ expectedMap.put("key1", "value2"); // Last value wins for duplicate keys
+ expectedMap.put("key2", "value3");
+ expectedMap.put("key3", "value4");
+ expectedMap.put(null, "nullValue2"); // Last value wins for null key too
+
+ assertEquals(expectedMap, result);
+ }
+
+ @Test
+ public void testHoodieDataUtilsCollectPairDataAsMap() {
+ // Given: A lazy HoodieListPairData with duplicate keys and values
+ List<Pair<String, String>> testData = Arrays.asList(
+ Pair.of("key1", "value1a"),
+ Pair.of("key1", "value1b"),
+ Pair.of("key2", "value2"),
+ Pair.of(null, "nullValue1"),
+ Pair.of(null, "nullValue2"),
+ Pair.of(null, "nullValue1") // Duplicate null value
+ );
+ HoodiePairData<String, String> lazyPairData =
HoodieListPairData.lazy(testData);
+
+ Map<String, Set<String>> result =
HoodieDataUtils.collectPairDataAsMap(lazyPairData);
+
+ // hard code another map and assert equals
+ Map<String, Set<String>> expectedMap = new HashMap<>();
+ Set<String> key1Values = new HashSet<>();
+ key1Values.add("value1a");
+ key1Values.add("value1b");
+ expectedMap.put("key1", key1Values);
+
+ Set<String> key2Values = new HashSet<>();
+ key2Values.add("value2");
+ expectedMap.put("key2", key2Values);
+
+ Set<String> nullKeyValues = new HashSet<>();
+ nullKeyValues.add("nullValue1");
+ nullKeyValues.add("nullValue2");
+ expectedMap.put(null, nullKeyValues);
+
+ assertEquals(expectedMap, result);
+ }
+
private static List<Pair<String, String>> constructPairs() {
return Arrays.asList(
ImmutablePair.of(KEY1, STRING_VALUE1),
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieDataUtils.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieDataUtils.scala
new file mode 100644
index 000000000000..6c320561afd2
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieDataUtils.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.data.{HoodiePairData}
+import org.apache.hudi.common.util.HoodieDataUtils
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.data.HoodieJavaPairRDD
+
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+import scala.collection.JavaConverters._
+
+/**
+ * Test suite for HoodieDataUtils methods that handle null keys using RDD
+ */
+class TestHoodieDataUtils extends HoodieSparkSqlTestBase {
+
+ private var jsc: JavaSparkContext = _
+ private var context: HoodieSparkEngineContext = _
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ jsc = new JavaSparkContext(spark.sparkContext)
+ context = new HoodieSparkEngineContext(jsc, new SQLContext(spark))
+ }
+
+ override def afterAll(): Unit = {
+ if (jsc != null) {
+ jsc.close()
+ }
+ super.afterAll()
+ }
+
+ // Helper method to create a Pair from key and value
+ private def pair(key: String, value: String): Pair[String, String] = {
+ Pair.of(key, value)
+ }
+
+ test("Test dedupeAndCollectAsMap with null keys") {
+ // Given: A dataset with a mix of null and non-null keys, including
duplicates
+ // When: dedupeAndCollectAsMap is called on the data
+ // Then: All non-null keys should be deduplicated normally, and null keys
should be handled without causing reduceByKey failures
+ val testData = Seq(
+ pair("key1", "value1"),
+ pair("key2", "value2"),
+ pair(null, "nullValue1"),
+ pair("key1", "value1_dup"), // Duplicate key
+ pair(null, "nullValue2"), // Another null key
+ pair("key3", "value3")
+ )
+
+ val rdd = spark.sparkContext.parallelize(testData).toJavaRDD()
+ val pairRDD = rdd.mapToPair(p => (p.getKey, p.getValue))
+ val pairData: HoodiePairData[String, String] =
HoodieJavaPairRDD.of(pairRDD)
+ val result = HoodieDataUtils.dedupeAndCollectAsMap(pairData)
+
+ // Hard code expected map and assert equals
+ val expectedMap = new java.util.HashMap[String, String]()
+ expectedMap.put("key1", "value1_dup") // Last value wins
+ expectedMap.put("key2", "value2")
+ expectedMap.put("key3", "value3")
+ expectedMap.put(null, "nullValue2") // Last null value wins
+
+ assert(result.equals(expectedMap), s"Expected $expectedMap but got
$result")
+ }
+
+ test("Test CollectPairDataAsMap with null keys") {
+ // Given: A dataset with both null and non-null keys, where some keys have
multiple values
+ // When: CollectPairDataAsMap is called on the data
+ // Then: Each key (including null) should map to a set containing all its
unique values
+ val testData = Seq(
+ pair("key1", "value1a"),
+ pair("key1", "value1b"),
+ pair(null, "nullValue1"),
+ pair("key2", "value2"),
+ pair(null, "nullValue2"),
+ pair(null, "nullValue1"), // Duplicate null value
+ pair("key1", "value1c")
+ )
+
+ val rdd = spark.sparkContext.parallelize(testData).toJavaRDD()
+ val pairRDD = rdd.mapToPair(p => (p.getKey, p.getValue))
+ val pairData: HoodiePairData[String, String] =
HoodieJavaPairRDD.of(pairRDD)
+ val result = HoodieDataUtils.collectPairDataAsMap(pairData)
+
+ // Hard code expected map and assert equals
+ val expectedMap = new java.util.HashMap[String, java.util.Set[String]]()
+
+ val key1Values = new java.util.HashSet[String]()
+ key1Values.add("value1a")
+ key1Values.add("value1b")
+ key1Values.add("value1c")
+ expectedMap.put("key1", key1Values)
+
+ val key2Values = new java.util.HashSet[String]()
+ key2Values.add("value2")
+ expectedMap.put("key2", key2Values)
+
+ val nullValues = new java.util.HashSet[String]()
+ nullValues.add("nullValue1")
+ nullValues.add("nullValue2")
+ expectedMap.put(null, nullValues)
+
+ assert(result.equals(expectedMap), s"Expected $expectedMap but got
$result")
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/testHoodieBackedTableMetadataIndexLookup.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/testHoodieBackedTableMetadataIndexLookup.scala
index ad2655fa0e91..08c054e96b1b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/testHoodieBackedTableMetadataIndexLookup.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/testHoodieBackedTableMetadataIndexLookup.scala
@@ -77,6 +77,8 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase
extends HoodieSparkS
| )
| location '$basePath'
""".stripMargin
+ protected var jsc: JavaSparkContext = _
+ protected var context: HoodieSparkEngineContext = _
/**
* Get the table version for this test implementation
@@ -208,12 +210,11 @@ abstract class
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
// Verify the table version
metaClient.reload()
- val jsc = new JavaSparkContext(spark.sparkContext)
+ jsc = new JavaSparkContext(spark.sparkContext)
val sqlContext = new SQLContext(spark)
- val context = new HoodieSparkEngineContext(jsc, sqlContext)
+ context = new HoodieSparkEngineContext(jsc, sqlContext)
hoodieBackedTableMetadata = new HoodieBackedTableMetadata(
context, metaClient.getStorage, writeConfig.getMetadataConfig, basePath,
true)
-
}
/**
@@ -230,6 +231,8 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase
extends HoodieSparkS
basePath = null
metaClient = null
testData = null
+ jsc = null
+ context = null
}
/**
@@ -237,14 +240,14 @@ abstract class
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
*/
protected def testReadRecordIndex(): Unit = {
// Case 1: Empty input
- val emptyResult =
hoodieBackedTableMetadata.readRecordIndex(HoodieListData.eager(List.empty[String].asJava))
- assert(emptyResult.collectAsList().isEmpty, "Empty input should return
empty result")
+ val emptyResultRDD =
hoodieBackedTableMetadata.readRecordIndex(HoodieListData.eager(List.empty[String].asJava))
+ val emptyResult = emptyResultRDD.collectAsList()
+ assert(emptyResult.isEmpty, "Empty input should return empty result")
// Case 2: All existing keys including those with $ characters
val allKeys = HoodieListData.eager(List("a1", "a2", "a$", "$a", "a$a",
"$$").asJava)
- val allResult =
hoodieBackedTableMetadata.readRecordIndex(allKeys).collectAsList().asScala
- assert(allResult.size == 6, "Should return 6 results for 6 existing keys")
-
+ val allResultRDD = hoodieBackedTableMetadata.readRecordIndex(allKeys)
+ val allResult = allResultRDD.collectAsList().asScala
// Validate keys including special characters
val resultKeys = allResult.map(_.getKey()).toSet
assert(resultKeys == Set("a1", "a2", "a$", "$a", "a$a", "$$"), "Keys
should match input keys including $ characters")
@@ -269,29 +272,31 @@ abstract class
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
// Case 3: Non-existing keys, some matches the prefix of the existing
records.
val nonExistKeys = HoodieListData.eager(List("", "a", "a100", "200", "$",
"a$$", "$$a", "$a$").asJava)
- val nonExistResult =
hoodieBackedTableMetadata.readRecordIndex(nonExistKeys).collectAsList().asScala
- assert(nonExistResult.isEmpty, "Non-existing keys should return empty
result, $ should not cause partial matches")
+ val nonExistResultRDD =
hoodieBackedTableMetadata.readRecordIndex(nonExistKeys)
+ val nonExistResult = nonExistResultRDD.collectAsList().asScala
+ assert(nonExistResult.isEmpty, "Non-existing keys should return empty
result")
// Case 4: Mix of existing and non-existing keys
val mixedKeys = HoodieListData.eager(List("a1", "a100", "a2",
"a200").asJava)
- val mixedResult =
hoodieBackedTableMetadata.readRecordIndex(mixedKeys).collectAsList().asScala
- assert(mixedResult.size == 2, "Should return 2 results for 2 existing
keys")
+ val mixedResultRDD = hoodieBackedTableMetadata.readRecordIndex(mixedKeys)
+ val mixedResult = mixedResultRDD.collectAsList().asScala
val mixedResultKeys = mixedResult.map(_.getKey()).toSet
assert(mixedResultKeys == Set("a1", "a2"), "Should only return existing
keys")
// Case 5: Duplicate keys including those with $ characters
- val dupKeys = HoodieListData.eager(List("a1", "a1", "a2", "a2", "a$",
"a$", "$a", "$a", "a$a", "a$a", "$$", "$$").asJava)
- val dupResult =
hoodieBackedTableMetadata.readRecordIndex(dupKeys).collectAsList().asScala
- assert(dupResult.size == 6, "Should return 6 unique results for duplicate
keys")
+ val dupKeys = HoodieListData.eager(List("a1", "a1", "a2", "a2", "a$",
"a$", "$a", "a$a", "a$a", "$a", "$$", "$$").asJava)
+ val dupResultRDD = hoodieBackedTableMetadata.readRecordIndex(dupKeys)
+ val dupResult = dupResultRDD.collectAsList().asScala
val dupResultKeys = dupResult.map(_.getKey()).toSet
assert(dupResultKeys == Set("a1", "a2", "a$", "$a", "a$a", "$$"), "Should
deduplicate keys including those with $")
// Case 6: Use parallelized RDD
- val jsc = new JavaSparkContext(spark.sparkContext)
- val context = new HoodieSparkEngineContext(jsc, new SQLContext(spark))
+ jsc = new JavaSparkContext(spark.sparkContext)
+ context = new HoodieSparkEngineContext(jsc, new SQLContext(spark))
val rddKeys = HoodieJavaRDD.of(List("a1", "a2", "a$").asJava, context, 2)
val rddResult = hoodieBackedTableMetadata.readRecordIndex(rddKeys)
- assert(rddResult.collectAsList().asScala.size == 3, "RDD input should
return 3 results")
+ val rddResultKeys = rddResult.map(_.getKey()).collectAsList().asScala.toSet
+ assert(rddResultKeys == Set("a1", "a2", "a$"), "Should deduplicate keys
including those with $")
}
/**
@@ -302,13 +307,15 @@ abstract class
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
val secondaryIndexName = "secondary_index_idx_name"
// Case 1: Empty input
- val emptyResult = hoodieBackedTableMetadata.readSecondaryIndexLocations(
+ val emptyResultRDD = hoodieBackedTableMetadata.readSecondaryIndexLocations(
HoodieListData.eager(List.empty[String].asJava), secondaryIndexName)
- assert(emptyResult.collectAsList().isEmpty, s"Empty input should return
empty result for table version ${getTableVersion}")
+ val emptyResult = emptyResultRDD.collectAsList()
+ assert(emptyResult.isEmpty, s"Empty input should return empty result for
table version ${getTableVersion}")
// Case 2: All existing secondary keys including those with $ characters
val allSecondaryKeys = HoodieListData.eager(List("b1", "b2", "b$",
"sec$key", "$sec$", "$$").asJava)
- val allResult =
hoodieBackedTableMetadata.readSecondaryIndexLocations(allSecondaryKeys,
secondaryIndexName).collectAsList().asScala
+ val allResultRDD =
hoodieBackedTableMetadata.readSecondaryIndexLocations(allSecondaryKeys,
secondaryIndexName)
+ val allResult = allResultRDD.collectAsList().asScala
assert(allResult.size == 6, s"Should return 6 results for 6 existing
secondary keys in table version ${getTableVersion}")
// Validate HoodieRecordGlobalLocation structure
@@ -328,23 +335,27 @@ abstract class
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
// Case 3: Non-existing secondary keys, some matches the prefix of
existing records
val nonExistKeys = HoodieListData.eager(List("", "b", "non_exist_1",
"non_exist_2").asJava)
- val nonExistResult =
hoodieBackedTableMetadata.readSecondaryIndexLocations(nonExistKeys,
secondaryIndexName).collectAsList().asScala
+ val nonExistResultRDD =
hoodieBackedTableMetadata.readSecondaryIndexLocations(nonExistKeys,
secondaryIndexName)
+ val nonExistResult = nonExistResultRDD.collectAsList().asScala
assert(nonExistResult.isEmpty, s"Non-existing secondary keys should return
empty result for table version ${getTableVersion}")
// Case 4: Mix of existing and non-existing secondary keys
val mixedKeys = HoodieListData.eager(List("b1", "non_exist_1", "b2",
"non_exist_2").asJava)
- val mixedResult =
hoodieBackedTableMetadata.readSecondaryIndexLocations(mixedKeys,
secondaryIndexName).collectAsList().asScala
+ val mixedResultRDD =
hoodieBackedTableMetadata.readSecondaryIndexLocations(mixedKeys,
secondaryIndexName)
+ val mixedResult = mixedResultRDD.collectAsList().asScala
assert(mixedResult.size == 2, s"Should return 2 results for 2 existing
secondary keys in table version ${getTableVersion}")
// Case 5: Duplicate secondary keys
val dupKeys = HoodieListData.eager(List("b1", "b1", "b2", "b2", "b$",
"b$").asJava)
- val dupResult =
hoodieBackedTableMetadata.readSecondaryIndexLocations(dupKeys,
secondaryIndexName).collectAsList().asScala
+ val dupResultRDD =
hoodieBackedTableMetadata.readSecondaryIndexLocations(dupKeys,
secondaryIndexName)
+ val dupResult = dupResultRDD.collectAsList().asScala
assert(dupResult.size == 3, s"Should return 3 unique results for duplicate
secondary keys in table version ${getTableVersion}")
// Case 6: Test with different secondary index (price column)
val priceIndexName = "secondary_index_idx_price"
val priceKeys = HoodieListData.eager(List("10.0", "20.0", "30.0").asJava)
- val priceResult =
hoodieBackedTableMetadata.readSecondaryIndexLocations(priceKeys,
priceIndexName).collectAsList().asScala
+ val priceResultRDD =
hoodieBackedTableMetadata.readSecondaryIndexLocations(priceKeys, priceIndexName)
+ val priceResult = priceResultRDD.collectAsList().asScala
assert(priceResult.size == 3, s"Should return 3 results for price
secondary keys in table version ${getTableVersion}")
// Case 7: Test invalid secondary index partition name
@@ -360,9 +371,9 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase
extends HoodieSparkS
// Case 9: Test large number of keys to exercise multiple file slices path
val largeKeyList = (1 to 100).map(i => s"b$i").asJava
val largeKeys = HoodieListData.eager(largeKeyList)
- val largeResult =
hoodieBackedTableMetadata.readSecondaryIndexLocations(largeKeys,
secondaryIndexName)
+ val largeResultRDD =
hoodieBackedTableMetadata.readSecondaryIndexLocations(largeKeys,
secondaryIndexName)
// Should not throw exception, even if no results found
- assert(largeResult.collectAsList().size() == 2, "Large key list should
return empty result for non-existing keys")
+ assert(largeResultRDD.collectAsList().size() == 2, "Large key list should
return results only for the keys that exist")
}
/**
@@ -380,23 +391,34 @@ abstract class
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
val secondaryIndexName = "secondary_index_idx_name"
// Test with existing secondary keys including those with $ characters
- val existingKeys = HoodieListData.eager(List("b1", "b2", "b$", "sec$key",
"$sec$", "$$").asJava)
+ val existingKeys = HoodieListData.eager(List("b1", "b2", "b$", "b$", "b1",
"$$", "sec$key", "$sec$", "$$", null).asJava)
val result =
hoodieBackedTableMetadata.getSecondaryIndexRecords(existingKeys,
secondaryIndexName)
- val resultMap = HoodieDataUtils.dedupeAndCollectAsMap(result)
+ val resultMap = HoodieDataUtils.collectPairDataAsMap(result)
assert(resultMap.size == 6, s"Should return 6 results for existing
secondary keys in table version ${getTableVersion}")
-
- // Validate that each secondary key maps to a set of record keys
- resultMap.asScala.foreach { case (secondaryKey, recordKeys) =>
- assert(recordKeys.asScala.nonEmpty, s"Secondary key $secondaryKey should
map to at least one record key")
- assert(recordKeys.size == 1, s"Secondary key $secondaryKey should map to
exactly one record key in this test")
- }
+ assert(resultMap.asScala("b$").asScala == Set("a$"))
+ assert(resultMap.asScala("b1").asScala == Set("a1"))
+ assert(resultMap.asScala("b2").asScala == Set("a2"))
+ assert(resultMap.asScala("$$").asScala == Set("$$"))
+ assert(resultMap.asScala("sec$key").asScala == Set("$a"))
+ assert(resultMap.asScala("$sec$").asScala == Set("a$a"))
// Test with non-existing secondary keys
- val nonExistingKeys = HoodieListData.eager(List("non_exist_1",
"non_exist_2").asJava)
+ val nonExistingKeys = HoodieListData.eager(List("", "b", "$", " ", null,
"non_exist_1", "non_exist_2").asJava)
val nonExistingResult =
hoodieBackedTableMetadata.getSecondaryIndexRecords(nonExistingKeys,
secondaryIndexName)
- val nonExistingMap =
HoodieDataUtils.dedupeAndCollectAsMap(nonExistingResult)
+ val nonExistingMap =
HoodieDataUtils.collectPairDataAsMap(nonExistingResult)
assert(nonExistingMap.isEmpty, s"Should return empty result for
non-existing secondary keys in table version ${getTableVersion}")
+
+ // Test with a mixture of existing and non-existing secondary keys
+ val rddKeys = HoodieJavaRDD.of(List("b1", "b2", "b$", null, "$").asJava,
context, 2)
+ val rddResult =
hoodieBackedTableMetadata.getSecondaryIndexRecords(rddKeys, secondaryIndexName)
+
+ // Collect and validate results
+ val rddResultMap = HoodieDataUtils.collectPairDataAsMap(rddResult)
+ assert(rddResultMap.size == 3, s"Should return 3 results for existing
secondary keys with RDD input")
+ assert(rddResultMap.asScala("b$").asScala == Set("a$"))
+ assert(rddResultMap.asScala("b1").asScala == Set("a1"))
+ assert(rddResultMap.asScala("b2").asScala == Set("a2"))
}
}
@@ -416,8 +438,6 @@ class HoodieBackedTableMetadataIndexLookupV8TestBase
extends HoodieBackedTableMe
override protected def testVersionSpecificBehavior(): Unit = {
// For version 1, test that it only supports HoodieListData
val secondaryIndexName = "secondary_index_idx_name"
- val jsc = new JavaSparkContext(spark.sparkContext)
- val context = new HoodieSparkEngineContext(jsc, new SQLContext(spark))
val rddKeys = HoodieJavaRDD.of(List("b1").asJava, context, 1)
checkExceptionContain(() => {
hoodieBackedTableMetadata.readSecondaryIndexLocations(rddKeys,
secondaryIndexName)
@@ -473,12 +493,10 @@ class HoodieBackedTableMetadataIndexLookupV9TestBase
extends HoodieBackedTableMe
override protected def testVersionSpecificBehavior(): Unit = {
// For version 2, test that it supports both HoodieListData and RDD
val secondaryIndexName = "secondary_index_idx_name"
- val jsc = new JavaSparkContext(spark.sparkContext)
- val context = new HoodieSparkEngineContext(jsc, new SQLContext(spark))
val rddKeys = HoodieJavaRDD.of(List("b1", "b2", "b$").asJava, context, 2)
val rddResult =
hoodieBackedTableMetadata.readSecondaryIndexLocations(rddKeys,
secondaryIndexName)
- val locations = rddResult.collectAsList()
- assert(locations.asScala.size == 3, "Version 2 should support RDD input")
+ // Collect and validate results
+ assert(rddResult.count() == 3, "Version 2 should support RDD input")
// Test case for null values in secondary index
testNullValueInSecondaryIndex()
@@ -524,7 +542,6 @@ class HoodieBackedTableMetadataIndexLookupV9TestBase
extends HoodieBackedTableMe
val nullKeys = HoodieListData.eager(List(null.asInstanceOf[String]).asJava)
val nullResult =
hoodieBackedTableMetadata.readSecondaryIndexLocations(nullKeys,
secondaryIndexName)
val nullLocations = nullResult.collectAsList().asScala
-
// Verify that null lookup returns exactly 1 result (for the null_record
we inserted)
assert(nullLocations.size == 1, s"Secondary index lookup should return
exactly 1 result for null value, but found ${nullLocations.size}")
@@ -540,15 +557,18 @@ class HoodieBackedTableMetadataIndexLookupV9TestBase
extends HoodieBackedTableMe
// Test getSecondaryIndexRecords API with null value
val nullRecordsResult =
hoodieBackedTableMetadata.getSecondaryIndexRecords(nullKeys, secondaryIndexName)
- val nullRecordsMap =
HoodieDataUtils.dedupeAndCollectAsMap(nullRecordsResult)
-
+ val nullRecordsMap =
HoodieDataUtils.collectPairDataAsMap(nullRecordsResult)
// Verify that null key maps to record keys.
// Assert it is map with key as "null_record" -> value as set of "null"
- assert(nullRecordsMap.keySet().asScala.toList.equals(Seq("null_record")))
- assert(nullRecordsMap.get("null_record").asScala.toList.equals(Seq(null)))
- // Clean up the test data
+ assert(nullRecordsMap.size() == 1)
+ assert(nullRecordsMap.get(null).asScala.equals(Set("null_record")))
+ // Clean up the null record and check again, there should not be any
lookup result.
spark.sql(s"delete from $tableName where id = 'null_record'")
hoodieBackedTableMetadata.reset()
+
+ val nullResult2 =
hoodieBackedTableMetadata.readSecondaryIndexLocations(nullKeys,
secondaryIndexName)
+ // Verify that null lookup now returns an empty result, since the
null_record was deleted above
+ assert(nullResult2.isEmpty, s"Secondary index lookup should return empty,
but found ${nullLocations.size}")
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 94c1f755fb55..859041129167 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -1136,7 +1136,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
secondaryKeys = secondaryKeys.sortBy(x -> x, true, numPartitions);
for (int i = 0; i < numPartitions; i++) {
List<String> secKeys = secondaryKeys.collectPartitions(new int[] {i})[0];
- Map<String, Set<String>> mdtSecondaryKeyToRecordKeys =
HoodieDataUtils.dedupeAndCollectAsMap(
+ Map<String, Set<String>> mdtSecondaryKeyToRecordKeys =
HoodieDataUtils.collectPairDataAsMap(
((HoodieBackedTableMetadata)
metadataContext.tableMetadata).getSecondaryIndexRecords(
HoodieListData.lazy(secKeys), indexDefinition.getIndexName()));
Map<String, Set<String>> fsSecondaryKeyToRecordKeys =
getFSSecondaryKeyToRecordKeys(engineContext, basePath, latestCompletedCommit,
indexDefinition.getSourceFields().get(0), secKeys);