This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 97cf0b8cf PARQUET-2254: Support building bloom filter that adapts to 
the data
97cf0b8cf is described below

commit 97cf0b8cff8c4e2e829660f71e750f8d8f21d0a6
Author: Chenliang Lu <31469905+yab...@users.noreply.github.com>
AuthorDate: Wed May 24 09:32:49 2023 +0800

    PARQUET-2254: Support building bloom filter that adapts to the data
    
    This closes #1042
---
 .../apache/parquet/column/ParquetProperties.java   |  48 +++-
 .../parquet/column/impl/ColumnWriterBase.java      |   9 +-
 .../bloomfilter/AdaptiveBlockSplitBloomFilter.java | 305 +++++++++++++++++++++
 .../bloomfilter/TestBlockSplitBloomFilter.java     |  48 +++-
 parquet-hadoop/README.md                           |  20 +-
 .../apache/parquet/hadoop/ParquetFileWriter.java   |   2 +
 .../apache/parquet/hadoop/ParquetOutputFormat.java |  12 +
 .../org/apache/parquet/hadoop/ParquetWriter.java   |  22 ++
 .../TestAdaptiveBlockSplitBloomFiltering.java      |  78 ++++++
 .../apache/parquet/hadoop/TestBloomFiltering.java  | 154 +++++++----
 10 files changed, 637 insertions(+), 61 deletions(-)

diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java 
b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 3ff58a888..dda58736f 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -18,6 +18,8 @@
  */
 package org.apache.parquet.column;
 
+import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
+
 import java.util.Objects;
 import java.util.OptionalDouble;
 import java.util.OptionalLong;
@@ -38,8 +40,6 @@ import 
org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
 import 
org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 import org.apache.parquet.schema.MessageType;
 
-import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
-
 /**
  * This class represents all the configurable Parquet properties.
  */
@@ -59,6 +59,8 @@ public class ParquetProperties {
   public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;
   public static final boolean DEFAULT_BLOOM_FILTER_ENABLED = false;
   public static final double DEFAULT_BLOOM_FILTER_FPP = 0.01;
+  public static final boolean DEFAULT_ADAPTIVE_BLOOM_FILTER_ENABLED = false;
+  public static final int DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER = 5;
 
   public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
 
@@ -105,6 +107,8 @@ public class ParquetProperties {
   private final ColumnProperty<Double> bloomFilterFPPs;
   private final int maxBloomFilterBytes;
   private final ColumnProperty<Boolean> bloomFilterEnabled;
+  private final ColumnProperty<Boolean> adaptiveBloomFilterEnabled;
+  private final ColumnProperty<Integer> numBloomFilterCandidates;
   private final int pageRowCountLimit;
   private final boolean pageWriteChecksumEnabled;
   private final boolean enableByteStreamSplit;
@@ -128,6 +132,8 @@ public class ParquetProperties {
     this.bloomFilterFPPs = builder.bloomFilterFPPs.build();
     this.bloomFilterEnabled = builder.bloomFilterEnabled.build();
     this.maxBloomFilterBytes = builder.maxBloomFilterBytes;
+    this.adaptiveBloomFilterEnabled = 
builder.adaptiveBloomFilterEnabled.build();
+    this.numBloomFilterCandidates = builder.numBloomFilterCandidates.build();
     this.pageRowCountLimit = builder.pageRowCountLimit;
     this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
     this.enableByteStreamSplit = builder.enableByteStreamSplit;
@@ -275,6 +281,14 @@ public class ParquetProperties {
     return maxBloomFilterBytes;
   }
 
+  public boolean getAdaptiveBloomFilterEnabled(ColumnDescriptor column) {
+    return adaptiveBloomFilterEnabled.getValue(column);
+  }
+
+  public int getBloomFilterCandidatesCount(ColumnDescriptor column) {
+    return numBloomFilterCandidates.getValue(column);
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -317,6 +331,8 @@ public class ParquetProperties {
     private final ColumnProperty.Builder<Long> bloomFilterNDVs;
     private final ColumnProperty.Builder<Double> bloomFilterFPPs;
     private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES;
+    private final ColumnProperty.Builder<Boolean> adaptiveBloomFilterEnabled;
+    private final ColumnProperty.Builder<Integer> numBloomFilterCandidates;
     private final ColumnProperty.Builder<Boolean> bloomFilterEnabled;
     private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
     private boolean pageWriteChecksumEnabled = 
DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
@@ -327,6 +343,8 @@ public class ParquetProperties {
       bloomFilterEnabled = 
ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_ENABLED);
       bloomFilterNDVs = ColumnProperty.<Long>builder().withDefaultValue(null);
       bloomFilterFPPs = 
ColumnProperty.<Double>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_FPP);
+      adaptiveBloomFilterEnabled = 
ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_ADAPTIVE_BLOOM_FILTER_ENABLED);
+      numBloomFilterCandidates = 
ColumnProperty.<Integer>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER);
     }
 
     private Builder(ParquetProperties toCopy) {
@@ -344,6 +362,8 @@ public class ParquetProperties {
       this.bloomFilterNDVs = 
ColumnProperty.<Long>builder(toCopy.bloomFilterNDVs);
       this.bloomFilterFPPs = 
ColumnProperty.<Double>builder(toCopy.bloomFilterFPPs);
       this.bloomFilterEnabled = 
ColumnProperty.<Boolean>builder(toCopy.bloomFilterEnabled);
+      this.adaptiveBloomFilterEnabled = 
ColumnProperty.<Boolean>builder(toCopy.adaptiveBloomFilterEnabled);
+      this.numBloomFilterCandidates = 
ColumnProperty.<Integer>builder(toCopy.numBloomFilterCandidates);
       this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
       this.enableByteStreamSplit = toCopy.enableByteStreamSplit;
     }
@@ -503,6 +523,30 @@ public class ParquetProperties {
       return this;
     }
 
+    /**
+     * Whether to use adaptive bloom filter to automatically adjust the bloom 
filter size according to
+     * `parquet.bloom.filter.max.bytes`.
+     * If NDV (number of distinct values) is set for a column, the adaptive bloom 
filter is ignored for that column.
+     *
+     * @param enabled whether to use adaptive bloom filter
+     */
+    public Builder withAdaptiveBloomFilterEnabled(boolean enabled) {
+      this.adaptiveBloomFilterEnabled.withDefaultValue(enabled);
+      return this;
+    }
+
+    /**
+     * When `AdaptiveBloomFilter` is enabled, set how many bloom filter 
candidates to use.
+     *
+     * @param columnPath the path of the column (dot-string)
+     * @param number the number of candidates
+     */
+    public Builder withBloomFilterCandidatesNumber(String columnPath, int 
number) {
+      Preconditions.checkArgument(number > 0, "Invalid candidates number for 
column \"%s\": %d", columnPath, number);
+      this.numBloomFilterCandidates.withDefaultValue(number);
+      return this;
+    }
+
     /**
      * Enable or disable the bloom filter for the specified column.
      * One may either disable bloom filters for all columns by invoking {@link 
#withBloomFilterEnabled(boolean)} with a
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
 
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index 36b609972..8e11676b5 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -32,11 +32,11 @@ import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
+import 
org.apache.parquet.column.values.bloomfilter.AdaptiveBlockSplitBloomFilter;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.io.api.Binary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.parquet.bytes.BytesInput;
 
 /**
  * Base implementation for {@link ColumnWriter} to be extended to specialize 
for V1 and V2 pages.
@@ -97,7 +97,12 @@ abstract class ColumnWriterBase implements ColumnWriter {
       int optimalNumOfBits = 
BlockSplitBloomFilter.optimalNumOfBits(ndv.getAsLong(), fpp.getAsDouble());
       this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, 
maxBloomFilterSize);
     } else {
-      this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize, 
maxBloomFilterSize);
+      if(props.getAdaptiveBloomFilterEnabled(path)) {
+        int numCandidates = props.getBloomFilterCandidatesCount(path);
+        this.bloomFilter = new 
AdaptiveBlockSplitBloomFilter(maxBloomFilterSize, numCandidates, 
fpp.getAsDouble(), path);
+      } else {
+        this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize, 
maxBloomFilterSize);
+      }
     }
   }
 
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/AdaptiveBlockSplitBloomFilter.java
 
b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/AdaptiveBlockSplitBloomFilter.java
new file mode 100644
index 000000000..77f8f902f
--- /dev/null
+++ 
b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/AdaptiveBlockSplitBloomFilter.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * The purpose of this is to finally generate a bloom filter with the optimal 
bit size according to the number
+ * of real data distinct values.
+ * `AdaptiveBlockSplitBloomFilter` contains multiple `BlockSplitBloomFilter` 
as candidates and inserts values in
+ * the candidates at the same time. Some candidates that are too small will be 
eliminated during the insertion process.
+ * Finally we will choose the most appropriate size candidate to write out.
+ */
+public class AdaptiveBlockSplitBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AdaptiveBlockSplitBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct 
values are greater than the
+  // expected NDV of one candidate, it will be removed. Finally we will choose 
the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and also used as an approximate 
deduplication counter
+  private BloomFilterCandidate largestCandidate;
+
+  // the accumulator of the number of distinct hash values that have been 
inserted so far
+  private long numDistinctHashValues = 0;
+
+  // indicates that the bloom filter candidate has been written out and that 
no more data may be inserted
+  private boolean finalized = false;
+
+  // indicates the step size to find the NDV value corresponding to numBytes
+  private static final int NDV_STEP = 500;
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  private int minimumCandidateNdv = 16;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  /**
+   * @param maximumBytes  the maximum bytes size of candidate
+   * @param numCandidates the number of candidates
+   * @param fpp           the false positive probability
+   */
+  public AdaptiveBlockSplitBloomFilter(int maximumBytes, int numCandidates, 
double fpp, ColumnDescriptor column) {
+    this(maximumBytes, HashStrategy.XXH64, fpp, numCandidates, column);
+  }
+
+  public AdaptiveBlockSplitBloomFilter(int maximumBytes, HashStrategy 
hashStrategy, double fpp,
+    int numCandidates, ColumnDescriptor column) {
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(maximumBytes, numCandidates, fpp);
+  }
+
+  /**
+   * This method will generate candidates according to the maximum acceptable 
bytes size of bloom filter.
+   * Because the bytes size of the candidate need to be a power of 2, here we 
set the candidate size to be
+   * a proportion of `maxBytes` like `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum bytes size of candidate
+   * @param numCandidates the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int numCandidates, double fpp) {
+    int candidateByteSize = calculateBoundedPowerOfTwo(maxBytes);
+    for (int i = 0; i < numCandidates; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, 
minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateBoundedPowerOfTwo(candidateByteSize / 2);
+    }
+    if (candidates.isEmpty()) {
+      // `maxBytes` is too small, but at least one candidate will be 
generated, 32 bytes size and can accept 16 distinct values.
+      candidates.add(new BloomFilterCandidate(minimumCandidateNdv, 
minimumBytes, minimumBytes, maximumBytes, hashStrategy));
+    }
+    largestCandidate = 
candidates.stream().max(BloomFilterCandidate::compareTo).get();
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct 
values.
+   * The expected number result may be slightly smaller than what `numBytes` 
can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    while (optimalBytes < numBytes) {
+      expectedNDV += NDV_STEP;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) 
/ 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= NDV_STEP;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * The byte size of a bloom filter must be a power of 2, see 
{@code BlockSplitBloomFilter#initBitset}.
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less than or equal to numBytes, bounded by the minimum and maximum byte sizes
+   */
+  private int calculateBoundedPowerOfTwo(int numBytes) {
+    if (numBytes < minimumBytes) {
+      numBytes = minimumBytes;
+    }
+    // if `numBytes` is not power of 2, get the largest power of two less than 
`numBytes`
+    if ((numBytes & (numBytes - 1)) != 0) {
+      numBytes = Integer.highestOneBit(numBytes);
+    }
+    numBytes = Math.min(numBytes, maximumBytes);
+    numBytes = Math.max(numBytes, minimumBytes);
+    return numBytes;
+  }
+
+  /**
+   * Used at the end of the insertion to select the remaining candidate of the smallest 
size.
+   * The largest candidate is always kept when inserting data.
+   *
+   * @return the smallest and optimal candidate
+   */
+  protected BloomFilterCandidate optimalCandidate() {
+    return candidates.stream().min(BloomFilterCandidate::compareTo).get();
+  }
+
+  protected List<BloomFilterCandidate> getCandidates() {
+    return candidates;
+  }
+
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    finalized = true;
+    BloomFilterCandidate optimalBloomFilter = optimalCandidate();
+    optimalBloomFilter.bloomFilter.writeTo(out);
+    String columnName = column != null && column.getPath() != null ? 
Arrays.toString(column.getPath()) : "unknown";
+    LOG.info("The number of distinct values in {} is approximately {}, the 
optimal bloom filter can accept {}"
+        + " distinct values, byte size is {}.",
+      columnName, numDistinctHashValues, optimalBloomFilter.getExpectedNDV(),
+      optimalBloomFilter.bloomFilter.getBitsetSize());
+  }
+
+  /**
+   * Insert an element to the multiple bloom filter candidates and remove the 
bad candidate
+   * if the number of distinct values exceeds its expected size.
+   *
+   * @param hash the hash result of element.
+   */
+  @Override
+  public void insertHash(long hash) {
+    Preconditions.checkArgument(!finalized,
+      "Insertion has been mark as finalized, no more data is allowed!");
+    if (!largestCandidate.bloomFilter.findHash(hash)) {
+      numDistinctHashValues++;
+    }
+    // distinct values exceed the expected size, remove the bad bloom filter 
(leave at least the max bloom filter candidate)
+    candidates.removeIf(candidate -> candidate.getExpectedNDV() < 
numDistinctHashValues && candidate != largestCandidate);
+    candidates.forEach(candidate -> 
candidate.getBloomFilter().insertHash(hash));
+  }
+
+  @Override
+  public int getBitsetSize() {
+    return optimalCandidate().getBloomFilter().getBitsetSize();
+  }
+
+  @Override
+  public boolean findHash(long hash) {
+    return largestCandidate.bloomFilter.findHash(hash);
+  }
+
+  @Override
+  public long hash(Object value) {
+    return largestCandidate.bloomFilter.hash(value);
+  }
+
+  @Override
+  public HashStrategy getHashStrategy() {
+    return largestCandidate.bloomFilter.getHashStrategy();
+  }
+
+  @Override
+  public Algorithm getAlgorithm() {
+    return largestCandidate.bloomFilter.getAlgorithm();
+  }
+
+  @Override
+  public Compression getCompression() {
+    return largestCandidate.bloomFilter.getCompression();
+  }
+
+  @Override
+  public long hash(int value) {
+    return largestCandidate.bloomFilter.hash(value);
+  }
+
+  @Override
+  public long hash(long value) {
+    return largestCandidate.bloomFilter.hash(value);
+  }
+
+  @Override
+  public long hash(double value) {
+    return largestCandidate.bloomFilter.hash(value);
+  }
+
+  @Override
+  public long hash(float value) {
+    return largestCandidate.bloomFilter.hash(value);
+  }
+
+  @Override
+  public long hash(Binary value) {
+    return largestCandidate.bloomFilter.hash(value);
+  }
+
+  protected class BloomFilterCandidate implements 
Comparable<BloomFilterCandidate> {
+    // the bloom filter candidate
+    final private BlockSplitBloomFilter bloomFilter;
+    // the max expected number of distinct values in the candidate
+    final private int expectedNDV;
+
+    public BloomFilterCandidate(int expectedNDV, int candidateBytes,
+      int minimumBytes, int maximumBytes, HashStrategy hashStrategy) {
+      this.bloomFilter = new BlockSplitBloomFilter(candidateBytes, 
minimumBytes, maximumBytes, hashStrategy);
+      this.expectedNDV = expectedNDV;
+    }
+
+    public BlockSplitBloomFilter getBloomFilter() {
+      return bloomFilter;
+    }
+
+    public int getExpectedNDV() {
+      return expectedNDV;
+    }
+
+    @Override
+    public int compareTo(BloomFilterCandidate o) {
+      return this.bloomFilter.getBitsetSize() - o.bloomFilter.getBitsetSize();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      BloomFilterCandidate that = (BloomFilterCandidate) o;
+      return expectedNDV == that.expectedNDV &&
+        Objects.equals(bloomFilter, that.bloomFilter);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(bloomFilter, expectedNDV);
+    }
+  }
+}
diff --git 
a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
 
b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
index 23668a9b6..02a0566b7 100644
--- 
a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
+++ 
b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
@@ -18,6 +18,11 @@
  */
 package org.apache.parquet.column.values.bloomfilter;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -25,17 +30,15 @@ import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
 
-import net.openhft.hashing.LongHashFunction;
 import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.parquet.io.api.Binary;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
+
+import org.apache.parquet.io.api.Binary;
+
+import net.openhft.hashing.LongHashFunction;
 
 public class TestBlockSplitBloomFilter {
 
@@ -184,6 +187,39 @@ public class TestBlockSplitBloomFilter {
     assertTrue(bytes < 5 * 1024 * 1024);
   }
 
+  @Test
+  public void testAdaptiveBloomFilter() {
+    int maxBloomFilterSize = 1024 * 1024;
+    int candidateNumber = 10;
+    AdaptiveBlockSplitBloomFilter adaptiveBloomFilter = new 
AdaptiveBlockSplitBloomFilter(maxBloomFilterSize,
+      candidateNumber, 0.01, null);
+
+    assertEquals(candidateNumber, adaptiveBloomFilter.getCandidates().size());
+
+    Set<String> existedValue = new HashSet<>();
+    while (existedValue.size() < 10000) {
+      String str = RandomStringUtils.randomAlphabetic(1, 64);
+      
adaptiveBloomFilter.insertHash(adaptiveBloomFilter.hash(Binary.fromString(str)));
+      existedValue.add(str);
+    }
+    // removed some small bloom filter
+    assertEquals(7, adaptiveBloomFilter.getCandidates().size());
+    BlockSplitBloomFilter optimalCandidate = 
adaptiveBloomFilter.optimalCandidate().getBloomFilter();
+    for (String value : existedValue) {
+      
assertTrue(optimalCandidate.findHash(optimalCandidate.hash(Binary.fromString(value))));
+    }
+
+    int maxCandidateNDV = adaptiveBloomFilter.getCandidates().stream()
+      
.max(AdaptiveBlockSplitBloomFilter.BloomFilterCandidate::compareTo).get().getExpectedNDV();
+    while (existedValue.size() < maxCandidateNDV + 1) {
+      String str = RandomStringUtils.randomAlphabetic(1, 64);
+      
adaptiveBloomFilter.insertHash(adaptiveBloomFilter.hash(Binary.fromString(str)));
+      existedValue.add(str);
+    }
+    // the number of distinct value exceeds the maximum candidate's expected 
NDV, so only the maximum candidate is kept
+    assertEquals(1, adaptiveBloomFilter.getCandidates().size());
+  }
+
   @Test
   public void testMergeBloomFilter() throws IOException {
     int numBytes = BlockSplitBloomFilter.optimalNumOfBits(1024 * 5, 0.01) / 8;
diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md
index deb82e05d..c27c5f2fc 100644
--- a/parquet-hadoop/README.md
+++ b/parquet-hadoop/README.md
@@ -197,7 +197,7 @@ This property is the length to be used for truncating 
binary values if possible
 
 **Property:** `parquet.bloom.filter.enabled`  
 **Description:** Whether to enable writing bloom filter.  
-If it is true, the bloom filter will be enable for all columns. If it is 
false, it will be disabled for all columns.  
+If it is true, the bloom filter will be enabled for all columns. If it is 
false, it will be disabled for all columns.  
 It is also possible to enable it for some columns by specifying the column 
name within the property followed by #.  
 **Default value:** `false`  
 **Example:**
@@ -211,6 +211,24 @@ conf.set("parquet.bloom.filter.enabled#column.path", 
false);
 
 ---
 
+**Property:** `parquet.bloom.filter.adaptive.enabled`  
+**Description:** Whether to enable writing adaptive bloom filter.  
+If it is true, the bloom filter will be generated with the optimal bit size 
+according to the actual number of distinct values in the data. If it is false, the adaptive bloom filter will 
not be used.
+Note that the size of the bloom filter will not exceed the 
`parquet.bloom.filter.max.bytes` configuration (if it is 
+set too small, the generated bloom filter will not be efficient).  
+**Default value:** `false`
+
+---
+
+**Property:** `parquet.bloom.filter.candidates.number`  
+**Description:** The number of candidate bloom filters written at the same 
time.  
+When `parquet.bloom.filter.adaptive.enabled` is true, values are inserted into multiple candidate bloom 
filters 
+at the same time; finally, the bloom filter with the optimal bit size is 
selected and written to the file.  
+**Default value:** `5`
+
+---
+
 **Property:** `parquet.bloom.filter.expected.ndv`  
 **Description:** The expected number of distinct values in a column, it is 
used to compute the optimal size of the bloom filter.  
 Note that if this property is not set, the bloom filter will use the maximum 
size.  
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 9cd7f1381..7ec38ee24 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -892,6 +892,8 @@ public class ParquetFileWriter {
       }
       if (isWriteBloomFilter) {
         currentBloomFilters.put(String.join(".", descriptor.getPath()), 
bloomFilter);
+      } else {
+        LOG.info("No need to write bloom filter because column {} data pages 
are all encoded as dictionary.", descriptor.getPath());
       }
     }
     LOG.debug("{}: write data pages", out.getPos());
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 78942b1d6..fe718c063 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.hadoop;
 
 import static 
org.apache.parquet.column.ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED;
+import static 
org.apache.parquet.column.ParquetProperties.DEFAULT_ADAPTIVE_BLOOM_FILTER_ENABLED;
 import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
 import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration;
 
@@ -152,6 +153,8 @@ public class ParquetOutputFormat<T> extends 
FileOutputFormat<Void, T> {
   public static final String BLOOM_FILTER_EXPECTED_NDV = 
"parquet.bloom.filter.expected.ndv";
   public static final String BLOOM_FILTER_MAX_BYTES = 
"parquet.bloom.filter.max.bytes";
   public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
+  public static final String ADAPTIVE_BLOOM_FILTER_ENABLED = 
"parquet.bloom.filter.adaptive.enabled";
+  public static final String BLOOM_FILTER_CANDIDATES_NUMBER = 
"parquet.bloom.filter.candidates.number";
   public static final String PAGE_ROW_COUNT_LIMIT = 
"parquet.page.row.count.limit";
   public static final String PAGE_WRITE_CHECKSUM_ENABLED = 
"parquet.page.write-checksum.enabled";
 
@@ -228,6 +231,10 @@ public class ParquetOutputFormat<T> extends 
FileOutputFormat<Void, T> {
     return conf.getBoolean(BLOOM_FILTER_ENABLED, DEFAULT_BLOOM_FILTER_ENABLED);
   }
 
+  public static boolean getAdaptiveBloomFilterEnabled(Configuration conf) {
+    return conf.getBoolean(ADAPTIVE_BLOOM_FILTER_ENABLED, 
DEFAULT_ADAPTIVE_BLOOM_FILTER_ENABLED);
+  }
+
   public static int getBlockSize(JobContext jobContext) {
     return getBlockSize(getConfiguration(jobContext));
   }
@@ -453,6 +460,7 @@ public class ParquetOutputFormat<T> extends 
FileOutputFormat<Void, T> {
         .withStatisticsTruncateLength(getStatisticsTruncateLength(conf))
         .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))
         .withBloomFilterEnabled(getBloomFilterEnabled(conf))
+        .withAdaptiveBloomFilterEnabled(getAdaptiveBloomFilterEnabled(conf))
         .withPageRowCountLimit(getPageRowCountLimit(conf))
         .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf));
     new ColumnConfigParser()
@@ -462,6 +470,10 @@ public class ParquetOutputFormat<T> extends 
FileOutputFormat<Void, T> {
         .withColumnConfig(BLOOM_FILTER_EXPECTED_NDV, key -> conf.getLong(key, 
-1L), propsBuilder::withBloomFilterNDV)
         .withColumnConfig(BLOOM_FILTER_FPP, key -> conf.getDouble(key, 
ParquetProperties.DEFAULT_BLOOM_FILTER_FPP),
             propsBuilder::withBloomFilterFPP)
+        .withColumnConfig(
+          BLOOM_FILTER_CANDIDATES_NUMBER,
+          key -> conf.getInt(key, 
ParquetProperties.DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER),
+          propsBuilder::withBloomFilterCandidatesNumber)
         .parseConfig(conf);
 
     ParquetProperties props = propsBuilder.build();
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 869378117..740026651 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -619,6 +619,28 @@ public class ParquetWriter<T> implements Closeable {
       return self();
     }
 
+    /**
+     * When NDV (number of distinct values) for a specified column is not set, 
whether to use
+     * `AdaptiveBloomFilter` to automatically adjust the BloomFilter size 
according to `parquet.bloom.filter.max.bytes`
+     *
+     * @param enabled whether to use the adaptive bloom filter when NDV is not set
+     */
+    public SELF withAdaptiveBloomFilterEnabled(boolean enabled) {
+      encodingPropsBuilder.withAdaptiveBloomFilterEnabled(enabled);
+      return self();
+    }
+
+    /**
+     * When `AdaptiveBloomFilter` is enabled, set how many bloom filter 
candidates to use.
+     *
+     * @param columnPath the path of the column (dot-string)
+     * @param number the number of candidates
+     */
+    public SELF withBloomFilterCandidateNumber(String columnPath, int number) {
+      encodingPropsBuilder.withBloomFilterCandidatesNumber(columnPath, number);
+      return self();
+    }
+
     /**
      * Sets the bloom filter enabled/disabled
      *
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestAdaptiveBlockSplitBloomFiltering.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestAdaptiveBlockSplitBloomFiltering.java
new file mode 100644
index 000000000..ded4afc0d
--- /dev/null
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestAdaptiveBlockSplitBloomFiltering.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+
+public class TestAdaptiveBlockSplitBloomFiltering extends TestBloomFiltering {
+
+  @BeforeClass
+  public static void createFiles() throws IOException {
+    createFiles(true);
+  }
+
+  public TestAdaptiveBlockSplitBloomFiltering(Path file, boolean isEncrypted) {
+    super(file, isEncrypted);
+  }
+
+  @Test
+  public void testSimpleFiltering() throws IOException {
+    super.testSimpleFiltering();
+  }
+
+  @Test
+  public void testNestedFiltering() throws IOException {
+    super.testNestedFiltering();
+  }
+
+  @Test
+  public void checkBloomFilterSize() throws IOException {
+    FileDecryptionProperties fileDecryptionProperties = 
getFileDecryptionProperties();
+    final ParquetReadOptions readOptions = 
ParquetReadOptions.builder().withDecryption(fileDecryptionProperties).build();
+    InputFile inputFile = HadoopInputFile.fromPath(getFile(), new 
Configuration());
+    try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile, 
readOptions)) {
+      fileReader.getRowGroups().forEach(block -> {
+        BloomFilterReader bloomFilterReader = 
fileReader.getBloomFilterDataReader(block);
+        block.getColumns().stream()
+          .filter(column -> column.getBloomFilterOffset() > 0)
+          .forEach(column -> {
+            int bitsetSize = 
bloomFilterReader.readBloomFilter(column).getBitsetSize();
+            // set 10 candidates:
+            // [byteSize=2048, expectedNDV=1500], [byteSize=4096, 
expectedNDV=3000], [byteSize=8192, expectedNDV=6500],
+            // [byteSize=16384, expectedNDV=13500], [byteSize=32768, 
expectedNDV=27000] ......
+            // number of distinct values is less than 100, so the byteSize 
should be at most 2048.
+            assertTrue(bitsetSize <= 2048);
+          });
+      });
+    }
+  }
+}
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
index 42a284a37..105a0327b 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
@@ -19,27 +19,15 @@
 
 package org.apache.parquet.hadoop;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.crypto.ColumnEncryptionProperties;
-import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
-import org.apache.parquet.crypto.FileDecryptionProperties;
-import org.apache.parquet.crypto.FileEncryptionProperties;
-import org.apache.parquet.filter2.compat.FilterCompat;
-import org.apache.parquet.filter2.predicate.FilterPredicate;
-import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
-import org.apache.parquet.hadoop.example.ExampleParquetWriter;
-import org.apache.parquet.hadoop.example.GroupReadSupport;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.io.api.Binary;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.in;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -58,9 +46,31 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.parquet.filter2.predicate.FilterApi.*;
-import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
-import static org.junit.Assert.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.api.Binary;
 
 @RunWith(Parameterized.class)
 public class TestBloomFiltering {
@@ -197,17 +207,7 @@ public class TestBloomFiltering {
 
   private List<PhoneBookWriter.User> readUsers(FilterPredicate filter, boolean 
useOtherFiltering,
                                                boolean useBloomFilter) throws 
IOException {
-    FileDecryptionProperties fileDecryptionProperties = null;
-    if (isEncrypted) {
-      DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new 
DecryptionKeyRetrieverMock()
-        .putKey(FOOTER_ENCRYPTION_KEY_ID, FOOTER_ENCRYPTION_KEY)
-        .putKey(COLUMN_ENCRYPTION_KEY1_ID, COLUMN_ENCRYPTION_KEY1)
-        .putKey(COLUMN_ENCRYPTION_KEY2_ID, COLUMN_ENCRYPTION_KEY2);
-
-      fileDecryptionProperties = FileDecryptionProperties.builder()
-        .withKeyRetriever(decryptionKeyRetrieverMock)
-        .build();
-    }
+    FileDecryptionProperties fileDecryptionProperties = 
getFileDecryptionProperties();
 
     return PhoneBookWriter.readUsers(ParquetReader.builder(new 
GroupReadSupport(), file)
       .withFilter(FilterCompat.get(filter))
@@ -219,6 +219,20 @@ public class TestBloomFiltering {
       .useColumnIndexFilter(useOtherFiltering), true);
   }
 
+  public FileDecryptionProperties getFileDecryptionProperties() {
+    if (!isEncrypted) {
+      return null;
+    }
+    DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new 
DecryptionKeyRetrieverMock()
+      .putKey(FOOTER_ENCRYPTION_KEY_ID, FOOTER_ENCRYPTION_KEY)
+      .putKey(COLUMN_ENCRYPTION_KEY1_ID, COLUMN_ENCRYPTION_KEY1)
+      .putKey(COLUMN_ENCRYPTION_KEY2_ID, COLUMN_ENCRYPTION_KEY2);
+
+    return FileDecryptionProperties.builder()
+      .withKeyRetriever(decryptionKeyRetrieverMock)
+      .build();
+  }
+
   // Assumes that both lists are in the same order
   private static void assertContains(Stream<PhoneBookWriter.User> expected, 
List<PhoneBookWriter.User> actual) {
     Iterator<PhoneBookWriter.User> expIt = expected.iterator();
@@ -280,35 +294,56 @@ public class TestBloomFiltering {
     return encryptionProperties;
   }
 
-  private static void writePhoneBookToFile(Path file,
-                                           ParquetProperties.WriterVersion 
parquetVersion,
-                                           FileEncryptionProperties 
encryptionProperties) throws IOException {
+  protected static void writePhoneBookToFile(Path file,
+    ParquetProperties.WriterVersion parquetVersion,
+    FileEncryptionProperties encryptionProperties,
+    boolean useAdaptiveBloomFilter) throws IOException {
     int pageSize = DATA.size() / 100;     // Ensure that several pages will be 
created
     int rowGroupSize = pageSize * 4;    // Ensure that there are more 
row-groups created
-    PhoneBookWriter.write(ExampleParquetWriter.builder(file)
-        .withWriteMode(OVERWRITE)
-        .withRowGroupSize(rowGroupSize)
-        .withPageSize(pageSize)
+    ExampleParquetWriter.Builder writeBuilder = 
ExampleParquetWriter.builder(file)
+      .withWriteMode(OVERWRITE)
+      .withRowGroupSize(rowGroupSize)
+      .withPageSize(pageSize)
+      .withEncryption(encryptionProperties)
+      .withWriterVersion(parquetVersion);
+    if (useAdaptiveBloomFilter) {
+      writeBuilder
+        .withAdaptiveBloomFilterEnabled(true)
+        .withBloomFilterEnabled("location.lat", true)
+        .withBloomFilterCandidateNumber("location.lat", 10)
+        .withBloomFilterEnabled("name", true)
+        .withBloomFilterCandidateNumber("name", 10)
+        .withBloomFilterEnabled("id", true)
+        .withBloomFilterCandidateNumber("id", 10);
+    } else {
+      writeBuilder
         .withBloomFilterNDV("location.lat", 10000L)
         .withBloomFilterNDV("name", 10000L)
-        .withBloomFilterNDV("id", 10000L)
-        .withEncryption(encryptionProperties)
-        .withWriterVersion(parquetVersion),
-      DATA);
+        .withBloomFilterNDV("id", 10000L);
+    }
+    PhoneBookWriter.write(writeBuilder, DATA);
   }
 
   private static void deleteFile(Path file) throws IOException {
     file.getFileSystem(new Configuration()).delete(file, false);
   }
 
+  public Path getFile() {
+    return file;
+  }
+
   @BeforeClass
   public static void createFiles() throws IOException {
-    writePhoneBookToFile(FILE_V1, ParquetProperties.WriterVersion.PARQUET_1_0, 
null);
-    writePhoneBookToFile(FILE_V2, ParquetProperties.WriterVersion.PARQUET_2_0, 
null);
+    createFiles(false);
+  }
+
+  public static void createFiles(boolean useAdaptiveBloomFilter) throws 
IOException {
+    writePhoneBookToFile(FILE_V1, ParquetProperties.WriterVersion.PARQUET_1_0, 
null, useAdaptiveBloomFilter);
+    writePhoneBookToFile(FILE_V2, ParquetProperties.WriterVersion.PARQUET_2_0, 
null, useAdaptiveBloomFilter);
 
     FileEncryptionProperties encryptionProperties = 
getFileEncryptionProperties();
-    writePhoneBookToFile(FILE_V1_E, 
ParquetProperties.WriterVersion.PARQUET_1_0, encryptionProperties);
-    writePhoneBookToFile(FILE_V2_E, 
ParquetProperties.WriterVersion.PARQUET_2_0, encryptionProperties);
+    writePhoneBookToFile(FILE_V1_E, 
ParquetProperties.WriterVersion.PARQUET_1_0, encryptionProperties, 
useAdaptiveBloomFilter);
+    writePhoneBookToFile(FILE_V2_E, 
ParquetProperties.WriterVersion.PARQUET_2_0, encryptionProperties, 
useAdaptiveBloomFilter);
   }
 
   @AfterClass
@@ -364,4 +399,23 @@ public class TestBloomFiltering {
       },
       eq(doubleColumn("location.lat"), 99.9));
   }
+
+  @Test
+  public void checkBloomFilterSize() throws IOException {
+    FileDecryptionProperties fileDecryptionProperties = 
getFileDecryptionProperties();
+    final ParquetReadOptions readOptions = 
ParquetReadOptions.builder().withDecryption(fileDecryptionProperties).build();
+    InputFile inputFile = HadoopInputFile.fromPath(getFile(), new 
Configuration());
+    try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile, 
readOptions)) {
+      fileReader.getRowGroups().forEach(block -> {
+        BloomFilterReader bloomFilterReader = 
fileReader.getBloomFilterDataReader(block);
+        block.getColumns().stream()
+          .filter(column -> column.getBloomFilterOffset() > 0)
+          .forEach(column -> {
+            int bitsetSize = 
bloomFilterReader.readBloomFilter(column).getBitsetSize();
+            // when setting NDV to a fixed value 10000L, bitsetSize will 
always be 16384
+            assertEquals(16384, bitsetSize);
+          });
+      });
+    }
+  }
 }


Reply via email to