This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 394fee03367 [HUDI-8262] Add validation for secondary index in HoodieMetadataTableValidator (#12049)
394fee03367 is described below
commit 394fee0336745ee3b22abb5117b3a6570afa320a
Author: Lokesh Jain <[email protected]>
AuthorDate: Mon Oct 7 15:26:41 2024 +0530
[HUDI-8262] Add validation for secondary index in HoodieMetadataTableValidator (#12049)
* [HUDI-8262] Add validation for secondary index in HoodieMetadataTableValidator
* Addendum changes
* Update hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
Co-authored-by: Sagar Sumit <[email protected]>
* Address review comments
* Addendum changes
---------
Co-authored-by: Sagar Sumit <[email protected]>
---
.../hudi/common/table/HoodieTableConfig.java | 2 -
.../apache/hudi/metadata/BaseTableMetadata.java | 4 +-
.../hudi/metadata/HoodieBackedTableMetadata.java | 2 +-
.../org/apache/hudi/common/util/StringUtils.java | 7 ++
.../apache/hudi/common/util/TestStringUtils.java | 12 ++
.../org/apache/hudi/SecondaryIndexSupport.scala | 2 +-
.../utilities/HoodieMetadataTableValidator.java | 122 ++++++++++++++++++---
.../TestHoodieMetadataTableValidator.java | 117 ++++++++++++++++++++
8 files changed, 249 insertions(+), 19 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index a00273fc22f..70964b344b3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -879,8 +879,6 @@ public class HoodieTableConfig extends HoodieConfig {
* @returns true if the specific partition has been initialized, else returns false.
*/
public boolean isMetadataPartitionAvailable(MetadataPartitionType metadataPartitionType) {
- /*return getMetadataPartitions().stream().anyMatch(metadataPartition ->
- metadataPartition.equals(metadataPartitionType.getPartitionPath()) || (FUNCTIONAL_INDEX.equals(metadataPartitionType) && metadataPartition.startsWith(FUNCTIONAL_INDEX.getPartitionPath())));*/
return getMetadataPartitions().contains(metadataPartitionType.getPartitionPath());
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 743830152aa..5b823b302cf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -467,9 +467,9 @@ public abstract class BaseTableMetadata extends AbstractHoodieTableMetadata {
protected abstract Map<String, String> getSecondaryKeysForRecordKeys(List<String> recordKeys, String partitionName);
/**
- * Returns a map of (record-key -> list-of-secondary-index-records) for the provided secondary keys.
+ * Returns a map of (secondary-key -> list-of-secondary-index-records) for the provided secondary keys.
*/
- protected abstract Map<String, List<HoodieRecord<HoodieMetadataPayload>>> getSecondaryIndexRecords(List<String> keys, String partitionName);
+ public abstract Map<String, List<HoodieRecord<HoodieMetadataPayload>>> getSecondaryIndexRecords(List<String> keys, String partitionName);
public HoodieMetadataConfig getMetadataConfig() {
return metadataConfig;
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 0053c7fe2fb..51466d12474 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -868,7 +868,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
@Override
- protected Map<String, List<HoodieRecord<HoodieMetadataPayload>>> getSecondaryIndexRecords(List<String> keys, String partitionName) {
+ public Map<String, List<HoodieRecord<HoodieMetadataPayload>>> getSecondaryIndexRecords(List<String> keys, String partitionName) {
if (keys.isEmpty()) {
return Collections.emptyMap();
}
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java b/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
index 6cb3544db49..2b9f1af11b4 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -90,6 +91,12 @@ public class StringUtils {
return String.join(separator, list.toArray(new String[0]));
}
+ public static <K, V> String join(final Map<K, V> map) {
+ return map.entrySet().stream()
+ .map(e -> e.getKey() + "=" + e.getValue())
+ .collect(Collectors.joining(", ", "{", "}"));
+ }
+
public static String toHexString(byte[] bytes) {
return new String(encodeHex(bytes));
}
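
[Editor's note] For reference, a minimal sketch of how the new map-joining helper renders its input; the class and variable names below are illustrative, and a LinkedHashMap is assumed so iteration order, and hence the output, is deterministic:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.hudi.common.util.StringUtils;

    public class JoinMapExample {
      public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the rendered string is stable.
        Map<String, Object> counts = new LinkedHashMap<>();
        counts.put("a", 1);
        counts.put("b", true);
        // Entries are rendered as key=value, comma-separated, wrapped in braces.
        System.out.println(StringUtils.join(counts)); // prints {a=1, b=true}
      }
    }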
diff --git a/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java b/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
index 049601e3517..a20bf7b2de0 100644
--- a/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
+++ b/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
@@ -27,12 +27,15 @@ import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Random;
import static org.apache.hudi.common.util.StringUtils.concatenateWithThreshold;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -79,6 +82,15 @@ public class TestStringUtils {
assertNotEquals(null, StringUtils.join(STRINGS));
}
+ @Test
+ public void testStringJoinWithMap() {
+ Map<String, Object> map = new HashMap<>();
+ map.put("a", 1);
+ map.put("b", true);
+ assertNotNull(StringUtils.join(map));
+ assertEquals("{a=1, b=true}", StringUtils.join(map));
+ }
+
@Test
public void testStringJoinWithJavaImpl() {
assertNull(StringUtils.join(",", null));
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
index 39ea4b28c8c..bb897e6a181 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
@@ -98,7 +98,7 @@ class SecondaryIndexSupport(spark: SparkSession,
/**
* Returns the configured secondary key for the table
- * TODO: Handle multiple secondary indexes (similar to functional index)
+ * TODO: [HUDI-8302] Handle multiple secondary indexes (similar to functional index)
*/
private def getSecondaryKeyConfig(queryReferencedColumns: Seq[String],
                                  metaClient: HoodieTableMetaClient): Option[(String, String)] = {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 037ce4371f3..9cd840df5a3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -64,6 +65,7 @@ import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
@@ -75,6 +77,7 @@ import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
@@ -93,6 +96,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.functions;
+import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,6 +105,7 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -327,6 +332,11 @@ public class HoodieMetadataTableValidator implements Serializable {
required = false)
public boolean validateRecordIndexContent = false;
+ @Parameter(names = {"--validate-secondary-index"},
+ description = "Validate the entries in secondary index match the
primary key for the records",
+ required = false)
+ public boolean validateSecondaryIndex = false;
+
@Parameter(names = {"--num-record-index-error-samples"},
description = "Number of error samples to show for record index
validation",
required = false)
@@ -390,6 +400,7 @@ public class HoodieMetadataTableValidator implements Serializable {
+ " --validate-all-column-stats " + validateAllColumnStats + ", \n"
+ " --validate-partition-stats " + validatePartitionStats + ", \n"
+ " --validate-bloom-filters " + validateBloomFilters + ", \n"
+ + " --validate-secondary-index " + validateSecondaryIndex + ", \n"
+ " --validate-record-index-count " + validateRecordIndexCount +
", \n"
+ " --validate-record-index-content " + validateRecordIndexContent
+ ", \n"
+ " --num-record-index-error-samples " +
numRecordIndexErrorSamples + ", \n"
@@ -427,6 +438,7 @@ public class HoodieMetadataTableValidator implements Serializable {
&& Objects.equals(validateAllColumnStats,
config.validateAllColumnStats)
&& Objects.equals(validatePartitionStats,
config.validatePartitionStats)
&& Objects.equals(validateBloomFilters, config.validateBloomFilters)
+ && Objects.equals(validateSecondaryIndex,
config.validateSecondaryIndex)
&& Objects.equals(validateRecordIndexCount,
config.validateRecordIndexCount)
&& Objects.equals(validateRecordIndexContent,
config.validateRecordIndexContent)
&& Objects.equals(viewStorageTypeForFSListing,
config.viewStorageTypeForFSListing)
@@ -447,7 +459,7 @@ public class HoodieMetadataTableValidator implements Serializable {
public int hashCode() {
return Objects.hash(basePath, continuous, skipDataFilesForCleaning, validateLatestFileSlices,
validateLatestBaseFiles, validateAllFileGroups, validateAllColumnStats, validatePartitionStats, validateBloomFilters,
- validateRecordIndexCount, validateRecordIndexContent, numRecordIndexErrorSamples,
+ validateSecondaryIndex, validateRecordIndexCount, validateRecordIndexContent, numRecordIndexErrorSamples,
viewStorageTypeForFSListing, viewStorageTypeForMetadata, minValidateIntervalSeconds, parallelism, recordIndexParallelism, ignoreFailed,
sparkMaster, sparkMemory, assumeDatePartitioning, propsFilePath, configs, help);
@@ -596,12 +608,16 @@ public class HoodieMetadataTableValidator implements Serializable {
validateRecordIndex(engineContext, metaClient);
result.add(Pair.of(true, null));
} catch (HoodieValidationException e) {
- LOG.error(
- "Metadata table validation failed due to HoodieValidationException in record index validation for table: {} ", cfg.basePath, e);
- if (!cfg.ignoreFailed) {
- throw e;
+ handleValidationException(e, result, "Metadata table validation failed due to HoodieValidationException in record index validation");
+ }
+
+ try {
+ if (cfg.validateSecondaryIndex) {
+ validateSecondaryIndex(engineContext, metadataTableBasedContext, metaClient);
+ result.add(Pair.of(true, null));
}
- result.add(Pair.of(false, e));
+ } catch (HoodieValidationException e) {
+ handleValidationException(e, result, "Metadata table validation failed due to HoodieValidationException in secondary index validation");
}
try {
@@ -609,13 +625,8 @@ public class HoodieMetadataTableValidator implements Serializable {
validatePartitionStats(metadataTableBasedContext, finalBaseFilesForCleaning, allPartitions);
result.add(Pair.of(true, null));
}
- } catch (Exception e) {
- LOG.error(
- "Metadata table validation failed due to HoodieValidationException in partition stats validation for table: {} ", cfg.basePath, e);
- if (!cfg.ignoreFailed) {
- throw e;
- }
- result.add(Pair.of(false, e));
+ } catch (HoodieValidationException e) {
+ handleValidationException(e, result, "Metadata table validation failed due to HoodieValidationException in partition stats validation");
}
for (Pair<Boolean, ? extends Exception> res : result) {
@@ -644,6 +655,14 @@ public class HoodieMetadataTableValidator implements Serializable {
}
}
+ private void handleValidationException(HoodieValidationException e, List<Pair<Boolean, ? extends Exception>> result, String errorMsg) {
+ LOG.error(errorMsg + " for table: {} ", cfg.basePath, e);
+ if (!cfg.ignoreFailed) {
+ throw e;
+ }
+ result.add(Pair.of(false, e));
+ }
+
/**
* Check metadata is initialized and available to read.
* If not we will log.warn and skip current validation.
@@ -1058,6 +1077,83 @@ public class HoodieMetadataTableValidator implements Serializable {
}
}
+ private void validateSecondaryIndex(HoodieSparkEngineContext engineContext, HoodieMetadataValidationContext metadataContext,
+     HoodieTableMetaClient metaClient) {
+ Collection<HoodieIndexDefinition> indexDefinitions =
metaClient.getIndexMetadata().get().getIndexDefinitions().values();
+ for (HoodieIndexDefinition indexDefinition : indexDefinitions) {
+ validateSecondaryIndex(engineContext, metadataContext, metaClient, indexDefinition);
+ }
+ }
+
+ private void validateSecondaryIndex(HoodieSparkEngineContext engineContext, HoodieMetadataValidationContext metadataContext,
+     HoodieTableMetaClient metaClient, HoodieIndexDefinition indexDefinition) {
+ String basePath = metaClient.getBasePath().toString();
+ String latestCompletedCommit = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline()
+     .filterCompletedInstants().lastInstant().get().getTimestamp();
+
+ JavaRDD<String> secondaryKeys = readSecondaryKeys(engineContext,
indexDefinition.getSourceFields(), basePath, latestCompletedCommit);
+ secondaryKeys.persist(StorageLevel.MEMORY_AND_DISK());
+ long numSecondaryKeys = secondaryKeys.count();
+ int numPartitions = (int) Math.max(1, numSecondaryKeys / 100);
+ secondaryKeys = secondaryKeys.sortBy(x -> x, true, numPartitions);
+ for (int i = 0; i < numPartitions; i++) {
+ List<String> secKeys = secondaryKeys.collectPartitions(new int[] {i})[0];
+ Map<String, List<String>> mdtSecondaryKeyToRecordKeys =
((HoodieBackedTableMetadata) metadataContext.tableMetadata)
+ .getSecondaryIndexRecords(secKeys, indexDefinition.getIndexName())
+ .entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ e -> e.getValue().stream().map(rec ->
rec.getData().getRecordKeyFromSecondaryIndex()).collect(Collectors.toList())));
+ Map<String, List<String>> fsSecondaryKeyToRecordKeys =
getFSSecondaryKeyToRecordKeys(engineContext, basePath, latestCompletedCommit,
indexDefinition.getSourceFields().get(0), secKeys);
+ if (!fsSecondaryKeyToRecordKeys.equals(mdtSecondaryKeyToRecordKeys)) {
+ throw new HoodieValidationException(String.format("Secondary Index does not match : \nMDT secondary index: %s \nFS secondary index: %s",
+     StringUtils.join(mdtSecondaryKeyToRecordKeys), StringUtils.join(fsSecondaryKeyToRecordKeys)));
+ }
+ }
+ secondaryKeys.unpersist();
+ }
+
+ /**
+ * Queries the data in the table and generates a mapping from each secondary key to the list of record keys whose value equals that secondary key.
+ * Here the secondary key is the value of the secondary key column (the secondaryField). The mapping is returned only for the input secondary keys.
+
+ * @param sparkEngineContext Spark Engine context
+ * @param basePath Table base path
+ * @param latestCompletedCommit Latest completed commit in the table
+ * @param secondaryField The secondary key column used to determine the secondary keys
+ * @param secKeys Input secondary keys which will be filtered
+ * @return Mapping from each secondary key to the list of record keys whose value equals that secondary key
+ */
+ Map<String, List<String>>
getFSSecondaryKeyToRecordKeys(HoodieSparkEngineContext sparkEngineContext,
String basePath, String latestCompletedCommit,
+ String
secondaryField, List<String> secKeys) {
+ List<Tuple2<String, String>> recordAndSecondaryKeys =
sparkEngineContext.getSqlContext().read().format("hudi")
+     .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT().key(), latestCompletedCommit)
+ .load(basePath)
+ .filter(functions.col(secondaryField).isin(secKeys.toArray()))
+ .select(RECORD_KEY_METADATA_FIELD, secondaryField)
+ .javaRDD()
+ .map(row -> new
Tuple2<>(row.getAs(RECORD_KEY_METADATA_FIELD).toString(),
row.getAs(secondaryField).toString()))
+ .collect();
+ Map<String, List<String>> secondaryKeyToRecordKeys = new HashMap<>();
+ for (Tuple2<String, String> recordAndSecondaryKey : recordAndSecondaryKeys) {
+ secondaryKeyToRecordKeys.compute(recordAndSecondaryKey._2, (k, v) -> {
+ List<String> recKeys = v != null ? v : new ArrayList<>();
+ recKeys.add(recordAndSecondaryKey._1);
+ return recKeys;
+ });
+ }
+ return secondaryKeyToRecordKeys;
+ }
+
+ private JavaRDD<String> readSecondaryKeys(HoodieSparkEngineContext
engineContext, List<String> sourceFields, String basePath, String
latestCompletedCommit) {
+ return engineContext.getSqlContext().read().format("hudi")
+     .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT().key(), latestCompletedCommit)
+ .load(basePath)
+ .select(sourceFields.get(0))
+ .toJavaRDD()
+ .map(row -> row.getAs(sourceFields.get(0)).toString());
+ }
+
private void validateRecordIndexCount(HoodieSparkEngineContext sparkEngineContext,
    HoodieTableMetaClient metaClient) {
String basePath = metaClient.getBasePath().toString();
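
[Editor's note] The new flag can be exercised programmatically the same way the tests below do; a minimal sketch, assuming a JavaSparkContext jsc and a table basePath as in the test harness:

    // Sketch: enable only the new secondary index check and run the validator once.
    HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
    config.basePath = basePath;            // base path of the Hudi table under validation
    config.validateSecondaryIndex = true;  // flag introduced by this commit
    HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
    // run() surfaces a HoodieValidationException on any MDT/FS mismatch unless ignoreFailed is set.
    validator.run();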
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index 5e0d69899a1..e0b0255a6f8 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -54,7 +54,11 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -64,6 +68,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -72,6 +77,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -183,6 +190,104 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
assertTrue(validator.getThrowables().isEmpty());
}
+ @Test
+ public void testSecondaryIndexValidation() throws IOException {
+ // To overwrite the table properties created during test setup
+ storage.deleteDirectory(metaClient.getBasePath());
+
+ sparkSession.sql(
+ "create table tbl ("
+ + "ts bigint, "
+ + "record_key_col string, "
+ + "not_record_key_col string, "
+ + "partition_key_col string "
+ + ") using hudi "
+ + "options ("
+ + "primaryKey = 'record_key_col', "
+ + "type = 'mor', "
+ + "hoodie.metadata.enable = 'true', "
+ + "hoodie.metadata.record.index.enable = 'true', "
+ + "hoodie.datasource.write.recordkey.field = 'record_key_col', "
+ + "hoodie.enable.data.skipping = 'true', "
+ + "hoodie.datasource.write.precombine.field = 'ts'"
+ + ") "
+ + "partitioned by(partition_key_col) "
+ + "location '" + basePath + "'");
+
+ Dataset<Row> rows = getRowDataset(1, "row1", "abc", "p1");
+ rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
+ rows = getRowDataset(2, "row2", "cde", "p2");
+ rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
+ rows = getRowDataset(3, "row3", "def", "p2");
+ rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
+
+ // create secondary index
+ sparkSession.sql("create index idx_not_record_key_col on tbl using
secondary_index(not_record_key_col)");
+ validateSecondaryIndex();
+ }
+
+ @Test
+ public void testGetFSSecondaryKeyToRecordKeys() throws IOException {
+ // To overwrite the table properties created during test setup
+ storage.deleteDirectory(metaClient.getBasePath());
+
+ sparkSession.sql(
+ "create table tbl ("
+ + "ts bigint, "
+ + "record_key_col string, "
+ + "not_record_key_col string, "
+ + "partition_key_col string "
+ + ") using hudi "
+ + "options ("
+ + "primaryKey = 'record_key_col', "
+ + "type = 'mor', "
+ + "hoodie.metadata.enable = 'true', "
+ + "hoodie.metadata.record.index.enable = 'true', "
+ + "hoodie.datasource.write.recordkey.field = 'record_key_col', "
+ + "hoodie.enable.data.skipping = 'true', "
+ + "hoodie.datasource.write.precombine.field = 'ts'"
+ + ") "
+ + "partitioned by(partition_key_col) "
+ + "location '" + basePath + "'");
+
+ Dataset<Row> rows = getRowDataset(1, "row1", "abc", "p1");
+ rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
+ rows = getRowDataset(2, "row2", "cde", "p2");
+ rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
+ rows = getRowDataset(3, "row3", "def", "p2");
+ rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
+
+ HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+ config.basePath = "file:" + basePath;
+ config.validateLatestFileSlices = true;
+ config.validateAllFileGroups = true;
+ config.ignoreFailed = true;
+ HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ // Validate getFSSecondaryKeyToRecordKeys API
+ int i = 1;
+ for (String secKey : new String[]{"abc", "cde", "def"}) {
+ // There is one to one mapping between record key and secondary key
+ String recKey = "row" + i++;
+ List<String> recKeys = validator.getFSSecondaryKeyToRecordKeys(new
HoodieSparkEngineContext(jsc, sqlContext), basePath,
+
metaClient.getActiveTimeline().lastInstant().get().getTimestamp(),
"not_record_key_col", Collections.singletonList(secKey))
+ .get(secKey);
+ assertEquals(Collections.singletonList(recKey), recKeys);
+ }
+ }
+
+ private Dataset<Row> getRowDataset(Object... rowValues) {
+ List<Row> values = Collections.singletonList(RowFactory.create(rowValues));
+ Dataset<Row> rows = sparkSession.createDataFrame(values, new StructType()
+ .add(new StructField("ts", IntegerType, true, Metadata.empty()))
+ .add(new StructField("record_key_col", StringType, true,
Metadata.empty()))
+ .add(new StructField("not_record_key_col", StringType, true,
Metadata.empty()))
+ .add(new StructField("partition_key_col", StringType, true,
Metadata.empty()))
+ );
+ return rows;
+ }
+
@Test
public void testPartitionStatsValidation() {
// TODO: Add validation for compaction and clustering cases
@@ -227,6 +332,18 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
assertTrue(validator.getThrowables().isEmpty());
}
+ private void validateSecondaryIndex() {
+ HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+ config.basePath = basePath;
+ config.validateLatestFileSlices = false;
+ config.validateAllFileGroups = false;
+ config.validateSecondaryIndex = true;
+ HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
+ assertTrue(validator.run());
+ assertFalse(validator.hasValidationFailure());
+ assertTrue(validator.getThrowables().isEmpty());
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testAdditionalPartitionsinMDT(boolean testFailureCase) throws InterruptedException {