This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 50298c45627 [HUDI-6838] Fix file writers to honor bloom filter configs (#9669)
50298c45627 is described below

commit 50298c45627ce1406282da79c3e5e32b6b487685
Author: Y Ethan Guo <[email protected]>
AuthorDate: Mon Sep 11 11:11:22 2023 -0700

    [HUDI-6838] Fix file writers to honor bloom filter configs (#9669)
---
 .../org/apache/hudi/config/HoodieIndexConfig.java  | 63 ++++++----------------
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  8 +--
 .../hudi/common/config/HoodieStorageConfig.java    | 41 ++++++++++++++
 .../hudi/io/storage/HoodieFileWriterFactory.java   |  9 ++--
 .../org/apache/spark/sql/hudi/SparkHelpers.scala   |  7 +--
 5 files changed, 70 insertions(+), 58 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index c77b9780548..1ed3b1c3054 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -18,11 +18,11 @@
 
 package org.apache.hudi.config;
 
-import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieIndexException;
@@ -42,6 +42,10 @@ import java.util.Arrays;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES;
+import static org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE;
+import static org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE;
+import static org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_TYPE;
 import static org.apache.hudi.config.HoodieHBaseIndexConfig.GET_BATCH_SIZE;
 import static org.apache.hudi.config.HoodieHBaseIndexConfig.PUT_BATCH_SIZE;
 import static org.apache.hudi.config.HoodieHBaseIndexConfig.TABLENAME;
@@ -87,29 +91,6 @@ public class HoodieIndexConfig extends HoodieConfig {
           + "It will take precedence over the hoodie.index.type configuration 
if specified");
 
   // ***** Bloom Index configs *****
-  public static final ConfigProperty<String> BLOOM_FILTER_NUM_ENTRIES_VALUE = 
ConfigProperty
-      .key("hoodie.index.bloom.num_entries")
-      .defaultValue("60000")
-      .markAdvanced()
-      .withDocumentation("Only applies if index type is BLOOM. "
-          + "This is the number of entries to be stored in the bloom filter. "
-          + "The rationale for the default: Assume the maxParquetFileSize is 
128MB and averageRecordSize is 1kb and "
-          + "hence we approx a total of 130K records in a file. The default 
(60000) is roughly half of this approximation. "
-          + "Warning: Setting this very low, will generate a lot of false 
positives and index lookup "
-          + "will have to scan a lot more files than it has to and setting 
this to a very high number will "
-          + "increase the size every base file linearly (roughly 4KB for every 
50000 entries). "
-          + "This config is also used with DYNAMIC bloom filter which 
determines the initial size for the bloom.");
-
-  public static final ConfigProperty<String> BLOOM_FILTER_FPP_VALUE = 
ConfigProperty
-      .key("hoodie.index.bloom.fpp")
-      .defaultValue("0.000000001")
-      .markAdvanced()
-      .withDocumentation("Only applies if index type is BLOOM. "
-          + "Error rate allowed given the number of entries. This is used to 
calculate how many bits should be "
-          + "assigned for the bloom filter and the number of hash functions. 
This is usually set very low (default: 0.000000001), "
-          + "we like to tradeoff disk space for lower false positives. "
-          + "If the number of entries added to bloom filter exceeds the 
configured value (hoodie.index.bloom.num_entries), "
-          + "then this fpp may not be honored.");
 
   public static final ConfigProperty<String> BLOOM_INDEX_PARALLELISM = 
ConfigProperty
       .key("hoodie.bloom.index.parallelism")
@@ -166,20 +147,6 @@ public class HoodieIndexConfig extends HoodieConfig {
           + "When true, bucketized bloom filtering is enabled. "
           + "This reduces skew seen in sort based bloom index lookup");
 
-  public static final ConfigProperty<String> BLOOM_FILTER_TYPE = ConfigProperty
-      .key("hoodie.bloom.index.filter.type")
-      .defaultValue(BloomFilterTypeCode.DYNAMIC_V0.name())
-      .withValidValues(BloomFilterTypeCode.SIMPLE.name(), 
BloomFilterTypeCode.DYNAMIC_V0.name())
-      .markAdvanced()
-      .withDocumentation(BloomFilterTypeCode.class);
-
-  public static final ConfigProperty<String> 
BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = ConfigProperty
-      .key("hoodie.bloom.index.filter.dynamic.max.entries")
-      .defaultValue("100000")
-      .markAdvanced()
-      .withDocumentation("The threshold for the maximum number of keys to 
record in a dynamic Bloom filter row. "
-          + "Only applies if filter type is BloomFilterTypeCode.DYNAMIC_V0.");
-
   public static final ConfigProperty<String> SIMPLE_INDEX_USE_CACHING = 
ConfigProperty
       .key("hoodie.simple.index.use.caching")
       .defaultValue("true")
@@ -395,22 +362,22 @@ public class HoodieIndexConfig extends HoodieConfig {
   @Deprecated
   public static final String DEFAULT_INDEX_CLASS = 
INDEX_CLASS_NAME.defaultValue();
   /**
-   * @deprecated Use {@link #BLOOM_FILTER_NUM_ENTRIES_VALUE} and its methods 
instead
+   * @deprecated Use {@link 
HoodieStorageConfig#BLOOM_FILTER_NUM_ENTRIES_VALUE} and its methods instead
    */
   @Deprecated
   public static final String BLOOM_FILTER_NUM_ENTRIES = 
BLOOM_FILTER_NUM_ENTRIES_VALUE.key();
   /**
-   * @deprecated Use {@link #BLOOM_FILTER_NUM_ENTRIES_VALUE} and its methods 
instead
+   * @deprecated Use {@link 
HoodieStorageConfig#BLOOM_FILTER_NUM_ENTRIES_VALUE} and its methods instead
    */
   @Deprecated
   public static final String DEFAULT_BLOOM_FILTER_NUM_ENTRIES = 
BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue();
   /**
-   * @deprecated Use {@link #BLOOM_FILTER_FPP_VALUE} and its methods instead
+   * @deprecated Use {@link HoodieStorageConfig#BLOOM_FILTER_FPP_VALUE} and 
its methods instead
    */
   @Deprecated
   public static final String BLOOM_FILTER_FPP = BLOOM_FILTER_FPP_VALUE.key();
   /**
-   * @deprecated Use {@link #BLOOM_FILTER_FPP_VALUE} and its methods instead
+   * @deprecated Use {@link HoodieStorageConfig#BLOOM_FILTER_FPP_VALUE} and 
its methods instead
    */
   @Deprecated
   public static final String DEFAULT_BLOOM_FILTER_FPP = 
BLOOM_FILTER_FPP_VALUE.defaultValue();
@@ -455,25 +422,25 @@ public class HoodieIndexConfig extends HoodieConfig {
   @Deprecated
   public static final String DEFAULT_BLOOM_INDEX_BUCKETIZED_CHECKING = 
BLOOM_INDEX_BUCKETIZED_CHECKING.defaultValue();
   /**
-   * @deprecated Use {@link #BLOOM_FILTER_TYPE} and its methods instead
+   * @deprecated Use {@link HoodieStorageConfig#BLOOM_FILTER_TYPE} and its 
methods instead
    */
   @Deprecated
   public static final String BLOOM_INDEX_FILTER_TYPE = BLOOM_FILTER_TYPE.key();
   /**
-   * @deprecated Use {@link #BLOOM_FILTER_TYPE} and its methods instead
+   * @deprecated Use {@link HoodieStorageConfig#BLOOM_FILTER_TYPE} and its 
methods instead
    */
   @Deprecated
   public static final String DEFAULT_BLOOM_INDEX_FILTER_TYPE = 
BLOOM_FILTER_TYPE.defaultValue();
   /**
-   * @deprecated Use {@link #BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES} and its 
methods instead
+   * @deprecated Use {@link 
HoodieStorageConfig#BLOOM_FILTER_DYNAMIC_MAX_ENTRIES} and its methods instead
    */
   @Deprecated
-  public static final String HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = 
BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.key();
+  public static final String HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = 
BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key();
   /**
-   * @deprecated Use {@link #BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES} and its 
methods instead
+   * @deprecated Use {@link 
HoodieStorageConfig#BLOOM_FILTER_DYNAMIC_MAX_ENTRIES} and its methods instead
    */
   @Deprecated
-  public static final String 
DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = 
BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue();
+  public static final String 
DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = 
BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue();
   /**
    * @deprecated Use {@link #SIMPLE_INDEX_USE_CACHING} and its methods instead
    */
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 9840e933362..5f83a67486a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1757,11 +1757,11 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public int getBloomFilterNumEntries() {
-    return getInt(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE);
+    return getInt(HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE);
   }
 
   public double getBloomFilterFPP() {
-    return getDouble(HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE);
+    return getDouble(HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE);
   }
 
   public String getHbaseZkQuorum() {
@@ -1841,11 +1841,11 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public String getBloomFilterType() {
-    return getString(HoodieIndexConfig.BLOOM_FILTER_TYPE);
+    return getString(HoodieStorageConfig.BLOOM_FILTER_TYPE);
   }
 
   public int getDynamicBloomFilterMaxNumEntries() {
-    return getInt(HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES);
+    return getInt(HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES);
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index cec7f8f18c5..2660b0b22c8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.common.config;
 
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+
 import javax.annotation.concurrent.Immutable;
 
 import java.io.File;
@@ -170,6 +172,45 @@ public class HoodieStorageConfig extends HoodieConfig {
       .withDocumentation("Expected additional compression as records move from 
log files to parquet. Used for merge_on_read "
           + "table to send inserts into log files & control the size of 
compacted parquet file.");
 
+  // Configs that control the bloom filter that is written to the file footer
+  public static final ConfigProperty<String> BLOOM_FILTER_TYPE = ConfigProperty
+      .key("hoodie.bloom.index.filter.type")
+      .defaultValue(BloomFilterTypeCode.DYNAMIC_V0.name())
+      .withValidValues(BloomFilterTypeCode.SIMPLE.name(), 
BloomFilterTypeCode.DYNAMIC_V0.name())
+      .markAdvanced()
+      .withDocumentation(BloomFilterTypeCode.class);
+
+  public static final ConfigProperty<String> BLOOM_FILTER_NUM_ENTRIES_VALUE = 
ConfigProperty
+      .key("hoodie.index.bloom.num_entries")
+      .defaultValue("60000")
+      .markAdvanced()
+      .withDocumentation("Only applies if index type is BLOOM. "
+          + "This is the number of entries to be stored in the bloom filter. "
+          + "The rationale for the default: Assume the maxParquetFileSize is 
128MB and averageRecordSize is 1kb and "
+          + "hence we approx a total of 130K records in a file. The default 
(60000) is roughly half of this approximation. "
+          + "Warning: Setting this very low, will generate a lot of false 
positives and index lookup "
+          + "will have to scan a lot more files than it has to and setting 
this to a very high number will "
+          + "increase the size every base file linearly (roughly 4KB for every 
50000 entries). "
+          + "This config is also used with DYNAMIC bloom filter which 
determines the initial size for the bloom.");
+
+  public static final ConfigProperty<String> BLOOM_FILTER_FPP_VALUE = 
ConfigProperty
+      .key("hoodie.index.bloom.fpp")
+      .defaultValue("0.000000001")
+      .markAdvanced()
+      .withDocumentation("Only applies if index type is BLOOM. "
+          + "Error rate allowed given the number of entries. This is used to 
calculate how many bits should be "
+          + "assigned for the bloom filter and the number of hash functions. 
This is usually set very low (default: 0.000000001), "
+          + "we like to tradeoff disk space for lower false positives. "
+          + "If the number of entries added to bloom filter exceeds the 
configured value (hoodie.index.bloom.num_entries), "
+          + "then this fpp may not be honored.");
+
+  public static final ConfigProperty<String> BLOOM_FILTER_DYNAMIC_MAX_ENTRIES 
= ConfigProperty
+      .key("hoodie.bloom.index.filter.dynamic.max.entries")
+      .defaultValue("100000")
+      .markAdvanced()
+      .withDocumentation("The threshold for the maximum number of keys to 
record in a dynamic Bloom filter row. "
+          + "Only applies if filter type is BloomFilterTypeCode.DYNAMIC_V0.");
+
   public static final ConfigProperty<String> HOODIE_AVRO_WRITE_SUPPORT_CLASS = 
ConfigProperty
       .key("hoodie.avro.write.support.class")
       .defaultValue("org.apache.hudi.avro.HoodieAvroWriteSupport")
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index 456383d3741..a992886fcdc 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -20,8 +20,8 @@ package org.apache.hudi.io.storage;
 
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
-import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -122,7 +122,10 @@ public class HoodieFileWriterFactory {
   }
 
   protected BloomFilter createBloomFilter(HoodieConfig config) {
-    return BloomFilterFactory.createBloomFilter(60000, 0.000000001, 100000,
-        BloomFilterTypeCode.DYNAMIC_V0.name());
+    return BloomFilterFactory.createBloomFilter(
+        config.getIntOrDefault(HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE),
+        config.getDoubleOrDefault(HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE),
+        config.getIntOrDefault(HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES),
+        config.getStringOrDefault(HoodieStorageConfig.BLOOM_FILTER_TYPE));
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
index e9034a034b3..6917a4360bf 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
@@ -24,9 +24,9 @@ import org.apache.hudi.avro.HoodieAvroWriteSupport
 import org.apache.hudi.client.SparkTaskContextSupplier
 import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
 import org.apache.hudi.common.config.HoodieStorageConfig
+import org.apache.hudi.common.config.HoodieStorageConfig.{BLOOM_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_FILTER_FPP_VALUE, BLOOM_FILTER_NUM_ENTRIES_VALUE, BLOOM_FILTER_TYPE}
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
 import org.apache.hudi.common.util.BaseFileUtils
-import org.apache.hudi.config.HoodieIndexConfig
 import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, 
HoodieParquetConfig}
 import org.apache.parquet.avro.AvroSchemaConverter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
@@ -41,8 +41,9 @@ object SparkHelpers {
   def skipKeysAndWriteNewFile(instantTime: String, fs: FileSystem, sourceFile: 
Path, destinationFile: Path, keysToSkip: Set[String]) {
     val sourceRecords = 
BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(fs.getConf, 
sourceFile)
     val schema: Schema = sourceRecords.get(0).getSchema
-    val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
-      HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_TYPE.defaultValue);
+    val filter: BloomFilter = BloomFilterFactory.createBloomFilter(
+      BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
+      BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt, BLOOM_FILTER_TYPE.defaultValue);
     val writeSupport: HoodieAvroWriteSupport[_] = new 
HoodieAvroWriteSupport(new AvroSchemaConverter(fs.getConf).convert(schema),
       schema, org.apache.hudi.common.util.Option.of(filter), new Properties())
     val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport[_]] =

Reply via email to