This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 225c2ab5bd09332aeeffb7a72fcdca0758181155 Author: Y Ethan Guo <[email protected]> AuthorDate: Mon Sep 11 11:11:22 2023 -0700 [HUDI-6838] Fix file writers to honor bloom filter configs (#9669) --- .../org/apache/hudi/config/HoodieIndexConfig.java | 63 ++++++---------------- .../org/apache/hudi/config/HoodieWriteConfig.java | 8 +-- .../hudi/common/config/HoodieStorageConfig.java | 41 ++++++++++++++ .../hudi/io/storage/HoodieFileWriterFactory.java | 9 ++-- .../org/apache/spark/sql/hudi/SparkHelpers.scala | 7 +-- 5 files changed, 70 insertions(+), 58 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java index c77b9780548..1ed3b1c3054 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java @@ -18,11 +18,11 @@ package org.apache.hudi.config; -import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieIndexException; @@ -42,6 +42,10 @@ import java.util.Arrays; import java.util.Properties; import java.util.stream.Collectors; +import static org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES; +import static org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE; +import static org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE; +import static org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_TYPE; import static org.apache.hudi.config.HoodieHBaseIndexConfig.GET_BATCH_SIZE; import static org.apache.hudi.config.HoodieHBaseIndexConfig.PUT_BATCH_SIZE; import static org.apache.hudi.config.HoodieHBaseIndexConfig.TABLENAME; @@ -87,29 +91,6 @@ public class HoodieIndexConfig extends HoodieConfig { + "It will take precedence over the hoodie.index.type configuration if specified"); // ***** Bloom Index configs ***** - public static final ConfigProperty<String> BLOOM_FILTER_NUM_ENTRIES_VALUE = ConfigProperty - .key("hoodie.index.bloom.num_entries") - .defaultValue("60000") - .markAdvanced() - .withDocumentation("Only applies if index type is BLOOM. " - + "This is the number of entries to be stored in the bloom filter. " - + "The rationale for the default: Assume the maxParquetFileSize is 128MB and averageRecordSize is 1kb and " - + "hence we approx a total of 130K records in a file. The default (60000) is roughly half of this approximation. " - + "Warning: Setting this very low, will generate a lot of false positives and index lookup " - + "will have to scan a lot more files than it has to and setting this to a very high number will " - + "increase the size every base file linearly (roughly 4KB for every 50000 entries). " - + "This config is also used with DYNAMIC bloom filter which determines the initial size for the bloom."); - - public static final ConfigProperty<String> BLOOM_FILTER_FPP_VALUE = ConfigProperty - .key("hoodie.index.bloom.fpp") - .defaultValue("0.000000001") - .markAdvanced() - .withDocumentation("Only applies if index type is BLOOM. " - + "Error rate allowed given the number of entries. This is used to calculate how many bits should be " - + "assigned for the bloom filter and the number of hash functions. This is usually set very low (default: 0.000000001), " - + "we like to tradeoff disk space for lower false positives. " - + "If the number of entries added to bloom filter exceeds the configured value (hoodie.index.bloom.num_entries), " - + "then this fpp may not be honored."); public static final ConfigProperty<String> BLOOM_INDEX_PARALLELISM = ConfigProperty .key("hoodie.bloom.index.parallelism") @@ -166,20 +147,6 @@ public class HoodieIndexConfig extends HoodieConfig { + "When true, bucketized bloom filtering is enabled. " + "This reduces skew seen in sort based bloom index lookup"); - public static final ConfigProperty<String> BLOOM_FILTER_TYPE = ConfigProperty - .key("hoodie.bloom.index.filter.type") - .defaultValue(BloomFilterTypeCode.DYNAMIC_V0.name()) - .withValidValues(BloomFilterTypeCode.SIMPLE.name(), BloomFilterTypeCode.DYNAMIC_V0.name()) - .markAdvanced() - .withDocumentation(BloomFilterTypeCode.class); - - public static final ConfigProperty<String> BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = ConfigProperty - .key("hoodie.bloom.index.filter.dynamic.max.entries") - .defaultValue("100000") - .markAdvanced() - .withDocumentation("The threshold for the maximum number of keys to record in a dynamic Bloom filter row. " - + "Only applies if filter type is BloomFilterTypeCode.DYNAMIC_V0."); - public static final ConfigProperty<String> SIMPLE_INDEX_USE_CACHING = ConfigProperty .key("hoodie.simple.index.use.caching") .defaultValue("true") @@ -395,22 +362,22 @@ public class HoodieIndexConfig extends HoodieConfig { @Deprecated public static final String DEFAULT_INDEX_CLASS = INDEX_CLASS_NAME.defaultValue(); /** - * @deprecated Use {@link #BLOOM_FILTER_NUM_ENTRIES_VALUE} and its methods instead + * @deprecated Use {@link HoodieStorageConfig#BLOOM_FILTER_NUM_ENTRIES_VALUE} and its methods instead */ @Deprecated public static final String BLOOM_FILTER_NUM_ENTRIES = BLOOM_FILTER_NUM_ENTRIES_VALUE.key(); /** - * @deprecated Use {@link #BLOOM_FILTER_NUM_ENTRIES_VALUE} and its methods instead + * @deprecated Use {@link HoodieStorageConfig#BLOOM_FILTER_NUM_ENTRIES_VALUE} and its methods instead */ @Deprecated public static final String DEFAULT_BLOOM_FILTER_NUM_ENTRIES = BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue(); /** - * @deprecated Use {@link #BLOOM_FILTER_FPP_VALUE} and its methods instead + * @deprecated Use {@link HoodieStorageConfig#BLOOM_FILTER_FPP_VALUE} and its methods instead */ @Deprecated public static final String BLOOM_FILTER_FPP = BLOOM_FILTER_FPP_VALUE.key(); /** - * @deprecated Use {@link #BLOOM_FILTER_FPP_VALUE} and its methods instead + * @deprecated Use {@link HoodieStorageConfig#BLOOM_FILTER_FPP_VALUE} and its methods instead */ @Deprecated public static final String DEFAULT_BLOOM_FILTER_FPP = BLOOM_FILTER_FPP_VALUE.defaultValue(); @@ -455,25 +422,25 @@ public class HoodieIndexConfig extends HoodieConfig { @Deprecated public static final String DEFAULT_BLOOM_INDEX_BUCKETIZED_CHECKING = BLOOM_INDEX_BUCKETIZED_CHECKING.defaultValue(); /** - * @deprecated Use {@link #BLOOM_FILTER_TYPE} and its methods instead + * @deprecated Use {@link HoodieStorageConfig#BLOOM_FILTER_TYPE} and its methods instead */ @Deprecated public static final String BLOOM_INDEX_FILTER_TYPE = BLOOM_FILTER_TYPE.key(); /** - * @deprecated Use {@link #BLOOM_FILTER_TYPE} and its methods instead + * @deprecated Use {@link HoodieStorageConfig#BLOOM_FILTER_TYPE} and its methods instead */ @Deprecated public static final String DEFAULT_BLOOM_INDEX_FILTER_TYPE = BLOOM_FILTER_TYPE.defaultValue(); /** - * @deprecated Use {@link #BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES} and its methods instead + * @deprecated Use {@link HoodieStorageConfig#BLOOM_FILTER_DYNAMIC_MAX_ENTRIES} and its methods instead */ @Deprecated - public static final String HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.key(); + public static final String HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key(); /** - * @deprecated Use {@link #BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES} and its methods instead + * @deprecated Use {@link HoodieStorageConfig#BLOOM_FILTER_DYNAMIC_MAX_ENTRIES} and its methods instead */ @Deprecated - public static final String DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue(); + public static final String DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue(); /** * @deprecated Use {@link #SIMPLE_INDEX_USE_CACHING} and its methods instead */ diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 01b8fa55948..d3985fd70b7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1765,11 +1765,11 @@ public class HoodieWriteConfig extends HoodieConfig { } public int getBloomFilterNumEntries() { - return getInt(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE); + return getInt(HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE); } public double getBloomFilterFPP() { - return getDouble(HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE); + return getDouble(HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE); } public String getHbaseZkQuorum() { @@ -1849,11 +1849,11 @@ public class HoodieWriteConfig extends HoodieConfig { } public String getBloomFilterType() { - return getString(HoodieIndexConfig.BLOOM_FILTER_TYPE); + return getString(HoodieStorageConfig.BLOOM_FILTER_TYPE); } public int getDynamicBloomFilterMaxNumEntries() { - return getInt(HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES); + return getInt(HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java index cec7f8f18c5..2660b0b22c8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java @@ -18,6 +18,8 @@ package org.apache.hudi.common.config; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; + import javax.annotation.concurrent.Immutable; import java.io.File; @@ -170,6 +172,45 @@ public class HoodieStorageConfig extends HoodieConfig { .withDocumentation("Expected additional compression as records move from log files to parquet. Used for merge_on_read " + "table to send inserts into log files & control the size of compacted parquet file."); + // Configs that control the bloom filter that is written to the file footer + public static final ConfigProperty<String> BLOOM_FILTER_TYPE = ConfigProperty + .key("hoodie.bloom.index.filter.type") + .defaultValue(BloomFilterTypeCode.DYNAMIC_V0.name()) + .withValidValues(BloomFilterTypeCode.SIMPLE.name(), BloomFilterTypeCode.DYNAMIC_V0.name()) + .markAdvanced() + .withDocumentation(BloomFilterTypeCode.class); + + public static final ConfigProperty<String> BLOOM_FILTER_NUM_ENTRIES_VALUE = ConfigProperty + .key("hoodie.index.bloom.num_entries") + .defaultValue("60000") + .markAdvanced() + .withDocumentation("Only applies if index type is BLOOM. " + + "This is the number of entries to be stored in the bloom filter. " + + "The rationale for the default: Assume the maxParquetFileSize is 128MB and averageRecordSize is 1kb and " + + "hence we approx a total of 130K records in a file. The default (60000) is roughly half of this approximation. " + + "Warning: Setting this very low, will generate a lot of false positives and index lookup " + + "will have to scan a lot more files than it has to and setting this to a very high number will " + + "increase the size every base file linearly (roughly 4KB for every 50000 entries). " + + "This config is also used with DYNAMIC bloom filter which determines the initial size for the bloom."); + + public static final ConfigProperty<String> BLOOM_FILTER_FPP_VALUE = ConfigProperty + .key("hoodie.index.bloom.fpp") + .defaultValue("0.000000001") + .markAdvanced() + .withDocumentation("Only applies if index type is BLOOM. " + + "Error rate allowed given the number of entries. This is used to calculate how many bits should be " + + "assigned for the bloom filter and the number of hash functions. This is usually set very low (default: 0.000000001), " + + "we like to tradeoff disk space for lower false positives. " + + "If the number of entries added to bloom filter exceeds the configured value (hoodie.index.bloom.num_entries), " + + "then this fpp may not be honored."); + + public static final ConfigProperty<String> BLOOM_FILTER_DYNAMIC_MAX_ENTRIES = ConfigProperty + .key("hoodie.bloom.index.filter.dynamic.max.entries") + .defaultValue("100000") + .markAdvanced() + .withDocumentation("The threshold for the maximum number of keys to record in a dynamic Bloom filter row. " + + "Only applies if filter type is BloomFilterTypeCode.DYNAMIC_V0."); + public static final ConfigProperty<String> HOODIE_AVRO_WRITE_SUPPORT_CLASS = ConfigProperty .key("hoodie.avro.write.support.class") .defaultValue("org.apache.hudi.avro.HoodieAvroWriteSupport") diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java index 456383d3741..a992886fcdc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java @@ -20,8 +20,8 @@ package org.apache.hudi.io.storage; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; -import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieFileFormat; @@ -122,7 +122,10 @@ public class HoodieFileWriterFactory { } protected BloomFilter createBloomFilter(HoodieConfig config) { - return BloomFilterFactory.createBloomFilter(60000, 0.000000001, 100000, - BloomFilterTypeCode.DYNAMIC_V0.name()); + return BloomFilterFactory.createBloomFilter( + config.getIntOrDefault(HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE), + config.getDoubleOrDefault(HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE), + config.getIntOrDefault(HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES), + config.getStringOrDefault(HoodieStorageConfig.BLOOM_FILTER_TYPE)); } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala index e9034a034b3..6917a4360bf 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala @@ -24,9 +24,9 @@ import org.apache.hudi.avro.HoodieAvroWriteSupport import org.apache.hudi.client.SparkTaskContextSupplier import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory} import org.apache.hudi.common.config.HoodieStorageConfig +import org.apache.hudi.common.config.HoodieStorageConfig.{BLOOM_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_FILTER_FPP_VALUE, BLOOM_FILTER_NUM_ENTRIES_VALUE, BLOOM_FILTER_TYPE} import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} import org.apache.hudi.common.util.BaseFileUtils -import org.apache.hudi.config.HoodieIndexConfig import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, HoodieParquetConfig} import org.apache.parquet.avro.AvroSchemaConverter import org.apache.parquet.hadoop.metadata.CompressionCodecName @@ -41,8 +41,9 @@ object SparkHelpers { def skipKeysAndWriteNewFile(instantTime: String, fs: FileSystem, sourceFile: Path, destinationFile: Path, keysToSkip: Set[String]) { val sourceRecords = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(fs.getConf, sourceFile) val schema: Schema = sourceRecords.get(0).getSchema - val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble, - HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_TYPE.defaultValue); + val filter: BloomFilter = BloomFilterFactory.createBloomFilter( + BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble, + BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt, BLOOM_FILTER_TYPE.defaultValue); val writeSupport: HoodieAvroWriteSupport[_] = new HoodieAvroWriteSupport(new AvroSchemaConverter(fs.getConf).convert(schema), schema, org.apache.hudi.common.util.Option.of(filter), new Properties()) val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport[_]] =
