This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ddf10d457 [core] Support bsi file index (#4464)
ddf10d457 is described below

commit ddf10d457176b4961a35110fc907e834a6db2261
Author: Tan-JiaLiang <[email protected]>
AuthorDate: Fri Nov 8 17:30:22 2024 +0800

    [core] Support bsi file index (#4464)
---
 docs/content/append-table/query-performance.md     |   3 +
 docs/content/concepts/spec/fileindex.md            | 102 ++++++
 .../content/primary-key-table/query-performance.md |   3 +
 .../bsi/BitSliceIndexBitmapFileIndex.java          | 403 +++++++++++++++++++++
 .../bsi/BitSliceIndexBitmapFileIndexFactory.java   |  40 ++
 .../paimon/utils/BitSliceIndexRoaringBitmap.java   | 234 ++++++++++++
 .../org/apache/paimon/utils/RoaringBitmap32.java   |   8 +
 .../org.apache.paimon.fileindex.FileIndexerFactory |   3 +-
 .../bsi/BitSliceIndexBitmapFileIndexTest.java      | 260 +++++++++++++
 .../utils/BitSliceIndexRoaringBitmapTest.java      | 124 +++++++
 .../paimon/table/AppendOnlyFileStoreTableTest.java | 132 +++++++
 .../apache/paimon/spark/SparkFileIndexITCase.java  |  39 +-
 12 files changed, 1343 insertions(+), 8 deletions(-)

diff --git a/docs/content/append-table/query-performance.md 
b/docs/content/append-table/query-performance.md
index 32ea806b8..7ec745468 100644
--- a/docs/content/append-table/query-performance.md
+++ b/docs/content/append-table/query-performance.md
@@ -65,6 +65,9 @@ scenario. Using a bitmap may consume more space but can 
result in greater accura
 `Bitmap`:
 * `file-index.bitmap.columns`: specify the columns that need bitmap index.
 
+`Bit-Slice Index Bitmap`:
+* `file-index.bsi.columns`: specify the columns that need bsi index.
+
 More filter types will be supported...
 
 If you want to add file index to existing table, without any rewrite, you can 
use `rewrite_file_index` procedure. Before
diff --git a/docs/content/concepts/spec/fileindex.md 
b/docs/content/concepts/spec/fileindex.md
index 6a8169aef..fd499fdf7 100644
--- a/docs/content/concepts/spec/fileindex.md
+++ b/docs/content/concepts/spec/fileindex.md
@@ -136,3 +136,105 @@ offset:                        4 bytes int (when it is 
negative, it represents t
 </pre>
 
 Integer are all BIT_ENDIAN.
+
+## Column Index Bytes: Bit-Slice Index Bitmap
+
+The BSI file index is a numeric range index used to accelerate range queries; it can be used together with the bitmap index.
+
+Define `'file-index.bsi.columns'`.
+
+BSI file index format (V1):
+
+<pre>
+BSI file index format (V1)
++-------------------------------------------------+
+| version (1 byte)                               |
++-------------------------------------------------+
+| row count (4 bytes int)                        |
++-------------------------------------------------+
+| has positive value (1 byte)                    |
++-------------------------------------------------+
+| positive bsi serialized (if has positive value)|       
++-------------------------------------------------+
+| has negative value (1 byte)                    |
++-------------------------------------------------+
+| negative bsi serialized (if has negative value)|       
++-------------------------------------------------+
+</pre>
+
+BSI only supports the following data types:
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 10%">Paimon Data Type</th>
+      <th class="text-left" style="width: 5%">Supported</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>TinyIntType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>SmallIntType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>IntType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>BigIntType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>DateType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>LocalZonedTimestampType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>TimestampType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>DecimalType(precision, scale)</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>FloatType</code></td>
+      <td>false</td>
+    </tr>
+    <tr>
+      <td><code>DoubleType</code></td>
+      <td>false</td>
+    </tr>
+    <tr>
+      <td><code>String</code></td>
+      <td>false</td>
+    </tr>
+    <tr>
+      <td><code>VarBinaryType</code>, <code>BinaryType</code></td>
+      <td>false</td>
+    </tr>
+    <tr>
+      <td><code>RowType</code></td>
+      <td>false</td>
+    </tr>
+    <tr>
+      <td><code>MapType</code></td>
+      <td>false</td>
+    </tr>
+    <tr>
+      <td><code>ArrayType</code></td>
+      <td>false</td>
+    </tr>
+    <tr>
+      <td><code>BooleanType</code></td>
+      <td>false</td>
+    </tr>
+    </tbody>
+</table>
diff --git a/docs/content/primary-key-table/query-performance.md 
b/docs/content/primary-key-table/query-performance.md
index 971be8ae6..fd6bdeeeb 100644
--- a/docs/content/primary-key-table/query-performance.md
+++ b/docs/content/primary-key-table/query-performance.md
@@ -63,6 +63,9 @@ Supported filter types:
 `Bitmap`:
 * `file-index.bitmap.columns`: specify the columns that need bitmap index.
 
+`Bit-Slice Index Bitmap`:
+* `file-index.bsi.columns`: specify the columns that need bsi index.
+
 More filter types will be supported...
 
 If you want to add file index to existing table, without any rewrite, you can 
use `rewrite_file_index` procedure. Before
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndex.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndex.java
new file mode 100644
index 000000000..8d0e054dd
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndex.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bsi;
+
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.FileIndexResult;
+import org.apache.paimon.fileindex.FileIndexWriter;
+import org.apache.paimon.fileindex.FileIndexer;
+import org.apache.paimon.fileindex.bitmap.BitmapIndexResultLazy;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeDefaultVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.utils.BitSliceIndexRoaringBitmap;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Bit-slice index (BSI) implementation of {@link FileIndexer}.
+ *
+ * <p>Every column value is mapped to a {@code long} by {@link #getValueMapper(DataType)}.
+ * Non-negative values and the absolute values of negative values are kept in two separate
+ * {@link BitSliceIndexRoaringBitmap}s, so both indexes only ever store non-negative numbers.
+ */
+public class BitSliceIndexBitmapFileIndex implements FileIndexer {
+
+    public static final int VERSION_1 = 1;
+
+    private final DataType dataType;
+
+    /**
+     * Creates a BSI file indexer for a column of the given type.
+     *
+     * @param dataType type of the indexed column; it selects the value-to-long mapping
+     * @param options index options; not read by this implementation
+     */
+    public BitSliceIndexBitmapFileIndex(DataType dataType, Options options) {
+        this.dataType = dataType;
+    }
+
+    @Override
+    public FileIndexWriter createWriter() {
+        return new Writer(dataType);
+    }
+
+    /**
+     * Creates a reader by eagerly deserializing the index that begins at {@code start}.
+     *
+     * <p>Layout read here: version (1 byte), row count (4-byte int), a "has positive" flag
+     * followed by the positive index when the flag is set, then a "has negative" flag followed by
+     * the negative index when the flag is set.
+     *
+     * @param inputStream stream that is seeked to {@code start} before reading
+     * @param start offset of the serialized index inside the stream
+     * @param length serialized length; not used by this implementation
+     * @return reader answering predicates from the deserialized bitmaps
+     * @throws RuntimeException if the version is newer than {@link #VERSION_1} or reading fails
+     */
+    @Override
+    public FileIndexReader createReader(SeekableInputStream inputStream, int start, int length) {
+        try {
+            inputStream.seek(start);
+            DataInput input = new DataInputStream(inputStream);
+            byte version = input.readByte();
+            if (version > VERSION_1) {
+                throw new RuntimeException(
+                        String.format(
+                                "read bsi index file fail, "
+                                        + "your plugin version is lower than %d",
+                                version));
+            }
+
+            int rowNumber = input.readInt();
+
+            boolean hasPositive = input.readBoolean();
+            BitSliceIndexRoaringBitmap positive =
+                    hasPositive
+                            ? BitSliceIndexRoaringBitmap.map(input)
+                            : BitSliceIndexRoaringBitmap.EMPTY;
+
+            boolean hasNegative = input.readBoolean();
+            BitSliceIndexRoaringBitmap negative =
+                    hasNegative
+                            ? BitSliceIndexRoaringBitmap.map(input)
+                            : BitSliceIndexRoaringBitmap.EMPTY;
+
+            return new Reader(dataType, rowNumber, positive, negative);
+        } catch (RuntimeException e) {
+            // Already unchecked (e.g. the version check above): rethrow as-is instead of
+            // burying its message inside a second RuntimeException.
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Buffers the mapped column values and serializes them as a BSI index on demand. */
+    private static class Writer extends FileIndexWriter {
+
+        private final Function<Object, Long> valueMapper;
+        private final StatsCollectList collector;
+
+        public Writer(DataType dataType) {
+            this.valueMapper = getValueMapper(dataType);
+            this.collector = new StatsCollectList();
+        }
+
+        /** Maps {@code key} to a long (or null) and buffers it; its list position is the row id. */
+        @Override
+        public void write(Object key) {
+            collector.add(valueMapper.apply(key));
+        }
+
+        /**
+         * Builds both bit-slice indexes from the buffered values and serializes them.
+         *
+         * <p>Output layout: version (1 byte), row count (4-byte int), "has positive" flag plus
+         * the positive index when non-empty, "has negative" flag plus the negative index when
+         * non-empty. Null values are counted as rows but added to neither index.
+         *
+         * @return the serialized index bytes
+         * @throws RuntimeException wrapping any failure raised while building or writing
+         */
+        @Override
+        public byte[] serializedBytes() {
+            try {
+                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                DataOutput out = new DataOutputStream(bos);
+
+                BitSliceIndexRoaringBitmap.Appender positive =
+                        new BitSliceIndexRoaringBitmap.Appender(
+                                collector.positiveMin, collector.positiveMax);
+                BitSliceIndexRoaringBitmap.Appender negative =
+                        new BitSliceIndexRoaringBitmap.Appender(
+                                collector.negativeMin, collector.negativeMax);
+
+                for (int i = 0; i < collector.values.size(); i++) {
+                    Long value = collector.values.get(i);
+                    if (value != null) {
+                        if (value < 0) {
+                            // Negative values are indexed by magnitude in their own index.
+                            negative.append(i, Math.abs(value));
+                        } else {
+                            positive.append(i, value);
+                        }
+                    }
+                }
+
+                out.writeByte(VERSION_1);
+                out.writeInt(collector.values.size());
+
+                boolean hasPositive = positive.isNotEmpty();
+                out.writeBoolean(hasPositive);
+                if (hasPositive) {
+                    positive.serialize(out);
+                }
+
+                boolean hasNegative = negative.isNotEmpty();
+                out.writeBoolean(hasNegative);
+                if (hasNegative) {
+                    negative.serialize(out);
+                }
+                return bos.toByteArray();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
+         * Keeps every written value (nulls included) together with running bounds of the
+         * non-negative values and of the magnitudes of the negative values.
+         *
+         * <p>NOTE(review): the bound fields start at 0, so {@code Math.min} keeps both minimums
+         * at 0 for any input; only the maximums carry information. Raising the minimums would
+         * require the bitmap comparison to cope with predicates below the minimum first.
+         *
+         * <p>NOTE(review): {@code Math.abs(Long.MIN_VALUE)} stays negative, so a value of
+         * {@code Long.MIN_VALUE} cannot be represented as a magnitude here - confirm whether such
+         * input has to be supported.
+         */
+        private static class StatsCollectList {
+            private long positiveMin;
+            private long positiveMax;
+            private long negativeMin;
+            private long negativeMax;
+            // todo: Find a way to reduce the risk of out-of-memory.
+            private final List<Long> values = new ArrayList<>();
+
+            /** Appends {@code value} as the next row and folds non-null values into the bounds. */
+            public void add(Long value) {
+                values.add(value);
+                if (value != null) {
+                    collect(value);
+                }
+            }
+
+            private void collect(long value) {
+                if (value < 0) {
+                    negativeMin = Math.min(negativeMin, Math.abs(value));
+                    negativeMax = Math.max(negativeMax, Math.abs(value));
+                } else {
+                    positiveMin = Math.min(positiveMin, value);
+                    positiveMax = Math.max(positiveMax, value);
+                }
+            }
+        }
+    }
+
+    /**
+     * Answers predicates from the two deserialized bit-slice indexes.
+     *
+     * <p>Literals are converted with the same value mapper as the writer. A negative literal is
+     * looked up by magnitude in the negative index with the comparison direction reversed; a
+     * non-negative literal is looked up in the positive index.
+     */
+    private static class Reader extends FileIndexReader {
+
+        private final int rowNumber;
+        private final BitSliceIndexRoaringBitmap positive;
+        private final BitSliceIndexRoaringBitmap negative;
+        private final Function<Object, Long> valueMapper;
+
+        public Reader(
+                DataType dataType,
+                int rowNumber,
+                BitSliceIndexRoaringBitmap positive,
+                BitSliceIndexRoaringBitmap negative) {
+            this.rowNumber = rowNumber;
+            this.positive = positive;
+            this.negative = negative;
+            this.valueMapper = getValueMapper(dataType);
+        }
+
+        /** Returns the rows that hold a value in either index, i.e. the non-null rows. */
+        private RoaringBitmap32 notNull() {
+            return RoaringBitmap32.or(positive.isNotNull(), negative.isNotNull());
+        }
+
+        /** Returns the union of the rows whose value equals any of the given literals. */
+        private RoaringBitmap32 matchAny(List<Object> literals) {
+            return literals.stream()
+                    .map(valueMapper)
+                    .map(value -> value < 0 ? negative.eq(Math.abs(value)) : positive.eq(value))
+                    .reduce(new RoaringBitmap32(), (x1, x2) -> RoaringBitmap32.or(x1, x2));
+        }
+
+        @Override
+        public FileIndexResult visitIsNull(FieldRef fieldRef) {
+            return new BitmapIndexResultLazy(
+                    () -> {
+                        // Null rows are the rows of [0, rowNumber) missing from both indexes.
+                        RoaringBitmap32 bitmap = notNull();
+                        bitmap.flip(0, rowNumber);
+                        return bitmap;
+                    });
+        }
+
+        @Override
+        public FileIndexResult visitIsNotNull(FieldRef fieldRef) {
+            return new BitmapIndexResultLazy(() -> notNull());
+        }
+
+        @Override
+        public FileIndexResult visitEqual(FieldRef fieldRef, Object literal) {
+            return visitIn(fieldRef, Collections.singletonList(literal));
+        }
+
+        @Override
+        public FileIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
+            return visitNotIn(fieldRef, Collections.singletonList(literal));
+        }
+
+        @Override
+        public FileIndexResult visitIn(FieldRef fieldRef, List<Object> literals) {
+            return new BitmapIndexResultLazy(() -> matchAny(literals));
+        }
+
+        @Override
+        public FileIndexResult visitNotIn(FieldRef fieldRef, List<Object> literals) {
+            // Only non-null rows can satisfy NOT IN, hence "not null minus matches".
+            return new BitmapIndexResultLazy(
+                    () -> RoaringBitmap32.andNot(notNull(), matchAny(literals)));
+        }
+
+        @Override
+        public FileIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
+            return new BitmapIndexResultLazy(
+                    () -> {
+                        Long value = valueMapper.apply(literal);
+                        if (value < 0) {
+                            // x < -k  <=>  |x| > k among the negative rows.
+                            return negative.gt(Math.abs(value));
+                        } else {
+                            // Every negative row is below a non-negative literal.
+                            return RoaringBitmap32.or(positive.lt(value), negative.isNotNull());
+                        }
+                    });
+        }
+
+        @Override
+        public FileIndexResult visitLessOrEqual(FieldRef fieldRef, Object literal) {
+            return new BitmapIndexResultLazy(
+                    () -> {
+                        Long value = valueMapper.apply(literal);
+                        if (value < 0) {
+                            return negative.gte(Math.abs(value));
+                        } else {
+                            return RoaringBitmap32.or(positive.lte(value), negative.isNotNull());
+                        }
+                    });
+        }
+
+        @Override
+        public FileIndexResult visitGreaterThan(FieldRef fieldRef, Object literal) {
+            return new BitmapIndexResultLazy(
+                    () -> {
+                        Long value = valueMapper.apply(literal);
+                        if (value < 0) {
+                            // Every non-negative row is above a negative literal.
+                            return RoaringBitmap32.or(
+                                    positive.isNotNull(), negative.lt(Math.abs(value)));
+                        } else {
+                            return positive.gt(value);
+                        }
+                    });
+        }
+
+        @Override
+        public FileIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
+            return new BitmapIndexResultLazy(
+                    () -> {
+                        Long value = valueMapper.apply(literal);
+                        if (value < 0) {
+                            return RoaringBitmap32.or(
+                                    positive.isNotNull(), negative.lte(Math.abs(value)));
+                        } else {
+                            return positive.gte(value);
+                        }
+                    });
+        }
+    }
+
+    /**
+     * Returns the function that converts a column value of {@code dataType} into the long that
+     * is stored in the index; every returned function maps {@code null} to {@code null}.
+     *
+     * <p>Decimals map to their unscaled long, integral types, dates and times to their numeric
+     * value, and timestamps to milliseconds (precision up to 3) or microseconds (otherwise).
+     *
+     * <p>NOTE(review): timestamps with a precision above 6 presumably lose their
+     * sub-microsecond digits in this mapping - confirm range predicates stay safe for them.
+     *
+     * @param dataType type of the indexed column
+     * @return mapper from the column's internal value object to a long
+     * @throws UnsupportedOperationException if the type has no mapping
+     */
+    public static Function<Object, Long> getValueMapper(DataType dataType) {
+        return dataType.accept(
+                new DataTypeDefaultVisitor<Function<Object, Long>>() {
+                    @Override
+                    public Function<Object, Long> visit(DecimalType decimalType) {
+                        return o -> o == null ? null : ((Decimal) o).toUnscaledLong();
+                    }
+
+                    @Override
+                    public Function<Object, Long> visit(TinyIntType tinyIntType) {
+                        return o -> o == null ? null : ((Byte) o).longValue();
+                    }
+
+                    @Override
+                    public Function<Object, Long> visit(SmallIntType smallIntType) {
+                        return o -> o == null ? null : ((Short) o).longValue();
+                    }
+
+                    @Override
+                    public Function<Object, Long> visit(IntType intType) {
+                        return o -> o == null ? null : ((Integer) o).longValue();
+                    }
+
+                    @Override
+                    public Function<Object, Long> visit(BigIntType bigIntType) {
+                        return o -> o == null ? null : (Long) o;
+                    }
+
+                    @Override
+                    public Function<Object, Long> visit(DateType dateType) {
+                        return o -> o == null ? null : ((Integer) o).longValue();
+                    }
+
+                    @Override
+                    public Function<Object, Long> visit(TimeType timeType) {
+                        return o -> o == null ? null : ((Integer) o).longValue();
+                    }
+
+                    @Override
+                    public Function<Object, Long> visit(TimestampType timestampType) {
+                        return getTimeStampMapper(timestampType.getPrecision());
+                    }
+
+                    @Override
+                    public Function<Object, Long> visit(
+                            LocalZonedTimestampType localZonedTimestampType) {
+                        return getTimeStampMapper(localZonedTimestampType.getPrecision());
+                    }
+
+                    @Override
+                    protected Function<Object, Long> defaultMethod(DataType dataType) {
+                        throw new UnsupportedOperationException(
+                                dataType.asSQLString()
+                                        + " type is not support to build bsi index yet.");
+                    }
+
+                    /** Milliseconds for precision up to 3, microseconds otherwise. */
+                    private Function<Object, Long> getTimeStampMapper(int precision) {
+                        return o -> {
+                            if (o == null) {
+                                return null;
+                            } else if (precision <= 3) {
+                                return ((Timestamp) o).getMillisecond();
+                            } else {
+                                return ((Timestamp) o).toMicros();
+                            }
+                        };
+                    }
+                });
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndexFactory.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndexFactory.java
new file mode 100644
index 000000000..aa7a92b78
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndexFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bsi;
+
+import org.apache.paimon.fileindex.FileIndexer;
+import org.apache.paimon.fileindex.FileIndexerFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
+
+/** Factory to create {@link BitSliceIndexBitmapFileIndex}. */
+public class BitSliceIndexBitmapFileIndexFactory implements FileIndexerFactory 
{
+
+    // Identifier returned by identifier(); the docs configure this index through
+    // 'file-index.bsi.columns'.
+    public static final String BSI_INDEX = "bsi";
+
+    /** Returns the identifier of this index type, {@link #BSI_INDEX}. */
+    @Override
+    public String identifier() {
+        return BSI_INDEX;
+    }
+
+    /**
+     * Creates a BSI file indexer for one column.
+     *
+     * @param dataType type of the indexed column
+     * @param options index options, handed to the indexer unchanged
+     * @return a new {@link BitSliceIndexBitmapFileIndex}
+     */
+    @Override
+    public FileIndexer create(DataType dataType, Options options) {
+        return new BitSliceIndexBitmapFileIndex(dataType, options);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/BitSliceIndexRoaringBitmap.java
 
b/paimon-common/src/main/java/org/apache/paimon/utils/BitSliceIndexRoaringBitmap.java
new file mode 100644
index 000000000..662d791d1
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/BitSliceIndexRoaringBitmap.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/* This file is based on source code from the RoaringBitmap Project 
(http://roaringbitmap.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * A bit-sliced index (BSI) over long values, backed by compressed bitmaps.
+ *
+ * <p>Every indexed row id is recorded in the existence bitmap ({@code ebm}). A row's value is
+ * stored as the offset {@code value - min}; slice {@code i} holds the row ids whose offset has
+ * bit {@code i} set. Comparisons are answered with O'Neil's bit-sliced compare algorithm.
+ */
+public class BitSliceIndexRoaringBitmap {
+
+    public static final byte VERSION_1 = 1;
+
+    /** Shared index without any row; every comparison on it yields an empty bitmap. */
+    public static final BitSliceIndexRoaringBitmap EMPTY =
+            new BitSliceIndexRoaringBitmap(0, new RoaringBitmap32(), new RoaringBitmap32[] {});
+
+    private final long min;
+    private final RoaringBitmap32 ebm;
+    private final RoaringBitmap32[] slices;
+
+    private BitSliceIndexRoaringBitmap(long min, RoaringBitmap32 ebm, RoaringBitmap32[] slices) {
+        this.min = min;
+        this.ebm = ebm;
+        this.slices = slices;
+    }
+
+    /** Returns the row ids whose value equals {@code predicate}. */
+    public RoaringBitmap32 eq(long predicate) {
+        return oNeilCompare(Operation.EQ, predicate - min, null);
+    }
+
+    /** Returns the row ids whose value is less than {@code predicate}. */
+    public RoaringBitmap32 lt(long predicate) {
+        return oNeilCompare(Operation.LT, predicate - min, null);
+    }
+
+    /** Returns the row ids whose value is less than or equal to {@code predicate}. */
+    public RoaringBitmap32 lte(long predicate) {
+        return oNeilCompare(Operation.LTE, predicate - min, null);
+    }
+
+    /** Returns the row ids whose value is greater than {@code predicate}. */
+    public RoaringBitmap32 gt(long predicate) {
+        return oNeilCompare(Operation.GT, predicate - min, null);
+    }
+
+    /** Returns the row ids whose value is greater than or equal to {@code predicate}. */
+    public RoaringBitmap32 gte(long predicate) {
+        return oNeilCompare(Operation.GTE, predicate - min, null);
+    }
+
+    /** Returns a copy of the existence bitmap, i.e. every row id that holds a value. */
+    public RoaringBitmap32 isNotNull() {
+        return ebm.clone();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        BitSliceIndexRoaringBitmap that = (BitSliceIndexRoaringBitmap) o;
+        return min == that.min
+                && Objects.equals(ebm, that.ebm)
+                && Arrays.equals(slices, that.slices);
+    }
+
+    /**
+     * Hash code consistent with {@link #equals(Object)}.
+     *
+     * <p>Only {@code min} and the number of slices are hashed: equal indexes always agree on
+     * both, whereas the hash contract of the bitmap type is not visible from this class.
+     */
+    @Override
+    public int hashCode() {
+        return 31 * Long.hashCode(min) + slices.length;
+    }
+
+    /**
+     * O'Neil bit-sliced index compare algorithm.
+     *
+     * <p>See <a href="https://dl.acm.org/doi/10.1145/253262.253268">Improved query performance
+     * with variant indexes</a>
+     *
+     * <p>The slices can only express offsets in {@code [0, 2^slices.length - 1]}. A predicate
+     * outside of that range is answered directly, because the bit walk below only inspects the
+     * lowest {@code slices.length} bits and would otherwise treat e.g. offset 8 like offset 0
+     * on a 3-slice index.
+     *
+     * @param operation compare operation
+     * @param predicate offset of the searched value, i.e. the value minus {@code min}
+     * @param foundSet row ids to restrict the result to; {@code null} means all indexed rows
+     * @return row ids of {@code foundSet} that satisfy the comparison, as a new bitmap
+     */
+    private RoaringBitmap32 oNeilCompare(
+            Operation operation, long predicate, RoaringBitmap32 foundSet) {
+        RoaringBitmap32 fixedFoundSet = foundSet == null ? ebm : foundSet;
+
+        // Largest offset the slices can represent; the guard avoids shifting by 63 or more.
+        long maxOffset =
+                slices.length >= Long.SIZE - 1 ? Long.MAX_VALUE : (1L << slices.length) - 1;
+        if (predicate < 0 || predicate > maxOffset) {
+            return compareOutOfRange(operation, predicate < 0, fixedFoundSet);
+        }
+
+        RoaringBitmap32 gt = new RoaringBitmap32();
+        RoaringBitmap32 lt = new RoaringBitmap32();
+        RoaringBitmap32 eq = ebm;
+
+        // Walk from the most significant slice down, keeping in "eq" the rows that still
+        // match every predicate bit seen so far.
+        for (int i = slices.length - 1; i >= 0; i--) {
+            long bit = (predicate >> i) & 1;
+            if (bit == 1) {
+                lt = RoaringBitmap32.or(lt, RoaringBitmap32.andNot(eq, slices[i]));
+                eq = RoaringBitmap32.and(eq, slices[i]);
+            } else {
+                gt = RoaringBitmap32.or(gt, RoaringBitmap32.and(eq, slices[i]));
+                eq = RoaringBitmap32.andNot(eq, slices[i]);
+            }
+        }
+
+        eq = RoaringBitmap32.and(fixedFoundSet, eq);
+        switch (operation) {
+            case EQ:
+                return eq;
+            case NEQ:
+                return RoaringBitmap32.andNot(fixedFoundSet, eq);
+            case GT:
+                return RoaringBitmap32.and(gt, fixedFoundSet);
+            case LT:
+                return RoaringBitmap32.and(lt, fixedFoundSet);
+            case LTE:
+                return RoaringBitmap32.and(RoaringBitmap32.or(lt, eq), fixedFoundSet);
+            case GTE:
+                return RoaringBitmap32.and(RoaringBitmap32.or(gt, eq), fixedFoundSet);
+            default:
+                throw new IllegalArgumentException("not support operation: " + operation);
+        }
+    }
+
+    /**
+     * Answers a comparison whose predicate lies below or above every representable value.
+     *
+     * @param operation compare operation
+     * @param belowRange {@code true} if the predicate is smaller than every stored value,
+     *     {@code false} if it is larger than every value the slices can represent
+     * @param foundSet candidate row ids
+     * @return a copy of {@code foundSet} if every row matches, otherwise an empty bitmap
+     */
+    private static RoaringBitmap32 compareOutOfRange(
+            Operation operation, boolean belowRange, RoaringBitmap32 foundSet) {
+        boolean matchesAll;
+        switch (operation) {
+            case EQ:
+                matchesAll = false;
+                break;
+            case NEQ:
+                matchesAll = true;
+                break;
+            case LT:
+            case LTE:
+                matchesAll = !belowRange;
+                break;
+            case GT:
+            case GTE:
+                matchesAll = belowRange;
+                break;
+            default:
+                throw new IllegalArgumentException("not support operation: " + operation);
+        }
+        // Copy so that callers never receive the index's own bitmap instance.
+        return matchesAll ? foundSet.clone() : new RoaringBitmap32();
+    }
+
+    /** Specifies O'Neil compare algorithm operation. */
+    private enum Operation {
+        EQ,
+        NEQ,
+        LTE,
+        LT,
+        GTE,
+        GT
+    }
+
+    /**
+     * Deserializes an index written by {@link Appender#serialize(DataOutput)}.
+     *
+     * <p>Layout: version (1 byte), min (8-byte long), existence bitmap, slice count (4-byte
+     * int), then one bitmap per slice.
+     *
+     * @param in input positioned at the version byte
+     * @return the deserialized index
+     * @throws IOException if reading fails or the slice count is outside of {@code [0, 64]}
+     * @throws RuntimeException if the version is newer than {@link #VERSION_1}
+     */
+    public static BitSliceIndexRoaringBitmap map(DataInput in) throws IOException {
+        int version = in.readByte();
+        if (version > VERSION_1) {
+            throw new RuntimeException(
+                    String.format(
+                            "deserialize bsi index fail, " + "your plugin version is lower than %d",
+                            version));
+        }
+
+        // deserialize min
+        long min = in.readLong();
+
+        // deserialize ebm
+        RoaringBitmap32 ebm = new RoaringBitmap32();
+        ebm.deserialize(in);
+
+        // deserialize slices; a long offset never needs more than 64 of them, so anything
+        // else means corrupted input and must not be used as an array size
+        int sliceCount = in.readInt();
+        if (sliceCount < 0 || sliceCount > Long.SIZE) {
+            throw new IOException(
+                    "deserialize bsi index fail, invalid slice count: " + sliceCount);
+        }
+        RoaringBitmap32[] slices = new RoaringBitmap32[sliceCount];
+        for (int i = 0; i < slices.length; i++) {
+            RoaringBitmap32 rb = new RoaringBitmap32();
+            rb.deserialize(in);
+            slices[i] = rb;
+        }
+
+        return new BitSliceIndexRoaringBitmap(min, ebm, slices);
+    }
+
+    /** A Builder for {@link BitSliceIndexRoaringBitmap}. */
+    public static class Appender {
+        private final long min;
+        private final long max;
+        private final RoaringBitmap32 ebm;
+        private final RoaringBitmap32[] slices;
+
+        /**
+         * Creates an appender for values in {@code [min, max]}.
+         *
+         * @param min smallest value that will be appended; must be non-negative
+         * @param max largest value that will be appended; must not be smaller than {@code min}
+         * @throws IllegalArgumentException if {@code min < 0} or {@code min > max}
+         */
+        public Appender(long min, long max) {
+            if (min < 0) {
+                throw new IllegalArgumentException("values should be non-negative");
+            }
+            if (min > max) {
+                throw new IllegalArgumentException("min should be less than max");
+            }
+
+            this.min = min;
+            this.max = max;
+            this.ebm = new RoaringBitmap32();
+            // One slice per significant bit of the largest offset (max - min).
+            this.slices = new RoaringBitmap32[64 - Long.numberOfLeadingZeros(max - min)];
+            for (int i = 0; i < slices.length; i++) {
+                slices[i] = new RoaringBitmap32();
+            }
+        }
+
+        /**
+         * Records {@code value} for row {@code rid}.
+         *
+         * @param rid row id; may only be appended once
+         * @param value value within {@code [min, max]}
+         * @throws IllegalArgumentException if the value is out of range or the row id was
+         *     already appended
+         */
+        public void append(int rid, long value) {
+            if (value > max) {
+                throw new IllegalArgumentException(String.format("value %s is too large", value));
+            }
+            // A value below min would produce a negative offset whose bits do not fit the
+            // slices; reject it before any bitmap is touched.
+            if (value < min) {
+                throw new IllegalArgumentException(String.format("value %s is too small", value));
+            }
+
+            if (ebm.contains(rid)) {
+                throw new IllegalArgumentException(String.format("rid=%s is already exists", rid));
+            }
+
+            // reduce the number of slices
+            value = value - min;
+
+            // only bit=1 need to set
+            while (value != 0) {
+                slices[Long.numberOfTrailingZeros(value)].add(rid);
+                // clear the lowest set bit
+                value &= (value - 1);
+            }
+            ebm.add(rid);
+        }
+
+        /** Returns whether at least one row has been appended. */
+        public boolean isNotEmpty() {
+            return !ebm.isEmpty();
+        }
+
+        /**
+         * Writes version (1 byte), min (8-byte long), the existence bitmap, the slice count
+         * (4-byte int) and every slice to {@code out}.
+         *
+         * @throws IOException if writing fails
+         */
+        public void serialize(DataOutput out) throws IOException {
+            out.writeByte(VERSION_1);
+            out.writeLong(min);
+            ebm.serialize(out);
+            out.writeInt(slices.length);
+            for (RoaringBitmap32 slice : slices) {
+                slice.serialize(out);
+            }
+        }
+
+        /**
+         * Builds the index. The returned index uses this appender's bitmaps directly (no copy),
+         * so later appends remain visible through it.
+         */
+        public BitSliceIndexRoaringBitmap build() throws IOException {
+            return new BitSliceIndexRoaringBitmap(min, ebm, slices);
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
index 7b86bae8a..f9232dc82 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
@@ -68,6 +68,10 @@ public class RoaringBitmap32 {
         return roaringBitmap.getLongCardinality();
     }
 
+    /**
+     * Returns a new {@link RoaringBitmap32} that wraps a clone of the
+     * underlying {@code RoaringBitmap}.
+     */
+    @Override
+    public RoaringBitmap32 clone() {
+        return new RoaringBitmap32(roaringBitmap.clone());
+    }
+
     public void serialize(DataOutput out) throws IOException {
         roaringBitmap.runOptimize();
         roaringBitmap.serialize(out);
@@ -149,4 +153,8 @@ public class RoaringBitmap32 {
                             }
                         }));
     }
+
+    public static RoaringBitmap32 andNot(final RoaringBitmap32 x1, final 
RoaringBitmap32 x2) {
+        return new RoaringBitmap32(RoaringBitmap.andNot(x1.roaringBitmap, 
x2.roaringBitmap));
+    }
 }
diff --git 
a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.fileindex.FileIndexerFactory
 
b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.fileindex.FileIndexerFactory
index 8a899eb23..30c908c72 100644
--- 
a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.fileindex.FileIndexerFactory
+++ 
b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.fileindex.FileIndexerFactory
@@ -14,4 +14,5 @@
 # limitations under the License.
 
 org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndexFactory
-org.apache.paimon.fileindex.bitmap.BitmapFileIndexFactory
\ No newline at end of file
+org.apache.paimon.fileindex.bitmap.BitmapFileIndexFactory
+org.apache.paimon.fileindex.bsi.BitSliceIndexBitmapFileIndexFactory
\ No newline at end of file
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndexTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndexTest.java
new file mode 100644
index 000000000..594ac40b9
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndexTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bsi;
+
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.FileIndexWriter;
+import org.apache.paimon.fileindex.bitmap.BitmapIndexResultLazy;
+import org.apache.paimon.fs.ByteArraySeekableStream;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** test for {@link BitSliceIndexBitmapFileIndex}. */
+public class BitSliceIndexBitmapFileIndexTest {
+
+    private static final IntType INT_TYPE = new IntType();
+    private static final FieldRef FIELD_REF = new FieldRef(0, "", INT_TYPE);
+
+    /** Column mixing negative, zero, positive and null values. */
+    @Test
+    public void testBitSliceIndexMix() {
+        FileIndexReader reader =
+                readerOver(1, 2, null, -2, -2, -1, null, 2, 0, 5, null);
+
+        // test eq
+        assertRows(reader.visitEqual(FIELD_REF, 2), 1, 7);
+        assertRows(reader.visitEqual(FIELD_REF, -2), 3, 4);
+        assertRows(reader.visitEqual(FIELD_REF, 100));
+
+        // test neq
+        assertRows(reader.visitNotEqual(FIELD_REF, 2), 0, 3, 4, 5, 8, 9);
+        assertRows(reader.visitNotEqual(FIELD_REF, -2), 0, 1, 5, 7, 8, 9);
+        assertRows(
+                reader.visitNotEqual(FIELD_REF, 100), 0, 1, 3, 4, 5, 7, 8, 9);
+
+        // test in
+        assertRows(
+                reader.visitIn(FIELD_REF, Arrays.asList(-1, 1, 2, 3)),
+                0, 1, 5, 7);
+
+        // test not in
+        assertRows(
+                reader.visitNotIn(FIELD_REF, Arrays.asList(-1, 1, 2, 3)),
+                3, 4, 8, 9);
+
+        // test null
+        assertRows(reader.visitIsNull(FIELD_REF), 2, 6, 10);
+
+        // test is not null
+        assertRows(reader.visitIsNotNull(FIELD_REF), 0, 1, 3, 4, 5, 7, 8, 9);
+
+        // test lt
+        assertRows(reader.visitLessThan(FIELD_REF, 2), 0, 3, 4, 5, 8);
+        assertRows(reader.visitLessOrEqual(FIELD_REF, 2), 0, 1, 3, 4, 5, 7, 8);
+        assertRows(reader.visitLessThan(FIELD_REF, -1), 3, 4);
+        assertRows(reader.visitLessOrEqual(FIELD_REF, -1), 3, 4, 5);
+
+        // test gt
+        assertRows(reader.visitGreaterThan(FIELD_REF, -2), 0, 1, 5, 7, 8, 9);
+        assertRows(
+                reader.visitGreaterOrEqual(FIELD_REF, -2),
+                0, 1, 3, 4, 5, 7, 8, 9);
+        assertRows(reader.visitGreaterThan(FIELD_REF, 2), 9);
+        assertRows(reader.visitGreaterOrEqual(FIELD_REF, 2), 1, 7, 9);
+    }
+
+    /** Column holding only non-negative values and nulls. */
+    @Test
+    public void testBitSliceIndexPositiveOnly() {
+        FileIndexReader reader = readerOver(0, 1, null, 3, 4, 5, 6, 0, null);
+
+        // test eq
+        assertRows(reader.visitEqual(FIELD_REF, 0), 0, 7);
+        assertRows(reader.visitEqual(FIELD_REF, 1), 1);
+        assertRows(reader.visitEqual(FIELD_REF, -1));
+
+        // test neq
+        assertRows(reader.visitNotEqual(FIELD_REF, 2), 0, 1, 3, 4, 5, 6, 7);
+        assertRows(reader.visitNotEqual(FIELD_REF, -2), 0, 1, 3, 4, 5, 6, 7);
+        assertRows(reader.visitNotEqual(FIELD_REF, 3), 0, 1, 4, 5, 6, 7);
+
+        // test in
+        assertRows(
+                reader.visitIn(FIELD_REF, Arrays.asList(-1, 1, 2, 3)), 1, 3);
+
+        // test not in
+        assertRows(
+                reader.visitNotIn(FIELD_REF, Arrays.asList(-1, 1, 2, 3)),
+                0, 4, 5, 6, 7);
+
+        // test null
+        assertRows(reader.visitIsNull(FIELD_REF), 2, 8);
+
+        // test is not null
+        assertRows(reader.visitIsNotNull(FIELD_REF), 0, 1, 3, 4, 5, 6, 7);
+
+        // test lt
+        assertRows(reader.visitLessThan(FIELD_REF, 3), 0, 1, 7);
+        assertRows(reader.visitLessOrEqual(FIELD_REF, 3), 0, 1, 3, 7);
+        assertRows(reader.visitLessThan(FIELD_REF, -1));
+        assertRows(reader.visitLessOrEqual(FIELD_REF, -1));
+
+        // test gt
+        assertRows(reader.visitGreaterThan(FIELD_REF, -2), 0, 1, 3, 4, 5, 6, 7);
+        assertRows(
+                reader.visitGreaterOrEqual(FIELD_REF, -2), 0, 1, 3, 4, 5, 6, 7);
+        assertRows(reader.visitGreaterThan(FIELD_REF, 1), 3, 4, 5, 6);
+        assertRows(reader.visitGreaterOrEqual(FIELD_REF, 1), 1, 3, 4, 5, 6);
+    }
+
+    /** Column holding only negative values and nulls. */
+    @Test
+    public void testBitSliceIndexNegativeOnly() {
+        FileIndexReader reader =
+                readerOver(null, -1, null, -3, -4, -5, -6, -1, null);
+
+        // test eq
+        assertRows(reader.visitEqual(FIELD_REF, 1));
+        assertRows(reader.visitEqual(FIELD_REF, -2));
+        assertRows(reader.visitEqual(FIELD_REF, -1), 1, 7);
+
+        // test neq
+        assertRows(reader.visitNotEqual(FIELD_REF, -2), 1, 3, 4, 5, 6, 7);
+        assertRows(reader.visitNotEqual(FIELD_REF, -3), 1, 4, 5, 6, 7);
+
+        // test in
+        assertRows(
+                reader.visitIn(FIELD_REF, Arrays.asList(-1, -4, -2, 3)),
+                1, 4, 7);
+
+        // test not in
+        assertRows(
+                reader.visitNotIn(FIELD_REF, Arrays.asList(-1, -4, -2, 3)),
+                3, 5, 6);
+
+        // test null
+        assertRows(reader.visitIsNull(FIELD_REF), 0, 2, 8);
+
+        // test is not null
+        assertRows(reader.visitIsNotNull(FIELD_REF), 1, 3, 4, 5, 6, 7);
+
+        // test lt
+        assertRows(reader.visitLessThan(FIELD_REF, -3), 4, 5, 6);
+        assertRows(reader.visitLessOrEqual(FIELD_REF, -3), 3, 4, 5, 6);
+        assertRows(reader.visitLessThan(FIELD_REF, 1), 1, 3, 4, 5, 6, 7);
+        assertRows(reader.visitLessOrEqual(FIELD_REF, 1), 1, 3, 4, 5, 6, 7);
+
+        // test gt
+        assertRows(reader.visitGreaterThan(FIELD_REF, -3), 1, 7);
+        assertRows(reader.visitGreaterOrEqual(FIELD_REF, -3), 1, 3, 7);
+        assertRows(reader.visitGreaterThan(FIELD_REF, 1));
+        assertRows(reader.visitGreaterOrEqual(FIELD_REF, 1));
+    }
+
+    /**
+     * Writes {@code values} in order through a fresh index writer and opens a
+     * reader over the serialized bytes.
+     */
+    private static FileIndexReader readerOver(Object... values) {
+        BitSliceIndexBitmapFileIndex index =
+                new BitSliceIndexBitmapFileIndex(INT_TYPE, null);
+        FileIndexWriter writer = index.createWriter();
+        for (Object value : values) {
+            writer.write(value);
+        }
+        byte[] serialized = writer.serializedBytes();
+        return index.createReader(
+                new ByteArraySeekableStream(serialized), 0, serialized.length);
+    }
+
+    /**
+     * Asserts that {@code result} is a {@link BitmapIndexResultLazy} whose
+     * bitmap equals the bitmap of {@code expectedRows}.
+     */
+    private static void assertRows(Object result, int... expectedRows) {
+        assertThat(((BitmapIndexResultLazy) result).get())
+                .isEqualTo(RoaringBitmap32.bitmapOf(expectedRows));
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/utils/BitSliceIndexRoaringBitmapTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/utils/BitSliceIndexRoaringBitmapTest.java
new file mode 100644
index 000000000..8c4a27d43
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/utils/BitSliceIndexRoaringBitmapTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link BitSliceIndexRoaringBitmap}. */
+public class BitSliceIndexRoaringBitmapTest {
+
+    private long base;
+    private BitSliceIndexRoaringBitmap bsi;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        this.base = System.currentTimeMillis();
+        this.bsi = populatedAppender(base).build();
+    }
+
+    @Test
+    public void testSerde() throws IOException {
+        BitSliceIndexRoaringBitmap.Appender appender = populatedAppender(0);
+
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        appender.serialize(new DataOutputStream(bytes));
+
+        DataInputStream input =
+                new DataInputStream(
+                        new ByteArrayInputStream(bytes.toByteArray()));
+        assertThat(BitSliceIndexRoaringBitmap.map(input))
+                .isEqualTo(appender.build());
+    }
+
+    @Test
+    public void testEQ() {
+        assertThat(bsi.eq(toPredicate(1)))
+                .isEqualTo(RoaringBitmap32.bitmapOf(1));
+        assertThat(bsi.eq(toPredicate(32)))
+                .isEqualTo(RoaringBitmap32.bitmapOf());
+        assertThat(bsi.eq(toPredicate(30)))
+                .isEqualTo(RoaringBitmap32.bitmapOf(30, 100));
+    }
+
+    @Test
+    public void testLT() {
+        assertThat(bsi.lt(toPredicate(30)))
+                .isEqualTo(rows(IntStream.range(0, 30)));
+        assertThat(bsi.lt(toPredicate(45)))
+                .isEqualTo(
+                        rows(
+                                IntStream.concat(
+                                        IntStream.range(0, 31),
+                                        IntStream.range(100, 101))));
+    }
+
+    @Test
+    public void testLTE() {
+        RoaringBitmap32 expected =
+                rows(
+                        IntStream.concat(
+                                IntStream.range(0, 31),
+                                IntStream.range(100, 101)));
+        assertThat(bsi.lte(toPredicate(30))).isEqualTo(expected);
+        assertThat(bsi.lte(toPredicate(45))).isEqualTo(expected);
+    }
+
+    @Test
+    public void testGT() {
+        RoaringBitmap32 expected = rows(IntStream.range(51, 100));
+        assertThat(bsi.gt(toPredicate(30))).isEqualTo(expected);
+        assertThat(bsi.gt(toPredicate(45))).isEqualTo(expected);
+    }
+
+    @Test
+    public void testGTE() {
+        assertThat(bsi.gte(toPredicate(30)))
+                .isEqualTo(
+                        rows(
+                                IntStream.concat(
+                                        IntStream.range(30, 31),
+                                        IntStream.range(51, 101))));
+        assertThat(bsi.gte(toPredicate(45)))
+                .isEqualTo(rows(IntStream.range(51, 100)));
+    }
+
+    @Test
+    public void testIsNotNull() {
+        assertThat(bsi.isNotNull())
+                .isEqualTo(
+                        rows(
+                                IntStream.concat(
+                                        IntStream.range(0, 31),
+                                        IntStream.range(51, 101))));
+    }
+
+    /**
+     * Creates an appender with the given lower bound and an upper bound of
+     * {@code base + 100}, then appends the fixture rows: rid {@code x} with
+     * value {@code base + x} for 0..30 and 51..99, plus rid 100 with value
+     * {@code base + 30}.
+     */
+    private BitSliceIndexRoaringBitmap.Appender populatedAppender(long min) {
+        BitSliceIndexRoaringBitmap.Appender appender =
+                new BitSliceIndexRoaringBitmap.Appender(min, toPredicate(100));
+        for (int rid = 0; rid < 31; rid++) {
+            appender.append(rid, toPredicate(rid));
+        }
+        for (int rid = 51; rid < 100; rid++) {
+            appender.append(rid, toPredicate(rid));
+        }
+        appender.append(100, toPredicate(30));
+        return appender;
+    }
+
+    /** Builds the expected bitmap from a stream of row ids. */
+    private static RoaringBitmap32 rows(IntStream rids) {
+        return RoaringBitmap32.bitmapOf(rids.toArray());
+    }
+
+    /** Shifts {@code predicate} by the per-test {@code base}. */
+    private long toPredicate(long predicate) {
+        return base + predicate;
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index e6503cc40..81dd30262 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -26,7 +26,9 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.fileindex.bitmap.BitmapFileIndexFactory;
 import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndexFactory;
+import org.apache.paimon.fileindex.bsi.BitSliceIndexBitmapFileIndexFactory;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -559,6 +561,136 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         reader.forEachRemaining(row -> 
assertThat(row.getString(1).toString()).isEqualTo("b"));
     }
 
+    /**
+     * Bitmap + BSI index with an in-manifest threshold of 1 MB: the plan is
+     * expected to contain a single data file.
+     */
+    @Test
+    public void testBSIAndBitmapIndexInMemory() throws Exception {
+        assertBsiAndBitmapIndexFilter("1 MB", 1);
+    }
+
+    /**
+     * Bitmap + BSI index with an in-manifest threshold of 1 B: the plan is
+     * expected to contain both data files, and the rows read back must still
+     * all match the predicate.
+     */
+    @Test
+    public void testBSIAndBitmapIndexInDisk() throws Exception {
+        assertBsiAndBitmapIndexFilter("1 B", 2);
+    }
+
+    /**
+     * Writes two data files into an unaware-bucket table that has a bitmap
+     * index on {@code event} and a BSI index on {@code price}, then checks
+     * the number of planned data files and the rows read for the predicate
+     * {@code event = 'C' AND price > 3}.
+     *
+     * @param inManifestThreshold value for the file-index in-manifest
+     *     threshold option
+     * @param expectedFiles number of data files the plan must contain
+     */
+    private void assertBsiAndBitmapIndexFilter(
+            String inManifestThreshold, int expectedFiles) throws Exception {
+        RowType rowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("event", DataTypes.STRING())
+                        .field("price", DataTypes.BIGINT())
+                        .build();
+        String bitmapColumnsKey =
+                FileIndexOptions.FILE_INDEX
+                        + "."
+                        + BitmapFileIndexFactory.BITMAP_INDEX
+                        + "."
+                        + CoreOptions.COLUMNS;
+        String bsiColumnsKey =
+                FileIndexOptions.FILE_INDEX
+                        + "."
+                        + BitSliceIndexBitmapFileIndexFactory.BSI_INDEX
+                        + "."
+                        + CoreOptions.COLUMNS;
+        // in unaware-bucket mode, we split files into splits all the time
+        FileStoreTable table =
+                createUnawareBucketFileStoreTable(
+                        rowType,
+                        options -> {
+                            options.set(bitmapColumnsKey, "event");
+                            options.set(bsiColumnsKey, "price");
+                            options.set(
+                                    FILE_INDEX_IN_MANIFEST_THRESHOLD.key(),
+                                    inManifestThreshold);
+                        });
+
+        // close the writer and committer once the data has been committed
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            List<CommitMessage> result = new ArrayList<>();
+            write.write(GenericRow.of(1, BinaryString.fromString("A"), 4L));
+            write.write(GenericRow.of(1, BinaryString.fromString("B"), 2L));
+            write.write(GenericRow.of(1, BinaryString.fromString("B"), 3L));
+            write.write(GenericRow.of(1, BinaryString.fromString("C"), 3L));
+            result.addAll(write.prepareCommit(true, 0));
+            write.write(GenericRow.of(1, BinaryString.fromString("C"), 4L));
+            result.addAll(write.prepareCommit(true, 0));
+            commit.commit(0, result);
+        }
+
+        // test bitmap index and bsi index
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate =
+                PredicateBuilder.and(
+                        builder.equal(1, BinaryString.fromString("C")),
+                        builder.greaterThan(2, 3L));
+        TableScan.Plan plan = table.newScan().withFilter(predicate).plan();
+        List<DataFileMeta> metas =
+                plan.splits().stream()
+                        .flatMap(
+                                split ->
+                                        ((DataSplit) split)
+                                                .dataFiles().stream())
+                        .collect(Collectors.toList());
+        assertThat(metas.size()).isEqualTo(expectedFiles);
+
+        RecordReader<InternalRow> reader =
+                table.newRead()
+                        .withFilter(predicate)
+                        .createReader(plan.splits());
+        reader.forEachRemaining(
+                row -> {
+                    assertThat(row.getString(1).toString()).isEqualTo("C");
+                    assertThat(row.getLong(2)).isEqualTo(4L);
+                });
+    }
+
     @Test
     public void testWithShardAppendTable() throws Exception {
         FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, 
-1));
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
index 2251619c4..806f1f628 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
@@ -119,6 +119,32 @@ public class SparkFileIndexITCase extends SparkWriteITCase 
{
                 });
     }
 
+    /**
+     * Creates a table with a BSI index on column {@code a}, checks the result
+     * of a {@code a>=3} query and then checks the row bitmap the index reader
+     * returns for the same predicate.
+     */
+    @Test
+    public void testReadWriteTableWithBitSliceIndex()
+            throws Catalog.TableNotExistException {
+
+        spark.sql(
+                "CREATE TABLE T(a int) TBLPROPERTIES ("
+                        + "'file-index.bsi.columns'='a',"
+                        + "'file-index.in-manifest-threshold'='1B');");
+        spark.sql("INSERT INTO T VALUES (0),(1),(2),(3),(4),(5);");
+
+        // check query result
+        List<Row> rows =
+                spark.sql("SELECT a FROM T where a>=3;").collectAsList();
+        assertThat(rows.toString()).isEqualTo("[[3], [4], [5]]");
+
+        // check index reader
+        FieldRef column = new FieldRef(0, "", new IntType());
+        RoaringBitmap32 expectedRows = RoaringBitmap32.bitmapOf(3, 4, 5);
+        foreachIndexReader(
+                indexReader -> {
+                    FileIndexResult result =
+                            indexReader.visitGreaterOrEqual(column, 3);
+                    assertThat(result)
+                            .isInstanceOf(BitmapIndexResultLazy.class);
+                    assertThat(((BitmapIndexResultLazy) result).get())
+                            .isEqualTo(expectedRows);
+                });
+    }
+
     protected void foreachIndexReader(Consumer<FileIndexReader> consumer)
             throws Catalog.TableNotExistException {
         Path tableRoot = 
fileSystemCatalog.getTableLocation(Identifier.create("db", "T"));
@@ -128,7 +154,7 @@ public class SparkFileIndexITCase extends SparkWriteITCase {
                         tableRoot,
                         RowType.of(),
                         new CoreOptions(new Options()).partitionDefaultName(),
-                        CoreOptions.FILE_FORMAT.defaultValue().toString(),
+                        CoreOptions.FILE_FORMAT.defaultValue(),
                         CoreOptions.DATA_FILE_PREFIX.defaultValue(),
                         CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
                         
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
@@ -154,12 +180,11 @@ public class SparkFileIndexITCase extends 
SparkWriteITCase {
                                 .collect(Collectors.toList());
                 // assert index file exist and only one index file
                 assert indexFiles.size() == 1;
-                try {
-                    FileIndexFormat.Reader reader =
-                            FileIndexFormat.createReader(
-                                    fileIO.newInputStream(
-                                            
dataFilePathFactory.toPath(indexFiles.get(0))),
-                                    tableSchema.logicalRowType());
+                try (FileIndexFormat.Reader reader =
+                        FileIndexFormat.createReader(
+                                fileIO.newInputStream(
+                                        
dataFilePathFactory.toPath(indexFiles.get(0))),
+                                tableSchema.logicalRowType())) {
                     Optional<FileIndexReader> fileIndexReader =
                             reader.readColumnIndex("a").stream().findFirst();
                     // assert index reader exist

Reply via email to