This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new e763aca67 PARQUET-2226: Support merge bloom filters (#1020)
e763aca67 is described below

commit e763aca676d4154ef9dcd2cc1f6ac1c6d88db9ad
Author: ChenLiang.Lu <[email protected]>
AuthorDate: Mon Jan 16 16:56:37 2023 +0800

    PARQUET-2226: Support merge bloom filters (#1020)
---
 .../values/bloomfilter/BlockSplitBloomFilter.java  | 27 +++++++++++++++
 .../column/values/bloomfilter/BloomFilter.java     | 24 +++++++++++++
 .../bloomfilter/TestBlockSplitBloomFilter.java     | 40 ++++++++++++++++++++++
 3 files changed, 91 insertions(+)

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
index 7a9b7a9db..d385ce268 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
@@ -22,6 +22,7 @@ package org.apache.parquet.column.values.bloomfilter;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.io.api.Binary;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -394,4 +395,30 @@ public class BlockSplitBloomFilter implements BloomFilter {
   public long hash(Binary value) {
     return hashFunction.hashBytes(value.getBytes());
   }
+
+  /**
+   * Reports whether {@code otherBloomFilter} may be merged into this filter.
+   * The candidate must be non-null and report the same bitset size, algorithm
+   * and hash strategy as this filter; the checks run in that order and stop at
+   * the first mismatch.
+   *
+   * @param otherBloomFilter the candidate filter; may be null
+   * @return true only when every compatibility check passes
+   */
+  @Override
+  public boolean canMergeFrom(BloomFilter otherBloomFilter) {
+    if (otherBloomFilter == null) {
+      return false;
+    }
+    if (getBitsetSize() != otherBloomFilter.getBitsetSize()) {
+      return false;
+    }
+    if (getAlgorithm() != otherBloomFilter.getAlgorithm()) {
+      return false;
+    }
+    return getHashStrategy() == otherBloomFilter.getHashStrategy();
+  }
+
+  /**
+   * Merges {@code otherBloomFilter} into this filter by OR-ing the bytes it
+   * writes via {@code writeTo} into this filter's bitset, position by position.
+   *
+   * @param otherBloomFilter the filter to fold into this one; must be non-null
+   *                         and accepted by {@link #canMergeFrom(BloomFilter)}
+   * @throws IllegalArgumentException if {@code otherBloomFilter} is null or is
+   *                                  rejected by {@code canMergeFrom}
+   * @throws IOException if writing {@code otherBloomFilter} to the temporary buffer fails
+   */
+  @Override
+  public void merge(BloomFilter otherBloomFilter) throws IOException {
+    // Checked separately so the getters in the next message are never called on null.
+    Preconditions.checkArgument(otherBloomFilter != null,
+      "The BloomFilter to merge shouldn't be null");
+    Preconditions.checkArgument(canMergeFrom(otherBloomFilter),
+      "BloomFilters must have the same size of bitset, hashStrategy and algorithm. " +
+        "This BloomFilter's size of bitset is %s, hashStrategy is %s, algorithm is %s, " +
+        "but the other BloomFilter's size of bitset is %s, hashStrategy is %s, algorithm is %s.",
+      getBitsetSize(), getHashStrategy(), getAlgorithm(),
+      otherBloomFilter.getBitsetSize(), otherBloomFilter.getHashStrategy(),
+      otherBloomFilter.getAlgorithm());
+    // The other filter is only known through the BloomFilter interface here,
+    // so its bits are obtained by serializing it into an in-memory buffer.
+    ByteArrayOutputStream otherOutputStream = new ByteArrayOutputStream();
+    otherBloomFilter.writeTo(otherOutputStream);
+    byte[] otherBits = otherOutputStream.toByteArray();
+    for (int i = 0; i < otherBits.length; i++) {
+      bitset[i] |= otherBits[i];
+    }
+  }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
index 19a3de6cf..9445615f9 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
@@ -176,4 +176,28 @@ public interface BloomFilter {
    * @return compress algorithm that the bloom filter apply
    */
   Compression getCompression();
+
+  /**
+   * Determines whether a given Bloom filter can be merged into this Bloom filter.
+   * For two Bloom filters to merge, they must:
+   * <ul>
+   * <li> have the same bit size </li>
+   * <li> have the same algorithm</li>
+   * <li> have the same hash strategy</li>
+   * </ul>
+   *
+   * <p>This default implementation does not perform the check; it always throws.
+   *
+   * @param otherBloomFilter The Bloom filter to merge this Bloom filter with.
+   * @return true if {@code otherBloomFilter} can be merged into this Bloom filter
+   * @throws UnsupportedOperationException always, unless an implementation overrides this method
+   */
+  default boolean canMergeFrom(BloomFilter otherBloomFilter) {
+    throw new UnsupportedOperationException("Merge API is not implemented.");
+  }
+
+  /**
+   * Merges this Bloom filter with another Bloom filter by performing a bitwise OR
+   * of the underlying bitsets.
+   *
+   * <p>This default implementation does not merge anything; it always throws.
+   *
+   * @param otherBloomFilter The Bloom filter to merge this Bloom filter with.
+   * @throws UnsupportedOperationException always, unless an implementation overrides this method
+   * @throws IOException if an overriding implementation hits an I/O failure while merging;
+   *                     never thrown by this default
+   */
+  default void merge(BloomFilter otherBloomFilter) throws IOException {
+    throw new UnsupportedOperationException("Merge API is not implemented.");
+  }
 }
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
index 6adabe79d..23668a9b6 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.column.values.bloomfilter;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.HashSet;
@@ -27,10 +28,12 @@ import java.util.Set;
 import net.openhft.hashing.LongHashFunction;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.parquet.io.api.Binary;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -181,6 +184,43 @@ public class TestBlockSplitBloomFilter {
     assertTrue(bytes < 5 * 1024 * 1024);
   }
 
+  @Test
+  public void testMergeBloomFilter() throws IOException {
+    int numBytes = BlockSplitBloomFilter.optimalNumOfBits(1024 * 5, 0.01) / 8;
+    BloomFilter otherBloomFilter = new BlockSplitBloomFilter(numBytes);
+    BloomFilter mergedBloomFilter = new BlockSplitBloomFilter(numBytes);
+    for (int i = 0; i < 1024; i++) {
+      mergedBloomFilter.insertHash(mergedBloomFilter.hash(i));
+    }
+    for (int i = 1024; i < 2048; i++) {
+      otherBloomFilter.insertHash(otherBloomFilter.hash(i));
+      // Before merging BloomFilter, `mergedBloomFilter` doesn't have any 
value in `otherBloomFilter`
+      assertFalse(mergedBloomFilter.findHash(mergedBloomFilter.hash(i)));
+    }
+    mergedBloomFilter.merge(otherBloomFilter);
+    // After merging BloomFilter, `mergedBloomFilter` should have all values 
in `otherBloomFilter`
+    for (int i = 0; i < 2048; i++) {
+      assertTrue(mergedBloomFilter.findHash(mergedBloomFilter.hash(i)));
+    }
+    for (int i = 2048; i < 3096; i++) {
+      assertFalse(otherBloomFilter.findHash(otherBloomFilter.hash(i)));
+      assertFalse(mergedBloomFilter.findHash(mergedBloomFilter.hash(i)));
+    }
+  }
+
+  /**
+   * Merging a filter built with a different byte size must be rejected with an
+   * {@link IllegalArgumentException}.
+   */
+  @Test
+  public void testMergeBloomFilterFailed() throws IOException {
+    final int baseBytes = BlockSplitBloomFilter.optimalNumOfBits(1024 * 5, 0.01) / 8;
+    BloomFilter target = new BlockSplitBloomFilter(baseBytes);
+    BloomFilter oversized = new BlockSplitBloomFilter(baseBytes * 1024);
+    try {
+      target.merge(oversized);
+    } catch (IllegalArgumentException expected) {
+      // expected, BloomFilters should have the same size of bitsets
+      return;
+    }
+    // Reaching this point means merge() accepted the incompatible filter.
+    Assert.fail();
+  }
+
   /**
    * Test data is output of the following program with xxHash implementation
    * from https://github.com/Cyan4973/xxHash with commit 
c8c4cc0f812719ce1f5b2c291159658980e7c255

Reply via email to