This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fa308f603 [parquet] Support to enable parquet bloomfilter (#4479)
fa308f603 is described below

commit fa308f603c3e6fa25ca2896ee1228b8e23aac73f
Author: WenjunMin <[email protected]>
AuthorDate: Fri Nov 8 15:56:41 2024 +0800

    [parquet] Support to enable parquet bloomfilter (#4479)
---
 .../apache/paimon/format/FormatReadWriteTest.java  |  4 +-
 .../paimon/format/parquet/ColumnConfigParser.java  | 77 ++++++++++++++++++++
 .../parquet/writer/RowDataParquetBuilder.java      | 83 +++++++++++++++-------
 .../format/parquet/ParquetFormatReadWriteTest.java | 52 ++++++++++++++
 4 files changed, 187 insertions(+), 29 deletions(-)

diff --git 
a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java 
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
index d393a9192..d3114cee6 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
@@ -60,8 +60,8 @@ public abstract class FormatReadWriteTest {
 
     private final String formatType;
 
-    private FileIO fileIO;
-    private Path file;
+    protected FileIO fileIO;
+    protected Path file;
 
     protected FormatReadWriteTest(String formatType) {
         this.formatType = formatType;
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ColumnConfigParser.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ColumnConfigParser.java
new file mode 100644
index 000000000..a4e33807f
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ColumnConfigParser.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/**
+ * Parses the specified key-values in the format of root.key#column.path from 
a {@link
+ * Configuration} object.
+ *
+ * <p>NOTE: The file was copied from Apache parquet project.
+ */
+public class ColumnConfigParser {
+
+    private static class ConfigHelper<T> {
+        private final String prefix;
+        private final Function<String, T> function;
+        private final BiConsumer<String, T> consumer;
+
+        public ConfigHelper(
+                String prefix, Function<String, T> function, 
BiConsumer<String, T> consumer) {
+            this.prefix = prefix;
+            this.function = function;
+            this.consumer = consumer;
+        }
+
+        public void processKey(String key) {
+            if (key.startsWith(prefix)) {
+                String columnPath = key.substring(prefix.length());
+                T value = function.apply(key);
+                consumer.accept(columnPath, value);
+            }
+        }
+    }
+
+    private final List<ConfigHelper<?>> helpers = new ArrayList<>();
+
+    public <T> ColumnConfigParser withColumnConfig(
+            String rootKey, Function<String, T> function, BiConsumer<String, 
T> consumer) {
+        helpers.add(new ConfigHelper<T>(rootKey + '#', function, consumer));
+        return this;
+    }
+
+    public void parseConfig(Configuration conf) {
+        for (Map.Entry<String, String> entry : conf) {
+            for (ConfigHelper<?> helper : helpers) {
+                // We retrieve the value from function instead of parsing from 
the string here to
+                // use the exact
+                // implementations
+                // in Configuration
+                helper.processKey(entry.getKey());
+            }
+        }
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
index 26d38a1c1..da55f9494 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.parquet.writer;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.parquet.ColumnConfigParser;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.RowType;
 
@@ -46,33 +47,61 @@ public class RowDataParquetBuilder implements 
ParquetBuilder<InternalRow> {
     @Override
     public ParquetWriter<InternalRow> createWriter(OutputFile out, String 
compression)
             throws IOException {
-        return new ParquetRowDataBuilder(out, rowType)
-                .withConf(conf)
-                
.withCompressionCodec(CompressionCodecName.fromConf(getCompression(compression)))
-                .withRowGroupSize(
-                        conf.getLong(
-                                ParquetOutputFormat.BLOCK_SIZE, 
ParquetWriter.DEFAULT_BLOCK_SIZE))
-                .withPageSize(
-                        conf.getInt(ParquetOutputFormat.PAGE_SIZE, 
ParquetWriter.DEFAULT_PAGE_SIZE))
-                .withDictionaryPageSize(
-                        conf.getInt(
-                                ParquetOutputFormat.DICTIONARY_PAGE_SIZE,
-                                
ParquetProperties.DEFAULT_DICTIONARY_PAGE_SIZE))
-                .withMaxPaddingSize(
-                        conf.getInt(
-                                ParquetOutputFormat.MAX_PADDING_BYTES,
-                                ParquetWriter.MAX_PADDING_SIZE_DEFAULT))
-                .withDictionaryEncoding(
-                        conf.getBoolean(
-                                ParquetOutputFormat.ENABLE_DICTIONARY,
-                                
ParquetProperties.DEFAULT_IS_DICTIONARY_ENABLED))
-                
.withValidation(conf.getBoolean(ParquetOutputFormat.VALIDATION, false))
-                .withWriterVersion(
-                        ParquetProperties.WriterVersion.fromString(
-                                conf.get(
-                                        ParquetOutputFormat.WRITER_VERSION,
-                                        
ParquetProperties.DEFAULT_WRITER_VERSION.toString())))
-                .build();
+        ParquetRowDataBuilder builder =
+                new ParquetRowDataBuilder(out, rowType)
+                        .withConf(conf)
+                        .withCompressionCodec(
+                                
CompressionCodecName.fromConf(getCompression(compression)))
+                        .withRowGroupSize(
+                                conf.getLong(
+                                        ParquetOutputFormat.BLOCK_SIZE,
+                                        ParquetWriter.DEFAULT_BLOCK_SIZE))
+                        .withPageSize(
+                                conf.getInt(
+                                        ParquetOutputFormat.PAGE_SIZE,
+                                        ParquetWriter.DEFAULT_PAGE_SIZE))
+                        .withDictionaryPageSize(
+                                conf.getInt(
+                                        
ParquetOutputFormat.DICTIONARY_PAGE_SIZE,
+                                        
ParquetProperties.DEFAULT_DICTIONARY_PAGE_SIZE))
+                        .withMaxPaddingSize(
+                                conf.getInt(
+                                        ParquetOutputFormat.MAX_PADDING_BYTES,
+                                        
ParquetWriter.MAX_PADDING_SIZE_DEFAULT))
+                        .withDictionaryEncoding(
+                                conf.getBoolean(
+                                        ParquetOutputFormat.ENABLE_DICTIONARY,
+                                        
ParquetProperties.DEFAULT_IS_DICTIONARY_ENABLED))
+                        
.withValidation(conf.getBoolean(ParquetOutputFormat.VALIDATION, false))
+                        .withWriterVersion(
+                                ParquetProperties.WriterVersion.fromString(
+                                        conf.get(
+                                                
ParquetOutputFormat.WRITER_VERSION,
+                                                
ParquetProperties.DEFAULT_WRITER_VERSION
+                                                        .toString())))
+                        .withBloomFilterEnabled(
+                                conf.getBoolean(
+                                        
ParquetOutputFormat.BLOOM_FILTER_ENABLED,
+                                        
ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED));
+        new ColumnConfigParser()
+                .withColumnConfig(
+                        ParquetOutputFormat.ENABLE_DICTIONARY,
+                        key -> conf.getBoolean(key, false),
+                        builder::withDictionaryEncoding)
+                .withColumnConfig(
+                        ParquetOutputFormat.BLOOM_FILTER_ENABLED,
+                        key -> conf.getBoolean(key, false),
+                        builder::withBloomFilterEnabled)
+                .withColumnConfig(
+                        ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV,
+                        key -> conf.getLong(key, -1L),
+                        builder::withBloomFilterNDV)
+                .withColumnConfig(
+                        ParquetOutputFormat.BLOOM_FILTER_FPP,
+                        key -> conf.getDouble(key, 
ParquetProperties.DEFAULT_BLOOM_FILTER_FPP),
+                        builder::withBloomFilterFPP)
+                .parseConfig(conf);
+        return builder.build();
     }
 
     public String getCompression(String compression) {
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
index d5338b1e7..221d524ff 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
@@ -18,10 +18,27 @@
 
 package org.apache.paimon.format.parquet;
 
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FileFormatFactory;
 import org.apache.paimon.format.FormatReadWriteTest;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 
 /** A parquet {@link FormatReadWriteTest}. */
 public class ParquetFormatReadWriteTest extends FormatReadWriteTest {
@@ -35,4 +52,39 @@ public class ParquetFormatReadWriteTest extends 
FormatReadWriteTest {
         return new ParquetFileFormat(
                 new FileFormatFactory.FormatContext(new Options(), 1024, 
1024));
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testEnableBloomFilter(boolean enabled) throws Exception {
+        Options options = new Options();
+        options.set("parquet.bloom.filter.enabled", String.valueOf(enabled));
+        ParquetFileFormat format =
+                new ParquetFileFormat(new 
FileFormatFactory.FormatContext(options, 1024, 1024));
+
+        RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), 
DataTypes.BIGINT());
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            rowType = (RowType) rowType.notNull();
+        }
+
+        PositionOutputStream out = fileIO.newOutputStream(file, false);
+        FormatWriter writer = format.createWriterFactory(rowType).create(out, 
"zstd");
+        writer.addElement(GenericRow.of(1, 1L));
+        writer.addElement(GenericRow.of(2, 2L));
+        writer.addElement(GenericRow.of(3, null));
+        writer.close();
+        out.close();
+
+        try (ParquetFileReader reader = ParquetUtil.getParquetReader(fileIO, 
file)) {
+            ParquetMetadata parquetMetadata = reader.getFooter();
+            List<BlockMetaData> blockMetaDataList = 
parquetMetadata.getBlocks();
+            for (BlockMetaData blockMetaData : blockMetaDataList) {
+                List<ColumnChunkMetaData> columnChunkMetaDataList = 
blockMetaData.getColumns();
+                for (ColumnChunkMetaData columnChunkMetaData : 
columnChunkMetaDataList) {
+                    BloomFilter filter = 
reader.readBloomFilter(columnChunkMetaData);
+                    Assertions.assertThat(enabled == (filter != 
null)).isTrue();
+                }
+            }
+        }
+    }
 }

Reply via email to