This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
     new e9313638f  PARQUET-2260: Bloom filter size shouldn't exceed the configured maxBytes (#1043)
e9313638f is described below

commit e9313638f765899555f748da211064363ae5978c
Author: ChenliangLu <31469905+yab...@users.noreply.github.com>
AuthorDate: Thu Apr 6 12:38:43 2023 +0800

    PARQUET-2260: Bloom filter size shouldn't exceed the configured maxBytes (#1043)

    If the parquet.bloom.filter.max.bytes configuration is not a power of two, the generated
    Bloom filter can be larger than the configured value. For example, if
    parquet.bloom.filter.max.bytes is set to 1024 * 1024 + 1 = 1048577, the generated Bloom
    filter bitset is 1024 * 1024 * 2 = 2097152 bytes, which does not match the definition of
    the parameter. This commit fixes the issue.
---
 .../parquet/column/impl/ColumnWriterBase.java     |  2 +-
 .../org/apache/parquet/hadoop/ParquetWriter.java  | 11 ++++
 .../apache/parquet/hadoop/TestParquetWriter.java  | 73 +++++++++++++++++-----
 3 files changed, 68 insertions(+), 18 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index d215fe6c8..36b609972 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -97,7 +97,7 @@ abstract class ColumnWriterBase implements ColumnWriter {
       int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(ndv.getAsLong(), fpp.getAsDouble());
       this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, maxBloomFilterSize);
     } else {
-      this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize);
+      this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize, maxBloomFilterSize);
     }
   }

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index f18d507ba..869378117 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -589,6 +589,17 @@ public class ParquetWriter<T> implements Closeable {
       return self();
     }

+    /**
+     * Set max Bloom filter bytes for related columns.
+     *
+     * @param maxBloomFilterBytes the max bytes of a Bloom filter bitset for a column.
+     * @return this builder for method chaining
+     */
+    public SELF withMaxBloomFilterBytes(int maxBloomFilterBytes) {
+      encodingPropsBuilder.withMaxBloomFilterBytes(maxBloomFilterBytes);
+      return self();
+    }
+
     /**
      * Sets the NDV (number of distinct values) for the specified column.
      *
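For context on why passing maxBloomFilterSize as both constructor arguments fixes the overflow: the sizing
logic in BlockSplitBloomFilter appears to round the requested byte count up to the next power of two and
then cap it at the supplied maximum, while the old single-argument constructor capped only at the library's
own much larger default upper bound. Below is a minimal, self-contained sketch of that arithmetic; it is a
paraphrase based on the commit message, not the library's actual code:

    // Paraphrased sizing arithmetic; an assumption about BlockSplitBloomFilter's
    // behaviour inferred from the commit message, not the verbatim implementation.
    public class BloomFilterSizingSketch {

      /** Round the requested size up to the next power of two, then cap it at maxBytes. */
      static int resolveBitsetBytes(int requestedBytes, int maxBytes) {
        int numBytes = requestedBytes;
        if ((numBytes & (numBytes - 1)) != 0) {            // not a power of two
          numBytes = Integer.highestOneBit(numBytes) << 1; // round up
        }
        if (numBytes > maxBytes) {                         // enforce the configured limit
          numBytes = maxBytes;
        }
        return numBytes;
      }

      public static void main(String[] args) {
        int configuredMax = 1024 * 1024 + 1; // 1048577, not a power of two
        // Before the fix: only the default upper bound applied, so the rounded-up size leaked through.
        System.out.println(resolveBitsetBytes(configuredMax, Integer.MAX_VALUE)); // 2097152
        // After the fix: the configured value is also the hard cap.
        System.out.println(resolveBitsetBytes(configuredMax, configuredMax));     // 1048577
      }
    }
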
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 376da5f2b..b404b4fba 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -19,11 +19,6 @@ package org.apache.parquet.hadoop;
 
 import static java.util.Arrays.asList;
-import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.apache.parquet.column.Encoding.DELTA_BYTE_ARRAY;
 import static org.apache.parquet.column.Encoding.PLAIN;
 import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
@@ -34,7 +29,12 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
 import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
 import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -44,38 +44,38 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
-import net.openhft.hashing.LongHashFunction;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
-import org.apache.parquet.column.values.bloomfilter.BloomFilter;
-import org.apache.parquet.example.data.GroupFactory;
-import org.apache.parquet.hadoop.example.ExampleParquetWriter;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.hadoop.util.HadoopOutputFile;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.InvalidSchemaException;
-import org.apache.parquet.schema.Types;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
 import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.io.PositionOutputStream;
 import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.MessageType;
-import org.junit.rules.TemporaryFolder;
+import org.apache.parquet.schema.Types;
+
+import net.openhft.hashing.LongHashFunction;
 
 public class TestParquetWriter {
@@ -344,6 +344,45 @@ public class TestParquetWriter {
     }
   }
 
+  /**
+   * If `parquet.bloom.filter.max.bytes` is set, the bytes size of bloom filter should not
+   * be larger than this value
+   */
+  @Test
+  public void testBloomFilterMaxBytesSize() throws IOException {
+    Set<String> distinctStrings = new HashSet<>();
+    while (distinctStrings.size() < 1000) {
+      String str = RandomStringUtils.randomAlphabetic(10);
+      distinctStrings.add(str);
+    }
+    int maxBloomFilterBytes = 1024 * 1024 + 1;
+    MessageType schema = Types.buildMessage().
+      required(BINARY).as(stringType()).named("name").named("msg");
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+    GroupFactory factory = new SimpleGroupFactory(schema);
+    File file = temp.newFile();
+    file.delete();
+    Path path = new Path(file.getAbsolutePath());
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+      .withConf(conf)
+      .withDictionaryEncoding(false)
+      .withBloomFilterEnabled("name", true)
+      .withMaxBloomFilterBytes(maxBloomFilterBytes)
+      .build()) {
+      java.util.Iterator<String> iterator = distinctStrings.iterator();
+      while (iterator.hasNext()) {
+        writer.write(factory.newGroup().append("name", iterator.next()));
+      }
+    }
+    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
+      BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0);
+      BloomFilter bloomFilter = reader.getBloomFilterDataReader(blockMetaData)
+        .readBloomFilter(blockMetaData.getColumns().get(0));
+      assertEquals(bloomFilter.getBitsetSize(), maxBloomFilterBytes);
+    }
+  }
+
   @Test
   public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException {
     testParquetFileNumberOfBlocks(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
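For anyone wanting to try the new option outside the test suite, here is a minimal usage sketch. It is not
part of the commit; the output path, schema string, and column name are illustrative assumptions, while the
builder calls themselves are the ones exercised by the test above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.hadoop.example.GroupWriteSupport;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class BloomFilterMaxBytesExample {
      public static void main(String[] args) throws Exception {
        MessageType schema = MessageTypeParser.parseMessageType(
            "message msg { required binary name (UTF8); }");
        Configuration conf = new Configuration();
        GroupWriteSupport.setSchema(schema, conf);

        // Illustrative path; ParquetWriter will not overwrite an existing file by default.
        Path path = new Path("/tmp/bloom-example.parquet");
        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
            .withConf(conf)
            .withBloomFilterEnabled("name", true)   // enable the Bloom filter for this column
            .withMaxBloomFilterBytes(1024 * 1024)   // hard cap on the bitset size (this commit)
            .build()) {
          SimpleGroupFactory factory = new SimpleGroupFactory(schema);
          writer.write(factory.newGroup().append("name", "example-value"));
        }
      }
    }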