This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a43fff18e3a [HUDI-8386] Fix update for secondary index and
corresponding validation in metadata table validator (#12119)
a43fff18e3a is described below
commit a43fff18e3a7067177eb7b4094126ac85cb94118
Author: Lokesh Jain <[email protected]>
AuthorDate: Wed Oct 23 17:40:33 2024 +0530
[HUDI-8386] Fix update for secondary index and corresponding validation in
metadata table validator (#12119)
---
.../hudi/metadata/HoodieBackedTableMetadata.java | 8 +++---
.../functional/TestSecondaryIndexPruning.scala | 2 +-
.../utilities/HoodieMetadataTableValidator.java | 31 +++++++++++++---------
.../TestHoodieMetadataTableValidator.java | 18 ++++++++++---
4 files changed, 38 insertions(+), 21 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 1f1c7f170d1..d8fa78935f3 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -929,14 +929,14 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
// Sort it here once so that we don't need to sort individually for base
file and for each individual log files.
Set<String> secondaryKeySet = new HashSet<>(secondaryKeys.size());
List<String> sortedSecondaryKeys = new ArrayList<>(secondaryKeys);
- Collections.sort(sortedSecondaryKeys);
secondaryKeySet.addAll(sortedSecondaryKeys);
+ Collections.sort(sortedSecondaryKeys);
logRecordScanner.getRecords().forEach(record -> {
HoodieMetadataPayload payload = record.getData();
- String recordKey = payload.getRecordKeyFromSecondaryIndex();
- if (secondaryKeySet.contains(recordKey)) {
- String secondaryKey = payload.getRecordKeyFromSecondaryIndex();
+ String secondaryKey = payload.key;
+ if (secondaryKeySet.contains(secondaryKey)) {
+ String recordKey = payload.getRecordKeyFromSecondaryIndex();
logRecordsMap.computeIfAbsent(secondaryKey, k -> new
HashMap<>()).put(recordKey, record);
}
});
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index a8f55c514b7..c04dc09a69c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -785,7 +785,7 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
}
private def verifyQueryPredicate(hudiOpts: Map[String, String], columnName:
String): Unit = {
- mergedDfList =
spark.read.format("hudi").options(hudiOpts).load(basePath).repartition(1).cache()
:: mergedDfList
+ mergedDfList = mergedDfList :+
spark.read.format("hudi").options(hudiOpts).load(basePath).repartition(1).cache()
val secondaryKey = mergedDfList.last.limit(1).collect().map(row =>
row.getAs(columnName).toString)
val dataFilter = EqualTo(attribute(columnName), Literal(secondaryKey(0)))
verifyFilePruning(hudiOpts, dataFilter)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 5459e3359a2..2c5fa9a1646 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -1101,12 +1101,17 @@ public class HoodieMetadataTableValidator implements
Serializable {
secondaryKeys = secondaryKeys.sortBy(x -> x, true, numPartitions);
for (int i = 0; i < numPartitions; i++) {
List<String> secKeys = secondaryKeys.collectPartitions(new int[] {i})[0];
- Map<String, List<String>> mdtSecondaryKeyToRecordKeys =
((HoodieBackedTableMetadata) metadataContext.tableMetadata)
+ Map<String, Set<String>> mdtSecondaryKeyToRecordKeys =
((HoodieBackedTableMetadata) metadataContext.tableMetadata)
.getSecondaryIndexRecords(secKeys, indexDefinition.getIndexName())
.entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey,
- e -> e.getValue().stream().map(rec ->
rec.getData().getRecordKeyFromSecondaryIndex()).collect(Collectors.toList())));
- Map<String, List<String>> fsSecondaryKeyToRecordKeys =
getFSSecondaryKeyToRecordKeys(engineContext, basePath, latestCompletedCommit,
indexDefinition.getSourceFields().get(0), secKeys);
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> e.getValue().stream()
+ .map(rec -> rec.getData().isSecondaryIndexDeleted() ? null :
rec.getData().getRecordKeyFromSecondaryIndex())
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet()))
+ );
+ Map<String, Set<String>> fsSecondaryKeyToRecordKeys =
getFSSecondaryKeyToRecordKeys(engineContext, basePath, latestCompletedCommit,
indexDefinition.getSourceFields().get(0), secKeys);
if (!fsSecondaryKeyToRecordKeys.equals(mdtSecondaryKeyToRecordKeys)) {
throw new HoodieValidationException(String.format("Secondary Index
does not match : \nMDT secondary index: %s \nFS secondary index: %s",
StringUtils.join(mdtSecondaryKeyToRecordKeys),
StringUtils.join(fsSecondaryKeyToRecordKeys)));
@@ -1119,16 +1124,16 @@ public class HoodieMetadataTableValidator implements
Serializable {
* Queries data in the table and generates a mapping from secondary key to
list of record keys with value
* as secondary key. Here secondary key is the value of secondary key column
or the secondaryField. Also the
* function returns the secondary key mapping only for the input secondary
keys.
-
- * @param sparkEngineContext Spark Engine context
- * @param basePath Table base path
+ *
+ * @param sparkEngineContext Spark Engine context
+ * @param basePath Table base path
* @param latestCompletedCommit Latest completed commit in the table
- * @param secondaryField The secondary key column used to determine the
secondary keys
- * @param secKeys Input secondary keys which will be filtered
+ * @param secondaryField The secondary key column used to determine
the secondary keys
+ * @param secKeys Input secondary keys which will be filtered
* @return Mapping of secondary keys to list of record keys with value as
secondary key
*/
- Map<String, List<String>>
getFSSecondaryKeyToRecordKeys(HoodieSparkEngineContext sparkEngineContext,
String basePath, String latestCompletedCommit,
- String
secondaryField, List<String> secKeys) {
+ Map<String, Set<String>>
getFSSecondaryKeyToRecordKeys(HoodieSparkEngineContext sparkEngineContext,
String basePath, String latestCompletedCommit,
+ String
secondaryField, List<String> secKeys) {
List<Tuple2<String, String>> recordAndSecondaryKeys =
sparkEngineContext.getSqlContext().read().format("hudi")
.option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT().key(),
latestCompletedCommit)
.load(basePath)
@@ -1137,10 +1142,10 @@ public class HoodieMetadataTableValidator implements
Serializable {
.javaRDD()
.map(row -> new
Tuple2<>(row.getAs(RECORD_KEY_METADATA_FIELD).toString(),
row.getAs(secondaryField).toString()))
.collect();
- Map<String, List<String>> secondaryKeyToRecordKeys = new HashMap<>();
+ Map<String, Set<String>> secondaryKeyToRecordKeys = new HashMap<>();
for (Tuple2<String, String> recordAndSecondaryKey :
recordAndSecondaryKeys) {
secondaryKeyToRecordKeys.compute(recordAndSecondaryKey._2, (k, v) -> {
- List<String> recKeys = v != null ? v : new ArrayList<>();
+ Set<String> recKeys = v != null ? v : new HashSet<>();
recKeys.add(recordAndSecondaryKey._1);
return recKeys;
});
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index 9fad1385400..455fd4b38f2 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -72,6 +72,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -216,7 +217,7 @@ public class TestHoodieMetadataTableValidator extends
HoodieSparkClientTestBase
Dataset<Row> rows = getRowDataset(1, "row1", "abc", "p1");
rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
- rows = getRowDataset(2, "row2", "cde", "p2");
+ rows = getRowDataset(2, "row2", "abc", "p2");
rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
rows = getRowDataset(3, "row3", "def", "p2");
rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
@@ -224,6 +225,17 @@ public class TestHoodieMetadataTableValidator extends
HoodieSparkClientTestBase
// create secondary index
sparkSession.sql("create index idx_not_record_key_col on tbl using
secondary_index(not_record_key_col)");
validateSecondaryIndex();
+
+ // update the record's `not_record_key_col` column from `abc` to `cde`
+ rows = getRowDataset(1, "row1", "cde", "p1");
+ rows.write().format("hudi")
+ .option("hoodie.metadata.enable", "true")
+ .option("hoodie.metadata.record.index.enable", "true")
+ .mode(SaveMode.Append)
+ .save(basePath);
+
+ // validate the secondary index again after the update
+ validateSecondaryIndex();
}
@Test
@@ -270,10 +282,10 @@ public class TestHoodieMetadataTableValidator extends
HoodieSparkClientTestBase
for (String secKey : new String[]{"abc", "cde", "def"}) {
// There is one to one mapping between record key and secondary key
String recKey = "row" + i++;
- List<String> recKeys = validator.getFSSecondaryKeyToRecordKeys(new
HoodieSparkEngineContext(jsc, sqlContext), basePath,
+ Set<String> recKeys = validator.getFSSecondaryKeyToRecordKeys(new
HoodieSparkEngineContext(jsc, sqlContext), basePath,
metaClient.getActiveTimeline().lastInstant().get().getTimestamp(),
"not_record_key_col", Collections.singletonList(secKey))
.get(secKey);
- assertEquals(Collections.singletonList(recKey), recKeys);
+ assertEquals(Collections.singleton(recKey), recKeys);
}
}