This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 30b4e8f3f31 [HUDI-9369] Simplify bloom filter config passing in metadata table writer (#13253)
30b4e8f3f31 is described below

commit 30b4e8f3f3102ef31dc7120af8d7538f2bdb3b21
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Fri May 2 22:27:39 2025 -0700

    [HUDI-9369] Simplify bloom filter config passing in metadata table writer (#13253)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  2 +-
 .../apache/hudi/config/TestHoodieWriteConfig.java  | 16 ++++++++
 .../client/utils/SparkMetadataWriterUtils.java     | 30 ++++++++-------
 .../SparkHoodieBackedTableMetadataWriter.java      |  6 +--
 .../hudi/common/config/HoodieStorageConfig.java    |  4 ++
 .../common/config/TestHoodieStorageConfig.java     | 43 ++++++++++++++++++++++
 .../hudi/feature/index/TestExpressionIndex.scala   |  2 +-
 7 files changed, 84 insertions(+), 19 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 770f0535d00..812d083a70e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2055,7 +2055,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public String getBloomFilterType() {
-    return getString(HoodieStorageConfig.BLOOM_FILTER_TYPE);
+    return getStorageConfig().getBloomFilterType();
   }
 
   public int getDynamicBloomFilterMaxNumEntries() {
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 716f62d2894..90709ce8c7d 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -22,7 +22,9 @@ import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.client.transaction.lock.NoopLockProvider;
 import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
@@ -55,6 +57,7 @@ import java.util.Set;
 import java.util.function.Function;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -709,6 +712,19 @@ public class TestHoodieWriteConfig {
         writeConfig.getViewStorageConfig().getMaxMemoryForFileGroupMap());
   }
 
+  @Test
+  void testBloomFilterType() {
+    String bloomFilterType = BloomFilterTypeCode.SIMPLE.name();
+    assertNotEquals(HoodieStorageConfig.BLOOM_FILTER_TYPE.defaultValue().toUpperCase(),
+        bloomFilterType.toUpperCase());
+    Properties props = new Properties();
+    props.put(HoodieStorageConfig.BLOOM_FILTER_TYPE.key(), bloomFilterType);
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath("/tmp")
+        .withProperties(props).build();
+    assertEquals(bloomFilterType, config.getBloomFilterType());
+  }
+
   private HoodieWriteConfig createWriteConfig(Map<String, String> configs) {
     final Properties properties = new Properties();
     configs.forEach(properties::setProperty);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index d8ad5943138..28ae8622cdf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.EngineType;
@@ -206,10 +207,11 @@ public class SparkMetadataWriterUtils {
         : new ExpressionIndexComputationMetadata(colStatRecords);
   }
 
-  public static ExpressionIndexComputationMetadata getExpressionIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String columnToIndex, HoodieWriteConfig metadataWriteConfig, String instantTime,
-                                                                                             HoodieIndexDefinition indexDefinition) {
+  public static ExpressionIndexComputationMetadata getExpressionIndexRecordsUsingBloomFilter(
+      Dataset<Row> dataset, String columnToIndex, HoodieStorageConfig storageConfig, String instantTime,
+      HoodieIndexDefinition indexDefinition) {
     String indexName = indexDefinition.getIndexName();
-    setBloomFilterProps(metadataWriteConfig, indexDefinition.getIndexOptions());
+    setBloomFilterProps(storageConfig, indexDefinition.getIndexOptions());
     // Group data using expression index metadata and then create bloom filter on the group
     Dataset<HoodieRecord> bloomFilterRecords = dataset.select(columnToIndex, SparkMetadataWriterUtils.getExpressionIndexColumnNames())
@@ -219,22 +221,22 @@ public class SparkMetadataWriterUtils {
           String partition = pair.getLeft().toString();
           String relativeFilePath = pair.getRight().toString();
           String fileName = FSUtils.getFileName(relativeFilePath, partition);
-          BloomFilter bloomFilter = HoodieFileWriterFactory.createBloomFilter(metadataWriteConfig);
+          BloomFilter bloomFilter = HoodieFileWriterFactory.createBloomFilter(storageConfig);
           iterator.forEachRemaining(row -> {
             byte[] key = row.getAs(columnToIndex).toString().getBytes();
             bloomFilter.add(key);
           });
           ByteBuffer bloomByteBuffer = ByteBuffer.wrap(getUTF8Bytes(bloomFilter.serializeToString()));
-          HoodieRecord bloomFilterRecord = createBloomFilterMetadataRecord(partition, fileName, instantTime, metadataWriteConfig.getBloomFilterType(), bloomByteBuffer, false, indexName);
+          HoodieRecord bloomFilterRecord = createBloomFilterMetadataRecord(partition, fileName, instantTime, storageConfig.getBloomFilterType(), bloomByteBuffer, false, indexName);
           return Collections.singletonList(bloomFilterRecord).iterator();
         }), Encoders.kryo(HoodieRecord.class));
     return new ExpressionIndexComputationMetadata(HoodieJavaRDD.of(bloomFilterRecords.javaRDD()));
   }
 
-  private static void setBloomFilterProps(HoodieWriteConfig metadataWriteConfig, Map<String, String> indexOptions) {
+  private static void setBloomFilterProps(HoodieStorageConfig storageConfig, Map<String, String> indexOptions) {
     BLOOM_FILTER_CONFIG_MAPPING.forEach((sourceKey, targetKey) -> {
       if (indexOptions.containsKey(sourceKey)) {
-        metadataWriteConfig.getProps().setProperty(targetKey, indexOptions.get(sourceKey));
+        storageConfig.getProps().setProperty(targetKey, indexOptions.get(sourceKey));
       }
     });
   }
@@ -305,16 +307,15 @@ public class SparkMetadataWriterUtils {
    * @param instantTime                 Instant time
    * @param engineContext               HoodieEngineContext
    * @param dataWriteConfig             Write Config for the data table
-   * @param metadataWriteConfig         Write config for the metadata table
   * @param partitionRecordsFunctionOpt Function used to generate partition stat records for the EI. It takes the column range metadata generated for the provided partition files as input
    *                                    and uses those to generate the final partition stats
    * @return ExpressionIndexComputationMetadata containing both EI column stat records and partition stat records if partitionRecordsFunctionOpt is provided
    */
-  @SuppressWarnings("checkstyle:LineLength")
-  public static ExpressionIndexComputationMetadata getExprIndexRecords(List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
-                                                                       HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String instantTime,
-                                                                       HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig, HoodieWriteConfig metadataWriteConfig,
-                                                                       Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+  public static ExpressionIndexComputationMetadata getExprIndexRecords(
+      List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+      HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String instantTime,
+      HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
+      Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
     HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext;
     if (indexDefinition.getSourceFields().isEmpty()) {
       // In case there are no columns to index, bail
@@ -348,7 +349,8 @@ public class SparkMetadataWriterUtils {
       if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
         return getExpressionIndexRecordsUsingColumnStats(rowDataset, expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
       } else if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) {
-        return getExpressionIndexRecordsUsingBloomFilter(rowDataset, columnToIndex, metadataWriteConfig, instantTime, indexDefinition);
+        return getExpressionIndexRecordsUsingBloomFilter(
+            rowDataset, columnToIndex, dataWriteConfig.getStorageConfig(), instantTime, indexDefinition);
       } else {
         throw new UnsupportedOperationException(indexDefinition.getIndexType() + " is not yet supported");
       }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 863a4995f6e..875ebd858ad 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.metadata;
 
-import org.apache.hudi.index.HoodieSparkIndexClient;
 import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -41,6 +40,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.index.HoodieSparkIndexClient;
 import org.apache.hudi.index.expression.HoodieSparkExpressionIndex;
 import org.apache.hudi.index.expression.HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata;
 import org.apache.hudi.metrics.DistributedRegistry;
@@ -198,7 +198,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
           // with the expression index records from the unmodified files to get the new partition stat records
           HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata expressionIndexComputationMetadata =
               SparkMetadataWriterUtils.getExprIndexRecords(partitionFilePathPairs, indexDefinition, dataMetaClient, parallelism, readerSchema, instantTime, engineContext, dataWriteConfig,
-                  metadataWriteConfig, partitionRecordsFunctionOpt);
+                  partitionRecordsFunctionOpt);
           return expressionIndexComputationMetadata.getPartitionStatRecordsOption().isPresent()
               ? expressionIndexComputationMetadata.getExpressionIndexRecords().union(expressionIndexComputationMetadata.getPartitionStatRecordsOption().get())
               : expressionIndexComputationMetadata.getExpressionIndexRecords();
@@ -211,7 +211,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
                                          Schema readerSchema, StorageConfiguration<?> storageConf, String instantTime) {
     ExpressionIndexComputationMetadata expressionIndexComputationMetadata =
         SparkMetadataWriterUtils.getExprIndexRecords(partitionFilePathAndSizeTriplet, indexDefinition,
-            metaClient, parallelism, readerSchema, instantTime, engineContext, dataWriteConfig, metadataWriteConfig,
+            metaClient, parallelism, readerSchema, instantTime, engineContext, dataWriteConfig,
            Option.of(rangeMetadata -> HoodieTableMetadataUtil.collectAndProcessExprIndexPartitionStatRecords(rangeMetadata, true,
                Option.of(indexDefinition.getIndexName()))));
     HoodieData<HoodieRecord> exprIndexRecords = expressionIndexComputationMetadata.getExpressionIndexRecords();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index 88b30860e33..ffe96b1ac66 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -385,6 +385,10 @@ public class HoodieStorageConfig extends HoodieConfig {
     super();
   }
 
+  public String getBloomFilterType() {
+    return getString(BLOOM_FILTER_TYPE);
+  }
+
   public static HoodieStorageConfig.Builder newBuilder() {
     return new Builder();
   }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java
new file mode 100644
index 00000000000..2ec7cd356e1
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class TestHoodieStorageConfig {
+  @Test
+  void testHoodieStorageConfig() {
+    String bloomFilterType = BloomFilterTypeCode.SIMPLE.name();
+    assertNotEquals(HoodieStorageConfig.BLOOM_FILTER_TYPE.defaultValue().toUpperCase(),
+        bloomFilterType.toUpperCase());
+    Properties props = new Properties();
+    props.put(HoodieStorageConfig.BLOOM_FILTER_TYPE.key(), bloomFilterType);
+    HoodieStorageConfig config = HoodieStorageConfig.newBuilder()
+        .fromProperties(props).build();
+    assertEquals(bloomFilterType, config.getBloomFilterType());
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
index 63ce1cb1c3e..5ea40b85592 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
@@ -2190,7 +2190,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
       HoodieExpressionIndex.DYNAMIC_BLOOM_MAX_ENTRIES -> "1000"
     )
     val bloomFilterRecords = SparkMetadataWriterUtils.getExpressionIndexRecordsUsingBloomFilter(df, "c5",
-      HoodieWriteConfig.newBuilder().withPath("a/b").build(), "",
+      HoodieStorageConfig.newBuilder().build(), "",
       HoodieIndexDefinition.newBuilder().withIndexName("random").withIndexOptions(JavaConverters.mapAsJavaMapConverter(indexOptions).asJava).build())
       .getExpressionIndexRecords
     // Since there is only one partition file pair there is only one bloom filter record
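
In short, the commit narrows the dependency of the bloom-filter expression index path: instead of threading the metadata table's full HoodieWriteConfig through SparkMetadataWriterUtils, callers now pass only a HoodieStorageConfig (obtained via dataWriteConfig.getStorageConfig()), and the new HoodieStorageConfig#getBloomFilterType accessor reads the type directly. A minimal sketch of the new accessor in use, mirroring the TestHoodieStorageConfig test added above (the demo class name and the SIMPLE value are illustrative assumptions, not part of the commit):

    import java.util.Properties;

    import org.apache.hudi.common.bloom.BloomFilterTypeCode;
    import org.apache.hudi.common.config.HoodieStorageConfig;

    // Hypothetical demo class; not part of this commit.
    public class BloomFilterTypeDemo {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Override the default bloom filter type with a BloomFilterTypeCode name.
        props.put(HoodieStorageConfig.BLOOM_FILTER_TYPE.key(), BloomFilterTypeCode.SIMPLE.name());
        HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
            .fromProperties(props)
            .build();
        // New accessor added by this commit; HoodieWriteConfig#getBloomFilterType now delegates to it.
        System.out.println(storageConfig.getBloomFilterType()); // prints SIMPLE
      }
    }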