This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new e763aca67 PARQUET-2226: Support merge bloom filters (#1020)
e763aca67 is described below
commit e763aca676d4154ef9dcd2cc1f6ac1c6d88db9ad
Author: ChenLiang.Lu <[email protected]>
AuthorDate: Mon Jan 16 16:56:37 2023 +0800
PARQUET-2226: Support merge bloom filters (#1020)
---
.../values/bloomfilter/BlockSplitBloomFilter.java | 27 +++++++++++++++
.../column/values/bloomfilter/BloomFilter.java | 24 +++++++++++++
.../bloomfilter/TestBlockSplitBloomFilter.java | 40 ++++++++++++++++++++++
3 files changed, 91 insertions(+)
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
index 7a9b7a9db..d385ce268 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
@@ -22,6 +22,7 @@ package org.apache.parquet.column.values.bloomfilter;
import org.apache.parquet.Preconditions;
import org.apache.parquet.io.api.Binary;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -394,4 +395,30 @@ public class BlockSplitBloomFilter implements BloomFilter {
public long hash(Binary value) {
return hashFunction.hashBytes(value.getBytes());
}
+
+ @Override
+ public boolean canMergeFrom(BloomFilter otherBloomFilter) {
+ return otherBloomFilter != null
+ && getBitsetSize() == otherBloomFilter.getBitsetSize()
+ && getAlgorithm() == otherBloomFilter.getAlgorithm()
+ && getHashStrategy() == otherBloomFilter.getHashStrategy();
+ }
+
+ @Override
+ public void merge(BloomFilter otherBloomFilter) throws IOException {
+ Preconditions.checkArgument(otherBloomFilter != null,
+ "The BloomFilter to merge shouldn't be null");
+ Preconditions.checkArgument(canMergeFrom(otherBloomFilter),
+ "BloomFilters must have the same size of bitset, hashStrategy and algorithm." +
+ "This BloomFilter's size of bitset is %s , hashStrategy is %s, algorithm is %s ," +
+ "but the other BloomFilter's size of bitset is %s , hashStrategy is %s, algorithm is %s.",
+ getBitsetSize(), getHashStrategy(), getAlgorithm(),
+ otherBloomFilter.getBitsetSize(), otherBloomFilter.getHashStrategy(), otherBloomFilter.getAlgorithm());
+ ByteArrayOutputStream otherOutputStream = new ByteArrayOutputStream();
+ otherBloomFilter.writeTo(otherOutputStream);
+ byte[] otherBits = otherOutputStream.toByteArray();
+ for (int i = 0; i < otherBits.length; i++) {
+ bitset[i] |= otherBits[i];
+ }
+ }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
index 19a3de6cf..9445615f9 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
@@ -176,4 +176,28 @@ public interface BloomFilter {
* @return compress algorithm that the bloom filter apply
*/
Compression getCompression();
+
+ /**
+ * Determines whether a given Bloom filter can be merged into this Bloom filter. For two Bloom
+ * filters to merge, they must:
+ * <ul>
+ * <li> have the same bit size </li>
+ * <li> have the same algorithm</li>
+ * <li> have the same hash strategy</li>
+ * </ul>
+ *
+ * @param otherBloomFilter The Bloom filter to merge this Bloom filter with.
+ */
+ default boolean canMergeFrom(BloomFilter otherBloomFilter) {
+ throw new UnsupportedOperationException("Merge API is not implemented.");
+ }
+
+ /**
+ * Merges this Bloom filter with another Bloom filter by performing a bitwise OR of the underlying bitsets
+ *
+ * @param otherBloomFilter The Bloom filter to merge this Bloom filter with.
+ */
+ default void merge(BloomFilter otherBloomFilter) throws IOException {
+ throw new UnsupportedOperationException("Merge API is not implemented.");
+ }
}
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
index 6adabe79d..23668a9b6 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
@@ -18,6 +18,7 @@
*/
package org.apache.parquet.column.values.bloomfilter;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.HashSet;
@@ -27,10 +28,12 @@ import java.util.Set;
import net.openhft.hashing.LongHashFunction;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.parquet.io.api.Binary;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
@@ -181,6 +184,43 @@ public class TestBlockSplitBloomFilter {
assertTrue(bytes < 5 * 1024 * 1024);
}
+ @Test
+ public void testMergeBloomFilter() throws IOException {
+ int numBytes = BlockSplitBloomFilter.optimalNumOfBits(1024 * 5, 0.01) / 8;
+ BloomFilter otherBloomFilter = new BlockSplitBloomFilter(numBytes);
+ BloomFilter mergedBloomFilter = new BlockSplitBloomFilter(numBytes);
+ for (int i = 0; i < 1024; i++) {
+ mergedBloomFilter.insertHash(mergedBloomFilter.hash(i));
+ }
+ for (int i = 1024; i < 2048; i++) {
+ otherBloomFilter.insertHash(otherBloomFilter.hash(i));
+ // Before merging BloomFilter, `mergedBloomFilter` doesn't have any value in `otherBloomFilter`
+ assertFalse(mergedBloomFilter.findHash(mergedBloomFilter.hash(i)));
+ }
+ mergedBloomFilter.merge(otherBloomFilter);
+ // After merging BloomFilter, `mergedBloomFilter` should have all values in `otherBloomFilter`
+ for (int i = 0; i < 2048; i++) {
+ assertTrue(mergedBloomFilter.findHash(mergedBloomFilter.hash(i)));
+ }
+ for (int i = 2048; i < 3096; i++) {
+ assertFalse(otherBloomFilter.findHash(otherBloomFilter.hash(i)));
+ assertFalse(mergedBloomFilter.findHash(mergedBloomFilter.hash(i)));
+ }
+ }
+
+ @Test
+ public void testMergeBloomFilterFailed() throws IOException {
+ int numBytes = BlockSplitBloomFilter.optimalNumOfBits(1024 * 5, 0.01) / 8;
+ BloomFilter mergedBloomFilter = new BlockSplitBloomFilter(numBytes);
+ BloomFilter otherBloomFilter = new BlockSplitBloomFilter(numBytes * 1024);
+ try {
+ mergedBloomFilter.merge(otherBloomFilter);
+ Assert.fail();
+ } catch (IllegalArgumentException e) {
+ // expected, BloomFilters should have the same size of bitsets
+ }
+ }
+
/**
* Test data is output of the following program with xxHash implementation
* from https://github.com/Cyan4973/xxHash with commit c8c4cc0f812719ce1f5b2c291159658980e7c255