This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 50298c45627 [HUDI-6838] Fix file writers to honor bloom filter configs (#9669)
50298c45627 is described below
commit 50298c45627ce1406282da79c3e5e32b6b487685
Author: Y Ethan Guo <[email protected]>
AuthorDate: Mon Sep 11 11:11:22 2023 -0700
[HUDI-6838] Fix file writers to honor bloom filter configs (#9669)
---
.../org/apache/hudi/config/HoodieIndexConfig.java | 63 ++++++----------------
.../org/apache/hudi/config/HoodieWriteConfig.java | 8 +--
.../hudi/common/config/HoodieStorageConfig.java | 41 ++++++++++++++
.../hudi/io/storage/HoodieFileWriterFactory.java | 9 ++--
.../org/apache/spark/sql/hudi/SparkHelpers.scala | 7 +--
5 files changed, 70 insertions(+), 58 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index c77b9780548..1ed3b1c3054 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -18,11 +18,11 @@
package org.apache.hudi.config;
-import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIndexException;
@@ -42,6 +42,10 @@ import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_TYPE;
import static org.apache.hudi.config.HoodieHBaseIndexConfig.GET_BATCH_SIZE;
import static org.apache.hudi.config.HoodieHBaseIndexConfig.PUT_BATCH_SIZE;
import static org.apache.hudi.config.HoodieHBaseIndexConfig.TABLENAME;
@@ -87,29 +91,6 @@ public class HoodieIndexConfig extends HoodieConfig {
+ "It will take precedence over the hoodie.index.type configuration
if specified");
// ***** Bloom Index configs *****
- public static final ConfigProperty<String> BLOOM_FILTER_NUM_ENTRIES_VALUE =
ConfigProperty
- .key("hoodie.index.bloom.num_entries")
- .defaultValue("60000")
- .markAdvanced()
- .withDocumentation("Only applies if index type is BLOOM. "
- + "This is the number of entries to be stored in the bloom filter. "
- + "The rationale for the default: Assume the maxParquetFileSize is
128MB and averageRecordSize is 1kb and "
- + "hence we approx a total of 130K records in a file. The default
(60000) is roughly half of this approximation. "
- + "Warning: Setting this very low, will generate a lot of false
positives and index lookup "
- + "will have to scan a lot more files than it has to and setting
this to a very high number will "
- + "increase the size every base file linearly (roughly 4KB for every
50000 entries). "
- + "This config is also used with DYNAMIC bloom filter which
determines the initial size for the bloom.");
-
- public static final ConfigProperty<String> BLOOM_FILTER_FPP_VALUE =
ConfigProperty
- .key("hoodie.index.bloom.fpp")
- .defaultValue("0.000000001")
- .markAdvanced()
- .withDocumentation("Only applies if index type is BLOOM. "
- + "Error rate allowed given the number of entries. This is used to
calculate how many bits should be "
- + "assigned for the bloom filter and the number of hash functions.
This is usually set very low (default: 0.000000001), "
- + "we like to tradeoff disk space for lower false positives. "
- + "If the number of entries added to bloom filter exceeds the
configured value (hoodie.index.bloom.num_entries), "
- + "then this fpp may not be honored.");
public static final ConfigProperty<String> BLOOM_INDEX_PARALLELISM =
ConfigProperty
.key("hoodie.bloom.index.parallelism")
@@ -166,20 +147,6 @@ public class HoodieIndexConfig extends HoodieConfig {
+ "When true, bucketized bloom filtering is enabled. "
+ "This reduces skew seen in sort based bloom index lookup");
- public static final ConfigProperty<String> BLOOM_FILTER_TYPE = ConfigProperty
- .key("hoodie.bloom.index.filter.type")
- .defaultValue(BloomFilterTypeCode.DYNAMIC_V0.name())
- .withValidValues(BloomFilterTypeCode.SIMPLE.name(),
BloomFilterTypeCode.DYNAMIC_V0.name())
- .markAdvanced()
- .withDocumentation(BloomFilterTypeCode.class);
-
- public static final ConfigProperty<String>
BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = ConfigProperty
- .key("hoodie.bloom.index.filter.dynamic.max.entries")
- .defaultValue("100000")
- .markAdvanced()
- .withDocumentation("The threshold for the maximum number of keys to
record in a dynamic Bloom filter row. "
- + "Only applies if filter type is BloomFilterTypeCode.DYNAMIC_V0.");
-
public static final ConfigProperty<String> SIMPLE_INDEX_USE_CACHING =
ConfigProperty
.key("hoodie.simple.index.use.caching")
.defaultValue("true")
@@ -395,22 +362,22 @@ public class HoodieIndexConfig extends HoodieConfig {
@Deprecated
public static final String DEFAULT_INDEX_CLASS =
INDEX_CLASS_NAME.defaultValue();
/**
- * @deprecated Use {@link #BLOOM_FILTER_NUM_ENTRIES_VALUE} and its methods
instead
+ * @deprecated Use {@link
HoodieStorageConfig#BLOOM_FILTER_NUM_ENTRIES_VALUE} and its methods instead
*/
@Deprecated
public static final String BLOOM_FILTER_NUM_ENTRIES =
BLOOM_FILTER_NUM_ENTRIES_VALUE.key();
/**
- * @deprecated Use {@link #BLOOM_FILTER_NUM_ENTRIES_VALUE} and its methods
instead
+ * @deprecated Use {@link
HoodieStorageConfig#BLOOM_FILTER_NUM_ENTRIES_VALUE} and its methods instead
*/
@Deprecated
public static final String DEFAULT_BLOOM_FILTER_NUM_ENTRIES =
BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue();
/**
- * @deprecated Use {@link #BLOOM_FILTER_FPP_VALUE} and its methods instead
+ * @deprecated Use {@link HoodieStorageConfig#BLOOM_FILTER_FPP_VALUE} and
its methods instead
*/
@Deprecated
public static final String BLOOM_FILTER_FPP = BLOOM_FILTER_FPP_VALUE.key();
/**
- * @deprecated Use {@link #BLOOM_FILTER_FPP_VALUE} and its methods instead
+ * @deprecated Use {@link HoodieStorageConfig#BLOOM_FILTER_FPP_VALUE} and
its methods instead
*/
@Deprecated
public static final String DEFAULT_BLOOM_FILTER_FPP =
BLOOM_FILTER_FPP_VALUE.defaultValue();
@@ -455,25 +422,25 @@ public class HoodieIndexConfig extends HoodieConfig {
@Deprecated
public static final String DEFAULT_BLOOM_INDEX_BUCKETIZED_CHECKING =
BLOOM_INDEX_BUCKETIZED_CHECKING.defaultValue();
/**
- * @deprecated Use {@link #BLOOM_FILTER_TYPE} and its methods instead
+ * @deprecated Use {@link HoodieStorageConfig#BLOOM_FILTER_TYPE} and its
methods instead
*/
@Deprecated
public static final String BLOOM_INDEX_FILTER_TYPE = BLOOM_FILTER_TYPE.key();
/**
- * @deprecated Use {@link #BLOOM_FILTER_TYPE} and its methods instead
+ * @deprecated Use {@link HoodieStorageConfig#BLOOM_FILTER_TYPE} and its
methods instead
*/
@Deprecated
public static final String DEFAULT_BLOOM_INDEX_FILTER_TYPE =
BLOOM_FILTER_TYPE.defaultValue();
/**
- * @deprecated Use {@link #BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES} and its
methods instead
+ * @deprecated Use {@link
HoodieStorageConfig#BLOOM_FILTER_DYNAMIC_MAX_ENTRIES} and its methods instead
*/
@Deprecated
- public static final String HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES =
BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.key();
+ public static final String HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES =
BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key();
/**
- * @deprecated Use {@link #BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES} and its
methods instead
+ * @deprecated Use {@link
HoodieStorageConfig#BLOOM_FILTER_DYNAMIC_MAX_ENTRIES} and its methods instead
*/
@Deprecated
- public static final String
DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES =
BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue();
+ public static final String
DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES =
BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue();
/**
* @deprecated Use {@link #SIMPLE_INDEX_USE_CACHING} and its methods instead
*/
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 9840e933362..5f83a67486a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1757,11 +1757,11 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public int getBloomFilterNumEntries() {
- return getInt(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE);
+ return getInt(HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE);
}
public double getBloomFilterFPP() {
- return getDouble(HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE);
+ return getDouble(HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE);
}
public String getHbaseZkQuorum() {
@@ -1841,11 +1841,11 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public String getBloomFilterType() {
- return getString(HoodieIndexConfig.BLOOM_FILTER_TYPE);
+ return getString(HoodieStorageConfig.BLOOM_FILTER_TYPE);
}
public int getDynamicBloomFilterMaxNumEntries() {
- return getInt(HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES);
+ return getInt(HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES);
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index cec7f8f18c5..2660b0b22c8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -18,6 +18,8 @@
package org.apache.hudi.common.config;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+
import javax.annotation.concurrent.Immutable;
import java.io.File;
@@ -170,6 +172,45 @@ public class HoodieStorageConfig extends HoodieConfig {
.withDocumentation("Expected additional compression as records move from
log files to parquet. Used for merge_on_read "
+ "table to send inserts into log files & control the size of
compacted parquet file.");
+ // Configs that control the bloom filter that is written to the file footer
+ public static final ConfigProperty<String> BLOOM_FILTER_TYPE = ConfigProperty
+ .key("hoodie.bloom.index.filter.type")
+ .defaultValue(BloomFilterTypeCode.DYNAMIC_V0.name())
+ .withValidValues(BloomFilterTypeCode.SIMPLE.name(),
BloomFilterTypeCode.DYNAMIC_V0.name())
+ .markAdvanced()
+ .withDocumentation(BloomFilterTypeCode.class);
+
+ public static final ConfigProperty<String> BLOOM_FILTER_NUM_ENTRIES_VALUE =
ConfigProperty
+ .key("hoodie.index.bloom.num_entries")
+ .defaultValue("60000")
+ .markAdvanced()
+ .withDocumentation("Only applies if index type is BLOOM. "
+ + "This is the number of entries to be stored in the bloom filter. "
+ + "The rationale for the default: Assume the maxParquetFileSize is
128MB and averageRecordSize is 1kb and "
+ + "hence we approx a total of 130K records in a file. The default
(60000) is roughly half of this approximation. "
+ + "Warning: Setting this very low, will generate a lot of false
positives and index lookup "
+ + "will have to scan a lot more files than it has to and setting
this to a very high number will "
+ + "increase the size every base file linearly (roughly 4KB for every
50000 entries). "
+ + "This config is also used with DYNAMIC bloom filter which
determines the initial size for the bloom.");
+
+ public static final ConfigProperty<String> BLOOM_FILTER_FPP_VALUE =
ConfigProperty
+ .key("hoodie.index.bloom.fpp")
+ .defaultValue("0.000000001")
+ .markAdvanced()
+ .withDocumentation("Only applies if index type is BLOOM. "
+ + "Error rate allowed given the number of entries. This is used to
calculate how many bits should be "
+ + "assigned for the bloom filter and the number of hash functions.
This is usually set very low (default: 0.000000001), "
+ + "we like to tradeoff disk space for lower false positives. "
+ + "If the number of entries added to bloom filter exceeds the
configured value (hoodie.index.bloom.num_entries), "
+ + "then this fpp may not be honored.");
+
+ public static final ConfigProperty<String> BLOOM_FILTER_DYNAMIC_MAX_ENTRIES
= ConfigProperty
+ .key("hoodie.bloom.index.filter.dynamic.max.entries")
+ .defaultValue("100000")
+ .markAdvanced()
+ .withDocumentation("The threshold for the maximum number of keys to
record in a dynamic Bloom filter row. "
+ + "Only applies if filter type is BloomFilterTypeCode.DYNAMIC_V0.");
+
public static final ConfigProperty<String> HOODIE_AVRO_WRITE_SUPPORT_CLASS =
ConfigProperty
.key("hoodie.avro.write.support.class")
.defaultValue("org.apache.hudi.avro.HoodieAvroWriteSupport")
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index 456383d3741..a992886fcdc 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -20,8 +20,8 @@ package org.apache.hudi.io.storage;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
-import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -122,7 +122,10 @@ public class HoodieFileWriterFactory {
}
protected BloomFilter createBloomFilter(HoodieConfig config) {
- return BloomFilterFactory.createBloomFilter(60000, 0.000000001, 100000,
- BloomFilterTypeCode.DYNAMIC_V0.name());
+ return BloomFilterFactory.createBloomFilter(
+
config.getIntOrDefault(HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE),
+ config.getDoubleOrDefault(HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE),
+
config.getIntOrDefault(HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES),
+ config.getStringOrDefault(HoodieStorageConfig.BLOOM_FILTER_TYPE));
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
index e9034a034b3..6917a4360bf 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
@@ -24,9 +24,9 @@ import org.apache.hudi.avro.HoodieAvroWriteSupport
import org.apache.hudi.client.SparkTaskContextSupplier
import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.config.HoodieStorageConfig
+import
org.apache.hudi.common.config.HoodieStorageConfig.{BLOOM_FILTER_DYNAMIC_MAX_ENTRIES,
BLOOM_FILTER_FPP_VALUE, BLOOM_FILTER_NUM_ENTRIES_VALUE, BLOOM_FILTER_TYPE}
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.util.BaseFileUtils
-import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.io.storage.{HoodieAvroParquetWriter,
HoodieParquetConfig}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
@@ -41,8 +41,9 @@ object SparkHelpers {
def skipKeysAndWriteNewFile(instantTime: String, fs: FileSystem, sourceFile:
Path, destinationFile: Path, keysToSkip: Set[String]) {
val sourceRecords =
BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(fs.getConf,
sourceFile)
val schema: Schema = sourceRecords.get(0).getSchema
- val filter: BloomFilter =
BloomFilterFactory.createBloomFilter(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt,
HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
-
HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt,
HoodieIndexConfig.BLOOM_FILTER_TYPE.defaultValue);
+ val filter: BloomFilter = BloomFilterFactory.createBloomFilter(
+ BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt,
BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
+ BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt,
BLOOM_FILTER_TYPE.defaultValue);
val writeSupport: HoodieAvroWriteSupport[_] = new
HoodieAvroWriteSupport(new AvroSchemaConverter(fs.getConf).convert(schema),
schema, org.apache.hudi.common.util.Option.of(filter), new Properties())
val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport[_]] =