This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new e9313638f PARQUET-2260: Bloom filter size shouldn't exceed the configured maxBytes (#1043)
e9313638f is described below
commit e9313638f765899555f748da211064363ae5978c
Author: ChenliangLu <[email protected]>
AuthorDate: Thu Apr 6 12:38:43 2023 +0800
PARQUET-2260: Bloom filter size shouldn't exceed the configured maxBytes (#1043)
If the parquet.bloom.filter.max.bytes configuration is not a power of two, the generated Bloom filter can exceed this value. For example, if parquet.bloom.filter.max.bytes is set to 1024 * 1024 + 1 = 1048577, the generated Bloom filter bitset will be 1024 * 1024 * 2 = 2097152 bytes, which does not match the definition of the parameter. This commit fixes the issue.
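
For illustration (a minimal sketch, not part of the patch below): the two
BlockSplitBloomFilter constructors touched by this change, with sizes following
the rounding and capping behavior stated above and checked by the new test.

    import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;

    public class BloomFilterSizeSketch {
      public static void main(String[] args) {
        // The cap under discussion, normally set via parquet.bloom.filter.max.bytes.
        int maxBytes = 1024 * 1024 + 1; // 1048577, not a power of two

        // Before the fix: the single-argument constructor rounds the requested
        // size up to the next power of two, so the bitset exceeds the cap.
        BlockSplitBloomFilter before = new BlockSplitBloomFilter(maxBytes);
        System.out.println(before.getBitsetSize()); // 2097152 (2 MiB)

        // After the fix: the cap is also passed as the maximum, so the bitset
        // can never grow past the configured value.
        BlockSplitBloomFilter after = new BlockSplitBloomFilter(maxBytes, maxBytes);
        System.out.println(after.getBitsetSize()); // capped at 1048577
      }
    }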
---
.../parquet/column/impl/ColumnWriterBase.java | 2 +-
.../org/apache/parquet/hadoop/ParquetWriter.java | 11 ++++
.../apache/parquet/hadoop/TestParquetWriter.java | 73 +++++++++++++++++-----
3 files changed, 68 insertions(+), 18 deletions(-)
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index d215fe6c8..36b609972 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -97,7 +97,7 @@ abstract class ColumnWriterBase implements ColumnWriter {
         int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(ndv.getAsLong(), fpp.getAsDouble());
         this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, maxBloomFilterSize);
       } else {
-        this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize);
+        this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize, maxBloomFilterSize);
       }
     }
 
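
For context (a sketch, not from the patch; names taken from the hunk above):
the constructor's second argument is the upper bound on the bitset size, as the
NDV branch already uses it, so after this fix both sizing paths are bounded by
maxBloomFilterSize.

    // NDV and FPP known: request the optimal size, still capped by the maximum.
    int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(1000, 0.01);
    BloomFilter sized = new BlockSplitBloomFilter(optimalNumOfBits / 8, maxBloomFilterSize);

    // NDV and FPP unknown: request the cap itself; before this fix the
    // single-argument constructor could round it up past maxBloomFilterSize.
    BloomFilter capped = new BlockSplitBloomFilter(maxBloomFilterSize, maxBloomFilterSize);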
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index f18d507ba..869378117 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -589,6 +589,17 @@ public class ParquetWriter<T> implements Closeable {
       return self();
     }
 
+    /**
+     * Sets the maximum size in bytes of a Bloom filter bitset for the related columns.
+     *
+     * @param maxBloomFilterBytes the maximum number of bytes of a Bloom filter bitset for a column
+     * @return this builder for method chaining
+     */
+    public SELF withMaxBloomFilterBytes(int maxBloomFilterBytes) {
+      encodingPropsBuilder.withMaxBloomFilterBytes(maxBloomFilterBytes);
+      return self();
+    }
+
     /**
      * Sets the NDV (number of distinct values) for the specified column.
      *
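
A hedged usage sketch of the new builder option (mirroring the test added
below; path, conf, and the "name" column are assumptions borrowed from that
test):

    ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
        .withConf(conf)
        .withBloomFilterEnabled("name", true)   // enable the filter for "name"
        .withMaxBloomFilterBytes(1024 * 1024)   // cap the bitset at 1 MiB
        .build();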
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 376da5f2b..b404b4fba 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -19,11 +19,6 @@
 package org.apache.parquet.hadoop;
 
 import static java.util.Arrays.asList;
-import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.apache.parquet.column.Encoding.DELTA_BYTE_ARRAY;
 import static org.apache.parquet.column.Encoding.PLAIN;
 import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
@@ -34,7 +29,12 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FI
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
 import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
 import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -44,38 +44,38 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
-import net.openhft.hashing.LongHashFunction;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
 import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.io.PositionOutputStream;
 import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.MessageType;
-import org.junit.rules.TemporaryFolder;
+import org.apache.parquet.schema.Types;
+
+import net.openhft.hashing.LongHashFunction;
 
 public class TestParquetWriter {
@@ -344,6 +344,45 @@ public class TestParquetWriter {
     }
   }
 
+  /**
+   * If `parquet.bloom.filter.max.bytes` is set, the size in bytes of the Bloom filter
+   * should not be larger than this value.
+   */
+  @Test
+  public void testBloomFilterMaxBytesSize() throws IOException {
+    Set<String> distinctStrings = new HashSet<>();
+    while (distinctStrings.size() < 1000) {
+      String str = RandomStringUtils.randomAlphabetic(10);
+      distinctStrings.add(str);
+    }
+    int maxBloomFilterBytes = 1024 * 1024 + 1;
+    MessageType schema = Types.buildMessage()
+        .required(BINARY).as(stringType()).named("name").named("msg");
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+    GroupFactory factory = new SimpleGroupFactory(schema);
+    File file = temp.newFile();
+    file.delete();
+    Path path = new Path(file.getAbsolutePath());
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+        .withConf(conf)
+        .withDictionaryEncoding(false)
+        .withBloomFilterEnabled("name", true)
+        .withMaxBloomFilterBytes(maxBloomFilterBytes)
+        .build()) {
+      java.util.Iterator<String> iterator = distinctStrings.iterator();
+      while (iterator.hasNext()) {
+        writer.write(factory.newGroup().append("name", iterator.next()));
+      }
+    }
+    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
+      BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0);
+      BloomFilter bloomFilter = reader.getBloomFilterDataReader(blockMetaData)
+          .readBloomFilter(blockMetaData.getColumns().get(0));
+      assertEquals(maxBloomFilterBytes, bloomFilter.getBitsetSize());
+    }
+  }
+
   @Test
   public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException {
     testParquetFileNumberOfBlocks(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,