This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fa308f603 [parquet] Support to enable parquet bloomfilter (#4479)
fa308f603 is described below
commit fa308f603c3e6fa25ca2896ee1228b8e23aac73f
Author: WenjunMin <[email protected]>
AuthorDate: Fri Nov 8 15:56:41 2024 +0800
[parquet] Support to enable parquet bloomfilter (#4479)
---
.../apache/paimon/format/FormatReadWriteTest.java | 4 +-
.../paimon/format/parquet/ColumnConfigParser.java | 77 ++++++++++++++++++++
.../parquet/writer/RowDataParquetBuilder.java | 83 +++++++++++++++-------
.../format/parquet/ParquetFormatReadWriteTest.java | 52 ++++++++++++++
4 files changed, 187 insertions(+), 29 deletions(-)
diff --git a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
index d393a9192..d3114cee6 100644
--- a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
@@ -60,8 +60,8 @@ public abstract class FormatReadWriteTest {
private final String formatType;
- private FileIO fileIO;
- private Path file;
+ protected FileIO fileIO;
+ protected Path file;
protected FormatReadWriteTest(String formatType) {
this.formatType = formatType;
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ColumnConfigParser.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ColumnConfigParser.java
new file mode 100644
index 000000000..a4e33807f
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ColumnConfigParser.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/**
+ * Parses the specified key-values in the format of root.key#column.path from a {@link
+ * Configuration} object.
+ *
+ * <p>For every registered root key, each configuration key of the form {@code
+ * <rootKey>#<column.path>} is read with the registered function and handed, together with the
+ * column path, to the registered consumer.
+ *
+ * <p>NOTE: The file was copied from Apache parquet project.
+ */
+public class ColumnConfigParser {
+
+    /**
+     * One registered option: the key prefix to match, how to read a matching key's value, and
+     * where to send the resulting (column path, value) pair.
+     */
+    private static final class ConfigHelper<T> {
+        private final String prefix;
+        private final Function<String, T> function;
+        private final BiConsumer<String, T> consumer;
+
+        public ConfigHelper(
+                String prefix, Function<String, T> function, BiConsumer<String, T> consumer) {
+            this.prefix = prefix;
+            this.function = function;
+            this.consumer = consumer;
+        }
+
+        /**
+         * If {@code key} starts with this helper's prefix, reads the value for the full key via
+         * {@code function} and passes the remainder of the key (the column path) and that value
+         * to {@code consumer}; otherwise does nothing.
+         */
+        public void processKey(String key) {
+            if (key.startsWith(prefix)) {
+                String columnPath = key.substring(prefix.length());
+                T value = function.apply(key);
+                consumer.accept(columnPath, value);
+            }
+        }
+    }
+
+    private final List<ConfigHelper<?>> helpers = new ArrayList<>();
+
+    /**
+     * Registers a per-column option.
+     *
+     * @param rootKey the option name; a configuration key matches when it starts with {@code
+     *     rootKey + '#'}, and the rest of the key is taken as the column path
+     * @param function reads the typed value for a matching key (it receives the full key)
+     * @param consumer receives the column path and the value produced by {@code function}
+     * @param <T> the option's value type
+     * @return this parser, to allow chaining
+     */
+    public <T> ColumnConfigParser withColumnConfig(
+            String rootKey, Function<String, T> function, BiConsumer<String, T> consumer) {
+        helpers.add(new ConfigHelper<>(rootKey + '#', function, consumer));
+        return this;
+    }
+
+    /**
+     * Offers every key of {@code conf} to every registered option, invoking the consumer of each
+     * option whose prefix the key matches.
+     *
+     * @param conf the configuration whose keys are scanned
+     */
+    public void parseConfig(Configuration conf) {
+        for (Map.Entry<String, String> entry : conf) {
+            for (ConfigHelper<?> helper : helpers) {
+                // We retrieve the value from function instead of parsing from the string here to
+                // use the exact implementations in Configuration.
+                helper.processKey(entry.getKey());
+            }
+        }
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
index 26d38a1c1..da55f9494 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.parquet.writer;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.parquet.ColumnConfigParser;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.RowType;
@@ -46,33 +47,61 @@ public class RowDataParquetBuilder implements ParquetBuilder<InternalRow> {
+ /**
+ * Creates a {@link ParquetWriter} for {@code out}. The codec is resolved from
+ * getCompression(compression); row group, page, dictionary, padding, validation, writer
+ * version and bloom filter settings are read from the Hadoop configuration {@code conf}.
+ */
@Override
public ParquetWriter<InternalRow> createWriter(OutputFile out, String
compression)
throws IOException {
- return new ParquetRowDataBuilder(out, rowType)
- .withConf(conf)
-
.withCompressionCodec(CompressionCodecName.fromConf(getCompression(compression)))
- .withRowGroupSize(
- conf.getLong(
- ParquetOutputFormat.BLOCK_SIZE,
ParquetWriter.DEFAULT_BLOCK_SIZE))
- .withPageSize(
- conf.getInt(ParquetOutputFormat.PAGE_SIZE,
ParquetWriter.DEFAULT_PAGE_SIZE))
- .withDictionaryPageSize(
- conf.getInt(
- ParquetOutputFormat.DICTIONARY_PAGE_SIZE,
-
ParquetProperties.DEFAULT_DICTIONARY_PAGE_SIZE))
- .withMaxPaddingSize(
- conf.getInt(
- ParquetOutputFormat.MAX_PADDING_BYTES,
- ParquetWriter.MAX_PADDING_SIZE_DEFAULT))
- .withDictionaryEncoding(
- conf.getBoolean(
- ParquetOutputFormat.ENABLE_DICTIONARY,
-
ParquetProperties.DEFAULT_IS_DICTIONARY_ENABLED))
-
.withValidation(conf.getBoolean(ParquetOutputFormat.VALIDATION, false))
- .withWriterVersion(
- ParquetProperties.WriterVersion.fromString(
- conf.get(
- ParquetOutputFormat.WRITER_VERSION,
-
ParquetProperties.DEFAULT_WRITER_VERSION.toString())))
- .build();
+ // File-wide settings. The single-argument withBloomFilterEnabled(boolean) below is the
+ // default for columns that have no per-column "#column.path" override.
+ ParquetRowDataBuilder builder =
+ new ParquetRowDataBuilder(out, rowType)
+ .withConf(conf)
+ .withCompressionCodec(
+
CompressionCodecName.fromConf(getCompression(compression)))
+ .withRowGroupSize(
+ conf.getLong(
+ ParquetOutputFormat.BLOCK_SIZE,
+ ParquetWriter.DEFAULT_BLOCK_SIZE))
+ .withPageSize(
+ conf.getInt(
+ ParquetOutputFormat.PAGE_SIZE,
+ ParquetWriter.DEFAULT_PAGE_SIZE))
+ .withDictionaryPageSize(
+ conf.getInt(
+
ParquetOutputFormat.DICTIONARY_PAGE_SIZE,
+
ParquetProperties.DEFAULT_DICTIONARY_PAGE_SIZE))
+ .withMaxPaddingSize(
+ conf.getInt(
+ ParquetOutputFormat.MAX_PADDING_BYTES,
+
ParquetWriter.MAX_PADDING_SIZE_DEFAULT))
+ .withDictionaryEncoding(
+ conf.getBoolean(
+ ParquetOutputFormat.ENABLE_DICTIONARY,
+
ParquetProperties.DEFAULT_IS_DICTIONARY_ENABLED))
+
.withValidation(conf.getBoolean(ParquetOutputFormat.VALIDATION, false))
+ .withWriterVersion(
+ ParquetProperties.WriterVersion.fromString(
+ conf.get(
+
ParquetOutputFormat.WRITER_VERSION,
+
ParquetProperties.DEFAULT_WRITER_VERSION
+ .toString())))
+ .withBloomFilterEnabled(
+ conf.getBoolean(
+
ParquetOutputFormat.BLOOM_FILTER_ENABLED,
+
ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED));
+ // Per-column overrides: keys of the form "<option>#<column.path>" (see ColumnConfigParser)
+ // are forwarded to the matching per-column builder method, e.g.
+ // "parquet.bloom.filter.enabled#my_col" -> builder.withBloomFilterEnabled("my_col", value).
+ new ColumnConfigParser()
+ .withColumnConfig(
+ ParquetOutputFormat.ENABLE_DICTIONARY,
+ key -> conf.getBoolean(key, false),
+ builder::withDictionaryEncoding)
+ .withColumnConfig(
+ ParquetOutputFormat.BLOOM_FILTER_ENABLED,
+ key -> conf.getBoolean(key, false),
+ builder::withBloomFilterEnabled)
+ .withColumnConfig(
+ ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV,
+ key -> conf.getLong(key, -1L),
+ builder::withBloomFilterNDV)
+ .withColumnConfig(
+ ParquetOutputFormat.BLOOM_FILTER_FPP,
+ key -> conf.getDouble(key,
ParquetProperties.DEFAULT_BLOOM_FILTER_FPP),
+ builder::withBloomFilterFPP)
+ .parseConfig(conf);
+ return builder.build();
}
public String getCompression(String compression) {
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
index d5338b1e7..221d524ff 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
@@ -18,10 +18,27 @@
package org.apache.paimon.format.parquet;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatFactory;
import org.apache.paimon.format.FormatReadWriteTest;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
/** A parquet {@link FormatReadWriteTest}. */
public class ParquetFormatReadWriteTest extends FormatReadWriteTest {
@@ -35,4 +52,39 @@ public class ParquetFormatReadWriteTest extends FormatReadWriteTest {
return new ParquetFileFormat(
new FileFormatFactory.FormatContext(new Options(), 1024,
1024));
}
+
+    /**
+     * Writes a small Parquet file with the file-wide {@code parquet.bloom.filter.enabled} option
+     * set to {@code enabled} and checks that every column chunk has a bloom filter exactly when
+     * the option is enabled.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testEnableBloomFilter(boolean enabled) throws Exception {
+        Options options = new Options();
+        options.set("parquet.bloom.filter.enabled", String.valueOf(enabled));
+        ParquetFileFormat format =
+                new ParquetFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+
+        RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.BIGINT());
+        // Randomly cover either a nullable or a NOT NULL row type; the chosen type is part of the
+        // assertion description below so a failing run can be reproduced.
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            rowType = (RowType) rowType.notNull();
+        }
+
+        // try-with-resources so the stream is released even if writing fails.
+        try (PositionOutputStream out = fileIO.newOutputStream(file, false)) {
+            FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd");
+            writer.addElement(GenericRow.of(1, 1L));
+            writer.addElement(GenericRow.of(2, 2L));
+            writer.addElement(GenericRow.of(3, null));
+            writer.close();
+        }
+
+        try (ParquetFileReader reader = ParquetUtil.getParquetReader(fileIO, file)) {
+            ParquetMetadata parquetMetadata = reader.getFooter();
+            List<BlockMetaData> blockMetaDataList = parquetMetadata.getBlocks();
+            // Guard against a vacuous pass: with no row groups the loop below checks nothing.
+            Assertions.assertThat(blockMetaDataList).isNotEmpty();
+            for (BlockMetaData blockMetaData : blockMetaDataList) {
+                for (ColumnChunkMetaData columnChunkMetaData : blockMetaData.getColumns()) {
+                    BloomFilter filter = reader.readBloomFilter(columnChunkMetaData);
+                    Assertions.assertThat(filter != null)
+                            .as(
+                                    "bloom filter present for column %s of %s",
+                                    columnChunkMetaData.getPath(), rowType)
+                            .isEqualTo(enabled);
+                }
+            }
+        }
+    }
}