This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 497bcf2  feat: Migrate file index interfaces, format, reader, result, 
and factory (#45)
497bcf2 is described below

commit 497bcf2b252f5b5b8dc1be5af08c3bbc3779ab74
Author: lxy <[email protected]>
AuthorDate: Thu Jun 4 16:04:58 2026 +0800

    feat: Migrate file index interfaces, format, reader, result, and factory 
(#45)
---
 include/paimon/file_index/bitmap_index_result.h    |  63 ++
 include/paimon/file_index/file_index_format.h      | 122 +++
 include/paimon/file_index/file_index_reader.h      |  69 ++
 include/paimon/file_index/file_index_result.h      |  79 ++
 include/paimon/file_index/file_index_writer.h      |  56 ++
 include/paimon/file_index/file_indexer.h           |  62 ++
 include/paimon/file_index/file_indexer_factory.h   |  46 ++
 .../file_index/empty/empty_file_index_reader.h     |  72 ++
 .../empty/empty_file_index_reader_test.cpp         |  47 ++
 src/paimon/common/file_index/file_index_format.cpp | 156 ++++
 .../common/file_index/file_index_format_test.cpp   | 822 +++++++++++++++++++++
 src/paimon/common/file_index/file_index_reader.cpp | 103 +++
 .../common/file_index/file_index_reader_test.cpp   |  67 ++
 src/paimon/common/file_index/file_index_result.cpp |  90 +++
 .../common/file_index/file_index_result_test.cpp   |  43 ++
 .../common/file_index/file_indexer_factory.cpp     |  46 ++
 .../file_index/file_indexer_factory_test.cpp       |  49 ++
 17 files changed, 1992 insertions(+)

diff --git a/include/paimon/file_index/bitmap_index_result.h 
b/include/paimon/file_index/bitmap_index_result.h
new file mode 100644
index 0000000..2741ffd
--- /dev/null
+++ b/include/paimon/file_index/bitmap_index_result.h
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "paimon/file_index/file_index_result.h"
+#include "paimon/result.h"
+#include "paimon/utils/roaring_bitmap32.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+/// The implementation of bitmap file index result, represents row granularity.
+/// @note The inner bitmap in BitmapIndexResult is lazily initialized only when
+/// the result is about to be used.
+class PAIMON_EXPORT BitmapIndexResult : public FileIndexResult {
+ public:
+    using BitmapSupplier = std::function<Result<RoaringBitmap32>()>;
+
+    explicit BitmapIndexResult(BitmapSupplier bitmap_supplier);
+    ~BitmapIndexResult() override;
+
+    /// @return Whether the file is remained.
+    Result<bool> IsRemain() const override;
+
+    /// Compute the intersection of the current result with the provided 
result.
+    Result<std::shared_ptr<FileIndexResult>> And(
+        const std::shared_ptr<FileIndexResult>& other) override;
+
+    /// Compute the union of the current result with the provided result.
+    Result<std::shared_ptr<FileIndexResult>> Or(
+        const std::shared_ptr<FileIndexResult>& other) override;
+
+    /// @return Inner `RoaringBitmap32`.
+    Result<const RoaringBitmap32*> GetBitmap() const;
+
+    std::string ToString() const override;
+
+ private:
+    mutable bool initialized_ = false;
+    BitmapSupplier bitmap_supplier_;
+    mutable RoaringBitmap32 bitmap_;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/file_index/file_index_format.h 
b/include/paimon/file_index/file_index_format.h
new file mode 100644
index 0000000..b46dee8
--- /dev/null
+++ b/include/paimon/file_index/file_index_format.h
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/file_index/file_index_reader.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+class InputStream;
+class MemoryPool;
+
+/// Defines the on-disk format and versioning for Paimon file-level indexes.
+/// File index file format. Put all column and offset in the header.
+///
+/// <pre>
+///  _______________________________________    _____________________
+/// |     magic    | version | head length |
+/// |--------------------------------------|
+/// |            column number             |
+/// |--------------------------------------|
+/// |   column 1        |  index number    |
+/// |--------------------------------------|
+/// |  index name 1 | start pos | length   |
+/// |--------------------------------------|
+/// |  index name 2 | start pos | length   |
+/// |--------------------------------------|
+/// |  index name 3 | start pos | length   |
+/// |--------------------------------------|            HEAD
+/// |   column 2        |  index number    |
+/// |--------------------------------------|
+/// |  index name 1 | start pos | length   |
+/// |--------------------------------------|
+/// |  index name 2 | start pos | length   |
+/// |--------------------------------------|
+/// |  index name 3 | start pos | length   |
+/// |--------------------------------------|
+/// |                 ...                  |
+/// |--------------------------------------|
+/// |                 ...                  |
+/// |--------------------------------------|
+/// |  redundant length | redundant bytes  |
+/// |--------------------------------------|    ---------------------
+/// |                BODY                  |
+/// |                BODY                  |
+/// |                BODY                  |             BODY
+/// |                BODY                  |
+/// |______________________________________|    _____________________
+///
+/// magic:               8 bytes long
+/// version:             4 bytes int
+/// head length:         4 bytes int
+/// column number:       4 bytes int
+/// column x:            var bytes utf (length + bytes)
+/// index number:        4 bytes int (how many column items below)
+/// index name x:        var bytes utf
+/// start pos:           4 bytes int
+/// length:              4 bytes int
+/// redundant length:    4 bytes int (for compatibility with later versions, 
in this version,
+///                      content is zero)
+/// redundant bytes:     var bytes (for compatibility with later version, in 
this version, is empty)
+/// BODY:                column index bytes + column index bytes + column 
index bytes + .......
+/// </pre>
+///
+class PAIMON_EXPORT FileIndexFormat {
+ public:
+    class Reader;
+    /// Creates a `Reader` to parse a index file (may contain multiple 
indexes) from the given input
+    /// stream.
+    ///
+    /// @param input_stream Input stream containing serialized index data.
+    /// @param pool Memory pool for temporary allocations during reading.
+    /// @return A unique pointer to a `Reader` on success, or an error if the 
stream is invalid
+    ///         (e.g., wrong magic, unsupported version, or corrupted data).
+    static Result<std::unique_ptr<Reader>> CreateReader(
+        const std::shared_ptr<InputStream>& input_stream, const 
std::shared_ptr<MemoryPool>& pool);
+
+ public:
+    static const int64_t MAGIC;
+    static const int32_t EMPTY_INDEX_FLAG;
+    static const int32_t V_1;
+};
+
+/// Reader for file index file.
+class FileIndexFormat::Reader {
+ public:
+    virtual ~Reader() = default;
+    /// Reads index data for a specific column from the index file.
+    ///
+    /// @param column_name Name of the column to retrieve index data for.
+    /// @param arrow_schema Arrow schema that must contain a field 
corresponding to `column_name`.
+    /// @return A vector of shared pointers to FileIndexReader objects, each 
corresponding to a
+    ///         different index type; or an error if the column is not indexed 
or the index is
+    ///         malformed.
+    virtual Result<std::vector<std::shared_ptr<FileIndexReader>>> 
ReadColumnIndex(
+        const std::string& column_name, ::ArrowSchema* arrow_schema) const = 0;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/file_index/file_index_reader.h 
b/include/paimon/file_index/file_index_reader.h
new file mode 100644
index 0000000..ff1d31d
--- /dev/null
+++ b/include/paimon/file_index/file_index_reader.h
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "paimon/file_index/file_index_result.h"
+#include "paimon/predicate/function_visitor.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+/// Evaluates filter predicates against a file-level index to determine file 
eligibility.
+///
+/// `FileIndexReader` implements the `FunctionVisitor` interface specialized 
to produce
+/// `std::shared_ptr<FileIndexResult>` objects. It reads pre-built file-level 
index data
+/// (e.g., bitmap, bsi or bloom filters) from index file and evaluates
+/// whether a given data file may contain rows matching a specific predicate.
+class PAIMON_EXPORT FileIndexReader : public 
FunctionVisitor<std::shared_ptr<FileIndexResult>> {
+ public:
+    Result<std::shared_ptr<FileIndexResult>> VisitIsNotNull() override;
+
+    Result<std::shared_ptr<FileIndexResult>> VisitIsNull() override;
+
+    Result<std::shared_ptr<FileIndexResult>> VisitEqual(const Literal& 
literal) override;
+
+    Result<std::shared_ptr<FileIndexResult>> VisitNotEqual(const Literal& 
literal) override;
+
+    Result<std::shared_ptr<FileIndexResult>> VisitLessThan(const Literal& 
literal) override;
+
+    Result<std::shared_ptr<FileIndexResult>> VisitLessOrEqual(const Literal& 
literal) override;
+
+    Result<std::shared_ptr<FileIndexResult>> VisitGreaterThan(const Literal& 
literal) override;
+
+    Result<std::shared_ptr<FileIndexResult>> VisitGreaterOrEqual(const 
Literal& literal) override;
+
+    Result<std::shared_ptr<FileIndexResult>> VisitIn(const 
std::vector<Literal>& literals) override;
+
+    Result<std::shared_ptr<FileIndexResult>> VisitNotIn(
+        const std::vector<Literal>& literals) override;
+
+    Result<std::shared_ptr<FileIndexResult>> VisitStartsWith(const Literal& 
prefix) override;
+
+    Result<std::shared_ptr<FileIndexResult>> VisitEndsWith(const Literal& 
suffix) override;
+
+    Result<std::shared_ptr<FileIndexResult>> VisitContains(const Literal& 
literal) override;
+
+    Result<std::shared_ptr<FileIndexResult>> VisitLike(const Literal& literal) 
override;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/file_index/file_index_result.h 
b/include/paimon/file_index/file_index_result.h
new file mode 100644
index 0000000..e3a567e
--- /dev/null
+++ b/include/paimon/file_index/file_index_result.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "paimon/defs.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+/// File index result to decide whether filter a file.
+class PAIMON_EXPORT FileIndexResult : public 
std::enable_shared_from_this<FileIndexResult> {
+ public:
+    virtual ~FileIndexResult() = default;
+
+    /// @return A shared instance representing "retain the file".
+    /// @note This is a singleton-like utility; all calls return equivalent 
objects.
+    static std::shared_ptr<FileIndexResult> Remain();
+
+    /// @return A shared instance representing "skip the file".
+    /// @note This is a singleton-like utility; all calls return equivalent 
objects.
+    static std::shared_ptr<FileIndexResult> Skip();
+
+    /// @return Whether the file is remained.
+    virtual Result<bool> IsRemain() const = 0;
+
+    /// Compute the intersection of the current result with the provided 
result.
+    virtual Result<std::shared_ptr<FileIndexResult>> And(
+        const std::shared_ptr<FileIndexResult>& other);
+
+    /// Compute the union of the current result with the provided result.
+    virtual Result<std::shared_ptr<FileIndexResult>> Or(
+        const std::shared_ptr<FileIndexResult>& other);
+
+    virtual std::string ToString() const = 0;
+};
+
+/// Concrete implementation of FileIndexResult that always retains the file.
+class PAIMON_EXPORT Remain : public FileIndexResult {
+ public:
+    Result<bool> IsRemain() const override;
+    Result<std::shared_ptr<FileIndexResult>> And(
+        const std::shared_ptr<FileIndexResult>& other) override;
+    Result<std::shared_ptr<FileIndexResult>> Or(
+        const std::shared_ptr<FileIndexResult>& other) override;
+    std::string ToString() const override;
+};
+
+/// Concrete implementation of FileIndexResult that always skips the file.
+class PAIMON_EXPORT Skip : public FileIndexResult {
+ public:
+    Result<bool> IsRemain() const override;
+    Result<std::shared_ptr<FileIndexResult>> And(
+        const std::shared_ptr<FileIndexResult>& other) override;
+    Result<std::shared_ptr<FileIndexResult>> Or(
+        const std::shared_ptr<FileIndexResult>& other) override;
+    std::string ToString() const override;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/file_index/file_index_writer.h 
b/include/paimon/file_index/file_index_writer.h
new file mode 100644
index 0000000..4fc388b
--- /dev/null
+++ b/include/paimon/file_index/file_index_writer.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "paimon/memory/bytes.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+struct ArrowArray;
+
+namespace paimon {
+/// Interface for writing file-level index data from Arrow batches.
+class PAIMON_EXPORT FileIndexWriter {
+ public:
+    virtual ~FileIndexWriter() = default;
+
+    /// Adds a batch of data to the index writer.
+    ///
+    /// @param batch Pointer to a C ArrowArray derived from arrow struct array 
contain specified
+    ///              indexed field.
+    /// @return `Status::OK()` on success; otherwise, an error indicating 
failure (e.g., schema
+    ///         mismatch).
+    virtual Status AddBatch(::ArrowArray* batch) = 0;
+
+    /// Serializes the built index into a byte buffer.
+    ///
+    /// @note This method returns the complete serialized form of the index 
after all batches
+    /// have been added. It can be called only once and typically assumes no 
further calls to
+    /// `AddBatch()` will occur afterward.
+    ///
+    /// @return A unique pointer to a byte array containing the serialized 
index data,
+    ///         or an error if serialization fails.
+    virtual Result<PAIMON_UNIQUE_PTR<Bytes>> SerializedBytes() const = 0;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/file_index/file_indexer.h 
b/include/paimon/file_index/file_indexer.h
new file mode 100644
index 0000000..72c1fc9
--- /dev/null
+++ b/include/paimon/file_index/file_indexer.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/file_index/file_index_reader.h"
+#include "paimon/file_index/file_index_result.h"
+#include "paimon/file_index/file_index_writer.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/visibility.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+/// File index interface. To read and write a file index.
+class PAIMON_EXPORT FileIndexer {
+ public:
+    virtual ~FileIndexer() = default;
+
+    /// Create `FileIndexReader` with input stream.
+    ///
+    /// @param arrow_schema ArrowSchema derived from arrow schema or struct 
type with
+    /// specified indexed field.
+    /// @param start Start position of input stream.
+    /// @param length Length of index bytes.
+    /// @param input_stream Input stream for read index.
+    /// @param pool Memory pool for memory allocation.
+    /// @return A `FileIndexReader` to read index.
+    virtual Result<std::shared_ptr<FileIndexReader>> CreateReader(
+        ::ArrowSchema* arrow_schema, int32_t start, int32_t length,
+        const std::shared_ptr<InputStream>& input_stream,
+        const std::shared_ptr<MemoryPool>& pool) const = 0;
+
+    /// Create `FileIndexWriter` for arrow schema.
+    ///
+    /// @param arrow_schema ArrowSchema derived from arrow schema or struct 
type with
+    /// specified indexed field.
+    /// @param pool Memory pool for memory allocation.
+    /// @return A `FileIndexWriter` to write index.
+    virtual Result<std::shared_ptr<FileIndexWriter>> CreateWriter(
+        ::ArrowSchema* arrow_schema, const std::shared_ptr<MemoryPool>& pool) 
const = 0;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/file_index/file_indexer_factory.h 
b/include/paimon/file_index/file_indexer_factory.h
new file mode 100644
index 0000000..bb146de
--- /dev/null
+++ b/include/paimon/file_index/file_indexer_factory.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "paimon/factories/factory.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class FileIndexer;
+
+/// File index factory to construct `FileIndexer`.
+class PAIMON_EXPORT FileIndexerFactory : public Factory {
+ public:
+    ~FileIndexerFactory() override;
+
+    /// Get a `FileIndexer` according to identifier and options.
+    static Result<std::unique_ptr<FileIndexer>> Get(
+        const std::string& identifier, const std::map<std::string, 
std::string>& options);
+
+    /// Create a `FileIndexer` with specified options.
+    virtual Result<std::unique_ptr<FileIndexer>> Create(
+        const std::map<std::string, std::string>& options) const = 0;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/empty/empty_file_index_reader.h 
b/src/paimon/common/file_index/empty/empty_file_index_reader.h
new file mode 100644
index 0000000..4301159
--- /dev/null
+++ b/src/paimon/common/file_index/empty/empty_file_index_reader.h
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "paimon/file_index/file_index_reader.h"
+#include "paimon/file_index/file_index_result.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class Literal;
+
+/// Empty file index which has no writer and no serialized bytes.
+/// No data in the file index, which mean this file has no related records.
+class EmptyFileIndexReader : public FileIndexReader {
+ public:
+    Result<std::shared_ptr<FileIndexResult>> VisitEqual(const Literal& 
literal) override {
+        return FileIndexResult::Skip();
+    }
+    Result<std::shared_ptr<FileIndexResult>> VisitIsNotNull() override {
+        return FileIndexResult::Skip();
+    }
+    Result<std::shared_ptr<FileIndexResult>> VisitStartsWith(const Literal& 
literal) override {
+        return FileIndexResult::Skip();
+    }
+    Result<std::shared_ptr<FileIndexResult>> VisitEndsWith(const Literal& 
literal) override {
+        return FileIndexResult::Skip();
+    }
+    Result<std::shared_ptr<FileIndexResult>> VisitContains(const Literal& 
literal) override {
+        return FileIndexResult::Skip();
+    }
+    Result<std::shared_ptr<FileIndexResult>> VisitLike(const Literal& literal) 
override {
+        return FileIndexResult::Skip();
+    }
+    Result<std::shared_ptr<FileIndexResult>> VisitLessThan(const Literal& 
literal) override {
+        return FileIndexResult::Skip();
+    }
+    Result<std::shared_ptr<FileIndexResult>> VisitGreaterOrEqual(const 
Literal& literal) override {
+        return FileIndexResult::Skip();
+    }
+    Result<std::shared_ptr<FileIndexResult>> VisitLessOrEqual(const Literal& 
literal) override {
+        return FileIndexResult::Skip();
+    }
+    Result<std::shared_ptr<FileIndexResult>> VisitGreaterThan(const Literal& 
literal) override {
+        return FileIndexResult::Skip();
+    }
+    Result<std::shared_ptr<FileIndexResult>> VisitIn(
+        const std::vector<Literal>& literals) override {
+        return FileIndexResult::Skip();
+    }
+};
+
+}  // namespace paimon
diff --git 
a/src/paimon/common/file_index/empty/empty_file_index_reader_test.cpp 
b/src/paimon/common/file_index/empty/empty_file_index_reader_test.cpp
new file mode 100644
index 0000000..4225f17
--- /dev/null
+++ b/src/paimon/common/file_index/empty/empty_file_index_reader_test.cpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/empty/empty_file_index_reader.h"
+
+#include <cstdint>
+
+#include "gtest/gtest.h"
+#include "paimon/predicate/literal.h"
+
+namespace paimon::test {
+TEST(EmptyFileIndexReaderTest, TestSimple) {
+    Literal lit0(static_cast<int32_t>(0));
+    EmptyFileIndexReader reader;
+
+    ASSERT_FALSE(reader.VisitIsNotNull().value()->IsRemain().value());
+    ASSERT_FALSE(reader.VisitEqual(lit0).value()->IsRemain().value());
+    ASSERT_FALSE(reader.VisitStartsWith(lit0).value()->IsRemain().value());
+    ASSERT_FALSE(reader.VisitEndsWith(lit0).value()->IsRemain().value());
+    ASSERT_FALSE(reader.VisitContains(lit0).value()->IsRemain().value());
+    ASSERT_FALSE(reader.VisitLike(lit0).value()->IsRemain().value());
+    ASSERT_FALSE(reader.VisitLessThan(lit0).value()->IsRemain().value());
+    ASSERT_FALSE(reader.VisitGreaterOrEqual(lit0).value()->IsRemain().value());
+    ASSERT_FALSE(reader.VisitLessOrEqual(lit0).value()->IsRemain().value());
+    ASSERT_FALSE(reader.VisitGreaterThan(lit0).value()->IsRemain().value());
+    ASSERT_FALSE(reader.VisitIn({lit0}).value()->IsRemain().value());
+
+    ASSERT_TRUE(reader.VisitIsNull().value()->IsRemain().value());
+    ASSERT_TRUE(reader.VisitNotEqual(lit0).value()->IsRemain().value());
+    ASSERT_TRUE(reader.VisitNotIn({lit0}).value()->IsRemain().value());
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/file_index/file_index_format.cpp 
b/src/paimon/common/file_index/file_index_format.cpp
new file mode 100644
index 0000000..8550084
--- /dev/null
+++ b/src/paimon/common/file_index/file_index_format.cpp
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/file_index/file_index_format.h"
+
+#include <cassert>
+#include <map>
+#include <unordered_map>
+#include <utility>
+
+#include "arrow/c/bridge.h"
+#include "arrow/type.h"
+#include "fmt/format.h"
+#include "paimon/common/file_index/empty/empty_file_index_reader.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/file_index/file_indexer.h"
+#include "paimon/file_index/file_indexer_factory.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class InputStream;
+class MemoryPool;
+
+class FileIndexFormatReaderImpl : public FileIndexFormat::Reader {
+ public:
+    using HeaderType =
+        std::unordered_map<std::string,
+                           std::unordered_map<std::string, std::pair<int32_t, 
int32_t>>>;
+
+    static Result<std::unique_ptr<FileIndexFormatReaderImpl>> Create(
+        const std::shared_ptr<InputStream>& input_stream, const 
std::shared_ptr<MemoryPool>& pool) {
+        DataInputStream data_input_stream(input_stream);
+        PAIMON_ASSIGN_OR_RAISE(int64_t magic, 
data_input_stream.ReadValue<int64_t>());
+        if (magic != FileIndexFormat::MAGIC) {
+            return Status::Invalid("This file is not file index file.");
+        }
+        PAIMON_ASSIGN_OR_RAISE(int32_t version, 
data_input_stream.ReadValue<int32_t>());
+        if (version != FileIndexFormat::V_1) {
+            return Status::Invalid(
+                fmt::format("This index file is version of {}, not in 
supported version list [{}]",
+                            version, FileIndexFormat::V_1));
+        }
+        PAIMON_ASSIGN_OR_RAISE(int32_t head_length, 
data_input_stream.ReadValue<int32_t>());
+        auto head_bytes = std::make_unique<Bytes>(head_length - 8 - 4 - 4, 
pool.get());
+        PAIMON_RETURN_NOT_OK(data_input_stream.ReadBytes(head_bytes.get()));
+
+        auto byte_array_input_stream =
+            std::make_shared<ByteArrayInputStream>(head_bytes->data(), 
head_bytes->size());
+        DataInputStream inner_data_input_stream(byte_array_input_stream);
+        PAIMON_ASSIGN_OR_RAISE(int32_t column_size, 
inner_data_input_stream.ReadValue<int32_t>());
+
+        HeaderType header;
+        for (int32_t i = 0; i < column_size; i++) {
+            PAIMON_ASSIGN_OR_RAISE(std::string column_name, 
inner_data_input_stream.ReadString());
+            PAIMON_ASSIGN_OR_RAISE(int32_t index_size,
+                                   
inner_data_input_stream.ReadValue<int32_t>());
+            auto& index_map = header[column_name];
+            for (int32_t j = 0; j < index_size; j++) {
+                PAIMON_ASSIGN_OR_RAISE(std::string index_type,
+                                       inner_data_input_stream.ReadString());
+                PAIMON_ASSIGN_OR_RAISE(int32_t offset,
+                                       
inner_data_input_stream.ReadValue<int32_t>());
+                PAIMON_ASSIGN_OR_RAISE(int32_t length,
+                                       
inner_data_input_stream.ReadValue<int32_t>());
+                index_map[index_type] = std::make_pair(offset, length);
+            }
+        }
+        return std::unique_ptr<FileIndexFormatReaderImpl>(
+            new FileIndexFormatReaderImpl(input_stream, std::move(header), 
pool));
+    }
+
+    Result<std::vector<std::shared_ptr<FileIndexReader>>> ReadColumnIndex(
+        const std::string& column_name, ::ArrowSchema* c_arrow_schema) const 
override {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> 
arrow_schema,
+                                          arrow::ImportSchema(c_arrow_schema));
+        auto column_field = arrow_schema->GetFieldByName(column_name);
+        if (!column_field) {
+            return Status::Invalid(fmt::format("cannot find column {} in 
schema", column_name));
+        }
+        std::vector<std::shared_ptr<FileIndexReader>> res;
+        auto index_iter = header_.find(column_name);
+        if (index_iter != header_.end()) {
+            const auto& index_map = index_iter->second;
+            for (const auto& [index_type, offset_and_length] : index_map) {
+                PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexReader> 
file_index_reader,
+                                       
GetFileIndexReader(arrow::schema({column_field}), index_type,
+                                                          offset_and_length));
+                if (file_index_reader) {
+                    // skip the index not registered
+                    res.push_back(std::move(file_index_reader));
+                }
+            }
+        }
+        return res;
+    }
+
+ private:
+    FileIndexFormatReaderImpl(const std::shared_ptr<InputStream>& 
input_stream, HeaderType&& header,
+                              const std::shared_ptr<MemoryPool>& pool)
+        : input_stream_(input_stream), pool_(pool), header_(std::move(header)) 
{
+        assert(input_stream_);
+    }
+
+    Result<std::shared_ptr<FileIndexReader>> GetFileIndexReader(
+        const std::shared_ptr<arrow::Schema>& arrow_schema, const std::string& 
index_type,
+        const std::pair<int32_t, int32_t>& offset_and_length) const {
+        if (offset_and_length.first == FileIndexFormat::EMPTY_INDEX_FLAG) {
+            return std::make_shared<EmptyFileIndexReader>();
+        }
+        PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileIndexer> file_indexer,
+                               FileIndexerFactory::Get(index_type, 
/*options=*/{}));
+        // assert(file_indexer);
+        if (!file_indexer) {
+            return std::shared_ptr<FileIndexReader>();
+        }
+        ArrowSchema c_arrow_schema;
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*arrow_schema, 
&c_arrow_schema));
+        return file_indexer->CreateReader(&c_arrow_schema, 
offset_and_length.first,
+                                          offset_and_length.second, 
input_stream_, pool_);
+    }
+
+ private:
+    std::shared_ptr<InputStream> input_stream_;
+    std::shared_ptr<MemoryPool> pool_;
+    // get header and cache it.
+    // [column_name : [index_type : {offset, length}]]
+    HeaderType header_;
+};
+
+const int64_t FileIndexFormat::MAGIC = 1493475289347502LL;
+const int32_t FileIndexFormat::EMPTY_INDEX_FLAG = -1;
+const int32_t FileIndexFormat::V_1 = 1;
+
+Result<std::unique_ptr<FileIndexFormat::Reader>> FileIndexFormat::CreateReader(
+    const std::shared_ptr<InputStream>& input_stream, const 
std::shared_ptr<MemoryPool>& pool) {
+    return FileIndexFormatReaderImpl::Create(input_stream, pool);
+}
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/file_index_format_test.cpp 
b/src/paimon/common/file_index/file_index_format_test.cpp
new file mode 100644
index 0000000..7851d57
--- /dev/null
+++ b/src/paimon/common/file_index/file_index_format_test.cpp
@@ -0,0 +1,822 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/file_index/file_index_format.h"
+
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/common/file_index/bitmap/bitmap_file_index.h"
+#include "paimon/common/file_index/bloomfilter/bloom_filter_file_index.h"
+#include "paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index.h"
+#include "paimon/common/file_index/empty/empty_file_index_reader.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/file_index/file_index_result.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+class FileIndexFormatTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+    }
+    void TearDown() override {
+        pool_.reset();
+    }
+
+    std::unique_ptr<::ArrowSchema> CreateArrowSchema(
+        const std::shared_ptr<arrow::Schema>& schema) const {
+        auto c_schema = std::make_unique<::ArrowSchema>();
+        EXPECT_TRUE(arrow::ExportSchema(*schema, c_schema.get()).ok());
+        return c_schema;
+    }
+
+ private:
+    std::shared_ptr<MemoryPool> pool_;
+};
+
+TEST_F(FileIndexFormatTest, TestCreateEmptyFileIndexReader) {
+    auto schema = arrow::schema({arrow::field("c1", arrow::utf8())});
+    std::vector<char> index_file_bytes = {0,  5,  78, 78, -48, 26, 53,  -82, 
0,   0,   0,   1,
+                                          0,  0,  0,  47, 0,   0,  0,   1,   
0,   2,   99,  49,
+                                          0,  0,  0,  1,  0,   5,  101, 109, 
112, 116, 121, -1,
+                                          -1, -1, -1, 0,  0,   0,  0,   0,   
0,   0,   0};
+    auto input_stream =
+        std::make_shared<ByteArrayInputStream>(index_file_bytes.data(), 
index_file_bytes.size());
+    ASSERT_OK_AND_ASSIGN(auto reader, 
FileIndexFormat::CreateReader(input_stream, pool_));
+    ASSERT_OK_AND_ASSIGN(auto index_file_readers,
+                         reader->ReadColumnIndex("c1", 
CreateArrowSchema(schema).get()));
+    ASSERT_EQ(1, index_file_readers.size());
+    auto* empty_reader = 
dynamic_cast<EmptyFileIndexReader*>(index_file_readers[0].get());
+    ASSERT_TRUE(empty_reader);
+}
+
+TEST_F(FileIndexFormatTest, TestSimple) {
+    auto schema =
+        arrow::schema({arrow::field("f1", arrow::int32()), arrow::field("f2", 
arrow::int32()),
+                       arrow::field("non-exist", arrow::int32())});
+    std::vector<uint8_t> index_file_bytes = {
+        0,   5,   78,  78,  208, 26,  53,  174, 0,   0,   0,   1,   0,  0,   
0,   96,  0,   0,
+        0,   3,   0,   2,   102, 48,  0,   0,   0,   1,   0,   6,   98, 105, 
116, 109, 97,  112,
+        0,   0,   0,   96,  0,   0,   0,   131, 0,   2,   102, 49,  0,  0,   
0,   1,   0,   6,
+        98,  105, 116, 109, 97,  112, 0,   0,   0,   227, 0,   0,   0,  74,  
0,   2,   102, 50,
+        0,   0,   0,   1,   0,   6,   98,  105, 116, 109, 97,  112, 0,  0,   
1,   45,  0,   0,
+        0,   76,  0,   0,   0,   0,   1,   0,   0,   0,   8,   0,   0,  0,   
5,   0,   0,   0,
+        0,   5,   65,  108, 105, 99,  101, 0,   0,   0,   0,   0,   0,  0,   
4,   76,  117, 99,
+        121, 255, 255, 255, 251, 0,   0,   0,   3,   66,  111, 98,  0,  0,   
0,   20,  0,   0,
+        0,   5,   69,  109, 105, 108, 121, 255, 255, 255, 253, 0,   0,  0,   
4,   84,  111, 110,
+        121, 0,   0,   0,   40,  58,  48,  0,   0,   1,   0,   0,   0,  0,   
0,   1,   0,   16,
+        0,   0,   0,   0,   0,   7,   0,   58,  48,  0,   0,   1,   0,  0,   
0,   0,   0,   1,
+        0,   16,  0,   0,   0,   1,   0,   5,   0,   58,  48,  0,   0,  1,   
0,   0,   0,   0,
+        0,   1,   0,   16,  0,   0,   0,   3,   0,   6,   0,   1,   0,  0,   
0,   8,   0,   0,
+        0,   2,   0,   0,   0,   0,   20,  0,   0,   0,   0,   0,   0,  0,   
10,  0,   0,   0,
+        22,  58,  48,  0,   0,   1,   0,   0,   0,   0,   0,   2,   0,  16,  
0,   0,   0,   4,
+        0,   6,   0,   7,   0,   58,  48,  0,   0,   1,   0,   0,   0,  0,   
0,   4,   0,   16,
+        0,   0,   0,   0,   0,   1,   0,   2,   0,   3,   0,   5,   0,  1,   
0,   0,   0,   8,
+        0,   0,   0,   2,   1,   255, 255, 255, 248, 0,   0,   0,   0,  0,   
0,   0,   0,   0,
+        0,   0,   1,   0,   0,   0,   22,  58,  48,  0,   0,   1,   0,  0,   
0,   0,   0,   2,
+        0,   16,  0,   0,   0,   2,   0,   3,   0,   6,   0,   58,  48, 0,   
0,   1,   0,   0,
+        0,   0,   0,   3,   0,   16,  0,   0,   0,   0,   0,   1,   0,  4,   
0,   5,   0};
+    auto input_stream = std::make_shared<ByteArrayInputStream>(
+        reinterpret_cast<char*>(index_file_bytes.data()), 
index_file_bytes.size());
+    ASSERT_OK_AND_ASSIGN(auto reader, 
FileIndexFormat::CreateReader(input_stream, pool_));
+    {
+        ASSERT_OK_AND_ASSIGN(auto index_file_readers,
+                             reader->ReadColumnIndex("f1", 
CreateArrowSchema(schema).get()));
+        ASSERT_EQ(1, index_file_readers.size());
+        auto* bitmap_reader = 
dynamic_cast<BitmapFileIndexReader*>(index_file_readers[0].get());
+        ASSERT_TRUE(bitmap_reader);
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, 
bitmap_reader->VisitEqual(Literal(10)));
+            ASSERT_TRUE(result);
+            ASSERT_EQ("{0,1,2,3,5}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, 
bitmap_reader->VisitNotEqual(Literal(10)));
+            ASSERT_TRUE(result);
+            ASSERT_EQ("{4,6,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, 
bitmap_reader->VisitEqual(Literal(50)));
+            ASSERT_TRUE(result);
+            ASSERT_EQ("{}", result->ToString());
+        }
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(auto index_file_readers,
+                             reader->ReadColumnIndex("f2", 
CreateArrowSchema(schema).get()));
+        ASSERT_EQ(1, index_file_readers.size());
+        auto* bitmap_reader = 
dynamic_cast<BitmapFileIndexReader*>(index_file_readers[0].get());
+        ASSERT_TRUE(bitmap_reader);
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, 
bitmap_reader->VisitEqual(Literal(0)));
+            ASSERT_TRUE(result);
+            ASSERT_EQ("{2,3,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, 
bitmap_reader->VisitEqual(Literal(1)));
+            ASSERT_TRUE(result);
+            ASSERT_EQ("{0,1,4,5}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bitmap_reader->VisitIsNull());
+            ASSERT_TRUE(result);
+            ASSERT_EQ("{7}", result->ToString());
+        }
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(auto index_file_readers,
+                             reader->ReadColumnIndex("non-exist", 
CreateArrowSchema(schema).get()));
+        ASSERT_TRUE(index_file_readers.empty());
+    }
+}
+
+// NOLINTNEXTLINE(google-readability-function-size)
+TEST_F(FileIndexFormatTest, TestBitmapIndexWithTimestamp) {
+    auto schema = arrow::schema({
+        arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::SECOND)),
+        arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)),
+        arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)),
+        arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("ts_tz_sec", arrow::timestamp(arrow::TimeUnit::SECOND, 
"Asia/Tokyo")),
+        arrow::field("ts_tz_milli", arrow::timestamp(arrow::TimeUnit::MILLI, 
"Asia/Tokyo")),
+        arrow::field("ts_tz_micro", arrow::timestamp(arrow::TimeUnit::MICRO, 
"Asia/Tokyo")),
+        arrow::field("ts_tz_nano", arrow::timestamp(arrow::TimeUnit::NANO, 
"Asia/Tokyo")),
+    });
+
+    auto fs = std::make_shared<LocalFileSystem>();
+    std::string file_name = GetDataDir() +
+                            "orc/timestamp_index.db/timestamp_index/"
+                            
"bucket-0/data-18569866-0c37-45e9-9d88-2eaf6dd084b0-0.orc.index";
+    std::string index_file_bytes;
+    ASSERT_OK(fs->ReadFile(file_name, &index_file_bytes));
+    auto input_stream =
+        std::make_shared<ByteArrayInputStream>(index_file_bytes.data(), 
index_file_bytes.size());
+    ASSERT_OK_AND_ASSIGN(auto reader, 
FileIndexFormat::CreateReader(input_stream, pool_));
+    auto check_second = [&](const std::string& field_name) {
+        // data: second
+        // 1745542802000lms, 0ns
+        // 1745542902000lms, 0ns
+        // 1745542602000lms, 0ns
+        // -1745000lms, 0ns
+        // -1765000lms, 0ns
+        // null
+        // 1745542802000lms, 0ns
+        // -1725000lms, 0ns
+        ASSERT_OK_AND_ASSIGN(auto index_file_readers,
+                             reader->ReadColumnIndex(field_name, 
CreateArrowSchema(schema).get()));
+        ASSERT_EQ(3, index_file_readers.size());
+        BitSliceIndexBitmapFileIndexReader* bsi_reader = nullptr;
+        BitmapFileIndexReader* bitmap_reader = nullptr;
+        BloomFilterFileIndexReader* bloom_filter_reader = nullptr;
+
+        for (const auto& reader : index_file_readers) {
+            if (auto* r = 
dynamic_cast<BitSliceIndexBitmapFileIndexReader*>(reader.get())) {
+                bsi_reader = r;
+            } else if (auto* r = 
dynamic_cast<BitmapFileIndexReader*>(reader.get())) {
+                bitmap_reader = r;
+            } else if (auto* r = 
dynamic_cast<BloomFilterFileIndexReader*>(reader.get())) {
+                bloom_filter_reader = r;
+            }
+        }
+        ASSERT_TRUE(bsi_reader);
+        ASSERT_TRUE(bitmap_reader);
+        ASSERT_TRUE(bloom_filter_reader);
+        // test bitmap
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bitmap_reader->VisitIsNull());
+            ASSERT_EQ("{5}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bitmap_reader->VisitIsNotNull());
+            ASSERT_EQ("{0,1,2,3,4,6,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bitmap_reader->VisitEqual(Literal(Timestamp(1745542502000l, 0))));
+            ASSERT_EQ("{}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bitmap_reader->VisitEqual(Literal(Timestamp(1745542802000l, 0))));
+            ASSERT_EQ("{0,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bitmap_reader->VisitNotEqual(Literal(Timestamp(1745542802000l, 0))));
+            ASSERT_EQ("{1,2,3,4,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bitmap_reader->VisitIn({Literal(Timestamp(1745542802000l, 0)),
+                                                         
Literal(Timestamp(-1745000l, 0)),
+                                                         
Literal(Timestamp(1745542602000l, 0))}));
+            ASSERT_EQ("{0,2,3,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bitmap_reader->VisitNotIn({Literal(Timestamp(1745542802000l, 0)),
+                                                        
Literal(Timestamp(-1745000l, 0)),
+                                                        
Literal(Timestamp(1745542602000l, 0))}));
+            ASSERT_EQ("{1,4,7}", result->ToString());
+        }
+
+        // test bsi
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bsi_reader->VisitIsNull());
+            ASSERT_EQ("{5}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bsi_reader->VisitIsNotNull());
+            ASSERT_EQ("{0,1,2,3,4,6,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitEqual(Literal(Timestamp(1745542502000l, 0))));
+            ASSERT_EQ("{}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitEqual(Literal(Timestamp(1745542802000l, 0))));
+            ASSERT_EQ("{0,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitNotEqual(Literal(Timestamp(1745542802000l, 0))));
+            ASSERT_EQ("{1,2,3,4,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitIn({Literal(Timestamp(1745542802000l, 0)),
+                                                      
Literal(Timestamp(-1745000l, 0)),
+                                                      
Literal(Timestamp(1745542602000l, 0))}));
+            ASSERT_EQ("{0,2,3,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitNotIn({Literal(Timestamp(1745542802000l, 0)),
+                                                         
Literal(Timestamp(-1745000l, 0)),
+                                                         
Literal(Timestamp(1745542602000l, 0))}));
+            ASSERT_EQ("{1,4,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bsi_reader->VisitGreaterThan(Literal(Timestamp(1745542802000l, 0))));
+            ASSERT_EQ("{1}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bsi_reader->VisitGreaterOrEqual(
+                                                  
Literal(Timestamp(1745542802000l, 0))));
+            ASSERT_EQ("{0,1,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitLessThan(Literal(Timestamp(-1745000l, 0))));
+            ASSERT_EQ("{4}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitLessOrEqual(Literal(Timestamp(0l, 0))));
+            ASSERT_EQ("{3,4,7}", result->ToString());
+        }
+
+        // test bloom filter
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(1745542802000l, 
0)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(1745542902000l, 
0)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(1745542602000l, 
0)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(-1745000l, 0)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(-1765000l, 0)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(1745542802000l, 
0)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(-1725000l, 0)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+    };
+
+    auto check_milli = [&](const std::string& field_name) {
+        // data: milli second
+        // 1745542802001lms, 0ns
+        // 1745542902001lms, 0ns
+        // 1745542602001lms, 0ns
+        // -1745001lms, 0ns
+        // -1765001lms, 0ns
+        // null
+        // 1745542802001lms, 0ns
+        // -1725001lms, 0ns
+        ASSERT_OK_AND_ASSIGN(auto index_file_readers,
+                             reader->ReadColumnIndex(field_name, 
CreateArrowSchema(schema).get()));
+        ASSERT_EQ(3, index_file_readers.size());
+        BitSliceIndexBitmapFileIndexReader* bsi_reader = nullptr;
+        BitmapFileIndexReader* bitmap_reader = nullptr;
+        BloomFilterFileIndexReader* bloom_filter_reader = nullptr;
+
+        for (const auto& reader : index_file_readers) {
+            if (auto* r = 
dynamic_cast<BitSliceIndexBitmapFileIndexReader*>(reader.get())) {
+                bsi_reader = r;
+            } else if (auto* r = 
dynamic_cast<BitmapFileIndexReader*>(reader.get())) {
+                bitmap_reader = r;
+            } else if (auto* r = 
dynamic_cast<BloomFilterFileIndexReader*>(reader.get())) {
+                bloom_filter_reader = r;
+            }
+        }
+        ASSERT_TRUE(bsi_reader);
+        ASSERT_TRUE(bitmap_reader);
+        ASSERT_TRUE(bloom_filter_reader);
+        // test bitmap
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bitmap_reader->VisitIsNull());
+            ASSERT_EQ("{5}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bitmap_reader->VisitIsNotNull());
+            ASSERT_EQ("{0,1,2,3,4,6,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bitmap_reader->VisitEqual(Literal(Timestamp(1745542502001l, 0))));
+            ASSERT_EQ("{}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bitmap_reader->VisitEqual(Literal(Timestamp(1745542802001l, 0))));
+            ASSERT_EQ("{0,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bitmap_reader->VisitNotEqual(Literal(Timestamp(1745542802001l, 0))));
+            ASSERT_EQ("{1,2,3,4,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bitmap_reader->VisitIn({Literal(Timestamp(1745542802001l, 0)),
+                                                         
Literal(Timestamp(-1745001l, 0)),
+                                                         
Literal(Timestamp(1745542602001l, 0))}));
+            ASSERT_EQ("{0,2,3,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bitmap_reader->VisitNotIn({Literal(Timestamp(1745542802001l, 0)),
+                                                        
Literal(Timestamp(-1745001l, 0)),
+                                                        
Literal(Timestamp(1745542602001l, 0))}));
+            ASSERT_EQ("{1,4,7}", result->ToString());
+        }
+
+        // test bsi
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bsi_reader->VisitIsNull());
+            ASSERT_EQ("{5}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bsi_reader->VisitIsNotNull());
+            ASSERT_EQ("{0,1,2,3,4,6,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitEqual(Literal(Timestamp(1745542502001l, 0))));
+            ASSERT_EQ("{}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitEqual(Literal(Timestamp(1745542802001l, 0))));
+            ASSERT_EQ("{0,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitNotEqual(Literal(Timestamp(1745542802001l, 0))));
+            ASSERT_EQ("{1,2,3,4,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitIn({Literal(Timestamp(1745542802001l, 0)),
+                                                      
Literal(Timestamp(-1745001l, 0)),
+                                                      
Literal(Timestamp(1745542602001l, 0))}));
+            ASSERT_EQ("{0,2,3,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitNotIn({Literal(Timestamp(1745542802001l, 0)),
+                                                         
Literal(Timestamp(-1745001l, 0)),
+                                                         
Literal(Timestamp(1745542602001l, 0))}));
+            ASSERT_EQ("{1,4,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bsi_reader->VisitGreaterThan(Literal(Timestamp(1745542802001l, 0))));
+            ASSERT_EQ("{1}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bsi_reader->VisitGreaterOrEqual(
+                                                  
Literal(Timestamp(1745542802001l, 0))));
+            ASSERT_EQ("{0,1,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitLessThan(Literal(Timestamp(-1745001l, 0))));
+            ASSERT_EQ("{4}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitLessOrEqual(Literal(Timestamp(0l, 0))));
+            ASSERT_EQ("{3,4,7}", result->ToString());
+        }
+
+        // test bloom filter
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(1745542802001l, 
0)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(1745542902001l, 
0)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(1745542602001l, 
0)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(-1745001l, 0)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(-1765001l, 0)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(1745542802001l, 
0)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(-1725001l, 0)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+    };
+    auto check_micro = [&](const std::string& field_name) {
+        // data: milli second
+        // 1745542802001lms, 1000ns
+        // 1745542902001lms, 1000ns
+        // 1745542602001lms, 1000ns
+        // -1745001lms, 1000ns
+        // -1765001lms, 1000ns
+        // null
+        // 1745542802001lms, 1000ns
+        // -1725001lms, 1000ns
+        ASSERT_OK_AND_ASSIGN(auto index_file_readers,
+                             reader->ReadColumnIndex(field_name, 
CreateArrowSchema(schema).get()));
+        ASSERT_EQ(3, index_file_readers.size());
+        BitSliceIndexBitmapFileIndexReader* bsi_reader = nullptr;
+        BitmapFileIndexReader* bitmap_reader = nullptr;
+        BloomFilterFileIndexReader* bloom_filter_reader = nullptr;
+
+        for (const auto& reader : index_file_readers) {
+            if (auto* r = 
dynamic_cast<BitSliceIndexBitmapFileIndexReader*>(reader.get())) {
+                bsi_reader = r;
+            } else if (auto* r = 
dynamic_cast<BitmapFileIndexReader*>(reader.get())) {
+                bitmap_reader = r;
+            } else if (auto* r = 
dynamic_cast<BloomFilterFileIndexReader*>(reader.get())) {
+                bloom_filter_reader = r;
+            }
+        }
+        ASSERT_TRUE(bsi_reader);
+        ASSERT_TRUE(bitmap_reader);
+        ASSERT_TRUE(bloom_filter_reader);
+        // test bitmap
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bitmap_reader->VisitIsNull());
+            ASSERT_EQ("{5}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bitmap_reader->VisitIsNotNull());
+            ASSERT_EQ("{0,1,2,3,4,6,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bitmap_reader->VisitEqual(Literal(Timestamp(1745542502001l, 1000))));
+            ASSERT_EQ("{}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bitmap_reader->VisitEqual(Literal(Timestamp(1745542802001l, 1000))));
+            ASSERT_EQ("{0,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bitmap_reader->VisitNotEqual(
+                                                  
Literal(Timestamp(1745542802001l, 1000))));
+            ASSERT_EQ("{1,2,3,4,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bitmap_reader->VisitIn({Literal(Timestamp(1745542802001l, 1000)),
+                                                     
Literal(Timestamp(-1745001l, 1000)),
+                                                     
Literal(Timestamp(1745542602001l, 1000))}));
+            ASSERT_EQ("{0,2,3,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bitmap_reader->VisitNotIn({Literal(Timestamp(1745542802001l, 1000)),
+                                                        
Literal(Timestamp(-1745001l, 1000)),
+                                                        
Literal(Timestamp(1745542602001l, 1000))}));
+            ASSERT_EQ("{1,4,7}", result->ToString());
+        }
+
+        // test bsi
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bsi_reader->VisitIsNull());
+            ASSERT_EQ("{5}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bsi_reader->VisitIsNotNull());
+            ASSERT_EQ("{0,1,2,3,4,6,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitEqual(Literal(Timestamp(1745542502001l, 1000))));
+            ASSERT_EQ("{}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitEqual(Literal(Timestamp(1745542802001l, 1000))));
+            ASSERT_EQ("{0,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bsi_reader->VisitNotEqual(Literal(Timestamp(1745542802001l, 1000))));
+            ASSERT_EQ("{1,2,3,4,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitIn({Literal(Timestamp(1745542802001l, 1000)),
+                                                      
Literal(Timestamp(-1745001l, 1000)),
+                                                      
Literal(Timestamp(1745542602001l, 1000))}));
+            ASSERT_EQ("{0,2,3,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bsi_reader->VisitNotIn({Literal(Timestamp(1745542802001l, 1000)),
+                                                     
Literal(Timestamp(-1745001l, 1000)),
+                                                     
Literal(Timestamp(1745542602001l, 1000))}));
+            ASSERT_EQ("{1,4,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bsi_reader->VisitGreaterThan(
+                                                  
Literal(Timestamp(1745542802001l, 1000))));
+            ASSERT_EQ("{1}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bsi_reader->VisitGreaterOrEqual(
+                                                  
Literal(Timestamp(1745542802001l, 1000))));
+            ASSERT_EQ("{0,1,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitLessThan(Literal(Timestamp(-1745001l, 1000))));
+            ASSERT_EQ("{4}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitLessOrEqual(Literal(Timestamp(0l, 1000))));
+            ASSERT_EQ("{3,4,7}", result->ToString());
+        }
+
+        // test bloom filter
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(1745542802001l, 
1000)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(1745542902001l, 
1000)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(1745542602001l, 
1000)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(-1745001l, 1000)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(-1765001l, 1000)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(1745542802001l, 
1000)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(-1725001l, 1000)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+    };
+
+    auto check_nano = [&](const std::string& field_name) {
+        // data: milli second
+        // 1745542802001lms, 1001ns
+        // 1745542902001lms, 1001ns
+        // 1745542602001lms, 1001ns
+        // -1745001lms, 1001ns
+        // -1765001lms, 1001ns
+        // null
+        // 1745542802001lms, 1001ns
+        // -1725001lms, 1001ns
+
+        // as timestamp is normalized by micro seconds, there is a loss of 
precision in the
+        // nanosecond part
+        ASSERT_OK_AND_ASSIGN(auto index_file_readers,
+                             reader->ReadColumnIndex(field_name, 
CreateArrowSchema(schema).get()));
+        ASSERT_EQ(3, index_file_readers.size());
+        BitSliceIndexBitmapFileIndexReader* bsi_reader = nullptr;
+        BitmapFileIndexReader* bitmap_reader = nullptr;
+        BloomFilterFileIndexReader* bloom_filter_reader = nullptr;
+
+        for (const auto& reader : index_file_readers) {
+            if (auto* r = 
dynamic_cast<BitSliceIndexBitmapFileIndexReader*>(reader.get())) {
+                bsi_reader = r;
+            } else if (auto* r = 
dynamic_cast<BitmapFileIndexReader*>(reader.get())) {
+                bitmap_reader = r;
+            } else if (auto* r = 
dynamic_cast<BloomFilterFileIndexReader*>(reader.get())) {
+                bloom_filter_reader = r;
+            }
+        }
+        ASSERT_TRUE(bsi_reader);
+        ASSERT_TRUE(bitmap_reader);
+        ASSERT_TRUE(bloom_filter_reader);
+        // test bitmap
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bitmap_reader->VisitIsNull());
+            ASSERT_EQ("{5}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bitmap_reader->VisitIsNotNull());
+            ASSERT_EQ("{0,1,2,3,4,6,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bitmap_reader->VisitEqual(Literal(Timestamp(1745542502001l, 1000))));
+            ASSERT_EQ("{}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bitmap_reader->VisitEqual(Literal(Timestamp(1745542802001l, 1001))));
+            ASSERT_EQ("{0,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bitmap_reader->VisitNotEqual(
+                                                  
Literal(Timestamp(1745542802001l, 1001))));
+            ASSERT_EQ("{1,2,3,4,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bitmap_reader->VisitIn({Literal(Timestamp(1745542802001l, 1001)),
+                                                     
Literal(Timestamp(-1745001l, 1000)),
+                                                     
Literal(Timestamp(1745542602001l, 1000))}));
+            ASSERT_EQ("{0,2,3,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bitmap_reader->VisitNotIn({Literal(Timestamp(1745542802001l, 1001)),
+                                                        
Literal(Timestamp(-1745001l, 1000)),
+                                                        
Literal(Timestamp(1745542602001l, 1000))}));
+            ASSERT_EQ("{1,4,7}", result->ToString());
+        }
+
+        // test bsi
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bsi_reader->VisitIsNull());
+            ASSERT_EQ("{5}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bsi_reader->VisitIsNotNull());
+            ASSERT_EQ("{0,1,2,3,4,6,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitEqual(Literal(Timestamp(1745542502001l, 1000))));
+            ASSERT_EQ("{}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitEqual(Literal(Timestamp(1745542802001l, 1001))));
+            ASSERT_EQ("{0,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bsi_reader->VisitNotEqual(Literal(Timestamp(1745542802001l, 1001))));
+            ASSERT_EQ("{1,2,3,4,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitIn({Literal(Timestamp(1745542802001l, 1001)),
+                                                      
Literal(Timestamp(-1745001l, 1000)),
+                                                      
Literal(Timestamp(1745542602001l, 1000))}));
+            ASSERT_EQ("{0,2,3,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(
+                auto result, 
bsi_reader->VisitNotIn({Literal(Timestamp(1745542802001l, 1001)),
+                                                     
Literal(Timestamp(-1745001l, 1000)),
+                                                     
Literal(Timestamp(1745542602001l, 1000))}));
+            ASSERT_EQ("{1,4,7}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bsi_reader->VisitGreaterThan(
+                                                  
Literal(Timestamp(1745542802001l, 1000))));
+            ASSERT_EQ("{1}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result, bsi_reader->VisitGreaterOrEqual(
+                                                  
Literal(Timestamp(1745542802001l, 1000))));
+            ASSERT_EQ("{0,1,6}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitLessThan(Literal(Timestamp(-1745001l, 1000))));
+            ASSERT_EQ("{4}", result->ToString());
+        }
+        {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 
bsi_reader->VisitLessOrEqual(Literal(Timestamp(0l, 1000))));
+            ASSERT_EQ("{3,4,7}", result->ToString());
+        }
+
+        // test bloom filter
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(1745542802001l, 
1001)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(1745542902001l, 
1001)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(1745542602001l, 
1000)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(-1745001l, 1001)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(-1765001l, 1000)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(1745542802001l, 
1000)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+        
ASSERT_TRUE(bloom_filter_reader->VisitEqual(Literal(Timestamp(-1725001l, 1000)))
+                        .value()
+                        ->IsRemain()
+                        .value());
+    };
+    check_second("ts_sec");
+    check_second("ts_tz_sec");
+
+    check_milli("ts_milli");
+    check_milli("ts_tz_milli");
+
+    check_micro("ts_micro");
+    check_micro("ts_tz_micro");
+
+    check_nano("ts_nano");
+    check_nano("ts_tz_nano");
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/file_index/file_index_reader.cpp 
b/src/paimon/common/file_index/file_index_reader.cpp
new file mode 100644
index 0000000..822e457
--- /dev/null
+++ b/src/paimon/common/file_index/file_index_reader.cpp
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/file_index/file_index_reader.h"
+
+#include <utility>
+
+#include "paimon/predicate/literal.h"
+
+namespace paimon {
+Result<std::shared_ptr<FileIndexResult>> FileIndexReader::VisitIsNotNull() {
+    return FileIndexResult::Remain();
+}
+
+Result<std::shared_ptr<FileIndexResult>> FileIndexReader::VisitIsNull() {
+    return FileIndexResult::Remain();
+}
+
+Result<std::shared_ptr<FileIndexResult>> 
FileIndexReader::VisitStartsWith(const Literal& prefix) {
+    return FileIndexResult::Remain();
+}
+
+Result<std::shared_ptr<FileIndexResult>> FileIndexReader::VisitEndsWith(const 
Literal& suffix) {
+    return FileIndexResult::Remain();
+}
+
+Result<std::shared_ptr<FileIndexResult>> FileIndexReader::VisitContains(const 
Literal& literal) {
+    return FileIndexResult::Remain();
+}
+
+Result<std::shared_ptr<FileIndexResult>> FileIndexReader::VisitLike(const 
Literal& literal) {
+    return FileIndexResult::Remain();
+}
+
+Result<std::shared_ptr<FileIndexResult>> FileIndexReader::VisitLessThan(const 
Literal& literal) {
+    return FileIndexResult::Remain();
+}
+
+Result<std::shared_ptr<FileIndexResult>> FileIndexReader::VisitGreaterOrEqual(
+    const Literal& literal) {
+    return FileIndexResult::Remain();
+}
+
+Result<std::shared_ptr<FileIndexResult>> FileIndexReader::VisitNotEqual(const 
Literal& literal) {
+    return FileIndexResult::Remain();
+}
+
+Result<std::shared_ptr<FileIndexResult>> 
FileIndexReader::VisitLessOrEqual(const Literal& literal) {
+    return FileIndexResult::Remain();
+}
+
+Result<std::shared_ptr<FileIndexResult>> FileIndexReader::VisitEqual(const 
Literal& literal) {
+    return FileIndexResult::Remain();
+}
+
+Result<std::shared_ptr<FileIndexResult>> 
FileIndexReader::VisitGreaterThan(const Literal& literal) {
+    return FileIndexResult::Remain();
+}
+
+Result<std::shared_ptr<FileIndexResult>> FileIndexReader::VisitIn(
+    const std::vector<Literal>& literals) {
+    std::shared_ptr<FileIndexResult> file_index_result;
+    for (const Literal& key : literals) {
+        if (!file_index_result) {
+            PAIMON_ASSIGN_OR_RAISE(file_index_result, VisitEqual(key));
+        } else {
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexResult> 
inner_result, VisitEqual(key));
+            PAIMON_ASSIGN_OR_RAISE(file_index_result, 
file_index_result->Or(inner_result));
+        }
+    }
+    return file_index_result;
+}
+
+Result<std::shared_ptr<FileIndexResult>> FileIndexReader::VisitNotIn(
+    const std::vector<Literal>& literals) {
+    std::shared_ptr<FileIndexResult> file_index_result;
+    for (const Literal& key : literals) {
+        if (!file_index_result) {
+            PAIMON_ASSIGN_OR_RAISE(file_index_result, VisitNotEqual(key));
+        } else {
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexResult> 
inner_result,
+                                   VisitNotEqual(key));
+            PAIMON_ASSIGN_OR_RAISE(file_index_result, 
file_index_result->And(inner_result));
+        }
+    }
+    return file_index_result;
+}
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/file_index_reader_test.cpp 
b/src/paimon/common/file_index/file_index_reader_test.cpp
new file mode 100644
index 0000000..b0db5bb
--- /dev/null
+++ b/src/paimon/common/file_index/file_index_reader_test.cpp
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/file_index/file_index_reader.h"
+
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/predicate/literal.h"
+
+namespace paimon::test {
+
+class MockFileIndexReader : public FileIndexReader {
+    // simulate a file with all 1
+    Result<std::shared_ptr<FileIndexResult>> VisitEqual(const Literal& 
literal) override {
+        return literal.GetValue<int32_t>() == 1 ? FileIndexResult::Remain()
+                                                : FileIndexResult::Skip();
+    }
+    Result<std::shared_ptr<FileIndexResult>> VisitNotEqual(const Literal& 
literal) override {
+        return literal.GetValue<int32_t>() == 1 ? FileIndexResult::Skip()
+                                                : FileIndexResult::Remain();
+    }
+};
+
+TEST(FileIndexReaderTest, TestMockIndexReader) {
+    MockFileIndexReader reader;
+    Literal lit0(static_cast<int32_t>(0));
+    Literal lit1(static_cast<int32_t>(1));
+
+    ASSERT_FALSE(reader.VisitEqual(lit0).value()->IsRemain().value());
+    ASSERT_TRUE(reader.VisitEqual(lit1).value()->IsRemain().value());
+    ASSERT_TRUE(reader.VisitNotEqual(lit0).value()->IsRemain().value());
+    ASSERT_FALSE(reader.VisitNotEqual(lit1).value()->IsRemain().value());
+
+    ASSERT_TRUE(reader.VisitIn({lit0, lit1}).value()->IsRemain().value());
+    ASSERT_FALSE(reader.VisitNotIn({lit0, lit1}).value()->IsRemain().value());
+    ASSERT_FALSE(reader.VisitIn({lit0, lit0}).value()->IsRemain().value());
+    ASSERT_TRUE(reader.VisitNotIn({lit0, lit0}).value()->IsRemain().value());
+}
+
+TEST(FileIndexReaderTest, TestDefaultIndexReader) {
+    FileIndexReader reader;
+    Literal lit0(static_cast<int32_t>(0));
+    Literal lit1(static_cast<int32_t>(1));
+
+    ASSERT_TRUE(reader.VisitEqual(lit0).value()->IsRemain().value());
+    ASSERT_TRUE(reader.VisitNotEqual(lit0).value()->IsRemain().value());
+
+    ASSERT_TRUE(reader.VisitIn({lit0, lit1}).value()->IsRemain().value());
+    ASSERT_TRUE(reader.VisitNotIn({lit0, lit1}).value()->IsRemain().value());
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/file_index/file_index_result.cpp 
b/src/paimon/common/file_index/file_index_result.cpp
new file mode 100644
index 0000000..bb07c63
--- /dev/null
+++ b/src/paimon/common/file_index/file_index_result.cpp
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/file_index/file_index_result.h"
+
+#include <cassert>
+#include <utility>
+
+namespace paimon {
+std::shared_ptr<FileIndexResult> FileIndexResult::Remain() {
+    static std::shared_ptr<FileIndexResult> remain = std::make_shared<class 
Remain>();
+    return remain;
+}
+
+std::shared_ptr<FileIndexResult> FileIndexResult::Skip() {
+    static std::shared_ptr<FileIndexResult> skip = std::make_shared<class 
Skip>();
+    return skip;
+}
+
+Result<std::shared_ptr<FileIndexResult>> FileIndexResult::And(
+    const std::shared_ptr<FileIndexResult>& other) {
+    assert(other);
+    PAIMON_ASSIGN_OR_RAISE(bool other_remain, other->IsRemain());
+    if (other_remain) {
+        return shared_from_this();
+    } else {
+        return FileIndexResult::Skip();
+    }
+}
+
+Result<std::shared_ptr<FileIndexResult>> FileIndexResult::Or(
+    const std::shared_ptr<FileIndexResult>& other) {
+    assert(other);
+    PAIMON_ASSIGN_OR_RAISE(bool other_remain, other->IsRemain());
+    if (other_remain) {
+        return FileIndexResult::Remain();
+    } else {
+        return shared_from_this();
+    }
+}
+
+Result<bool> Remain::IsRemain() const {
+    return true;
+}
+
+Result<std::shared_ptr<FileIndexResult>> Remain::And(
+    const std::shared_ptr<FileIndexResult>& other) {
+    return other;
+}
+
+Result<std::shared_ptr<FileIndexResult>> Remain::Or(const 
std::shared_ptr<FileIndexResult>& other) {
+    return shared_from_this();
+}
+
+std::string Remain::ToString() const {
+    return "REMAIN";
+}
+
+Result<bool> Skip::IsRemain() const {
+    return false;
+}
+
+Result<std::shared_ptr<FileIndexResult>> Skip::And(const 
std::shared_ptr<FileIndexResult>& other) {
+    return shared_from_this();
+}
+
+Result<std::shared_ptr<FileIndexResult>> Skip::Or(const 
std::shared_ptr<FileIndexResult>& other) {
+    return other;
+}
+
+std::string Skip::ToString() const {
+    return "SKIP";
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/file_index_result_test.cpp 
b/src/paimon/common/file_index/file_index_result_test.cpp
new file mode 100644
index 0000000..58772b6
--- /dev/null
+++ b/src/paimon/common/file_index/file_index_result_test.cpp
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/file_index/file_index_result.h"
+
+#include <utility>
+
+#include "gtest/gtest.h"
+
+namespace paimon::test {
+TEST(FileIndexResultTest, TestSimple) {
+    auto remain = FileIndexResult::Remain();
+    auto skip = FileIndexResult::Skip();
+
+    ASSERT_TRUE(remain->And(remain).value()->IsRemain().value());
+    ASSERT_FALSE(remain->And(skip).value()->IsRemain().value());
+    ASSERT_FALSE(skip->And(remain).value()->IsRemain().value());
+    ASSERT_FALSE(skip->And(skip).value()->IsRemain().value());
+
+    ASSERT_TRUE(remain->Or(remain).value()->IsRemain().value());
+    ASSERT_TRUE(remain->Or(skip).value()->IsRemain().value());
+    ASSERT_TRUE(skip->Or(remain).value()->IsRemain().value());
+    ASSERT_FALSE(skip->Or(skip).value()->IsRemain().value());
+
+    ASSERT_EQ(remain->ToString(), "REMAIN");
+    ASSERT_EQ(skip->ToString(), "SKIP");
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/file_index/file_indexer_factory.cpp 
b/src/paimon/common/file_index/file_indexer_factory.cpp
new file mode 100644
index 0000000..632a6df
--- /dev/null
+++ b/src/paimon/common/file_index/file_indexer_factory.cpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/file_index/file_indexer_factory.h"
+
+#include <cassert>
+#include <utility>
+
+#include "paimon/factories/factory_creator.h"
+#include "paimon/file_index/file_indexer.h"
+#include "paimon/status.h"
+
+namespace paimon {
+FileIndexerFactory::~FileIndexerFactory() = default;
+
+Result<std::unique_ptr<FileIndexer>> FileIndexerFactory::Get(
+    const std::string& identifier, const std::map<std::string, std::string>& 
options) {
+    auto factory_creator = FactoryCreator::GetInstance();
+    if (factory_creator == nullptr) {
+        assert(false);
+        return Status::Invalid("factory creator is null pointer");
+    }
+    auto file_indexer_factory =
+        dynamic_cast<FileIndexerFactory*>(factory_creator->Create(identifier));
+    if (file_indexer_factory == nullptr) {
+        // if an index type is not found, return nullptr to skip this index 
instead of return error
+        return std::unique_ptr<FileIndexer>();
+    }
+    return file_indexer_factory->Create(options);
+}
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/file_indexer_factory_test.cpp 
b/src/paimon/common/file_index/file_indexer_factory_test.cpp
new file mode 100644
index 0000000..610dd17
--- /dev/null
+++ b/src/paimon/common/file_index/file_indexer_factory_test.cpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/file_index/file_indexer_factory.h"
+
+#include "gtest/gtest.h"
+#include "paimon/common/file_index/bitmap/bitmap_file_index.h"
+#include "paimon/common/file_index/bloomfilter/bloom_filter_file_index.h"
+#include "paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index.h"
+#include "paimon/file_index/file_indexer.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(FileIndexerFactoryTest, TestRegister) {
+    ASSERT_OK_AND_ASSIGN(auto file_indexer1, FileIndexerFactory::Get("bitmap", 
{}));
+    ASSERT_TRUE(file_indexer1);
+    auto* bitmap_indexer = dynamic_cast<BitmapFileIndex*>(file_indexer1.get());
+    ASSERT_TRUE(bitmap_indexer);
+
+    ASSERT_OK_AND_ASSIGN(auto file_indexer2, 
FileIndexerFactory::Get("bloom-filter", {}));
+    ASSERT_TRUE(file_indexer2);
+    auto* bloomfilter_indexer = 
dynamic_cast<BloomFilterFileIndex*>(file_indexer2.get());
+    ASSERT_TRUE(bloomfilter_indexer);
+
+    ASSERT_OK_AND_ASSIGN(auto file_indexer3, FileIndexerFactory::Get("bsi", 
{}));
+    ASSERT_TRUE(file_indexer3);
+    auto* bsi_indexer = 
dynamic_cast<BitSliceIndexBitmapFileIndex*>(file_indexer3.get());
+    ASSERT_TRUE(bsi_indexer);
+
+    ASSERT_OK_AND_ASSIGN(auto non_exist_file_indexer, 
FileIndexerFactory::Get("non-exist", {}));
+    ASSERT_FALSE(non_exist_file_indexer);
+}
+}  // namespace paimon::test


Reply via email to