yihua commented on code in PR #12919:
URL: https://github.com/apache/hudi/pull/12919#discussion_r1983802748
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,6 +230,21 @@ public static ExpressionIndexComputationMetadata
getExpressionIndexRecordsUsingB
return new
ExpressionIndexComputationMetadata(HoodieJavaRDD.of(bloomFilterRecords.javaRDD()));
}
+ private static void setBloomFilterProps(HoodieWriteConfig
metadataWriteConfig, HoodieIndexDefinition indexDefinition) {
+ if
(indexDefinition.getIndexOptions().containsKey(HoodieExpressionIndex.FALSE_POSITIVE_RATE))
{
+
metadataWriteConfig.getProps().setProperty(HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE.key(),
+
indexDefinition.getIndexOptions().get(HoodieExpressionIndex.FALSE_POSITIVE_RATE));
+ }
+ if
(indexDefinition.getIndexOptions().containsKey(HoodieExpressionIndex.BLOOM_FILTER_NUM_ENTRIES))
{
+
metadataWriteConfig.getProps().setProperty(HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.key(),
+
indexDefinition.getIndexOptions().get(HoodieExpressionIndex.BLOOM_FILTER_NUM_ENTRIES));
+ }
+ if
(indexDefinition.getIndexOptions().containsKey(HoodieExpressionIndex.BLOOM_FILTER_TYPE))
{
Review Comment:
Do we consider adding `BLOOM_FILTER_DYNAMIC_MAX_ENTRIES` too?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -204,8 +205,11 @@ public static ExpressionIndexComputationMetadata
getExpressionIndexRecordsUsingC
: new ExpressionIndexComputationMetadata(colStatRecords);
}
- public static ExpressionIndexComputationMetadata
getExpressionIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String
columnToIndex,
-
HoodieWriteConfig metadataWriteConfig, String instantTime, String
indexName) {
+ public static ExpressionIndexComputationMetadata
getExpressionIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String
columnToIndex, HoodieWriteConfig metadataWriteConfig, String instantTime,
+
HoodieIndexDefinition indexDefinition) {
+ String indexName = indexDefinition.getIndexName();
+ setBloomFilterProps(metadataWriteConfig, indexDefinition);
Review Comment:
Passing in the `Map<String, String>` options would be better, as only the
options are needed.
```suggestion
setBloomFilterProps(metadataWriteConfig,
indexDefinition.getIndexOptions());
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala:
##########
@@ -1776,7 +1776,16 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase
{
|""".stripMargin)
// create index using bloom filters on city column with upper()
function
- spark.sql(s"create index idx_bloom_$tableName on $tableName using
bloom_filters(city) options(expr='upper')")
+ spark.sql(s"create index idx_bloom_$tableName on $tableName using
bloom_filters(city) options(expr='upper', fpp='0.01', filterType='SIMPLE',
numEntries='1000')")
+ var metaClient = createMetaClient(spark, basePath)
+
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(s"expr_index_idx_bloom_$tableName"))
+ assertTrue(metaClient.getIndexMetadata.isPresent)
+ assertEquals(2,
metaClient.getIndexMetadata.get.getIndexDefinitions.size())
+ val indexDefinition: HoodieIndexDefinition =
metaClient.getIndexMetadata.get.getIndexDefinitions.get(s"expr_index_idx_bloom_$tableName")
+ // validate index options
+ assertEquals("0.01", indexDefinition.getIndexOptions.get("fpp"))
+ assertEquals("SIMPLE",
indexDefinition.getIndexOptions.get("filterType"))
+ assertEquals("1000", indexDefinition.getIndexOptions.get("numEntries"))
Review Comment:
Looks good. Is it possible to validate that the bloom filters written
conform to these params?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -226,6 +230,21 @@ public static ExpressionIndexComputationMetadata
getExpressionIndexRecordsUsingB
return new
ExpressionIndexComputationMetadata(HoodieJavaRDD.of(bloomFilterRecords.javaRDD()));
}
+ private static void setBloomFilterProps(HoodieWriteConfig
metadataWriteConfig, HoodieIndexDefinition indexDefinition) {
+ if
(indexDefinition.getIndexOptions().containsKey(HoodieExpressionIndex.FALSE_POSITIVE_RATE))
{
Review Comment:
nit: would be better to have a config key mapping of `Map<String, String>`
to store `HoodieExpressionIndex.FALSE_POSITIVE_RATE ->
HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE.key()` so we can avoid repeated code
on setting each property.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala:
##########
@@ -2161,7 +2169,13 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase
{
df =
df.withColumn(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
lit("c/d"))
.withColumn(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
lit("c/d/123141ab-701b-4ba4-b60b-e6acd9e9103e-0_329-224134-258390_2131313124.parquet"))
.withColumn(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
lit(100))
- val bloomFilterRecords =
SparkMetadataWriterUtils.getExpressionIndexRecordsUsingBloomFilter(df, "c5",
HoodieWriteConfig.newBuilder().withPath("a/b").build(), "", "random")
+ val indexOptions = Map(
+ "fpp"-> "0.01",
+ "numEntries" -> "1000"
+ )
+ val bloomFilterRecords =
SparkMetadataWriterUtils.getExpressionIndexRecordsUsingBloomFilter(df, "c5",
+ HoodieWriteConfig.newBuilder().withPath("a/b").build(), "",
+
HoodieIndexDefinition.newBuilder().withIndexName("random").withIndexOptions(JavaConverters.mapAsJavaMapConverter(indexOptions).asJava).build())
Review Comment:
Similar here to see if we can validate the bloom filters conform to the
params.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala:
##########
@@ -2161,7 +2169,13 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase
{
df =
df.withColumn(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
lit("c/d"))
.withColumn(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
lit("c/d/123141ab-701b-4ba4-b60b-e6acd9e9103e-0_329-224134-258390_2131313124.parquet"))
.withColumn(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
lit(100))
- val bloomFilterRecords =
SparkMetadataWriterUtils.getExpressionIndexRecordsUsingBloomFilter(df, "c5",
HoodieWriteConfig.newBuilder().withPath("a/b").build(), "", "random")
+ val indexOptions = Map(
+ "fpp"-> "0.01",
+ "numEntries" -> "1000"
Review Comment:
Also add `filterType` or `filter_type`?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -328,15 +347,16 @@ public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<S
// Apply expression index and generate the column to index
HoodieExpressionIndex<Column, Column> expressionIndex =
- new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
indexDefinition.getIndexOptions());
+ new HoodieSparkExpressionIndex(indexDefinition.getIndexName(),
indexDefinition.getIndexFunction(), indexDefinition.getIndexType(),
indexDefinition.getSourceFields(),
+ indexDefinition.getIndexOptions());
Review Comment:
Any reason for not passing in the `indexDefinition` as a whole to the
constructor?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/expression/TestHoodieSparkExpressionIndex.java:
##########
@@ -127,6 +130,7 @@ public void
testApplyYearFunctionWithWrongNumberOfArguments() {
HoodieSparkExpressionIndex index = new HoodieSparkExpressionIndex(
"yearIndex",
"year",
+ PARTITION_NAME_COLUMN_STATS,
Review Comment:
Add a new test on `PARTITION_NAME_BLOOM_FILTERS` too?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]