This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 35b121d  feat: Migrate format interfaces, column_stats, table 
special_fields, and lookup store (#43)
35b121d is described below

commit 35b121ddc1bfa1d5694cea8aa4b61ff9d5097713
Author: lxy <[email protected]>
AuthorDate: Wed Jun 3 14:24:54 2026 +0800

    feat: Migrate format interfaces, column_stats, table special_fields, and 
lookup store (#43)
---
 include/paimon/format/column_stats.h               | 199 ++++++++
 include/paimon/format/file_format.h                |  53 ++
 include/paimon/format/file_format_factory.h        |  49 ++
 include/paimon/format/format_stats_extractor.h     |  60 +++
 include/paimon/format/format_writer.h              |  73 +++
 include/paimon/format/reader_builder.h             |  46 ++
 include/paimon/format/writer_builder.h             |  55 ++
 src/paimon/common/format/column_stats.cpp          | 556 +++++++++++++++++++++
 src/paimon/common/format/column_stats_test.cpp     | 311 ++++++++++++
 src/paimon/common/format/file_format_factory.cpp   |  45 ++
 src/paimon/common/lookup/lookup_store_factory.cpp  |  45 ++
 src/paimon/common/lookup/lookup_store_factory.h    |  72 +++
 .../lookup/sort/sort_lookup_store_factory.cpp      |  65 +++
 .../common/lookup/sort/sort_lookup_store_factory.h |  95 ++++
 .../lookup/sort/sort_lookup_store_footer.cpp       |  69 +++
 .../common/lookup/sort/sort_lookup_store_footer.h  |  62 +++
 .../common/lookup/sort/sort_lookup_store_test.cpp  | 135 +++++
 src/paimon/common/table/special_fields.h           |  87 ++++
 src/paimon/common/table/special_fields_test.cpp    |  70 +++
 19 files changed, 2147 insertions(+)

diff --git a/include/paimon/format/column_stats.h 
b/include/paimon/format/column_stats.h
new file mode 100644
index 0000000..f16cb25
--- /dev/null
+++ b/include/paimon/format/column_stats.h
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+/// ColumnStats is an abstract base class that represents statistical 
information for data columns
+/// in Paimon tables. It provides min/max values and null count statistics
+///
+/// Only primitive data types support min/max statistics. Nested types 
(arrays, maps, structs) only
+/// track null counts through `NestedColumnStats`.
+///
+/// @note This is an abstract base class. Use the static factory methods 
`CreateXXXColumnStats()` to
+/// create concrete instances for specific data types.
+class PAIMON_EXPORT ColumnStats {
+ public:
+    virtual ~ColumnStats() = default;
+
+    /// Gets the number of null values in this column.
+    virtual std::optional<int64_t> NullCount() const = 0;
+    virtual std::string ToString() const = 0;
+
+    /// Gets the field type that this column statistics instance represents.
+    virtual FieldType GetFieldType() const = 0;
+
+    /// @name CreateXXXColumnStats()
+    /// %Factory methods `CreateXXXColumnStats()` to create column statistics.
+    /// - min/max/null_count for primitive data types
+    /// - null_count for nested data types (arrays, maps, structs)
+    ///
+    /// @{
+    static std::unique_ptr<ColumnStats> 
CreateBooleanColumnStats(std::optional<bool> min,
+                                                                 
std::optional<bool> max,
+                                                                 
std::optional<int64_t> null_count);
+    static std::unique_ptr<ColumnStats> 
CreateTinyIntColumnStats(std::optional<int8_t> min,
+                                                                 
std::optional<int8_t> max,
+                                                                 
std::optional<int64_t> null_count);
+    static std::unique_ptr<ColumnStats> CreateSmallIntColumnStats(
+        std::optional<int16_t> min, std::optional<int16_t> max, 
std::optional<int64_t> null_count);
+    static std::unique_ptr<ColumnStats> 
CreateIntColumnStats(std::optional<int32_t> min,
+                                                             
std::optional<int32_t> max,
+                                                             
std::optional<int64_t> null_count);
+    static std::unique_ptr<ColumnStats> 
CreateBigIntColumnStats(std::optional<int64_t> min,
+                                                                
std::optional<int64_t> max,
+                                                                
std::optional<int64_t> null_count);
+    static std::unique_ptr<ColumnStats> 
CreateFloatColumnStats(std::optional<float> min,
+                                                               
std::optional<float> max,
+                                                               
std::optional<int64_t> null_count);
+    static std::unique_ptr<ColumnStats> 
CreateDoubleColumnStats(std::optional<double> min,
+                                                                
std::optional<double> max,
+                                                                
std::optional<int64_t> null_count);
+    static std::unique_ptr<ColumnStats> CreateStringColumnStats(
+        const std::optional<std::string>& min, const 
std::optional<std::string>& max,
+        std::optional<int64_t> null_count);
+    static std::unique_ptr<ColumnStats> CreateTimestampColumnStats(
+        const std::optional<Timestamp>& min, const std::optional<Timestamp>& 
max,
+        std::optional<int64_t> null_count, int32_t precision);
+    static std::unique_ptr<ColumnStats> CreateDecimalColumnStats(const 
std::optional<Decimal>& min,
+                                                                 const 
std::optional<Decimal>& max,
+                                                                 
std::optional<int64_t> null_count,
+                                                                 int32_t 
precision, int32_t scale);
+    static std::unique_ptr<ColumnStats> 
CreateDateColumnStats(std::optional<int32_t> min,
+                                                              
std::optional<int32_t> max,
+                                                              
std::optional<int64_t> null_count);
+    /// Creates column statistics for nested data types (arrays, maps, 
structs), which only track
+    /// null counts.
+    static std::unique_ptr<ColumnStats> CreateNestedColumnStats(const 
FieldType& nested_type,
+                                                                
std::optional<int64_t> null_count);
+    /// @}
+};
+
+class PAIMON_EXPORT BooleanColumnStats : public ColumnStats {
+ public:
+    virtual std::optional<bool> Min() const = 0;
+    virtual std::optional<bool> Max() const = 0;
+    virtual void Collect(std::optional<bool> value) = 0;
+};
+
+class PAIMON_EXPORT TinyIntColumnStats : public ColumnStats {
+ public:
+    virtual std::optional<int8_t> Min() const = 0;
+    virtual std::optional<int8_t> Max() const = 0;
+    virtual void Collect(std::optional<int8_t> value) = 0;
+};
+
+class PAIMON_EXPORT SmallIntColumnStats : public ColumnStats {
+ public:
+    virtual std::optional<int16_t> Min() const = 0;
+    virtual std::optional<int16_t> Max() const = 0;
+    virtual void Collect(std::optional<int16_t> value) = 0;
+};
+
+class PAIMON_EXPORT IntColumnStats : public ColumnStats {
+ public:
+    virtual std::optional<int32_t> Min() const = 0;
+    virtual std::optional<int32_t> Max() const = 0;
+    virtual void Collect(std::optional<int32_t> value) = 0;
+};
+
+class PAIMON_EXPORT BigIntColumnStats : public ColumnStats {
+ public:
+    virtual std::optional<int64_t> Min() const = 0;
+    virtual std::optional<int64_t> Max() const = 0;
+    virtual void Collect(std::optional<int64_t> value) = 0;
+};
+
+class PAIMON_EXPORT FloatColumnStats : public ColumnStats {
+ public:
+    virtual std::optional<float> Min() const = 0;
+    virtual std::optional<float> Max() const = 0;
+    virtual void Collect(std::optional<float> value) = 0;
+};
+
+class PAIMON_EXPORT DoubleColumnStats : public ColumnStats {
+ public:
+    virtual std::optional<double> Min() const = 0;
+    virtual std::optional<double> Max() const = 0;
+    virtual void Collect(std::optional<double> value) = 0;
+};
+
+class PAIMON_EXPORT StringColumnStats : public ColumnStats {
+ public:
+    virtual const std::optional<std::string>& Min() const = 0;
+    virtual const std::optional<std::string>& Max() const = 0;
+    virtual void Collect(const std::optional<std::string>& value) = 0;
+};
+
+class PAIMON_EXPORT TimestampColumnStats : public ColumnStats {
+ public:
+    virtual std::optional<Timestamp> Min() const = 0;
+    virtual std::optional<Timestamp> Max() const = 0;
+    virtual void Collect(const std::optional<Timestamp>& value) = 0;
+    virtual int32_t GetPrecision() const = 0;
+};
+
+class PAIMON_EXPORT DecimalColumnStats : public ColumnStats {
+ public:
+    virtual std::optional<Decimal> Min() const = 0;
+    virtual std::optional<Decimal> Max() const = 0;
+    virtual void Collect(const std::optional<Decimal>& value) = 0;
+    virtual int32_t GetPrecision() const = 0;
+    virtual int32_t GetScale() const = 0;
+};
+
+class PAIMON_EXPORT DateColumnStats : public ColumnStats {
+ public:
+    virtual std::optional<int32_t> Min() const = 0;
+    virtual std::optional<int32_t> Max() const = 0;
+    virtual void Collect(std::optional<int32_t> value) = 0;
+};
+
+class PAIMON_EXPORT NestedColumnStats : public ColumnStats {
+ public:
+    NestedColumnStats(const FieldType& nested_type, std::optional<int64_t> 
null_count)
+        : nested_type_(nested_type), null_count_(null_count) {
+        assert(nested_type == FieldType::ARRAY || nested_type == 
FieldType::MAP ||
+               nested_type == FieldType::STRUCT);
+    }
+
+    std::optional<int64_t> NullCount() const override {
+        return null_count_;
+    }
+
+    std::string ToString() const override;
+
+    FieldType GetFieldType() const override;
+
+ private:
+    FieldType nested_type_;
+    std::optional<int64_t> null_count_;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/format/file_format.h 
b/include/paimon/format/file_format.h
new file mode 100644
index 0000000..67c135f
--- /dev/null
+++ b/include/paimon/format/file_format.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "paimon/format/format_stats_extractor.h"
+#include "paimon/format/reader_builder.h"
+#include "paimon/format/writer_builder.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/predicate.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+
+/// `FileFormat` is used to create `ReaderBuilder` and `WriterBuilder`.
+class PAIMON_EXPORT FileFormat {
+ public:
+    virtual ~FileFormat() = default;
+    /// @return The corresponding identifier of file format, e.g., orc.
+    virtual const std::string& Identifier() const = 0;
+    /// @return A reader builder which will create reader with specific batch 
size.
+    virtual Result<std::unique_ptr<ReaderBuilder>> CreateReaderBuilder(
+        int32_t batch_size) const = 0;
+
+    /// @return A `WriterBuilder` of the corresponding schema, or error status 
when schema is
+    /// invalid.
+    virtual Result<std::unique_ptr<WriterBuilder>> CreateWriterBuilder(
+        ::ArrowSchema* schema, int32_t batch_size) const = 0;
+    /// @return A `FormatStatsExtractor` of current file format.
+    virtual Result<std::unique_ptr<FormatStatsExtractor>> CreateStatsExtractor(
+        ::ArrowSchema* schema) const = 0;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/format/file_format_factory.h 
b/include/paimon/format/file_format_factory.h
new file mode 100644
index 0000000..b472a8b
--- /dev/null
+++ b/include/paimon/format/file_format_factory.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "paimon/factories/factory.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+class FileFormat;
+
+/// A factory for creating `FileFormat` instances.
+class PAIMON_EXPORT FileFormatFactory : public Factory {
+ public:
+    ~FileFormatFactory() override;
+
+    /// Get `FileFormat` corresponding to identifier.
+    ///
+    /// @pre Factory is already registered.
+    static Result<std::unique_ptr<FileFormat>> Get(
+        const std::string& identifier, const std::map<std::string, 
std::string>& options);
+
+    /// Create a `FileFormat` with the corresponding options.
+    virtual Result<std::unique_ptr<FileFormat>> Create(
+        const std::map<std::string, std::string>& options) const = 0;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/format/format_stats_extractor.h 
b/include/paimon/format/format_stats_extractor.h
new file mode 100644
index 0000000..d049aaf
--- /dev/null
+++ b/include/paimon/format/format_stats_extractor.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/type_fwd.h"
+
+namespace paimon {
+
+/// Extracts statistics directly from file.
+class PAIMON_EXPORT FormatStatsExtractor {
+ public:
+    virtual ~FormatStatsExtractor() = default;
+    /// File info fetched from physical file, currently only include row count.
+    class PAIMON_EXPORT FileInfo {
+     public:
+        explicit FileInfo(int64_t row_count) : row_count_(row_count) {}
+
+        int64_t GetRowCount() const {
+            return row_count_;
+        }
+
+     private:
+        int64_t row_count_;
+    };
+
+    /// Extracts statistics for each column of a data file based on the file 
path and file system.
+    virtual Result<ColumnStatsVector> Extract(const 
std::shared_ptr<FileSystem>& file_system,
+                                              const std::string& path,
+                                              const 
std::shared_ptr<MemoryPool>& pool) = 0;
+
+    /// Extracts statistics for each column and `FileInfo` of a data file 
based on the file path and
+    /// file system.
+    virtual Result<std::pair<ColumnStatsVector, FileInfo>> ExtractWithFileInfo(
+        const std::shared_ptr<FileSystem>& file_system, const std::string& 
path,
+        const std::shared_ptr<MemoryPool>& pool) = 0;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/format/format_writer.h 
b/include/paimon/format/format_writer.h
new file mode 100644
index 0000000..cf570b3
--- /dev/null
+++ b/include/paimon/format/format_writer.h
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/type_fwd.h"
+
+struct ArrowArray;
+
+namespace paimon {
+/// File format writer, each writer corresponds to a data file.
+class PAIMON_EXPORT FormatWriter {
+ public:
+    virtual ~FormatWriter() = default;
+
+    /// Add a batch of records to the format writer.
+    /// @param batch Pointer to an ArrowArray containing the batch data to 
write.
+    /// @return Status indicating success (OK) or failure with error 
information.
+    /// @note The batch must conform to the schema expected by the writer.
+    /// @note This method can be called multiple times to write data 
incrementally.
+    /// @note After calling `Finish()`, this method should not be called again.
+    virtual Status AddBatch(::ArrowArray* batch) = 0;
+
+    /// Flushes all intermediate buffered data to the format writer.
+    ///
+    /// @return Error status returned if the encoder cannot be flushed, or if 
the output stream
+    /// return an error.
+    virtual Status Flush() = 0;
+
+    /// Finishes the writing. This must flush all internal buffer, finish 
encoding, and write
+    /// footers.
+    ///
+    /// @note The writer is not expected to handle any more records via 
`AddBatch()` after
+    /// this method is called.
+    ///
+    /// @warning This method **MUST NOT** close the stream that the writer 
writes to. Closing
+    /// the stream is expected to happen through the invoker of this method 
afterwards.
+    ///
+    /// @return Error status returned if the finalization fails.
+    virtual Status Finish() = 0;
+
+    /// Check if the writer has reached the `target_size`.
+    ///
+    /// @param suggested_check Whether it needs to be checked, but subclasses 
can also decide
+    ///                        whether to check it themselves.
+    /// @param target_size The size of the target.
+    /// @return True if the target size was reached, otherwise false.
+    /// @return Error status returned if calculating the length fails.
+    virtual Result<bool> ReachTargetSize(bool suggested_check, int64_t 
target_size) const = 0;
+
+    /// Get metrics of the writer
+    /// @return The accumulated writer metrics to current state.
+    virtual std::shared_ptr<Metrics> GetWriterMetrics() const = 0;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/format/reader_builder.h 
b/include/paimon/format/reader_builder.h
new file mode 100644
index 0000000..91b604d
--- /dev/null
+++ b/include/paimon/format/reader_builder.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "paimon/memory/memory_pool.h"
+#include "paimon/reader/file_batch_reader.h"
+#include "paimon/type_fwd.h"
+
+namespace paimon {
+
+/// Create a file batch reader based on the file path. Allows you to specify 
memory pool.
+class PAIMON_EXPORT ReaderBuilder {
+ public:
+    virtual ~ReaderBuilder() = default;
+
+    /// Set memory pool to use.
+    virtual ReaderBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>& 
pool) = 0;
+
+    /// Build a file batch reader based on the created `InputStream`.
+    virtual Result<std::unique_ptr<FileBatchReader>> Build(
+        const std::shared_ptr<InputStream>& path) const = 0;
+
+    /// Build a file batch reader based on the file path.
+    virtual Result<std::unique_ptr<FileBatchReader>> Build(const std::string& 
path) const = 0;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/format/writer_builder.h 
b/include/paimon/format/writer_builder.h
new file mode 100644
index 0000000..33ef07e
--- /dev/null
+++ b/include/paimon/format/writer_builder.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "paimon/type_fwd.h"
+
+namespace paimon {
+/// Create a file format writer based on the file output stream. Allows you to 
specify memory pool.
+class PAIMON_EXPORT WriterBuilder {
+ public:
+    virtual ~WriterBuilder() = default;
+
+    /// Set memory pool to use.
+    virtual WriterBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>& 
pool) = 0;
+
+    /// Build a file format writer based on the file output stream and file 
compression.
+    virtual Result<std::unique_ptr<FormatWriter>> Build(const 
std::shared_ptr<OutputStream>& out,
+                                                        const std::string& 
compression) = 0;
+};
+
+/// Create a file format writer based on the file path.
+class PAIMON_EXPORT DirectWriterBuilder : public WriterBuilder {
+ public:
+    ~DirectWriterBuilder() override = default;
+    virtual Result<std::unique_ptr<FormatWriter>> BuildFromPath(const 
std::string& path) = 0;
+};
+
+/// `SpecificFSWriterBuilder` allows you to specify a specific file system.
+class PAIMON_EXPORT SpecificFSWriterBuilder : public WriterBuilder {
+ public:
+    ~SpecificFSWriterBuilder() override = default;
+    // Set file system to use.
+    virtual SpecificFSWriterBuilder* WithFileSystem(const 
std::shared_ptr<FileSystem>& fs) = 0;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/format/column_stats.cpp 
b/src/paimon/common/format/column_stats.cpp
new file mode 100644
index 0000000..d813427
--- /dev/null
+++ b/src/paimon/common/format/column_stats.cpp
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/column_stats.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "fmt/format.h"
+
+namespace paimon {
+
+template <typename T>
+std::string FormatStatsToString(const T& stats) {
+    auto to_str = [](auto opt) -> std::string { return opt ? fmt::format("{}", 
*opt) : "null"; };
+    return fmt::format("min {}, max {}, null count {}", to_str(stats.Min()), 
to_str(stats.Max()),
+                       to_str(stats.NullCount()));
+}
+
+/// A simple column statistics, supports the following stats.
+///
+/// <ul>
+/// <li>min: the minimum value of the column
+/// <li>max: the maximum value of the column
+/// <li>null_count: the number of nulls
+/// </ul>
+template <typename T>
+class InternalStatsImpl {
+ public:
+    InternalStatsImpl(const std::optional<T>& min, const std::optional<T>& max,
+                      std::optional<int64_t> null_count)
+        : min_(min), max_(max), null_count_(null_count) {}
+
+    const std::optional<T>& Min() const {
+        return min_;
+    }
+
+    const std::optional<T>& Max() const {
+        return max_;
+    }
+
+    void Collect(const std::optional<T>& value) {
+        if (value != std::nullopt) {
+            if (max_ != std::nullopt) {
+                if (max_.value() < value) {
+                    max_ = value;
+                }
+            } else {
+                max_ = value;
+            }
+            if (min_ != std::nullopt) {
+                if (value < min_.value()) {
+                    min_ = value;
+                }
+            } else {
+                min_ = value;
+            }
+            if (null_count_ == std::nullopt) {
+                null_count_ = 0;
+            }
+        } else {
+            if (null_count_ != std::nullopt) {
+                null_count_.value()++;
+            } else {
+                null_count_ = 1;
+            }
+        }
+    }
+
+    std::optional<int64_t> NullCount() const {
+        return null_count_;
+    }
+
+ private:
+    std::optional<T> min_;
+    std::optional<T> max_;
+    std::optional<int64_t> null_count_;
+};
+
+using InternalBooleanStats = InternalStatsImpl<bool>;
+using InternalTinyIntStats = InternalStatsImpl<int8_t>;
+using InternalSmallIntStats = InternalStatsImpl<int16_t>;
+using InternalIntStats = InternalStatsImpl<int32_t>;
+using InternalBigIntStats = InternalStatsImpl<int64_t>;
+using InternalFloatStats = InternalStatsImpl<float>;
+using InternalDoubleStats = InternalStatsImpl<double>;
+using InternalStringStats = InternalStatsImpl<std::string>;
+using InternalTimestampStats = InternalStatsImpl<Timestamp>;
+using InternalDecimalStats = InternalStatsImpl<Decimal>;
+
+class BooleanColumnStatsImpl : public BooleanColumnStats {
+ public:
+    explicit BooleanColumnStatsImpl(const InternalBooleanStats& stats) : 
stats_(stats) {}
+
+    std::optional<bool> Min() const override {
+        return stats_.Min();
+    }
+
+    std::optional<bool> Max() const override {
+        return stats_.Max();
+    }
+
+    void Collect(std::optional<bool> value) override {
+        stats_.Collect(value);
+    }
+
+    std::optional<int64_t> NullCount() const override {
+        return stats_.NullCount();
+    }
+
+    std::string ToString() const override {
+        return FormatStatsToString(*this);
+    }
+
+    FieldType GetFieldType() const override {
+        return FieldType::BOOLEAN;
+    }
+
+ private:
+    InternalBooleanStats stats_;
+};
+
+class TinyIntColumnStatsImpl : public TinyIntColumnStats {
+ public:
+    explicit TinyIntColumnStatsImpl(const InternalTinyIntStats& stats) : 
stats_(stats) {}
+
+    std::optional<int8_t> Min() const override {
+        return stats_.Min();
+    }
+
+    std::optional<int8_t> Max() const override {
+        return stats_.Max();
+    }
+
+    void Collect(std::optional<int8_t> value) override {
+        stats_.Collect(value);
+    }
+
+    std::optional<int64_t> NullCount() const override {
+        return stats_.NullCount();
+    }
+
+    std::string ToString() const override {
+        return FormatStatsToString(*this);
+    }
+
+    FieldType GetFieldType() const override {
+        return FieldType::TINYINT;
+    }
+
+ private:
+    InternalTinyIntStats stats_;
+};
+
+class SmallIntColumnStatsImpl : public SmallIntColumnStats {
+ public:
+    explicit SmallIntColumnStatsImpl(const InternalSmallIntStats& stats) : 
stats_(stats) {}
+
+    std::optional<int16_t> Min() const override {
+        return stats_.Min();
+    }
+
+    std::optional<int16_t> Max() const override {
+        return stats_.Max();
+    }
+
+    void Collect(std::optional<int16_t> value) override {
+        stats_.Collect(value);
+    }
+
+    std::optional<int64_t> NullCount() const override {
+        return stats_.NullCount();
+    }
+
+    std::string ToString() const override {
+        return FormatStatsToString(*this);
+    }
+
+    FieldType GetFieldType() const override {
+        return FieldType::SMALLINT;
+    }
+
+ private:
+    InternalSmallIntStats stats_;
+};
+
+class IntColumnStatsImpl : public IntColumnStats {
+ public:
+    explicit IntColumnStatsImpl(const InternalIntStats& stats) : stats_(stats) 
{}
+
+    std::optional<int32_t> Min() const override {
+        return stats_.Min();
+    }
+
+    std::optional<int32_t> Max() const override {
+        return stats_.Max();
+    }
+
+    void Collect(std::optional<int32_t> value) override {
+        stats_.Collect(value);
+    }
+
+    std::optional<int64_t> NullCount() const override {
+        return stats_.NullCount();
+    }
+
+    std::string ToString() const override {
+        return FormatStatsToString(*this);
+    }
+
+    FieldType GetFieldType() const override {
+        return FieldType::INT;
+    }
+
+ private:
+    InternalIntStats stats_;
+};
+
+class BigIntColumnStatsImpl : public BigIntColumnStats {
+ public:
+    explicit BigIntColumnStatsImpl(const InternalBigIntStats& stats) : 
stats_(stats) {}
+
+    std::optional<int64_t> Min() const override {
+        return stats_.Min();
+    }
+
+    std::optional<int64_t> Max() const override {
+        return stats_.Max();
+    }
+
+    void Collect(std::optional<int64_t> value) override {
+        stats_.Collect(value);
+    }
+
+    std::optional<int64_t> NullCount() const override {
+        return stats_.NullCount();
+    }
+
+    std::string ToString() const override {
+        return FormatStatsToString(*this);
+    }
+
+    FieldType GetFieldType() const override {
+        return FieldType::BIGINT;
+    }
+
+ private:
+    InternalBigIntStats stats_;
+};
+
+class FloatColumnStatsImpl : public FloatColumnStats {
+ public:
+    explicit FloatColumnStatsImpl(const InternalFloatStats& stats) : 
stats_(stats) {}
+
+    std::optional<float> Min() const override {
+        return stats_.Min();
+    }
+
+    std::optional<float> Max() const override {
+        return stats_.Max();
+    }
+
+    void Collect(std::optional<float> value) override {
+        stats_.Collect(value);
+    }
+
+    std::optional<int64_t> NullCount() const override {
+        return stats_.NullCount();
+    }
+
+    std::string ToString() const override {
+        return FormatStatsToString(*this);
+    }
+
+    FieldType GetFieldType() const override {
+        return FieldType::FLOAT;
+    }
+
+ private:
+    InternalFloatStats stats_;
+};
+
+class DoubleColumnStatsImpl : public DoubleColumnStats {
+ public:
+    explicit DoubleColumnStatsImpl(const InternalDoubleStats& stats) : 
stats_(stats) {}
+
+    std::optional<double> Min() const override {
+        return stats_.Min();
+    }
+
+    std::optional<double> Max() const override {
+        return stats_.Max();
+    }
+    void Collect(std::optional<double> value) override {
+        stats_.Collect(value);
+    }
+
+    std::optional<int64_t> NullCount() const override {
+        return stats_.NullCount();
+    }
+
+    std::string ToString() const override {
+        return FormatStatsToString(*this);
+    }
+
+    FieldType GetFieldType() const override {
+        return FieldType::DOUBLE;
+    }
+
+ private:
+    InternalDoubleStats stats_;
+};
+
+class StringColumnStatsImpl : public StringColumnStats {
+ public:
+    explicit StringColumnStatsImpl(const InternalStringStats& stats) : 
stats_(stats) {}
+
+    const std::optional<std::string>& Min() const override {
+        return stats_.Min();
+    }
+
+    const std::optional<std::string>& Max() const override {
+        return stats_.Max();
+    }
+
+    void Collect(const std::optional<std::string>& value) override {
+        stats_.Collect(value);
+    }
+
+    std::optional<int64_t> NullCount() const override {
+        return stats_.NullCount();
+    }
+
+    std::string ToString() const override {
+        return FormatStatsToString(*this);
+    }
+
+    FieldType GetFieldType() const override {
+        return FieldType::STRING;
+    }
+
+ private:
+    InternalStringStats stats_;
+};
+
+class TimestampColumnStatsImpl : public TimestampColumnStats {
+ public:
+    TimestampColumnStatsImpl(const InternalTimestampStats& stats, int32_t 
precision)
+        : stats_(stats), precision_(precision) {}
+
+    std::optional<Timestamp> Min() const override {
+        return stats_.Min();
+    }
+
+    std::optional<Timestamp> Max() const override {
+        return stats_.Max();
+    }
+
+    void Collect(const std::optional<Timestamp>& value) override {
+        return stats_.Collect(value);
+    }
+
+    std::optional<int64_t> NullCount() const override {
+        return stats_.NullCount();
+    }
+
+    std::string ToString() const override {
+        return fmt::format("min {}, max {}, null count {}",
+                           Min() ? Min().value().ToString() : "null",
+                           Max() ? Max().value().ToString() : "null",
+                           NullCount() ? std::to_string(NullCount().value()) : 
"null");
+    }
+
+    FieldType GetFieldType() const override {
+        return FieldType::TIMESTAMP;
+    }
+
+    int32_t GetPrecision() const override {
+        return precision_;
+    }
+
+ private:
+    InternalTimestampStats stats_;
+    int32_t precision_;
+};
+
+class DecimalColumnStatsImpl : public DecimalColumnStats {
+ public:
+    DecimalColumnStatsImpl(const InternalDecimalStats& stats, int32_t 
precision, int32_t scale)
+        : stats_(stats), precision_(precision), scale_(scale) {}
+
+    std::optional<Decimal> Min() const override {
+        return stats_.Min();
+    }
+
+    std::optional<Decimal> Max() const override {
+        return stats_.Max();
+    }
+
+    void Collect(const std::optional<Decimal>& value) override {
+        return stats_.Collect(value);
+    }
+
+    std::optional<int64_t> NullCount() const override {
+        return stats_.NullCount();
+    }
+
+    std::string ToString() const override {
+        return fmt::format("min {}, max {}, null count {}",
+                           Min() ? Min().value().ToString() : "null",
+                           Max() ? Max().value().ToString() : "null",
+                           NullCount() ? std::to_string(NullCount().value()) : 
"null");
+    }
+
+    FieldType GetFieldType() const override {
+        return FieldType::DECIMAL;
+    }
+
+    int32_t GetPrecision() const override {
+        return precision_;
+    }
+
+    int32_t GetScale() const override {
+        return scale_;
+    }
+
+ private:
+    InternalDecimalStats stats_;
+    int32_t precision_;
+    int32_t scale_;
+};
+
+class DateColumnStatsImpl : public DateColumnStats {
+ public:
+    explicit DateColumnStatsImpl(const InternalIntStats& stats) : 
stats_(stats) {}
+
+    std::optional<int32_t> Min() const override {
+        return stats_.Min();
+    }
+
+    std::optional<int32_t> Max() const override {
+        return stats_.Max();
+    }
+
+    void Collect(std::optional<int32_t> value) override {
+        stats_.Collect(value);
+    }
+
+    std::optional<int64_t> NullCount() const override {
+        return stats_.NullCount();
+    }
+
+    std::string ToString() const override {
+        return FormatStatsToString(*this);
+    }
+
+    FieldType GetFieldType() const override {
+        return FieldType::DATE;
+    }
+
+ private:
+    InternalIntStats stats_;
+};
+
+std::string NestedColumnStats::ToString() const {
+    return fmt::format("min null, max null, null count {}",
+                       NullCount() ? std::to_string(NullCount().value()) : 
"null");
+}
+
+FieldType NestedColumnStats::GetFieldType() const {
+    return nested_type_;
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateBooleanColumnStats(
+    std::optional<bool> min, std::optional<bool> max, std::optional<int64_t> 
null_count) {
+    return std::make_unique<BooleanColumnStatsImpl>(InternalBooleanStats(min, 
max, null_count));
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateTinyIntColumnStats(
+    std::optional<int8_t> min, std::optional<int8_t> max, 
std::optional<int64_t> null_count) {
+    return std::make_unique<TinyIntColumnStatsImpl>(InternalTinyIntStats(min, 
max, null_count));
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateSmallIntColumnStats(
+    std::optional<int16_t> min, std::optional<int16_t> max, 
std::optional<int64_t> null_count) {
+    return 
std::make_unique<SmallIntColumnStatsImpl>(InternalSmallIntStats(min, max, 
null_count));
+}
+
+std::unique_ptr<ColumnStats> 
ColumnStats::CreateIntColumnStats(std::optional<int32_t> min,
+                                                               
std::optional<int32_t> max,
+                                                               
std::optional<int64_t> null_count) {
+    return std::make_unique<IntColumnStatsImpl>(InternalIntStats(min, max, 
null_count));
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateBigIntColumnStats(
+    std::optional<int64_t> min, std::optional<int64_t> max, 
std::optional<int64_t> null_count) {
+    return std::make_unique<BigIntColumnStatsImpl>(InternalBigIntStats(min, 
max, null_count));
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateFloatColumnStats(
+    std::optional<float> min, std::optional<float> max, std::optional<int64_t> 
null_count) {
+    return std::make_unique<FloatColumnStatsImpl>(InternalFloatStats(min, max, 
null_count));
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateDoubleColumnStats(
+    std::optional<double> min, std::optional<double> max, 
std::optional<int64_t> null_count) {
+    return std::make_unique<DoubleColumnStatsImpl>(InternalDoubleStats(min, 
max, null_count));
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateStringColumnStats(
+    const std::optional<std::string>& min, const std::optional<std::string>& 
max,
+    std::optional<int64_t> null_count) {
+    return std::make_unique<StringColumnStatsImpl>(InternalStringStats(min, 
max, null_count));
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateTimestampColumnStats(
+    const std::optional<Timestamp>& min, const std::optional<Timestamp>& max,
+    std::optional<int64_t> null_count, int32_t precision) {
+    return 
std::make_unique<TimestampColumnStatsImpl>(InternalTimestampStats(min, max, 
null_count),
+                                                      precision);
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateDecimalColumnStats(
+    const std::optional<Decimal>& min, const std::optional<Decimal>& max,
+    std::optional<int64_t> null_count, int32_t precision, int32_t scale) {
+    return std::make_unique<DecimalColumnStatsImpl>(InternalDecimalStats(min, 
max, null_count),
+                                                    precision, scale);
+}
+
+std::unique_ptr<ColumnStats> 
ColumnStats::CreateDateColumnStats(std::optional<int32_t> min,
+                                                                
std::optional<int32_t> max,
+                                                                
std::optional<int64_t> null_count) {
+    return std::make_unique<DateColumnStatsImpl>(InternalIntStats(min, max, 
null_count));
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateNestedColumnStats(
+    const FieldType& nested_type, std::optional<int64_t> null_count) {
+    return std::make_unique<NestedColumnStats>(nested_type, null_count);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/format/column_stats_test.cpp 
b/src/paimon/common/format/column_stats_test.cpp
new file mode 100644
index 0000000..4937cfd
--- /dev/null
+++ b/src/paimon/common/format/column_stats_test.cpp
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/column_stats.h"
+
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/result.h"
+
+namespace paimon::test {
+
+TEST(ColumnStatsTest, TestBooleanColumnStats) {
+    auto stats = ColumnStats::CreateBooleanColumnStats(std::nullopt, 
std::nullopt, std::nullopt);
+    auto typed_stats = dynamic_cast<BooleanColumnStats*>(stats.get());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(std::nullopt, typed_stats->Min());
+    ASSERT_EQ(std::nullopt, typed_stats->Max());
+    ASSERT_EQ(1, typed_stats->NullCount().value());
+    ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+    typed_stats->Collect(true);
+    ASSERT_EQ(true, typed_stats->Min().value());
+    ASSERT_EQ(true, typed_stats->Max().value());
+    ASSERT_EQ(1, typed_stats->NullCount().value());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(true, typed_stats->Min().value());
+    ASSERT_EQ(true, typed_stats->Max().value());
+    ASSERT_EQ(2, typed_stats->NullCount().value());
+    ASSERT_EQ("min true, max true, null count 2", stats->ToString());
+}
+
+TEST(ColumnStatsTest, TestTinyIntColumnStats) {
+    auto stats = ColumnStats::CreateTinyIntColumnStats(std::nullopt, 
std::nullopt, std::nullopt);
+    auto typed_stats = dynamic_cast<TinyIntColumnStats*>(stats.get());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(std::nullopt, typed_stats->Min());
+    ASSERT_EQ(std::nullopt, typed_stats->Max());
+    ASSERT_EQ(1, typed_stats->NullCount().value());
+    ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+    typed_stats->Collect(5);
+    ASSERT_EQ(5, typed_stats->Min().value());
+    ASSERT_EQ(5, typed_stats->Max().value());
+    typed_stats->Collect(10);
+    ASSERT_EQ(5, typed_stats->Min().value());
+    ASSERT_EQ(10, typed_stats->Max().value());
+    typed_stats->Collect(1);
+    ASSERT_EQ(1, typed_stats->Min().value());
+    ASSERT_EQ(10, typed_stats->Max().value());
+    ASSERT_EQ(1, typed_stats->NullCount().value());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(1, typed_stats->Min().value());
+    ASSERT_EQ(10, typed_stats->Max().value());
+    ASSERT_EQ(2, typed_stats->NullCount().value());
+    ASSERT_EQ("min 1, max 10, null count 2", stats->ToString());
+}
+TEST(ColumnStatsTest, TestSmallIntColumnStats) {
+    auto stats = ColumnStats::CreateSmallIntColumnStats(std::nullopt, 
std::nullopt, std::nullopt);
+    auto typed_stats = dynamic_cast<SmallIntColumnStats*>(stats.get());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(std::nullopt, typed_stats->Min());
+    ASSERT_EQ(std::nullopt, typed_stats->Max());
+    ASSERT_EQ(1, typed_stats->NullCount().value());
+    ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+    typed_stats->Collect(5);
+    ASSERT_EQ(5, typed_stats->Min().value());
+    ASSERT_EQ(5, typed_stats->Max().value());
+    typed_stats->Collect(10);
+    ASSERT_EQ(5, typed_stats->Min().value());
+    ASSERT_EQ(10, typed_stats->Max().value());
+    typed_stats->Collect(1);
+    ASSERT_EQ(1, typed_stats->Min().value());
+    ASSERT_EQ(10, typed_stats->Max().value());
+    ASSERT_EQ(1, typed_stats->NullCount().value());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(1, typed_stats->Min().value());
+    ASSERT_EQ(10, typed_stats->Max().value());
+    ASSERT_EQ(2, typed_stats->NullCount().value());
+    ASSERT_EQ("min 1, max 10, null count 2", stats->ToString());
+}
+
+TEST(ColumnStatsTest, TestIntColumnStats) {
+    auto stats = ColumnStats::CreateIntColumnStats(std::nullopt, std::nullopt, 
std::nullopt);
+    auto typed_stats = dynamic_cast<IntColumnStats*>(stats.get());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(std::nullopt, typed_stats->Min());
+    ASSERT_EQ(std::nullopt, typed_stats->Max());
+    ASSERT_EQ(1, typed_stats->NullCount().value());
+    ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+    typed_stats->Collect(5);
+    ASSERT_EQ(5, typed_stats->Min().value());
+    ASSERT_EQ(5, typed_stats->Max().value());
+    typed_stats->Collect(10);
+    ASSERT_EQ(5, typed_stats->Min().value());
+    ASSERT_EQ(10, typed_stats->Max().value());
+    typed_stats->Collect(1);
+    ASSERT_EQ(1, typed_stats->Min().value());
+    ASSERT_EQ(10, typed_stats->Max().value());
+    ASSERT_EQ(1, typed_stats->NullCount().value());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(1, typed_stats->Min().value());
+    ASSERT_EQ(10, typed_stats->Max().value());
+    ASSERT_EQ(2, typed_stats->NullCount().value());
+    ASSERT_EQ("min 1, max 10, null count 2", stats->ToString());
+}
+TEST(ColumnStatsTest, TestDateColumnStats) {
+    auto stats = ColumnStats::CreateDateColumnStats(std::nullopt, 
std::nullopt, std::nullopt);
+    auto typed_stats = dynamic_cast<DateColumnStats*>(stats.get());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(std::nullopt, typed_stats->Min());
+    ASSERT_EQ(std::nullopt, typed_stats->Max());
+    ASSERT_EQ(1, typed_stats->NullCount().value());
+    ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+    typed_stats->Collect(5);
+    ASSERT_EQ(5, typed_stats->Min().value());
+    ASSERT_EQ(5, typed_stats->Max().value());
+    typed_stats->Collect(10);
+    ASSERT_EQ(5, typed_stats->Min().value());
+    ASSERT_EQ(10, typed_stats->Max().value());
+    typed_stats->Collect(1);
+    ASSERT_EQ(1, typed_stats->Min().value());
+    ASSERT_EQ(10, typed_stats->Max().value());
+    ASSERT_EQ(1, typed_stats->NullCount().value());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(1, typed_stats->Min().value());
+    ASSERT_EQ(10, typed_stats->Max().value());
+    ASSERT_EQ(2, typed_stats->NullCount().value());
+    ASSERT_EQ("min 1, max 10, null count 2", stats->ToString());
+}
+TEST(ColumnStatsTest, TestBigIntColumnStats) {
+    auto stats = ColumnStats::CreateBigIntColumnStats(std::nullopt, 
std::nullopt, std::nullopt);
+    auto typed_stats = dynamic_cast<BigIntColumnStats*>(stats.get());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(std::nullopt, typed_stats->Min());
+    ASSERT_EQ(std::nullopt, typed_stats->Max());
+    ASSERT_EQ(1l, typed_stats->NullCount().value());
+    ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+    typed_stats->Collect(5l);
+    ASSERT_EQ(5l, typed_stats->Min().value());
+    ASSERT_EQ(5l, typed_stats->Max().value());
+    typed_stats->Collect(10l);
+    ASSERT_EQ(5l, typed_stats->Min().value());
+    ASSERT_EQ(10l, typed_stats->Max().value());
+    typed_stats->Collect(1l);
+    ASSERT_EQ(1l, typed_stats->Min().value());
+    ASSERT_EQ(10l, typed_stats->Max().value());
+    ASSERT_EQ(1l, typed_stats->NullCount().value());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(1l, typed_stats->Min().value());
+    ASSERT_EQ(10l, typed_stats->Max().value());
+    ASSERT_EQ(2, typed_stats->NullCount().value());
+    ASSERT_EQ("min 1, max 10, null count 2", stats->ToString());
+}
+TEST(ColumnStatsTest, TestFloatColumnStats) {
+    auto stats = ColumnStats::CreateFloatColumnStats(std::nullopt, 
std::nullopt, std::nullopt);
+    auto typed_stats = dynamic_cast<FloatColumnStats*>(stats.get());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(std::nullopt, typed_stats->Min());
+    ASSERT_EQ(std::nullopt, typed_stats->Max());
+    ASSERT_EQ(1, typed_stats->NullCount().value());
+    ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+    typed_stats->Collect(5.1f);
+    ASSERT_EQ(5.1f, typed_stats->Min().value());
+    ASSERT_EQ(5.1f, typed_stats->Max().value());
+    typed_stats->Collect(10.1f);
+    ASSERT_EQ(5.1f, typed_stats->Min().value());
+    ASSERT_EQ(10.1f, typed_stats->Max().value());
+    typed_stats->Collect(1.1f);
+    ASSERT_EQ(1.1f, typed_stats->Min().value());
+    ASSERT_EQ(10.1f, typed_stats->Max().value());
+    ASSERT_EQ(1, typed_stats->NullCount().value());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(1.1f, typed_stats->Min().value());
+    ASSERT_EQ(10.1f, typed_stats->Max().value());
+    ASSERT_EQ(2, typed_stats->NullCount().value());
+    ASSERT_EQ("min 1.1, max 10.1, null count 2", stats->ToString());
+}
+
+TEST(ColumnStatsTest, TestDoubleColumnStats) {
+    auto stats = ColumnStats::CreateDoubleColumnStats(std::nullopt, 
std::nullopt, std::nullopt);
+    auto typed_stats = dynamic_cast<DoubleColumnStats*>(stats.get());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(std::nullopt, typed_stats->Min());
+    ASSERT_EQ(std::nullopt, typed_stats->Max());
+    ASSERT_EQ(1, typed_stats->NullCount().value());
+    ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+    typed_stats->Collect(5.12);
+    ASSERT_EQ(5.12, typed_stats->Min().value());
+    ASSERT_EQ(5.12, typed_stats->Max().value());
+    typed_stats->Collect(10.12);
+    ASSERT_EQ(5.12, typed_stats->Min().value());
+    ASSERT_EQ(10.12, typed_stats->Max().value());
+    typed_stats->Collect(1.12);
+    ASSERT_EQ(1.12, typed_stats->Min().value());
+    ASSERT_EQ(10.12, typed_stats->Max().value());
+    ASSERT_EQ(1, typed_stats->NullCount().value());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(1.12, typed_stats->Min().value());
+    ASSERT_EQ(10.12, typed_stats->Max().value());
+    ASSERT_EQ(2, typed_stats->NullCount().value());
+    ASSERT_EQ("min 1.12, max 10.12, null count 2", stats->ToString());
+}
+
+TEST(ColumnStatsTest, TestStringColumnStats) {
+    auto stats = ColumnStats::CreateStringColumnStats(std::nullopt, 
std::nullopt, std::nullopt);
+    auto typed_stats = dynamic_cast<StringColumnStats*>(stats.get());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(std::nullopt, typed_stats->Min());
+    ASSERT_EQ(std::nullopt, typed_stats->Max());
+    ASSERT_EQ(1, typed_stats->NullCount().value());
+    ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+    typed_stats->Collect("abc");
+    ASSERT_EQ("abc", typed_stats->Min().value());
+    ASSERT_EQ("abc", typed_stats->Max().value());
+    typed_stats->Collect("cba");
+    ASSERT_EQ("abc", typed_stats->Min().value());
+    ASSERT_EQ("cba", typed_stats->Max().value());
+    typed_stats->Collect("你好");
+    ASSERT_EQ("abc", typed_stats->Min().value());
+    ASSERT_EQ("你好", typed_stats->Max().value());
+    ASSERT_EQ(1, typed_stats->NullCount().value());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ("abc", typed_stats->Min().value());
+    ASSERT_EQ("你好", typed_stats->Max().value());
+    ASSERT_EQ(2, typed_stats->NullCount().value());
+    ASSERT_EQ("min abc, max 你好, null count 2", stats->ToString());
+}
+
+TEST(ColumnStatsTest, TestTimestampColumnStats) {
+    auto stats = ColumnStats::CreateTimestampColumnStats(std::nullopt, 
std::nullopt, std::nullopt,
+                                                         /*precision=*/9);
+    auto typed_stats = dynamic_cast<TimestampColumnStats*>(stats.get());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(std::nullopt, typed_stats->Min());
+    ASSERT_EQ(std::nullopt, typed_stats->Max());
+    ASSERT_EQ(1l, typed_stats->NullCount().value());
+    ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+    typed_stats->Collect(Timestamp(/*millisecond=*/10, 
/*nano_of_millisecond=*/1));
+    ASSERT_EQ(Timestamp(/*millisecond=*/10, /*nano_of_millisecond=*/1), 
typed_stats->Min().value());
+    ASSERT_EQ(Timestamp(/*millisecond=*/10, /*nano_of_millisecond=*/1), 
typed_stats->Max().value());
+    typed_stats->Collect(Timestamp(/*millisecond=*/10, 
/*nano_of_millisecond=*/2));
+    ASSERT_EQ(Timestamp(/*millisecond=*/10, /*nano_of_millisecond=*/1), 
typed_stats->Min().value());
+    ASSERT_EQ(Timestamp(/*millisecond=*/10, /*nano_of_millisecond=*/2), 
typed_stats->Max().value());
+    typed_stats->Collect(Timestamp(/*millisecond=*/9, 
/*nano_of_millisecond=*/1));
+    ASSERT_EQ(Timestamp(/*millisecond=*/9, /*nano_of_millisecond=*/1), 
typed_stats->Min().value());
+    ASSERT_EQ(Timestamp(/*millisecond=*/10, /*nano_of_millisecond=*/2), 
typed_stats->Max().value());
+    ASSERT_EQ(1l, typed_stats->NullCount().value());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(Timestamp(/*millisecond=*/9, /*nano_of_millisecond=*/1), 
typed_stats->Min().value());
+    ASSERT_EQ(Timestamp(/*millisecond=*/10, /*nano_of_millisecond=*/2), 
typed_stats->Max().value());
+    ASSERT_EQ(2, typed_stats->NullCount().value());
+    ASSERT_EQ("min 1970-01-01 00:00:00.009000001, max 1970-01-01 
00:00:00.010000002, null count 2",
+              stats->ToString());
+}
+
+TEST(ColumnStatsTest, TestDecimalColumnStats) {
+    int32_t precision = 21;
+    int32_t scale = 3;
+    auto stats = ColumnStats::CreateDecimalColumnStats(std::nullopt, 
std::nullopt, std::nullopt,
+                                                       precision, scale);
+    auto typed_stats = dynamic_cast<DecimalColumnStats*>(stats.get());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(std::nullopt, typed_stats->Min());
+    ASSERT_EQ(std::nullopt, typed_stats->Max());
+    ASSERT_EQ(1l, typed_stats->NullCount().value());
+    ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+    typed_stats->Collect(
+        Decimal(precision, scale, 
DecimalUtils::StrToInt128("123456789987654321234").value()));
+    ASSERT_EQ(Decimal(precision, scale, 
DecimalUtils::StrToInt128("123456789987654321234").value()),
+              typed_stats->Min().value());
+    ASSERT_EQ(Decimal(precision, scale, 
DecimalUtils::StrToInt128("123456789987654321234").value()),
+              typed_stats->Max().value());
+    typed_stats->Collect(
+        Decimal(precision, scale, 
DecimalUtils::StrToInt128("923456789987654321234").value()));
+    ASSERT_EQ(Decimal(precision, scale, 
DecimalUtils::StrToInt128("123456789987654321234").value()),
+              typed_stats->Min().value());
+    ASSERT_EQ(Decimal(precision, scale, 
DecimalUtils::StrToInt128("923456789987654321234").value()),
+              typed_stats->Max().value());
+    typed_stats->Collect(
+        Decimal(precision, scale, 
DecimalUtils::StrToInt128("123456789987654321233").value()));
+    ASSERT_EQ(Decimal(precision, scale, 
DecimalUtils::StrToInt128("123456789987654321233").value()),
+              typed_stats->Min().value());
+    ASSERT_EQ(Decimal(precision, scale, 
DecimalUtils::StrToInt128("923456789987654321234").value()),
+              typed_stats->Max().value());
+    ASSERT_EQ(1l, typed_stats->NullCount().value());
+    typed_stats->Collect(std::nullopt);
+    ASSERT_EQ(Decimal(precision, scale, 
DecimalUtils::StrToInt128("123456789987654321233").value()),
+              typed_stats->Min().value());
+    ASSERT_EQ(Decimal(precision, scale, 
DecimalUtils::StrToInt128("923456789987654321234").value()),
+              typed_stats->Max().value());
+    ASSERT_EQ(2, typed_stats->NullCount().value());
+    ASSERT_EQ("min 123456789987654321.233, max 923456789987654321.234, null 
count 2",
+              stats->ToString());
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/format/file_format_factory.cpp 
b/src/paimon/common/format/file_format_factory.cpp
new file mode 100644
index 0000000..27fbfce
--- /dev/null
+++ b/src/paimon/common/format/file_format_factory.cpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/file_format_factory.h"
+
+#include "fmt/format.h"
+#include "paimon/factories/factory_creator.h"
+#include "paimon/format/file_format.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+FileFormatFactory::~FileFormatFactory() = default;
+
+Result<std::unique_ptr<FileFormat>> FileFormatFactory::Get(
+    const std::string& identifier, const std::map<std::string, std::string>& 
options) {
+    auto factory_creator = FactoryCreator::GetInstance();
+    if (factory_creator == nullptr) {
+        return Status::Invalid("factory creator is null pointer");
+    }
+    auto file_format_factory =
+        dynamic_cast<FileFormatFactory*>(factory_creator->Create(identifier));
+    if (file_format_factory == nullptr) {
+        return Status::Invalid(fmt::format(
+            "Could not find a FileFormatFactory implementation class for 
format '{}'", identifier));
+    }
+    return file_format_factory->Create(options);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/lookup/lookup_store_factory.cpp 
b/src/paimon/common/lookup/lookup_store_factory.cpp
new file mode 100644
index 0000000..c5742e1
--- /dev/null
+++ b/src/paimon/common/lookup/lookup_store_factory.cpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/lookup/lookup_store_factory.h"
+
+#include "paimon/common/lookup/sort/sort_lookup_store_factory.h"
+namespace paimon {
+Result<std::shared_ptr<LookupStoreFactory>> LookupStoreFactory::Create(
+    MemorySlice::SliceComparator comparator, const 
std::shared_ptr<CacheManager>& cache_manager,
+    const CoreOptions& options) {
+    const auto& compress_options = options.GetLookupCompressOptions();
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BlockCompressionFactory> 
compression_factory,
+                           BlockCompressionFactory::Create(compress_options));
+    return std::make_shared<SortLookupStoreFactory>(
+        std::move(comparator), cache_manager, options.GetCachePageSize(), 
compression_factory);
+}
+
+Result<std::shared_ptr<BloomFilter>> LookupStoreFactory::BfGenerator(int64_t 
row_count,
+                                                                     const 
CoreOptions& options,
+                                                                     
MemoryPool* pool) {
+    if (row_count <= 0 || !options.LookupCacheBloomFilterEnabled()) {
+        return std::shared_ptr<BloomFilter>();
+    }
+    auto bloom_filter = BloomFilter::Create(row_count, 
options.GetLookupCacheBloomFilterFpp());
+    MemorySegment memory_segment =
+        MemorySegment::AllocateHeapMemory(bloom_filter->ByteLength(), pool);
+    PAIMON_RETURN_NOT_OK(bloom_filter->SetMemorySegment(memory_segment));
+    return bloom_filter;
+}
+}  // namespace paimon
diff --git a/src/paimon/common/lookup/lookup_store_factory.h 
b/src/paimon/common/lookup/lookup_store_factory.h
new file mode 100644
index 0000000..222dc7b
--- /dev/null
+++ b/src/paimon/common/lookup/lookup_store_factory.h
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include "paimon/common/io/cache/cache_manager.h"
+#include "paimon/common/memory/memory_slice.h"
+#include "paimon/common/utils/bloom_filter.h"
+#include "paimon/core/core_options.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+namespace paimon {
+/// Reader, lookup value by key bytes.
+class LookupStoreReader {
+ public:
+    virtual ~LookupStoreReader() = default;
+    /// Lookup value by key.
+    virtual Result<std::shared_ptr<Bytes>> Lookup(const 
std::shared_ptr<Bytes>& key) const = 0;
+    virtual Status Close() = 0;
+};
+
+/// Writer to prepare binary file.
+class LookupStoreWriter {
+ public:
+    virtual ~LookupStoreWriter() = default;
+    virtual Status Put(std::shared_ptr<Bytes>&& key, std::shared_ptr<Bytes>&& 
value) = 0;
+    virtual Status Close() = 0;
+};
+
+/// A key-value store for lookup, key-value store should be single binary file 
written once and
+/// ready to be used. This factory provide two interfaces:
+///
+///  Writer: written once to prepare binary file.
+///  Reader: lookup value by key bytes.
+class LookupStoreFactory {
+ public:
+    virtual ~LookupStoreFactory() = default;
+
+    virtual Result<std::unique_ptr<LookupStoreWriter>> CreateWriter(
+        const std::shared_ptr<paimon::FileSystem>& fs, const std::string& 
file_path,
+        const std::shared_ptr<BloomFilter>& bloom_filter,
+        const std::shared_ptr<MemoryPool>& pool) const = 0;
+
+    virtual Result<std::unique_ptr<LookupStoreReader>> CreateReader(
+        const std::shared_ptr<paimon::FileSystem>& fs, const std::string& 
file_path,
+        const std::shared_ptr<MemoryPool>& pool) const = 0;
+
+    static Result<std::shared_ptr<LookupStoreFactory>> Create(
+        MemorySlice::SliceComparator comparator, const 
std::shared_ptr<CacheManager>& cache_manager,
+        const CoreOptions& options);
+
+    static Result<std::shared_ptr<BloomFilter>> BfGenerator(int64_t row_count,
+                                                            const CoreOptions& 
options,
+                                                            MemoryPool* pool);
+};
+}  // namespace paimon
diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_factory.cpp 
b/src/paimon/common/lookup/sort/sort_lookup_store_factory.cpp
new file mode 100644
index 0000000..e376dff
--- /dev/null
+++ b/src/paimon/common/lookup/sort/sort_lookup_store_factory.cpp
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/lookup/sort/sort_lookup_store_factory.h"
+
+#include "paimon/common/lookup/sort/sort_lookup_store_footer.h"
+#include "paimon/common/memory/memory_slice.h"
+#include "paimon/memory/bytes.h"
+namespace paimon {
+Result<std::unique_ptr<LookupStoreWriter>> 
SortLookupStoreFactory::CreateWriter(
+    const std::shared_ptr<paimon::FileSystem>& fs, const std::string& 
file_path,
+    const std::shared_ptr<BloomFilter>& bloom_filter,
+    const std::shared_ptr<MemoryPool>& pool) const {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<OutputStream> out,
+                           fs->Create(file_path, /*overwrite=*/false));
+    return std::make_unique<SortLookupStoreWriter>(
+        out,
+        std::make_shared<SstFileWriter>(out, bloom_filter, block_size_, 
compression_factory_, pool),
+        pool);
+}
+
+Result<std::unique_ptr<LookupStoreReader>> 
SortLookupStoreFactory::CreateReader(
+    const std::shared_ptr<paimon::FileSystem>& fs, const std::string& 
file_path,
+    const std::shared_ptr<MemoryPool>& pool) const {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> in, 
fs->Open(file_path));
+    auto block_cache = std::make_shared<BlockCache>(file_path, in, 
cache_manager_, pool);
+    PAIMON_ASSIGN_OR_RAISE(
+        std::shared_ptr<SstFileReader> reader,
+        SstFileReader::CreateForSortLookupStore(in, comparator_, block_cache, 
pool));
+    return std::make_unique<SortLookupStoreReader>(in, reader);
+}
+
+Status SortLookupStoreReader::Close() {
+    PAIMON_RETURN_NOT_OK(reader_->Close());
+    return in_->Close();
+}
+
+Status SortLookupStoreWriter::Close() {
+    PAIMON_RETURN_NOT_OK(writer_->Flush());
+    PAIMON_ASSIGN_OR_RAISE(std::optional<BloomFilterHandle> 
bloom_filter_handle,
+                           writer_->WriteBloomFilter());
+    PAIMON_ASSIGN_OR_RAISE(BlockHandle index_block_handle, 
writer_->WriteIndexBlock());
+    SortLookupStoreFooter footer(index_block_handle, bloom_filter_handle);
+    auto slice = footer.WriteSortLookupStoreFooter(pool_.get());
+    PAIMON_RETURN_NOT_OK(writer_->WriteSlice(slice));
+    PAIMON_RETURN_NOT_OK(out_->Close());
+    return Status::OK();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_factory.h 
b/src/paimon/common/lookup/sort/sort_lookup_store_factory.h
new file mode 100644
index 0000000..3aa193c
--- /dev/null
+++ b/src/paimon/common/lookup/sort/sort_lookup_store_factory.h
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "paimon/common/lookup/lookup_store_factory.h"
+#include "paimon/common/memory/memory_slice.h"
+#include "paimon/common/sst/sst_file_reader.h"
+#include "paimon/common/sst/sst_file_writer.h"
+#include "paimon/common/utils/bloom_filter.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+namespace paimon {
+/// Reader, lookup value by key bytes.
+class SortLookupStoreReader : public LookupStoreReader {
+ public:
+    SortLookupStoreReader(const std::shared_ptr<InputStream>& in,
+                          const std::shared_ptr<SstFileReader>& reader)
+        : in_(in), reader_(reader) {}
+
+    Result<std::shared_ptr<Bytes>> Lookup(const std::shared_ptr<Bytes>& key) 
const override {
+        return reader_->Lookup(key);
+    }
+
+    Status Close() override;
+
+ private:
+    std::shared_ptr<InputStream> in_;
+    std::shared_ptr<SstFileReader> reader_;
+};
+
+/// Writer to prepare binary file.
+class SortLookupStoreWriter : public LookupStoreWriter {
+ public:
+    SortLookupStoreWriter(const std::shared_ptr<OutputStream>& out,
+                          const std::shared_ptr<SstFileWriter>& writer,
+                          const std::shared_ptr<MemoryPool>& pool)
+        : out_(out), writer_(writer), pool_(pool) {}
+
+    Status Put(std::shared_ptr<Bytes>&& key, std::shared_ptr<Bytes>&& value) 
override {
+        return writer_->Write(std::move(key), std::move(value));
+    }
+
+    Status Close() override;
+
+ private:
+    std::shared_ptr<OutputStream> out_;
+    std::shared_ptr<SstFileWriter> writer_;
+    std::shared_ptr<MemoryPool> pool_;
+};
+
+/// A `LookupStoreFactory` which uses hash to lookup records on disk.
+class SortLookupStoreFactory : public LookupStoreFactory {
+ public:
+    SortLookupStoreFactory(MemorySlice::SliceComparator comparator,
+                           const std::shared_ptr<CacheManager>& cache_manager, 
int32_t block_size,
+                           const std::shared_ptr<BlockCompressionFactory>& 
compression_factory)
+        : block_size_(block_size),
+          comparator_(std::move(comparator)),
+          cache_manager_(cache_manager),
+          compression_factory_(compression_factory) {}
+
+    Result<std::unique_ptr<LookupStoreWriter>> CreateWriter(
+        const std::shared_ptr<paimon::FileSystem>& fs, const std::string& 
file_path,
+        const std::shared_ptr<BloomFilter>& bloom_filter,
+        const std::shared_ptr<MemoryPool>& pool) const override;
+
+    Result<std::unique_ptr<LookupStoreReader>> CreateReader(
+        const std::shared_ptr<paimon::FileSystem>& fs, const std::string& 
file_path,
+        const std::shared_ptr<MemoryPool>& pool) const override;
+
+ private:
+    int32_t block_size_;
+    MemorySlice::SliceComparator comparator_;
+    std::shared_ptr<CacheManager> cache_manager_;
+    std::shared_ptr<BlockCompressionFactory> compression_factory_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_footer.cpp 
b/src/paimon/common/lookup/sort/sort_lookup_store_footer.cpp
new file mode 100644
index 0000000..d15ece5
--- /dev/null
+++ b/src/paimon/common/lookup/sort/sort_lookup_store_footer.cpp
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/lookup/sort/sort_lookup_store_footer.h"
+
+#include "fmt/format.h"
+#include "paimon/common/memory/memory_slice_output.h"
+
+namespace paimon {
+
+Result<std::unique_ptr<SortLookupStoreFooter>> 
SortLookupStoreFooter::ReadSortLookupStoreFooter(
+    MemorySliceInput* input) {
+    auto offset = input->ReadLong();
+    auto size = input->ReadInt();
+    auto expected_entries = input->ReadLong();
+    std::optional<BloomFilterHandle> bloom_filter_handle;
+    if (offset || size || expected_entries) {
+        bloom_filter_handle.emplace(offset, size, expected_entries);
+    }
+    auto index_offset = input->ReadLong();
+    auto index_size = input->ReadInt();
+    BlockHandle index_block_handle(index_offset, index_size);
+
+    // skip padding
+    PAIMON_RETURN_NOT_OK(input->SetPosition(ENCODED_LENGTH - 4));
+
+    auto magic = input->ReadInt();
+    if (magic != MAGIC_NUMBER) {
+        return Status::Invalid(
+            fmt::format("Expected magic number {}, but got {}", MAGIC_NUMBER, 
magic));
+    }
+    return std::make_unique<SortLookupStoreFooter>(index_block_handle, 
bloom_filter_handle);
+}
+
+MemorySlice SortLookupStoreFooter::WriteSortLookupStoreFooter(MemoryPool* 
pool) {
+    MemorySliceOutput output(ENCODED_LENGTH, pool);
+    // 20 bytes
+    if (!bloom_filter_handle_) {
+        output.WriteValue(static_cast<int64_t>(0));
+        output.WriteValue(static_cast<int32_t>(0));
+        output.WriteValue(static_cast<int64_t>(0));
+    } else {
+        output.WriteValue(bloom_filter_handle_->Offset());
+        output.WriteValue(bloom_filter_handle_->Size());
+        output.WriteValue(bloom_filter_handle_->ExpectedEntries());
+    }
+    // 12 bytes
+    output.WriteValue(index_block_handle_.Offset());
+    output.WriteValue(index_block_handle_.Size());
+    // 4 bytes
+    output.WriteValue(MAGIC_NUMBER);
+    return output.ToSlice();
+}
+}  // namespace paimon
diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_footer.h 
b/src/paimon/common/lookup/sort/sort_lookup_store_footer.h
new file mode 100644
index 0000000..f1462a8
--- /dev/null
+++ b/src/paimon/common/lookup/sort/sort_lookup_store_footer.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <optional>
+
+#include "paimon/common/memory/memory_slice_input.h"
+#include "paimon/common/sst/block_handle.h"
+#include "paimon/common/sst/bloom_filter_handle.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Footer of a sort lookup store.
+class PAIMON_EXPORT SortLookupStoreFooter {
+ public:
+    static Result<std::unique_ptr<SortLookupStoreFooter>> 
ReadSortLookupStoreFooter(
+        MemorySliceInput* input);
+
+ public:
+    SortLookupStoreFooter(const BlockHandle& index_block_handle,
+                          const std::optional<BloomFilterHandle>& 
bloom_filter_handle)
+        : index_block_handle_(index_block_handle), 
bloom_filter_handle_(bloom_filter_handle) {}
+
+    ~SortLookupStoreFooter() = default;
+
+    const BlockHandle& GetIndexBlockHandle() const {
+        return index_block_handle_;
+    }
+    const std::optional<BloomFilterHandle>& GetBloomFilterHandle() const {
+        return bloom_filter_handle_;
+    }
+
+    MemorySlice WriteSortLookupStoreFooter(MemoryPool* pool);
+
+ public:
+    // 20 bytes for bloom filter handle, 12 bytes for index block handle, 4 
bytes for magic number
+    static constexpr int32_t ENCODED_LENGTH = 36;
+    static constexpr int32_t MAGIC_NUMBER = 1481571681;
+
+ private:
+    BlockHandle index_block_handle_;
+    std::optional<BloomFilterHandle> bloom_filter_handle_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp 
b/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp
new file mode 100644
index 0000000..e5847e8
--- /dev/null
+++ b/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "arrow/api.h"
+#include "arrow/ipc/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/columnar/columnar_row.h"
+#include "paimon/common/data/serializer/row_compacted_serializer.h"
+#include "paimon/common/lookup/lookup_store_factory.h"
+#include "paimon/common/lookup/sort/sort_lookup_store_factory.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+TEST(SortLookupStoreTest, TestSimple) {
+    auto pool = GetDefaultPool();
+    auto check_result = [&](const std::map<std::string, std::string>& 
options_map) {
+        auto dir = UniqueTestDirectory::Create("local");
+        std::string file_path = dir->Str() + "/test.lookup";
+        auto fs = dir->GetFileSystem();
+
+        arrow::FieldVector fields = {
+            arrow::field("key", arrow::utf8()),
+            arrow::field("value", arrow::int32()),
+        };
+        auto key_type = arrow::struct_({fields[0]});
+        auto value_type = arrow::struct_({fields[1]});
+        ASSERT_OK_AND_ASSIGN(auto comparator, 
RowCompactedSerializer::CreateSliceComparator(
+                                                  
arrow::schema(key_type->fields()), pool));
+        ASSERT_OK_AND_ASSIGN(CoreOptions options, 
CoreOptions::FromMap(options_map));
+
+        ASSERT_OK_AND_ASSIGN(
+            auto factory,
+            LookupStoreFactory::Create(comparator, 
std::make_shared<CacheManager>(1024 * 1024, 0.0),
+                                       options));
+        int64_t row_count = 6;
+        ASSERT_OK_AND_ASSIGN(auto bloom_filter,
+                             LookupStoreFactory::BfGenerator(row_count, 
options, pool.get()));
+        ASSERT_OK_AND_ASSIGN(auto writer, factory->CreateWriter(fs, file_path, 
bloom_filter, pool));
+
+        std::shared_ptr<arrow::Array> key_array =
+            arrow::ipc::internal::json::ArrayFromJSON(key_type,
+                                                      R"([
+        ["Alex"],
+        ["Alice"],
+        ["Bob"],
+        ["David"],
+        ["Lily"],
+        ["Lucy"]
+    ])")
+                .ValueOrDie();
+        std::shared_ptr<arrow::Array> value_array =
+            arrow::ipc::internal::json::ArrayFromJSON(value_type,
+                                                      R"([
+        [0],
+        [10],
+        [20],
+        [30],
+        [40],
+        [100]
+    ])")
+                .ValueOrDie();
+
+        ASSERT_OK_AND_ASSIGN(auto key_serializer, 
RowCompactedSerializer::Create(
+                                                      
arrow::schema(key_type->fields()), pool));
+        ASSERT_OK_AND_ASSIGN(auto value_serializer, 
RowCompactedSerializer::Create(
+                                                        
arrow::schema(value_type->fields()), pool));
+        // write data
+        std::vector<std::pair<std::shared_ptr<Bytes>, std::shared_ptr<Bytes>>> 
key_value_bytes_vec;
+        for (int64_t i = 0; i < row_count; i++) {
+            auto key_struct_array = 
std::dynamic_pointer_cast<arrow::StructArray>(key_array);
+            auto value_struct_array = 
std::dynamic_pointer_cast<arrow::StructArray>(value_array);
+            ASSERT_TRUE(key_struct_array);
+            ASSERT_TRUE(value_struct_array);
+            auto key_row = std::make_shared<ColumnarRow>(
+                /*struct_array=*/nullptr, key_struct_array->fields(), pool, 
/*row_id=*/i);
+            auto value_row = std::make_shared<ColumnarRow>(
+                /*struct_array=*/nullptr, value_struct_array->fields(), pool, 
/*row_id=*/i);
+            ASSERT_OK_AND_ASSIGN(auto key_bytes, 
key_serializer->SerializeToBytes(*key_row));
+            ASSERT_OK_AND_ASSIGN(auto value_bytes, 
value_serializer->SerializeToBytes(*value_row));
+            key_value_bytes_vec.emplace_back(key_bytes, value_bytes);
+            ASSERT_OK(writer->Put(std::move(key_bytes), 
std::move(value_bytes)));
+        }
+        ASSERT_OK(writer->Close());
+
+        // read data
+        ASSERT_TRUE(fs->Exists(file_path).value());
+        ASSERT_OK_AND_ASSIGN(auto reader, factory->CreateReader(fs, file_path, 
pool));
+        for (int64_t i = 0; i < row_count; i++) {
+            const auto& [key, value] = key_value_bytes_vec[i];
+            ASSERT_OK_AND_ASSIGN(auto value_bytes, reader->Lookup(key));
+            ASSERT_TRUE(value_bytes);
+            // test value bytes
+            ASSERT_EQ(*value_bytes, *value);
+
+            // test deserialize data
+            ASSERT_OK_AND_ASSIGN(auto de_row, 
value_serializer->Deserialize(value_bytes));
+            auto value_struct_array = 
std::dynamic_pointer_cast<arrow::StructArray>(value_array);
+            ASSERT_TRUE(value_struct_array);
+            auto typed_value_array =
+                
std::dynamic_pointer_cast<arrow::Int32Array>(value_struct_array->field(0));
+            ASSERT_TRUE(typed_value_array);
+            ASSERT_EQ(de_row->GetInt(0), typed_value_array->Value(i));
+        }
+        // test non-exist key
+        auto non_exist_key = std::make_shared<Bytes>("non-exist", pool.get());
+        ASSERT_OK_AND_ASSIGN(auto value_bytes, reader->Lookup(non_exist_key));
+        ASSERT_FALSE(value_bytes);
+        ASSERT_OK(reader->Close());
+    };
+
+    check_result({});
+    check_result(std::map<std::string, std::string>(
+        {{Options::LOOKUP_CACHE_BLOOM_FILTER_ENABLED, "false"}}));
+    check_result(
+        std::map<std::string, 
std::string>({{Options::LOOKUP_CACHE_SPILL_COMPRESSION, "lz4"}}));
+    check_result(
+        std::map<std::string, 
std::string>({{Options::SPILL_COMPRESSION_ZSTD_LEVEL, "3"}}));
+    check_result(std::map<std::string, 
std::string>({{Options::CACHE_PAGE_SIZE, "4"}}));
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/table/special_fields.h 
b/src/paimon/common/table/special_fields.h
new file mode 100644
index 0000000..384150e
--- /dev/null
+++ b/src/paimon/common/table/special_fields.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <string>
+
+#include "arrow/type_fwd.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/utils/special_field_ids.h"
+
+namespace paimon {
+
+struct SpecialFields {
+    SpecialFields() = delete;
+    ~SpecialFields() = delete;
+
+    static constexpr char KEY_FIELD_PREFIX[] = "_KEY_";
+    static constexpr int32_t KEY_VALUE_SPECIAL_FIELD_COUNT = 2;
+
+    static const DataField& SequenceNumber() {
+        static const DataField data_field = DataField(
+            SpecialFieldIds::SEQUENCE_NUMBER, arrow::field("_SEQUENCE_NUMBER", 
arrow::int64()));
+        return data_field;
+    }
+
+    static const DataField& ValueKind() {
+        static const DataField data_field =
+            DataField(SpecialFieldIds::VALUE_KIND, arrow::field("_VALUE_KIND", 
arrow::int8()));
+        return data_field;
+    }
+
+    static const DataField& RowKind() {
+        static const DataField data_field =
+            DataField(SpecialFieldIds::ROW_KIND, arrow::field("rowkind", 
arrow::utf8()));
+        return data_field;
+    }
+
+    static const DataField& RowId() {
+        static const DataField data_field =
+            DataField(SpecialFieldIds::ROW_ID, arrow::field("_ROW_ID", 
arrow::int64()));
+        return data_field;
+    }
+
+    static const DataField& IndexScore() {
+        static const DataField data_field =
+            DataField(SpecialFieldIds::INDEX_SCORE, 
arrow::field("_INDEX_SCORE", arrow::float32()));
+        return data_field;
+    }
+
+    static bool IsSpecialFieldName(const std::string& field_name) {
+        if (field_name == SequenceNumber().Name() || field_name == 
ValueKind().Name() ||
+            field_name == RowId().Name() || field_name == IndexScore().Name()) 
{
+            return true;
+        }
+        return false;
+    }
+    // TODO(xinyu.lxy): add a func to complete row-tracking fields
+
+    static std::shared_ptr<arrow::Schema> CompleteSequenceAndValueKindField(
+        const std::shared_ptr<arrow::Schema>& schema) {
+        arrow::FieldVector target_fields;
+        
target_fields.push_back(DataField::ConvertDataFieldToArrowField(SequenceNumber()));
+        
target_fields.push_back(DataField::ConvertDataFieldToArrowField(ValueKind()));
+        target_fields.insert(target_fields.end(), schema->fields().begin(), 
schema->fields().end());
+        return arrow::schema(target_fields);
+    }
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/table/special_fields_test.cpp 
b/src/paimon/common/table/special_fields_test.cpp
new file mode 100644
index 0000000..1808fad
--- /dev/null
+++ b/src/paimon/common/table/special_fields_test.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/table/special_fields.h"
+
+#include <memory>
+
+#include "arrow/type.h"
+#include "gtest/gtest.h"
+
+namespace paimon::test {
+
+TEST(SpecialFieldsTest, TestSequenceNumberField) {
+    ASSERT_EQ(SpecialFields::SequenceNumber().Id(), 
std::numeric_limits<int32_t>::max() - 1);
+    ASSERT_EQ(SpecialFields::SequenceNumber().Name(), "_SEQUENCE_NUMBER");
+    ASSERT_EQ(SpecialFields::SequenceNumber().Type()->id(), 
arrow::Type::INT64);
+}
+
+TEST(SpecialFieldsTest, TestValueKindField) {
+    ASSERT_EQ(SpecialFields::ValueKind().Id(), 
std::numeric_limits<int32_t>::max() - 2);
+    ASSERT_EQ(SpecialFields::ValueKind().Name(), "_VALUE_KIND");
+    ASSERT_EQ(SpecialFields::ValueKind().Type()->id(), arrow::Type::INT8);
+}
+
+TEST(SpecialFieldsTest, TestRowKindField) {
+    ASSERT_EQ(SpecialFields::RowKind().Id(), 
std::numeric_limits<int32_t>::max() - 3);
+    ASSERT_EQ(SpecialFields::RowKind().Name(), "rowkind");
+    ASSERT_EQ(SpecialFields::RowKind().Type()->id(), arrow::Type::STRING);
+}
+
+TEST(SpecialFieldsTest, TestRowIdField) {
+    ASSERT_EQ(SpecialFields::RowId().Id(), std::numeric_limits<int32_t>::max() 
- 5);
+    ASSERT_EQ(SpecialFields::RowId().Name(), "_ROW_ID");
+    ASSERT_EQ(SpecialFields::RowId().Type()->id(), arrow::Type::INT64);
+}
+
+TEST(SpecialFieldsTest, TestIndexScore) {
+    ASSERT_EQ(SpecialFields::IndexScore().Id(), 
std::numeric_limits<int32_t>::max() - 10000 - 1);
+    ASSERT_EQ(SpecialFields::IndexScore().Name(), "_INDEX_SCORE");
+    ASSERT_EQ(SpecialFields::IndexScore().Type()->id(), arrow::Type::FLOAT);
+}
+
+TEST(SpecialFieldsTest, TestKeyValueSpecialFieldCount) {
+    ASSERT_EQ(SpecialFields::KEY_VALUE_SPECIAL_FIELD_COUNT, 2);
+}
+
+TEST(SpecialFieldsTest, TestIsSpecialFieldName) {
+    ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_SEQUENCE_NUMBER"));
+    ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_VALUE_KIND"));
+    ASSERT_FALSE(SpecialFields::IsSpecialFieldName("VALUE_KIND"));
+    ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_ROW_ID"));
+    ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_INDEX_SCORE"));
+}
+
+}  // namespace paimon::test


Reply via email to