This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 942d35d9c [core] Add bloom filter for file index (#3141)
942d35d9c is described below

commit 942d35d9c0a3f7b1c87367232cc5974e18aa9aad
Author: YeJunHao <[email protected]>
AuthorDate: Mon Apr 8 11:17:05 2024 +0800

    [core] Add bloom filter for file index (#3141)
---
 paimon-common/pom.xml                              |  11 ++
 .../apache/paimon/fileindex/FileIndexFormat.java   |   9 +-
 .../apache/paimon/fileindex/FileIndexReader.java   |   2 -
 .../org/apache/paimon/fileindex/FileIndexer.java   |  10 +-
 .../bloomfilter/BloomFilterFileIndex.java          | 127 +++++++++++++
 .../paimon/fileindex/bloomfilter/FastHash.java     | 199 +++++++++++++++++++++
 .../java/org/apache/paimon/options/Options.java    |   4 +
 .../org/apache/paimon/utils/BloomFilter64.java     |  84 +++++++++
 paimon-common/src/main/resources/META-INF/NOTICE   |   1 +
 .../bloomfilter/BloomFilterFileIndexTest.java      | 127 +++++++++++++
 .../fileindex/bloomfilter/FastHashVisitorTest.java | 103 +++++++++++
 .../org/apache/paimon/utils/BloomFilter64Test.java |  60 +++++++
 12 files changed, 730 insertions(+), 7 deletions(-)

diff --git a/paimon-common/pom.xml b/paimon-common/pom.xml
index 95d6458d6..38f89999f 100644
--- a/paimon-common/pom.xml
+++ b/paimon-common/pom.xml
@@ -141,6 +141,12 @@ under the License.
             <version>1.0.5</version>
         </dependency>
 
+        <dependency>
+            <groupId>net.openhft</groupId>
+            <artifactId>zero-allocation-hashing</artifactId>
+            <version>0.16</version>
+        </dependency>
+
         <!-- Test -->
 
         <dependency>
@@ -273,6 +279,7 @@ under the License.
                                     <include>org.codehaus.janino:*</include>
                                     <include>it.unimi.dsi:fastutil</include>
                                     
<include>org.roaringbitmap:RoaringBitmap</include>
+                                    
<include>net.openhft:zero-allocation-hashing</include>
                                 </includes>
                             </artifactSet>
                             <filters>
@@ -312,6 +319,10 @@ under the License.
                                     <pattern>org.roaringbitmap</pattern>
                                     
<shadedPattern>org.apache.paimon.shade.org.roaringbitmap</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    <pattern>net.openhft.hashing</pattern>
+                                    
<shadedPattern>org.apache.paimon.shade.net.openhft.hashing</shadedPattern>
+                                </relocation>
                             </relocations>
                             <minimizeJar>true</minimizeJar>
                         </configuration>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
index c9b827ee7..a7a3c534e 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
@@ -20,6 +20,7 @@ package org.apache.paimon.fileindex;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.IOUtils;
@@ -241,9 +242,11 @@ public final class FileIndexFormat {
             return readColumnInputStream(columnName)
                     .map(
                             serializedBytes ->
-                                    FileIndexer.create(type, 
fields.get(columnName).type())
-                                            .createReader()
-                                            .recoverFrom(serializedBytes))
+                                    FileIndexer.create(
+                                                    type,
+                                                    
fields.get(columnName).type(),
+                                                    new Options())
+                                            .createReader(serializedBytes))
                     .orElse(null);
         }
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
index 6d9404564..8a881b4b8 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
@@ -29,8 +29,6 @@ import java.util.List;
  */
 public interface FileIndexReader extends FunctionVisitor<Boolean> {
 
-    FileIndexReader recoverFrom(byte[] serializedBytes);
-
     @Override
     default Boolean visitIsNotNull(FieldRef fieldRef) {
         return true;
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java
index e7e3d40bf..fa3085d79 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java
@@ -18,17 +18,23 @@
 
 package org.apache.paimon.fileindex;
 
+import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndex;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataType;
 
+import static 
org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndex.BLOOM_FILTER;
+
 /** File index interface. To build a file index. */
 public interface FileIndexer {
 
     FileIndexWriter createWriter();
 
-    FileIndexReader createReader();
+    FileIndexReader createReader(byte[] serializedBytes);
 
-    static FileIndexer create(String type, DataType dataType) {
+    static FileIndexer create(String type, DataType dataType, Options options) 
{
         switch (type) {
+            case BLOOM_FILTER:
+                return new BloomFilterFileIndex(dataType, options);
             default:
                 throw new RuntimeException("Doesn't support filter type: " + 
type);
         }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
new file mode 100644
index 000000000..c0486d535
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bloomfilter;
+
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.FileIndexWriter;
+import org.apache.paimon.fileindex.FileIndexer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.BloomFilter64;
+
+import org.apache.hadoop.util.bloom.HashFunction;
+
+import java.util.BitSet;
+
+/**
+ * Bloom filter for file index.
+ *
+ * <p>Note: This class uses {@link BloomFilter64} as the base filter and {@link FastHash} to hash
+ * the objects: bytes types (like varchar, binary, etc.) are hashed with xx hash, numeric types
+ * with the specified number hash (see
+ * http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm).
+ *
+ * <p>Serialized layout: the number of hash functions as one big-endian integer, followed by the
+ * bytes of the bit set, zero-padded to the full size of the filter.
+ */
+public class BloomFilterFileIndex implements FileIndexer {
+
+    public static final String BLOOM_FILTER = "bloom";
+
+    private static final int DEFAULT_ITEMS = 1_000_000;
+    private static final double DEFAULT_FPP = 0.1;
+
+    private static final String ITEMS = "items";
+    private static final String FPP = "fpp";
+
+    private final DataType dataType;
+    private final int items;
+    private final double fpp;
+
+    /**
+     * Creates the indexer for one column.
+     *
+     * @param dataType type of the indexed values; selects the hash function
+     * @param options read for "items" (default 1,000,000) and "fpp" (default 0.1), which size the
+     *     filter created by {@link #createWriter()}
+     */
+    public BloomFilterFileIndex(DataType dataType, Options options) {
+        this.dataType = dataType;
+        this.items = options.getInteger(ITEMS, DEFAULT_ITEMS);
+        this.fpp = options.getDouble(FPP, DEFAULT_FPP);
+    }
+
+    /** Returns the identifier of this index type, {@link #BLOOM_FILTER}. */
+    public String name() {
+        return BLOOM_FILTER;
+    }
+
+    @Override
+    public FileIndexWriter createWriter() {
+        return new Writer(dataType, items, fpp);
+    }
+
+    @Override
+    public FileIndexReader createReader(byte[] serializedBytes) {
+        return new Reader(dataType, serializedBytes);
+    }
+
+    /** Collects the hashes of the written keys into a new bloom filter. */
+    private static class Writer implements FileIndexWriter {
+
+        private final BloomFilter64 filter;
+        private final FastHash hashFunction;
+
+        public Writer(DataType type, int items, double fpp) {
+            this.filter = new BloomFilter64(items, fpp);
+            this.hashFunction = FastHash.getHashFunction(type);
+        }
+
+        @Override
+        public void write(Object key) {
+            filter.addHash(hashFunction.hash(key));
+        }
+
+        /**
+         * Serializes the filter as a 4-byte big-endian hash function count followed by the bit set
+         * bytes.
+         */
+        @Override
+        public byte[] serializedBytes() {
+            int numHashFunctions = filter.getNumHashFunctions();
+            BitSet bitSet = filter.getBitSet();
+            byte[] bytes = bitSet.toByteArray();
+            // BitSet#toByteArray drops trailing zero bytes. Pad the payload back to the full filter
+            // size, otherwise the reader cannot recover the number of bits the positions were
+            // computed with and would report false negatives.
+            int payloadLength = Math.max(bytes.length, bitSet.size() / Byte.SIZE);
+            byte[] serialized = new byte[Integer.BYTES + payloadLength];
+            serialized[0] = (byte) ((numHashFunctions >>> 24) & 0xFF);
+            serialized[1] = (byte) ((numHashFunctions >>> 16) & 0xFF);
+            serialized[2] = (byte) ((numHashFunctions >>> 8) & 0xFF);
+            serialized[3] = (byte) (numHashFunctions & 0xFF);
+            System.arraycopy(bytes, 0, serialized, Integer.BYTES, bytes.length);
+            return serialized;
+        }
+    }
+
+    /** Answers equality predicates against a filter restored from its serialized bytes. */
+    private static class Reader implements FileIndexReader {
+
+        private final BloomFilter64 filter;
+        private final FastHash hashFunction;
+
+        public Reader(DataType type, byte[] serializedBytes) {
+            // Bytes are signed: mask each one before shifting so that values >= 0x80 do not
+            // sign-extend into the higher bytes of the decoded integer.
+            int numHashFunctions =
+                    ((serializedBytes[0] & 0xFF) << 24)
+                            | ((serializedBytes[1] & 0xFF) << 16)
+                            | ((serializedBytes[2] & 0xFF) << 8)
+                            | (serializedBytes[3] & 0xFF);
+            byte[] bytes = new byte[serializedBytes.length - Integer.BYTES];
+            System.arraycopy(serializedBytes, Integer.BYTES, bytes, 0, bytes.length);
+            // BitSet.valueOf trims trailing zero words, which would shrink BitSet#size() and
+            // therefore the modulus used by the filter. Allocate the bit set with the size implied
+            // by the payload length (rounded up to whole 64-bit words, at least one word) and copy
+            // the bits into it.
+            int numWords = Math.max(1, (bytes.length + Long.BYTES - 1) / Long.BYTES);
+            BitSet bitSet = new BitSet(numWords * Long.SIZE);
+            bitSet.or(BitSet.valueOf(bytes));
+            this.filter = new BloomFilter64(numHashFunctions, bitSet);
+            this.hashFunction = FastHash.getHashFunction(type);
+        }
+
+        /** Returns false only when the filter proves that {@code key} was never written. */
+        @Override
+        public Boolean visitEqual(FieldRef fieldRef, Object key) {
+            return filter.testHash(hashFunction.hash(key));
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java
new file mode 100644
index 000000000..d76b86c2e
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bloomfilter;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import net.openhft.hashing.LongHashFunction;
+
+/** Hash object to 64 bits hash code. */
+public interface FastHash {
+
+    /** Returns the 64-bit hash of {@code o}. */
+    long hash(Object o);
+
+    /**
+     * Returns the hash function for values of the given type.
+     *
+     * @throws UnsupportedOperationException for boolean, decimal, array, multiset, map and row
+     *     types
+     */
+    static FastHash getHashFunction(DataType type) {
+        return type.accept(FastHashVisitor.INSTANCE);
+    }
+
+    /** Creates the {@link FastHash} matching a {@link DataType}. */
+    class FastHashVisitor implements DataTypeVisitor<FastHash> {
+
+        private static final FastHashVisitor INSTANCE = new FastHashVisitor();
+
+        @Override
+        public FastHash visit(CharType charType) {
+            return o -> hash64(((BinaryString) o).toBytes());
+        }
+
+        @Override
+        public FastHash visit(VarCharType varCharType) {
+            return o -> hash64(((BinaryString) o).toBytes());
+        }
+
+        @Override
+        public FastHash visit(BooleanType booleanType) {
+            throw new UnsupportedOperationException("Does not support type boolean");
+        }
+
+        @Override
+        public FastHash visit(BinaryType binaryType) {
+            return o -> hash64((byte[]) o);
+        }
+
+        @Override
+        public FastHash visit(VarBinaryType varBinaryType) {
+            return o -> hash64((byte[]) o);
+        }
+
+        @Override
+        public FastHash visit(DecimalType decimalType) {
+            throw new UnsupportedOperationException("Does not support decimal");
+        }
+
+        @Override
+        public FastHash visit(TinyIntType tinyIntType) {
+            return o -> getLongHash((byte) o);
+        }
+
+        @Override
+        public FastHash visit(SmallIntType smallIntType) {
+            return o -> getLongHash((short) o);
+        }
+
+        @Override
+        public FastHash visit(IntType intType) {
+            return o -> getLongHash((int) o);
+        }
+
+        @Override
+        public FastHash visit(BigIntType bigIntType) {
+            return o -> getLongHash((long) o);
+        }
+
+        @Override
+        public FastHash visit(FloatType floatType) {
+            return o -> getLongHash(Float.floatToIntBits((float) o));
+        }
+
+        @Override
+        public FastHash visit(DoubleType doubleType) {
+            return o -> getLongHash(Double.doubleToLongBits((double) o));
+        }
+
+        @Override
+        public FastHash visit(DateType dateType) {
+            return o -> getLongHash((int) o);
+        }
+
+        @Override
+        public FastHash visit(TimeType timeType) {
+            return o -> getLongHash((int) o);
+        }
+
+        @Override
+        public FastHash visit(TimestampType timestampType) {
+            return timestampHash(timestampType.getPrecision());
+        }
+
+        @Override
+        public FastHash visit(LocalZonedTimestampType localZonedTimestampType) {
+            return timestampHash(localZonedTimestampType.getPrecision());
+        }
+
+        @Override
+        public FastHash visit(ArrayType arrayType) {
+            throw new UnsupportedOperationException("Does not support type array");
+        }
+
+        @Override
+        public FastHash visit(MultisetType multisetType) {
+            throw new UnsupportedOperationException("Does not support type multiset");
+        }
+
+        @Override
+        public FastHash visit(MapType mapType) {
+            throw new UnsupportedOperationException("Does not support type map");
+        }
+
+        @Override
+        public FastHash visit(RowType rowType) {
+            throw new UnsupportedOperationException("Does not support type row");
+        }
+
+        /**
+         * Hash for {@link Timestamp} values, shared by both timestamp types: a null value hashes
+         * to 0, precision up to 3 hashes the millisecond value, higher precision hashes the
+         * microsecond value.
+         */
+        private static FastHash timestampHash(int precision) {
+            return o -> {
+                if (o == null) {
+                    return 0;
+                }
+                if (precision <= 3) {
+                    return getLongHash(((Timestamp) o).getMillisecond());
+                }
+
+                return getLongHash(((Timestamp) o).toMicros());
+            };
+        }
+
+        // Thomas Wang's integer hash function
+        // http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm
+        // NOTE(review): the reference implementation appears to use the unsigned shift (>>>); the
+        // signed shifts are kept as they are because changing them would change every hash value.
+        static long getLongHash(long key) {
+            key = (~key) + (key << 21); // key = (key << 21) - key - 1;
+            key = key ^ (key >> 24);
+            key = (key + (key << 3)) + (key << 8); // key * 265
+            key = key ^ (key >> 14);
+            key = (key + (key << 2)) + (key << 4); // key * 21
+            key = key ^ (key >> 28);
+            key = key + (key << 31);
+            return key;
+        }
+
+        /** Hashes the bytes with xxHash (64 bit). */
+        static long hash64(byte[] data) {
+            return LongHashFunction.xx().hashBytes(data);
+        }
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/Options.java 
b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
index d658ce123..161eef067 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/Options.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
@@ -167,6 +167,10 @@ public class Options implements Serializable {
         return 
getRawValue(key).map(OptionsUtils::convertToInt).orElse(defaultValue);
     }
 
+    /**
+     * Returns the value of {@code key} converted with {@code OptionsUtils.convertToDouble}, or
+     * {@code defaultValue} when {@code getRawValue(key)} yields no value.
+     */
+    public synchronized double getDouble(String key, double defaultValue) {
+        return 
getRawValue(key).map(OptionsUtils::convertToDouble).orElse(defaultValue);
+    }
+
     public synchronized String getString(String key, String defaultValue) {
         return 
getRawValue(key).map(OptionsUtils::convertToString).orElse(defaultValue);
     }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java
new file mode 100644
index 000000000..75b8661a2
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.BitSet;
+
+/**
+ * Bloom filter 64 handle 64 bits hash.
+ *
+ * <p>The two 32-bit halves of the hash are combined as {@code hash1 + i * hash2} to derive the
+ * bit position of the i-th hash function.
+ */
+public final class BloomFilter64 {
+
+    /** Largest multiple of {@link Long#SIZE} that still fits into an int. */
+    private static final int MAX_NUM_BITS = Integer.MAX_VALUE - (Long.SIZE - 1);
+
+    private final BitSet bitSet;
+    private final int numBits;
+    private final int numHashFunctions;
+
+    /**
+     * Creates an empty filter sized for the expected number of items and false positive rate.
+     *
+     * @param items expected number of distinct items
+     * @param fpp desired false positive probability
+     */
+    public BloomFilter64(long items, double fpp) {
+        int nb = (int) (-items * Math.log(fpp) / (Math.log(2) * Math.log(2)));
+        // Keep the estimate inside [0, MAX_NUM_BITS): a negative estimate (fpp >= 1) or one close
+        // to Integer.MAX_VALUE (huge items, fpp == 0) would otherwise make the rounding below
+        // yield a zero or negative size.
+        nb = Math.max(0, Math.min(nb, MAX_NUM_BITS - 1));
+        this.numBits = nb + (Long.SIZE - (nb % Long.SIZE));
+        this.numHashFunctions =
+                Math.max(1, (int) Math.round((double) numBits / items * Math.log(2)));
+        this.bitSet = new BitSet(numBits);
+    }
+
+    /**
+     * Wraps an existing bit set. The number of bits is taken from {@link BitSet#size()}, so the
+     * bit set must have the same size as the one the hashes were added to.
+     */
+    public BloomFilter64(int numHashFunctions, BitSet bitSet) {
+        this.numHashFunctions = numHashFunctions;
+        this.numBits = bitSet.size();
+        this.bitSet = bitSet;
+    }
+
+    /** Sets the bits of all hash functions for the given hash. */
+    public void addHash(long hash64) {
+        int hash1 = (int) hash64;
+        int hash2 = (int) (hash64 >>> 32);
+
+        for (int i = 1; i <= numHashFunctions; i++) {
+            bitSet.set(position(hash1, hash2, i));
+        }
+    }
+
+    /** Returns false if the hash was definitely not added, true if it might have been. */
+    public boolean testHash(long hash64) {
+        int hash1 = (int) hash64;
+        int hash2 = (int) (hash64 >>> 32);
+
+        for (int i = 1; i <= numHashFunctions; i++) {
+            if (!bitSet.get(position(hash1, hash2, i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public int getNumHashFunctions() {
+        return numHashFunctions;
+    }
+
+    public BitSet getBitSet() {
+        return bitSet;
+    }
+
+    /** Bit position of the i-th hash function for the two 32-bit halves of a hash. */
+    private int position(int hash1, int hash2, int i) {
+        int combinedHash = hash1 + (i * hash2);
+        // hashcode should be positive, flip all the bits if it's negative
+        if (combinedHash < 0) {
+            combinedHash = ~combinedHash;
+        }
+        return combinedHash % numBits;
+    }
+}
diff --git a/paimon-common/src/main/resources/META-INF/NOTICE 
b/paimon-common/src/main/resources/META-INF/NOTICE
index 1f8f8a058..cd1ab89b8 100644
--- a/paimon-common/src/main/resources/META-INF/NOTICE
+++ b/paimon-common/src/main/resources/META-INF/NOTICE
@@ -14,3 +14,4 @@ You find them under licenses/LICENSE.antlr-runtime and 
licenses/LICENSE.janino.
 - org.codehaus.janino:janino:3.0.11
 - org.codehaus.janino:commons-compiler:3.0.11
 - it.unimi.dsi:fastutil:8.5.12
+- net.openhft:zero-allocation-hashing:0.16
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndexTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndexTest.java
new file mode 100644
index 000000000..9834598c8
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndexTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bloomfilter;
+
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.FileIndexWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+/** Tests for {@link BloomFilterFileIndex}. */
+public class BloomFilterFileIndexTest {
+
+    private static final Random RANDOM = new Random();
+
+    @Test
+    public void testAddFindByRandom() {
+        BloomFilterFileIndex filter = new BloomFilterFileIndex(DataTypes.BYTES(), bloomOptions());
+        FileIndexWriter writer = filter.createWriter();
+
+        List<byte[]> testData = new ArrayList<>();
+        for (int i = 0; i < 10000; i++) {
+            testData.add(random());
+        }
+        // test empty bytes
+        testData.add(new byte[0]);
+
+        for (byte[] bytes : testData) {
+            writer.write(bytes);
+        }
+
+        FileIndexReader reader = filter.createReader(writer.serializedBytes());
+
+        // every written value must be found
+        for (byte[] bytes : testData) {
+            Assertions.assertThat(reader.visitEqual(null, bytes)).isTrue();
+        }
+
+        int num = 1000000;
+        int errorCount = 0;
+        for (int i = 0; i < num; i++) {
+            if (reader.visitEqual(null, random())) {
+                errorCount++;
+            }
+        }
+
+        // fpp should be less than 0.03
+        double observedFpp = (double) errorCount / num;
+        Assertions.assertThat(observedFpp).isLessThan(0.03);
+    }
+
+    @Test
+    public void testAddFindByRandomLong() {
+        BloomFilterFileIndex filter = new BloomFilterFileIndex(DataTypes.BIGINT(), bloomOptions());
+        FileIndexWriter writer = filter.createWriter();
+
+        List<Long> testData = new ArrayList<>();
+        for (int i = 0; i < 10000; i++) {
+            testData.add(RANDOM.nextLong());
+        }
+
+        for (Long value : testData) {
+            writer.write(value);
+        }
+
+        FileIndexReader reader = filter.createReader(writer.serializedBytes());
+
+        // every written value must be found
+        for (Long value : testData) {
+            Assertions.assertThat(reader.visitEqual(null, value)).isTrue();
+        }
+
+        int num = 1000000;
+        int errorCount = 0;
+        for (int i = 0; i < num; i++) {
+            Long candidate = RANDOM.nextLong();
+            if (reader.visitEqual(null, candidate)) {
+                errorCount++;
+            }
+        }
+
+        // fpp should be less than 0.03
+        double observedFpp = (double) errorCount / num;
+        Assertions.assertThat(observedFpp).isLessThan(0.03);
+    }
+
+    /** Options shared by the tests: 10000 expected items, false positive rate 0.02. */
+    private static Options bloomOptions() {
+        HashMap<String, String> conf = new HashMap<>();
+        conf.put("items", "10000");
+        conf.put("fpp", "0.02");
+        return new Options(conf);
+    }
+
+    /** Random bytes with a length between 1 and 400. */
+    private byte[] random() {
+        int length = RANDOM.nextInt(400) + 1;
+        byte[] b = new byte[length];
+        RANDOM.nextBytes(b);
+        return b;
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fileindex/bloomfilter/FastHashVisitorTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/fileindex/bloomfilter/FastHashVisitorTest.java
new file mode 100644
index 000000000..a661da8b6
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fileindex/bloomfilter/FastHashVisitorTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bloomfilter;
+
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Random;
+
+/** Test for {@link FastHash.FastHashVisitor}. */
+public class FastHashVisitorTest {
+
+    private static final Random RANDOM = new Random();
+
+    @Test
+    public void testTinyIntType() {
+        FastHash function = FastHash.getHashFunction(DataTypes.TINYINT());
+        byte value = (byte) RANDOM.nextInt();
+        long expected = FastHash.FastHashVisitor.getLongHash(value);
+        Assertions.assertThat(function.hash(value)).isEqualTo(expected);
+    }
+
+    @Test
+    public void testSmallIntType() {
+        FastHash function = FastHash.getHashFunction(DataTypes.SMALLINT());
+        short value = (short) RANDOM.nextInt();
+        long expected = FastHash.FastHashVisitor.getLongHash(value);
+        Assertions.assertThat(function.hash(value)).isEqualTo(expected);
+    }
+
+    @Test
+    public void testIntType() {
+        FastHash function = FastHash.getHashFunction(DataTypes.INT());
+        int value = RANDOM.nextInt();
+        long expected = FastHash.FastHashVisitor.getLongHash(value);
+        Assertions.assertThat(function.hash(value)).isEqualTo(expected);
+    }
+
+    @Test
+    public void testBigIntType() {
+        FastHash function = FastHash.getHashFunction(DataTypes.BIGINT());
+        long value = RANDOM.nextLong();
+        long expected = FastHash.FastHashVisitor.getLongHash(value);
+        Assertions.assertThat(function.hash(value)).isEqualTo(expected);
+    }
+
+    @Test
+    public void testFloatType() {
+        FastHash function = FastHash.getHashFunction(DataTypes.FLOAT());
+        float value = RANDOM.nextFloat();
+        // floats are hashed through their int bit pattern
+        long expected = FastHash.FastHashVisitor.getLongHash(Float.floatToIntBits(value));
+        Assertions.assertThat(function.hash(value)).isEqualTo(expected);
+    }
+
+    @Test
+    public void testDoubleType() {
+        FastHash function = FastHash.getHashFunction(DataTypes.DOUBLE());
+        double value = RANDOM.nextDouble();
+        // doubles are hashed through their long bit pattern
+        long expected = FastHash.FastHashVisitor.getLongHash(Double.doubleToLongBits(value));
+        Assertions.assertThat(function.hash(value)).isEqualTo(expected);
+    }
+
+    @Test
+    public void testDateType() {
+        FastHash function = FastHash.getHashFunction(DataTypes.DATE());
+        int value = RANDOM.nextInt();
+        long expected = FastHash.FastHashVisitor.getLongHash(value);
+        Assertions.assertThat(function.hash(value)).isEqualTo(expected);
+    }
+
+    @Test
+    public void testTimestampType() {
+        FastHash function = FastHash.getHashFunction(DataTypes.TIMESTAMP_MILLIS());
+        Timestamp value = Timestamp.fromEpochMillis(System.currentTimeMillis());
+        long expected = FastHash.FastHashVisitor.getLongHash(value.getMillisecond());
+        Assertions.assertThat(function.hash(value)).isEqualTo(expected);
+    }
+
+    @Test
+    public void testLocalZonedTimestampType() {
+        FastHash function = FastHash.getHashFunction(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3));
+        Timestamp value = Timestamp.fromEpochMillis(System.currentTimeMillis());
+        long expected = FastHash.FastHashVisitor.getLongHash(value.getMillisecond());
+        Assertions.assertThat(function.hash(value)).isEqualTo(expected);
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/utils/BloomFilter64Test.java 
b/paimon-common/src/test/java/org/apache/paimon/utils/BloomFilter64Test.java
new file mode 100644
index 000000000..10dab2950
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/BloomFilter64Test.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.RepeatedTest;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+/** Test for {@link BloomFilter64}. */
+public class BloomFilter64Test {
+
+    private static final Random RANDOM = new Random();
+
+    /** Adds 10000 random hashes, expects all of them to be found and the observed fpp < 0.03. */
+    @RepeatedTest(10000)
+    public void testFunction() {
+        BloomFilter64 bloomFilter64 = new BloomFilter64(10000, 0.02);
+
+        List<Long> testData = new ArrayList<>();
+
+        for (int i = 0; i < 10000; i++) {
+            testData.add(RANDOM.nextLong());
+        }
+        testData.forEach(bloomFilter64::addHash);
+
+        for (Long value : testData) {
+            Assertions.assertThat(bloomFilter64.testHash(value)).isTrue();
+        }
+
+        // Hash based lookup of the added values: scanning the list for every positive answer
+        // makes the loop below quadratic.
+        Set<Long> added = new HashSet<>(testData);
+
+        int errorCount = 0;
+        int num = 1000000;
+        for (int i = 0; i < num; i++) {
+            long ra = RANDOM.nextLong();
+            // only count values that were not actually added as false positives
+            if (bloomFilter64.testHash(ra) && !added.contains(ra)) {
+                errorCount++;
+            }
+        }
+
+        // fpp should be less than 0.03
+        Assertions.assertThat((double) errorCount / num).isLessThan(0.03);
+    }
+}


Reply via email to