This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 394fee03367 [HUDI-8262] Add validation for secondary index in HoodieMetadataTableValidator (#12049)
394fee03367 is described below

commit 394fee0336745ee3b22abb5117b3a6570afa320a
Author: Lokesh Jain <[email protected]>
AuthorDate: Mon Oct 7 15:26:41 2024 +0530

    [HUDI-8262] Add validation for secondary index in HoodieMetadataTableValidator (#12049)
    
    * [HUDI-8262] Add validation for secondary index in HoodieMetadataTableValidator
    
    * Addendum changes
    
    * Update hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
    
    Co-authored-by: Sagar Sumit <[email protected]>
    
    * Address review comments
    
    * Addendum changes
    
    ---------
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../hudi/common/table/HoodieTableConfig.java       |   2 -
 .../apache/hudi/metadata/BaseTableMetadata.java    |   4 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |   2 +-
 .../org/apache/hudi/common/util/StringUtils.java   |   7 ++
 .../apache/hudi/common/util/TestStringUtils.java   |  12 ++
 .../org/apache/hudi/SecondaryIndexSupport.scala    |   2 +-
 .../utilities/HoodieMetadataTableValidator.java    | 122 ++++++++++++++++++---
 .../TestHoodieMetadataTableValidator.java          | 117 ++++++++++++++++++++
 8 files changed, 249 insertions(+), 19 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index a00273fc22f..70964b344b3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -879,8 +879,6 @@ public class HoodieTableConfig extends HoodieConfig {
    * @returns true if the specific partition has been initialized, else returns false.
    */
   public boolean isMetadataPartitionAvailable(MetadataPartitionType metadataPartitionType) {
-    /*return getMetadataPartitions().stream().anyMatch(metadataPartition ->
-        metadataPartition.equals(metadataPartitionType.getPartitionPath()) || (FUNCTIONAL_INDEX.equals(metadataPartitionType) && metadataPartition.startsWith(FUNCTIONAL_INDEX.getPartitionPath())));*/
     return getMetadataPartitions().contains(metadataPartitionType.getPartitionPath());
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 743830152aa..5b823b302cf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -467,9 +467,9 @@ public abstract class BaseTableMetadata extends AbstractHoodieTableMetadata {
   protected abstract Map<String, String> getSecondaryKeysForRecordKeys(List<String> recordKeys, String partitionName);
 
   /**
-   * Returns a map of (record-key -> list-of-secondary-index-records) for the provided secondary keys.
+   * Returns a map of (secondary-key -> list-of-secondary-index-records) for the provided secondary keys.
    */
-  protected abstract Map<String, List<HoodieRecord<HoodieMetadataPayload>>> getSecondaryIndexRecords(List<String> keys, String partitionName);
+  public abstract Map<String, List<HoodieRecord<HoodieMetadataPayload>>> getSecondaryIndexRecords(List<String> keys, String partitionName);
 
   public HoodieMetadataConfig getMetadataConfig() {
     return metadataConfig;
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 0053c7fe2fb..51466d12474 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -868,7 +868,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
   }
 
   @Override
-  protected Map<String, List<HoodieRecord<HoodieMetadataPayload>>> getSecondaryIndexRecords(List<String> keys, String partitionName) {
+  public Map<String, List<HoodieRecord<HoodieMetadataPayload>>> getSecondaryIndexRecords(List<String> keys, String partitionName) {
     if (keys.isEmpty()) {
       return Collections.emptyMap();
     }
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java b/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
index 6cb3544db49..2b9f1af11b4 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -90,6 +91,12 @@ public class StringUtils {
     return String.join(separator, list.toArray(new String[0]));
   }
 
+  public static <K, V> String join(final Map<K, V> map) {
+    return map.entrySet().stream()
+        .map(e -> e.getKey() + "=" + e.getValue())
+        .collect(Collectors.joining(", ", "{", "}"));
+  }
+
   public static String toHexString(byte[] bytes) {
     return new String(encodeHex(bytes));
   }
diff --git a/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java b/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
index 049601e3517..a20bf7b2de0 100644
--- a/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
+++ b/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
@@ -27,12 +27,15 @@ import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 
 import static org.apache.hudi.common.util.StringUtils.concatenateWithThreshold;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -79,6 +82,15 @@ public class TestStringUtils {
     assertNotEquals(null, StringUtils.join(STRINGS));
   }
 
+  @Test
+  public void testStringJoinWithMap() {
+    Map<String, Object> map = new HashMap<>();
+    map.put("a", 1);
+    map.put("b", true);
+    assertNotNull(StringUtils.join(map));
+    assertEquals("{a=1, b=true}", StringUtils.join(map));
+  }
+
   @Test
   public void testStringJoinWithJavaImpl() {
     assertNull(StringUtils.join(",", null));
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
index 39ea4b28c8c..bb897e6a181 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
@@ -98,7 +98,7 @@ class SecondaryIndexSupport(spark: SparkSession,
 
   /**
    * Returns the configured secondary key for the table
-   * TODO: Handle multiple secondary indexes (similar to functional index)
+   * TODO: [HUDI-8302] Handle multiple secondary indexes (similar to functional index)
    */
   private def getSecondaryKeyConfig(queryReferencedColumns: Seq[String],
                                     metaClient: HoodieTableMetaClient): Option[(String, String)] = {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 037ce4371f3..9cd840df5a3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -64,6 +65,7 @@ import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
@@ -75,6 +77,7 @@ import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
@@ -93,6 +96,7 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.Optional;
 import org.apache.spark.sql.functions;
+import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,6 +105,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -327,6 +332,11 @@ public class HoodieMetadataTableValidator implements Serializable {
         required = false)
     public boolean validateRecordIndexContent = false;
 
+    @Parameter(names = {"--validate-secondary-index"},
+        description = "Validate the entries in secondary index match the primary key for the records",
+        required = false)
+    public boolean validateSecondaryIndex = false;
+
     @Parameter(names = {"--num-record-index-error-samples"},
         description = "Number of error samples to show for record index validation",
         required = false)
@@ -390,6 +400,7 @@ public class HoodieMetadataTableValidator implements Serializable {
           + "   --validate-all-column-stats " + validateAllColumnStats + ", \n"
           + "   --validate-partition-stats " + validatePartitionStats + ", \n"
           + "   --validate-bloom-filters " + validateBloomFilters + ", \n"
+          + "   --validate-secondary-index " + validateSecondaryIndex + ", \n"
           + "   --validate-record-index-count " + validateRecordIndexCount + ", \n"
           + "   --validate-record-index-content " + validateRecordIndexContent + ", \n"
           + "   --num-record-index-error-samples " + numRecordIndexErrorSamples + ", \n"
@@ -427,6 +438,7 @@ public class HoodieMetadataTableValidator implements Serializable {
           && Objects.equals(validateAllColumnStats, config.validateAllColumnStats)
           && Objects.equals(validatePartitionStats, config.validatePartitionStats)
           && Objects.equals(validateBloomFilters, config.validateBloomFilters)
+          && Objects.equals(validateSecondaryIndex, config.validateSecondaryIndex)
           && Objects.equals(validateRecordIndexCount, config.validateRecordIndexCount)
           && Objects.equals(validateRecordIndexContent, config.validateRecordIndexContent)
           && Objects.equals(viewStorageTypeForFSListing, config.viewStorageTypeForFSListing)
@@ -447,7 +459,7 @@ public class HoodieMetadataTableValidator implements Serializable {
     public int hashCode() {
       return Objects.hash(basePath, continuous, skipDataFilesForCleaning, validateLatestFileSlices,
           validateLatestBaseFiles, validateAllFileGroups, validateAllColumnStats, validatePartitionStats, validateBloomFilters,
-          validateRecordIndexCount, validateRecordIndexContent, numRecordIndexErrorSamples,
+          validateSecondaryIndex, validateRecordIndexCount, validateRecordIndexContent, numRecordIndexErrorSamples,
           viewStorageTypeForFSListing, viewStorageTypeForMetadata,
           minValidateIntervalSeconds, parallelism, recordIndexParallelism, ignoreFailed,
           sparkMaster, sparkMemory, assumeDatePartitioning, propsFilePath, configs, help);
@@ -596,12 +608,16 @@ public class HoodieMetadataTableValidator implements Serializable {
         validateRecordIndex(engineContext, metaClient);
         result.add(Pair.of(true, null));
       } catch (HoodieValidationException e) {
-        LOG.error(
-            "Metadata table validation failed due to HoodieValidationException in record index validation for table: {} ", cfg.basePath, e);
-        if (!cfg.ignoreFailed) {
-          throw e;
+        handleValidationException(e, result, "Metadata table validation failed due to HoodieValidationException in record index validation");
+      }
+
+      try {
+        if (cfg.validateSecondaryIndex) {
+          validateSecondaryIndex(engineContext, metadataTableBasedContext, metaClient);
+          result.add(Pair.of(true, null));
         }
-        result.add(Pair.of(false, e));
+      } catch (HoodieValidationException e) {
+        handleValidationException(e, result, "Metadata table validation failed due to HoodieValidationException in secondary index validation");
       }
 
       try {
@@ -609,13 +625,8 @@ public class HoodieMetadataTableValidator implements Serializable {
           validatePartitionStats(metadataTableBasedContext, finalBaseFilesForCleaning, allPartitions);
           result.add(Pair.of(true, null));
         }
-      } catch (Exception e) {
-        LOG.error(
-            "Metadata table validation failed due to HoodieValidationException in partition stats validation for table: {} ", cfg.basePath, e);
-        if (!cfg.ignoreFailed) {
-          throw e;
-        }
-        result.add(Pair.of(false, e));
+      } catch (HoodieValidationException e) {
+        handleValidationException(e, result, "Metadata table validation failed due to HoodieValidationException in partition stats validation");
       }
 
       for (Pair<Boolean, ? extends Exception> res : result) {
@@ -644,6 +655,14 @@ public class HoodieMetadataTableValidator implements Serializable {
     }
   }
 
+  private void handleValidationException(HoodieValidationException e, List<Pair<Boolean, ? extends Exception>> result, String errorMsg) {
+    LOG.error(errorMsg + " for table: {} ", cfg.basePath, e);
+    if (!cfg.ignoreFailed) {
+      throw e;
+    }
+    result.add(Pair.of(false, e));
+  }
+
   /**
    * Check metadata is initialized and available to ready.
    * If not we will log.warn and skip current validation.
@@ -1058,6 +1077,83 @@ public class HoodieMetadataTableValidator implements Serializable {
     }
   }
 
+  private void validateSecondaryIndex(HoodieSparkEngineContext engineContext, HoodieMetadataValidationContext metadataContext,
+                                      HoodieTableMetaClient metaClient) {
+    Collection<HoodieIndexDefinition> indexDefinitions = metaClient.getIndexMetadata().get().getIndexDefinitions().values();
+    for (HoodieIndexDefinition indexDefinition : indexDefinitions) {
+      validateSecondaryIndex(engineContext, metadataContext, metaClient, indexDefinition);
+    }
+  }
+
+  private void validateSecondaryIndex(HoodieSparkEngineContext engineContext, HoodieMetadataValidationContext metadataContext,
+                                      HoodieTableMetaClient metaClient, HoodieIndexDefinition indexDefinition) {
+    String basePath = metaClient.getBasePath().toString();
+    String latestCompletedCommit = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline()
+        .filterCompletedInstants().lastInstant().get().getTimestamp();
+
+    JavaRDD<String> secondaryKeys = readSecondaryKeys(engineContext, indexDefinition.getSourceFields(), basePath, latestCompletedCommit);
+    secondaryKeys.persist(StorageLevel.MEMORY_AND_DISK());
+    long numSecondaryKeys = secondaryKeys.count();
+    int numPartitions = (int) Math.max(1, numSecondaryKeys / 100);
+    secondaryKeys = secondaryKeys.sortBy(x -> x, true, numPartitions);
+    for (int i = 0; i < numPartitions; i++) {
+      List<String> secKeys = secondaryKeys.collectPartitions(new int[] {i})[0];
+      Map<String, List<String>> mdtSecondaryKeyToRecordKeys = ((HoodieBackedTableMetadata) metadataContext.tableMetadata)
+          .getSecondaryIndexRecords(secKeys, indexDefinition.getIndexName())
+          .entrySet().stream()
+          .collect(Collectors.toMap(Map.Entry::getKey,
+              e -> e.getValue().stream().map(rec -> rec.getData().getRecordKeyFromSecondaryIndex()).collect(Collectors.toList())));
+      Map<String, List<String>> fsSecondaryKeyToRecordKeys = getFSSecondaryKeyToRecordKeys(engineContext, basePath, latestCompletedCommit, indexDefinition.getSourceFields().get(0), secKeys);
+      if (!fsSecondaryKeyToRecordKeys.equals(mdtSecondaryKeyToRecordKeys)) {
+        throw new HoodieValidationException(String.format("Secondary Index does not match : \nMDT secondary index: %s \nFS secondary index: %s",
+            StringUtils.join(mdtSecondaryKeyToRecordKeys), StringUtils.join(fsSecondaryKeyToRecordKeys)));
+      }
+    }
+    secondaryKeys.unpersist();
+  }
+
+  /**
+   * Queries data in the table and generates a mapping from secondary key to list of record keys with value
+   * as secondary key. Here secondary key is the value of secondary key column or the secondaryField. Also the
+   * function returns the secondary key mapping only for the input secondary keys.
+
+   * @param sparkEngineContext Spark Engine context
+   * @param basePath Table base path
+   * @param latestCompletedCommit Latest completed commit in the table
+   * @param secondaryField The secondary key column used to determine the secondary keys
+   * @param secKeys Input secondary keys which will be filtered
+   * @return Mapping of secondary keys to list of record keys with value as secondary key
+   */
+  Map<String, List<String>> getFSSecondaryKeyToRecordKeys(HoodieSparkEngineContext sparkEngineContext, String basePath, String latestCompletedCommit,
+                                                          String secondaryField, List<String> secKeys) {
+    List<Tuple2<String, String>> recordAndSecondaryKeys = sparkEngineContext.getSqlContext().read().format("hudi")
+        .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT().key(), latestCompletedCommit)
+        .load(basePath)
+        .filter(functions.col(secondaryField).isin(secKeys.toArray()))
+        .select(RECORD_KEY_METADATA_FIELD, secondaryField)
+        .javaRDD()
+        .map(row -> new Tuple2<>(row.getAs(RECORD_KEY_METADATA_FIELD).toString(), row.getAs(secondaryField).toString()))
+        .collect();
+    Map<String, List<String>> secondaryKeyToRecordKeys = new HashMap<>();
+    for (Tuple2<String, String> recordAndSecondaryKey : recordAndSecondaryKeys) {
+      secondaryKeyToRecordKeys.compute(recordAndSecondaryKey._2, (k, v) -> {
+        List<String> recKeys = v != null ? v : new ArrayList<>();
+        recKeys.add(recordAndSecondaryKey._1);
+        return recKeys;
+      });
+    }
+    return secondaryKeyToRecordKeys;
+  }
+
+  private JavaRDD<String> readSecondaryKeys(HoodieSparkEngineContext engineContext, List<String> sourceFields, String basePath, String latestCompletedCommit) {
+    return engineContext.getSqlContext().read().format("hudi")
+        .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT().key(), latestCompletedCommit)
+        .load(basePath)
+        .select(sourceFields.get(0))
+        .toJavaRDD()
+        .map(row -> row.getAs(sourceFields.get(0)).toString());
+  }
+
   private void validateRecordIndexCount(HoodieSparkEngineContext sparkEngineContext,
                                         HoodieTableMetaClient metaClient) {
     String basePath = metaClient.getBasePath().toString();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index 5e0d69899a1..e0b0255a6f8 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -54,7 +54,11 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -64,6 +68,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -72,6 +77,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -183,6 +190,104 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     assertTrue(validator.getThrowables().isEmpty());
   }
 
+  @Test
+  public void testSecondaryIndexValidation() throws IOException {
+    // To overwrite the table properties created during test setup
+    storage.deleteDirectory(metaClient.getBasePath());
+
+    sparkSession.sql(
+        "create table tbl ("
+            + "ts bigint, "
+            + "record_key_col string, "
+            + "not_record_key_col string, "
+            + "partition_key_col string "
+            + ") using hudi "
+            + "options ("
+            + "primaryKey = 'record_key_col', "
+            + "type = 'mor', "
+            + "hoodie.metadata.enable = 'true', "
+            + "hoodie.metadata.record.index.enable = 'true', "
+            + "hoodie.datasource.write.recordkey.field = 'record_key_col', "
+            + "hoodie.enable.data.skipping = 'true', "
+            + "hoodie.datasource.write.precombine.field = 'ts'"
+            + ") "
+            + "partitioned by(partition_key_col) "
+            + "location '" + basePath + "'");
+
+    Dataset<Row> rows = getRowDataset(1, "row1", "abc", "p1");
+    rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
+    rows = getRowDataset(2, "row2", "cde", "p2");
+    rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
+    rows = getRowDataset(3, "row3", "def", "p2");
+    rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
+
+    // create secondary index
+    sparkSession.sql("create index idx_not_record_key_col on tbl using 
secondary_index(not_record_key_col)");
+    validateSecondaryIndex();
+  }
+
+  @Test
+  public void testGetFSSecondaryKeyToRecordKeys() throws IOException {
+    // To overwrite the table properties created during test setup
+    storage.deleteDirectory(metaClient.getBasePath());
+
+    sparkSession.sql(
+        "create table tbl ("
+            + "ts bigint, "
+            + "record_key_col string, "
+            + "not_record_key_col string, "
+            + "partition_key_col string "
+            + ") using hudi "
+            + "options ("
+            + "primaryKey = 'record_key_col', "
+            + "type = 'mor', "
+            + "hoodie.metadata.enable = 'true', "
+            + "hoodie.metadata.record.index.enable = 'true', "
+            + "hoodie.datasource.write.recordkey.field = 'record_key_col', "
+            + "hoodie.enable.data.skipping = 'true', "
+            + "hoodie.datasource.write.precombine.field = 'ts'"
+            + ") "
+            + "partitioned by(partition_key_col) "
+            + "location '" + basePath + "'");
+
+    Dataset<Row> rows = getRowDataset(1, "row1", "abc", "p1");
+    rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
+    rows = getRowDataset(2, "row2", "cde", "p2");
+    rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
+    rows = getRowDataset(3, "row3", "def", "p2");
+    rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
+
+    HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+    config.basePath = "file:" + basePath;
+    config.validateLatestFileSlices = true;
+    config.validateAllFileGroups = true;
+    config.ignoreFailed = true;
+    HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    // Validate getFSSecondaryKeyToRecordKeys API
+    int i = 1;
+    for (String secKey : new String[]{"abc", "cde", "def"}) {
+      // There is one to one mapping between record key and secondary key
+      String recKey = "row" + i++;
+      List<String> recKeys = validator.getFSSecondaryKeyToRecordKeys(new HoodieSparkEngineContext(jsc, sqlContext), basePath,
+              metaClient.getActiveTimeline().lastInstant().get().getTimestamp(), "not_record_key_col", Collections.singletonList(secKey))
+          .get(secKey);
+      assertEquals(Collections.singletonList(recKey), recKeys);
+    }
+  }
+
+  private Dataset<Row> getRowDataset(Object... rowValues) {
+    List<Row> values = Collections.singletonList(RowFactory.create(rowValues));
+    Dataset<Row> rows = sparkSession.createDataFrame(values, new StructType()
+        .add(new StructField("ts", IntegerType, true, Metadata.empty()))
+        .add(new StructField("record_key_col", StringType, true, 
Metadata.empty()))
+        .add(new StructField("not_record_key_col", StringType, true, 
Metadata.empty()))
+        .add(new StructField("partition_key_col", StringType, true, 
Metadata.empty()))
+    );
+    return rows;
+  }
+
   @Test
   public void testPartitionStatsValidation() {
     // TODO: Add validation for compaction and clustering cases
@@ -227,6 +332,18 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     assertTrue(validator.getThrowables().isEmpty());
   }
 
+  private void validateSecondaryIndex() {
+    HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+    config.basePath = basePath;
+    config.validateLatestFileSlices = false;
+    config.validateAllFileGroups = false;
+    config.validateSecondaryIndex = true;
+    HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
+    assertTrue(validator.run());
+    assertFalse(validator.hasValidationFailure());
+    assertTrue(validator.getThrowables().isEmpty());
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testAdditionalPartitionsinMDT(boolean testFailureCase) throws InterruptedException {

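For reference, the new check can also be exercised programmatically, in the same way the test added by this commit does. Below is a minimal sketch, not part of the commit itself; it assumes an existing JavaSparkContext named jsc and a Hudi table located at basePath:

    // Sketch based on the test added in this commit: enable only the secondary index validation.
    HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
    config.basePath = basePath;               // base path of the table to validate
    config.validateSecondaryIndex = true;     // corresponds to the new --validate-secondary-index flag
    HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
    validator.run();                          // returns true when all enabled validations pass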