This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 30b4e8f3f31 [HUDI-9369] Simplify bloom filter config passing in metadata table writer (#13253)
30b4e8f3f31 is described below

commit 30b4e8f3f3102ef31dc7120af8d7538f2bdb3b21
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Fri May 2 22:27:39 2025 -0700

    [HUDI-9369] Simplify bloom filter config passing in metadata table writer (#13253)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  2 +-
 .../apache/hudi/config/TestHoodieWriteConfig.java  | 16 ++++++++
 .../client/utils/SparkMetadataWriterUtils.java     | 30 ++++++++-------
 .../SparkHoodieBackedTableMetadataWriter.java      |  6 +--
 .../hudi/common/config/HoodieStorageConfig.java    |  4 ++
 .../common/config/TestHoodieStorageConfig.java     | 43 ++++++++++++++++++++++
 .../hudi/feature/index/TestExpressionIndex.scala   |  2 +-
 7 files changed, 84 insertions(+), 19 deletions(-)
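
In short: HoodieWriteConfig#getBloomFilterType now delegates to a new typed
getter on HoodieStorageConfig, and the Spark metadata writer path takes the
data table's HoodieStorageConfig instead of a full metadata write config.
Below is a minimal sketch of the resulting access pattern, mirroring the new
tests in this commit (the class and main method here are illustrative only,
not part of the change):

    import java.util.Properties;

    import org.apache.hudi.common.bloom.BloomFilterTypeCode;
    import org.apache.hudi.common.config.HoodieStorageConfig;

    public class BloomFilterTypeSketch {
      public static void main(String[] args) {
        // Override the bloom filter type via properties, as the new
        // TestHoodieStorageConfig added by this commit does.
        Properties props = new Properties();
        props.put(HoodieStorageConfig.BLOOM_FILTER_TYPE.key(),
            BloomFilterTypeCode.SIMPLE.name());
        HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
            .fromProperties(props)
            .build();
        // The typed getter replaces reading the raw string off the write config.
        System.out.println(storageConfig.getBloomFilterType()); // prints SIMPLE
      }
    }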

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 770f0535d00..812d083a70e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2055,7 +2055,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public String getBloomFilterType() {
-    return getString(HoodieStorageConfig.BLOOM_FILTER_TYPE);
+    return getStorageConfig().getBloomFilterType();
   }
 
   public int getDynamicBloomFilterMaxNumEntries() {
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 716f62d2894..90709ce8c7d 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -22,7 +22,9 @@ import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.client.transaction.lock.NoopLockProvider;
 import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
@@ -55,6 +57,7 @@ import java.util.Set;
 import java.util.function.Function;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -709,6 +712,19 @@ public class TestHoodieWriteConfig {
         writeConfig.getViewStorageConfig().getMaxMemoryForFileGroupMap());
   }
 
+  @Test
+  void testBloomFilterType() {
+    String bloomFilterType = BloomFilterTypeCode.SIMPLE.name();
+    assertNotEquals(HoodieStorageConfig.BLOOM_FILTER_TYPE.defaultValue().toUpperCase(),
+        bloomFilterType.toUpperCase());
+    Properties props = new Properties();
+    props.put(HoodieStorageConfig.BLOOM_FILTER_TYPE.key(), bloomFilterType);
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath("/tmp")
+        .withProperties(props).build();
+    assertEquals(bloomFilterType, config.getBloomFilterType());
+  }
+
   private HoodieWriteConfig createWriteConfig(Map<String, String> configs) {
     final Properties properties = new Properties();
     configs.forEach(properties::setProperty);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index d8ad5943138..28ae8622cdf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.EngineType;
@@ -206,10 +207,11 @@ public class SparkMetadataWriterUtils {
         : new ExpressionIndexComputationMetadata(colStatRecords);
   }
 
-  public static ExpressionIndexComputationMetadata getExpressionIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String columnToIndex, HoodieWriteConfig metadataWriteConfig, String instantTime,
-                                                                                             HoodieIndexDefinition indexDefinition) {
+  public static ExpressionIndexComputationMetadata getExpressionIndexRecordsUsingBloomFilter(
+      Dataset<Row> dataset, String columnToIndex, HoodieStorageConfig storageConfig, String instantTime,
+      HoodieIndexDefinition indexDefinition) {
     String indexName = indexDefinition.getIndexName();
-    setBloomFilterProps(metadataWriteConfig, indexDefinition.getIndexOptions());
+    setBloomFilterProps(storageConfig, indexDefinition.getIndexOptions());
 
     // Group data using expression index metadata and then create bloom filter on the group
     Dataset<HoodieRecord> bloomFilterRecords = dataset.select(columnToIndex, SparkMetadataWriterUtils.getExpressionIndexColumnNames())
@@ -219,22 +221,22 @@ public class SparkMetadataWriterUtils {
           String partition = pair.getLeft().toString();
           String relativeFilePath = pair.getRight().toString();
           String fileName = FSUtils.getFileName(relativeFilePath, partition);
-          BloomFilter bloomFilter = HoodieFileWriterFactory.createBloomFilter(metadataWriteConfig);
+          BloomFilter bloomFilter = HoodieFileWriterFactory.createBloomFilter(storageConfig);
           iterator.forEachRemaining(row -> {
             byte[] key = row.getAs(columnToIndex).toString().getBytes();
             bloomFilter.add(key);
           });
           ByteBuffer bloomByteBuffer = ByteBuffer.wrap(getUTF8Bytes(bloomFilter.serializeToString()));
-          HoodieRecord bloomFilterRecord = createBloomFilterMetadataRecord(partition, fileName, instantTime, metadataWriteConfig.getBloomFilterType(), bloomByteBuffer, false, indexName);
+          HoodieRecord bloomFilterRecord = createBloomFilterMetadataRecord(partition, fileName, instantTime, storageConfig.getBloomFilterType(), bloomByteBuffer, false, indexName);
           return Collections.singletonList(bloomFilterRecord).iterator();
         }), Encoders.kryo(HoodieRecord.class));
     return new ExpressionIndexComputationMetadata(HoodieJavaRDD.of(bloomFilterRecords.javaRDD()));
   }
 
-  private static void setBloomFilterProps(HoodieWriteConfig metadataWriteConfig, Map<String, String> indexOptions) {
+  private static void setBloomFilterProps(HoodieStorageConfig storageConfig, Map<String, String> indexOptions) {
     BLOOM_FILTER_CONFIG_MAPPING.forEach((sourceKey, targetKey) -> {
       if (indexOptions.containsKey(sourceKey)) {
-        metadataWriteConfig.getProps().setProperty(targetKey, indexOptions.get(sourceKey));
+        storageConfig.getProps().setProperty(targetKey, indexOptions.get(sourceKey));
       }
     });
   }
@@ -305,16 +307,15 @@ public class SparkMetadataWriterUtils {
    * @param instantTime                     Instant time
    * @param engineContext                   HoodieEngineContext
    * @param dataWriteConfig                 Write Config for the data table
-   * @param metadataWriteConfig             Write config for the metadata table
   * @param partitionRecordsFunctionOpt     Function used to generate partition stat records for the EI. It takes the column range metadata generated for the provided partition files as input
   *                                        and uses those to generate the final partition stats
   * @return ExpressionIndexComputationMetadata containing both EI column stat records and partition stat records if partitionRecordsFunctionOpt is provided
    */
-  @SuppressWarnings("checkstyle:LineLength")
-  public static ExpressionIndexComputationMetadata getExprIndexRecords(List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
-                                                                       HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String instantTime,
-                                                                       HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig, HoodieWriteConfig metadataWriteConfig,
-                                                                       Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+  public static ExpressionIndexComputationMetadata getExprIndexRecords(
+      List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+      HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String instantTime,
+      HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
+      Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
     HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext;
     if (indexDefinition.getSourceFields().isEmpty()) {
       // In case there are no columns to index, bail
@@ -348,7 +349,8 @@ public class SparkMetadataWriterUtils {
     if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
       return getExpressionIndexRecordsUsingColumnStats(rowDataset, expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
     } else if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) {
-      return getExpressionIndexRecordsUsingBloomFilter(rowDataset, columnToIndex, metadataWriteConfig, instantTime, indexDefinition);
+      return getExpressionIndexRecordsUsingBloomFilter(
+          rowDataset, columnToIndex, dataWriteConfig.getStorageConfig(), instantTime, indexDefinition);
     } else {
       throw new UnsupportedOperationException(indexDefinition.getIndexType() + " is not yet supported");
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 863a4995f6e..875ebd858ad 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.metadata;
 
-import org.apache.hudi.index.HoodieSparkIndexClient;
 import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -41,6 +40,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.index.HoodieSparkIndexClient;
 import org.apache.hudi.index.expression.HoodieSparkExpressionIndex;
 import org.apache.hudi.index.expression.HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata;
 import org.apache.hudi.metrics.DistributedRegistry;
@@ -198,7 +198,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     // with the expression index records from the unmodified files to get the new partition stat records
     HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata expressionIndexComputationMetadata =
         SparkMetadataWriterUtils.getExprIndexRecords(partitionFilePathPairs, indexDefinition, dataMetaClient, parallelism, readerSchema, instantTime, engineContext, dataWriteConfig,
-            metadataWriteConfig, partitionRecordsFunctionOpt);
+            partitionRecordsFunctionOpt);
     return expressionIndexComputationMetadata.getPartitionStatRecordsOption().isPresent()
         ? expressionIndexComputationMetadata.getExpressionIndexRecords().union(expressionIndexComputationMetadata.getPartitionStatRecordsOption().get())
         : expressionIndexComputationMetadata.getExpressionIndexRecords();
@@ -211,7 +211,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
                                                                Schema readerSchema, StorageConfiguration<?> storageConf,
                                                                String instantTime) {
     ExpressionIndexComputationMetadata expressionIndexComputationMetadata = SparkMetadataWriterUtils.getExprIndexRecords(partitionFilePathAndSizeTriplet, indexDefinition,
-        metaClient, parallelism, readerSchema, instantTime, engineContext, dataWriteConfig, metadataWriteConfig,
+        metaClient, parallelism, readerSchema, instantTime, engineContext, dataWriteConfig,
         Option.of(rangeMetadata ->
             HoodieTableMetadataUtil.collectAndProcessExprIndexPartitionStatRecords(rangeMetadata, true, Option.of(indexDefinition.getIndexName()))));
     HoodieData<HoodieRecord> exprIndexRecords = expressionIndexComputationMetadata.getExpressionIndexRecords();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index 88b30860e33..ffe96b1ac66 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -385,6 +385,10 @@ public class HoodieStorageConfig extends HoodieConfig {
     super();
   }
 
+  public String getBloomFilterType() {
+    return getString(BLOOM_FILTER_TYPE);
+  }
+
   public static HoodieStorageConfig.Builder newBuilder() {
     return new Builder();
   }
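
For reference, a minimal sketch of the fallback behavior of the getter added
above, assuming (as the TestExpressionIndex change below does) that a storage
config built without explicit properties is valid:

    import org.apache.hudi.common.config.HoodieStorageConfig;

    public class DefaultBloomFilterTypeSketch {
      public static void main(String[] args) {
        // With no properties supplied, the typed getter should resolve to
        // HoodieStorageConfig.BLOOM_FILTER_TYPE's default value.
        HoodieStorageConfig config = HoodieStorageConfig.newBuilder().build();
        System.out.println(config.getBloomFilterType());
      }
    }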
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java
new file mode 100644
index 00000000000..2ec7cd356e1
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class TestHoodieStorageConfig {
+  @Test
+  void testHoodieStorageConfig() {
+    String bloomFilterType = BloomFilterTypeCode.SIMPLE.name();
+    assertNotEquals(HoodieStorageConfig.BLOOM_FILTER_TYPE.defaultValue().toUpperCase(),
+        bloomFilterType.toUpperCase());
+    Properties props = new Properties();
+    props.put(HoodieStorageConfig.BLOOM_FILTER_TYPE.key(), bloomFilterType);
+    HoodieStorageConfig config = HoodieStorageConfig.newBuilder()
+        .fromProperties(props).build();
+    assertEquals(bloomFilterType, config.getBloomFilterType());
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
index 63ce1cb1c3e..5ea40b85592 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
@@ -2190,7 +2190,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
       HoodieExpressionIndex.DYNAMIC_BLOOM_MAX_ENTRIES -> "1000"
     )
     val bloomFilterRecords = SparkMetadataWriterUtils.getExpressionIndexRecordsUsingBloomFilter(df, "c5",
-      HoodieWriteConfig.newBuilder().withPath("a/b").build(), "",
+        HoodieStorageConfig.newBuilder().build(), "",
         HoodieIndexDefinition.newBuilder().withIndexName("random").withIndexOptions(JavaConverters.mapAsJavaMapConverter(indexOptions).asJava).build())
       .getExpressionIndexRecords
     // Since there is only one partition file pair there is only one bloom filter record
