This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6cfc2557a [core] Support bitmap file index (#3775)
6cfc2557a is described below
commit 6cfc2557a421177be7a10a810be5b50f3e5741be
Author: Zhonghang Liu <[email protected]>
AuthorDate: Tue Jul 23 17:40:00 2024 +0800
[core] Support bitmap file index (#3775)
---
docs/content/append-table/query.md | 6 +-
.../paimon/fileindex/bitmap/BitmapFileIndex.java | 302 +++++++++++++++++
.../fileindex/bitmap/BitmapFileIndexFactory.java | 40 +++
.../fileindex/bitmap/BitmapFileIndexMeta.java | 377 +++++++++++++++++++++
.../fileindex/bitmap/BitmapIndexResultLazy.java | 57 ++++
.../org.apache.paimon.fileindex.FileIndexerFactory | 3 +-
.../fileindex/bitmapindex/TestBitmapFileIndex.java | 115 +++++++
.../apache/paimon/spark/SparkFileIndexITCase.java | 167 +++++++++
8 files changed, 1064 insertions(+), 3 deletions(-)
diff --git a/docs/content/append-table/query.md
b/docs/content/append-table/query.md
index fc4cbeb6c..269bf6e76 100644
--- a/docs/content/append-table/query.md
+++ b/docs/content/append-table/query.md
@@ -59,8 +59,7 @@ be stored directly in the manifest, otherwise in the
directory of the data file.
which has a separate file definition and can contain different types of
indexes with multiple columns.
Different file index may be efficient in different scenario. For example bloom
filter may speed up query in point lookup
-scenario. Using a bitmap may consume more space but can result in greater
accuracy. Though we only realize bloom filter
-currently, but other types of index will be supported in the future.
+scenario. Using a bitmap may consume more space but can result in greater
accuracy.
Currently, file index is only supported in append-only table.
@@ -69,6 +68,9 @@ Currently, file index is only supported in append-only table.
* `file-index.bloom-filter.<column_name>.fpp` to config false positive
probability.
* `file-index.bloom-filter.<column_name>.items` to config the expected
distinct items in one data file.
+`Bitmap`:
+* `file-index.bitmap.columns`: specify the columns that need bitmap index.
+
More filter types will be supported...
If you want to add file index to existing table, without any rewrite, you can
use `rewrite_file_index` procedure. Before
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java
new file mode 100644
index 000000000..ef2c2b207
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bitmap;
+
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.FileIndexResult;
+import org.apache.paimon.fileindex.FileIndexWriter;
+import org.apache.paimon.fileindex.FileIndexer;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeDefaultVisitor;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** implementation of bitmap file index. */
+public class BitmapFileIndex implements FileIndexer {
+
+ public static final int VERSION_1 = 1;
+
+ private final DataType dataType;
+
+ /**
+ * Creates a bitmap indexer for a column of the given type.
+ *
+ * @param dataType type of the indexed column; handed to the writer created by this indexer
+ * @param options accepted to match the factory call; not read by this constructor
+ */
+ public BitmapFileIndex(DataType dataType, Options options) {
+ this.dataType = dataType;
+ }
+
+ /** Returns a new, empty {@link Writer} bound to this indexer's column type. */
+ @Override
+ public FileIndexWriter createWriter() {
+ return new Writer(dataType);
+ }
+
+ /**
+ * Creates a reader over an index that begins at {@code start} in the given stream.
+ *
+ * <p>The {@link Reader} constructor only stores the stream and the start position; the
+ * {@code length} argument is passed along but not used by it.
+ *
+ * @throws RuntimeException wrapping any exception raised while constructing the reader
+ */
+ @Override
+ public FileIndexReader createReader(
+ SeekableInputStream seekableInputStream, int start, int length) {
+ try {
+ return new Reader(seekableInputStream, start, length);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+    /**
+     * Writer that records, for every distinct non-null value, the bitmap of row positions
+     * holding that value, plus one bitmap for the rows whose value is null.
+     */
+    private static class Writer extends FileIndexWriter {
+
+        private final DataType dataType;
+        private final Function<Object, Object> valueMapper;
+        private final Map<Object, RoaringBitmap32> id2bitmap = new HashMap<>();
+        private final RoaringBitmap32 nullBitmap = new RoaringBitmap32();
+        private int rowNumber;
+
+        public Writer(DataType dataType) {
+            this.dataType = dataType;
+            this.valueMapper = getValueMapper(dataType);
+        }
+
+        /**
+         * Registers the value of the next row. Rows are numbered in call order starting at 0;
+         * a null key is tracked in the dedicated null bitmap.
+         */
+        @Override
+        public void write(Object key) {
+            if (key == null) {
+                nullBitmap.add(rowNumber++);
+            } else {
+                id2bitmap
+                        .computeIfAbsent(valueMapper.apply(key), k -> new RoaringBitmap32())
+                        .add(rowNumber++);
+            }
+        }
+
+        /**
+         * Serializes the index as: version byte, head ({@link BitmapFileIndexMeta}), body.
+         *
+         * <p>A bitmap containing exactly one row is not written to the body; it is encoded in
+         * the head as the negative offset {@code -1 - rowPosition}. All other bitmaps are
+         * written to the body and referenced by a non-negative offset relative to the body
+         * start. When the null bitmap is written to the body it always comes first (offset 0).
+         *
+         * @throws RuntimeException wrapping any failure while serializing
+         */
+        @Override
+        public byte[] serializedBytes() {
+            try {
+                ByteArrayOutputStream output = new ByteArrayOutputStream();
+                DataOutputStream dos = new DataOutputStream(output);
+
+                dos.writeByte(VERSION_1);
+
+                // 1. decide how the null bitmap is stored
+                boolean nullBitmapInBody = nullBitmap.getCardinality() > 1;
+                byte[] nullBitmapBytes =
+                        nullBitmapInBody ? nullBitmap.serialize() : new byte[0];
+                int nullValueOffset =
+                        nullBitmap.getCardinality() == 1 ? -1 - nullBitmap.iterator().next() : 0;
+
+                // 2. build bitmap file index meta
+                LinkedHashMap<Object, Integer> bitmapOffsets = new LinkedHashMap<>();
+                List<byte[]> serializeBitmaps = new LinkedList<>();
+                // Value bitmaps start right after the null bitmap, but only if the null bitmap
+                // is really part of the body. A single-null bitmap is inlined in the head and
+                // occupies no body bytes, so it must not shift the following offsets.
+                int nextOffset = nullBitmapBytes.length;
+                for (Map.Entry<Object, RoaringBitmap32> entry : id2bitmap.entrySet()) {
+                    RoaringBitmap32 bitmap = entry.getValue();
+                    if (bitmap.getCardinality() == 1) {
+                        bitmapOffsets.put(entry.getKey(), -1 - bitmap.iterator().next());
+                    } else {
+                        byte[] bytes = bitmap.serialize();
+                        serializeBitmaps.add(bytes);
+                        bitmapOffsets.put(entry.getKey(), nextOffset);
+                        nextOffset += bytes.length;
+                    }
+                }
+                BitmapFileIndexMeta bitmapFileIndexMeta =
+                        new BitmapFileIndexMeta(
+                                dataType,
+                                rowNumber,
+                                id2bitmap.size(),
+                                !nullBitmap.isEmpty(),
+                                nullValueOffset,
+                                bitmapOffsets);
+
+                // 3. serialize meta
+                bitmapFileIndexMeta.serialize(dos);
+
+                // 4. serialize body
+                if (nullBitmapInBody) {
+                    dos.write(nullBitmapBytes);
+                }
+                for (byte[] bytes : serializeBitmaps) {
+                    dos.write(bytes);
+                }
+                return output.toByteArray();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Reader that lazily parses the index head on first use and then loads (and caches) only
+     * the bitmaps needed to answer a predicate.
+     */
+    private static class Reader extends FileIndexReader {
+
+        private final SeekableInputStream seekableInputStream;
+        private final int headStart;
+        private int bodyStart;
+        private final Map<Object, RoaringBitmap32> bitmaps = new LinkedHashMap<>();
+
+        private int version;
+        private BitmapFileIndexMeta bitmapFileIndexMeta;
+        private Function<Object, Object> valueMapper;
+
+        /**
+         * @param seekableInputStream stream containing the serialized index
+         * @param start position of the version byte inside the stream
+         * @param length not used by this reader
+         */
+        public Reader(SeekableInputStream seekableInputStream, int start, int length) {
+            this.seekableInputStream = seekableInputStream;
+            this.headStart = start;
+        }
+
+        @Override
+        public FileIndexResult visitEqual(FieldRef fieldRef, Object literal) {
+            return visitIn(fieldRef, Collections.singletonList(literal));
+        }
+
+        @Override
+        public FileIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
+            return visitNotIn(fieldRef, Collections.singletonList(literal));
+        }
+
+        /** Lazily yields the union of the bitmaps of all given literals. */
+        @Override
+        public FileIndexResult visitIn(FieldRef fieldRef, List<Object> literals) {
+            return new BitmapIndexResultLazy(
+                    () -> {
+                        readInternalMeta(fieldRef.type());
+                        return getInListResultBitmap(literals);
+                    });
+        }
+
+        /**
+         * Lazily yields the complement, over the range [0, rowCount), of the union of the
+         * bitmaps of all given literals.
+         */
+        @Override
+        public FileIndexResult visitNotIn(FieldRef fieldRef, List<Object> literals) {
+            return new BitmapIndexResultLazy(
+                    () -> {
+                        readInternalMeta(fieldRef.type());
+                        RoaringBitmap32 bitmap = getInListResultBitmap(literals);
+                        bitmap.flip(0, bitmapFileIndexMeta.getRowCount());
+                        return bitmap;
+                    });
+        }
+
+        @Override
+        public FileIndexResult visitIsNull(FieldRef fieldRef) {
+            return visitIn(fieldRef, Collections.singletonList(null));
+        }
+
+        @Override
+        public FileIndexResult visitIsNotNull(FieldRef fieldRef) {
+            return visitNotIn(fieldRef, Collections.singletonList(null));
+        }
+
+        /** ORs the bitmaps of the literals together, loading each bitmap at most once. */
+        private RoaringBitmap32 getInListResultBitmap(List<Object> literals) {
+            return RoaringBitmap32.or(
+                    literals.stream()
+                            .map(
+                                    it ->
+                                            bitmaps.computeIfAbsent(
+                                                    valueMapper.apply(it), k -> readBitmap(k)))
+                            .iterator());
+        }
+
+        /**
+         * Loads the bitmap stored for one bitmap identifier ({@code null} selects the null
+         * bitmap). Unknown identifiers produce an empty bitmap; a negative offset encodes a
+         * single row position as {@code -1 - offset}.
+         */
+        private RoaringBitmap32 readBitmap(Object bitmapId) {
+            try {
+                if (!bitmapFileIndexMeta.contains(bitmapId)) {
+                    return new RoaringBitmap32();
+                }
+                int offset = bitmapFileIndexMeta.getOffset(bitmapId);
+                if (offset < 0) {
+                    return RoaringBitmap32.bitmapOf(-1 - offset);
+                }
+                seekableInputStream.seek(bodyStart + offset);
+                RoaringBitmap32 bitmap = new RoaringBitmap32();
+                bitmap.deserialize(new DataInputStream(seekableInputStream));
+                return bitmap;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
+         * Parses the version byte and the head on first call; later calls are no-ops.
+         *
+         * <p>The parsed meta is published to the fields only after the whole head was read
+         * successfully, so a failed attempt does not leave a half-initialized meta behind that
+         * later calls would mistake for a complete one.
+         *
+         * @throws RuntimeException if the stored version is newer than {@link #VERSION_1} or
+         *     reading the head fails
+         */
+        private void readInternalMeta(DataType dataType) {
+            if (this.bitmapFileIndexMeta != null) {
+                return;
+            }
+            this.valueMapper = getValueMapper(dataType);
+            try {
+                seekableInputStream.seek(headStart);
+                int storedVersion = seekableInputStream.read();
+                if (storedVersion > VERSION_1) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "read index file fail, "
+                                            + "your plugin version is lower than %d",
+                                    storedVersion));
+                }
+                DataInput input = new DataInputStream(seekableInputStream);
+                BitmapFileIndexMeta meta = new BitmapFileIndexMeta(dataType);
+                meta.deserialize(input);
+
+                this.version = storedVersion;
+                // the body starts right after the last byte of the head
+                this.bodyStart = (int) seekableInputStream.getPos();
+                this.bitmapFileIndexMeta = meta;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+ // Currently, it is mainly used to convert timestamps to long
+ public static Function<Object, Object> getValueMapper(DataType dataType) {
+ return dataType.accept(
+ new DataTypeDefaultVisitor<Function<Object, Object>>() {
+ @Override
+ public Function<Object, Object> visit(TimestampType
timestampType) {
+ return
getTimeStampMapper(timestampType.getPrecision());
+ }
+
+ @Override
+ public Function<Object, Object> visit(
+ LocalZonedTimestampType localZonedTimestampType) {
+ return
getTimeStampMapper(localZonedTimestampType.getPrecision());
+ }
+
+ @Override
+ protected Function<Object, Object> defaultMethod(DataType
dataType) {
+ return Function.identity();
+ }
+
+ private Function<Object, Object> getTimeStampMapper(int
precision) {
+ return o -> {
+ if (o == null) {
+ return null;
+ } else if (precision <= 3) {
+ return ((Timestamp) o).getMillisecond();
+ } else {
+ return ((Timestamp) o).toMicros();
+ }
+ };
+ }
+ });
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexFactory.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexFactory.java
new file mode 100644
index 000000000..5b2ea144a
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bitmap;
+
+import org.apache.paimon.fileindex.FileIndexer;
+import org.apache.paimon.fileindex.FileIndexerFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
+
+/** Factory to create {@link BitmapFileIndex}. */
+public class BitmapFileIndexFactory implements FileIndexerFactory {
+
+ // Identifier returned by identifier(); the docs change in this commit uses the same word in
+ // the option key file-index.bitmap.columns.
+ public static final String BITMAP_INDEX = "bitmap";
+
+ /** Returns the identifier of this index type, {@link #BITMAP_INDEX}. */
+ @Override
+ public String identifier() {
+ return BITMAP_INDEX;
+ }
+
+ /** Creates a {@link BitmapFileIndex} for a column of the given type. */
+ @Override
+ public FileIndexer create(DataType dataType, Options options) {
+ return new BitmapFileIndex(dataType, options);
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMeta.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMeta.java
new file mode 100644
index 000000000..595e12191
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMeta.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bitmap;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Head section of a bitmap file index: row count, null-bitmap information and, for every distinct
+ * non-null value, the location of its bitmap.
+ *
+ * <pre>
+ * Bitmap file index format (V1)
+ * +-------------------------------------------------+-----------------
+ * | version (1 byte)                                |
+ * +-------------------------------------------------+
+ * | row count (4 bytes int)                         |
+ * +-------------------------------------------------+
+ * | non-null value bitmap number (4 bytes int)      |
+ * +-------------------------------------------------+
+ * | has null value (1 byte)                         |
+ * +-------------------------------------------------+
+ * | null value offset (4 bytes if has null value)   |       HEAD
+ * +-------------------------------------------------+
+ * | value 1 | offset 1                              |
+ * +-------------------------------------------------+
+ * | value 2 | offset 2                              |
+ * +-------------------------------------------------+
+ * | value 3 | offset 3                              |
+ * +-------------------------------------------------+
+ * | ...                                             |
+ * +-------------------------------------------------+-----------------
+ * | serialized bitmap 1                             |
+ * +-------------------------------------------------+
+ * | serialized bitmap 2                             |
+ * +-------------------------------------------------+       BODY
+ * | serialized bitmap 3                             |
+ * +-------------------------------------------------+
+ * | ...                                             |
+ * +-------------------------------------------------+-----------------
+ *
+ * value x:  var bytes for any data type (as bitmap identifier)
+ * offset:   4 bytes int (when it is negative, it represents that there is only one value
+ *           and its position is the inverse of the negative value, i.e. -1 - offset)
+ * </pre>
+ *
+ * <p>The version byte is not handled by this class; {@link #serialize} and {@link #deserialize}
+ * start at the row count.
+ */
+public class BitmapFileIndexMeta {
+
+    private final DataType dataType;
+
+    private int rowCount;
+    private int nonNullBitmapNumber;
+    private boolean hasNullValue;
+    private int nullValueOffset;
+    private LinkedHashMap<Object, Integer> bitmapOffsets;
+
+    /** Creates an empty meta to be filled by {@link #deserialize(DataInput)}. */
+    public BitmapFileIndexMeta(DataType dataType) {
+        this.dataType = dataType;
+    }
+
+    /** Creates a fully populated meta ready for {@link #serialize(DataOutput)}. */
+    public BitmapFileIndexMeta(
+            DataType dataType,
+            int rowCount,
+            int nonNullBitmapNumber,
+            boolean hasNullValue,
+            int nullValueOffset,
+            LinkedHashMap<Object, Integer> bitmapOffsets) {
+        this(dataType);
+        this.rowCount = rowCount;
+        this.nonNullBitmapNumber = nonNullBitmapNumber;
+        this.hasNullValue = hasNullValue;
+        this.nullValueOffset = nullValueOffset;
+        this.bitmapOffsets = bitmapOffsets;
+    }
+
+    public int getRowCount() {
+        return rowCount;
+    }
+
+    /** Returns whether a bitmap exists for the identifier; {@code null} asks for the null bitmap. */
+    public boolean contains(Object bitmapId) {
+        if (bitmapId == null) {
+            return hasNullValue;
+        }
+        return bitmapOffsets.containsKey(bitmapId);
+    }
+
+    /**
+     * Returns the stored offset of the identifier's bitmap; {@code null} asks for the null bitmap.
+     * Callers are expected to check {@link #contains(Object)} first: an unknown non-null
+     * identifier has no offset to return.
+     */
+    public int getOffset(Object bitmapId) {
+        if (bitmapId == null) {
+            return nullValueOffset;
+        }
+        return bitmapOffsets.get(bitmapId);
+    }
+
+    /**
+     * Writes the head (everything after the version byte) to {@code out}.
+     *
+     * @throws Exception if writing fails or the column type is not supported
+     */
+    public void serialize(DataOutput out) throws Exception {
+
+        ThrowableConsumer valueWriter =
+                dataType.accept(
+                        new DataTypeVisitorAdapter<ThrowableConsumer>() {
+                            @Override
+                            public ThrowableConsumer visitBinaryString() {
+                                return o -> {
+                                    byte[] bytes = ((BinaryString) o).toBytes();
+                                    out.writeInt(bytes.length);
+                                    out.write(bytes);
+                                };
+                            }
+
+                            @Override
+                            public ThrowableConsumer visitBoolean() {
+                                // Boolean values cannot be cast to byte; write them as a
+                                // one-byte boolean instead.
+                                return o -> out.writeBoolean((boolean) o);
+                            }
+
+                            @Override
+                            public ThrowableConsumer visitByte() {
+                                return o -> out.writeByte((byte) o);
+                            }
+
+                            @Override
+                            public ThrowableConsumer visitShort() {
+                                return o -> out.writeShort((short) o);
+                            }
+
+                            @Override
+                            public ThrowableConsumer visitInt() {
+                                return o -> out.writeInt((int) o);
+                            }
+
+                            @Override
+                            public ThrowableConsumer visitLong() {
+                                return o -> out.writeLong((long) o);
+                            }
+
+                            @Override
+                            public ThrowableConsumer visitFloat() {
+                                return o -> out.writeFloat((float) o);
+                            }
+
+                            @Override
+                            public ThrowableConsumer visitDouble() {
+                                return o -> out.writeDouble((double) o);
+                            }
+                        });
+
+        out.writeInt(rowCount);
+        out.writeInt(nonNullBitmapNumber);
+        out.writeBoolean(hasNullValue);
+        if (hasNullValue) {
+            out.writeInt(nullValueOffset);
+        }
+        for (Map.Entry<Object, Integer> entry : bitmapOffsets.entrySet()) {
+            valueWriter.accept(entry.getKey());
+            out.writeInt(entry.getValue());
+        }
+    }
+
+    /**
+     * Reads the head (everything after the version byte) from {@code in}, replacing the current
+     * content of this meta.
+     *
+     * @throws Exception if reading fails or the column type is not supported
+     */
+    public void deserialize(DataInput in) throws Exception {
+
+        ThrowableSupplier valueReader =
+                dataType.accept(
+                        new DataTypeVisitorAdapter<ThrowableSupplier>() {
+                            @Override
+                            public ThrowableSupplier visitBinaryString() {
+                                return () -> {
+                                    int length = in.readInt();
+                                    byte[] bytes = new byte[length];
+                                    in.readFully(bytes);
+                                    return BinaryString.fromBytes(bytes);
+                                };
+                            }
+
+                            @Override
+                            public ThrowableSupplier visitBoolean() {
+                                // Must yield Boolean keys so that lookups with Boolean
+                                // literals find their bitmap.
+                                return in::readBoolean;
+                            }
+
+                            @Override
+                            public ThrowableSupplier visitByte() {
+                                return in::readByte;
+                            }
+
+                            @Override
+                            public ThrowableSupplier visitShort() {
+                                return in::readShort;
+                            }
+
+                            @Override
+                            public ThrowableSupplier visitInt() {
+                                return in::readInt;
+                            }
+
+                            @Override
+                            public ThrowableSupplier visitLong() {
+                                return in::readLong;
+                            }
+
+                            @Override
+                            public ThrowableSupplier visitFloat() {
+                                return in::readFloat;
+                            }
+
+                            @Override
+                            public ThrowableSupplier visitDouble() {
+                                return in::readDouble;
+                            }
+                        });
+
+        rowCount = in.readInt();
+        nonNullBitmapNumber = in.readInt();
+        hasNullValue = in.readBoolean();
+        if (hasNullValue) {
+            nullValueOffset = in.readInt();
+        }
+        bitmapOffsets = new LinkedHashMap<>();
+        for (int i = 0; i < nonNullBitmapNumber; i++) {
+            bitmapOffsets.put(valueReader.get(), in.readInt());
+        }
+    }
+
+    /** functional interface. */
+    @FunctionalInterface
+    public interface ThrowableConsumer {
+        void accept(Object o) throws Exception;
+    }
+
+    /** functional interface. */
+    @FunctionalInterface
+    public interface ThrowableSupplier {
+        Object get() throws Exception;
+    }
+
+    /**
+     * simplified visitor: groups the data types by the Java representation of their values so
+     * that implementations only provide one handler per representation.
+     */
+    public abstract static class DataTypeVisitorAdapter<R> implements DataTypeVisitor<R> {
+
+        public abstract R visitBinaryString();
+
+        public abstract R visitByte();
+
+        public abstract R visitShort();
+
+        public abstract R visitInt();
+
+        public abstract R visitLong();
+
+        public abstract R visitFloat();
+
+        public abstract R visitDouble();
+
+        /**
+         * Handler for boolean columns. Falls back to {@link #visitByte()} so that subclasses
+         * written before this hook existed keep compiling and behaving as before; override it
+         * when boolean values need their own treatment.
+         */
+        public R visitBoolean() {
+            return visitByte();
+        }
+
+        @Override
+        public final R visit(CharType charType) {
+            return visitBinaryString();
+        }
+
+        @Override
+        public final R visit(VarCharType varCharType) {
+            return visitBinaryString();
+        }
+
+        @Override
+        public final R visit(BooleanType booleanType) {
+            return visitBoolean();
+        }
+
+        @Override
+        public final R visit(BinaryType binaryType) {
+            throw new UnsupportedOperationException("Does not support type binary");
+        }
+
+        @Override
+        public final R visit(VarBinaryType varBinaryType) {
+            throw new UnsupportedOperationException("Does not support type binary");
+        }
+
+        @Override
+        public final R visit(DecimalType decimalType) {
+            throw new UnsupportedOperationException("Does not support decimal");
+        }
+
+        @Override
+        public final R visit(TinyIntType tinyIntType) {
+            return visitByte();
+        }
+
+        @Override
+        public final R visit(SmallIntType smallIntType) {
+            return visitShort();
+        }
+
+        @Override
+        public final R visit(IntType intType) {
+            return visitInt();
+        }
+
+        @Override
+        public final R visit(BigIntType bigIntType) {
+            return visitLong();
+        }
+
+        @Override
+        public final R visit(FloatType floatType) {
+            return visitFloat();
+        }
+
+        @Override
+        public final R visit(DoubleType doubleType) {
+            return visitDouble();
+        }
+
+        @Override
+        public final R visit(DateType dateType) {
+            return visitInt();
+        }
+
+        @Override
+        public final R visit(TimeType timeType) {
+            return visitInt();
+        }
+
+        @Override
+        public final R visit(ArrayType arrayType) {
+            throw new UnsupportedOperationException("Does not support type array");
+        }
+
+        @Override
+        public final R visit(MultisetType multisetType) {
+            throw new UnsupportedOperationException("Does not support type multiset");
+        }
+
+        @Override
+        public final R visit(TimestampType timestampType) {
+            return visitLong();
+        }
+
+        @Override
+        public final R visit(LocalZonedTimestampType localZonedTimestampType) {
+            return visitLong();
+        }
+
+        @Override
+        public final R visit(MapType mapType) {
+            throw new UnsupportedOperationException("Does not support type map");
+        }
+
+        @Override
+        public final R visit(RowType rowType) {
+            throw new UnsupportedOperationException("Does not support type row");
+        }
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResultLazy.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResultLazy.java
new file mode 100644
index 000000000..15210e856
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResultLazy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bitmap;
+
+import org.apache.paimon.fileindex.FileIndexResult;
+import org.apache.paimon.utils.LazyField;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import java.util.function.Supplier;
+
+/** bitmap file index result. */
+public class BitmapIndexResultLazy extends LazyField<RoaringBitmap32>
implements FileIndexResult {
+
+ public BitmapIndexResultLazy(Supplier<RoaringBitmap32> supplier) {
+ super(supplier);
+ }
+
+ public boolean remain() {
+ return !get().isEmpty();
+ }
+
+ public FileIndexResult and(FileIndexResult fileIndexResult) {
+ if (fileIndexResult instanceof BitmapIndexResultLazy) {
+ return new BitmapIndexResultLazy(
+ () ->
+ RoaringBitmap32.and(
+ get(), ((BitmapIndexResultLazy)
fileIndexResult).get()));
+ }
+ return FileIndexResult.super.and(fileIndexResult);
+ }
+
+ public FileIndexResult or(FileIndexResult fileIndexResult) {
+ if (fileIndexResult instanceof BitmapIndexResultLazy) {
+ return new BitmapIndexResultLazy(
+ () ->
+ RoaringBitmap32.or(
+ get(), ((BitmapIndexResultLazy)
fileIndexResult).get()));
+ }
+ return FileIndexResult.super.and(fileIndexResult);
+ }
+}
diff --git
a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.fileindex.FileIndexerFactory
b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.fileindex.FileIndexerFactory
index 67abe62a6..8a899eb23 100644
---
a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.fileindex.FileIndexerFactory
+++
b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.fileindex.FileIndexerFactory
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndexFactory
\ No newline at end of file
+org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndexFactory
+org.apache.paimon.fileindex.bitmap.BitmapFileIndexFactory
\ No newline at end of file
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fileindex/bitmapindex/TestBitmapFileIndex.java
b/paimon-common/src/test/java/org/apache/paimon/fileindex/bitmapindex/TestBitmapFileIndex.java
new file mode 100644
index 000000000..923cc5c7b
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/fileindex/bitmapindex/TestBitmapFileIndex.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bitmapindex;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.FileIndexWriter;
+import org.apache.paimon.fileindex.bitmap.BitmapFileIndex;
+import org.apache.paimon.fileindex.bitmap.BitmapIndexResultLazy;
+import org.apache.paimon.fs.ByteArraySeekableStream;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+/** test for {@link BitmapFileIndex}. */
+public class TestBitmapFileIndex {
+
+ // NOTE: these tests use the Java assert keyword, so they only check anything when the JVM
+ // runs with assertions enabled (-ea).
+
+ /** Flipping the range [0, 6) of {1, 3, 5} must leave exactly {0, 2, 4}. */
+ @Test
+ public void testFlip() {
+ RoaringBitmap32 bitmap = RoaringBitmap32.bitmapOf(1, 3, 5);
+ bitmap.flip(0, 6);
+ assert bitmap.equals(RoaringBitmap32.bitmapOf(0, 2, 4));
+ }
+
+ /**
+ * Round trip on a string column: "a" is in rows 0 and 4, "b" in row 2, null in rows 1 and 3.
+ * Also checks that two lazy results can be combined with and/or.
+ */
+ @Test
+ public void testBitmapIndex1() {
+ VarCharType dataType = new VarCharType();
+ FieldRef fieldRef = new FieldRef(0, "", dataType);
+ BitmapFileIndex bitmapFileIndex = new BitmapFileIndex(dataType, null);
+ FileIndexWriter writer = bitmapFileIndex.createWriter();
+ Object[] arr = {
+ BinaryString.fromString("a"),
+ null,
+ BinaryString.fromString("b"),
+ null,
+ BinaryString.fromString("a"),
+ };
+ for (Object o : arr) {
+ writer.write(o);
+ }
+ byte[] bytes = writer.serializedBytes();
+ ByteArraySeekableStream seekableStream = new
ByteArraySeekableStream(bytes);
+ FileIndexReader reader = bitmapFileIndex.createReader(seekableStream,
0, bytes.length);
+
+ BitmapIndexResultLazy result1 =
+ (BitmapIndexResultLazy) reader.visitEqual(fieldRef,
BinaryString.fromString("a"));
+ assert result1.get().equals(RoaringBitmap32.bitmapOf(0, 4));
+
+ BitmapIndexResultLazy result2 =
+ (BitmapIndexResultLazy) reader.visitEqual(fieldRef,
BinaryString.fromString("b"));
+ assert result2.get().equals(RoaringBitmap32.bitmapOf(2));
+
+ BitmapIndexResultLazy result3 = (BitmapIndexResultLazy)
reader.visitIsNull(fieldRef);
+ assert result3.get().equals(RoaringBitmap32.bitmapOf(1, 3));
+
+ // "a" and "b" never share a row, so the intersection is empty
+ BitmapIndexResultLazy result4 = (BitmapIndexResultLazy)
result1.and(result2);
+ assert result4.get().equals(RoaringBitmap32.bitmapOf());
+
+ BitmapIndexResultLazy result5 = (BitmapIndexResultLazy)
result1.or(result2);
+ assert result5.get().equals(RoaringBitmap32.bitmapOf(0, 2, 4));
+ }
+
+ /**
+ * Round trip on an int column holding 0, 1 and null, one row each. Every bitmap therefore
+ * contains a single row position.
+ */
+ @Test
+ public void testBitmapIndex2() {
+ IntType dataType = new IntType();
+ FieldRef fieldRef = new FieldRef(0, "", dataType);
+ BitmapFileIndex bitmapFileIndex = new BitmapFileIndex(dataType, null);
+ FileIndexWriter writer = bitmapFileIndex.createWriter();
+ Object[] arr = {0, 1, null};
+ for (Object o : arr) {
+ writer.write(o);
+ }
+ byte[] bytes = writer.serializedBytes();
+ ByteArraySeekableStream seekableStream = new
ByteArraySeekableStream(bytes);
+ FileIndexReader reader = bitmapFileIndex.createReader(seekableStream,
0, bytes.length);
+
+ BitmapIndexResultLazy result1 = (BitmapIndexResultLazy)
reader.visitEqual(fieldRef, 1);
+ assert result1.get().equals(RoaringBitmap32.bitmapOf(1));
+
+ BitmapIndexResultLazy result2 = (BitmapIndexResultLazy)
reader.visitIsNull(fieldRef);
+ assert result2.get().equals(RoaringBitmap32.bitmapOf(2));
+
+ BitmapIndexResultLazy result3 = (BitmapIndexResultLazy)
reader.visitIsNotNull(fieldRef);
+ assert result3.get().equals(RoaringBitmap32.bitmapOf(0, 1));
+
+ // 2 is not a stored value; the expected result keeps row 0 and the null row 2
+ BitmapIndexResultLazy result4 =
+ (BitmapIndexResultLazy) reader.visitNotIn(fieldRef,
Arrays.asList(1, 2));
+ assert result4.get().equals(RoaringBitmap32.bitmapOf(0, 2));
+
+ // both stored values are excluded, leaving only the null row
+ BitmapIndexResultLazy result5 =
+ (BitmapIndexResultLazy) reader.visitNotIn(fieldRef,
Arrays.asList(1, 0));
+ assert result5.get().equals(RoaringBitmap32.bitmapOf(2));
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
new file mode 100644
index 000000000..55ce7c9aa
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fileindex.FileIndexFormat;
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.FileIndexResult;
+import org.apache.paimon.fileindex.bitmap.BitmapIndexResultLazy;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * ITCase for using file index in Spark.
+ *
+ * <p>A table with a bitmap file index is created and written through Spark SQL; the resulting
+ * index files are then opened directly through a {@link FileSystemCatalog} pointing at the same
+ * warehouse so the index content can be verified independently of the query path.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class SparkFileIndexITCase extends SparkWriteITCase {
+
+    protected FileIO fileIO = LocalFileIO.create();
+    protected FileSystemCatalog fileSystemCatalog;
+
+    /**
+     * Starts a local Spark session with the Paimon catalog registered under {@code paimon},
+     * creates and selects database {@code db}, and opens a {@link FileSystemCatalog} on the
+     * same warehouse directory.
+     *
+     * @param tempDir JUnit-managed temporary directory used as the warehouse root
+     */
+    @BeforeAll
+    public void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) {
+        Path warehousePath = new Path("file:///" + tempDir.toString());
+        spark =
+                SparkSession.builder()
+                        .master("local[1]")
+                        .config(
+                                "spark.sql.extensions",
+                                PaimonSparkSessionExtensions.class.getName())
+                        .getOrCreate();
+        spark.conf().set("spark.sql.catalog.paimon", SparkCatalog.class.getName());
+        spark.conf().set("spark.sql.catalog.paimon.warehouse", warehousePath.toString());
+        spark.conf().set("spark.sql.shuffle.partitions", 1);
+        spark.conf().set("spark.default.parallelism", 1);
+        spark.sql("CREATE DATABASE paimon.db");
+        spark.sql("USE paimon.db");
+
+        Options options = new Options();
+        options.set(WAREHOUSE, spark.conf().get("spark.sql.catalog.paimon.warehouse"));
+        fileSystemCatalog =
+                (FileSystemCatalog) CatalogFactory.createCatalog(CatalogContext.create(options));
+    }
+
+    /**
+     * Writes six rows into a table with a bitmap index on column {@code a}, checks that a
+     * filtered query returns the matching row, and checks that the index itself maps the value
+     * {@code 3} to row position 3.
+     */
+    @Test
+    public void testReadWriteTableWithBitmapIndex() throws Catalog.TableNotExistException {
+
+        // NOTE(review): the 1B in-manifest threshold presumably forces the index into its own
+        // file next to the data file; foreachIndexReader relies on such a file existing.
+        spark.sql(
+                "CREATE TABLE T(a int) TBLPROPERTIES ("
+                        + "'file-index.bitmap.columns'='a',"
+                        + "'file-index.in-manifest-threshold'='1B');");
+        spark.sql("INSERT INTO T VALUES (0),(1),(2),(3),(4),(5);");
+
+        // check query result
+        List<Row> rows = spark.sql("SELECT a FROM T where a='3';").collectAsList();
+        assertThat(rows.toString()).isEqualTo("[[3]]");
+
+        // check index reader
+        foreachIndexReader(
+                fileIndexReader -> {
+                    FileIndexResult fileIndexResult =
+                            fileIndexReader.visitEqual(new FieldRef(0, "", new IntType()), 3);
+                    assertThat(fileIndexResult).isInstanceOf(BitmapIndexResultLazy.class);
+                    RoaringBitmap32 roaringBitmap32 =
+                            ((BitmapIndexResultLazy) fileIndexResult).get();
+                    assertThat(roaringBitmap32).isEqualTo(RoaringBitmap32.bitmapOf(3));
+                });
+    }
+
+    /**
+     * Opens the index file of every data file of table {@code db.T} and hands the first index
+     * reader of column {@code a} to {@code consumer}.
+     *
+     * <p>Each data file must carry exactly one index file and that file must contain an index
+     * for column {@code a}; otherwise the corresponding assertion fails. The underlying index
+     * file reader is closed once the consumer returns, so the consumer must finish evaluating
+     * any lazy results before returning.
+     *
+     * @param consumer callback invoked once per data file with the column index reader
+     * @throws Catalog.TableNotExistException if table {@code db.T} does not exist
+     */
+    protected void foreachIndexReader(Consumer<FileIndexReader> consumer)
+            throws Catalog.TableNotExistException {
+        Identifier identifier = Identifier.create("db", "T");
+        Path tableRoot = fileSystemCatalog.getDataTableLocation(identifier);
+        SchemaManager schemaManager = new SchemaManager(fileIO, tableRoot);
+        FileStorePathFactory pathFactory =
+                new FileStorePathFactory(
+                        tableRoot,
+                        RowType.of(),
+                        new CoreOptions(new Options()).partitionDefaultName(),
+                        CoreOptions.FILE_FORMAT.defaultValue().toString());
+
+        Table table = fileSystemCatalog.getTable(identifier);
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        for (Split split : splits) {
+            DataSplit dataSplit = (DataSplit) split;
+            DataFilePathFactory dataFilePathFactory =
+                    pathFactory.createDataFilePathFactory(
+                            dataSplit.partition(), dataSplit.bucket());
+            for (DataFileMeta dataFileMeta : dataSplit.dataFiles()) {
+                TableSchema tableSchema = schemaManager.schema(dataFileMeta.schemaId());
+                List<String> indexFiles =
+                        dataFileMeta.extraFiles().stream()
+                                .filter(
+                                        name ->
+                                                name.endsWith(
+                                                        DataFilePathFactory.INDEX_PATH_SUFFIX))
+                                .collect(Collectors.toList());
+                // the index file must exist and there must be only one
+                assertThat(indexFiles).hasSize(1);
+                String indexFile = indexFiles.get(0);
+                // try-with-resources closes the reader (and thereby the input stream it
+                // wraps) even when an assertion inside the consumer fails
+                try (FileIndexFormat.Reader reader =
+                        FileIndexFormat.createReader(
+                                fileIO.newInputStream(dataFilePathFactory.toPath(indexFile)),
+                                tableSchema.logicalRowType())) {
+                    Optional<FileIndexReader> fileIndexReader =
+                            reader.readColumnIndex("a").stream().findFirst();
+                    // the index reader for column "a" must exist
+                    assertThat(fileIndexReader).isPresent();
+                    consumer.accept(fileIndexReader.get());
+                } catch (Exception e) {
+                    throw new RuntimeException("Failed to read file index " + indexFile, e);
+                }
+            }
+        }
+    }
+}