This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f780c92d26d [HUDI-8561] Throw error for unsupported data types in 
secondary index (#12326)
f780c92d26d is described below

commit f780c92d26dfe38c99cd1c1a9a71b00006e82c59
Author: Sagar Sumit <[email protected]>
AuthorDate: Mon Nov 25 12:04:15 2024 +0530

    [HUDI-8561] Throw error for unsupported data types in secondary index 
(#12326)
    
    - Throw error for unsupported data types (complex types such as record, map 
and array) in secondary index.
    - Add tests for both the successful scenario with primitive types and the 
assertion of exceptions for unsupported types.
---
 .../index/functional/BaseHoodieIndexClient.java    |   2 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  14 +++
 .../hudi/metadata/TestHoodieTableMetadataUtil.java |  27 ++++
 .../org/apache/hudi/HoodieSparkIndexClient.java    |  14 ++-
 .../functional/TestSecondaryIndexPruning.scala     | 140 ++++++++++++++++++++-
 .../hudi/command/index/TestSecondaryIndex.scala    |   9 +-
 6 files changed, 199 insertions(+), 7 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java
index 786b36a823f..753595ede52 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java
@@ -58,7 +58,7 @@ public abstract class BaseHoodieIndexClient {
   /**
    * Create a functional index.
    */
-  public abstract void create(HoodieTableMetaClient metaClient, String 
indexName, String indexType, Map<String, Map<String, String>> columns, 
Map<String, String> options);
+  public abstract void create(HoodieTableMetaClient metaClient, String 
indexName, String indexType, Map<String, Map<String, String>> columns, 
Map<String, String> options) throws Exception;
 
   /**
    * Drop an index. By default, ignore drop if index does not exist.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index e5a4024ad50..6f03b9e8209 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1933,6 +1933,20 @@ public class HoodieTableMetadataUtil {
     return addMetadataFields(getSchemaForFields(tableSchema, mergedFields));
   }
 
+  /**
+   * Given table schema and fields to index, checks if each field's data type 
is supported.
+   *
+   * @param sourceFields fields to index
+   * @param tableSchema  table schema
+   * @return true if each field's data type is supported, false otherwise
+   */
+  public static boolean validateDataTypeForSecondaryIndex(List<String> 
sourceFields, Schema tableSchema) {
+    return sourceFields.stream().anyMatch(fieldToIndex -> {
+      Schema schema = getNestedFieldSchemaFromWriteSchema(tableSchema, 
fieldToIndex);
+      return schema.getType() != Schema.Type.RECORD && schema.getType() != 
Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP;
+    });
+  }
+
   public static HoodieData<HoodieRecord> 
readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
                                                                         
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
                                                                         int 
secondaryIndexMaxParallelism,
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 629decea429..49d140cc19d 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -42,6 +42,8 @@ import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.storage.StoragePath;
 
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -59,7 +61,9 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileIDForFileGroup;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.validateDataTypeForSecondaryIndex;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
@@ -328,4 +332,27 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
     result = getFileIDForFileGroup(MetadataPartitionType.FUNCTIONAL_INDEX, 5, 
"func_index_ts");
     assertEquals("func-index-ts-0005-0", result);
   }
+
+  @Test
+  public void testValidateDataTypeForSecondaryIndex() {
+    // Create a dummy schema with both complex and primitive types
+    Schema schema = SchemaBuilder.record("TestRecord")
+        .fields()
+        .requiredString("stringField")
+        .optionalInt("intField")
+        .name("arrayField").type().array().items().stringType().noDefault()
+        .name("mapField").type().map().values().intType().noDefault()
+        .name("structField").type().record("NestedRecord")
+        .fields()
+        .requiredString("nestedString")
+        .endRecord()
+        .noDefault()
+        .endRecord();
+
+    // Test for primitive fields
+    assertTrue(validateDataTypeForSecondaryIndex(Arrays.asList("stringField", 
"intField"), schema));
+
+    // Test for complex fields
+    assertFalse(validateDataTypeForSecondaryIndex(Arrays.asList("arrayField", 
"mapField", "structField"), schema));
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java
index c2eb464463d..caf98c08faa 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -38,6 +39,7 @@ import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -55,6 +57,7 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.validateDataTypeForSecondaryIndex;
 
 public class HoodieSparkIndexClient extends BaseHoodieIndexClient {
 
@@ -81,7 +84,7 @@ public class HoodieSparkIndexClient extends 
BaseHoodieIndexClient {
   }
 
   @Override
-  public void create(HoodieTableMetaClient metaClient, String userIndexName, 
String indexType, Map<String, Map<String, String>> columns, Map<String, String> 
options) {
+  public void create(HoodieTableMetaClient metaClient, String userIndexName, 
String indexType, Map<String, Map<String, String>> columns, Map<String, String> 
options) throws Exception {
     String fullIndexName = indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
         ? PARTITION_NAME_SECONDARY_INDEX_PREFIX + userIndexName
         : PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX + userIndexName;
@@ -90,7 +93,7 @@ public class HoodieSparkIndexClient extends 
BaseHoodieIndexClient {
     }
     checkArgument(columns.size() == 1, "Only one column can be indexed for 
functional or secondary index.");
 
-    if (!isEligibleForIndexing(metaClient, indexType, options)) {
+    if (!isEligibleForIndexing(metaClient, indexType, options, columns)) {
       throw new HoodieMetadataIndexException("Not eligible for indexing: " + 
indexType + ", indexName: " + userIndexName);
     }
 
@@ -146,7 +149,7 @@ public class HoodieSparkIndexClient extends 
BaseHoodieIndexClient {
     if (metaClient.getTableConfig().isMetadataTableAvailable()) {
       writeConfig.put(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), 
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
       
writeConfig.putAll(JavaConverters.mapAsJavaMapConverter(HoodieCLIUtils.getLockOptions(metaClient.getBasePath().toString(),
-              metaClient.getBasePath().toUri().getScheme(), new 
TypedProperties())).asJava());
+          metaClient.getBasePath().toUri().getScheme(), new 
TypedProperties())).asJava());
 
       // [HUDI-7472] Ensure write-config contains the existing MDT partition 
to prevent those from getting deleted
       
metaClient.getTableConfig().getMetadataPartitions().forEach(partitionPath -> {
@@ -169,7 +172,10 @@ public class HoodieSparkIndexClient extends 
BaseHoodieIndexClient {
     return writeConfig;
   }
 
-  private static boolean isEligibleForIndexing(HoodieTableMetaClient 
metaClient, String indexType, Map<String, String> options) {
+  private static boolean isEligibleForIndexing(HoodieTableMetaClient 
metaClient, String indexType, Map<String, String> options, Map<String, 
Map<String, String>> columns) throws Exception {
+    if (!validateDataTypeForSecondaryIndex(new ArrayList<>(columns.keySet()), 
new TableSchemaResolver(metaClient).getTableAvroSchema())) {
+      return false;
+    }
     // for secondary index, record index is a must
     if (indexType.equals(PARTITION_NAME_SECONDARY_INDEX)) {
       // either record index is enabled or record index partition is already 
present
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index d2afdc5e784..6924ee787b3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -30,7 +30,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig, 
HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
-import org.apache.hudi.exception.{HoodieMetadataIndexException, 
HoodieWriteConflictException}
+import org.apache.hudi.exception.{HoodieMetadataException, 
HoodieMetadataIndexException, HoodieWriteConflictException}
 import 
org.apache.hudi.functional.TestSecondaryIndexPruning.SecondaryIndexTestCase
 import 
org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_RECORD_KEY_SEPARATOR
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieBackedTableMetadataWriter, HoodieMetadataFileSystemView, 
MetadataPartitionType, SparkHoodieBackedTableMetadataWriter}
@@ -1302,6 +1302,144 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
     }
   }
 
+  @Test
+  def testSecondaryIndexWithPrimitiveDataTypes(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+      tableName += "test_secondary_index_with_primitive_data_types"
+
+      // Create table with different data types
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  string_col string,
+           |  int_col int,
+           |  bigint_col bigint,
+           |  double_col double,
+           |  decimal_col decimal(10,2),
+           |  timestamp_col timestamp,
+           |  boolean_col boolean,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+
+      // Insert dummy records
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 10, 100, 
1.1, 100.01, timestamp('2023-01-01 12:00:00'), true, 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'def', 20, 200, 
2.2, 200.02, timestamp('2023-01-02 12:00:00'), false, 'p1')")
+
+      // Create secondary indexes for different data types
+      val secondaryIndexColumns = Seq("string_col", "int_col", "bigint_col", 
"double_col", "decimal_col", "timestamp_col", "boolean_col")
+      secondaryIndexColumns.foreach { col =>
+        spark.sql(s"create index idx_$col on $tableName using 
secondary_index($col)")
+      }
+
+      // Validate indexes created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+
+      secondaryIndexColumns.foreach { col =>
+        
assert(metaClient.getTableConfig.getMetadataPartitions.contains(s"secondary_index_idx_$col"))
+      }
+
+      // get the timestamp_col for row1 (do not use hardcoded value as it may 
change in system where this test is executed)
+      val timestampCol = spark.sql(s"select timestamp_col from $tableName 
where record_key_col = 
'row1'").collect().head.getAs[java.sql.Timestamp](0).getTime * 1000
+      // Validate secondary index records for each column
+      checkAnswer(s"select key from hudi_metadata('$basePath') where type=7 
AND key LIKE '%${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1'")(
+        Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1"),
+        Seq(s"1.1${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1"),
+        Seq(s"10${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1"),
+        Seq(s"100${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1"),
+        Seq(s"100.01${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1"),
+        Seq(s"$timestampCol${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1"),
+        Seq(s"true${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1")
+      )
+
+      // Validate data skipping for each secondary index
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+
+      secondaryIndexColumns.foreach { col =>
+        val (queryFilter, value) = col match {
+          case "string_col" => (s"$col = 'abc'", "abc")
+          case "int_col" => (s"$col = 10", "10")
+          case "bigint_col" => (s"$col = 100", "100")
+          case "double_col" => (s"$col = 1.1", "1.1")
+          case "decimal_col" => (s"$col = 100.01", "100.01")
+          case "timestamp_col" => (s"$col = '2023-01-01 12:00:00'", 
"2023-01-01 12:00:00")
+          case "boolean_col" => (s"$col = true", "true")
+        }
+        checkAnswer(s"select ts, record_key_col, cast($col AS STRING), 
partition_key_col from $tableName where $queryFilter and 
record_key_col='row1'")(
+          Seq(1, "row1", value, "p1")
+        )
+        if (col != "timestamp_col") {
+          verifyQueryPredicate(hudiOpts, col)
+        }
+      }
+    }
+  }
+
+  @Test
+  def testSecondaryIndexWithComplexTypes(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      var hudiOpts = commonOpts
+      hudiOpts = hudiOpts ++ Map(
+        DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+      tableName += "test_secondary_index_with_complex_data_types"
+
+      // Create table with complex data types
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  record_key_col string,
+           |  array_col array<int>,
+           |  map_col map<string, int>,
+           |  struct_col struct<field1:int, field2:string>,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+
+      // Insert dummy records
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values ('row1', array(1, 2, 3), 
map('key1', 10, 'key2', 20), named_struct('field1', 1, 'field2', 'value1'), 
'p1')")
+      spark.sql(s"insert into $tableName values ('row2', array(4, 5, 6), 
map('key1', 30, 'key2', 40), named_struct('field1', 2, 'field2', 'value2'), 
'p2')")
+
+      // Creation of secondary indexes for complex columns should fail
+      val secondaryIndexColumns = Seq("struct_col", "array_col", "map_col")
+      secondaryIndexColumns.foreach { col =>
+        assertThrows[HoodieMetadataIndexException] {
+          spark.sql(s"create index idx_$col on $tableName using 
secondary_index($col)")
+        }
+      }
+    }
+  }
+
+
   private def checkAnswer(query: String)(expects: Seq[Any]*): Unit = {
     assertResult(expects.map(row => Row(row: 
_*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString()))
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
index 4ecceee9fdb..09fba4294aa 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
@@ -414,7 +414,14 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
           .save(basePath)
         // Verify secondary index for deletes
         validateSecondaryIndex(basePath, tableName, updateKeys, hasDeleteKeys 
= true)
-        // Corrupt the data file that was written for the delete key in the 
first instant
+        // Corrupt the data file that was written for the delete key in the 
first instant.
+        // We are doing it to guard against a scenario where time travel query 
unintentionally looks up secondary index,
+        // according to which the data file is supposed to be skipped. 
Consider following scenario:
+        // 1. A record is deleted that was inserted in the first instant.
+        // 2. Secondary index gets updated and as per the latest snapshot of 
the index should not have the deleted record.
+        // 3. A time travel query is performed with data skipping enabled.
+        // 4. If it was to look up the secondary index, it would have skipped 
the data file, but the data file is still present.
+        // 5. Time travel query should throw an exception in this case.
         val firstCommitMetadata = 
deserializeCommitMetadata(metaClient.reloadActiveTimeline().getInstantDetails(firstInstant).get())
         val partitionToWriteStats = 
firstCommitMetadata.getPartitionToWriteStats.asScala.mapValues(_.asScala.toList)
         // Find the path for the given fileId

Reply via email to