This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f0b73833d25 [HUDI-8768] Support bloom filter options when creating
expr index using bloom filter (#12919)
f0b73833d25 is described below
commit f0b73833d25f8f85b7423b140cb1397d0fe1f15e
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Mar 11 00:35:56 2025 +0530
[HUDI-8768] Support bloom filter options when creating expr index using
bloom filter (#12919)
* [HUDI-8768] Support bloom filter options when creating expr index using
bloom filter
* add index options validation in test
* Refactor and address more review comments; improve test
* fix checkstyle
* Update
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
---------
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../client/utils/SparkMetadataWriterUtils.java | 28 +++++--
.../expression/ExpressionIndexSparkFunctions.java | 10 ++-
.../expression/HoodieSparkExpressionIndex.java | 15 ++--
.../expression/TestHoodieSparkExpressionIndex.java | 90 +++++++++++++++++-----
.../index/expression/HoodieExpressionIndex.java | 19 +++++
.../spark/sql/hudi/command/IndexCommands.scala | 1 -
.../hudi/feature/index/TestExpressionIndex.scala | 41 ++++++++--
7 files changed, 163 insertions(+), 41 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index fb324261e5d..d8ad5943138 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -86,6 +86,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
@@ -100,6 +101,7 @@ import static
org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.index.expression.HoodieExpressionIndex.BLOOM_FILTER_CONFIG_MAPPING;
import static
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE;
import static
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE;
import static
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT;
@@ -204,8 +206,11 @@ public class SparkMetadataWriterUtils {
: new ExpressionIndexComputationMetadata(colStatRecords);
}
- public static ExpressionIndexComputationMetadata
getExpressionIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String
columnToIndex,
-
HoodieWriteConfig metadataWriteConfig, String instantTime, String
indexName) {
+ public static ExpressionIndexComputationMetadata
getExpressionIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String
columnToIndex, HoodieWriteConfig metadataWriteConfig, String instantTime,
+
HoodieIndexDefinition indexDefinition) {
+ String indexName = indexDefinition.getIndexName();
+ setBloomFilterProps(metadataWriteConfig,
indexDefinition.getIndexOptions());
+
// Group data using expression index metadata and then create bloom filter
on the group
Dataset<HoodieRecord> bloomFilterRecords = dataset.select(columnToIndex,
SparkMetadataWriterUtils.getExpressionIndexColumnNames())
// row.get(1) refers to partition path value and row.get(2) refers to
file name.
@@ -226,6 +231,14 @@ public class SparkMetadataWriterUtils {
return new
ExpressionIndexComputationMetadata(HoodieJavaRDD.of(bloomFilterRecords.javaRDD()));
}
+ private static void setBloomFilterProps(HoodieWriteConfig
metadataWriteConfig, Map<String, String> indexOptions) {
+ BLOOM_FILTER_CONFIG_MAPPING.forEach((sourceKey, targetKey) -> {
+ if (indexOptions.containsKey(sourceKey)) {
+ metadataWriteConfig.getProps().setProperty(targetKey,
indexOptions.get(sourceKey));
+ }
+ });
+ }
+
public static List<Row> readRecordsAsRows(StoragePath[] paths, SQLContext
sqlContext,
HoodieTableMetaClient metaClient,
Schema schema,
HoodieWriteConfig dataWriteConfig,
boolean isBaseFile) {
@@ -327,8 +340,7 @@ public class SparkMetadataWriterUtils {
Dataset<Row> rowDataset =
sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
structType);
// Apply expression index and generate the column to index
- HoodieExpressionIndex<Column, Column> expressionIndex =
- new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
+ HoodieExpressionIndex<Column, Column> expressionIndex = new
HoodieSparkExpressionIndex(indexDefinition);
Column indexedColumn =
expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
@@ -336,7 +348,7 @@ public class SparkMetadataWriterUtils {
if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
} else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
- return getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime,
indexDefinition.getIndexName());
+ return getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime, indexDefinition);
} else {
throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
}
@@ -387,9 +399,9 @@ public class SparkMetadataWriterUtils {
HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
Schema tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema)
.orElseThrow(() -> new IllegalStateException(String.format("Expected
writer schema in commit metadata %s", commitMetadata)));
- List<Pair<String,Schema>> columnsToIndexSchemaMap =
columnsToIndex.stream()
- .map(columnToIndex -> Pair.of(columnToIndex,
HoodieAvroUtils.getSchemaForField(tableSchema,
columnToIndex).getValue().schema())).collect(
- Collectors.toList());
+ List<Pair<String, Schema>> columnsToIndexSchemaMap =
columnsToIndex.stream()
+ .map(columnToIndex -> Pair.of(columnToIndex,
HoodieAvroUtils.getSchemaForField(tableSchema,
columnToIndex).getValue().schema()))
+ .collect(Collectors.toList());
// filter for supported types
final List<String> validColumnsToIndex = columnsToIndexSchemaMap.stream()
.filter(colSchemaPair ->
HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(colSchemaPair.getKey())
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/ExpressionIndexSparkFunctions.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/ExpressionIndexSparkFunctions.java
index b319a143dc8..1035666dbd7 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/ExpressionIndexSparkFunctions.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/ExpressionIndexSparkFunctions.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static
org.apache.hudi.index.expression.HoodieExpressionIndex.BLOOM_FILTER_CONFIG_MAPPING;
import static
org.apache.hudi.index.expression.HoodieExpressionIndex.DAYS_OPTION;
import static
org.apache.hudi.index.expression.HoodieExpressionIndex.EXPRESSION_OPTION;
import static
org.apache.hudi.index.expression.HoodieExpressionIndex.FORMAT_OPTION;
@@ -40,6 +41,7 @@ import static
org.apache.hudi.index.expression.HoodieExpressionIndex.POSITION_OP
import static
org.apache.hudi.index.expression.HoodieExpressionIndex.REGEX_GROUP_INDEX_OPTION;
import static
org.apache.hudi.index.expression.HoodieExpressionIndex.REPLACEMENT_OPTION;
import static
org.apache.hudi.index.expression.HoodieExpressionIndex.TRIM_STRING_OPTION;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
public class ExpressionIndexSparkFunctions {
@@ -103,8 +105,12 @@ public class ExpressionIndexSparkFunctions {
Column apply(List<Column> columns, Map<String, String> options);
- default void validateOptions(Map<String, String> options) {
- Set<String> validOptions = getValidOptions();
+ default void validateOptions(Map<String, String> options, String
indexType) {
+ Set<String> validOptions = new HashSet<>(getValidOptions());
+ // add bloom filter options if index type is bloom_filters
+ if (indexType.equals(PARTITION_NAME_BLOOM_FILTERS)) {
+ validOptions.addAll(BLOOM_FILTER_CONFIG_MAPPING.keySet());
+ }
Set<String> invalidOptions = new HashSet<>(options.keySet());
invalidOptions.removeAll(validOptions);
ValidationUtils.checkArgument(invalidOptions.isEmpty(),
String.format("Input options %s are not valid for spark function %s",
invalidOptions, this));
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/HoodieSparkExpressionIndex.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/HoodieSparkExpressionIndex.java
index 0abdae73683..a55554623e3 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/HoodieSparkExpressionIndex.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/HoodieSparkExpressionIndex.java
@@ -20,6 +20,7 @@
package org.apache.hudi.index.expression;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
@@ -33,6 +34,7 @@ public class HoodieSparkExpressionIndex implements
HoodieExpressionIndex<Column,
private String indexName;
private String indexFunction;
+ private String indexType;
private List<String> orderedSourceFields;
private Map<String, String> options;
private ExpressionIndexSparkFunctions.SparkFunction sparkFunction;
@@ -40,11 +42,12 @@ public class HoodieSparkExpressionIndex implements
HoodieExpressionIndex<Column,
public HoodieSparkExpressionIndex() {
}
- public HoodieSparkExpressionIndex(String indexName, String indexFunction,
List<String> orderedSourceFields, Map<String, String> options) {
- this.indexName = indexName;
- this.indexFunction = indexFunction;
- this.orderedSourceFields = orderedSourceFields;
- this.options = options;
+ public HoodieSparkExpressionIndex(HoodieIndexDefinition indexDefinition) {
+ this.indexName = indexDefinition.getIndexName();
+ this.indexFunction = indexDefinition.getIndexFunction();
+ this.indexType = indexDefinition.getIndexType();
+ this.orderedSourceFields = indexDefinition.getSourceFields();
+ this.options = indexDefinition.getIndexOptions();
// Check if the function from the expression exists in our map
this.sparkFunction =
ExpressionIndexSparkFunctions.SparkFunction.getSparkFunction(indexFunction);
@@ -73,7 +76,7 @@ public class HoodieSparkExpressionIndex implements
HoodieExpressionIndex<Column,
if (orderedSourceValues.size() != orderedSourceFields.size()) {
throw new IllegalArgumentException("Mismatch in number of source values
and fields in the expression");
}
- sparkFunction.validateOptions(options);
+ sparkFunction.validateOptions(options, indexType);
return sparkFunction.apply(orderedSourceValues, options);
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/expression/TestHoodieSparkExpressionIndex.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/expression/TestHoodieSparkExpressionIndex.java
index 2ed36e555a8..44287cd3ea1 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/expression/TestHoodieSparkExpressionIndex.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/expression/TestHoodieSparkExpressionIndex.java
@@ -19,6 +19,8 @@
package org.apache.hudi.index.expression;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.spark.sql.Column;
@@ -36,6 +38,12 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import static
org.apache.hudi.index.expression.HoodieExpressionIndex.BLOOM_FILTER_NUM_ENTRIES;
+import static
org.apache.hudi.index.expression.HoodieExpressionIndex.BLOOM_FILTER_TYPE;
+import static
org.apache.hudi.index.expression.HoodieExpressionIndex.DYNAMIC_BLOOM_MAX_ENTRIES;
+import static
org.apache.hudi.index.expression.HoodieExpressionIndex.FALSE_POSITIVE_RATE;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
import static org.apache.spark.sql.functions.col;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -66,12 +74,14 @@ public class TestHoodieSparkExpressionIndex extends
HoodieSparkClientTestHarness
df.createOrReplaceTempView("testData");
// Initialize the HoodieSparkExpressionIndex with the year function
- HoodieSparkExpressionIndex index = new HoodieSparkExpressionIndex(
- "yearIndex",
- "year",
- Arrays.asList("timestampColumn"),
- new HashMap<>()
- );
+ HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
+ .withIndexName("yearIndex")
+ .withIndexFunction("year")
+ .withIndexType(PARTITION_NAME_COLUMN_STATS)
+ .withSourceFields(Arrays.asList("timestampColumn"))
+ .withIndexOptions(new HashMap<>())
+ .build();
+ HoodieSparkExpressionIndex index = new
HoodieSparkExpressionIndex(indexDefinition);
// Apply the function using the index
Column yearColumn = index.apply(Arrays.asList(col("timestampColumn")));
@@ -100,12 +110,14 @@ public class TestHoodieSparkExpressionIndex extends
HoodieSparkClientTestHarness
df.createOrReplaceTempView("testData");
// Initialize the HoodieSparkExpressionIndex with the hour function
- HoodieSparkExpressionIndex index = new HoodieSparkExpressionIndex(
- "hourIndex",
- "hour",
- Arrays.asList("timestampColumn"),
- new HashMap<>()
- );
+ HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
+ .withIndexName("hourIndex")
+ .withIndexFunction("hour")
+ .withIndexType(PARTITION_NAME_COLUMN_STATS)
+ .withSourceFields(Arrays.asList("timestampColumn"))
+ .withIndexOptions(new HashMap<>())
+ .build();
+ HoodieSparkExpressionIndex index = new
HoodieSparkExpressionIndex(indexDefinition);
// Apply the function using the index
Column hourColumn = index.apply(Arrays.asList(col("timestampColumn")));
@@ -124,12 +136,54 @@ public class TestHoodieSparkExpressionIndex extends
HoodieSparkClientTestHarness
public void testApplyYearFunctionWithWrongNumberOfArguments() {
// Setup index with the wrong number of source fields
List<Column> sourceColumns = Arrays.asList(col("timestampColumn"),
col("extraColumn"));
- HoodieSparkExpressionIndex index = new HoodieSparkExpressionIndex(
- "yearIndex",
- "year",
- Arrays.asList("timestampColumn", "extraColumn"),
- Collections.emptyMap()
- );
+ HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
+ .withIndexName("yearIndex")
+ .withIndexFunction("year")
+ .withIndexType(PARTITION_NAME_COLUMN_STATS)
+ .withSourceFields(Arrays.asList("timestampColumn", "extraColumn"))
+ .withIndexOptions(Collections.emptyMap())
+ .build();
+ HoodieSparkExpressionIndex index = new
HoodieSparkExpressionIndex(indexDefinition);
assertThrows(IllegalArgumentException.class, () ->
index.apply(sourceColumns));
}
+
+ @Test
+ public void testUpperFunctionWithBloomFilters() {
+ // Create a test DataFrame with name column
+ Dataset<Row> df = sparkSession.createDataFrame(Arrays.asList(
+ RowFactory.create("John Doe"),
+ RowFactory.create("Jane Smith")
+ ), DataTypes.createStructType(Arrays.asList(
+ DataTypes.createStructField("name", DataTypes.StringType, false)
+ )));
+ // Register the DataFrame as a temp view so we can query it
+ df.createOrReplaceTempView("testData");
+
+ // Initialize the HoodieSparkExpressionIndex with the upper function
+ HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
+ .withIndexName("upperIndex")
+ .withIndexFunction("upper")
+ .withIndexType(PARTITION_NAME_BLOOM_FILTERS)
+ .withSourceFields(Arrays.asList("name"))
+ .withIndexOptions(new HashMap<String, String>() {
+ {
+ put(BLOOM_FILTER_TYPE, BloomFilterTypeCode.DYNAMIC_V0.name());
+ put(BLOOM_FILTER_NUM_ENTRIES, "10000");
+ put(FALSE_POSITIVE_RATE, "0.01");
+ put(DYNAMIC_BLOOM_MAX_ENTRIES, "100000");
+ }
+ })
+ .build();
+
+ // Apply the function using the index
+ HoodieSparkExpressionIndex index = new
HoodieSparkExpressionIndex(indexDefinition);
+ Column upperColumn = index.apply(Arrays.asList(col("name")));
+ assertEquals("upper(name)", upperColumn.toString());
+
+ // validate data
+ Dataset<Row> resultDf = df.withColumn("upper(name)", upperColumn);
+ List<Row> results = resultDf.select("upper(name)").collectAsList();
+ assertEquals("JOHN DOE", results.get(0).getAs("upper(name)").toString());
+ assertEquals("JANE SMITH", results.get(1).getAs("upper(name)").toString());
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/index/expression/HoodieExpressionIndex.java
b/hudi-common/src/main/java/org/apache/hudi/index/expression/HoodieExpressionIndex.java
index 2629aaa0fcc..f977233de0f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/index/expression/HoodieExpressionIndex.java
+++
b/hudi-common/src/main/java/org/apache/hudi/index/expression/HoodieExpressionIndex.java
@@ -19,8 +19,13 @@
package org.apache.hudi.index.expression;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.util.CollectionUtils;
+
import java.io.Serializable;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Interface representing a expression index in Hudi.
@@ -47,6 +52,20 @@ public interface HoodieExpressionIndex<S, T> extends
Serializable {
String DAYS_OPTION = "days";
String FORMAT_OPTION = "format";
String IDENTITY_TRANSFORM = "identity";
+ // Bloom filter options
+ String BLOOM_FILTER_TYPE = "filtertype";
+ String BLOOM_FILTER_NUM_ENTRIES = "numentries";
+ String FALSE_POSITIVE_RATE = "fpp";
+ String DYNAMIC_BLOOM_MAX_ENTRIES = "maxentries";
+ static final Map<String, String> BLOOM_FILTER_CONFIG_MAPPING =
CollectionUtils.createImmutableMap(
+ new HashMap<String, String>() {
+ {
+ put(BLOOM_FILTER_TYPE, HoodieStorageConfig.BLOOM_FILTER_TYPE.key());
+ put(BLOOM_FILTER_NUM_ENTRIES,
HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.key());
+ put(FALSE_POSITIVE_RATE,
HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE.key());
+ put(DYNAMIC_BLOOM_MAX_ENTRIES,
HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key());
+ }
+ });
/**
* Get the name of the index.
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
index 1bacbe62a09..5510a188d56 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
@@ -57,7 +57,6 @@ case class CreateIndexCommand(table: CatalogTable,
if (indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
||
indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)) {
- // validate that only overwrite with latest payloads can enabled SI
if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
options.asJava.getOrDefault(EXPRESSION_OPTION,
ExpressionIndexSparkFunctions.IDENTITY_FUNCTION).equals(ExpressionIndexSparkFunctions.IDENTITY_FUNCTION))
{
throw new HoodieIndexException("Column stats index without expression
on any column can be created using datasource configs. " +
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
index 9d4b2262839..e76eb8bf680 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
@@ -22,12 +22,13 @@ package org.apache.spark.sql.hudi.command.index
import org.apache.hudi.{DataSourceReadOptions, ExpressionIndexSupport,
HoodieFileIndex, HoodieSparkUtils}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.toProperties
+import org.apache.hudi.avro.model.HoodieMetadataBloomFilter
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.utils.SparkMetadataWriterUtils
import org.apache.hudi.common.config.{HoodieMetadataConfig,
HoodieStorageConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.model.{FileSlice, HoodieIndexDefinition}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.{FileSystemViewManager,
HoodieTableFileSystemView}
import org.apache.hudi.common.testutils.HoodieTestUtils
@@ -37,7 +38,7 @@ import org.apache.hudi.hive.{HiveSyncTool,
HoodieHiveSyncClient}
import org.apache.hudi.hive.testutils.HiveTestUtil
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.index.expression.HoodieExpressionIndex
-import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
MetadataPartitionType}
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
HoodieMetadataPayload, MetadataPartitionType}
import
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionStatsIndexKey
import org.apache.hudi.storage.StoragePath
import org.apache.hudi.sync.common.HoodieSyncConfig.{META_SYNC_BASE_PATH,
META_SYNC_DATABASE_NAME, META_SYNC_NO_PARTITION_METADATA, META_SYNC_TABLE_NAME}
@@ -1776,7 +1777,26 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase
{
|""".stripMargin)
// create index using bloom filters on city column with upper()
function
- spark.sql(s"create index idx_bloom_$tableName on $tableName using
bloom_filters(city) options(expr='upper')")
+ spark.sql(s"create index idx_bloom_$tableName on $tableName using
bloom_filters(city) options(expr='upper', " +
+ s"${HoodieExpressionIndex.FALSE_POSITIVE_RATE}='0.01',
${HoodieExpressionIndex.BLOOM_FILTER_TYPE}='SIMPLE',
${HoodieExpressionIndex.BLOOM_FILTER_NUM_ENTRIES}='1000')")
+ var metaClient = createMetaClient(spark, basePath)
+
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(s"expr_index_idx_bloom_$tableName"))
+ assertTrue(metaClient.getIndexMetadata.isPresent)
+ assertEquals(2,
metaClient.getIndexMetadata.get.getIndexDefinitions.size())
+ val indexDefinition: HoodieIndexDefinition =
metaClient.getIndexMetadata.get.getIndexDefinitions.get(s"expr_index_idx_bloom_$tableName")
+ // validate index options
+ assertEquals("0.01",
indexDefinition.getIndexOptions.get(HoodieExpressionIndex.FALSE_POSITIVE_RATE))
+ assertEquals("SIMPLE",
indexDefinition.getIndexOptions.get(HoodieExpressionIndex.BLOOM_FILTER_TYPE))
+ assertEquals("1000",
indexDefinition.getIndexOptions.get(HoodieExpressionIndex.BLOOM_FILTER_NUM_ENTRIES))
+
+ // validate index metadata
+ val indexMetadataDf = spark.sql(s"select key, BloomFilterMetadata from
hudi_metadata('$tableName') where BloomFilterMetadata is not null")
+ assertEquals(4, indexMetadataDf.count()) // corresponding to 4 files
+ val indexMetadata = indexMetadataDf.collect()
+ indexMetadata.foreach(row => {
+ val bloomFilterMetadata = row.getStruct(1)
+ assertTrue(bloomFilterMetadata.getString(0).equals("SIMPLE"))
+ })
// Pruning takes place only if query uses upper function on city
checkAnswer(s"select id, rider from $tableName where upper(city) in
('sunnyvale', 'sg')")()
@@ -1785,7 +1805,6 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
Seq("trip2", "rider-C")
)
// verify file pruning
- var metaClient = createMetaClient(spark, basePath)
val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key ->
"true", HoodieMetadataConfig.ENABLE.key -> "true")
val cityColumn = AttributeReference("city", StringType)()
val upperCityExpr = Upper(cityColumn) // Apply the `upper` function to
the city column
@@ -2161,11 +2180,21 @@ class TestExpressionIndex extends
HoodieSparkSqlTestBase {
df =
df.withColumn(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
lit("c/d"))
.withColumn(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
lit("c/d/123141ab-701b-4ba4-b60b-e6acd9e9103e-0_329-224134-258390_2131313124.parquet"))
.withColumn(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
lit(100))
- val bloomFilterRecords =
SparkMetadataWriterUtils.getExpressionIndexRecordsUsingBloomFilter(df, "c5",
HoodieWriteConfig.newBuilder().withPath("a/b").build(), "", "random")
+ val indexOptions = Map(
+ HoodieExpressionIndex.BLOOM_FILTER_TYPE -> "DYNAMIC_V0",
+ HoodieExpressionIndex.FALSE_POSITIVE_RATE -> "0.01",
+ HoodieExpressionIndex.BLOOM_FILTER_NUM_ENTRIES -> "1000",
+ HoodieExpressionIndex.DYNAMIC_BLOOM_MAX_ENTRIES -> "1000"
+ )
+ val bloomFilterRecords =
SparkMetadataWriterUtils.getExpressionIndexRecordsUsingBloomFilter(df, "c5",
+ HoodieWriteConfig.newBuilder().withPath("a/b").build(), "",
+
HoodieIndexDefinition.newBuilder().withIndexName("random").withIndexOptions(JavaConverters.mapAsJavaMapConverter(indexOptions).asJava).build())
.getExpressionIndexRecords
// Since there is only one partition file pair there is only one bloom
filter record
- assertEquals(1, bloomFilterRecords.collectAsList().size())
+ assertEquals(1, bloomFilterRecords.count())
assertFalse(bloomFilterRecords.isEmpty)
+ val bloomFilter: HoodieMetadataBloomFilter =
bloomFilterRecords.collectAsList().get(0).getData.asInstanceOf[HoodieMetadataPayload].getBloomFilterMetadata.get()
+ assertTrue(bloomFilter.getType.equals("DYNAMIC_V0"))
}
private def verifyFilePruning(opts: Map[String, String], dataFilter:
Expression, metaClient: HoodieTableMetaClient,