This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new e9313638f PARQUET-2260: Bloom filter size shouldn't exceed the 
configured maxBytes (#1043)
e9313638f is described below

commit e9313638f765899555f748da211064363ae5978c
Author: ChenliangLu <31469905+yab...@users.noreply.github.com>
AuthorDate: Thu Apr 6 12:38:43 2023 +0800

    PARQUET-2260: Bloom filter size shouldn't exceed the configured maxBytes 
(#1043)
    
    If parquet.bloom.filter.max.bytes configuration is not a power of 2 value, 
the size of the bloom filter generated will exceed this value. For example, if 
set parquet.bloom.filter.max.bytes as 1024 * 1024+1= 1048577 , the bytes size 
of bloom filter generated will be 1024 * 1024 * 2 = 2097152. This does not 
match the definition of the parameter. This commit fixes the issue.
---
 .../parquet/column/impl/ColumnWriterBase.java      |  2 +-
 .../org/apache/parquet/hadoop/ParquetWriter.java   | 11 ++++
 .../apache/parquet/hadoop/TestParquetWriter.java   | 73 +++++++++++++++++-----
 3 files changed, 68 insertions(+), 18 deletions(-)

diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
 
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index d215fe6c8..36b609972 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -97,7 +97,7 @@ abstract class ColumnWriterBase implements ColumnWriter {
       int optimalNumOfBits = 
BlockSplitBloomFilter.optimalNumOfBits(ndv.getAsLong(), fpp.getAsDouble());
       this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, 
maxBloomFilterSize);
     } else {
-      this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize);
+      this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize, 
maxBloomFilterSize);
     }
   }
 
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index f18d507ba..869378117 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -589,6 +589,17 @@ public class ParquetWriter<T> implements Closeable {
       return self();
     }
 
+    /**
+     * Set max Bloom filter bytes for related columns.
+     *
+     * @param maxBloomFilterBytes the max bytes of a Bloom filter bitset for a 
column.
+     * @return this builder for method chaining
+     */
+    public SELF withMaxBloomFilterBytes(int maxBloomFilterBytes) {
+      encodingPropsBuilder.withMaxBloomFilterBytes(maxBloomFilterBytes);
+      return self();
+    }
+
     /**
      * Sets the NDV (number of distinct values) for the specified column.
      *
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 376da5f2b..b404b4fba 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -19,11 +19,6 @@
 package org.apache.parquet.hadoop;
 
 import static java.util.Arrays.asList;
-import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.apache.parquet.column.Encoding.DELTA_BYTE_ARRAY;
 import static org.apache.parquet.column.Encoding.PLAIN;
 import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
@@ -34,7 +29,12 @@ import static 
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FI
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
 import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
 import static 
org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -44,38 +44,38 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
-import net.openhft.hashing.LongHashFunction;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
-import org.apache.parquet.column.values.bloomfilter.BloomFilter;
-import org.apache.parquet.example.data.GroupFactory;
-import org.apache.parquet.hadoop.example.ExampleParquetWriter;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.hadoop.util.HadoopOutputFile;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.InvalidSchemaException;
-import org.apache.parquet.schema.Types;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
 import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.io.PositionOutputStream;
 import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.MessageType;
-import org.junit.rules.TemporaryFolder;
+import org.apache.parquet.schema.Types;
+
+import net.openhft.hashing.LongHashFunction;
 
 public class TestParquetWriter {
 
@@ -344,6 +344,45 @@ public class TestParquetWriter {
     }
   }
 
+  /**
+   * If `parquet.bloom.filter.max.bytes` is set, the bytes size of bloom 
filter should not
+   * be larger than this value
+   */
+  @Test
+  public void testBloomFilterMaxBytesSize() throws IOException {
+    Set<String> distinctStrings = new HashSet<>();
+    while (distinctStrings.size() < 1000) {
+      String str = RandomStringUtils.randomAlphabetic(10);
+      distinctStrings.add(str);
+    }
+    int maxBloomFilterBytes = 1024 * 1024 + 1;
+    MessageType schema = Types.buildMessage().
+      required(BINARY).as(stringType()).named("name").named("msg");
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+    GroupFactory factory = new SimpleGroupFactory(schema);
+    File file = temp.newFile();
+    file.delete();
+    Path path = new Path(file.getAbsolutePath());
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+      .withConf(conf)
+      .withDictionaryEncoding(false)
+      .withBloomFilterEnabled("name", true)
+      .withMaxBloomFilterBytes(maxBloomFilterBytes)
+      .build()) {
+      java.util.Iterator<String> iterator = distinctStrings.iterator();
+      while (iterator.hasNext()) {
+        writer.write(factory.newGroup().append("name", iterator.next()));
+      }
+    }
+    try (ParquetFileReader reader = 
ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
+      BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0);
+      BloomFilter bloomFilter = reader.getBloomFilterDataReader(blockMetaData)
+        .readBloomFilter(blockMetaData.getColumns().get(0));
+      assertEquals(bloomFilter.getBitsetSize(), maxBloomFilterBytes);
+    }
+  }
+
   @Test
   public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException 
{
     
testParquetFileNumberOfBlocks(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,

Reply via email to