This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f0b73833d25 [HUDI-8768] Support bloom filter options when creating expr index using bloom filter (#12919)
f0b73833d25 is described below

commit f0b73833d25f8f85b7423b140cb1397d0fe1f15e
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Mar 11 00:35:56 2025 +0530

    [HUDI-8768] Support bloom filter options when creating expr index using bloom filter (#12919)
    
    * [HUDI-8768] Support bloom filter options when creating expr index using bloom filter
    
    * add index options validation in test
    
    * Refactoring and address more comments
    
    improve test
    
    * fix checkstyle
    
    * Update hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
    
    ---------
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../client/utils/SparkMetadataWriterUtils.java     | 28 +++++--
 .../expression/ExpressionIndexSparkFunctions.java  | 10 ++-
 .../expression/HoodieSparkExpressionIndex.java     | 15 ++--
 .../expression/TestHoodieSparkExpressionIndex.java | 90 +++++++++++++++++-----
 .../index/expression/HoodieExpressionIndex.java    | 19 +++++
 .../spark/sql/hudi/command/IndexCommands.scala     |  1 -
 .../hudi/feature/index/TestExpressionIndex.scala   | 41 ++++++++--
 7 files changed, 163 insertions(+), 41 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index fb324261e5d..d8ad5943138 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -86,6 +86,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
@@ -100,6 +101,7 @@ import static 
org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.BLOOM_FILTER_CONFIG_MAPPING;
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE;
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE;
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT;
@@ -204,8 +206,11 @@ public class SparkMetadataWriterUtils {
         : new ExpressionIndexComputationMetadata(colStatRecords);
   }
 
-  public static ExpressionIndexComputationMetadata 
getExpressionIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String 
columnToIndex,
-                                                                               
              HoodieWriteConfig metadataWriteConfig, String instantTime, String 
indexName) {
+  public static ExpressionIndexComputationMetadata 
getExpressionIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String 
columnToIndex, HoodieWriteConfig metadataWriteConfig, String instantTime,
+                                                                               
              HoodieIndexDefinition indexDefinition) {
+    String indexName = indexDefinition.getIndexName();
+    setBloomFilterProps(metadataWriteConfig, 
indexDefinition.getIndexOptions());
+
     // Group data using expression index metadata and then create bloom filter 
on the group
     Dataset<HoodieRecord> bloomFilterRecords = dataset.select(columnToIndex, 
SparkMetadataWriterUtils.getExpressionIndexColumnNames())
         // row.get(1) refers to partition path value and row.get(2) refers to 
file name.
@@ -226,6 +231,14 @@ public class SparkMetadataWriterUtils {
     return new 
ExpressionIndexComputationMetadata(HoodieJavaRDD.of(bloomFilterRecords.javaRDD()));
   }
 
+  private static void setBloomFilterProps(HoodieWriteConfig 
metadataWriteConfig, Map<String, String> indexOptions) {
+    BLOOM_FILTER_CONFIG_MAPPING.forEach((sourceKey, targetKey) -> {
+      if (indexOptions.containsKey(sourceKey)) {
+        metadataWriteConfig.getProps().setProperty(targetKey, 
indexOptions.get(sourceKey));
+      }
+    });
+  }
+
   public static List<Row> readRecordsAsRows(StoragePath[] paths, SQLContext 
sqlContext,
                                             HoodieTableMetaClient metaClient, 
Schema schema,
                                             HoodieWriteConfig dataWriteConfig, 
boolean isBaseFile) {
@@ -327,8 +340,7 @@ public class SparkMetadataWriterUtils {
     Dataset<Row> rowDataset = 
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
 structType);
 
     // Apply expression index and generate the column to index
-    HoodieExpressionIndex<Column, Column> expressionIndex =
-        new HoodieSparkExpressionIndex(indexDefinition.getIndexName(), 
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), 
indexDefinition.getIndexOptions());
+    HoodieExpressionIndex<Column, Column> expressionIndex = new 
HoodieSparkExpressionIndex(indexDefinition);
     Column indexedColumn = 
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
     rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
 
@@ -336,7 +348,7 @@ public class SparkMetadataWriterUtils {
     if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
       return getExpressionIndexRecordsUsingColumnStats(rowDataset, 
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
     } else if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) 
{
-      return getExpressionIndexRecordsUsingBloomFilter(rowDataset, 
columnToIndex, metadataWriteConfig, instantTime, 
indexDefinition.getIndexName());
+      return getExpressionIndexRecordsUsingBloomFilter(rowDataset, 
columnToIndex, metadataWriteConfig, instantTime, indexDefinition);
     } else {
       throw new UnsupportedOperationException(indexDefinition.getIndexType() + 
" is not yet supported");
     }
@@ -387,9 +399,9 @@ public class SparkMetadataWriterUtils {
       HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
       Schema tableSchema = writerSchema.map(schema -> 
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema)
           .orElseThrow(() -> new IllegalStateException(String.format("Expected 
writer schema in commit metadata %s", commitMetadata)));
-      List<Pair<String,Schema>> columnsToIndexSchemaMap = 
columnsToIndex.stream()
-          .map(columnToIndex -> Pair.of(columnToIndex, 
HoodieAvroUtils.getSchemaForField(tableSchema, 
columnToIndex).getValue().schema())).collect(
-          Collectors.toList());
+      List<Pair<String, Schema>> columnsToIndexSchemaMap = 
columnsToIndex.stream()
+          .map(columnToIndex -> Pair.of(columnToIndex, 
HoodieAvroUtils.getSchemaForField(tableSchema, 
columnToIndex).getValue().schema()))
+          .collect(Collectors.toList());
       // filter for supported types
       final List<String> validColumnsToIndex = columnsToIndexSchemaMap.stream()
           .filter(colSchemaPair -> 
HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(colSchemaPair.getKey())
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/ExpressionIndexSparkFunctions.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/ExpressionIndexSparkFunctions.java
index b319a143dc8..1035666dbd7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/ExpressionIndexSparkFunctions.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/ExpressionIndexSparkFunctions.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.BLOOM_FILTER_CONFIG_MAPPING;
 import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.DAYS_OPTION;
 import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.EXPRESSION_OPTION;
 import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.FORMAT_OPTION;
@@ -40,6 +41,7 @@ import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.POSITION_OP
 import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.REGEX_GROUP_INDEX_OPTION;
 import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.REPLACEMENT_OPTION;
 import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.TRIM_STRING_OPTION;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
 
 public class ExpressionIndexSparkFunctions {
 
@@ -103,8 +105,12 @@ public class ExpressionIndexSparkFunctions {
 
     Column apply(List<Column> columns, Map<String, String> options);
 
-    default void validateOptions(Map<String, String> options) {
-      Set<String> validOptions = getValidOptions();
+    default void validateOptions(Map<String, String> options, String 
indexType) {
+      Set<String> validOptions = new HashSet<>(getValidOptions());
+      // add bloom filters options if index type is bloom_filters
+      if (indexType.equals(PARTITION_NAME_BLOOM_FILTERS)) {
+        validOptions.addAll(BLOOM_FILTER_CONFIG_MAPPING.keySet());
+      }
       Set<String> invalidOptions = new HashSet<>(options.keySet());
       invalidOptions.removeAll(validOptions);
       ValidationUtils.checkArgument(invalidOptions.isEmpty(), 
String.format("Input options %s are not valid for spark function %s", 
invalidOptions, this));
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/HoodieSparkExpressionIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/HoodieSparkExpressionIndex.java
index 0abdae73683..a55554623e3 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/HoodieSparkExpressionIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/HoodieSparkExpressionIndex.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.index.expression;
 
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 
@@ -33,6 +34,7 @@ public class HoodieSparkExpressionIndex implements 
HoodieExpressionIndex<Column,
 
   private String indexName;
   private String indexFunction;
+  private String indexType;
   private List<String> orderedSourceFields;
   private Map<String, String> options;
   private ExpressionIndexSparkFunctions.SparkFunction sparkFunction;
@@ -40,11 +42,12 @@ public class HoodieSparkExpressionIndex implements 
HoodieExpressionIndex<Column,
   public HoodieSparkExpressionIndex() {
   }
 
-  public HoodieSparkExpressionIndex(String indexName, String indexFunction, 
List<String> orderedSourceFields, Map<String, String> options) {
-    this.indexName = indexName;
-    this.indexFunction = indexFunction;
-    this.orderedSourceFields = orderedSourceFields;
-    this.options = options;
+  public HoodieSparkExpressionIndex(HoodieIndexDefinition indexDefinition) {
+    this.indexName = indexDefinition.getIndexName();
+    this.indexFunction = indexDefinition.getIndexFunction();
+    this.indexType = indexDefinition.getIndexType();
+    this.orderedSourceFields = indexDefinition.getSourceFields();
+    this.options = indexDefinition.getIndexOptions();
 
     // Check if the function from the expression exists in our map
     this.sparkFunction = 
ExpressionIndexSparkFunctions.SparkFunction.getSparkFunction(indexFunction);
@@ -73,7 +76,7 @@ public class HoodieSparkExpressionIndex implements 
HoodieExpressionIndex<Column,
     if (orderedSourceValues.size() != orderedSourceFields.size()) {
       throw new IllegalArgumentException("Mismatch in number of source values 
and fields in the expression");
     }
-    sparkFunction.validateOptions(options);
+    sparkFunction.validateOptions(options, indexType);
     return sparkFunction.apply(orderedSourceValues, options);
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/expression/TestHoodieSparkExpressionIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/expression/TestHoodieSparkExpressionIndex.java
index 2ed36e555a8..44287cd3ea1 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/expression/TestHoodieSparkExpressionIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/expression/TestHoodieSparkExpressionIndex.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.index.expression;
 
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 
 import org.apache.spark.sql.Column;
@@ -36,6 +38,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 
+import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.BLOOM_FILTER_NUM_ENTRIES;
+import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.BLOOM_FILTER_TYPE;
+import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.DYNAMIC_BLOOM_MAX_ENTRIES;
+import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.FALSE_POSITIVE_RATE;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
 import static org.apache.spark.sql.functions.col;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -66,12 +74,14 @@ public class TestHoodieSparkExpressionIndex extends 
HoodieSparkClientTestHarness
     df.createOrReplaceTempView("testData");
 
     // Initialize the HoodieSparkExpressionIndex with the year function
-    HoodieSparkExpressionIndex index = new HoodieSparkExpressionIndex(
-        "yearIndex",
-        "year",
-        Arrays.asList("timestampColumn"),
-        new HashMap<>()
-    );
+    HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
+        .withIndexName("yearIndex")
+        .withIndexFunction("year")
+        .withIndexType(PARTITION_NAME_COLUMN_STATS)
+        .withSourceFields(Arrays.asList("timestampColumn"))
+        .withIndexOptions(new HashMap<>())
+        .build();
+    HoodieSparkExpressionIndex index = new 
HoodieSparkExpressionIndex(indexDefinition);
 
     // Apply the function using the index
     Column yearColumn = index.apply(Arrays.asList(col("timestampColumn")));
@@ -100,12 +110,14 @@ public class TestHoodieSparkExpressionIndex extends 
HoodieSparkClientTestHarness
     df.createOrReplaceTempView("testData");
 
     // Initialize the HoodieSparkExpressionIndex with the hour function
-    HoodieSparkExpressionIndex index = new HoodieSparkExpressionIndex(
-        "hourIndex",
-        "hour",
-        Arrays.asList("timestampColumn"),
-        new HashMap<>()
-    );
+    HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
+        .withIndexName("hourIndex")
+        .withIndexFunction("hour")
+        .withIndexType(PARTITION_NAME_COLUMN_STATS)
+        .withSourceFields(Arrays.asList("timestampColumn"))
+        .withIndexOptions(new HashMap<>())
+        .build();
+    HoodieSparkExpressionIndex index = new 
HoodieSparkExpressionIndex(indexDefinition);
 
     // Apply the function using the index
     Column hourColumn = index.apply(Arrays.asList(col("timestampColumn")));
@@ -124,12 +136,54 @@ public class TestHoodieSparkExpressionIndex extends 
HoodieSparkClientTestHarness
   public void testApplyYearFunctionWithWrongNumberOfArguments() {
     // Setup index with the wrong number of source fields
     List<Column> sourceColumns = Arrays.asList(col("timestampColumn"), 
col("extraColumn"));
-    HoodieSparkExpressionIndex index = new HoodieSparkExpressionIndex(
-        "yearIndex",
-        "year",
-        Arrays.asList("timestampColumn", "extraColumn"),
-        Collections.emptyMap()
-    );
+    HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
+        .withIndexName("yearIndex")
+        .withIndexFunction("year")
+        .withIndexType(PARTITION_NAME_COLUMN_STATS)
+        .withSourceFields(Arrays.asList("timestampColumn", "extraColumn"))
+        .withIndexOptions(Collections.emptyMap())
+        .build();
+    HoodieSparkExpressionIndex index = new 
HoodieSparkExpressionIndex(indexDefinition);
     assertThrows(IllegalArgumentException.class, () -> 
index.apply(sourceColumns));
   }
+
+  @Test
+  public void testUpperFunctionWithBloomFilters() {
+    // Create a test DataFrame with name column
+    Dataset<Row> df = sparkSession.createDataFrame(Arrays.asList(
+        RowFactory.create("John Doe"),
+        RowFactory.create("Jane Smith")
+    ), DataTypes.createStructType(Arrays.asList(
+        DataTypes.createStructField("name", DataTypes.StringType, false)
+    )));
+    // Register the DataFrame as a temp view so we can query it
+    df.createOrReplaceTempView("testData");
+
+    // Initialize the HoodieSparkExpressionIndex with the upper function
+    HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
+        .withIndexName("upperIndex")
+        .withIndexFunction("upper")
+        .withIndexType(PARTITION_NAME_BLOOM_FILTERS)
+        .withSourceFields(Arrays.asList("name"))
+        .withIndexOptions(new HashMap<String, String>() {
+          {
+            put(BLOOM_FILTER_TYPE, BloomFilterTypeCode.DYNAMIC_V0.name());
+            put(BLOOM_FILTER_NUM_ENTRIES, "10000");
+            put(FALSE_POSITIVE_RATE, "0.01");
+            put(DYNAMIC_BLOOM_MAX_ENTRIES, "100000");
+          }
+        })
+        .build();
+
+    // Apply the function using the index
+    HoodieSparkExpressionIndex index = new 
HoodieSparkExpressionIndex(indexDefinition);
+    Column upperColumn = index.apply(Arrays.asList(col("name")));
+    assertEquals("upper(name)", upperColumn.toString());
+
+    // validate data
+    Dataset<Row> resultDf = df.withColumn("upper(name)", upperColumn);
+    List<Row> results = resultDf.select("upper(name)").collectAsList();
+    assertEquals("JOHN DOE", results.get(0).getAs("upper(name)").toString());
+    assertEquals("JANE SMITH", results.get(1).getAs("upper(name)").toString());
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/index/expression/HoodieExpressionIndex.java b/hudi-common/src/main/java/org/apache/hudi/index/expression/HoodieExpressionIndex.java
index 2629aaa0fcc..f977233de0f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/index/expression/HoodieExpressionIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/index/expression/HoodieExpressionIndex.java
@@ -19,8 +19,13 @@
 
 package org.apache.hudi.index.expression;
 
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.util.CollectionUtils;
+
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Interface representing a expression index in Hudi.
@@ -47,6 +52,20 @@ public interface HoodieExpressionIndex<S, T> extends 
Serializable {
   String DAYS_OPTION = "days";
   String FORMAT_OPTION = "format";
   String IDENTITY_TRANSFORM = "identity";
+  // Bloom filter options
+  String BLOOM_FILTER_TYPE = "filtertype";
+  String BLOOM_FILTER_NUM_ENTRIES = "numentries";
+  String FALSE_POSITIVE_RATE = "fpp";
+  String DYNAMIC_BLOOM_MAX_ENTRIES = "maxentries";
+  static final Map<String, String> BLOOM_FILTER_CONFIG_MAPPING = 
CollectionUtils.createImmutableMap(
+      new HashMap<String, String>() {
+        {
+          put(BLOOM_FILTER_TYPE, HoodieStorageConfig.BLOOM_FILTER_TYPE.key());
+          put(BLOOM_FILTER_NUM_ENTRIES, 
HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.key());
+          put(FALSE_POSITIVE_RATE, 
HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE.key());
+          put(DYNAMIC_BLOOM_MAX_ENTRIES, 
HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key());
+        }
+      });
 
   /**
    * Get the name of the index.
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
index 1bacbe62a09..5510a188d56 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
@@ -57,7 +57,6 @@ case class CreateIndexCommand(table: CatalogTable,
 
     if (indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
       || 
indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)) {
-      // validate that only overwrite with latest payloads can enabled SI
       if 
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
         options.asJava.getOrDefault(EXPRESSION_OPTION, 
ExpressionIndexSparkFunctions.IDENTITY_FUNCTION).equals(ExpressionIndexSparkFunctions.IDENTITY_FUNCTION))
 {
         throw new HoodieIndexException("Column stats index without expression 
on any column can be created using datasource configs. " +
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
index 9d4b2262839..e76eb8bf680 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
@@ -22,12 +22,13 @@ package org.apache.spark.sql.hudi.command.index
 import org.apache.hudi.{DataSourceReadOptions, ExpressionIndexSupport, 
HoodieFileIndex, HoodieSparkUtils}
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.toProperties
+import org.apache.hudi.avro.model.HoodieMetadataBloomFilter
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.utils.SparkMetadataWriterUtils
 import org.apache.hudi.common.config.{HoodieMetadataConfig, 
HoodieStorageConfig, TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.model.{FileSlice, HoodieIndexDefinition}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.view.{FileSystemViewManager, 
HoodieTableFileSystemView}
 import org.apache.hudi.common.testutils.HoodieTestUtils
@@ -37,7 +38,7 @@ import org.apache.hudi.hive.{HiveSyncTool, 
HoodieHiveSyncClient}
 import org.apache.hudi.hive.testutils.HiveTestUtil
 import org.apache.hudi.index.HoodieIndex
 import org.apache.hudi.index.expression.HoodieExpressionIndex
-import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
MetadataPartitionType}
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieMetadataPayload, MetadataPartitionType}
 import 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionStatsIndexKey
 import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.sync.common.HoodieSyncConfig.{META_SYNC_BASE_PATH, 
META_SYNC_DATABASE_NAME, META_SYNC_NO_PARTITION_METADATA, META_SYNC_TABLE_NAME}
@@ -1776,7 +1777,26 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase 
{
              |""".stripMargin)
 
         // create index using bloom filters on city column with upper() 
function
-        spark.sql(s"create index idx_bloom_$tableName on $tableName using 
bloom_filters(city) options(expr='upper')")
+        spark.sql(s"create index idx_bloom_$tableName on $tableName using 
bloom_filters(city) options(expr='upper', " +
+          s"${HoodieExpressionIndex.FALSE_POSITIVE_RATE}='0.01', 
${HoodieExpressionIndex.BLOOM_FILTER_TYPE}='SIMPLE', 
${HoodieExpressionIndex.BLOOM_FILTER_NUM_ENTRIES}='1000')")
+        var metaClient = createMetaClient(spark, basePath)
+        
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(s"expr_index_idx_bloom_$tableName"))
+        assertTrue(metaClient.getIndexMetadata.isPresent)
+        assertEquals(2, 
metaClient.getIndexMetadata.get.getIndexDefinitions.size())
+        val indexDefinition: HoodieIndexDefinition = 
metaClient.getIndexMetadata.get.getIndexDefinitions.get(s"expr_index_idx_bloom_$tableName")
+        // validate index options
+        assertEquals("0.01", 
indexDefinition.getIndexOptions.get(HoodieExpressionIndex.FALSE_POSITIVE_RATE))
+        assertEquals("SIMPLE", 
indexDefinition.getIndexOptions.get(HoodieExpressionIndex.BLOOM_FILTER_TYPE))
+        assertEquals("1000", 
indexDefinition.getIndexOptions.get(HoodieExpressionIndex.BLOOM_FILTER_NUM_ENTRIES))
+
+        // validate index metadata
+        val indexMetadataDf = spark.sql(s"select key, BloomFilterMetadata from 
hudi_metadata('$tableName') where BloomFilterMetadata is not null")
+        assertEquals(4, indexMetadataDf.count()) // corresponding to 4 files
+        val indexMetadata = indexMetadataDf.collect()
+        indexMetadata.foreach(row => {
+          val bloomFilterMetadata = row.getStruct(1)
+          assertTrue(bloomFilterMetadata.getString(0).equals("SIMPLE"))
+        })
 
         // Pruning takes place only if query uses upper function on city
         checkAnswer(s"select id, rider from $tableName where upper(city) in 
('sunnyvale', 'sg')")()
@@ -1785,7 +1805,6 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
           Seq("trip2", "rider-C")
         )
         // verify file pruning
-        var metaClient = createMetaClient(spark, basePath)
         val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> 
"true", HoodieMetadataConfig.ENABLE.key -> "true")
         val cityColumn = AttributeReference("city", StringType)()
         val upperCityExpr = Upper(cityColumn) // Apply the `upper` function to 
the city column
@@ -2161,11 +2180,21 @@ class TestExpressionIndex extends 
HoodieSparkSqlTestBase {
     df = 
df.withColumn(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION, 
lit("c/d"))
       
.withColumn(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH, 
lit("c/d/123141ab-701b-4ba4-b60b-e6acd9e9103e-0_329-224134-258390_2131313124.parquet"))
       .withColumn(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE, 
lit(100))
-    val bloomFilterRecords = 
SparkMetadataWriterUtils.getExpressionIndexRecordsUsingBloomFilter(df, "c5", 
HoodieWriteConfig.newBuilder().withPath("a/b").build(), "", "random")
+    val indexOptions = Map(
+      HoodieExpressionIndex.BLOOM_FILTER_TYPE -> "DYNAMIC_V0",
+      HoodieExpressionIndex.FALSE_POSITIVE_RATE -> "0.01",
+      HoodieExpressionIndex.BLOOM_FILTER_NUM_ENTRIES -> "1000",
+      HoodieExpressionIndex.DYNAMIC_BLOOM_MAX_ENTRIES -> "1000"
+    )
+    val bloomFilterRecords = 
SparkMetadataWriterUtils.getExpressionIndexRecordsUsingBloomFilter(df, "c5",
+      HoodieWriteConfig.newBuilder().withPath("a/b").build(), "",
+        
HoodieIndexDefinition.newBuilder().withIndexName("random").withIndexOptions(JavaConverters.mapAsJavaMapConverter(indexOptions).asJava).build())
       .getExpressionIndexRecords
     // Since there is only one partition file pair there is only one bloom 
filter record
-    assertEquals(1, bloomFilterRecords.collectAsList().size())
+    assertEquals(1, bloomFilterRecords.count())
     assertFalse(bloomFilterRecords.isEmpty)
+    val bloomFilter: HoodieMetadataBloomFilter = 
bloomFilterRecords.collectAsList().get(0).getData.asInstanceOf[HoodieMetadataPayload].getBloomFilterMetadata.get()
+    assertTrue(bloomFilter.getType.equals("DYNAMIC_V0"))
   }
 
   private def verifyFilePruning(opts: Map[String, String], dataFilter: 
Expression, metaClient: HoodieTableMetaClient,

Reply via email to