This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 942d35d9c [core] Add bloom filter for file index (#3141)
942d35d9c is described below
commit 942d35d9c0a3f7b1c87367232cc5974e18aa9aad
Author: YeJunHao <[email protected]>
AuthorDate: Mon Apr 8 11:17:05 2024 +0800
[core] Add bloom filter for file index (#3141)
---
paimon-common/pom.xml | 11 ++
.../apache/paimon/fileindex/FileIndexFormat.java | 9 +-
.../apache/paimon/fileindex/FileIndexReader.java | 2 -
.../org/apache/paimon/fileindex/FileIndexer.java | 10 +-
.../bloomfilter/BloomFilterFileIndex.java | 127 +++++++++++++
.../paimon/fileindex/bloomfilter/FastHash.java | 199 +++++++++++++++++++++
.../java/org/apache/paimon/options/Options.java | 4 +
.../org/apache/paimon/utils/BloomFilter64.java | 84 +++++++++
paimon-common/src/main/resources/META-INF/NOTICE | 1 +
.../bloomfilter/BloomFilterFileIndexTest.java | 127 +++++++++++++
.../fileindex/bloomfilter/FastHashVisitorTest.java | 103 +++++++++++
.../org/apache/paimon/utils/BloomFilter64Test.java | 60 +++++++
12 files changed, 730 insertions(+), 7 deletions(-)
diff --git a/paimon-common/pom.xml b/paimon-common/pom.xml
index 95d6458d6..38f89999f 100644
--- a/paimon-common/pom.xml
+++ b/paimon-common/pom.xml
@@ -141,6 +141,12 @@ under the License.
<version>1.0.5</version>
</dependency>
+ <dependency>
+ <groupId>net.openhft</groupId>
+ <artifactId>zero-allocation-hashing</artifactId>
+ <version>0.16</version>
+ </dependency>
+
<!-- Test -->
<dependency>
@@ -273,6 +279,7 @@ under the License.
<include>org.codehaus.janino:*</include>
<include>it.unimi.dsi:fastutil</include>
<include>org.roaringbitmap:RoaringBitmap</include>
+
<include>net.openhft:zero-allocation-hashing</include>
</includes>
</artifactSet>
<filters>
@@ -312,6 +319,10 @@ under the License.
<pattern>org.roaringbitmap</pattern>
<shadedPattern>org.apache.paimon.shade.org.roaringbitmap</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>net.openhft.hashing</pattern>
+
<shadedPattern>org.apache.paimon.shade.net.openhft.hashing</shadedPattern>
+ </relocation>
</relocations>
<minimizeJar>true</minimizeJar>
</configuration>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
index c9b827ee7..a7a3c534e 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
@@ -20,6 +20,7 @@ package org.apache.paimon.fileindex;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IOUtils;
@@ -241,9 +242,11 @@ public final class FileIndexFormat {
return readColumnInputStream(columnName)
.map(
serializedBytes ->
- FileIndexer.create(type,
fields.get(columnName).type())
- .createReader()
- .recoverFrom(serializedBytes))
+ FileIndexer.create(
+ type,
+
fields.get(columnName).type(),
+ new Options())
+ .createReader(serializedBytes))
.orElse(null);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
index 6d9404564..8a881b4b8 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
@@ -29,8 +29,6 @@ import java.util.List;
*/
public interface FileIndexReader extends FunctionVisitor<Boolean> {
- FileIndexReader recoverFrom(byte[] serializedBytes);
-
@Override
default Boolean visitIsNotNull(FieldRef fieldRef) {
return true;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java
index e7e3d40bf..fa3085d79 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java
@@ -18,17 +18,23 @@
package org.apache.paimon.fileindex;
+import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndex;
+import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;
+import static
org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndex.BLOOM_FILTER;
+
/** File index interface. To build a file index. */
public interface FileIndexer {
FileIndexWriter createWriter();
- FileIndexReader createReader();
+ FileIndexReader createReader(byte[] serializedBytes);
- static FileIndexer create(String type, DataType dataType) {
+ static FileIndexer create(String type, DataType dataType, Options options)
{
switch (type) {
+ case BLOOM_FILTER:
+ return new BloomFilterFileIndex(dataType, options);
default:
throw new RuntimeException("Doesn't support filter type: " +
type);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
new file mode 100644
index 000000000..c0486d535
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bloomfilter;
+
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.FileIndexWriter;
+import org.apache.paimon.fileindex.FileIndexer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.BloomFilter64;
+
+import org.apache.hadoop.util.bloom.HashFunction;
+
+import java.util.BitSet;
+
+/**
+ * Bloom filter for file index.
+ *
+ * <p>Note: This class uses {@link BloomFilter64} as its base filter. It stores
only the number of hash
+ * functions (one integer) and the bit set bytes. It uses {@link HashFunction} to
hash the objects:
+ * byte types (like varchar, binary, etc.) are hashed with xxHash, and numeric
types with a dedicated integer hash (see
+ *
http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm).
+ */
+public class BloomFilterFileIndex implements FileIndexer {
+
+ public static final String BLOOM_FILTER = "bloom";
+
+ private static final int DEFAULT_ITEMS = 1_000_000;
+ private static final double DEFAULT_FPP = 0.1;
+
+ private static final String ITEMS = "items";
+ private static final String FPP = "fpp";
+
+ private final DataType dataType;
+ private final int items;
+ private final double fpp;
+
+ public BloomFilterFileIndex(DataType dataType, Options options) {
+ this.dataType = dataType;
+ this.items = options.getInteger(ITEMS, DEFAULT_ITEMS);
+ this.fpp = options.getDouble(FPP, DEFAULT_FPP);
+ }
+
+ public String name() {
+ return BLOOM_FILTER;
+ }
+
+ @Override
+ public FileIndexWriter createWriter() {
+ return new Writer(dataType, items, fpp);
+ }
+
+ @Override
+ public FileIndexReader createReader(byte[] serializedBytes) {
+ return new Reader(dataType, serializedBytes);
+ }
+
+ private static class Writer implements FileIndexWriter {
+
+ private final BloomFilter64 filter;
+ private final FastHash hashFunction;
+
+ public Writer(DataType type, int items, double fpp) {
+ this.filter = new BloomFilter64(items, fpp);
+ this.hashFunction = FastHash.getHashFunction(type);
+ }
+
+ @Override
+ public void write(Object key) {
+ filter.addHash(hashFunction.hash(key));
+ }
+
+ @Override
+ public byte[] serializedBytes() {
+ int numHashFunctions = filter.getNumHashFunctions();
+ byte[] bytes = filter.getBitSet().toByteArray();
+ byte[] serialized = new byte[bytes.length + Integer.BYTES];
+ serialized[0] = (byte) ((numHashFunctions >>> 24) & 0xFF);
+ serialized[1] = (byte) ((numHashFunctions >>> 16) & 0xFF);
+ serialized[2] = (byte) ((numHashFunctions >>> 8) & 0xFF);
+ serialized[3] = (byte) (numHashFunctions & 0xFF);
+ System.arraycopy(bytes, 0, serialized, 4, bytes.length);
+ return serialized;
+ }
+ }
+
+ private static class Reader implements FileIndexReader {
+
+ private final BloomFilter64 filter;
+ private final FastHash hashFunction;
+
+ public Reader(DataType type, byte[] serializedBytes) {
+ int numHashFunctions =
+ ((serializedBytes[0] << 24)
+ + (serializedBytes[1] << 16)
+ + (serializedBytes[2] << 8)
+ + serializedBytes[3]);
+ byte[] bytes = new byte[serializedBytes.length - Integer.BYTES];
+ System.arraycopy(serializedBytes, 4, bytes, 0, bytes.length);
+ BitSet bitSet = BitSet.valueOf(bytes);
+ this.filter = new BloomFilter64(numHashFunctions, bitSet);
+ this.hashFunction = FastHash.getHashFunction(type);
+ }
+
+ @Override
+ public Boolean visitEqual(FieldRef fieldRef, Object key) {
+ return filter.testHash(hashFunction.hash(key));
+ }
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java
new file mode 100644
index 000000000..d76b86c2e
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bloomfilter;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import net.openhft.hashing.LongHashFunction;
+
+/** Hashes an object to a 64-bit hash code. */
+public interface FastHash {
+
+ long hash(Object o);
+
+ static FastHash getHashFunction(DataType type) {
+ return type.accept(FastHashVisitor.INSTANCE);
+ }
+
+ /** Provides the fast hash function that matches each DataType. */
+ class FastHashVisitor implements DataTypeVisitor<FastHash> {
+
+ private static final FastHashVisitor INSTANCE = new FastHashVisitor();
+
+ @Override
+ public FastHash visit(CharType charType) {
+ return o -> hash64(((BinaryString) o).toBytes());
+ }
+
+ @Override
+ public FastHash visit(VarCharType varCharType) {
+ return o -> hash64(((BinaryString) o).toBytes());
+ }
+
+ @Override
+ public FastHash visit(BooleanType booleanType) {
+ throw new UnsupportedOperationException("Does not support type
boolean");
+ }
+
+ @Override
+ public FastHash visit(BinaryType binaryType) {
+ return o -> hash64((byte[]) o);
+ }
+
+ @Override
+ public FastHash visit(VarBinaryType varBinaryType) {
+ return o -> hash64((byte[]) o);
+ }
+
+ @Override
+ public FastHash visit(DecimalType decimalType) {
+ throw new UnsupportedOperationException("Does not support
decimal");
+ }
+
+ @Override
+ public FastHash visit(TinyIntType tinyIntType) {
+ return o -> getLongHash((byte) o);
+ }
+
+ @Override
+ public FastHash visit(SmallIntType smallIntType) {
+ return o -> getLongHash((short) o);
+ }
+
+ @Override
+ public FastHash visit(IntType intType) {
+ return o -> getLongHash((int) o);
+ }
+
+ @Override
+ public FastHash visit(BigIntType bigIntType) {
+ return o -> getLongHash((long) o);
+ }
+
+ @Override
+ public FastHash visit(FloatType floatType) {
+ return o -> getLongHash(Float.floatToIntBits((float) o));
+ }
+
+ @Override
+ public FastHash visit(DoubleType doubleType) {
+ return o -> getLongHash(Double.doubleToLongBits((double) o));
+ }
+
+ @Override
+ public FastHash visit(DateType dateType) {
+ return o -> getLongHash((int) o);
+ }
+
+ @Override
+ public FastHash visit(TimeType timeType) {
+ return o -> getLongHash((int) o);
+ }
+
+ @Override
+ public FastHash visit(TimestampType timestampType) {
+ final int precision = timestampType.getPrecision();
+ return o -> {
+ if (o == null) {
+ return 0;
+ }
+ if (precision <= 3) {
+ return getLongHash(((Timestamp) o).getMillisecond());
+ }
+
+ return getLongHash(((Timestamp) o).toMicros());
+ };
+ }
+
+ @Override
+ public FastHash visit(LocalZonedTimestampType localZonedTimestampType)
{
+ final int precision = localZonedTimestampType.getPrecision();
+ return o -> {
+ if (o == null) {
+ return 0;
+ }
+ if (precision <= 3) {
+ return getLongHash(((Timestamp) o).getMillisecond());
+ }
+
+ return getLongHash(((Timestamp) o).toMicros());
+ };
+ }
+
+ @Override
+ public FastHash visit(ArrayType arrayType) {
+ throw new UnsupportedOperationException("Does not support type
array");
+ }
+
+ @Override
+ public FastHash visit(MultisetType multisetType) {
+ throw new UnsupportedOperationException("Does not support type
mutiset");
+ }
+
+ @Override
+ public FastHash visit(MapType mapType) {
+ throw new UnsupportedOperationException("Does not support type
map");
+ }
+
+ @Override
+ public FastHash visit(RowType rowType) {
+ throw new UnsupportedOperationException("Does not support type
row");
+ }
+
+ // Thomas Wang's integer hash function
+ //
http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm
+ static long getLongHash(long key) {
+ key = (~key) + (key << 21); // key = (key << 21) - key - 1;
+ key = key ^ (key >> 24);
+ key = (key + (key << 3)) + (key << 8); // key * 265
+ key = key ^ (key >> 14);
+ key = (key + (key << 2)) + (key << 4); // key * 21
+ key = key ^ (key >> 28);
+ key = key + (key << 31);
+ return key;
+ }
+
+ static long hash64(byte[] data) {
+ return LongHashFunction.xx().hashBytes(data);
+ }
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/Options.java
b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
index d658ce123..161eef067 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/Options.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
@@ -167,6 +167,10 @@ public class Options implements Serializable {
return
getRawValue(key).map(OptionsUtils::convertToInt).orElse(defaultValue);
}
+ public synchronized double getDouble(String key, double defaultValue) {
+ return
getRawValue(key).map(OptionsUtils::convertToDouble).orElse(defaultValue);
+ }
+
public synchronized String getString(String key, String defaultValue) {
return
getRawValue(key).map(OptionsUtils::convertToString).orElse(defaultValue);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java
b/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java
new file mode 100644
index 000000000..75b8661a2
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.BitSet;
+
+/** Bloom filter that handles 64-bit hashes. */
+public final class BloomFilter64 {
+
+ private final BitSet bitSet;
+ private final int numBits;
+ private final int numHashFunctions;
+
+ public BloomFilter64(long items, double fpp) {
+ int nb = (int) (-items * Math.log(fpp) / (Math.log(2) * Math.log(2)));
+ this.numBits = nb + (Long.SIZE - (nb % Long.SIZE));
+ this.numHashFunctions =
+ Math.max(1, (int) Math.round((double) numBits / items *
Math.log(2)));
+ this.bitSet = new BitSet(numBits);
+ }
+
+ public BloomFilter64(int numHashFunctions, BitSet bitSet) {
+ this.numHashFunctions = numHashFunctions;
+ this.numBits = bitSet.size();
+ this.bitSet = bitSet;
+ }
+
+ public void addHash(long hash64) {
+ int hash1 = (int) hash64;
+ int hash2 = (int) (hash64 >>> 32);
+
+ for (int i = 1; i <= numHashFunctions; i++) {
+ int combinedHash = hash1 + (i * hash2);
+ // hashcode should be positive, flip all the bits if it's negative
+ if (combinedHash < 0) {
+ combinedHash = ~combinedHash;
+ }
+ int pos = combinedHash % numBits;
+ bitSet.set(pos);
+ }
+ }
+
+ public boolean testHash(long hash64) {
+ int hash1 = (int) hash64;
+ int hash2 = (int) (hash64 >>> 32);
+
+ for (int i = 1; i <= numHashFunctions; i++) {
+ int combinedHash = hash1 + (i * hash2);
+ // hashcode should be positive, flip all the bits if it's negative
+ if (combinedHash < 0) {
+ combinedHash = ~combinedHash;
+ }
+ int pos = combinedHash % numBits;
+ if (!bitSet.get(pos)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public int getNumHashFunctions() {
+ return numHashFunctions;
+ }
+
+ public BitSet getBitSet() {
+ return bitSet;
+ }
+}
diff --git a/paimon-common/src/main/resources/META-INF/NOTICE
b/paimon-common/src/main/resources/META-INF/NOTICE
index 1f8f8a058..cd1ab89b8 100644
--- a/paimon-common/src/main/resources/META-INF/NOTICE
+++ b/paimon-common/src/main/resources/META-INF/NOTICE
@@ -14,3 +14,4 @@ You find them under licenses/LICENSE.antlr-runtime and
licenses/LICENSE.janino.
- org.codehaus.janino:janino:3.0.11
- org.codehaus.janino:commons-compiler:3.0.11
- it.unimi.dsi:fastutil:8.5.12
+- net.openhft:zero-allocation-hashing:0.16
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndexTest.java
b/paimon-common/src/test/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndexTest.java
new file mode 100644
index 000000000..9834598c8
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndexTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bloomfilter;
+
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.FileIndexWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+/** Tests for {@link BloomFilterFileIndex}. */
+public class BloomFilterFileIndexTest {
+
+ private static final Random RANDOM = new Random();
+
+ @Test
+ public void testAddFindByRandom() {
+ BloomFilterFileIndex filter =
+ new BloomFilterFileIndex(
+ DataTypes.BYTES(),
+ new Options(
+ new HashMap<String, String>() {
+ {
+ put("items", "10000");
+ put("fpp", "0.02");
+ }
+ }));
+ FileIndexWriter writer = filter.createWriter();
+ List<byte[]> testData = new ArrayList<>();
+
+ for (int i = 0; i < 10000; i++) {
+ testData.add(random());
+ }
+
+ // test empty bytes
+ testData.add(new byte[0]);
+
+ testData.forEach(writer::write);
+
+ FileIndexReader reader = filter.createReader(writer.serializedBytes());
+
+ for (byte[] bytes : testData) {
+ Assertions.assertThat(reader.visitEqual(null, bytes)).isTrue();
+ }
+
+ int errorCount = 0;
+ int num = 1000000;
+ for (int i = 0; i < num; i++) {
+ byte[] ra = random();
+ if (reader.visitEqual(null, ra)) {
+ errorCount++;
+ }
+ }
+
+ // fpp should be less than 0.03
+ Assertions.assertThat((double) errorCount / num).isLessThan(0.03);
+ }
+
+ @Test
+ public void testAddFindByRandomLong() {
+ BloomFilterFileIndex filter =
+ new BloomFilterFileIndex(
+ DataTypes.BIGINT(),
+ new Options(
+ new HashMap<String, String>() {
+ {
+ put("items", "10000");
+ put("fpp", "0.02");
+ }
+ }));
+ FileIndexWriter writer = filter.createWriter();
+ List<Long> testData = new ArrayList<>();
+
+ for (int i = 0; i < 10000; i++) {
+ testData.add(RANDOM.nextLong());
+ }
+
+ testData.forEach(writer::write);
+
+ FileIndexReader reader = filter.createReader(writer.serializedBytes());
+
+ for (Long value : testData) {
+ Assertions.assertThat(reader.visitEqual(null, value)).isTrue();
+ }
+
+ int errorCount = 0;
+ int num = 1000000;
+ for (int i = 0; i < num; i++) {
+ Long ra = RANDOM.nextLong();
+ if (reader.visitEqual(null, ra)) {
+ errorCount++;
+ }
+ }
+
+ // fpp should be less than 0.03
+ Assertions.assertThat((double) errorCount / num).isLessThan(0.03);
+ }
+
+ private byte[] random() {
+ byte[] b = new byte[Math.abs(RANDOM.nextInt(400) + 1)];
+ RANDOM.nextBytes(b);
+ return b;
+ }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fileindex/bloomfilter/FastHashVisitorTest.java
b/paimon-common/src/test/java/org/apache/paimon/fileindex/bloomfilter/FastHashVisitorTest.java
new file mode 100644
index 000000000..a661da8b6
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/fileindex/bloomfilter/FastHashVisitorTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bloomfilter;
+
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Random;
+
+/** Test for {@link FastHash.FastHashVisitor}. */
+public class FastHashVisitorTest {
+
+ private static final Random RANDOM = new Random();
+
+ @Test
+ public void testTinyIntType() {
+ FastHash function = FastHash.getHashFunction(DataTypes.TINYINT());
+ byte c = (byte) RANDOM.nextInt();
+
Assertions.assertThat(function.hash(c)).isEqualTo(FastHash.FastHashVisitor.getLongHash(c));
+ }
+
+ @Test
+ public void testSmallIntType() {
+ FastHash function = FastHash.getHashFunction(DataTypes.SMALLINT());
+ short c = (short) RANDOM.nextInt();
+
Assertions.assertThat(function.hash(c)).isEqualTo(FastHash.FastHashVisitor.getLongHash(c));
+ }
+
+ @Test
+ public void testIntType() {
+ FastHash function = FastHash.getHashFunction(DataTypes.INT());
+ int c = RANDOM.nextInt();
+ Assertions.assertThat(function.hash(c))
+ .isEqualTo((FastHash.FastHashVisitor.getLongHash(c)));
+ }
+
+ @Test
+ public void testBigIntType() {
+ FastHash function = FastHash.getHashFunction(DataTypes.BIGINT());
+ long c = RANDOM.nextLong();
+ Assertions.assertThat(function.hash(c))
+ .isEqualTo((FastHash.FastHashVisitor.getLongHash(c)));
+ }
+
+ @Test
+ public void testFloatType() {
+ FastHash function = FastHash.getHashFunction(DataTypes.FLOAT());
+ float c = RANDOM.nextFloat();
+ Assertions.assertThat(function.hash(c))
+
.isEqualTo((FastHash.FastHashVisitor.getLongHash(Float.floatToIntBits(c))));
+ }
+
+ @Test
+ public void testDoubleType() {
+ FastHash function = FastHash.getHashFunction(DataTypes.DOUBLE());
+ double c = RANDOM.nextDouble();
+ Assertions.assertThat(function.hash(c))
+
.isEqualTo((FastHash.FastHashVisitor.getLongHash(Double.doubleToLongBits(c))));
+ }
+
+ @Test
+ public void testDateType() {
+ FastHash function = FastHash.getHashFunction(DataTypes.DATE());
+ int c = RANDOM.nextInt();
+ Assertions.assertThat(function.hash(c))
+ .isEqualTo((FastHash.FastHashVisitor.getLongHash(c)));
+ }
+
+ @Test
+ public void testTimestampType() {
+ FastHash function =
FastHash.getHashFunction(DataTypes.TIMESTAMP_MILLIS());
+ Timestamp c = Timestamp.fromEpochMillis(System.currentTimeMillis());
+ Assertions.assertThat(function.hash(c))
+
.isEqualTo((FastHash.FastHashVisitor.getLongHash(c.getMillisecond())));
+ }
+
+ @Test
+ public void testLocalZonedTimestampType() {
+ FastHash function =
FastHash.getHashFunction(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3));
+ Timestamp c = Timestamp.fromEpochMillis(System.currentTimeMillis());
+ Assertions.assertThat(function.hash(c))
+
.isEqualTo((FastHash.FastHashVisitor.getLongHash(c.getMillisecond())));
+ }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/utils/BloomFilter64Test.java
b/paimon-common/src/test/java/org/apache/paimon/utils/BloomFilter64Test.java
new file mode 100644
index 000000000..10dab2950
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/BloomFilter64Test.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.RepeatedTest;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/** Test for {@link BloomFilter64}. */
+public class BloomFilter64Test {
+
+ private static final Random RANDOM = new Random();
+
+ @RepeatedTest(10000)
+ public void testFunction() {
+ BloomFilter64 bloomFilter64 = new BloomFilter64(10000, 0.02);
+
+ List<Long> testData = new ArrayList<>();
+
+ for (int i = 0; i < 10000; i++) {
+ testData.add(RANDOM.nextLong());
+ }
+ testData.forEach(bloomFilter64::addHash);
+
+ for (Long value : testData) {
+ Assertions.assertThat(bloomFilter64.testHash(value)).isTrue();
+ }
+
+ int errorCount = 0;
+ int num = 1000000;
+ for (int i = 0; i < num; i++) {
+ long ra = RANDOM.nextLong();
+ if (bloomFilter64.testHash(ra) && !testData.contains(ra)) {
+ errorCount++;
+ }
+ }
+
+ // fpp should be less than 0.03
+ Assertions.assertThat((double) errorCount / num).isLessThan(0.03);
+ }
+}