This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f780c92d26d [HUDI-8561] Throw error for unsupported data types in
secondary index (#12326)
f780c92d26d is described below
commit f780c92d26dfe38c99cd1c1a9a71b00006e82c59
Author: Sagar Sumit <[email protected]>
AuthorDate: Mon Nov 25 12:04:15 2024 +0530
[HUDI-8561] Throw error for unsupported data types in secondary index
(#12326)
- Throw error for unsupported data types (complex types such as record, map
and array) in secondary index.
- Add test for both successful scenario with primitive types and assert
exception for unsupported types.
---
.../index/functional/BaseHoodieIndexClient.java | 2 +-
.../hudi/metadata/HoodieTableMetadataUtil.java | 14 +++
.../hudi/metadata/TestHoodieTableMetadataUtil.java | 27 ++++
.../org/apache/hudi/HoodieSparkIndexClient.java | 14 ++-
.../functional/TestSecondaryIndexPruning.scala | 140 ++++++++++++++++++++-
.../hudi/command/index/TestSecondaryIndex.scala | 9 +-
6 files changed, 199 insertions(+), 7 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java
index 786b36a823f..753595ede52 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java
@@ -58,7 +58,7 @@ public abstract class BaseHoodieIndexClient {
/**
* Create a functional index.
*/
- public abstract void create(HoodieTableMetaClient metaClient, String
indexName, String indexType, Map<String, Map<String, String>> columns,
Map<String, String> options);
+ public abstract void create(HoodieTableMetaClient metaClient, String
indexName, String indexType, Map<String, Map<String, String>> columns,
Map<String, String> options) throws Exception;
/**
* Drop an index. By default, ignore drop if index does not exist.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index e5a4024ad50..6f03b9e8209 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1933,6 +1933,20 @@ public class HoodieTableMetadataUtil {
return addMetadataFields(getSchemaForFields(tableSchema, mergedFields));
}
+ /**
+ * Given table schema and fields to index, checks if each field's data type
is supported.
+ *
+ * @param sourceFields fields to index
+ * @param tableSchema table schema
+ * @return true if each field's data type is supported, false otherwise
+ */
+ public static boolean validateDataTypeForSecondaryIndex(List<String>
sourceFields, Schema tableSchema) {
+ return sourceFields.stream().anyMatch(fieldToIndex -> {
+ Schema schema = getNestedFieldSchemaFromWriteSchema(tableSchema,
fieldToIndex);
+ return schema.getType() != Schema.Type.RECORD && schema.getType() !=
Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP;
+ });
+ }
+
public static HoodieData<HoodieRecord>
readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
int
secondaryIndexMaxParallelism,
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 629decea429..49d140cc19d 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -42,6 +42,8 @@ import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.storage.StoragePath;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -59,7 +61,9 @@ import java.util.UUID;
import java.util.stream.Collectors;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileIDForFileGroup;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.validateDataTypeForSecondaryIndex;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
@@ -328,4 +332,27 @@ public class TestHoodieTableMetadataUtil extends
HoodieCommonTestHarness {
result = getFileIDForFileGroup(MetadataPartitionType.FUNCTIONAL_INDEX, 5,
"func_index_ts");
assertEquals("func-index-ts-0005-0", result);
}
+
+ @Test
+ public void testValidateDataTypeForSecondaryIndex() {
+ // Create a dummy schema with both complex and primitive types
+ Schema schema = SchemaBuilder.record("TestRecord")
+ .fields()
+ .requiredString("stringField")
+ .optionalInt("intField")
+ .name("arrayField").type().array().items().stringType().noDefault()
+ .name("mapField").type().map().values().intType().noDefault()
+ .name("structField").type().record("NestedRecord")
+ .fields()
+ .requiredString("nestedString")
+ .endRecord()
+ .noDefault()
+ .endRecord();
+
+ // Test for primitive fields
+ assertTrue(validateDataTypeForSecondaryIndex(Arrays.asList("stringField",
"intField"), schema));
+
+ // Test for complex fields
+ assertFalse(validateDataTypeForSecondaryIndex(Arrays.asList("arrayField",
"mapField", "structField"), schema));
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java
index c2eb464463d..caf98c08faa 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -38,6 +39,7 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -55,6 +57,7 @@ import static
org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.validateDataTypeForSecondaryIndex;
public class HoodieSparkIndexClient extends BaseHoodieIndexClient {
@@ -81,7 +84,7 @@ public class HoodieSparkIndexClient extends
BaseHoodieIndexClient {
}
@Override
- public void create(HoodieTableMetaClient metaClient, String userIndexName,
String indexType, Map<String, Map<String, String>> columns, Map<String, String>
options) {
+ public void create(HoodieTableMetaClient metaClient, String userIndexName,
String indexType, Map<String, Map<String, String>> columns, Map<String, String>
options) throws Exception {
String fullIndexName = indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
? PARTITION_NAME_SECONDARY_INDEX_PREFIX + userIndexName
: PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX + userIndexName;
@@ -90,7 +93,7 @@ public class HoodieSparkIndexClient extends
BaseHoodieIndexClient {
}
checkArgument(columns.size() == 1, "Only one column can be indexed for
functional or secondary index.");
- if (!isEligibleForIndexing(metaClient, indexType, options)) {
+ if (!isEligibleForIndexing(metaClient, indexType, options, columns)) {
throw new HoodieMetadataIndexException("Not eligible for indexing: " +
indexType + ", indexName: " + userIndexName);
}
@@ -146,7 +149,7 @@ public class HoodieSparkIndexClient extends
BaseHoodieIndexClient {
if (metaClient.getTableConfig().isMetadataTableAvailable()) {
writeConfig.put(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
writeConfig.putAll(JavaConverters.mapAsJavaMapConverter(HoodieCLIUtils.getLockOptions(metaClient.getBasePath().toString(),
- metaClient.getBasePath().toUri().getScheme(), new
TypedProperties())).asJava());
+ metaClient.getBasePath().toUri().getScheme(), new
TypedProperties())).asJava());
// [HUDI-7472] Ensure write-config contains the existing MDT partition
to prevent those from getting deleted
metaClient.getTableConfig().getMetadataPartitions().forEach(partitionPath -> {
@@ -169,7 +172,10 @@ public class HoodieSparkIndexClient extends
BaseHoodieIndexClient {
return writeConfig;
}
- private static boolean isEligibleForIndexing(HoodieTableMetaClient
metaClient, String indexType, Map<String, String> options) {
+ private static boolean isEligibleForIndexing(HoodieTableMetaClient
metaClient, String indexType, Map<String, String> options, Map<String,
Map<String, String>> columns) throws Exception {
+ if (!validateDataTypeForSecondaryIndex(new ArrayList<>(columns.keySet()),
new TableSchemaResolver(metaClient).getTableAvroSchema())) {
+ return false;
+ }
// for secondary index, record index is a must
if (indexType.equals(PARTITION_NAME_SECONDARY_INDEX)) {
// either record index is enabled or record index partition is already
present
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index d2afdc5e784..6924ee787b3 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -30,7 +30,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig,
HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
-import org.apache.hudi.exception.{HoodieMetadataIndexException,
HoodieWriteConflictException}
+import org.apache.hudi.exception.{HoodieMetadataException,
HoodieMetadataIndexException, HoodieWriteConflictException}
import
org.apache.hudi.functional.TestSecondaryIndexPruning.SecondaryIndexTestCase
import
org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_RECORD_KEY_SEPARATOR
import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
HoodieBackedTableMetadataWriter, HoodieMetadataFileSystemView,
MetadataPartitionType, SparkHoodieBackedTableMetadataWriter}
@@ -1302,6 +1302,144 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
}
}
+ @Test
+ def testSecondaryIndexWithPrimitiveDataTypes(): Unit = {
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ var hudiOpts = commonOpts
+ hudiOpts = hudiOpts ++ Map(
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+ tableName += "test_secondary_index_with_primitive_data_types"
+
+ // Create table with different data types
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | ts bigint,
+ | record_key_col string,
+ | string_col string,
+ | int_col int,
+ | bigint_col bigint,
+ | double_col double,
+ | decimal_col decimal(10,2),
+ | timestamp_col timestamp,
+ | boolean_col boolean,
+ | partition_key_col string
+ |) using hudi
+ | options (
+ | primaryKey ='record_key_col',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.metadata.record.index.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'record_key_col',
+ | hoodie.enable.data.skipping = 'true'
+ | )
+ | partitioned by(partition_key_col)
+ | location '$basePath'
+ """.stripMargin)
+
+ // Insert dummy records
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 10, 100,
1.1, 100.01, timestamp('2023-01-01 12:00:00'), true, 'p1')")
+ spark.sql(s"insert into $tableName values(2, 'row2', 'def', 20, 200,
2.2, 200.02, timestamp('2023-01-02 12:00:00'), false, 'p1')")
+
+ // Create secondary indexes for different data types
+ val secondaryIndexColumns = Seq("string_col", "int_col", "bigint_col",
"double_col", "decimal_col", "timestamp_col", "boolean_col")
+ secondaryIndexColumns.foreach { col =>
+ spark.sql(s"create index idx_$col on $tableName using
secondary_index($col)")
+ }
+
+ // Validate indexes created successfully
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+
+ secondaryIndexColumns.foreach { col =>
+
assert(metaClient.getTableConfig.getMetadataPartitions.contains(s"secondary_index_idx_$col"))
+ }
+
+ // get the timestamp_col for row1 (do not use hardcoded value as it may
change in system where this test is executed)
+ val timestampCol = spark.sql(s"select timestamp_col from $tableName
where record_key_col =
'row1'").collect().head.getAs[java.sql.Timestamp](0).getTime * 1000
+ // Validate secondary index records for each column
+ checkAnswer(s"select key from hudi_metadata('$basePath') where type=7
AND key LIKE '%${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1'")(
+ Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1"),
+ Seq(s"1.1${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1"),
+ Seq(s"10${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1"),
+ Seq(s"100${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1"),
+ Seq(s"100.01${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1"),
+ Seq(s"$timestampCol${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1"),
+ Seq(s"true${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1")
+ )
+
+ // Validate data skipping for each secondary index
+ spark.sql("set hoodie.metadata.enable=true")
+ spark.sql("set hoodie.enable.data.skipping=true")
+ spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+
+ secondaryIndexColumns.foreach { col =>
+ val (queryFilter, value) = col match {
+ case "string_col" => (s"$col = 'abc'", "abc")
+ case "int_col" => (s"$col = 10", "10")
+ case "bigint_col" => (s"$col = 100", "100")
+ case "double_col" => (s"$col = 1.1", "1.1")
+ case "decimal_col" => (s"$col = 100.01", "100.01")
+ case "timestamp_col" => (s"$col = '2023-01-01 12:00:00'",
"2023-01-01 12:00:00")
+ case "boolean_col" => (s"$col = true", "true")
+ }
+ checkAnswer(s"select ts, record_key_col, cast($col AS STRING),
partition_key_col from $tableName where $queryFilter and
record_key_col='row1'")(
+ Seq(1, "row1", value, "p1")
+ )
+ if (col != "timestamp_col") {
+ verifyQueryPredicate(hudiOpts, col)
+ }
+ }
+ }
+ }
+
+ @Test
+ def testSecondaryIndexWithComplexTypes(): Unit = {
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ var hudiOpts = commonOpts
+ hudiOpts = hudiOpts ++ Map(
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+ tableName += "test_secondary_index_with_complex_data_types"
+
+ // Create table with complex data types
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | record_key_col string,
+ | array_col array<int>,
+ | map_col map<string, int>,
+ | struct_col struct<field1:int, field2:string>,
+ | partition_key_col string
+ |) using hudi
+ | options (
+ | primaryKey ='record_key_col',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.metadata.record.index.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'record_key_col',
+ | hoodie.enable.data.skipping = 'true'
+ | )
+ | partitioned by(partition_key_col)
+ | location '$basePath'
+ """.stripMargin)
+
+ // Insert dummy records
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ spark.sql(s"insert into $tableName values ('row1', array(1, 2, 3),
map('key1', 10, 'key2', 20), named_struct('field1', 1, 'field2', 'value1'),
'p1')")
+ spark.sql(s"insert into $tableName values ('row2', array(4, 5, 6),
map('key1', 30, 'key2', 40), named_struct('field1', 2, 'field2', 'value2'),
'p2')")
+
+ // Creation of secondary indexes for complex columns should fail
+ val secondaryIndexColumns = Seq("struct_col", "array_col", "map_col")
+ secondaryIndexColumns.foreach { col =>
+ assertThrows[HoodieMetadataIndexException] {
+ spark.sql(s"create index idx_$col on $tableName using
secondary_index($col)")
+ }
+ }
+ }
+ }
+
+
private def checkAnswer(query: String)(expects: Seq[Any]*): Unit = {
assertResult(expects.map(row => Row(row:
_*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString()))
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
index 4ecceee9fdb..09fba4294aa 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
@@ -414,7 +414,14 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
.save(basePath)
// Verify secondary index for deletes
validateSecondaryIndex(basePath, tableName, updateKeys, hasDeleteKeys
= true)
- // Corrupt the data file that was written for the delete key in the
first instant
+ // Corrupt the data file that was written for the delete key in the
first instant.
+ // We do this to guard against a scenario where a time travel query
unintentionally looks up the secondary index,
+ // according to which the data file is supposed to be skipped.
Consider the following scenario:
+ // 1. A record is deleted that was inserted in the first instant.
+ // 2. Secondary index gets updated and, as per the latest snapshot,
the index should not contain the deleted record.
+ // 3. A time travel query is performed with data skipping enabled.
+ // 4. If it was to look up the secondary index, it would have skipped
the data file, but the data file is still present.
+ // 5. Time travel query should throw an exception in this case.
val firstCommitMetadata =
deserializeCommitMetadata(metaClient.reloadActiveTimeline().getInstantDetails(firstInstant).get())
val partitionToWriteStats =
firstCommitMetadata.getPartitionToWriteStats.asScala.mapValues(_.asScala.toList)
// Find the path for the given fileId