This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 354bf18  feat: add predicate module with Literal, LeafPredicate, and 
CompoundPredicate (#27)
354bf18 is described below

commit 354bf189fa1d96f5221a9fddbc3da3fe013029d3
Author: lxy <[email protected]>
AuthorDate: Fri May 29 12:03:30 2026 +0800

    feat: add predicate module with Literal, LeafPredicate, and 
CompoundPredicate (#27)
    
    Squash merge PR #27.
---
 include/paimon/predicate/compound_predicate.h      |  53 +++
 include/paimon/predicate/function.h                |  52 +++
 include/paimon/predicate/function_visitor.h        |  80 ++++
 include/paimon/predicate/leaf_predicate.h          |  69 ++++
 include/paimon/predicate/literal.h                 | 124 ++++++
 include/paimon/predicate/predicate.h               |  44 ++
 src/paimon/common/data/decimal.cpp                 |   6 +-
 src/paimon/common/memory/memory_segment_utils.cpp  |  16 +-
 src/paimon/common/memory/memory_segment_utils.h    |  30 +-
 src/paimon/common/predicate/compound_function.h    |  49 +++
 src/paimon/common/predicate/compound_predicate.cpp |  90 +++++
 .../common/predicate/compound_predicate_impl.h     |  56 +++
 src/paimon/common/predicate/leaf_function.h        |  42 ++
 src/paimon/common/predicate/leaf_predicate.cpp     |  86 ++++
 src/paimon/common/predicate/leaf_predicate_impl.h  |  98 +++++
 src/paimon/common/predicate/leaf_unary_function.h  |  60 +++
 src/paimon/common/predicate/literal.cpp            | 425 ++++++++++++++++++++
 src/paimon/common/predicate/literal_converter.cpp  | 290 ++++++++++++++
 src/paimon/common/predicate/literal_converter.h    | 139 +++++++
 .../common/predicate/literal_converter_test.cpp    | 441 +++++++++++++++++++++
 src/paimon/common/predicate/literal_test.cpp       | 293 ++++++++++++++
 .../predicate/multi_literals_leaf_function.h       |  74 ++++
 .../predicate/null_false_leaf_binary_function.h    |  88 ++++
 src/paimon/common/predicate/predicate_filter.h     |  40 ++
 .../common/predicate/string_leaf_binary_function.h |  44 ++
 25 files changed, 2768 insertions(+), 21 deletions(-)

diff --git a/include/paimon/predicate/compound_predicate.h 
b/include/paimon/predicate/compound_predicate.h
new file mode 100644
index 0000000..f10727a
--- /dev/null
+++ b/include/paimon/predicate/compound_predicate.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/predicate/predicate.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class CompoundFunction;
+class Function;
+
+/// Non-leaf node in a `Predicate` tree. Its evaluation result depends on the 
results of its
+/// children.
+class PAIMON_EXPORT CompoundPredicate : virtual public Predicate {
+ public:
+    const Function& GetFunction() const override;
+
+    const std::vector<std::shared_ptr<Predicate>>& Children() const {
+        return children_;
+    }
+
+    std::shared_ptr<Predicate> Negate() const override;
+    std::string ToString() const override;
+
+    bool operator==(const Predicate& other) const override;
+
+ protected:
+    CompoundPredicate(const CompoundFunction& compound_function,
+                      const std::vector<std::shared_ptr<Predicate>>& children);
+
+    const CompoundFunction& compound_function_;
+    std::vector<std::shared_ptr<Predicate>> children_;
+};
+}  // namespace paimon
diff --git a/include/paimon/predicate/function.h 
b/include/paimon/predicate/function.h
new file mode 100644
index 0000000..f383f74
--- /dev/null
+++ b/include/paimon/predicate/function.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "paimon/visibility.h"
+
+namespace paimon {
+/// `Function` represents a predicate function used in query expressions and 
filtering operations.
+/// It serves as the base class for all predicate functions in Paimon.
+class PAIMON_EXPORT Function {
+ public:
+    enum class PAIMON_EXPORT Type {
+        IS_NULL = 1,
+        IS_NOT_NULL = 2,
+        EQUAL = 3,
+        NOT_EQUAL = 4,
+        GREATER_THAN = 5,
+        GREATER_OR_EQUAL = 6,
+        LESS_THAN = 7,
+        LESS_OR_EQUAL = 8,
+        IN = 9,
+        NOT_IN = 10,
+        AND = 11,
+        OR = 12,
+        STARTS_WITH = 13,
+        ENDS_WITH = 14,
+        CONTAINS = 15,
+        LIKE = 16
+    };
+    virtual ~Function() = default;
+    virtual Type GetType() const = 0;
+    virtual std::string ToString() const = 0;
+};
+}  // namespace paimon
diff --git a/include/paimon/predicate/function_visitor.h 
b/include/paimon/predicate/function_visitor.h
new file mode 100644
index 0000000..8b759ff
--- /dev/null
+++ b/include/paimon/predicate/function_visitor.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "paimon/predicate/leaf_predicate.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+/// A visitor interface for evaluating filter predicates on indexed columns.
+/// @tparam T The result type produced by each visit method (e.g., a file 
index result or global
+/// index result).
+template <typename T>
+class PAIMON_EXPORT FunctionVisitor {
+ public:
+    virtual ~FunctionVisitor() = default;
+
+    /// Evaluates the IS NOT NULL predicate on the indexed column.
+    virtual Result<T> VisitIsNotNull() = 0;
+
+    /// Evaluates the IS NULL predicate on the indexed column.
+    virtual Result<T> VisitIsNull() = 0;
+
+    /// Evaluates the equality (==) predicate against the given literal.
+    virtual Result<T> VisitEqual(const Literal& literal) = 0;
+
+    /// Evaluates the inequality (!=) predicate against the given literal.
+    virtual Result<T> VisitNotEqual(const Literal& literal) = 0;
+
+    /// Evaluates the less-than (<) predicate against the given literal.
+    virtual Result<T> VisitLessThan(const Literal& literal) = 0;
+
+    /// Evaluates the less-than-or-equal (<=) predicate against the given 
literal.
+    virtual Result<T> VisitLessOrEqual(const Literal& literal) = 0;
+
+    /// Evaluates the greater-than (>) predicate against the given literal.
+    virtual Result<T> VisitGreaterThan(const Literal& literal) = 0;
+
+    /// Evaluates the greater-than-or-equal (>=) predicate against the given 
literal.
+    virtual Result<T> VisitGreaterOrEqual(const Literal& literal) = 0;
+
+    /// Evaluates the IN predicate against a list of literals.
+    virtual Result<T> VisitIn(const std::vector<Literal>& literals) = 0;
+
+    /// Evaluates the NOT IN predicate against a list of literals.
+    virtual Result<T> VisitNotIn(const std::vector<Literal>& literals) = 0;
+
+    /// Evaluates whether string values start with the given prefix.
+    virtual Result<T> VisitStartsWith(const Literal& prefix) = 0;
+
+    /// Evaluates whether string values end with the given suffix.
+    virtual Result<T> VisitEndsWith(const Literal& suffix) = 0;
+
+    /// Evaluates whether string values contain the given substring.
+    virtual Result<T> VisitContains(const Literal& literal) = 0;
+
+    /// Evaluates whether string values match the given LIKE pattern.
+    virtual Result<T> VisitLike(const Literal& literal) = 0;
+};
+}  // namespace paimon
diff --git a/include/paimon/predicate/leaf_predicate.h 
b/include/paimon/predicate/leaf_predicate.h
new file mode 100644
index 0000000..2969848
--- /dev/null
+++ b/include/paimon/predicate/leaf_predicate.h
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class LeafFunction;
+class Function;
+enum class FieldType;
+
+/// Leaf node of a `Predicate` tree. Compares a field with literals.
+class PAIMON_EXPORT LeafPredicate : virtual public Predicate {
+ public:
+    int32_t FieldIndex() const {
+        return field_index_;
+    }
+    const std::string& FieldName() const {
+        return field_name_;
+    }
+    FieldType GetFieldType() const {
+        return field_type_;
+    }
+    const std::vector<Literal>& Literals() const {
+        return literals_;
+    }
+    const Function& GetFunction() const override;
+
+    std::string ToString() const override;
+
+    std::shared_ptr<Predicate> Negate() const override;
+
+    bool operator==(const Predicate& other) const override;
+
+ protected:
+    LeafPredicate(const LeafFunction& leaf_function, int32_t field_index,
+                  const std::string& field_name, const FieldType& field_type,
+                  const std::vector<Literal>& literals);
+
+    const LeafFunction& leaf_function_;
+    int32_t field_index_;
+    std::string field_name_;
+    FieldType field_type_;
+    std::vector<Literal> literals_;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/predicate/literal.h 
b/include/paimon/predicate/literal.h
new file mode 100644
index 0000000..ef58645
--- /dev/null
+++ b/include/paimon/predicate/literal.h
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "paimon/defs.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+/// Literal represents a constant value used in predicate expressions.
+///
+/// Literal supports BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, 
STRING, BINARY,
+/// TIMESTAMP, DECIMAL, DATE.
+class PAIMON_EXPORT Literal {
+ public:
+    /// Creates a null literal of the specified type.
+    /// @param type The field type for this null literal.
+    explicit Literal(FieldType type);
+
+    /// Creates a literal from a typed value.
+    /// The template parameter T must be compatible with one of the supported 
field types
+    /// (must match a supported FieldType).
+    /// T can be bool, int8_t, int16_t, int32_t, int64_t, float, double, 
Timestamp and Decimal.
+    /// @param val The value to store in the literal.
+    template <typename T>
+    explicit Literal(const T& val);
+
+    /// Creates a literal from binary data (string or binary type).
+    /// The data is copied into the literal's internal storage.
+    /// @param binary_type Must be either `STRING` or `BINARY` field type.
+    /// @param str Pointer to the binary data.
+    /// @param size Size of the binary data in bytes.
+    /// @note `BLOB` type is not supported by literal
+    Literal(FieldType binary_type, const char* str, size_t size);
+
+    /// Creates a literal from binary data with optional data ownership.
+    /// @param binary_type Must be either `STRING` or `BINARY` field type.
+    /// @param str Pointer to the binary data.
+    /// @param size Size of the binary data in bytes.
+    /// @param own_data If true, the literal takes ownership and will free the 
data;
+    ///                 if false, the caller must ensure the data remains 
valid.
+    Literal(FieldType binary_type, const char* str, size_t size, bool 
own_data);
+
+    /// Creates a date literal from an integer value.
+    /// @param date_type Must be `DATE` field type.
+    /// @param date_value Date value as days since epoch (1970-01-01).
+    Literal(FieldType date_type, int32_t date_value);
+
+    Literal(const Literal& other);
+    ~Literal();
+    Literal(Literal&& other);
+    Literal& operator=(Literal&& other);
+    Literal& operator=(const Literal& other);
+    bool operator==(const Literal& other) const;
+    bool operator!=(const Literal& other) const;
+
+    /// Checks if this literal represents a null value.
+    bool IsNull() const;
+
+    /// Gets the typed value stored in this literal.
+    /// @tparam T The expected C++ type of the value.
+    /// @return The value of type `T`.
+    /// @warning This method is unsafe - caller must verify the type and null 
status first.
+    template <typename T>
+    T GetValue() const;
+
+    /// Gets the field type of this literal.
+    FieldType GetType() const;
+
+    std::string ToString() const;
+
+    /// Gets the hash code for this literal.
+    /// @note HashCode() hashes the exact bit representation (including 
Decimal scale), while
+    /// operator== delegates to CompareTo() which uses numeric equality (e.g. 
decimals with
+    /// different scales can compare equal). This means the hash-equality 
contract (equal objects
+    /// must have equal hashes) may be violated for Decimal literals with 
different scales. In
+    /// practice this is safe because all current std::unordered_map<Literal, 
...> usages (bitmap
+    /// file index) only store values from the same column, which guarantees a 
fixed precision and
+    /// scale.
+    size_t HashCode() const;
+
+    /// Compares this literal with another literal. The comparison follows SQL 
semantics for the
+    /// respective data types.
+    /// @param other The literal to compare with.
+    /// @return Result containing -1 (this < other), 0 (this == other), or 1 
(this > other),
+    ///         or an error if the literals are not comparable.
+    Result<int32_t> CompareTo(const Literal& other) const;
+
+ private:
+    class Impl;
+
+    std::unique_ptr<Impl> impl_;
+};
+}  // namespace paimon
+
+namespace std {
+template <>
+struct hash<::paimon::Literal> {
+    size_t operator()(const ::paimon::Literal& literal) const {
+        return literal.HashCode();
+    }
+};
+}  // namespace std
diff --git a/include/paimon/predicate/predicate.h 
b/include/paimon/predicate/predicate.h
new file mode 100644
index 0000000..bd2dbb4
--- /dev/null
+++ b/include/paimon/predicate/predicate.h
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <memory>
+#include <string>
+
+#include "paimon/predicate/function.h"
+#include "paimon/visibility.h"
+
+struct ArrowArray;
+struct ArrowSchema;
+
+namespace paimon {
+class Function;
+
+/// Predicate interface. To create a predicate, please use `PredicateBuilder`.
+/// @see PredicateBuilder
+class PAIMON_EXPORT Predicate {
+ public:
+    virtual ~Predicate() = default;
+    virtual bool operator==(const Predicate& other) const = 0;
+
+    virtual const Function& GetFunction() const = 0;
+    /// @return The negation predicate of this predicate if possible.
+    virtual std::shared_ptr<Predicate> Negate() const = 0;
+    virtual std::string ToString() const = 0;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/data/decimal.cpp 
b/src/paimon/common/data/decimal.cpp
index c05982c..0ce5b22 100644
--- a/src/paimon/common/data/decimal.cpp
+++ b/src/paimon/common/data/decimal.cpp
@@ -49,10 +49,10 @@ const int64_t Decimal::POWERS_OF_TEN[MAX_COMPACT_PRECISION 
+ 1] = {1,
                                                                    
10000000000000000l,
                                                                    
100000000000000000l,
                                                                    
1000000000000000000l};
-const Decimal::int128_t Decimal::INT128_MAXIMUM_VALUE =
-    static_cast<Decimal::int128_t>(0x7fffffffffffffff) << 64 | 
0xffffffffffffffff;
+const Decimal::int128_t Decimal::INT128_MAXIMUM_VALUE = 
static_cast<Decimal::int128_t>(
+    static_cast<Decimal::uint128_t>(0x7fffffffffffffffULL) << 64 | 
0xffffffffffffffff);
 const Decimal::int128_t Decimal::INT128_MINIMUM_VALUE =
-    static_cast<Decimal::int128_t>(0x8000000000000000) << 64;
+    
static_cast<Decimal::int128_t>(static_cast<Decimal::uint128_t>(0x8000000000000000ULL)
 << 64);
 
 std::string Decimal::ToString() const {
     auto type = arrow::decimal128(Precision(), Scale());
diff --git a/src/paimon/common/memory/memory_segment_utils.cpp 
b/src/paimon/common/memory/memory_segment_utils.cpp
index 2801854..15c0545 100644
--- a/src/paimon/common/memory/memory_segment_utils.cpp
+++ b/src/paimon/common/memory/memory_segment_utils.cpp
@@ -135,21 +135,21 @@ int32_t MemorySegmentUtils::ByteIndex(int32_t bit_index) {
 void MemorySegmentUtils::BitUnSet(MemorySegment* segment, int32_t base_offset, 
int32_t index) {
     int32_t offset = base_offset + ByteIndex(index);
     char current = segment->Get(offset);
-    current &= ~(1 << (index & BIT_BYTE_INDEX_MASK));
+    current &= static_cast<char>(~(1u << (index & BIT_BYTE_INDEX_MASK)));
     segment->Put(offset, current);
 }
 
 void MemorySegmentUtils::BitSet(MemorySegment* segment, int32_t base_offset, 
int32_t index) {
     int32_t offset = base_offset + ByteIndex(index);
     char current = segment->Get(offset);
-    current |= (1 << (index & BIT_BYTE_INDEX_MASK));
+    current |= static_cast<char>(1u << (index & BIT_BYTE_INDEX_MASK));
     segment->Put(offset, current);
 }
 
 bool MemorySegmentUtils::BitGet(const MemorySegment& segment, int32_t 
base_offset, int32_t index) {
     int32_t offset = base_offset + ByteIndex(index);
     char current = segment.Get(offset);
-    return (current & (1 << (index & BIT_BYTE_INDEX_MASK))) != 0;
+    return (current & static_cast<char>(1u << (index & BIT_BYTE_INDEX_MASK))) 
!= 0;
 }
 
 void MemorySegmentUtils::BitSet(std::vector<MemorySegment>* segments, int32_t 
base_offset,
@@ -158,7 +158,7 @@ void MemorySegmentUtils::BitSet(std::vector<MemorySegment>* 
segments, int32_t ba
         int32_t offset = base_offset + ByteIndex(index);
         MemorySegment& segment = (*segments)[0];
         char current = segment.Get(offset);
-        current |= (1 << (index & BIT_BYTE_INDEX_MASK));
+        current |= static_cast<char>(1u << (index & BIT_BYTE_INDEX_MASK));
         segment.Put(offset, current);
     } else {
         BitSetMultiSegments(segments, base_offset, index);
@@ -174,7 +174,7 @@ void 
MemorySegmentUtils::BitSetMultiSegments(std::vector<MemorySegment>* segment
     MemorySegment& segment = (*segments)[seg_index];
 
     char current = segment.Get(seg_offset);
-    current |= (1 << (index & BIT_BYTE_INDEX_MASK));
+    current |= static_cast<char>(1u << (index & BIT_BYTE_INDEX_MASK));
     segment.Put(seg_offset, current);
 }
 
@@ -182,7 +182,7 @@ bool MemorySegmentUtils::BitGet(const 
std::vector<MemorySegment>& segments, int3
                                 int32_t index) {
     int32_t offset = base_offset + ByteIndex(index);
     char current = GetValue<char>(segments, offset);
-    return (current & (1 << (index & BIT_BYTE_INDEX_MASK))) != 0;
+    return (current & static_cast<char>(1u << (index & BIT_BYTE_INDEX_MASK))) 
!= 0;
 }
 
 void MemorySegmentUtils::BitUnSet(std::vector<MemorySegment>* segments, 
int32_t base_offset,
@@ -191,7 +191,7 @@ void 
MemorySegmentUtils::BitUnSet(std::vector<MemorySegment>* segments, int32_t
         MemorySegment& segment = (*segments)[0];
         int32_t offset = base_offset + ByteIndex(index);
         char current = segment.Get(offset);
-        current &= ~(1 << (index & BIT_BYTE_INDEX_MASK));
+        current &= static_cast<char>(~(1u << (index & BIT_BYTE_INDEX_MASK)));
         segment.Put(offset, current);
     } else {
         BitUnSetMultiSegments(segments, base_offset, index);
@@ -207,7 +207,7 @@ void 
MemorySegmentUtils::BitUnSetMultiSegments(std::vector<MemorySegment>* segme
     MemorySegment& segment = (*segments)[seg_index];
 
     char current = segment.Get(seg_offset);
-    current &= ~(1 << (index & BIT_BYTE_INDEX_MASK));
+    current &= static_cast<char>(~(1u << (index & BIT_BYTE_INDEX_MASK)));
     segment.Put(seg_offset, current);
 }
 
diff --git a/src/paimon/common/memory/memory_segment_utils.h 
b/src/paimon/common/memory/memory_segment_utils.h
index 6208ff5..b904728 100644
--- a/src/paimon/common/memory/memory_segment_utils.h
+++ b/src/paimon/common/memory/memory_segment_utils.h
@@ -23,6 +23,7 @@
 #include <cstdint>
 #include <cstring>
 #include <memory>
+#include <type_traits>
 #include <vector>
 
 #include "fmt/format.h"
@@ -432,22 +433,31 @@ template <typename T>
 inline T MemorySegmentUtils::GetValueSlowly(const std::vector<MemorySegment>& 
segments,
                                             int32_t seg_size, int32_t seg_num, 
int32_t seg_offset) {
     MemorySegment segment = segments[seg_num];
-    T ret = 0;
-    for (size_t i = 0; i < sizeof(T); i++) {
+    if constexpr (std::is_same_v<T, bool>) {
         if (seg_offset == seg_size) {
             segment = segments[++seg_num];
             seg_offset = 0;
         }
-        T unsigned_byte = segment.Get(seg_offset) & 0xff;
-        if (SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN) {
-            ret |= (unsigned_byte << (i * 8));
-        } else {
-            int32_t shift_count = sizeof(T) - 1;
-            ret |= (unsigned_byte << ((shift_count - i) * 8));
+        return 
static_cast<bool>(static_cast<uint8_t>(segment.Get(seg_offset)));
+    } else {
+        using UnsignedT = std::make_unsigned_t<T>;
+        UnsignedT ret = 0;
+        for (size_t i = 0; i < sizeof(T); i++) {
+            if (seg_offset == seg_size) {
+                segment = segments[++seg_num];
+                seg_offset = 0;
+            }
+            UnsignedT unsigned_byte = 
static_cast<uint8_t>(segment.Get(seg_offset));
+            if (SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN) {
+                ret |= (unsigned_byte << (i * 8));
+            } else {
+                int32_t shift_count = sizeof(T) - 1;
+                ret |= (unsigned_byte << ((shift_count - i) * 8));
+            }
+            seg_offset++;
         }
-        seg_offset++;
+        return static_cast<T>(ret);
     }
-    return ret;
 }
 
 inline Status MemorySegmentUtils::CopyToStream(const 
std::vector<MemorySegment>& segments,
diff --git a/src/paimon/common/predicate/compound_function.h 
b/src/paimon/common/predicate/compound_function.h
new file mode 100644
index 0000000..f093fc4
--- /dev/null
+++ b/src/paimon/common/predicate/compound_function.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "arrow/array/array_base.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/predicate/function.h"
+#include "paimon/predicate/predicate.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class CompoundFunction : public Function {
+ public:
+    // input array is the struct array of all fields
+    virtual Result<std::vector<char>> Test(
+        const arrow::Array& array,
+        const std::vector<std::shared_ptr<Predicate>>& children) const = 0;
+
+    virtual Result<bool> Test(const std::shared_ptr<arrow::Schema>& schema, 
const InternalRow& row,
+                              const std::vector<std::shared_ptr<Predicate>>& 
children) const = 0;
+
+    virtual Result<bool> Test(const std::shared_ptr<arrow::Schema>& schema, 
int64_t row_count,
+                              const InternalRow& min_values, const 
InternalRow& max_values,
+                              const InternalArray& null_counts,
+                              const std::vector<std::shared_ptr<Predicate>>& 
children) const = 0;
+
+    virtual const CompoundFunction& Negate() const = 0;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/predicate/compound_predicate.cpp 
b/src/paimon/common/predicate/compound_predicate.cpp
new file mode 100644
index 0000000..d94abf2
--- /dev/null
+++ b/src/paimon/common/predicate/compound_predicate.cpp
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/predicate/compound_predicate.h"
+
+#include <cassert>
+#include <cstddef>
+#include <utility>
+
+#include "fmt/format.h"
+#include "fmt/ranges.h"
+#include "paimon/common/predicate/compound_function.h"
+#include "paimon/common/predicate/compound_predicate_impl.h"
+#include "paimon/predicate/function.h"
+
+namespace paimon {
+CompoundPredicate::CompoundPredicate(const CompoundFunction& compound_function,
+                                     const 
std::vector<std::shared_ptr<Predicate>>& children)
+    : compound_function_(compound_function), children_(children) {}
+
+const Function& CompoundPredicate::GetFunction() const {
+    return compound_function_;
+}
+
+std::shared_ptr<Predicate> CompoundPredicate::Negate() const {
+    const auto& negate_func = compound_function_.Negate();
+    std::vector<std::shared_ptr<Predicate>> negated_children;
+    negated_children.reserve(children_.size());
+    for (const auto& child : children_) {
+        auto negated_child = child->Negate();
+        if (!negated_child) {
+            return nullptr;
+        }
+        negated_children.push_back(std::move(negated_child));
+    }
+    return std::make_shared<CompoundPredicateImpl>(negate_func, 
negated_children);
+}
+
+bool CompoundPredicate::operator==(const Predicate& other) const {
+    if (this == &other) {
+        return true;
+    }
+    // Checked downcast that keeps `other` const; no const_cast is needed.
+    const auto* other_compound = dynamic_cast<const CompoundPredicate*>(&other);
+    if (other_compound == nullptr) {
+        return false;
+    }
+    if (GetFunction().GetType() != other_compound->GetFunction().GetType()) {
+        return false;
+    }
+    const auto& lhs_children = Children();
+    const auto& rhs_children = other_compound->Children();
+    if (lhs_children.size() != rhs_children.size()) {
+        return false;
+    }
+    // Children are compared pairwise and in order (AND(a, b) != AND(b, a)).
+    for (size_t i = 0; i < lhs_children.size(); ++i) {
+        if (!(*lhs_children[i] == *rhs_children[i])) {
+            return false;
+        }
+    }
+    return true;
+}
+
+std::string CompoundPredicate::ToString() const {
+    std::vector<std::string> children_str;
+    children_str.reserve(children_.size());
+    for (const auto& child : children_) {
+        assert(child);
+        children_str.emplace_back(child->ToString());
+    }
+    return fmt::format("{}([{}])", compound_function_.ToString(), 
fmt::join(children_str, ", "));
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/predicate/compound_predicate_impl.h 
b/src/paimon/common/predicate/compound_predicate_impl.h
new file mode 100644
index 0000000..3cf6d9c
--- /dev/null
+++ b/src/paimon/common/predicate/compound_predicate_impl.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "paimon/common/predicate/compound_function.h"
+#include "paimon/common/predicate/predicate_filter.h"
+#include "paimon/predicate/compound_predicate.h"
+
+namespace paimon {
+// CompoundPredicate that also implements PredicateFilter: every Test overload
+// forwards its arguments, together with children_, to compound_function_.
+class CompoundPredicateImpl : public CompoundPredicate, public PredicateFilter 
{
+ public:
+    // Passes the compound function and the child predicates to the base class.
+    CompoundPredicateImpl(const CompoundFunction& compound_function,
+                          const std::vector<std::shared_ptr<Predicate>>& 
children)
+        : CompoundPredicate(compound_function, children) {}
+
+    // Array-level test; delegates to the compound function.
+    Result<std::vector<char>> Test(const arrow::Array& array) const override {
+        return compound_function_.Test(array, children_);
+    }
+
+    // Single-row test; delegates to the compound function.
+    Result<bool> Test(const std::shared_ptr<arrow::Schema>& schema,
+                      const InternalRow& row) const override {
+        return compound_function_.Test(schema, row, children_);
+    }
+
+    // Statistics-based test (row count, min/max rows, null counts); delegates
+    // to the compound function.
+    Result<bool> Test(const std::shared_ptr<arrow::Schema>& schema, int64_t 
row_count,
+                      const InternalRow& min_values, const InternalRow& 
max_values,
+                      const InternalArray& null_counts) const override {
+        return compound_function_.Test(schema, row_count, min_values, 
max_values, null_counts,
+                                       children_);
+    }
+
+    // Returns a new predicate with the same compound function but `new_children`.
+    std::shared_ptr<CompoundPredicateImpl> NewCompoundPredicate(
+        const std::vector<std::shared_ptr<Predicate>>& new_children) const {
+        return std::make_shared<CompoundPredicateImpl>(compound_function_, 
new_children);
+    }
+};
+}  // namespace paimon
diff --git a/src/paimon/common/predicate/leaf_function.h 
b/src/paimon/common/predicate/leaf_function.h
new file mode 100644
index 0000000..9fdb4d8
--- /dev/null
+++ b/src/paimon/common/predicate/leaf_function.h
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "arrow/array/array_base.h"
+#include "paimon/predicate/function.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/status.h"
+namespace paimon {
+// Abstract function evaluated by a leaf predicate against a single field.
+// Each Test overload receives the predicate's literal arguments.
+class LeafFunction : public Function {
+ public:
+    // input array is the exact single field array
+    virtual Result<std::vector<char>> Test(const arrow::Array& array,
+                                           const std::vector<Literal>& 
literals) const = 0;
+
+    // Tests one field value against the literals.
+    virtual Result<bool> Test(const Literal& value, const 
std::vector<Literal>& literals) const = 0;
+
+    // Tests field statistics (row count, min, max, optional null count)
+    // against the literals.
+    virtual Result<bool> Test(int64_t row_count, const Literal& min_value, 
const Literal& max_value,
+                              const std::optional<int64_t>& null_count,
+                              const std::vector<Literal>& literals) const = 0;
+
+    // Returns the negated function; LeafPredicate::Negate treats a null result
+    // as "cannot be negated".
+    virtual const LeafFunction* Negate() const = 0;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/predicate/leaf_predicate.cpp 
b/src/paimon/common/predicate/leaf_predicate.cpp
new file mode 100644
index 0000000..9ff45d4
--- /dev/null
+++ b/src/paimon/common/predicate/leaf_predicate.cpp
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/predicate/leaf_predicate.h"
+
+#include "fmt/format.h"
+#include "fmt/ranges.h"
+#include "paimon/common/predicate/leaf_function.h"
+#include "paimon/common/predicate/leaf_predicate_impl.h"
+#include "paimon/predicate/function.h"
+
+namespace paimon {
+enum class FieldType;
+
+// Stores the leaf function together with the field's index, name and type,
+// and the literal arguments the function is evaluated with.
+LeafPredicate::LeafPredicate(const LeafFunction& leaf_function, int32_t 
field_index,
+                             const std::string& field_name, const FieldType& 
field_type,
+                             const std::vector<Literal>& literals)
+    : leaf_function_(leaf_function),
+      field_index_(field_index),
+      field_name_(field_name),
+      field_type_(field_type),
+      literals_(literals) {}
+
+// Returns the leaf function viewed through the generic Function interface.
+const Function& LeafPredicate::GetFunction() const {
+    return leaf_function_;
+}
+
+// Returns a predicate over the same field and literals that uses the negated
+// leaf function, or nullptr when the function's Negate() returns nullptr.
+std::shared_ptr<Predicate> LeafPredicate::Negate() const {
+    if (const auto* flipped_function = leaf_function_.Negate()) {
+        return std::make_shared<LeafPredicateImpl>(*flipped_function, field_index_, field_name_,
+                                                   field_type_, literals_);
+    }
+    return nullptr;
+}
+
+// Structural equality: `other` must also be a LeafPredicate with the same
+// function type, field index, field name, field type and literals.
+bool LeafPredicate::operator==(const Predicate& other) const {
+    if (this == &other) {
+        return true;
+    }
+    // dynamic_cast works on pointers to const, so `other` is inspected without
+    // casting its constness away.
+    const auto* leaf_predicate = dynamic_cast<const LeafPredicate*>(&other);
+    if (!leaf_predicate) {
+        return false;
+    }
+    return GetFunction().GetType() == leaf_predicate->GetFunction().GetType() &&
+           FieldIndex() == leaf_predicate->FieldIndex() &&
+           FieldName() == leaf_predicate->FieldName() &&
+           GetFieldType() == leaf_predicate->GetFieldType() &&
+           Literals() == leaf_predicate->Literals();
+}
+
+// Renders "<function>(<field>)" or "<function>(<field>, <literals>)".
+// A single literal is printed bare, several literals as "[a, b, ...]".
+// The literal part is dropped whenever its rendered text is empty.
+std::string LeafPredicate::ToString() const {
+    std::string rendered_literals;
+    if (literals_.size() == 1) {
+        rendered_literals = literals_.front().ToString();
+    } else if (literals_.size() > 1) {
+        std::vector<std::string> parts;
+        parts.reserve(literals_.size());
+        for (const auto& item : literals_) {
+            parts.push_back(item.ToString());
+        }
+        rendered_literals = fmt::format("[{}]", fmt::join(parts, ", "));
+    }
+    if (rendered_literals.empty()) {
+        return fmt::format("{}({})", leaf_function_.ToString(), field_name_);
+    }
+    return fmt::format("{}({}, {})", leaf_function_.ToString(), field_name_, rendered_literals);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/predicate/leaf_predicate_impl.h 
b/src/paimon/common/predicate/leaf_predicate_impl.h
new file mode 100644
index 0000000..856fd46
--- /dev/null
+++ b/src/paimon/common/predicate/leaf_predicate_impl.h
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/api.h"
+#include "paimon/common/predicate/compound_function.h"
+#include "paimon/common/predicate/leaf_function.h"
+#include "paimon/common/predicate/literal_converter.h"
+#include "paimon/common/predicate/predicate_filter.h"
+#include "paimon/predicate/leaf_predicate.h"
+namespace paimon {
+// LeafPredicate that also implements PredicateFilter: it evaluates the bound
+// leaf function for one field against arrays, single rows, or min/max
+// statistics.
+class LeafPredicateImpl : public LeafPredicate, public PredicateFilter {
+ public:
+    LeafPredicateImpl(const LeafFunction& leaf_function, int32_t field_index,
+                      const std::string& field_name, const FieldType& field_type,
+                      const std::vector<Literal>& literals)
+        : LeafPredicate(leaf_function, field_index, field_name, field_type, literals) {}
+
+    const LeafFunction& GetLeafFunction() const {
+        return leaf_function_;
+    }
+
+    // Array-level test. `array` is cast to arrow::StructArray; the child at
+    // field_index_ is handed to the leaf function together with the literals.
+    // Returns Status::Invalid when field_index_ is outside the struct's fields.
+    Result<std::vector<char>> Test(const arrow::Array& array) const override {
+        const auto& struct_array = arrow::internal::checked_cast<const arrow::StructArray&>(array);
+        // A negative index would pass an upper-bound-only check and index out of range.
+        if (field_index_ < 0 ||
+            field_index_ >= static_cast<int32_t>(struct_array.fields().size())) {
+            return Status::Invalid(
+                fmt::format("field index {} exceed field count {} in struct array", field_index_,
+                            struct_array.fields().size()));
+        }
+        const auto& field_array = struct_array.field(field_index_);
+        return leaf_function_.Test(*field_array, literals_);
+    }
+
+    // Single-row test. Converts the field at field_index_ into a Literal and
+    // evaluates the leaf function on it. Returns Status::Invalid when
+    // field_index_ is outside the row's fields.
+    Result<bool> Test(const std::shared_ptr<arrow::Schema>& schema,
+                      const InternalRow& row) const override {
+        // Reject negative indexes as well as indexes past the last field.
+        if (field_index_ < 0 || field_index_ >= row.GetFieldCount()) {
+            return Status::Invalid(fmt::format("field index {} exceed field count {} in row",
+                                               field_index_, row.GetFieldCount()));
+        }
+        PAIMON_ASSIGN_OR_RAISE(Literal value, LiteralConverter::ConvertLiteralsFromRow(
+                                                  schema, row, field_index_, field_type_));
+        return leaf_function_.Test(value, literals_);
+    }
+
+    // Statistics-based test. Reads this field's min, max and null count and
+    // evaluates the leaf function on them.
+    Result<bool> Test(const std::shared_ptr<arrow::Schema>& schema, int64_t row_count,
+                      const InternalRow& min_values, const InternalRow& max_values,
+                      const InternalArray& null_counts) const override {
+        PAIMON_ASSIGN_OR_RAISE(Literal min_value,
+                               LiteralConverter::ConvertLiteralsFromRow(schema, min_values,
+                                                                        field_index_, field_type_));
+        PAIMON_ASSIGN_OR_RAISE(Literal max_value,
+                               LiteralConverter::ConvertLiteralsFromRow(schema, max_values,
+                                                                        field_index_, field_type_));
+        std::optional<int64_t> null_count = null_counts.IsNullAt(field_index_)
+                                                ? std::optional<int64_t>()
+                                                : null_counts.GetLong(field_index_);
+        if (null_count == std::nullopt || row_count != null_count.value()) {
+            // Not every row is known to be null, yet min or max is missing:
+            // the statistics are unknown, so report a possible match.
+            if (min_value.IsNull() || max_value.IsNull()) {
+                return true;
+            }
+        }
+        return leaf_function_.Test(row_count, min_value, max_value, null_count, literals_);
+    }
+
+    // Returns a copy of this predicate bound to `new_field_index`.
+    std::shared_ptr<LeafPredicateImpl> NewLeafPredicate(int32_t new_field_index) const {
+        return std::make_shared<LeafPredicateImpl>(leaf_function_, new_field_index, field_name_,
+                                                   field_type_, literals_);
+    }
+
+    // Returns a copy of this predicate bound to `new_field_name`.
+    std::shared_ptr<LeafPredicateImpl> NewLeafPredicate(const std::string& new_field_name) const {
+        return std::make_shared<LeafPredicateImpl>(leaf_function_, field_index_, new_field_name,
+                                                   field_type_, literals_);
+    }
+};
+}  // namespace paimon
diff --git a/src/paimon/common/predicate/leaf_unary_function.h 
b/src/paimon/common/predicate/leaf_unary_function.h
new file mode 100644
index 0000000..b260bd3
--- /dev/null
+++ b/src/paimon/common/predicate/leaf_unary_function.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "arrow/array/array_nested.h"
+#include "arrow/c/bridge.h"
+#include "arrow/util/checked_cast.h"
+#include "paimon/common/predicate/leaf_function.h"
+#include "paimon/common/predicate/literal_converter.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/status.h"
+
+namespace paimon {
+// Base for leaf functions that need no literal arguments. The literal-taking
+// LeafFunction overloads ignore `literals` and forward to the two unary Test
+// overloads declared at the bottom of the class.
+class LeafUnaryFunction : public LeafFunction {
+ public:
+    // Converts every element of `array` to a Literal and stores the result of
+    // the unary Test for each element, in order.
+    Result<std::vector<char>> Test(const arrow::Array& array,
+                                   const std::vector<Literal>& literals) const override {
+        const int64_t element_count = array.length();
+        std::vector<char> verdicts(element_count, false);
+        PAIMON_ASSIGN_OR_RAISE(
+            std::vector<Literal> cells,
+            LiteralConverter::ConvertLiteralsFromArray(array, /*own_data=*/false));
+        for (int64_t pos = 0; pos < element_count; ++pos) {
+            PAIMON_ASSIGN_OR_RAISE(verdicts[pos], Test(cells[pos]));
+        }
+        return verdicts;
+    }
+
+    // Single-value test; `literals` is unused.
+    Result<bool> Test(const Literal& value, const std::vector<Literal>& literals) const override {
+        return Test(value);
+    }
+
+    // Statistics test; `literals` is unused.
+    Result<bool> Test(int64_t row_count, const Literal& min_value, const Literal& max_value,
+                      const std::optional<int64_t>& null_count,
+                      const std::vector<Literal>& literals) const override {
+        return Test(row_count, min_value, max_value, null_count);
+    }
+
+    // Unary hooks implemented by concrete functions.
+    virtual Result<bool> Test(const Literal& field) const = 0;
+    virtual Result<bool> Test(int64_t row_count, const Literal& min_value, const Literal& max_value,
+                              const std::optional<int64_t>& null_count) const = 0;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/predicate/literal.cpp 
b/src/paimon/common/predicate/literal.cpp
new file mode 100644
index 0000000..0157a38
--- /dev/null
+++ b/src/paimon/common/predicate/literal.cpp
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/predicate/literal.h"
+
+#include <cmath>
+#include <cstring>
+#include <functional>
+#include <sstream>
+#include <string_view>
+#include <type_traits>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/utils/field_type_utils.h"
+#include "paimon/common/utils/fields_comparator.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/status.h"
+
+namespace paimon {
+// Storage behind a Literal: a tagged union plus bookkeeping (type, buffer
+// size, null flag, cached hash, buffer ownership).
+class Literal::Impl {
+ public:
+    // Drops the buffer of a STRING/BINARY/BLOB value. The memory is freed only
+    // when this object owns it; the pointer is cleared either way.
+    void Release() {
+        const bool buffer_backed = type_ == FieldType::STRING || type_ == FieldType::BINARY ||
+                                   type_ == FieldType::BLOB;
+        if (!buffer_backed || value_.Buffer == nullptr) {
+            return;
+        }
+        if (own_data_) {
+            delete[] value_.Buffer;
+        }
+        value_.Buffer = nullptr;
+    }
+
+    // Hash of the stored value. Null literals and types without a dedicated
+    // case hash to 0.
+    size_t CalculateHashCode() const {
+        if (is_null_) {
+            return 0;
+        }
+        switch (type_) {
+            case FieldType::BOOLEAN:
+                return std::hash<bool>{}(value_.BooleanVal);
+            case FieldType::TINYINT:
+                return std::hash<int8_t>{}(value_.TinyIntVal);
+            case FieldType::SMALLINT:
+                return std::hash<int16_t>{}(value_.SmallIntVal);
+            case FieldType::INT:
+            case FieldType::DATE:
+                // DATE values live in the same int32 slot as INT.
+                return std::hash<int32_t>{}(value_.IntVal);
+            case FieldType::BIGINT:
+                return std::hash<int64_t>{}(value_.BigIntVal);
+            case FieldType::FLOAT:
+                return std::hash<float>{}(value_.FloatVal);
+            case FieldType::DOUBLE:
+                return std::hash<double>{}(value_.DoubleVal);
+            case FieldType::STRING:
+            case FieldType::BINARY: {
+                const std::string_view bytes(value_.Buffer, size_);
+                return std::hash<std::string_view>{}(bytes);
+            }
+            case FieldType::TIMESTAMP: {
+                const size_t millis_hash =
+                    std::hash<int64_t>{}(value_.TimestampVal.GetMillisecond());
+                const size_t nanos_hash =
+                    std::hash<int64_t>{}(value_.TimestampVal.GetNanoOfMillisecond());
+                return millis_hash * 17 + nanos_hash;
+            }
+            case FieldType::DECIMAL: {
+                const size_t high_hash = std::hash<int64_t>{}(value_.DecimalVal.HighBits());
+                const size_t low_hash = std::hash<int64_t>{}(value_.DecimalVal.LowBits());
+                const size_t scale_hash = std::hash<int32_t>{}(value_.DecimalVal.Scale());
+                return high_hash * 31 + low_hash * 17 + scale_hash;
+            }
+            default:
+                return 0;
+        }
+    }
+
+    union LiteralVal {
+        int64_t BigIntVal;
+        bool BooleanVal;
+        int8_t TinyIntVal;
+        int16_t SmallIntVal;
+        int32_t IntVal;
+        float FloatVal;
+        double DoubleVal;
+        char* Buffer;
+        Decimal DecimalVal;
+        Timestamp TimestampVal;
+    } value_ = {};
+
+    FieldType type_ = FieldType::UNKNOWN;
+    // number of bytes behind value_.Buffer when the value is buffer-backed
+    size_t size_ = 0;
+    // true when this literal represents null
+    bool is_null_ = false;
+    // hash cached by the Literal constructors
+    size_t hash_code_ = 0;
+    // true when value_.Buffer was allocated by the literal and must be freed
+    bool own_data_ = true;
+};
+
+// Creates a null literal of the given type; the cached hash stays 0.
+Literal::Literal(FieldType type) : impl_(std::make_unique<Impl>()) {
+    Impl& state = *impl_;
+    state.is_null_ = true;
+    state.size_ = 0;
+    state.type_ = type;
+}
+
+// Creates a non-null literal from a primitive, Timestamp or Decimal value.
+// The field type is derived from T at compile time; any other T leaves the
+// literal typed as UNKNOWN with no value stored.
+template <typename T>
+Literal::Literal(const T& val) : impl_(std::make_unique<Impl>()) {
+    Impl& state = *impl_;
+    if constexpr (std::is_same_v<T, bool>) {
+        state.value_.BooleanVal = val;
+        state.type_ = FieldType::BOOLEAN;
+    } else if constexpr (std::is_same_v<T, int8_t>) {
+        state.value_.TinyIntVal = val;
+        state.type_ = FieldType::TINYINT;
+    } else if constexpr (std::is_same_v<T, int16_t>) {
+        state.value_.SmallIntVal = val;
+        state.type_ = FieldType::SMALLINT;
+    } else if constexpr (std::is_same_v<T, int32_t>) {
+        state.value_.IntVal = val;
+        state.type_ = FieldType::INT;
+    } else if constexpr (std::is_same_v<T, int64_t>) {
+        state.value_.BigIntVal = val;
+        state.type_ = FieldType::BIGINT;
+    } else if constexpr (std::is_same_v<T, float>) {
+        state.value_.FloatVal = val;
+        state.type_ = FieldType::FLOAT;
+    } else if constexpr (std::is_same_v<T, double>) {
+        state.value_.DoubleVal = val;
+        state.type_ = FieldType::DOUBLE;
+    } else if constexpr (std::is_same_v<T, Timestamp>) {
+        state.value_.TimestampVal = val;
+        state.type_ = FieldType::TIMESTAMP;
+    } else if constexpr (std::is_same_v<T, Decimal>) {
+        state.value_.DecimalVal = val;
+        state.type_ = FieldType::DECIMAL;
+    } else {
+        state.type_ = FieldType::UNKNOWN;
+    }
+    state.is_null_ = false;
+    // The hash reads type, value and null flag, so it is computed last.
+    state.hash_code_ = state.CalculateHashCode();
+}
+
+// Buffer-backed literal that copies `size` bytes from `str`; delegates to the
+// four-argument constructor with own_data = true.
+Literal::Literal(FieldType binary_type, const char* str, size_t size)
+    : Literal(binary_type, str, size, /*own_data=*/true) {}
+
+// Buffer-backed literal of type `binary_type`.
+// With own_data = true the `size` bytes at `str` are copied into a buffer the
+// literal frees later. With own_data = false the literal only keeps the
+// pointer and never frees it, so `str` must stay valid for as long as the
+// literal reads it.
+Literal::Literal(FieldType binary_type, const char* str, size_t size, bool own_data)
+    : impl_(std::make_unique<Impl>()) {
+    impl_->type_ = binary_type;
+    impl_->size_ = size;
+    impl_->is_null_ = false;
+    impl_->own_data_ = own_data;
+    if (own_data) {
+        impl_->value_.Buffer = new char[size];
+        // memcpy is undefined for a null source even when the count is 0, so
+        // skip it for empty values (which may be passed as nullptr).
+        if (size > 0) {
+            memcpy(impl_->value_.Buffer, str, size);
+        }
+    } else {
+        impl_->value_.Buffer = const_cast<char*>(str);
+    }
+    impl_->hash_code_ = impl_->CalculateHashCode();
+}
+
+// Creates a non-null literal of type `date_type` whose value is stored in the
+// int32 slot.
+Literal::Literal(FieldType date_type, int32_t date_value) : impl_(std::make_unique<Impl>()) {
+    Impl& state = *impl_;
+    state.value_.IntVal = date_value;
+    state.type_ = date_type;
+    state.is_null_ = false;
+    // Computed last because the hash reads type, value and null flag.
+    state.hash_code_ = state.CalculateHashCode();
+}
+
+// Move constructor. impl_ is still empty at this point; the move assignment
+// operator allocates it before taking over `other`'s state.
+Literal::Literal(Literal&& other) {
+    *this = std::move(other);
+}
+
+// Copy constructor. impl_ is still empty at this point; the copy assignment
+// operator allocates it before copying `other`'s state.
+Literal::Literal(const Literal& other) {
+    *this = other;
+}
+
+// Move assignment. Releases the current buffer (or allocates impl_ when called
+// from the move constructor), then takes over `other`'s state. For
+// buffer-backed types the buffer pointer is transferred, and `other` is left
+// as a null literal of the same type so it can still be read, copied and
+// destroyed safely.
+Literal& Literal::operator=(Literal&& other) {
+    if (&other == this) {
+        return *this;
+    }
+    if (this->impl_) {
+        this->impl_->Release();
+    } else {
+        impl_ = std::make_unique<Impl>();
+    }
+    impl_->type_ = other.impl_->type_;
+    impl_->size_ = other.impl_->size_;
+    impl_->is_null_ = other.impl_->is_null_;
+    impl_->hash_code_ = other.impl_->hash_code_;
+    impl_->own_data_ = other.impl_->own_data_;
+    impl_->value_ = other.impl_->value_;
+    if (impl_->type_ == FieldType::STRING || impl_->type_ == FieldType::BINARY ||
+        impl_->type_ == FieldType::BLOB) {
+        // Clearing only the pointer would leave `other` non-null with a
+        // non-zero size and no buffer, so any later read would dereference
+        // nullptr. Reset it to a consistent null state instead.
+        other.impl_->value_.Buffer = nullptr;
+        other.impl_->size_ = 0;
+        other.impl_->is_null_ = true;
+        other.impl_->hash_code_ = 0;
+    }
+    return *this;
+}
+
+// Copy assignment. Releases the current buffer (or allocates impl_ when called
+// from the copy constructor), then copies `other`'s state. An owned buffer is
+// deep-copied; a non-owned buffer is shared by pointer; every other value is
+// copied through the union.
+Literal& Literal::operator=(const Literal& other) {
+    if (&other == this) {
+        return *this;
+    }
+    if (this->impl_) {
+        this->impl_->Release();
+    } else {
+        impl_ = std::make_unique<Impl>();
+    }
+    impl_->type_ = other.impl_->type_;
+    impl_->size_ = other.impl_->size_;
+    impl_->is_null_ = other.impl_->is_null_;
+    impl_->hash_code_ = other.impl_->hash_code_;
+    impl_->own_data_ = other.impl_->own_data_;
+    const bool buffer_backed = impl_->type_ == FieldType::STRING ||
+                               impl_->type_ == FieldType::BINARY ||
+                               impl_->type_ == FieldType::BLOB;
+    // A null literal of a buffer-backed type has no buffer; memcpy from a null
+    // source is undefined even for zero bytes, so only deep-copy real buffers.
+    if (buffer_backed && impl_->own_data_ && other.impl_->value_.Buffer != nullptr) {
+        impl_->value_.Buffer = new char[other.impl_->size_];
+        memcpy(impl_->value_.Buffer, other.impl_->value_.Buffer, other.impl_->size_);
+    } else {
+        impl_->value_ = other.impl_->value_;
+    }
+    return *this;
+}
+
+// Releases the value buffer through Impl::Release().
+Literal::~Literal() {
+    impl_->Release();
+}
+
+// Returns true when this literal represents null.
+bool Literal::IsNull() const {
+    return impl_->is_null_;
+}
+
+// Returns the field type this literal was created with.
+FieldType Literal::GetType() const {
+    return impl_->type_;
+}
+
+// Returns the cached hash code (0 for literals created as null).
+size_t Literal::HashCode() const {
+    return impl_->hash_code_;
+}
+
+// Returns a printable form of the value: "null" for null literals, the
+// number or text for supported types, and "unknown type id:<type>" for any
+// type without a dedicated case.
+std::string Literal::ToString() const {
+    if (impl_->is_null_) {
+        return "null";
+    }
+
+    std::ostringstream sstream;
+    switch (impl_->type_) {
+        case FieldType::BOOLEAN:
+            sstream << (impl_->value_.BooleanVal ? "true" : "false");
+            break;
+        case FieldType::TINYINT:
+            // int8_t is a character type for iostreams; widen it so the
+            // number is printed rather than the character with that code.
+            sstream << static_cast<int32_t>(impl_->value_.TinyIntVal);
+            break;
+        case FieldType::SMALLINT:
+            sstream << impl_->value_.SmallIntVal;
+            break;
+        case FieldType::INT:
+            sstream << impl_->value_.IntVal;
+            break;
+        case FieldType::BIGINT:
+            sstream << impl_->value_.BigIntVal;
+            break;
+        case FieldType::FLOAT:
+            sstream << impl_->value_.FloatVal;
+            break;
+        case FieldType::DOUBLE:
+            sstream << impl_->value_.DoubleVal;
+            break;
+        case FieldType::STRING:
+        case FieldType::BINARY:
+            // The buffer is not NUL-terminated; print exactly size_ bytes.
+            sstream << std::string_view(impl_->value_.Buffer, impl_->size_);
+            break;
+        case FieldType::TIMESTAMP:
+            sstream << impl_->value_.TimestampVal.ToString();
+            break;
+        case FieldType::DECIMAL:
+            sstream << impl_->value_.DecimalVal.ToString();
+            break;
+        case FieldType::DATE:
+            sstream << impl_->value_.IntVal;
+            break;
+        default:
+            sstream << "unknown type id:" << FieldTypeUtils::FieldTypeToString(impl_->type_);
+    }
+    return sstream.str();
+}
+
+// Three-way comparison: -1, 0 or 1 for less, equal or greater.
+// Returns Status::Invalid when the types differ, when exactly one side is
+// null, or when the type has no comparison case. Two nulls of the same type
+// compare equal.
+Result<int32_t> Literal::CompareTo(const Literal& other) const {
+    if (this == &other) {
+        return 0;
+    }
+    // TODO(xinyu.lxy): compare with BIGINT and FLOAT/ INT and BIGINT
+    if (impl_->type_ != other.impl_->type_) {
+        return Status::Invalid(
+            fmt::format("cannot compare with different type [{}: {}], [{}: {}]", ToString(),
+                        FieldTypeUtils::FieldTypeToString(impl_->type_), other.ToString(),
+                        FieldTypeUtils::FieldTypeToString(other.impl_->type_)));
+    }
+    if (impl_->is_null_ || other.impl_->is_null_) {
+        if (impl_->is_null_ && other.impl_->is_null_) {
+            return 0;
+        }
+        return Status::Invalid("cannot compare with null");
+    }
+
+    // Orders two values using only == and <.
+    const auto three_way = [](auto left, auto right) -> int32_t {
+        if (left == right) {
+            return 0;
+        }
+        return left < right ? -1 : 1;
+    };
+    auto& mine = impl_->value_;
+    auto& theirs = other.impl_->value_;
+
+    switch (impl_->type_) {
+        case FieldType::BOOLEAN:
+            return three_way(mine.BooleanVal, theirs.BooleanVal);
+        case FieldType::TINYINT:
+            return three_way(mine.TinyIntVal, theirs.TinyIntVal);
+        case FieldType::SMALLINT:
+            return three_way(mine.SmallIntVal, theirs.SmallIntVal);
+        case FieldType::INT:
+        case FieldType::DATE:
+            // DATE shares the int32 slot with INT.
+            return three_way(mine.IntVal, theirs.IntVal);
+        case FieldType::BIGINT:
+            return three_way(mine.BigIntVal, theirs.BigIntVal);
+        case FieldType::FLOAT:
+            return FieldsComparator::CompareFloatingPoint(mine.FloatVal, theirs.FloatVal);
+        case FieldType::DOUBLE:
+            return FieldsComparator::CompareFloatingPoint(mine.DoubleVal, theirs.DoubleVal);
+        case FieldType::STRING:
+        case FieldType::BINARY: {
+            const std::string_view left_bytes(mine.Buffer, impl_->size_);
+            const std::string_view right_bytes(theirs.Buffer, other.impl_->size_);
+            // compare() may return any negative or positive value; clamp it to -1/0/1.
+            const int32_t order = left_bytes.compare(right_bytes);
+            return three_way(order, 0);
+        }
+        case FieldType::TIMESTAMP:
+            return three_way(mine.TimestampVal, theirs.TimestampVal);
+        case FieldType::DECIMAL:
+            return mine.DecimalVal.CompareTo(theirs.DecimalVal);
+        default:
+            return Status::Invalid(fmt::format("unsupported type {}",
+                                               FieldTypeUtils::FieldTypeToString(impl_->type_)));
+    }
+}
+
+// Two literals are equal when CompareTo succeeds and yields 0; any comparison
+// error (different types, one side null, unsupported type) means "not equal".
+bool Literal::operator==(const Literal& other) const {
+    if (this == &other) {
+        return true;
+    }
+    auto outcome = CompareTo(other);
+    return outcome.ok() && outcome.value() == 0;
+}
+
+// Negation of operator==.
+bool Literal::operator!=(const Literal& r) const {
+    return !(*this == r);
+}
+
+// Returns the stored value interpreted as T. The slot is selected by T alone;
+// the literal's type and null flag are not checked here. An unsupported T
+// yields a value-initialized T.
+template <typename T>
+T Literal::GetValue() const {
+    auto& slot = impl_->value_;
+    if constexpr (std::is_same_v<T, bool>) {
+        return slot.BooleanVal;
+    } else if constexpr (std::is_same_v<T, int8_t>) {
+        return slot.TinyIntVal;
+    } else if constexpr (std::is_same_v<T, int16_t>) {
+        return slot.SmallIntVal;
+    } else if constexpr (std::is_same_v<T, int32_t>) {
+        return slot.IntVal;
+    } else if constexpr (std::is_same_v<T, int64_t>) {
+        return slot.BigIntVal;
+    } else if constexpr (std::is_same_v<T, float>) {
+        return slot.FloatVal;
+    } else if constexpr (std::is_same_v<T, double>) {
+        return slot.DoubleVal;
+    } else if constexpr (std::is_same_v<T, std::string>) {
+        // Copies size_ bytes out of the buffer.
+        return std::string(slot.Buffer, impl_->size_);
+    } else if constexpr (std::is_same_v<T, Timestamp>) {
+        return slot.TimestampVal;
+    } else if constexpr (std::is_same_v<T, Decimal>) {
+        return slot.DecimalVal;
+    } else {
+        return T();
+    }
+}
+
+// Explicit instantiations: the templated constructor and GetValue are defined
+// in this file, so the supported value types are instantiated here.
+template Literal::Literal(const bool&);
+template Literal::Literal(const int8_t&);
+template Literal::Literal(const int16_t&);
+template Literal::Literal(const int32_t&);
+template Literal::Literal(const int64_t&);
+template Literal::Literal(const float&);
+template Literal::Literal(const double&);
+template Literal::Literal(const Timestamp&);
+template Literal::Literal(const Decimal&);
+
+// GetValue additionally supports std::string for buffer-backed literals.
+template bool Literal::GetValue() const;
+template int8_t Literal::GetValue() const;
+template int16_t Literal::GetValue() const;
+template int32_t Literal::GetValue() const;
+template int64_t Literal::GetValue() const;
+template float Literal::GetValue() const;
+template double Literal::GetValue() const;
+template std::string Literal::GetValue() const;
+template Timestamp Literal::GetValue() const;
+template Decimal Literal::GetValue() const;
+}  // namespace paimon
diff --git a/src/paimon/common/predicate/literal_converter.cpp 
b/src/paimon/common/predicate/literal_converter.cpp
new file mode 100644
index 0000000..dafedc1
--- /dev/null
+++ b/src/paimon/common/predicate/literal_converter.cpp
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/predicate/literal_converter.h"
+
+#include <optional>
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_binary.h"
+#include "arrow/array/array_decimal.h"
+#include "arrow/array/array_dict.h"
+#include "arrow/array/array_primitive.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/decimal.h"
+#include "fmt/format.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/field_type_utils.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/status.h"
+
+namespace paimon {
+/// Parses `value_str` into a `Literal` of the requested `type`.
+///
+/// BOOLEAN, the integral types, FLOAT and DOUBLE go through `StringUtils::StringToValue`;
+/// DATE goes through `StringUtils::StringToDate`; STRING and BINARY wrap the bytes of
+/// `value_str` directly. Text that cannot be parsed, or any other `type`, yields
+/// `Status::Invalid`.
+Result<Literal> LiteralConverter::ConvertLiteralsFromString(const FieldType& type,
+                                                            const std::string& value_str) {
+    // Shared parse-or-fail path. `tag` is only used to carry the target C++ type.
+    const auto parse_as = [&value_str](auto tag, const char* type_name) -> Result<Literal> {
+        using ValueType = decltype(tag);
+        const auto parsed = StringUtils::StringToValue<ValueType>(value_str);
+        if (!parsed.has_value()) {
+            return Status::Invalid(
+                fmt::format("cannot convert {} to {} type", value_str, type_name));
+        }
+        return Literal(parsed.value());
+    };
+
+    switch (type) {
+        case FieldType::BOOLEAN:
+            return parse_as(bool{}, "BOOLEAN");
+        case FieldType::TINYINT:
+            return parse_as(int8_t{}, "TINYINT");
+        case FieldType::SMALLINT:
+            return parse_as(int16_t{}, "SMALLINT");
+        case FieldType::INT:
+            return parse_as(int32_t{}, "INT");
+        case FieldType::DATE: {
+            PAIMON_ASSIGN_OR_RAISE(int32_t date_value, StringUtils::StringToDate(value_str));
+            return Literal(FieldType::DATE, date_value);
+        }
+        case FieldType::BIGINT:
+            return parse_as(int64_t{}, "BIGINT");
+        case FieldType::FLOAT:
+            return parse_as(float{}, "FLOAT");
+        case FieldType::DOUBLE:
+            return parse_as(double{}, "DOUBLE");
+        case FieldType::STRING:
+        case FieldType::BINARY:
+            // Both kinds take the raw bytes of the input unchanged.
+            return Literal(type, value_str.data(), value_str.size());
+        default:
+            return Status::Invalid(
+                fmt::format("Do not support type {} in ConvertLiteralsFromString",
+                            FieldTypeUtils::FieldTypeToString(type)));
+    }
+}
+
+/// Reads field `field_idx` of `row` and wraps it in a `Literal` of the given `type`.
+///
+/// A null field produces a typed null literal. TIMESTAMP and DECIMAL additionally consult
+/// `schema` for the precision (and scale) of the column; for those two types the Arrow type
+/// of the field is validated before it is down-cast, and a type mismatch or a `field_idx`
+/// outside the schema is reported as `Status::Invalid`. ARRAY, MAP, STRUCT and any other
+/// type are rejected with `Status::Invalid`.
+Result<Literal> LiteralConverter::ConvertLiteralsFromRow(
+    const std::shared_ptr<arrow::Schema>& schema, const InternalRow& row, int32_t field_idx,
+    const FieldType& type) {
+    if (row.IsNullAt(field_idx)) {
+        return Literal(type);
+    }
+    // Error shared by the schema-dependent branches below.
+    const auto schema_mismatch = [&schema, field_idx]() {
+        return Status::Invalid(
+            fmt::format("Convert literal from row not valid for schema {}, field_idx {}",
+                        schema->ToString(), field_idx));
+    };
+    // Arrow type of the requested column, or nullptr when `field_idx` is outside the schema.
+    const auto arrow_field_type = [&schema, field_idx]() -> std::shared_ptr<arrow::DataType> {
+        if (field_idx < 0 || field_idx >= schema->num_fields()) {
+            return nullptr;
+        }
+        return schema->field(field_idx)->type();
+    };
+    switch (type) {
+        case FieldType::BOOLEAN:
+            return Literal(row.GetBoolean(field_idx));
+        case FieldType::TINYINT:
+            return Literal(static_cast<int8_t>(row.GetByte(field_idx)));
+        case FieldType::SMALLINT:
+            return Literal(row.GetShort(field_idx));
+        case FieldType::INT:
+            return Literal(row.GetInt(field_idx));
+        case FieldType::BIGINT:
+            return Literal(row.GetLong(field_idx));
+        case FieldType::FLOAT:
+            return Literal(row.GetFloat(field_idx));
+        case FieldType::DOUBLE:
+            return Literal(row.GetDouble(field_idx));
+        case FieldType::STRING: {
+            std::string field = row.GetString(field_idx).ToString();
+            return Literal(type, field.data(), field.size());
+        }
+        case FieldType::BINARY: {
+            auto field = row.GetBinary(field_idx);
+            return Literal(type, field->data(), field->size());
+        }
+        case FieldType::TIMESTAMP: {
+            // checked_pointer_cast does not return null on a type mismatch, so the type id
+            // has to be inspected before casting.
+            auto arrow_type = arrow_field_type();
+            if (!arrow_type || arrow_type->id() != arrow::Type::type::TIMESTAMP) {
+                return schema_mismatch();
+            }
+            auto timestamp_type =
+                arrow::internal::checked_pointer_cast<arrow::TimestampType>(arrow_type);
+            int32_t precision = DateTimeUtils::GetPrecisionFromType(timestamp_type);
+            Timestamp field = row.GetTimestamp(field_idx, precision);
+            return Literal(field);
+        }
+        case FieldType::DECIMAL: {
+            // Same reasoning as TIMESTAMP: validate first, then down-cast.
+            auto arrow_type = arrow_field_type();
+            if (!arrow_type || arrow_type->id() != arrow::Type::type::DECIMAL128) {
+                return schema_mismatch();
+            }
+            const auto& decimal_type =
+                arrow::internal::checked_cast<const arrow::Decimal128Type&>(*arrow_type);
+            auto precision = decimal_type.precision();
+            auto scale = decimal_type.scale();
+            Decimal field = row.GetDecimal(field_idx, precision, scale);
+            return Literal(field);
+        }
+        case FieldType::DATE:
+            return Literal(FieldType::DATE, row.GetInt(field_idx));
+        case FieldType::ARRAY:
+        case FieldType::MAP:
+        case FieldType::STRUCT:
+        default:
+            return Status::Invalid(fmt::format("Convert literal from row not valid for {}",
+                                               FieldTypeUtils::FieldTypeToString(type)));
+    }
+}
+
+/// Converts every element of `array` into a `Literal`, dispatching on the Arrow type id.
+///
+/// Handles BOOL, INT8/16/32/64, FLOAT, DOUBLE, STRING, BINARY, TIMESTAMP, DECIMAL128, DATE32
+/// and dictionary arrays whose (value, index) types are (STRING, INT32) or
+/// (LARGE_STRING, INT64). Everything else yields `Status::Invalid`. `own_data` is passed
+/// through to the string, binary and dictionary helpers.
+Result<std::vector<Literal>> LiteralConverter::ConvertLiteralsFromArray(const arrow::Array& array,
+                                                                        bool own_data) {
+    using TypeId = arrow::Type::type;
+    switch (array.type_id()) {
+        case TypeId::BOOL:
+            return GetLiteralFromGenericArray<arrow::BooleanType>(array, FieldType::BOOLEAN);
+        case TypeId::INT8:
+            return GetLiteralFromGenericArray<arrow::Int8Type>(array, FieldType::TINYINT);
+        case TypeId::INT16:
+            return GetLiteralFromGenericArray<arrow::Int16Type>(array, FieldType::SMALLINT);
+        case TypeId::INT32:
+            return GetLiteralFromGenericArray<arrow::Int32Type>(array, FieldType::INT);
+        case TypeId::INT64:
+            return GetLiteralFromGenericArray<arrow::Int64Type>(array, FieldType::BIGINT);
+        case TypeId::FLOAT:
+            return GetLiteralFromGenericArray<arrow::FloatType>(array, FieldType::FLOAT);
+        case TypeId::DOUBLE:
+            return GetLiteralFromGenericArray<arrow::DoubleType>(array, FieldType::DOUBLE);
+        case TypeId::STRING:
+            return GetLiteralFromStringArray<arrow::StringType>(array, FieldType::STRING, own_data);
+        case TypeId::BINARY:
+            return GetLiteralFromStringArray<arrow::BinaryType>(array, FieldType::BINARY, own_data);
+        case TypeId::TIMESTAMP:
+            return GetLiteralFromTimestampArray(array);
+        case TypeId::DECIMAL128:
+            return GetLiteralFromDecimalArray(array);
+        case TypeId::DATE32:
+            return GetLiteralFromDateArray(array);
+        case TypeId::DICTIONARY: {
+            const auto& dict_array =
+                arrow::internal::checked_cast<const arrow::DictionaryArray&>(array);
+            const auto& dict_type =
+                arrow::internal::checked_cast<const arrow::DictionaryType&>(*dict_array.type());
+            const TypeId value_id = dict_type.value_type()->id();
+            const TypeId index_id = dict_type.index_type()->id();
+            // Only two (value, index) pairings are supported.
+            if (value_id == TypeId::STRING && index_id == TypeId::INT32) {
+                return GetLiteralFromDictionaryArray<arrow::StringArray, arrow::Int32Array>(
+                    dict_array, FieldType::STRING, own_data);
+            }
+            if (value_id == TypeId::LARGE_STRING && index_id == TypeId::INT64) {
+                return GetLiteralFromDictionaryArray<arrow::LargeStringArray, arrow::Int64Array>(
+                    dict_array, FieldType::STRING, own_data);
+            }
+            return Status::Invalid(
+                "only support [STRING, INT32] or [LARGE_STRING, INT64] for DictionaryArray");
+        }
+        default:
+            return Status::Invalid(
+                fmt::format("Not support literal on arrow {} type", array.type()->ToString()));
+    }
+}
+
+/// Converts a Decimal128 array into DECIMAL literals, using the precision and scale of the
+/// array's Arrow type. Null slots become typed null literals.
+std::vector<Literal> LiteralConverter::GetLiteralFromDecimalArray(const arrow::Array& array) {
+    const auto& decimal_array = arrow::internal::checked_cast<const arrow::Decimal128Array&>(array);
+    const auto& decimal_type =
+        arrow::internal::checked_cast<const arrow::Decimal128Type&>(*array.type());
+    const int32_t precision = decimal_type.precision();
+    const int32_t scale = decimal_type.scale();
+    const int64_t count = decimal_array.length();
+
+    std::vector<Literal> result;
+    result.reserve(count);
+    for (int64_t row = 0; row < count; ++row) {
+        if (decimal_array.IsNull(row)) {
+            result.emplace_back(FieldType::DECIMAL);
+            continue;
+        }
+        const arrow::Decimal128 decimal(decimal_array.GetValue(row));
+        // Reassemble the 128-bit value from its two 64-bit halves; the high half goes through
+        // uint64_t first so that widening does not sign-extend.
+        const auto high =
+            static_cast<Decimal::uint128_t>(static_cast<uint64_t>(decimal.high_bits()));
+        const auto low = static_cast<Decimal::uint128_t>(decimal.low_bits());
+        const auto unscaled = static_cast<Decimal::int128_t>((high << 64) | low);
+        result.emplace_back(Decimal(precision, scale, unscaled));
+    }
+    return result;
+}
+
+/// Converts a Date32 array into DATE literals; null slots become typed null literals.
+std::vector<Literal> LiteralConverter::GetLiteralFromDateArray(const arrow::Array& array) {
+    const auto& date_array = arrow::internal::checked_cast<const arrow::Date32Array&>(array);
+    const int64_t count = date_array.length();
+
+    std::vector<Literal> result;
+    result.reserve(count);
+    for (int64_t row = 0; row < count; ++row) {
+        if (date_array.IsNull(row)) {
+            result.emplace_back(FieldType::DATE);
+            continue;
+        }
+        result.emplace_back(FieldType::DATE, date_array.Value(row));
+    }
+    return result;
+}
+
+/// Converts a timestamp array into TIMESTAMP literals. Each raw value is split by
+/// `DateTimeUtils::TimestampConverter` into the two components passed to `Timestamp`.
+/// Null slots become typed null literals.
+std::vector<Literal> LiteralConverter::GetLiteralFromTimestampArray(const arrow::Array& array) {
+    const auto& timestamp_array =
+        arrow::internal::checked_cast<const arrow::TimestampArray&>(array);
+    auto timestamp_type =
+        arrow::internal::checked_pointer_cast<arrow::TimestampType>(timestamp_array.type());
+    assert(timestamp_type);
+    const DateTimeUtils::TimeType source_unit =
+        DateTimeUtils::GetTimeTypeFromArrowType(timestamp_type);
+    const int64_t count = timestamp_array.length();
+
+    std::vector<Literal> result;
+    result.reserve(count);
+    for (int64_t row = 0; row < count; ++row) {
+        if (timestamp_array.IsNull(row)) {
+            result.emplace_back(FieldType::TIMESTAMP);
+            continue;
+        }
+        const int64_t raw = timestamp_array.Value(row);
+        auto [milli, nano] = DateTimeUtils::TimestampConverter(
+            raw, source_unit, DateTimeUtils::TimeType::MILLISECOND,
+            DateTimeUtils::TimeType::NANOSECOND);
+        result.emplace_back(Timestamp(milli, nano));
+    }
+    return result;
+}
+}  // namespace paimon
diff --git a/src/paimon/common/predicate/literal_converter.h 
b/src/paimon/common/predicate/literal_converter.h
new file mode 100644
index 0000000..7b990a2
--- /dev/null
+++ b/src/paimon/common/predicate/literal_converter.h
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/array/array_dict.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+namespace arrow {
+class Array;
+class Schema;
+class StringArray;
+}  // namespace arrow
+
+namespace paimon {
+class InternalRow;
+enum class FieldType;
+/// Static helpers that build `Literal` values from Arrow arrays, strings and rows.
+class PAIMON_EXPORT LiteralConverter {
+ public:
+    LiteralConverter() = delete;
+    ~LiteralConverter() = delete;
+
+    /// Converts each element of `array` into a `Literal`; the result has one entry per element.
+    /// `own_data` is forwarded to the `Literal` constructor for string, binary and dictionary
+    /// values (presumably it selects copying versus referencing the Arrow buffer -- confirm
+    /// against `Literal`).
+    static Result<std::vector<Literal>> ConvertLiteralsFromArray(const arrow::Array& array,
+                                                                 bool own_data);
+
+    /// Parses `value_str` into a `Literal` of the given `type`.
+    static Result<Literal> ConvertLiteralsFromString(const FieldType& type,
+                                                     const std::string& value_str);
+
+    /// Builds a `Literal` of the given `type` from field `field_idx` of `row`; `schema` is
+    /// consulted for the Arrow type of that field.
+    static Result<Literal> ConvertLiteralsFromRow(const std::shared_ptr<arrow::Schema>& schema,
+                                                  const InternalRow& row, int32_t field_idx,
+                                                  const FieldType& type);
+
+ private:
+    /// Converts an array whose Arrow type `DataType` has a C value type (booleans and numbers).
+    /// Null slots become a null literal of `literal_type`.
+    template <class DataType>
+    static std::vector<Literal> GetLiteralFromGenericArray(const arrow::Array& array,
+                                                           const FieldType& literal_type) {
+        using ArrayType = typename arrow::TypeTraits<DataType>::ArrayType;
+        using ValueType = typename arrow::TypeTraits<DataType>::CType;
+        const ArrayType& array_(arrow::internal::checked_cast<const ArrayType&>(array));
+        std::vector<Literal> literals;
+        literals.reserve(array_.length());
+        for (int64_t i = 0; i < array_.length(); i++) {
+            if (array_.IsNull(i)) {
+                literals.emplace_back(literal_type);
+            } else {
+                literals.emplace_back(static_cast<ValueType>(array_.Value(i)));
+            }
+        }
+        return literals;
+    }
+
+    /// Converts a string or binary array into literals of `literal_type`, passing the value
+    /// pointer, its length and `own_data` to the `Literal` constructor.
+    template <class DataType>
+    static std::vector<Literal> GetLiteralFromStringArray(const arrow::Array& array,
+                                                          const FieldType& literal_type,
+                                                          bool own_data) {
+        using ArrayType = typename arrow::TypeTraits<DataType>::ArrayType;
+        using OffsetType = typename ArrayType::offset_type;
+        const ArrayType& array_(arrow::internal::checked_cast<const ArrayType&>(array));
+        std::vector<Literal> literals;
+        literals.reserve(array_.length());
+        for (int64_t i = 0; i < array_.length(); i++) {
+            if (array_.IsNull(i)) {
+                literals.emplace_back(literal_type);
+            } else {
+                OffsetType length = 0;
+                const uint8_t* value = array_.GetValue(i, &length);
+                literals.emplace_back(literal_type, reinterpret_cast<const char*>(value), length,
+                                      own_data);
+            }
+        }
+        return literals;
+    }
+
+    /// Converts a dictionary-encoded array by looking each index up in the dictionary.
+    /// `DictArrayType` is the concrete dictionary array type and `IndicesArrayType` the
+    /// concrete index array type. The length type follows `DictArrayType::offset_type`, so
+    /// 32-bit and 64-bit offset dictionaries share one code path.
+    template <typename DictArrayType, typename IndicesArrayType>
+    static std::vector<Literal> GetLiteralFromDictionaryArray(
+        const arrow::DictionaryArray& dict_array, const FieldType& literal_type, bool own_data) {
+        using OffsetType = typename DictArrayType::offset_type;
+        auto* dictionary =
+            arrow::internal::checked_cast<DictArrayType*>(dict_array.dictionary().get());
+        auto* indices =
+            arrow::internal::checked_cast<IndicesArrayType*>(dict_array.indices().get());
+        assert(dictionary);
+        assert(indices);
+        std::vector<Literal> literals;
+        literals.reserve(dict_array.length());
+        for (int64_t i = 0; i < dict_array.length(); ++i) {
+            if (dict_array.IsNull(i)) {
+                literals.emplace_back(literal_type);
+            } else {
+                const int64_t dict_index = indices->Value(i);
+                OffsetType length = 0;
+                const uint8_t* value = dictionary->GetValue(dict_index, &length);
+                literals.emplace_back(literal_type, reinterpret_cast<const char*>(value), length,
+                                      own_data);
+            }
+        }
+        return literals;
+    }
+
+    /// Converts a Decimal128 array into DECIMAL literals.
+    static std::vector<Literal> GetLiteralFromDecimalArray(const arrow::Array& array);
+
+    /// Converts a Date32 array into DATE literals.
+    static std::vector<Literal> GetLiteralFromDateArray(const arrow::Array& array);
+
+    /// Converts a timestamp array into TIMESTAMP literals.
+    static std::vector<Literal> GetLiteralFromTimestampArray(const arrow::Array& array);
+};
+}  // namespace paimon
diff --git a/src/paimon/common/predicate/literal_converter_test.cpp 
b/src/paimon/common/predicate/literal_converter_test.cpp
new file mode 100644
index 0000000..7d03f9c
--- /dev/null
+++ b/src/paimon/common/predicate/literal_converter_test.cpp
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/predicate/literal_converter.h"
+
+#include <algorithm>
+#include <cmath>
+#include <cstddef>
+#include <variant>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_dict.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/data_define.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+/// Fixture with helpers that run each `LiteralConverter` entry point and compare the result
+/// against expected literals.
+class LiteralConverterTest : public ::testing::Test {
+ public:
+    void SetUp() override {}
+    void TearDown() override {}
+
+    /// Converts `field_array` with `own_data = false` and expects exactly `expected`.
+    void CheckResult(const std::shared_ptr<arrow::Array>& field_array,
+                     const std::vector<Literal>& expected) const {
+        ASSERT_OK_AND_ASSIGN(
+            std::vector<Literal> literals,
+            LiteralConverter::ConvertLiteralsFromArray(*field_array, /*own_data=*/false));
+        ASSERT_EQ(literals.size(), expected.size());
+        ASSERT_EQ(literals, expected);
+    }
+
+    /// Parses every entry of `strs` as `type` and expects the literal at the same position
+    /// in `expected`.
+    void CheckLiteralsFromString(const FieldType& type, const std::vector<std::string>& strs,
+                                 const std::vector<Literal>& expected) const {
+        ASSERT_EQ(strs.size(), expected.size());
+        for (size_t i = 0; i < strs.size(); i++) {
+            ASSERT_OK_AND_ASSIGN(auto result,
+                                 LiteralConverter::ConvertLiteralsFromString(type, strs[i]));
+            ASSERT_EQ(result, expected[i]);
+        }
+    }
+
+    /// Builds a one-field row for each entry of `values`, converts field 0 as `type` and
+    /// expects the literal at the same position in `expected`.
+    void CheckLiteralFromRow(const std::shared_ptr<arrow::DataType>& data_type,
+                             const BinaryRowGenerator::ValueType& values, const FieldType& type,
+                             const std::vector<Literal>& expected) const {
+        // Guard the expected[i] lookups below against a mismatched expectation list.
+        ASSERT_EQ(values.size(), expected.size());
+        auto pool = GetDefaultPool();
+        auto schema = arrow::schema(arrow::FieldVector({arrow::field("f0", data_type)}));
+        for (size_t i = 0; i < values.size(); i++) {
+            // each value generates a row with 1 arity
+            BinaryRow row = BinaryRowGenerator::GenerateRow({values[i]}, pool.get());
+            ASSERT_OK_AND_ASSIGN(auto result, LiteralConverter::ConvertLiteralsFromRow(
+                                                  schema, row, /*field_idx=*/0, type));
+            ASSERT_EQ(result, expected[i]);
+        }
+    }
+};
+
+// BOOLEAN literals from an Arrow array, from strings and from rows.
+TEST_F(LiteralConverterTest, TestBooleanLiteral) {
+    const Literal yes(true);
+    const Literal no(false);
+    const Literal null_literal(FieldType::BOOLEAN);
+
+    auto field_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::boolean(), R"([true, false, null])")
+            .ValueOrDie();
+    CheckResult(field_array, {yes, no, null_literal});
+
+    // "yes"/"no" are expected to parse like "true"/"false".
+    CheckLiteralsFromString(FieldType::BOOLEAN, {"true", "false", "yes", "no"},
+                            {yes, no, yes, no});
+
+    CheckLiteralFromRow(arrow::boolean(), {true, false, NullType()}, FieldType::BOOLEAN,
+                        {yes, no, null_literal});
+}
+
+// TINYINT literals from an Arrow array, from strings and from rows.
+TEST_F(LiteralConverterTest, TestTinyIntLiteral) {
+    const Literal four(static_cast<int8_t>(4));
+    const Literal five(static_cast<int8_t>(5));
+    const Literal null_literal(FieldType::TINYINT);
+
+    auto field_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int8(), R"([4, 5, null])").ValueOrDie();
+    CheckResult(field_array, {four, five, null_literal});
+
+    CheckLiteralsFromString(FieldType::TINYINT, {"4", "5"}, {four, five});
+
+    CheckLiteralFromRow(arrow::int8(), {static_cast<int8_t>(4), static_cast<int8_t>(5), NullType()},
+                        FieldType::TINYINT, {four, five, null_literal});
+}
+// SMALLINT literals from an Arrow array, from strings and from rows.
+TEST_F(LiteralConverterTest, TestSmallIntLiteral) {
+    const Literal first(static_cast<int16_t>(45));
+    const Literal second(static_cast<int16_t>(55));
+    const Literal null_literal(FieldType::SMALLINT);
+
+    auto field_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int16(), R"([45, 55, null])").ValueOrDie();
+    CheckResult(field_array, {first, second, null_literal});
+
+    CheckLiteralsFromString(FieldType::SMALLINT, {"45", "55"}, {first, second});
+
+    CheckLiteralFromRow(arrow::int16(),
+                        {static_cast<int16_t>(45), static_cast<int16_t>(55), NullType()},
+                        FieldType::SMALLINT, {first, second, null_literal});
+}
+// INT literals from an Arrow array, from strings and from rows.
+TEST_F(LiteralConverterTest, TestIntLiteral) {
+    const Literal null_literal(FieldType::INT);
+
+    auto field_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), R"([456, 567, null])")
+            .ValueOrDie();
+    CheckResult(field_array, {Literal(456), Literal(567), null_literal});
+
+    CheckLiteralsFromString(FieldType::INT, {"456", "567"}, {Literal(456), Literal(567)});
+
+    CheckLiteralFromRow(
+        arrow::int32(), {static_cast<int32_t>(456), static_cast<int32_t>(567), NullType()},
+        FieldType::INT,
+        {Literal(static_cast<int32_t>(456)), Literal(static_cast<int32_t>(567)), null_literal});
+}
+
+// BIGINT literals from an Arrow array, from strings and from rows.
+TEST_F(LiteralConverterTest, TestBigIntLiteral) {
+    // Spell the values as int64_t rather than with an `l` suffix: only the int64_t
+    // constructor of Literal is explicitly instantiated, and `long` is a different type from
+    // int64_t on platforms where int64_t is `long long`.
+    const Literal four(static_cast<int64_t>(4));
+    const Literal five(static_cast<int64_t>(5));
+    const Literal null_literal(FieldType::BIGINT);
+
+    auto field_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int64(), R"([4, 5, null])").ValueOrDie();
+    CheckResult(field_array, std::vector<Literal>({four, five, null_literal}));
+    CheckLiteralsFromString(FieldType::BIGINT, {"4", "5"}, std::vector<Literal>({four, five}));
+    CheckLiteralFromRow(arrow::int64(),
+                        {static_cast<int64_t>(4), static_cast<int64_t>(5), NullType()},
+                        FieldType::BIGINT, {four, five, null_literal});
+}
+
+// FLOAT literals from an Arrow array, from strings and from rows, including NaN and
+// infinities where the input path can express them.
+TEST_F(LiteralConverterTest, TestFloatLiteral) {
+    const Literal null_literal(FieldType::FLOAT);
+
+    auto field_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::float32(), R"([4.0, 5.1, NaN, null])")
+            .ValueOrDie();
+    const std::vector<Literal> from_array(
+        {Literal(static_cast<float>(4.0)), Literal(static_cast<float>(5.1)),
+         Literal(static_cast<float>(std::nan(""))), null_literal});
+    CheckResult(field_array, from_array);
+
+    // literal from string do not support nan and inf
+    const std::vector<Literal> from_strings({Literal(4.0f), Literal(5.1f)});
+    CheckLiteralsFromString(FieldType::FLOAT, {"4.0", "5.1"}, from_strings);
+
+    const std::vector<Literal> from_rows({Literal(4.0f), Literal(5.1f), Literal(INFINITY),
+                                          Literal(-INFINITY), Literal(NAN), null_literal});
+    CheckLiteralFromRow(arrow::float32(), {4.0f, 5.1f, INFINITY, -INFINITY, NAN, NullType()},
+                        FieldType::FLOAT, from_rows);
+}
+
+// DOUBLE literals from an Arrow array, from strings and from rows, including NaN and
+// infinities where the input path can express them.
+TEST_F(LiteralConverterTest, TestDoubleLiteral) {
+    // INFINITY and NAN are float macros. The casts are written with explicit parentheses
+    // because the macro expansions are not guaranteed to be parenthesized themselves.
+    const double pos_inf = static_cast<double>(INFINITY);
+    const double neg_inf = static_cast<double>(-INFINITY);
+    const double not_a_number = static_cast<double>(NAN);
+
+    auto field_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::float64(), R"([4.05, 5.17, NaN, null])")
+            .ValueOrDie();
+    CheckResult(field_array,
+                std::vector<Literal>({Literal(4.05), Literal(5.17), Literal(std::nan("")),
+                                      Literal(FieldType::DOUBLE)}));
+    // literal from string do not support nan and inf
+    CheckLiteralsFromString(FieldType::DOUBLE, {"4.05", "5.17"},
+                            std::vector<Literal>({Literal(4.05), Literal(5.17)}));
+    CheckLiteralFromRow(arrow::float64(), {4.05, 5.17, pos_inf, neg_inf, not_a_number, NullType()},
+                        FieldType::DOUBLE,
+                        {Literal(4.05), Literal(5.17), Literal(pos_inf), Literal(neg_inf),
+                         Literal(not_a_number), Literal(FieldType::DOUBLE)});
+}
+
+// STRING literals (ASCII and multi-byte UTF-8) from an Arrow array, from strings and from rows.
+TEST_F(LiteralConverterTest, TestStringLiteral) {
+    std::string str = "苹果";
+    const Literal apple(FieldType::STRING, "apple", 5);
+    const Literal multi_byte(FieldType::STRING, str.data(), str.size());
+    const Literal null_literal(FieldType::STRING);
+
+    auto field_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(), R"(["apple", "苹果", null])")
+            .ValueOrDie();
+    CheckResult(field_array, {apple, multi_byte, null_literal});
+
+    CheckLiteralsFromString(FieldType::STRING, {"apple", "苹果"}, {apple, multi_byte});
+
+    CheckLiteralFromRow(arrow::utf8(), {std::string("apple"), std::string("苹果"), NullType()},
+                        FieldType::STRING, {apple, multi_byte, null_literal});
+}
+
+// BINARY literals from an Arrow array, from strings and from rows.
+TEST_F(LiteralConverterTest, TestBinaryLiteral) {
+    std::string str = "苹果";
+    const Literal apple(FieldType::BINARY, "apple", 5);
+    const Literal multi_byte(FieldType::BINARY, str.data(), str.size());
+    const Literal null_literal(FieldType::BINARY);
+
+    auto field_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::binary(), R"(["apple", "苹果", null])")
+            .ValueOrDie();
+    CheckResult(field_array, {apple, multi_byte, null_literal});
+
+    CheckLiteralsFromString(FieldType::BINARY, {"apple", "苹果"}, {apple, multi_byte});
+
+    CheckLiteralFromRow(arrow::binary(), {std::string("apple"), std::string("苹果"), NullType()},
+                        FieldType::BINARY, {apple, multi_byte, null_literal});
+}
+
+// Checks timestamp conversion for each Arrow time unit (nano, second, milli, micro).
+// In the expected values the first Timestamp argument lines up with milliseconds since
+// the epoch and the second with the nanoseconds left over inside that millisecond,
+// e.g. "1970-01-01T00:00:59.123456789" -> Timestamp(59123, 456789). The second
+// TimestampType argument matches the number of fractional digits of the unit
+// (9 / 0 / 3 / 6) - presumably the precision; confirm against TimestampType.
+TEST_F(LiteralConverterTest, TestTimestampLiteral) {
+    {
+        // nano: inputs mix full nanosecond precision, shorter fractions, a date-only
+        // value, a space-separated value and a null.
+        std::string timestamp_json =
+            R"(["1970-01-01T00:00:59.123456789", 
"2000-02-29T23:23:23.999999999",
+          "1899-01-01T00:59:20.001001001", "2033-05-18T03:33:20.000000000",
+          "2020-01-01T01:05:05.001", "2010-01-03T06:30:30.006163",
+          "2010-01-04T07:35:35", "2008-12-28", "2012-01-01 01:02:03", null])";
+        auto field_array = arrow::ipc::internal::json::ArrayFromJSON(
+                               arrow::timestamp(arrow::TimeUnit::NANO), 
timestamp_json)
+                               .ValueOrDie();
+        // The pre-epoch value keeps a non-negative nanosecond part (-2240521239999, 1001);
+        // the null slot is expected as a value-less Literal(FieldType::TIMESTAMP).
+        CheckResult(
+            field_array,
+            std::vector<Literal>(
+                {Literal(Timestamp(59123l, 456789)), 
Literal(Timestamp(951866603999l, 999999)),
+                 Literal(Timestamp(-2240521239999l, 1001)), 
Literal(Timestamp(2000000000000l, 0)),
+                 Literal(Timestamp(1577840705001l, 0)), 
Literal(Timestamp(1262500230006l, 163000)),
+                 Literal(Timestamp(1262590535000l, 0)), 
Literal(Timestamp(1230422400000l, 0)),
+                 Literal(Timestamp(1325379723000l, 0)), 
Literal(FieldType::TIMESTAMP)}));
+        // Same values supplied as row fields.
+        CheckLiteralFromRow(
+            arrow::timestamp(arrow::TimeUnit::NANO),
+            {TimestampType(Timestamp(59123l, 456789), 9),
+             TimestampType(Timestamp(951866603999l, 999999), 9),
+             TimestampType(Timestamp(-2240521239999l, 1001), 9),
+             TimestampType(Timestamp(2000000000000l, 0), 9),
+             TimestampType(Timestamp(1577840705001l, 0), 9),
+             TimestampType(Timestamp(1262500230006l, 163000), 9),
+             TimestampType(Timestamp(1262590535000l, 0), 9),
+             TimestampType(Timestamp(1230422400000l, 0), 9),
+             TimestampType(Timestamp(1325379723000l, 0), 9), NullType()},
+            FieldType::TIMESTAMP,
+            {Literal(Timestamp(59123l, 456789)), 
Literal(Timestamp(951866603999l, 999999)),
+             Literal(Timestamp(-2240521239999l, 1001)), 
Literal(Timestamp(2000000000000l, 0)),
+             Literal(Timestamp(1577840705001l, 0)), 
Literal(Timestamp(1262500230006l, 163000)),
+             Literal(Timestamp(1262590535000l, 0)), 
Literal(Timestamp(1230422400000l, 0)),
+             Literal(Timestamp(1325379723000l, 0)), 
Literal(FieldType::TIMESTAMP)});
+    }
+    {
+        // second: whole-second inputs, so every expected value is a multiple of 1000
+        // with a zero nanosecond part.
+        std::string timestamp_json =
+            R"(["1970-01-01T00:00:59", "2000-02-29T23:23:23",
+          "1899-01-01T00:59:20", "2033-05-18T03:33:20",
+          "2020-01-01T01:05:05", "2010-01-03T06:30:30",
+          "2010-01-04T07:35:35", "2008-12-28", "2012-01-01 01:02:03", null])";
+        auto field_array = arrow::ipc::internal::json::ArrayFromJSON(
+                               arrow::timestamp(arrow::TimeUnit::SECOND), 
timestamp_json)
+                               .ValueOrDie();
+        CheckResult(
+            field_array,
+            std::vector<Literal>(
+                {Literal(Timestamp(59000l, 0)), 
Literal(Timestamp(951866603000l, 0)),
+                 Literal(Timestamp(-2240521240000l, 0)), 
Literal(Timestamp(2000000000000l, 0)),
+                 Literal(Timestamp(1577840705000l, 0)), 
Literal(Timestamp(1262500230000l, 0)),
+                 Literal(Timestamp(1262590535000l, 0)), 
Literal(Timestamp(1230422400000l, 0)),
+                 Literal(Timestamp(1325379723000l, 0)), 
Literal(FieldType::TIMESTAMP)}));
+        CheckLiteralFromRow(
+            arrow::timestamp(arrow::TimeUnit::SECOND),
+            {TimestampType(Timestamp(59000l, 0), 0), 
TimestampType(Timestamp(951866603000l, 0), 0),
+             TimestampType(Timestamp(-2240521240000l, 0), 0),
+             TimestampType(Timestamp(2000000000000l, 0), 0),
+             TimestampType(Timestamp(1577840705000l, 0), 0),
+             TimestampType(Timestamp(1262500230000l, 0), 0),
+             TimestampType(Timestamp(1262590535000l, 0), 0),
+             TimestampType(Timestamp(1230422400000l, 0), 0),
+             TimestampType(Timestamp(1325379723000l, 0), 0), NullType()},
+            FieldType::TIMESTAMP,
+            {Literal(Timestamp(59000l, 0)), Literal(Timestamp(951866603000l, 
0)),
+             Literal(Timestamp(-2240521240000l, 0)), 
Literal(Timestamp(2000000000000l, 0)),
+             Literal(Timestamp(1577840705000l, 0)), 
Literal(Timestamp(1262500230000l, 0)),
+             Literal(Timestamp(1262590535000l, 0)), 
Literal(Timestamp(1230422400000l, 0)),
+             Literal(Timestamp(1325379723000l, 0)), 
Literal(FieldType::TIMESTAMP)});
+    }
+    {
+        // milli: ".001" fractions land in the millisecond part; the nanosecond part
+        // stays zero. The date-only input has no fraction.
+        std::string timestamp_json =
+            R"(["1970-01-01T00:00:59.001", "2000-02-29T23:23:23.001",
+          "1899-01-01T00:59:20.001", "2033-05-18T03:33:20.001",
+          "2020-01-01T01:05:05.001", "2010-01-03T06:30:30.001",
+          "2010-01-04T07:35:35.001", "2008-12-28", "2012-01-01 01:02:03.001", 
null])";
+        auto field_array = arrow::ipc::internal::json::ArrayFromJSON(
+                               arrow::timestamp(arrow::TimeUnit::MILLI), 
timestamp_json)
+                               .ValueOrDie();
+        CheckResult(
+            field_array,
+            std::vector<Literal>(
+                {Literal(Timestamp(59001l, 0)), 
Literal(Timestamp(951866603001l, 0)),
+                 Literal(Timestamp(-2240521239999l, 0)), 
Literal(Timestamp(2000000000001l, 0)),
+                 Literal(Timestamp(1577840705001l, 0)), 
Literal(Timestamp(1262500230001l, 0)),
+                 Literal(Timestamp(1262590535001l, 0)), 
Literal(Timestamp(1230422400000l, 0)),
+                 Literal(Timestamp(1325379723001l, 0)), 
Literal(FieldType::TIMESTAMP)}));
+        CheckLiteralFromRow(
+            arrow::timestamp(arrow::TimeUnit::MILLI),
+            {TimestampType(Timestamp(59001l, 0), 3), 
TimestampType(Timestamp(951866603001l, 0), 3),
+             TimestampType(Timestamp(-2240521239999l, 0), 3),
+             TimestampType(Timestamp(2000000000001l, 0), 3),
+             TimestampType(Timestamp(1577840705001l, 0), 3),
+             TimestampType(Timestamp(1262500230001l, 0), 3),
+             TimestampType(Timestamp(1262590535001l, 0), 3),
+             TimestampType(Timestamp(1230422400000l, 0), 3),
+             TimestampType(Timestamp(1325379723001l, 0), 3), NullType()},
+            FieldType::TIMESTAMP,
+            {Literal(Timestamp(59001l, 0)), Literal(Timestamp(951866603001l, 
0)),
+             Literal(Timestamp(-2240521239999l, 0)), 
Literal(Timestamp(2000000000001l, 0)),
+             Literal(Timestamp(1577840705001l, 0)), 
Literal(Timestamp(1262500230001l, 0)),
+             Literal(Timestamp(1262590535001l, 0)), 
Literal(Timestamp(1230422400000l, 0)),
+             Literal(Timestamp(1325379723001l, 0)), 
Literal(FieldType::TIMESTAMP)});
+    }
+    {
+        // micro: ".001001" is one millisecond plus one microsecond, expected as
+        // (..001, 1000). The date-only input has no fraction.
+        std::string timestamp_json =
+            R"(["1970-01-01T00:00:59.001001", "2000-02-29T23:23:23.001001",
+          "1899-01-01T00:59:20.001001", "2033-05-18T03:33:20.001001",
+          "2020-01-01T01:05:05.001001", "2010-01-03T06:30:30.001001",
+          "2010-01-04T07:35:35.001001", "2008-12-28", "2012-01-01 
01:02:03.001001", null])";
+        auto field_array = arrow::ipc::internal::json::ArrayFromJSON(
+                               arrow::timestamp(arrow::TimeUnit::MICRO), 
timestamp_json)
+                               .ValueOrDie();
+        CheckResult(
+            field_array,
+            std::vector<Literal>(
+                {Literal(Timestamp(59001l, 1000)), 
Literal(Timestamp(951866603001l, 1000)),
+                 Literal(Timestamp(-2240521239999l, 1000)),
+                 Literal(Timestamp(2000000000001l, 1000)), 
Literal(Timestamp(1577840705001l, 1000)),
+                 Literal(Timestamp(1262500230001l, 1000)), 
Literal(Timestamp(1262590535001l, 1000)),
+                 Literal(Timestamp(1230422400000l, 0)), 
Literal(Timestamp(1325379723001l, 1000)),
+                 Literal(FieldType::TIMESTAMP)}));
+        CheckLiteralFromRow(
+            arrow::timestamp(arrow::TimeUnit::MICRO),
+            {TimestampType(Timestamp(59001l, 1000), 6),
+             TimestampType(Timestamp(951866603001l, 1000), 6),
+             TimestampType(Timestamp(-2240521239999l, 1000), 6),
+             TimestampType(Timestamp(2000000000001l, 1000), 6),
+             TimestampType(Timestamp(1577840705001l, 1000), 6),
+             TimestampType(Timestamp(1262500230001l, 1000), 6),
+             TimestampType(Timestamp(1262590535001l, 1000), 6),
+             TimestampType(Timestamp(1230422400000l, 0), 6),
+             TimestampType(Timestamp(1325379723001l, 1000), 6), NullType()},
+            FieldType::TIMESTAMP,
+            {Literal(Timestamp(59001l, 1000)), 
Literal(Timestamp(951866603001l, 1000)),
+             Literal(Timestamp(-2240521239999l, 1000)), 
Literal(Timestamp(2000000000001l, 1000)),
+             Literal(Timestamp(1577840705001l, 1000)), 
Literal(Timestamp(1262500230001l, 1000)),
+             Literal(Timestamp(1262590535001l, 1000)), 
Literal(Timestamp(1230422400000l, 0)),
+             Literal(Timestamp(1325379723001l, 1000)), 
Literal(FieldType::TIMESTAMP)});
+    }
+}
+
+// Checks decimal conversion from an Arrow decimal128 array and from row fields.
+// Expected Decimals are built from (precision, scale, unscaled value); with scale 3,
+// "123.456" corresponds to the unscaled value 123456.
+TEST_F(LiteralConverterTest, TestDecimalLiteral) {
+    auto field_array =
+        arrow::ipc::internal::json::ArrayFromJSON(
+            arrow::decimal128(21, 3),
+            R"(["-123456789987654321.234", "123456789987654321.012", "0.000", 
"123.456", "-123.456", null])")
+            .ValueOrDie();
+    // The two 21-digit values do not fit in 64 bits, so their unscaled form is parsed
+    // from a string with DecimalUtils::StrToInt128.
+    CheckResult(
+        field_array,
+        std::vector<Literal>(
+            {Literal(Decimal(21, 3, 
DecimalUtils::StrToInt128("-123456789987654321234").value())),
+             Literal(Decimal(21, 3, 
DecimalUtils::StrToInt128("123456789987654321012").value())),
+             Literal(Decimal(21, 3, 0)), Literal(Decimal(21, 3, 123456)),
+             Literal(Decimal(21, 3, -123456)), Literal(FieldType::DECIMAL)}));
+    // The row case uses precision 38 and its own pair of large values (different from
+    // the array case above).
+    CheckLiteralFromRow(
+        arrow::decimal128(38, 3),
+        {Decimal(38, 3, 
DecimalUtils::StrToInt128("-123456789987654338234").value()),
+         Decimal(38, 3, 
DecimalUtils::StrToInt128("123456789987654338012").value()),
+         Decimal(38, 3, 0), Decimal(38, 3, 123456), Decimal(38, 3, -123456), 
NullType()},
+        FieldType::DECIMAL,
+        {Literal(Decimal(38, 3, 
DecimalUtils::StrToInt128("-123456789987654338234").value())),
+         Literal(Decimal(38, 3, 
DecimalUtils::StrToInt128("123456789987654338012").value())),
+         Literal(Decimal(38, 3, 0)), Literal(Decimal(38, 3, 123456)),
+         Literal(Decimal(38, 3, -123456)), Literal(FieldType::DECIMAL)});
+}
+
+// Checks DATE conversion from an Arrow date32 array, from strings and from row fields.
+// Expected literals hold an integer day number; the string cases below map
+// "1970-01-02" to 1 and "1969-12-31" to -1, i.e. days relative to 1970-01-01.
+TEST_F(LiteralConverterTest, TestDateLiteral) {
+    auto field_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::date32(), R"([0, 4, 
-5, null])")
+            .ValueOrDie();
+    // Array input: includes a negative day number and a null slot.
+    CheckResult(field_array,
+                std::vector<Literal>({Literal(FieldType::DATE, 0l), 
Literal(FieldType::DATE, 4l),
+                                      Literal(FieldType::DATE, -5l), 
Literal(FieldType::DATE)}));
+    // String input: both plain integers and yyyy-MM-dd text are exercised.
+    CheckLiteralsFromString(
+        FieldType::DATE, {"1", "0", "1970-01-02", "1969-12-31"},
+        std::vector<Literal>({Literal(FieldType::DATE, 1), 
Literal(FieldType::DATE, 0),
+                              Literal(FieldType::DATE, 1), 
Literal(FieldType::DATE, -1)}));
+
+    // Row input: NullType() marks the null field.
+    CheckLiteralFromRow(arrow::date32(), {0, 4, -5, NullType()}, 
FieldType::DATE,
+                        {Literal(FieldType::DATE, 0l), 
Literal(FieldType::DATE, 4l),
+                         Literal(FieldType::DATE, -5l), 
Literal(FieldType::DATE)});
+}
+
+// Checks that converting an array of an unsupported Arrow type (large_utf8) fails with
+// the expected error message instead of producing literals.
+TEST_F(LiteralConverterTest, TestInvalidType) {
+    auto field_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::large_utf8(), 
R"(["apple", "苹果", null])")
+            .ValueOrDie();
+    // The message names the Arrow type as "large_string".
+    ASSERT_NOK_WITH_MSG(
+        LiteralConverter::ConvertLiteralsFromArray(*field_array, 
/*own_data=*/false),
+        "Not support literal on arrow large_string type");
+}
+
+// Checks that a dictionary-encoded utf8 array is converted to STRING literals holding
+// the decoded dictionary values, one per index, with a null index giving a null literal.
+TEST_F(LiteralConverterTest, TestDictType) {
+    auto dict = arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(), 
R"(["foo", "bar", "baz"])")
+                    .ValueOrDie();
+    auto dict_type = arrow::dictionary(arrow::int32(), arrow::utf8());
+    // Indices repeat dictionary entries and end with a null.
+    auto indices =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), "[1, 2, 0, 
2, 0, null]")
+            .ValueOrDie();
+    std::shared_ptr<arrow::DictionaryArray> field_array =
+        std::make_shared<arrow::DictionaryArray>(dict_type, indices, dict);
+
+    // Expected order follows the indices: bar, baz, foo, baz, foo, null.
+    CheckResult(field_array,
+                std::vector<Literal>(
+                    {Literal(FieldType::STRING, "bar", 3), 
Literal(FieldType::STRING, "baz", 3),
+                     Literal(FieldType::STRING, "foo", 3), 
Literal(FieldType::STRING, "baz", 3),
+                     Literal(FieldType::STRING, "foo", 3), 
Literal(FieldType::STRING)}));
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/predicate/literal_test.cpp 
b/src/paimon/common/predicate/literal_test.cpp
new file mode 100644
index 0000000..a561ccd
--- /dev/null
+++ b/src/paimon/common/predicate/literal_test.cpp
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/predicate/literal.h"
+
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+/// Fixture for Literal tests; provides a shared ordering/equality checker.
+class LiteralTest : public ::testing::Test {
+ public:
+    void SetUp() override {}
+    void TearDown() override {}
+
+    /// Asserts that both literals are non-null values of `type`, that `literal1` orders
+    /// strictly before `literal2`, and that `literal1` equals itself and a copy of itself.
+    ///
+    /// @param literal1 the smaller literal
+    /// @param literal2 the larger literal
+    /// @param type the field type both literals are expected to report
+    void CheckResult(const Literal& literal1, const Literal& literal2, FieldType type) {
+        // Both operands must carry a value of the expected type.
+        ASSERT_FALSE(literal1.IsNull());
+        ASSERT_EQ(literal1.GetType(), type);
+        ASSERT_FALSE(literal2.IsNull());
+        ASSERT_EQ(literal2.GetType(), type);
+
+        // Strict ordering in both directions, plus the equality operators.
+        ASSERT_OK_AND_ASSIGN(auto order, literal1.CompareTo(literal2));
+        ASSERT_EQ(order, -1);
+        ASSERT_OK_AND_ASSIGN(order, literal2.CompareTo(literal1));
+        ASSERT_EQ(order, 1);
+        ASSERT_FALSE(literal1 == literal2);
+        ASSERT_TRUE(literal1 != literal2);
+
+        // A literal compares equal to itself.
+        ASSERT_OK_AND_ASSIGN(order, literal1.CompareTo(literal1));
+        ASSERT_EQ(order, 0);
+        ASSERT_TRUE(literal1 == literal1);
+        ASSERT_FALSE(literal1 != literal1);
+
+        // A copy-constructed literal compares equal to its source.
+        Literal duplicate(literal1);
+        ASSERT_OK_AND_ASSIGN(order, duplicate.CompareTo(literal1));
+        ASSERT_EQ(order, 0);
+        ASSERT_TRUE(literal1 == duplicate);
+        ASSERT_FALSE(literal1 != duplicate);
+    }
+};
+
+// Exercises ordering, equality, copying and ToString() for one (smaller, larger) pair
+// of literals per field type.
+TEST_F(LiteralTest, TestSimple) {
+    {
+        // BOOLEAN: false orders before true.
+        Literal smaller(false);
+        Literal larger(true);
+        CheckResult(smaller, larger, FieldType::BOOLEAN);
+        ASSERT_EQ(smaller.ToString(), "false");
+        ASSERT_EQ(larger.ToString(), "true");
+    }
+    {
+        // TINYINT: the expected text is a single character whose code is the value.
+        // NOTE(review): looks like int8_t is rendered as a char rather than as digits -
+        // confirm this is intended.
+        Literal smaller(int8_t{10});
+        Literal larger(int8_t{20});
+        CheckResult(smaller, larger, FieldType::TINYINT);
+        ASSERT_EQ(smaller.ToString(), std::string(1, 10));
+        ASSERT_EQ(larger.ToString(), std::string(1, 20));
+    }
+    {
+        // SMALLINT
+        Literal smaller(int16_t{100});
+        Literal larger(int16_t{200});
+        CheckResult(smaller, larger, FieldType::SMALLINT);
+        ASSERT_EQ(smaller.ToString(), "100");
+        ASSERT_EQ(larger.ToString(), "200");
+    }
+    {
+        // INT
+        Literal smaller(int32_t{10000});
+        Literal larger(int32_t{20000});
+        CheckResult(smaller, larger, FieldType::INT);
+        ASSERT_EQ(smaller.ToString(), "10000");
+        ASSERT_EQ(larger.ToString(), "20000");
+    }
+    {
+        // BIGINT
+        Literal smaller(int64_t{1000000});
+        Literal larger(int64_t{2000000});
+        CheckResult(smaller, larger, FieldType::BIGINT);
+        ASSERT_EQ(smaller.ToString(), "1000000");
+        ASSERT_EQ(larger.ToString(), "2000000");
+    }
+    {
+        // FLOAT
+        Literal smaller(static_cast<float>(1000.5));
+        Literal larger(static_cast<float>(2000.6));
+        CheckResult(smaller, larger, FieldType::FLOAT);
+        ASSERT_EQ(smaller.ToString(), "1000.5");
+        ASSERT_EQ(larger.ToString(), "2000.6");
+    }
+    {
+        // DOUBLE: the expected text is rounded to six significant digits.
+        Literal smaller(1000.5555);
+        Literal larger(2000.6666);
+        CheckResult(smaller, larger, FieldType::DOUBLE);
+        ASSERT_EQ(smaller.ToString(), "1000.56");
+        ASSERT_EQ(larger.ToString(), "2000.67");
+    }
+    {
+        // STRING: a proper prefix orders before the longer string.
+        std::string shorter_text("abandon");
+        std::string longer_text("abandon1");
+        Literal smaller(FieldType::STRING, shorter_text.data(), shorter_text.size());
+        Literal larger(FieldType::STRING, longer_text.data(), longer_text.size());
+        CheckResult(smaller, larger, FieldType::STRING);
+        ASSERT_EQ(smaller.ToString(), "abandon");
+        ASSERT_EQ(larger.ToString(), "abandon1");
+    }
+    {
+        // BINARY: same prefix rule, with multi-byte content.
+        std::string shorter_bytes("快乐每一天");
+        std::string longer_bytes("快乐每一天!");
+        Literal smaller(FieldType::BINARY, shorter_bytes.data(), shorter_bytes.size());
+        Literal larger(FieldType::BINARY, longer_bytes.data(), longer_bytes.size());
+        CheckResult(smaller, larger, FieldType::BINARY);
+        ASSERT_EQ(smaller.ToString(), "快乐每一天");
+        ASSERT_EQ(larger.ToString(), "快乐每一天!");
+    }
+    {
+        // DATE: printed as the raw integer.
+        Literal smaller(FieldType::DATE, 10000);
+        Literal larger(FieldType::DATE, 20000);
+        CheckResult(smaller, larger, FieldType::DATE);
+        ASSERT_EQ(smaller.ToString(), "10000");
+        ASSERT_EQ(larger.ToString(), "20000");
+    }
+    {
+        // TIMESTAMP: same first argument, second argument differs by one.
+        Literal smaller(Timestamp(1725875365442L, 120000));
+        Literal larger(Timestamp(1725875365442L, 120001));
+        CheckResult(smaller, larger, FieldType::TIMESTAMP);
+        ASSERT_EQ(smaller.ToString(), "2024-09-09 09:49:25.442120000");
+        ASSERT_EQ(larger.ToString(), "2024-09-09 09:49:25.442120001");
+    }
+    {
+        // TIMESTAMP: the first argument decides the order even though the second
+        // argument of the larger literal is the smaller number.
+        Literal smaller(Timestamp(1725875365442L, 120000));
+        Literal larger(Timestamp(1725875365443L, 110000));
+        CheckResult(smaller, larger, FieldType::TIMESTAMP);
+        ASSERT_EQ(smaller.ToString(), "2024-09-09 09:49:25.442120000");
+        ASSERT_EQ(larger.ToString(), "2024-09-09 09:49:25.443110000");
+    }
+    {
+        // DECIMAL, same precision and scale: 1234.56 vs. 1235.56
+        Literal smaller(Decimal(6, 2, 123456));
+        Literal larger(Decimal(6, 2, 123556));
+        CheckResult(smaller, larger, FieldType::DECIMAL);
+        ASSERT_EQ(smaller.ToString(), "1234.56");
+        ASSERT_EQ(larger.ToString(), "1235.56");
+    }
+    {
+        // DECIMAL, different scale: 1234.56 vs. 1234.567
+        Literal smaller(Decimal(6, 2, 123456));
+        Literal larger(Decimal(7, 3, 1234567));
+        CheckResult(smaller, larger, FieldType::DECIMAL);
+        ASSERT_EQ(smaller.ToString(), "1234.56");
+        ASSERT_EQ(larger.ToString(), "1234.567");
+    }
+    {
+        // DECIMAL, unscaled value parsed from a string:
+        // 1234.56 vs. 123456789987654321.45678
+        Literal smaller(Decimal(6, 2, 123456));
+        Literal larger(
+            Decimal(23, 5, DecimalUtils::StrToInt128("12345678998765432145678").value()));
+        CheckResult(smaller, larger, FieldType::DECIMAL);
+        ASSERT_EQ(smaller.ToString(), "1234.56");
+        ASSERT_EQ(larger.ToString(), "123456789987654321.45678");
+    }
+}
+
+// Covers CompareTo() when at least one operand is null or the operand types differ.
+// A Literal built from a FieldType alone reports IsNull().
+TEST_F(LiteralTest, TestWithNull) {
+    {
+        // Two null literals of the same type compare as equal.
+        Literal lhs(FieldType::BIGINT);
+        Literal rhs(FieldType::BIGINT);
+        ASSERT_TRUE(lhs.IsNull());
+        ASSERT_EQ(lhs.GetType(), FieldType::BIGINT);
+        ASSERT_TRUE(rhs.IsNull());
+        ASSERT_EQ(rhs.GetType(), FieldType::BIGINT);
+        ASSERT_OK_AND_ASSIGN(auto order, lhs.CompareTo(rhs));
+        ASSERT_EQ(order, 0);
+    }
+    {
+        // Two null literals of different types cannot be compared.
+        Literal lhs(FieldType::BIGINT);
+        Literal rhs(FieldType::STRING);
+        ASSERT_TRUE(lhs.IsNull());
+        ASSERT_EQ(lhs.GetType(), FieldType::BIGINT);
+        ASSERT_TRUE(rhs.IsNull());
+        ASSERT_EQ(rhs.GetType(), FieldType::STRING);
+        ASSERT_NOK(lhs.CompareTo(rhs));
+    }
+    {
+        // Null vs. non-null of the same type is an error.
+        Literal lhs(FieldType::INT);
+        Literal rhs(int32_t{10000});
+        ASSERT_NOK(lhs.CompareTo(rhs));
+    }
+    {
+        // Null vs. non-null of a different type is an error.
+        Literal lhs(FieldType::BIGINT);
+        Literal rhs(10000.5);
+        ASSERT_NOK(lhs.CompareTo(rhs));
+    }
+    {
+        // type mismatch between two non-null literals
+        Literal lhs(int64_t{10000});
+        Literal rhs(10000.5);
+        ASSERT_NOK(lhs.CompareTo(rhs));
+    }
+}
+
+// Covers copy/move construction and assignment of STRING and BINARY literals created
+// with the trailing "own data" flag set to false and to true, including the
+// empty-string case. After every operation the literal's value must still read back as
+// the source string.
+TEST_F(LiteralTest, TestOwnData) {
+    {
+        // STRING, copy construction and copy assignment.
+        std::string data = "hello world";
+        // literal points to data
+        Literal literal(FieldType::STRING, data.data(), data.size(), /*own data*/ false);
+        ASSERT_EQ(data, literal.GetValue<std::string>());
+        // literal and literal2 point to data
+        Literal literal2 = literal;
+        ASSERT_EQ(data, literal2.GetValue<std::string>());
+        ASSERT_EQ(data, literal.GetValue<std::string>());
+
+        std::string data3 = "copy hello world";
+        Literal literal3(FieldType::STRING, data3.data(), data3.size(), /*own data*/ true);
+        ASSERT_EQ(data3, literal3.GetValue<std::string>());
+        // after copy assignment literal3 reads as data
+        literal3 = literal2;
+        ASSERT_EQ(data, literal3.GetValue<std::string>());
+        ASSERT_EQ(data, literal2.GetValue<std::string>());
+    }
+    {
+        // BINARY, copy construction and copy assignment.
+        std::string data = "hello world";
+        // literal points to data
+        Literal literal(FieldType::BINARY, data.data(), data.size(), /*own data*/ false);
+        ASSERT_EQ(data, literal.GetValue<std::string>());
+        // literal and literal2 point to data
+        Literal literal2 = literal;
+        ASSERT_EQ(data, literal2.GetValue<std::string>());
+        ASSERT_EQ(data, literal.GetValue<std::string>());
+
+        std::string data3 = "copy hello world";
+        Literal literal3(FieldType::BINARY, data3.data(), data3.size(), /*own data*/ true);
+        ASSERT_EQ(data3, literal3.GetValue<std::string>());
+        // after copy assignment literal3 reads as data
+        literal3 = literal2;
+        ASSERT_EQ(data, literal3.GetValue<std::string>());
+        ASSERT_EQ(data, literal2.GetValue<std::string>());
+    }
+    {
+        // STRING, move construction and move assignment. Moved-from literals are not
+        // read again.
+        std::string data = "hello world";
+        // literal points to data
+        Literal literal(FieldType::STRING, data.data(), data.size(), /*own data*/ false);
+        ASSERT_EQ(data, literal.GetValue<std::string>());
+        Literal literal2 = std::move(literal);
+        // literal2 now reads as data; literal has been moved from
+        ASSERT_EQ(data, literal2.GetValue<std::string>());
+
+        std::string data3 = "copy hello world";
+        Literal literal3(FieldType::STRING, data3.data(), data3.size(), /*own data*/ true);
+        ASSERT_EQ(data3, literal3.GetValue<std::string>());
+        // literal3 now reads as data; literal2 has been moved from
+        literal3 = std::move(literal2);
+        ASSERT_EQ(data, literal3.GetValue<std::string>());
+    }
+    {
+        // test empty string
+        std::string data = "";
+        Literal literal(FieldType::STRING, data.data(), data.size(), /*own data*/ false);
+        ASSERT_EQ(data, literal.GetValue<std::string>());
+        Literal literal2 = literal;
+        // Check the copy as well as the source; previously only the source was
+        // re-checked here, so the copied empty literal was never verified.
+        ASSERT_EQ(data, literal2.GetValue<std::string>());
+        ASSERT_EQ(data, literal.GetValue<std::string>());
+
+        std::string data3 = "copy hello world";
+        Literal literal3(FieldType::STRING, data3.data(), data3.size(), /*own data*/ true);
+        ASSERT_EQ(data3, literal3.GetValue<std::string>());
+        literal3 = std::move(literal2);
+        ASSERT_EQ(data, literal3.GetValue<std::string>());
+
+        // Copy-assign an empty literal over another empty literal created with the
+        // flag set to true.
+        Literal literal4(FieldType::STRING, data.data(), data.size(), /*own data*/ true);
+        ASSERT_EQ(data, literal4.GetValue<std::string>());
+        literal4 = literal3;
+        ASSERT_EQ(data, literal4.GetValue<std::string>());
+    }
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/predicate/multi_literals_leaf_function.h 
b/src/paimon/common/predicate/multi_literals_leaf_function.h
new file mode 100644
index 0000000..93c057e
--- /dev/null
+++ b/src/paimon/common/predicate/multi_literals_leaf_function.h
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "arrow/array/array_nested.h"
+#include "arrow/c/bridge.h"
+#include "arrow/util/checked_cast.h"
+#include "paimon/common/predicate/leaf_function.h"
+#include "paimon/common/predicate/literal_converter.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+/// Base class for leaf functions evaluated against a list of literals.
+///
+/// The three `Test` overloads handle the null cases (a null field, or statistics whose
+/// null count equals the row count, never match) and forward everything else to the
+/// `InnerTest` hooks that subclasses implement.
+class MultiLiteralsLeafFunction : public LeafFunction {
+ public:
+    /// Row-wise evaluation over an Arrow array.
+    /// @return one flag per row; null rows stay false, other rows hold the result of the
+    ///         single-value `Test`.
+    Result<std::vector<char>> Test(const arrow::Array& array,
+                                   const std::vector<Literal>& literals) const override {
+        PAIMON_ASSIGN_OR_RAISE(
+            std::vector<Literal> array_values,
+            LiteralConverter::ConvertLiteralsFromArray(array, /*own_data=*/false));
+        const int64_t num_rows = array.length();
+        std::vector<char> matched(num_rows, false);
+        for (int64_t row = 0; row < num_rows; ++row) {
+            if (array.IsNull(row)) {
+                // Null rows never match; leave the default.
+                continue;
+            }
+            PAIMON_ASSIGN_OR_RAISE(matched[row], Test(array_values[row], literals));
+        }
+        return matched;
+    }
+
+    /// Statistics-based evaluation.
+    /// @return false when the null count is known and equals `row_count`; otherwise the
+    ///         result of the statistics `InnerTest` hook.
+    Result<bool> Test(int64_t row_count, const Literal& min_value, const Literal& max_value,
+                      const std::optional<int64_t>& null_count,
+                      const std::vector<Literal>& literals) const override {
+        const bool every_row_is_null =
+            null_count.has_value() && row_count == null_count.value();
+        if (every_row_is_null) {
+            return false;
+        }
+        return InnerTest(row_count, min_value, max_value, null_count, literals);
+    }
+
+    /// Single-value evaluation.
+    /// @return false for a null field; otherwise the result of the value `InnerTest` hook.
+    Result<bool> Test(const Literal& field, const std::vector<Literal>& literals) const override {
+        if (field.IsNull()) {
+            return false;
+        }
+        return InnerTest(field, literals);
+    }
+
+    /// Hook for a single value. Precondition: `field` is not null.
+    virtual Result<bool> InnerTest(const Literal& field,
+                                   const std::vector<Literal>& literals) const = 0;
+
+    /// Hook for statistics. Only called when the rows are not known to be all null.
+    virtual Result<bool> InnerTest(int64_t row_count, const Literal& min_value,
+                                   const Literal& max_value,
+                                   const std::optional<int64_t>& null_count,
+                                   const std::vector<Literal>& literals) const = 0;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/predicate/null_false_leaf_binary_function.h 
b/src/paimon/common/predicate/null_false_leaf_binary_function.h
new file mode 100644
index 0000000..56fc6ca
--- /dev/null
+++ b/src/paimon/common/predicate/null_false_leaf_binary_function.h
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "arrow/array/array_nested.h"
+#include "arrow/c/bridge.h"
+#include "arrow/util/checked_cast.h"
+#include "fmt/format.h"
+#include "paimon/common/predicate/leaf_function.h"
+#include "paimon/common/predicate/literal_converter.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class NullFalseLeafBinaryFunction : public LeafFunction {
+ public:
+    Result<std::vector<char>> Test(const arrow::Array& array,
+                                   const std::vector<Literal>& literals) const 
override {
+        if (literals.size() < LITERAL_LIMIT) {
+            return Status::Invalid("NullFalseLeafBinaryFunction needs single 
literal for field");
+        }
+        std::vector<char> is_valid(array.length(), false);
+        if (literals[0].IsNull()) {
+            return is_valid;
+        }
+        PAIMON_ASSIGN_OR_RAISE(
+            std::vector<Literal> array_values,
+            LiteralConverter::ConvertLiteralsFromArray(array, 
/*own_data=*/false));
+        for (int64_t i = 0; i < array.length(); i++) {
+            if (!array.IsNull(i)) {
+                PAIMON_ASSIGN_OR_RAISE(is_valid[i], Test(array_values[i], 
literals[0]));
+            }
+        }
+        return is_valid;
+    }
+
+    Result<bool> Test(const Literal& value, const std::vector<Literal>& 
literals) const override {
+        if (literals.size() < LITERAL_LIMIT) {
+            return Status::Invalid("NullFalseLeafBinaryFunction needs single 
literal for field");
+        }
+        if (literals[0].IsNull() || value.IsNull()) {
+            return false;
+        }
+        return Test(value, literals[0]);
+    }
+
+    Result<bool> Test(int64_t row_count, const Literal& min_value, const 
Literal& max_value,
+                      const std::optional<int64_t>& null_count,
+                      const std::vector<Literal>& literals) const override {
+        if (literals.size() < LITERAL_LIMIT) {
+            return Status::Invalid("NullFalseLeafBinaryFunction needs single 
literal for field");
+        }
+        if (null_count != std::nullopt) {
+            if (row_count == null_count.value() || literals[0].IsNull()) {
+                return false;
+            }
+        }
+        return Test(row_count, min_value, max_value, null_count, literals[0]);
+    }
+
+    // Single-literal hooks implemented by subclasses. Precondition: field and literals are not empty (the row-level wrapper above checks IsNull() on both before delegating).
+    virtual Result<bool> Test(const Literal& field, const Literal& literal) 
const = 0;  // row-level hook
+    virtual Result<bool> Test(int64_t row_count, const Literal& min_value, 
const Literal& max_value,
+                              const std::optional<int64_t>& null_count,
+                              const Literal& literal) const = 0;  // statistics-level hook; null_count is passed through as given and may be std::nullopt
+
+ private:
+    static constexpr size_t LITERAL_LIMIT = 1;  // minimum literals.size() accepted by the vector-taking Test overloads; only literals[0] is read
+};
+}  // namespace paimon
diff --git a/src/paimon/common/predicate/predicate_filter.h 
b/src/paimon/common/predicate/predicate_filter.h
new file mode 100644
index 0000000..799ed15
--- /dev/null
+++ b/src/paimon/common/predicate/predicate_filter.h
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "arrow/api.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/predicate/predicate.h"
+
+namespace paimon {
+class PredicateFilter : virtual public Predicate {  // Predicate extended with three pure-virtual Test() entry points: Arrow array, single row, and row_count/min/max/null-count inputs.
+ public:
+    /// @param array is the struct array of all fields; @return a vector of char flags (presumably one entry per row of `array` — TODO confirm against implementations)
+    virtual Result<std::vector<char>> Test(const arrow::Array& array) const = 
0;  // array-level evaluation
+    virtual Result<bool> Test(const std::shared_ptr<arrow::Schema>& schema,
+                              const InternalRow& row) const = 0;  // evaluation of one InternalRow; `schema` presumably describes the row's fields — TODO confirm
+    virtual Result<bool> Test(const std::shared_ptr<arrow::Schema>& schema, 
int64_t row_count,
+                              const InternalRow& min_values, const 
InternalRow& max_values,
+                              const InternalArray& null_counts) const = 0;  // evaluation from row_count plus min/max rows and null counts (looks like statistics-based filtering — confirm with callers)
+};
+}  // namespace paimon
diff --git a/src/paimon/common/predicate/string_leaf_binary_function.h 
b/src/paimon/common/predicate/string_leaf_binary_function.h
new file mode 100644
index 0000000..8904c77
--- /dev/null
+++ b/src/paimon/common/predicate/string_leaf_binary_function.h
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "paimon/common/predicate/null_false_leaf_binary_function.h"
+
+namespace paimon {
+class LeafFunction;
+
+/// Base for binary leaf functions whose field and literal are both read as std::string.
+/// Subclasses supply TestString(); the literal-list overloads and their null handling
+/// come from NullFalseLeafBinaryFunction.
+class StringLeafBinaryFunction : public NullFalseLeafBinaryFunction {
+ public:
+    /// Subclass hook: decides whether `field` satisfies the function for `pattern`.
+    virtual Result<bool> TestString(const std::string& field,
+                                    const std::string& pattern) const = 0;
+
+    /// Row-level test: reads both literals as std::string and delegates to TestString().
+    Result<bool> Test(const Literal& field, const Literal& pattern_literal) const override {
+        const auto& field_text = field.GetValue<std::string>();
+        const auto& pattern_text = pattern_literal.GetValue<std::string>();
+        return TestString(field_text, pattern_text);
+    }
+
+    /// Statistics-level test: always returns true; none of the arguments are inspected.
+    Result<bool> Test(int64_t row_count, const Literal& min_value, const Literal& max_value,
+                      const std::optional<int64_t>& null_count,
+                      const Literal& literal) const override {
+        return true;
+    }
+
+    /// Always returns nullptr (presumably meaning no negated function is available;
+    /// confirm against the LeafFunction::Negate() contract).
+    const LeafFunction* Negate() const override { return nullptr; }
+};
+}  // namespace paimon

Reply via email to