This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a43fff18e3a [HUDI-8386] Fix update for secondary index and 
corresponding validation in metadata table validator (#12119)
a43fff18e3a is described below

commit a43fff18e3a7067177eb7b4094126ac85cb94118
Author: Lokesh Jain <[email protected]>
AuthorDate: Wed Oct 23 17:40:33 2024 +0530

    [HUDI-8386] Fix update for secondary index and corresponding validation in 
metadata table validator (#12119)
---
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  8 +++---
 .../functional/TestSecondaryIndexPruning.scala     |  2 +-
 .../utilities/HoodieMetadataTableValidator.java    | 31 +++++++++++++---------
 .../TestHoodieMetadataTableValidator.java          | 18 ++++++++++---
 4 files changed, 38 insertions(+), 21 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 1f1c7f170d1..d8fa78935f3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -929,14 +929,14 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
       // Sort it here once so that we don't need to sort individually for base 
file and for each individual log files.
       Set<String> secondaryKeySet = new HashSet<>(secondaryKeys.size());
       List<String> sortedSecondaryKeys = new ArrayList<>(secondaryKeys);
-      Collections.sort(sortedSecondaryKeys);
       secondaryKeySet.addAll(sortedSecondaryKeys);
+      Collections.sort(sortedSecondaryKeys);
 
       logRecordScanner.getRecords().forEach(record -> {
         HoodieMetadataPayload payload = record.getData();
-        String recordKey = payload.getRecordKeyFromSecondaryIndex();
-        if (secondaryKeySet.contains(recordKey)) {
-          String secondaryKey = payload.getRecordKeyFromSecondaryIndex();
+        String secondaryKey = payload.key;
+        if (secondaryKeySet.contains(secondaryKey)) {
+          String recordKey = payload.getRecordKeyFromSecondaryIndex();
           logRecordsMap.computeIfAbsent(secondaryKey, k -> new 
HashMap<>()).put(recordKey, record);
         }
       });
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index a8f55c514b7..c04dc09a69c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -785,7 +785,7 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
   }
 
   private def verifyQueryPredicate(hudiOpts: Map[String, String], columnName: 
String): Unit = {
-    mergedDfList = 
spark.read.format("hudi").options(hudiOpts).load(basePath).repartition(1).cache()
 :: mergedDfList
+    mergedDfList = mergedDfList :+ 
spark.read.format("hudi").options(hudiOpts).load(basePath).repartition(1).cache()
     val secondaryKey = mergedDfList.last.limit(1).collect().map(row => 
row.getAs(columnName).toString)
     val dataFilter = EqualTo(attribute(columnName), Literal(secondaryKey(0)))
     verifyFilePruning(hudiOpts, dataFilter)
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 5459e3359a2..2c5fa9a1646 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -1101,12 +1101,17 @@ public class HoodieMetadataTableValidator implements 
Serializable {
     secondaryKeys = secondaryKeys.sortBy(x -> x, true, numPartitions);
     for (int i = 0; i < numPartitions; i++) {
       List<String> secKeys = secondaryKeys.collectPartitions(new int[] {i})[0];
-      Map<String, List<String>> mdtSecondaryKeyToRecordKeys = 
((HoodieBackedTableMetadata) metadataContext.tableMetadata)
+      Map<String, Set<String>> mdtSecondaryKeyToRecordKeys = 
((HoodieBackedTableMetadata) metadataContext.tableMetadata)
           .getSecondaryIndexRecords(secKeys, indexDefinition.getIndexName())
           .entrySet().stream()
-          .collect(Collectors.toMap(Map.Entry::getKey,
-              e -> e.getValue().stream().map(rec -> 
rec.getData().getRecordKeyFromSecondaryIndex()).collect(Collectors.toList())));
-      Map<String, List<String>> fsSecondaryKeyToRecordKeys = 
getFSSecondaryKeyToRecordKeys(engineContext, basePath, latestCompletedCommit, 
indexDefinition.getSourceFields().get(0), secKeys);
+          .collect(Collectors.toMap(
+              Map.Entry::getKey,
+              e -> e.getValue().stream()
+                  .map(rec -> rec.getData().isSecondaryIndexDeleted() ? null : 
rec.getData().getRecordKeyFromSecondaryIndex())
+                  .filter(Objects::nonNull)
+                  .collect(Collectors.toSet()))
+          );
+      Map<String, Set<String>> fsSecondaryKeyToRecordKeys = 
getFSSecondaryKeyToRecordKeys(engineContext, basePath, latestCompletedCommit, 
indexDefinition.getSourceFields().get(0), secKeys);
       if (!fsSecondaryKeyToRecordKeys.equals(mdtSecondaryKeyToRecordKeys)) {
         throw new HoodieValidationException(String.format("Secondary Index 
does not match : \nMDT secondary index: %s \nFS secondary index: %s",
             StringUtils.join(mdtSecondaryKeyToRecordKeys), 
StringUtils.join(fsSecondaryKeyToRecordKeys)));
@@ -1119,16 +1124,16 @@ public class HoodieMetadataTableValidator implements 
Serializable {
    * Queries data in the table and generates a mapping from secondary key to 
list of record keys with value
    * as secondary key. Here secondary key is the value of secondary key column 
or the secondaryField. Also the
    * function returns the secondary key mapping only for the input secondary 
keys.
-
-   * @param sparkEngineContext Spark Engine context
-   * @param basePath Table base path
+   *
+   * @param sparkEngineContext    Spark Engine context
+   * @param basePath              Table base path
    * @param latestCompletedCommit Latest completed commit in the table
-   * @param secondaryField The secondary key column used to determine the 
secondary keys
-   * @param secKeys Input secondary keys which will be filtered
+   * @param secondaryField        The secondary key column used to determine 
the secondary keys
+   * @param secKeys               Input secondary keys which will be filtered
    * @return Mapping of secondary keys to list of record keys with value as 
secondary key
    */
-  Map<String, List<String>> 
getFSSecondaryKeyToRecordKeys(HoodieSparkEngineContext sparkEngineContext, 
String basePath, String latestCompletedCommit,
-                                                          String 
secondaryField, List<String> secKeys) {
+  Map<String, Set<String>> 
getFSSecondaryKeyToRecordKeys(HoodieSparkEngineContext sparkEngineContext, 
String basePath, String latestCompletedCommit,
+                                                         String 
secondaryField, List<String> secKeys) {
     List<Tuple2<String, String>> recordAndSecondaryKeys = 
sparkEngineContext.getSqlContext().read().format("hudi")
         .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT().key(), 
latestCompletedCommit)
         .load(basePath)
@@ -1137,10 +1142,10 @@ public class HoodieMetadataTableValidator implements 
Serializable {
         .javaRDD()
         .map(row -> new 
Tuple2<>(row.getAs(RECORD_KEY_METADATA_FIELD).toString(), 
row.getAs(secondaryField).toString()))
         .collect();
-    Map<String, List<String>> secondaryKeyToRecordKeys = new HashMap<>();
+    Map<String, Set<String>> secondaryKeyToRecordKeys = new HashMap<>();
     for (Tuple2<String, String> recordAndSecondaryKey : 
recordAndSecondaryKeys) {
       secondaryKeyToRecordKeys.compute(recordAndSecondaryKey._2, (k, v) -> {
-        List<String> recKeys = v != null ? v : new ArrayList<>();
+        Set<String> recKeys = v != null ? v : new HashSet<>();
         recKeys.add(recordAndSecondaryKey._1);
         return recKeys;
       });
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index 9fad1385400..455fd4b38f2 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -72,6 +72,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -216,7 +217,7 @@ public class TestHoodieMetadataTableValidator extends 
HoodieSparkClientTestBase
 
     Dataset<Row> rows = getRowDataset(1, "row1", "abc", "p1");
     rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
-    rows = getRowDataset(2, "row2", "cde", "p2");
+    rows = getRowDataset(2, "row2", "abc", "p2");
     rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
     rows = getRowDataset(3, "row3", "def", "p2");
     rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
@@ -224,6 +225,17 @@ public class TestHoodieMetadataTableValidator extends 
HoodieSparkClientTestBase
     // create secondary index
     sparkSession.sql("create index idx_not_record_key_col on tbl using 
secondary_index(not_record_key_col)");
     validateSecondaryIndex();
+
+    // updating record `not_record_key_col` column from `abc` to `cde`
+    rows = getRowDataset(1, "row1", "cde", "p1");
+    rows.write().format("hudi")
+        .option("hoodie.metadata.enable", "true")
+        .option("hoodie.metadata.record.index.enable", "true")
+        .mode(SaveMode.Append)
+        .save(basePath);
+
+    // validate MDT partition stats
+    validateSecondaryIndex();
   }
 
   @Test
@@ -270,10 +282,10 @@ public class TestHoodieMetadataTableValidator extends 
HoodieSparkClientTestBase
     for (String secKey : new String[]{"abc", "cde", "def"}) {
       // There is one to one mapping between record key and secondary key
       String recKey = "row" + i++;
-      List<String> recKeys = validator.getFSSecondaryKeyToRecordKeys(new 
HoodieSparkEngineContext(jsc, sqlContext), basePath,
+      Set<String> recKeys = validator.getFSSecondaryKeyToRecordKeys(new 
HoodieSparkEngineContext(jsc, sqlContext), basePath,
               
metaClient.getActiveTimeline().lastInstant().get().getTimestamp(), 
"not_record_key_col", Collections.singletonList(secKey))
           .get(secKey);
-      assertEquals(Collections.singletonList(recKey), recKeys);
+      assertEquals(Collections.singleton(recKey), recKeys);
     }
   }
 

Reply via email to