This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new f8319e3  feat: add expression visitors (#311)
f8319e3 is described below

commit f8319e38195b7f2aaf12c48ff2fc705556851e16
Author: Gang Wu <[email protected]>
AuthorDate: Wed Nov 19 14:50:30 2025 +0800

    feat: add expression visitors (#311)
    
    - Added support to And/Or/Not to create folded expressions.
    - Refactored `UnboundPredicate` to use virtual inheritance of
    `Expression` to facilitate visitor pattern.
    - Added template class `ExpressionVisitor` and `BoundVisitor`.
    - Implemented `Binder`, `IsBoundVisitor` and `RewriteNot`.
    
    Closes #70 #71 #72
    
    ---------
    
    Co-authored-by: Zehua Zou <[email protected]>
---
 src/iceberg/CMakeLists.txt                  |   2 +
 src/iceberg/expression/binder.cc            |  95 ++++++
 src/iceberg/expression/binder.h             |  72 ++++
 src/iceberg/expression/expression.cc        |  18 +
 src/iceberg/expression/expression.h         |  91 +++++
 src/iceberg/expression/expression_visitor.h | 310 +++++++++++++++++
 src/iceberg/expression/expressions.cc       | 164 +++++----
 src/iceberg/expression/expressions.h        | 145 ++++----
 src/iceberg/expression/meson.build          |  14 +-
 src/iceberg/expression/predicate.cc         |  57 ++--
 src/iceberg/expression/predicate.h          |  52 ++-
 src/iceberg/expression/rewrite_not.cc       |  63 ++++
 src/iceberg/expression/rewrite_not.h        |  51 +++
 src/iceberg/meson.build                     |   2 +
 src/iceberg/test/CMakeLists.txt             |   1 +
 src/iceberg/test/expression_visitor_test.cc | 508 ++++++++++++++++++++++++++++
 src/iceberg/test/meson.build                |   1 +
 17 files changed, 1429 insertions(+), 217 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index a13f095..c048d29 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -20,10 +20,12 @@ set(ICEBERG_INCLUDES 
"$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
 set(ICEBERG_SOURCES
     arrow_c_data_guard_internal.cc
     catalog/memory/in_memory_catalog.cc
+    expression/binder.cc
     expression/expression.cc
     expression/expressions.cc
     expression/literal.cc
     expression/predicate.cc
+    expression/rewrite_not.cc
     expression/term.cc
     file_reader.cc
     file_writer.cc
diff --git a/src/iceberg/expression/binder.cc b/src/iceberg/expression/binder.cc
new file mode 100644
index 0000000..62c7353
--- /dev/null
+++ b/src/iceberg/expression/binder.cc
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/binder.h"
+
+namespace iceberg {
+
+Binder::Binder(const Schema& schema, bool case_sensitive)
+    : schema_(schema), case_sensitive_(case_sensitive) {}
+
+Result<std::shared_ptr<Expression>> Binder::Bind(const Schema& schema,
+                                                 const 
std::shared_ptr<Expression>& expr,
+                                                 bool case_sensitive) {
+  Binder binder(schema, case_sensitive);
+  return Visit<std::shared_ptr<Expression>, Binder>(expr, binder);
+}
+
+Result<std::shared_ptr<Expression>> Binder::AlwaysTrue() { return 
True::Instance(); }
+
+Result<std::shared_ptr<Expression>> Binder::AlwaysFalse() { return 
False::Instance(); }
+
+Result<std::shared_ptr<Expression>> Binder::Not(
+    const std::shared_ptr<Expression>& child_result) {
+  return iceberg::Not::MakeFolded(child_result);
+}
+
+Result<std::shared_ptr<Expression>> Binder::And(
+    const std::shared_ptr<Expression>& left_result,
+    const std::shared_ptr<Expression>& right_result) {
+  return iceberg::And::MakeFolded(left_result, right_result);
+}
+
+Result<std::shared_ptr<Expression>> Binder::Or(
+    const std::shared_ptr<Expression>& left_result,
+    const std::shared_ptr<Expression>& right_result) {
+  return iceberg::Or::MakeFolded(left_result, right_result);
+}
+
+Result<std::shared_ptr<Expression>> Binder::Predicate(
+    const std::shared_ptr<UnboundPredicate>& pred) {
+  ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null");
+  return pred->Bind(schema_, case_sensitive_);
+}
+
+Result<std::shared_ptr<Expression>> Binder::Predicate(
+    const std::shared_ptr<BoundPredicate>& pred) {
+  ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null");
+  return InvalidExpression("Found already bound predicate: {}", 
pred->ToString());
+}
+
+Result<bool> IsBoundVisitor::IsBound(const std::shared_ptr<Expression>& expr) {
+  ICEBERG_DCHECK(expr != nullptr, "Expression cannot be null");
+  IsBoundVisitor visitor;
+  return Visit<bool, IsBoundVisitor>(expr, visitor);
+}
+
+Result<bool> IsBoundVisitor::AlwaysTrue() { return true; }
+
+Result<bool> IsBoundVisitor::AlwaysFalse() { return true; }
+
+Result<bool> IsBoundVisitor::Not(bool child_result) { return child_result; }
+
+Result<bool> IsBoundVisitor::And(bool left_result, bool right_result) {
+  return left_result && right_result;
+}
+
+Result<bool> IsBoundVisitor::Or(bool left_result, bool right_result) {
+  return left_result && right_result;
+}
+
+Result<bool> IsBoundVisitor::Predicate(const std::shared_ptr<BoundPredicate>& 
pred) {
+  return true;
+}
+
+Result<bool> IsBoundVisitor::Predicate(const 
std::shared_ptr<UnboundPredicate>& pred) {
+  return false;
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/expression/binder.h b/src/iceberg/expression/binder.h
new file mode 100644
index 0000000..fcef073
--- /dev/null
+++ b/src/iceberg/expression/binder.h
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/expression/binder.h
+/// Bind an expression to a schema.
+
+#include "iceberg/expression/expression_visitor.h"
+
+namespace iceberg {
+
+class ICEBERG_EXPORT Binder : public 
ExpressionVisitor<std::shared_ptr<Expression>> {
+ public:
+  Binder(const Schema& schema, bool case_sensitive);
+
+  static Result<std::shared_ptr<Expression>> Bind(const Schema& schema,
+                                                  const 
std::shared_ptr<Expression>& expr,
+                                                  bool case_sensitive);
+
+  Result<std::shared_ptr<Expression>> AlwaysTrue() override;
+  Result<std::shared_ptr<Expression>> AlwaysFalse() override;
+  Result<std::shared_ptr<Expression>> Not(
+      const std::shared_ptr<Expression>& child_result) override;
+  Result<std::shared_ptr<Expression>> And(
+      const std::shared_ptr<Expression>& left_result,
+      const std::shared_ptr<Expression>& right_result) override;
+  Result<std::shared_ptr<Expression>> Or(
+      const std::shared_ptr<Expression>& left_result,
+      const std::shared_ptr<Expression>& right_result) override;
+  Result<std::shared_ptr<Expression>> Predicate(
+      const std::shared_ptr<BoundPredicate>& pred) override;
+  Result<std::shared_ptr<Expression>> Predicate(
+      const std::shared_ptr<UnboundPredicate>& pred) override;
+
+ private:
+  const Schema& schema_;
+  const bool case_sensitive_;
+};
+
+class ICEBERG_EXPORT IsBoundVisitor : public ExpressionVisitor<bool> {
+ public:
+  static Result<bool> IsBound(const std::shared_ptr<Expression>& expr);
+
+  Result<bool> AlwaysTrue() override;
+  Result<bool> AlwaysFalse() override;
+  Result<bool> Not(bool child_result) override;
+  Result<bool> And(bool left_result, bool right_result) override;
+  Result<bool> Or(bool left_result, bool right_result) override;
+  Result<bool> Predicate(const std::shared_ptr<BoundPredicate>& pred) override;
+  Result<bool> Predicate(const std::shared_ptr<UnboundPredicate>& pred) 
override;
+};
+
+// TODO(gangwu): add the Java parity `ReferenceVisitor`
+
+}  // namespace iceberg
diff --git a/src/iceberg/expression/expression.cc 
b/src/iceberg/expression/expression.cc
index 69446a8..b40082b 100644
--- a/src/iceberg/expression/expression.cc
+++ b/src/iceberg/expression/expression.cc
@@ -120,6 +120,24 @@ Result<std::unique_ptr<Not>> 
Not::Make(std::shared_ptr<Expression> child) {
   return std::unique_ptr<Not>(new Not(std::move(child)));
 }
 
+Result<std::shared_ptr<Expression>> 
Not::MakeFolded(std::shared_ptr<Expression> child) {
+  if (child->op() == Expression::Operation::kTrue) {
+    return False::Instance();
+  }
+
+  if (child->op() == Expression::Operation::kFalse) {
+    return True::Instance();
+  }
+
+  // not(not(x)) = x
+  if (child->op() == Expression::Operation::kNot) {
+    const auto& not_expr = internal::checked_cast<const 
::iceberg::Not&>(*child);
+    return not_expr.child();
+  }
+
+  return Not::Make(std::move(child));
+}
+
 Not::Not(std::shared_ptr<Expression> child) : child_(std::move(child)) {
   ICEBERG_DCHECK(child_ != nullptr, "Not expression cannot have null child");
 }
diff --git a/src/iceberg/expression/expression.h 
b/src/iceberg/expression/expression.h
index 931e35c..e6e19ea 100644
--- a/src/iceberg/expression/expression.h
+++ b/src/iceberg/expression/expression.h
@@ -28,6 +28,7 @@
 #include "iceberg/iceberg_export.h"
 #include "iceberg/result.h"
 #include "iceberg/util/formattable.h"
+#include "iceberg/util/macros.h"
 
 namespace iceberg {
 
@@ -80,6 +81,11 @@ class ICEBERG_EXPORT Expression : public util::Formattable {
   }
 
   std::string ToString() const override { return "Expression"; }
+
+  virtual bool is_unbound_predicate() const { return false; }
+  virtual bool is_bound_predicate() const { return false; }
+  virtual bool is_unbound_aggregate() const { return false; }
+  virtual bool is_bound_aggregate() const { return false; }
 };
 
 /// \brief An Expression that is always true.
@@ -137,6 +143,44 @@ class ICEBERG_EXPORT And : public Expression {
   static Result<std::unique_ptr<And>> Make(std::shared_ptr<Expression> left,
                                            std::shared_ptr<Expression> right);
 
+  /// \brief Creates a folded And expression from two sub-expressions.
+  ///
+  /// \param left The left operand of the AND expression
+  /// \param right The right operand of the AND expression
+  /// \param args Additional operands of the AND expression
+  /// \return A Result containing a shared pointer to the folded And 
expression, or an
+  /// error if left or right is nullptr
+  /// \note A folded And expression is an expression that is equivalent to the 
original
+  /// expression, but with the And operation removed. For example, (true and 
x) = x.
+  template <typename... Args>
+  static Result<std::shared_ptr<Expression>> 
MakeFolded(std::shared_ptr<Expression> left,
+                                                        
std::shared_ptr<Expression> right,
+                                                        Args&&... args)
+    requires std::conjunction_v<std::is_same<Args, 
std::shared_ptr<Expression>>...>
+  {
+    if constexpr (sizeof...(args) == 0) {
+      if (left->op() == Expression::Operation::kFalse ||
+          right->op() == Expression::Operation::kFalse) {
+        return False::Instance();
+      }
+
+      if (left->op() == Expression::Operation::kTrue) {
+        return right;
+      }
+
+      if (right->op() == Expression::Operation::kTrue) {
+        return left;
+      }
+
+      return And::Make(std::move(left), std::move(right));
+    } else {
+      ICEBERG_ASSIGN_OR_THROW(auto and_expr,
+                              And::Make(std::move(left), std::move(right)));
+
+      return And::MakeFolded(std::move(and_expr), std::forward<Args>(args)...);
+    }
+  }
+
   /// \brief Returns the left operand of the AND expression.
   ///
   /// \return The left operand of the AND expression
@@ -175,6 +219,43 @@ class ICEBERG_EXPORT Or : public Expression {
   static Result<std::unique_ptr<Or>> Make(std::shared_ptr<Expression> left,
                                           std::shared_ptr<Expression> right);
 
+  /// \brief Creates a folded Or expression from two sub-expressions.
+  ///
+  /// \param left The left operand of the OR expression
+  /// \param right The right operand of the OR expression
+  /// \param args Additional operands of the OR expression
+  /// \return A Result containing a shared pointer to the folded Or 
expression, or an
+  /// error if left or right is nullptr
+  /// \note A folded Or expression is an expression that is equivalent to the 
original
+  /// expression, but with the Or operation removed. For example, (false or x) 
= x.
+  template <typename... Args>
+  static Result<std::shared_ptr<Expression>> 
MakeFolded(std::shared_ptr<Expression> left,
+                                                        
std::shared_ptr<Expression> right,
+                                                        Args&&... args)
+    requires std::conjunction_v<std::is_same<Args, 
std::shared_ptr<Expression>>...>
+  {
+    if constexpr (sizeof...(args) == 0) {
+      if (left->op() == Expression::Operation::kTrue ||
+          right->op() == Expression::Operation::kTrue) {
+        return True::Instance();
+      }
+
+      if (left->op() == Expression::Operation::kFalse) {
+        return right;
+      }
+
+      if (right->op() == Expression::Operation::kFalse) {
+        return left;
+      }
+
+      return Or::Make(std::move(left), std::move(right));
+    } else {
+      ICEBERG_ASSIGN_OR_THROW(auto or_expr, Or::Make(std::move(left), 
std::move(right)));
+
+      return Or::MakeFolded(std::move(or_expr), std::forward<Args>(args)...);
+    }
+  }
+
   /// \brief Returns the left operand of the OR expression.
   ///
   /// \return The left operand of the OR expression
@@ -211,6 +292,16 @@ class ICEBERG_EXPORT Not : public Expression {
   /// \return A Result containing a unique pointer to Not, or an error if 
child is nullptr
   static Result<std::unique_ptr<Not>> Make(std::shared_ptr<Expression> child);
 
+  /// \brief Creates a folded Not expression from a child expression.
+  ///
+  /// \param child The expression to negate
+  /// \return A Result containing a shared pointer to the folded Not 
expression, or an
+  /// error if child is nullptr
+  /// \note A folded Not expression is an expression that is equivalent to the 
original
+  /// expression, but with the Not operation removed. For example, not(not(x)) 
= x.
+  static Result<std::shared_ptr<Expression>> MakeFolded(
+      std::shared_ptr<Expression> child);
+
   /// \brief Returns the child expression.
   ///
   /// \return The child expression being negated
diff --git a/src/iceberg/expression/expression_visitor.h 
b/src/iceberg/expression/expression_visitor.h
new file mode 100644
index 0000000..aeafa92
--- /dev/null
+++ b/src/iceberg/expression/expression_visitor.h
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/expression/expression_visitor.h
+/// Visitor pattern implementation for traversing Iceberg expression trees.
+
+#include <concepts>
+#include <memory>
+
+#include "iceberg/expression/expression.h"
+#include "iceberg/expression/literal.h"
+#include "iceberg/expression/predicate.h"
+#include "iceberg/expression/term.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+/// \brief Base visitor for traversing expression trees.
+///
+/// This visitor traverses expression trees in postorder traversal and calls 
appropriate
+/// visitor methods for each node. Subclasses can override specific methods to 
implement
+/// custom behavior.
+///
+/// \tparam R The return type produced by visitor methods
+template <typename R>
+class ICEBERG_EXPORT ExpressionVisitor {
+  using ParamType = std::conditional_t<std::is_fundamental_v<R>, R, const R&>;
+
+ public:
+  virtual ~ExpressionVisitor() = default;
+
+  /// \brief Visit a True expression (always evaluates to true).
+  virtual Result<R> AlwaysTrue() = 0;
+
+  /// \brief Visit a False expression (always evaluates to false).
+  virtual Result<R> AlwaysFalse() = 0;
+
+  /// \brief Visit a Not expression.
+  /// \param child_result The result from visiting the child expression
+  virtual Result<R> Not(ParamType child_result) = 0;
+
+  /// \brief Visit an And expression.
+  /// \param left_result The result from visiting the left child
+  /// \param right_result The result from visiting the right child
+  virtual Result<R> And(ParamType left_result, ParamType right_result) = 0;
+
+  /// \brief Visit an Or expression.
+  /// \param left_result The result from visiting the left child
+  /// \param right_result The result from visiting the right child
+  virtual Result<R> Or(ParamType left_result, ParamType right_result) = 0;
+
+  /// \brief Visit a bound predicate.
+  /// \param pred The bound predicate to visit
+  virtual Result<R> Predicate(const std::shared_ptr<BoundPredicate>& pred) = 0;
+
+  /// \brief Visit an unbound predicate.
+  /// \param pred The unbound predicate to visit
+  virtual Result<R> Predicate(const std::shared_ptr<UnboundPredicate>& pred) = 
0;
+};
+
+/// \brief Visitor for bound expressions.
+///
+/// This visitor is for traversing bound expression trees.
+///
+/// \tparam R The return type produced by visitor methods
+template <typename R>
+class ICEBERG_EXPORT BoundVisitor : public ExpressionVisitor<R> {
+ public:
+  ~BoundVisitor() override = default;
+
+  /// \brief Visit an IS_NULL unary predicate.
+  /// \param term The bound term being tested
+  virtual Result<R> IsNull(const std::shared_ptr<BoundTerm>& term) = 0;
+
+  /// \brief Visit a NOT_NULL unary predicate.
+  /// \param term The bound term being tested
+  virtual Result<R> NotNull(const std::shared_ptr<BoundTerm>& term) = 0;
+
+  /// \brief Visit an IS_NAN unary predicate.
+  /// \param term The bound term being tested
+  virtual Result<R> IsNaN(const std::shared_ptr<BoundTerm>& term) {
+    return NotSupported("IsNaN operation is not supported by this visitor");
+  }
+
+  /// \brief Visit a NOT_NAN unary predicate.
+  /// \param term The bound term being tested
+  virtual Result<R> NotNaN(const std::shared_ptr<BoundTerm>& term) {
+    return NotSupported("NotNaN operation is not supported by this visitor");
+  }
+
+  /// \brief Visit a less-than predicate.
+  /// \param term The bound term
+  /// \param lit The literal value to compare against
+  virtual Result<R> Lt(const std::shared_ptr<BoundTerm>& term, const Literal& 
lit) = 0;
+
+  /// \brief Visit a less-than-or-equal predicate.
+  /// \param term The bound term
+  /// \param lit The literal value to compare against
+  virtual Result<R> LtEq(const std::shared_ptr<BoundTerm>& term, const 
Literal& lit) = 0;
+
+  /// \brief Visit a greater-than predicate.
+  /// \param term The bound term
+  /// \param lit The literal value to compare against
+  virtual Result<R> Gt(const std::shared_ptr<BoundTerm>& term, const Literal& 
lit) = 0;
+
+  /// \brief Visit a greater-than-or-equal predicate.
+  /// \param term The bound term
+  /// \param lit The literal value to compare against
+  virtual Result<R> GtEq(const std::shared_ptr<BoundTerm>& term, const 
Literal& lit) = 0;
+
+  /// \brief Visit an equality predicate.
+  /// \param term The bound term
+  /// \param lit The literal value to compare against
+  virtual Result<R> Eq(const std::shared_ptr<BoundTerm>& term, const Literal& 
lit) = 0;
+
+  /// \brief Visit a not-equal predicate.
+  /// \param term The bound term
+  /// \param lit The literal value to compare against
+  virtual Result<R> NotEq(const std::shared_ptr<BoundTerm>& term, const 
Literal& lit) = 0;
+
+  /// \brief Visit a starts-with predicate.
+  /// \param term The bound term
+  /// \param lit The literal value to check for prefix match
+  virtual Result<R> StartsWith([[maybe_unused]] const 
std::shared_ptr<BoundTerm>& term,
+                               [[maybe_unused]] const Literal& lit) {
+    return NotSupported("StartsWith operation is not supported by this 
visitor");
+  }
+
+  /// \brief Visit a not-starts-with predicate.
+  /// \param term The bound term
+  /// \param lit The literal value to check for prefix match
+  virtual Result<R> NotStartsWith([[maybe_unused]] const 
std::shared_ptr<BoundTerm>& term,
+                                  [[maybe_unused]] const Literal& lit) {
+    return NotSupported("NotStartsWith operation is not supported by this 
visitor");
+  }
+
+  /// \brief Visit an IN set predicate.
+  /// \param term The bound term
+  /// \param literal_set The set of literal values to test membership
+  virtual Result<R> In(
+      [[maybe_unused]] const std::shared_ptr<BoundTerm>& term,
+      [[maybe_unused]] const BoundSetPredicate::LiteralSet& literal_set) {
+    return NotSupported("In operation is not supported by this visitor");
+  }
+
+  /// \brief Visit a NOT_IN set predicate.
+  /// \param term The bound term
+  /// \param literal_set The set of literal values to test membership
+  virtual Result<R> NotIn(
+      [[maybe_unused]] const std::shared_ptr<BoundTerm>& term,
+      [[maybe_unused]] const BoundSetPredicate::LiteralSet& literal_set) {
+    return NotSupported("NotIn operation is not supported by this visitor");
+  }
+
+  /// \brief Visit a bound predicate.
+  ///
+  /// This method dispatches to specific visitor methods based on the predicate
+  /// type and operation.
+  ///
+  /// \param pred The bound predicate to visit
+  Result<R> Predicate(const std::shared_ptr<BoundPredicate>& pred) override {
+    ICEBERG_DCHECK(pred != nullptr, "BoundPredicate cannot be null");
+
+    switch (pred->kind()) {
+      case BoundPredicate::Kind::kUnary: {
+        switch (pred->op()) {
+          case Expression::Operation::kIsNull:
+            return IsNull(pred->term());
+          case Expression::Operation::kNotNull:
+            return NotNull(pred->term());
+          case Expression::Operation::kIsNan:
+            return IsNaN(pred->term());
+          case Expression::Operation::kNotNan:
+            return NotNaN(pred->term());
+          default:
+            return InvalidExpression("Invalid operation for 
BoundUnaryPredicate: {}",
+                                     ToString(pred->op()));
+        }
+      }
+      case BoundPredicate::Kind::kLiteral: {
+        const auto& literal_pred =
+            internal::checked_cast<const BoundLiteralPredicate&>(*pred);
+        switch (pred->op()) {
+          case Expression::Operation::kLt:
+            return Lt(pred->term(), literal_pred.literal());
+          case Expression::Operation::kLtEq:
+            return LtEq(pred->term(), literal_pred.literal());
+          case Expression::Operation::kGt:
+            return Gt(pred->term(), literal_pred.literal());
+          case Expression::Operation::kGtEq:
+            return GtEq(pred->term(), literal_pred.literal());
+          case Expression::Operation::kEq:
+            return Eq(pred->term(), literal_pred.literal());
+          case Expression::Operation::kNotEq:
+            return NotEq(pred->term(), literal_pred.literal());
+          case Expression::Operation::kStartsWith:
+            return StartsWith(pred->term(), literal_pred.literal());
+          case Expression::Operation::kNotStartsWith:
+            return NotStartsWith(pred->term(), literal_pred.literal());
+          default:
+            return InvalidExpression("Invalid operation for 
BoundLiteralPredicate: {}",
+                                     ToString(pred->op()));
+        }
+      }
+      case BoundPredicate::Kind::kSet: {
+        const auto& set_pred = internal::checked_cast<const 
BoundSetPredicate&>(*pred);
+        switch (pred->op()) {
+          case Expression::Operation::kIn:
+            return In(pred->term(), set_pred.literal_set());
+          case Expression::Operation::kNotIn:
+            return NotIn(pred->term(), set_pred.literal_set());
+          default:
+            return InvalidExpression("Invalid operation for BoundSetPredicate: 
{}",
+                                     ToString(pred->op()));
+        }
+      }
+    }
+
+    return InvalidExpression("Unsupported bound predicate: {}", 
pred->ToString());
+  }
+
+  /// \brief Visit an unbound predicate.
+  ///
+  /// Bound visitors do not support unbound predicates.
+  ///
+  /// \param pred The unbound predicate
+  Result<R> Predicate(const std::shared_ptr<UnboundPredicate>& pred) final {
+    ICEBERG_DCHECK(pred != nullptr, "UnboundPredicate cannot be null");
+    return NotSupported("Not a bound predicate: {}", pred->ToString());
+  }
+};
+
+/// \brief Traverse an expression tree with a visitor.
+///
+/// This function traverses the given expression tree in postorder traversal 
and calls
+/// appropriate visitor methods for each node. Results from child nodes are 
passed to
+/// parent nodes.
+///
+/// \tparam R The return type produced by the visitor
+/// \tparam V The visitor type (must derive from ExpressionVisitor<R>)
+/// \param expr The expression to traverse
+/// \param visitor The visitor to use for traversal
+/// \return The result produced by the visitor for the root expression node
+template <typename R, typename V>
+  requires std::derived_from<V, ExpressionVisitor<R>>
+Result<R> Visit(const std::shared_ptr<Expression>& expr, V& visitor) {
+  ICEBERG_DCHECK(expr != nullptr, "Expression cannot be null");
+
+  if (expr->is_bound_predicate()) {
+    return visitor.Predicate(std::dynamic_pointer_cast<BoundPredicate>(expr));
+  }
+
+  if (expr->is_unbound_predicate()) {
+    return 
visitor.Predicate(std::dynamic_pointer_cast<UnboundPredicate>(expr));
+  }
+
+  // TODO(gangwu): handle aggregate expression
+
+  switch (expr->op()) {
+    case Expression::Operation::kTrue:
+      return visitor.AlwaysTrue();
+    case Expression::Operation::kFalse:
+      return visitor.AlwaysFalse();
+    case Expression::Operation::kNot: {
+      const auto& not_expr = internal::checked_pointer_cast<Not>(expr);
+      ICEBERG_ASSIGN_OR_RAISE(auto child_result,
+                              (Visit<R, V>(not_expr->child(), visitor)));
+      return visitor.Not(std::move(child_result));
+    }
+    case Expression::Operation::kAnd: {
+      const auto& and_expr = internal::checked_pointer_cast<And>(expr);
+      ICEBERG_ASSIGN_OR_RAISE(auto left_result, (Visit<R, V>(and_expr->left(), 
visitor)));
+      ICEBERG_ASSIGN_OR_RAISE(auto right_result,
+                              (Visit<R, V>(and_expr->right(), visitor)));
+      return visitor.And(std::move(left_result), std::move(right_result));
+    }
+    case Expression::Operation::kOr: {
+      const auto& or_expr = internal::checked_pointer_cast<Or>(expr);
+      ICEBERG_ASSIGN_OR_RAISE(auto left_result, (Visit<R, V>(or_expr->left(), 
visitor)));
+      ICEBERG_ASSIGN_OR_RAISE(auto right_result,
+                              (Visit<R, V>(or_expr->right(), visitor)));
+      return visitor.Or(std::move(left_result), std::move(right_result));
+    }
+    default:
+      return InvalidExpression("Unknown expression operation: {}", 
expr->ToString());
+  }
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/expression/expressions.cc 
b/src/iceberg/expression/expressions.cc
index 7e3db18..b3e88ff 100644
--- a/src/iceberg/expression/expressions.cc
+++ b/src/iceberg/expression/expressions.cc
@@ -28,21 +28,7 @@ namespace iceberg {
 
 // Logical NOT operation
 std::shared_ptr<Expression> Expressions::Not(std::shared_ptr<Expression> 
child) {
-  if (child->op() == Expression::Operation::kTrue) {
-    return AlwaysFalse();
-  }
-
-  if (child->op() == Expression::Operation::kFalse) {
-    return AlwaysTrue();
-  }
-
-  // not(not(x)) = x
-  if (child->op() == Expression::Operation::kNot) {
-    const auto& not_expr = static_cast<const ::iceberg::Not&>(*child);
-    return not_expr.child();
-  }
-
-  ICEBERG_ASSIGN_OR_THROW(auto not_expr, iceberg::Not::Make(std::move(child)));
+  ICEBERG_ASSIGN_OR_THROW(auto not_expr, 
::iceberg::Not::MakeFolded(std::move(child)));
   return not_expr;
 }
 
@@ -97,275 +83,279 @@ std::shared_ptr<UnboundTransform> Expressions::Transform(
 
 // Template implementations for unary predicates
 
-std::shared_ptr<UnboundPredicate<BoundReference>> 
Expressions::IsNull(std::string name) {
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::IsNull(
+    std::string name) {
   return IsNull<BoundReference>(Ref(std::move(name)));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::IsNull(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::IsNull(
     std::shared_ptr<UnboundTerm<B>> expr) {
   ICEBERG_ASSIGN_OR_THROW(
       auto pred,
-      UnboundPredicate<B>::Make(Expression::Operation::kIsNull, 
std::move(expr)));
+      UnboundPredicateImpl<B>::Make(Expression::Operation::kIsNull, 
std::move(expr)));
   return pred;
 }
 
-std::shared_ptr<UnboundPredicate<BoundReference>> 
Expressions::NotNull(std::string name) {
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::NotNull(
+    std::string name) {
   return NotNull<BoundReference>(Ref(std::move(name)));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::NotNull(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::NotNull(
     std::shared_ptr<UnboundTerm<B>> expr) {
   ICEBERG_ASSIGN_OR_THROW(
       auto pred,
-      UnboundPredicate<B>::Make(Expression::Operation::kNotNull, 
std::move(expr)));
+      UnboundPredicateImpl<B>::Make(Expression::Operation::kNotNull, 
std::move(expr)));
   return pred;
 }
 
-std::shared_ptr<UnboundPredicate<BoundReference>> 
Expressions::IsNaN(std::string name) {
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::IsNaN(
+    std::string name) {
   return IsNaN<BoundReference>(Ref(std::move(name)));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::IsNaN(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::IsNaN(
     std::shared_ptr<UnboundTerm<B>> expr) {
-  ICEBERG_ASSIGN_OR_THROW(auto pred, UnboundPredicate<B>::Make(
+  ICEBERG_ASSIGN_OR_THROW(auto pred, UnboundPredicateImpl<B>::Make(
                                          Expression::Operation::kIsNan, 
std::move(expr)));
   return pred;
 }
 
-std::shared_ptr<UnboundPredicate<BoundReference>> 
Expressions::NotNaN(std::string name) {
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::NotNaN(
+    std::string name) {
   return NotNaN<BoundReference>(Ref(std::move(name)));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::NotNaN(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::NotNaN(
     std::shared_ptr<UnboundTerm<B>> expr) {
   ICEBERG_ASSIGN_OR_THROW(
       auto pred,
-      UnboundPredicate<B>::Make(Expression::Operation::kNotNan, 
std::move(expr)));
+      UnboundPredicateImpl<B>::Make(Expression::Operation::kNotNan, 
std::move(expr)));
   return pred;
 }
 
 // Template implementations for comparison predicates
 
-std::shared_ptr<UnboundPredicate<BoundReference>> 
Expressions::LessThan(std::string name,
-                                                                        
Literal value) {
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::LessThan(
+    std::string name, Literal value) {
   return LessThan<BoundReference>(Ref(std::move(name)), std::move(value));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::LessThan(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::LessThan(
     std::shared_ptr<UnboundTerm<B>> expr, Literal value) {
   ICEBERG_ASSIGN_OR_THROW(
-      auto pred, UnboundPredicate<B>::Make(Expression::Operation::kLt, 
std::move(expr),
-                                           std::move(value)));
+      auto pred, UnboundPredicateImpl<B>::Make(Expression::Operation::kLt,
+                                               std::move(expr), 
std::move(value)));
   return pred;
 }
 
-std::shared_ptr<UnboundPredicate<BoundReference>> Expressions::LessThanOrEqual(
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> 
Expressions::LessThanOrEqual(
     std::string name, Literal value) {
   return LessThanOrEqual<BoundReference>(Ref(std::move(name)), 
std::move(value));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::LessThanOrEqual(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::LessThanOrEqual(
     std::shared_ptr<UnboundTerm<B>> expr, Literal value) {
   ICEBERG_ASSIGN_OR_THROW(
-      auto pred, UnboundPredicate<B>::Make(Expression::Operation::kLtEq, 
std::move(expr),
-                                           std::move(value)));
+      auto pred, UnboundPredicateImpl<B>::Make(Expression::Operation::kLtEq,
+                                               std::move(expr), 
std::move(value)));
   return pred;
 }
 
-std::shared_ptr<UnboundPredicate<BoundReference>> Expressions::GreaterThan(
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::GreaterThan(
     std::string name, Literal value) {
   return GreaterThan<BoundReference>(Ref(std::move(name)), std::move(value));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::GreaterThan(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::GreaterThan(
     std::shared_ptr<UnboundTerm<B>> expr, Literal value) {
   ICEBERG_ASSIGN_OR_THROW(
-      auto pred, UnboundPredicate<B>::Make(Expression::Operation::kGt, 
std::move(expr),
-                                           std::move(value)));
+      auto pred, UnboundPredicateImpl<B>::Make(Expression::Operation::kGt,
+                                               std::move(expr), 
std::move(value)));
   return pred;
 }
 
-std::shared_ptr<UnboundPredicate<BoundReference>> 
Expressions::GreaterThanOrEqual(
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> 
Expressions::GreaterThanOrEqual(
     std::string name, Literal value) {
   return GreaterThanOrEqual<BoundReference>(Ref(std::move(name)), 
std::move(value));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::GreaterThanOrEqual(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::GreaterThanOrEqual(
     std::shared_ptr<UnboundTerm<B>> expr, Literal value) {
   ICEBERG_ASSIGN_OR_THROW(
-      auto pred, UnboundPredicate<B>::Make(Expression::Operation::kGtEq, 
std::move(expr),
-                                           std::move(value)));
+      auto pred, UnboundPredicateImpl<B>::Make(Expression::Operation::kGtEq,
+                                               std::move(expr), 
std::move(value)));
   return pred;
 }
 
-std::shared_ptr<UnboundPredicate<BoundReference>> 
Expressions::Equal(std::string name,
-                                                                     Literal 
value) {
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> 
Expressions::Equal(std::string name,
+                                                                         
Literal value) {
   return Equal<BoundReference>(Ref(std::move(name)), std::move(value));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::Equal(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::Equal(
     std::shared_ptr<UnboundTerm<B>> expr, Literal value) {
   ICEBERG_ASSIGN_OR_THROW(
-      auto pred, UnboundPredicate<B>::Make(Expression::Operation::kEq, 
std::move(expr),
-                                           std::move(value)));
+      auto pred, UnboundPredicateImpl<B>::Make(Expression::Operation::kEq,
+                                               std::move(expr), 
std::move(value)));
   return pred;
 }
 
-std::shared_ptr<UnboundPredicate<BoundReference>> 
Expressions::NotEqual(std::string name,
-                                                                        
Literal value) {
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::NotEqual(
+    std::string name, Literal value) {
   return NotEqual<BoundReference>(Ref(std::move(name)), std::move(value));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::NotEqual(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::NotEqual(
     std::shared_ptr<UnboundTerm<B>> expr, Literal value) {
   ICEBERG_ASSIGN_OR_THROW(
-      auto pred, UnboundPredicate<B>::Make(Expression::Operation::kNotEq, 
std::move(expr),
-                                           std::move(value)));
+      auto pred, UnboundPredicateImpl<B>::Make(Expression::Operation::kNotEq,
+                                               std::move(expr), 
std::move(value)));
   return pred;
 }
 
 // String predicates
 
-std::shared_ptr<UnboundPredicate<BoundReference>> Expressions::StartsWith(
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::StartsWith(
     std::string name, std::string value) {
   return StartsWith<BoundReference>(Ref(std::move(name)), std::move(value));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::StartsWith(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::StartsWith(
     std::shared_ptr<UnboundTerm<B>> expr, std::string value) {
   ICEBERG_ASSIGN_OR_THROW(
       auto pred,
-      UnboundPredicate<B>::Make(Expression::Operation::kStartsWith, 
std::move(expr),
-                                Literal::String(std::move(value))));
+      UnboundPredicateImpl<B>::Make(Expression::Operation::kStartsWith, 
std::move(expr),
+                                    Literal::String(std::move(value))));
   return pred;
 }
 
-std::shared_ptr<UnboundPredicate<BoundReference>> Expressions::NotStartsWith(
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> 
Expressions::NotStartsWith(
     std::string name, std::string value) {
   return NotStartsWith<BoundReference>(Ref(std::move(name)), std::move(value));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::NotStartsWith(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::NotStartsWith(
     std::shared_ptr<UnboundTerm<B>> expr, std::string value) {
   ICEBERG_ASSIGN_OR_THROW(
       auto pred,
-      UnboundPredicate<B>::Make(Expression::Operation::kNotStartsWith, 
std::move(expr),
-                                Literal::String(std::move(value))));
+      UnboundPredicateImpl<B>::Make(Expression::Operation::kNotStartsWith,
+                                    std::move(expr), 
Literal::String(std::move(value))));
   return pred;
 }
 
 // Template implementations for set predicates
 
-std::shared_ptr<UnboundPredicate<BoundReference>> Expressions::In(
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::In(
     std::string name, std::vector<Literal> values) {
   return In<BoundReference>(Ref(std::move(name)), std::move(values));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> 
Expressions::In(std::shared_ptr<UnboundTerm<B>> expr,
-                                                     std::vector<Literal> 
values) {
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::In(
+    std::shared_ptr<UnboundTerm<B>> expr, std::vector<Literal> values) {
   ICEBERG_ASSIGN_OR_THROW(
-      auto pred, UnboundPredicate<B>::Make(Expression::Operation::kIn, 
std::move(expr),
-                                           std::move(values)));
+      auto pred, UnboundPredicateImpl<B>::Make(Expression::Operation::kIn,
+                                               std::move(expr), 
std::move(values)));
   return pred;
 }
 
-std::shared_ptr<UnboundPredicate<BoundReference>> Expressions::In(
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::In(
     std::string name, std::initializer_list<Literal> values) {
   return In<BoundReference>(Ref(std::move(name)), 
std::vector<Literal>(values));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::In(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::In(
     std::shared_ptr<UnboundTerm<B>> expr, std::initializer_list<Literal> 
values) {
   return In<B>(std::move(expr), std::vector<Literal>(values));
 }
 
-std::shared_ptr<UnboundPredicate<BoundReference>> Expressions::NotIn(
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::NotIn(
     std::string name, std::vector<Literal> values) {
   return NotIn<BoundReference>(Ref(std::move(name)), std::move(values));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::NotIn(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::NotIn(
     std::shared_ptr<UnboundTerm<B>> expr, std::vector<Literal> values) {
   ICEBERG_ASSIGN_OR_THROW(
-      auto pred, UnboundPredicate<B>::Make(Expression::Operation::kNotIn, 
std::move(expr),
-                                           std::move(values)));
+      auto pred, UnboundPredicateImpl<B>::Make(Expression::Operation::kNotIn,
+                                               std::move(expr), 
std::move(values)));
   return pred;
 }
 
-std::shared_ptr<UnboundPredicate<BoundReference>> Expressions::NotIn(
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::NotIn(
     std::string name, std::initializer_list<Literal> values) {
   return NotIn<BoundReference>(Ref(std::move(name)), 
std::vector<Literal>(values));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::NotIn(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::NotIn(
     std::shared_ptr<UnboundTerm<B>> expr, std::initializer_list<Literal> 
values) {
   return NotIn<B>(expr, std::vector<Literal>(values));
 }
 
 // Template implementations for generic predicate factory
 
-std::shared_ptr<UnboundPredicate<BoundReference>> Expressions::Predicate(
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::Predicate(
     Expression::Operation op, std::string name, Literal value) {
-  ICEBERG_ASSIGN_OR_THROW(auto pred, UnboundPredicate<BoundReference>::Make(
+  ICEBERG_ASSIGN_OR_THROW(auto pred, 
UnboundPredicateImpl<BoundReference>::Make(
                                          op, Ref(std::move(name)), 
std::move(value)));
   return pred;
 }
 
-std::shared_ptr<UnboundPredicate<BoundReference>> Expressions::Predicate(
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::Predicate(
     Expression::Operation op, std::string name, std::vector<Literal> values) {
-  ICEBERG_ASSIGN_OR_THROW(auto pred, UnboundPredicate<BoundReference>::Make(
+  ICEBERG_ASSIGN_OR_THROW(auto pred, 
UnboundPredicateImpl<BoundReference>::Make(
                                          op, Ref(std::move(name)), 
std::move(values)));
   return pred;
 }
 
-std::shared_ptr<UnboundPredicate<BoundReference>> Expressions::Predicate(
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::Predicate(
     Expression::Operation op, std::string name, std::initializer_list<Literal> 
values) {
   return Predicate(op, name, std::vector<Literal>(values));
 }
 
-std::shared_ptr<UnboundPredicate<BoundReference>> Expressions::Predicate(
+std::shared_ptr<UnboundPredicateImpl<BoundReference>> Expressions::Predicate(
     Expression::Operation op, std::string name) {
   ICEBERG_ASSIGN_OR_THROW(
-      auto pred, UnboundPredicate<BoundReference>::Make(op, 
Ref(std::move(name))));
+      auto pred, UnboundPredicateImpl<BoundReference>::Make(op, 
Ref(std::move(name))));
   return pred;
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::Predicate(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::Predicate(
     Expression::Operation op, std::shared_ptr<UnboundTerm<B>> expr,
     std::vector<Literal> values) {
   ICEBERG_ASSIGN_OR_THROW(
-      auto pred, UnboundPredicate<B>::Make(op, std::move(expr), 
std::move(values)));
+      auto pred, UnboundPredicateImpl<B>::Make(op, std::move(expr), 
std::move(values)));
   return pred;
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::Predicate(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::Predicate(
     Expression::Operation op, std::shared_ptr<UnboundTerm<B>> expr,
     std::initializer_list<Literal> values) {
   return Predicate<B>(op, std::move(expr), std::vector<Literal>(values));
 }
 
 template <typename B>
-std::shared_ptr<UnboundPredicate<B>> Expressions::Predicate(
+std::shared_ptr<UnboundPredicateImpl<B>> Expressions::Predicate(
     Expression::Operation op, std::shared_ptr<UnboundTerm<B>> expr) {
-  ICEBERG_ASSIGN_OR_THROW(auto pred, UnboundPredicate<B>::Make(op, 
std::move(expr)));
+  ICEBERG_ASSIGN_OR_THROW(auto pred, UnboundPredicateImpl<B>::Make(op, 
std::move(expr)));
   return pred;
 }
 
diff --git a/src/iceberg/expression/expressions.h 
b/src/iceberg/expression/expressions.h
index 13895bd..cf7b6d2 100644
--- a/src/iceberg/expression/expressions.h
+++ b/src/iceberg/expression/expressions.h
@@ -50,26 +50,10 @@ class ICEBERG_EXPORT Expressions {
                                          Args&&... args)
     requires std::conjunction_v<std::is_same<Args, 
std::shared_ptr<Expression>>...>
   {
-    if constexpr (sizeof...(args) == 0) {
-      if (left->op() == Expression::Operation::kFalse ||
-          right->op() == Expression::Operation::kFalse) {
-        return AlwaysFalse();
-      }
-
-      if (left->op() == Expression::Operation::kTrue) {
-        return right;
-      }
-
-      if (right->op() == Expression::Operation::kTrue) {
-        return left;
-      }
-
-      ICEBERG_ASSIGN_OR_THROW(auto and_expr,
-                              iceberg::And::Make(std::move(left), 
std::move(right)));
-      return and_expr;
-    } else {
-      return And(And(std::move(left), std::move(right)), 
std::forward<Args>(args)...);
-    }
+    ICEBERG_ASSIGN_OR_THROW(auto and_expr,
+                            iceberg::And::MakeFolded(std::move(left), 
std::move(right),
+                                                     
std::forward<Args>(args)...));
+    return and_expr;
   }
 
   /// \brief Create an OR expression.
@@ -78,26 +62,10 @@ class ICEBERG_EXPORT Expressions {
                                         std::shared_ptr<Expression> right, 
Args&&... args)
     requires std::conjunction_v<std::is_same<Args, 
std::shared_ptr<Expression>>...>
   {
-    if constexpr (sizeof...(args) == 0) {
-      if (left->op() == Expression::Operation::kTrue ||
-          right->op() == Expression::Operation::kTrue) {
-        return AlwaysTrue();
-      }
-
-      if (left->op() == Expression::Operation::kFalse) {
-        return right;
-      }
-
-      if (right->op() == Expression::Operation::kFalse) {
-        return left;
-      }
-
-      ICEBERG_ASSIGN_OR_THROW(auto or_expr,
-                              iceberg::Or::Make(std::move(left), 
std::move(right)));
-      return or_expr;
-    } else {
-      return Or(Or(std::move(left), std::move(right)), 
std::forward<Args>(args)...);
-    }
+    ICEBERG_ASSIGN_OR_THROW(auto or_expr,
+                            iceberg::Or::MakeFolded(std::move(left), 
std::move(right),
+                                                    
std::forward<Args>(args)...));
+    return or_expr;
   }
 
   /// \brief Create a NOT expression.
@@ -136,183 +104,184 @@ class ICEBERG_EXPORT Expressions {
   // Unary predicates
 
   /// \brief Create an IS NULL predicate for a field name.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> IsNull(std::string 
name);
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> 
IsNull(std::string name);
 
   /// \brief Create an IS NULL predicate for an unbound term.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> IsNull(
+  static std::shared_ptr<UnboundPredicateImpl<B>> IsNull(
       std::shared_ptr<UnboundTerm<B>> expr);
 
   /// \brief Create a NOT NULL predicate for a field name.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> NotNull(std::string 
name);
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> 
NotNull(std::string name);
 
   /// \brief Create a NOT NULL predicate for an unbound term.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> NotNull(
+  static std::shared_ptr<UnboundPredicateImpl<B>> NotNull(
       std::shared_ptr<UnboundTerm<B>> expr);
 
   /// \brief Create an IS NaN predicate for a field name.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> IsNaN(std::string 
name);
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> 
IsNaN(std::string name);
 
   /// \brief Create an IS NaN predicate for an unbound term.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> 
IsNaN(std::shared_ptr<UnboundTerm<B>> expr);
+  static std::shared_ptr<UnboundPredicateImpl<B>> IsNaN(
+      std::shared_ptr<UnboundTerm<B>> expr);
 
   /// \brief Create a NOT NaN predicate for a field name.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> NotNaN(std::string 
name);
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> 
NotNaN(std::string name);
 
   /// \brief Create a NOT NaN predicate for an unbound term.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> NotNaN(
+  static std::shared_ptr<UnboundPredicateImpl<B>> NotNaN(
       std::shared_ptr<UnboundTerm<B>> expr);
 
   // Comparison predicates
 
   /// \brief Create a less than predicate for a field name.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> 
LessThan(std::string name,
-                                                                    Literal 
value);
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> 
LessThan(std::string name,
+                                                                        
Literal value);
 
   /// \brief Create a less than predicate for an unbound term.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> LessThan(
+  static std::shared_ptr<UnboundPredicateImpl<B>> LessThan(
       std::shared_ptr<UnboundTerm<B>> expr, Literal value);
 
   /// \brief Create a less than or equal predicate for a field name.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> LessThanOrEqual(
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> LessThanOrEqual(
       std::string name, Literal value);
 
   /// \brief Create a less than or equal predicate for an unbound term.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> LessThanOrEqual(
+  static std::shared_ptr<UnboundPredicateImpl<B>> LessThanOrEqual(
       std::shared_ptr<UnboundTerm<B>> expr, Literal value);
 
   /// \brief Create a greater than predicate for a field name.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> 
GreaterThan(std::string name,
-                                                                       Literal 
value);
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> GreaterThan(
+      std::string name, Literal value);
 
   /// \brief Create a greater than predicate for an unbound term.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> GreaterThan(
+  static std::shared_ptr<UnboundPredicateImpl<B>> GreaterThan(
       std::shared_ptr<UnboundTerm<B>> expr, Literal value);
 
   /// \brief Create a greater than or equal predicate for a field name.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> GreaterThanOrEqual(
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> 
GreaterThanOrEqual(
       std::string name, Literal value);
 
   /// \brief Create a greater than or equal predicate for an unbound term.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> GreaterThanOrEqual(
+  static std::shared_ptr<UnboundPredicateImpl<B>> GreaterThanOrEqual(
       std::shared_ptr<UnboundTerm<B>> expr, Literal value);
 
   /// \brief Create an equal predicate for a field name.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> Equal(std::string 
name,
-                                                                 Literal 
value);
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> 
Equal(std::string name,
+                                                                     Literal 
value);
 
   /// \brief Create an equal predicate for an unbound term.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> 
Equal(std::shared_ptr<UnboundTerm<B>> expr,
-                                                    Literal value);
+  static std::shared_ptr<UnboundPredicateImpl<B>> Equal(
+      std::shared_ptr<UnboundTerm<B>> expr, Literal value);
 
   /// \brief Create a not equal predicate for a field name.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> 
NotEqual(std::string name,
-                                                                    Literal 
value);
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> 
NotEqual(std::string name,
+                                                                        
Literal value);
 
   /// \brief Create a not equal predicate for an unbound term.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> NotEqual(
+  static std::shared_ptr<UnboundPredicateImpl<B>> NotEqual(
       std::shared_ptr<UnboundTerm<B>> expr, Literal value);
 
   // String predicates
 
   /// \brief Create a starts with predicate for a field name.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> 
StartsWith(std::string name,
-                                                                      
std::string value);
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> StartsWith(
+      std::string name, std::string value);
 
   /// \brief Create a starts with predicate for an unbound term.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> StartsWith(
+  static std::shared_ptr<UnboundPredicateImpl<B>> StartsWith(
       std::shared_ptr<UnboundTerm<B>> expr, std::string value);
 
   /// \brief Create a not starts with predicate for a field name.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> NotStartsWith(
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> NotStartsWith(
       std::string name, std::string value);
 
   /// \brief Create a not starts with predicate for an unbound term.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> NotStartsWith(
+  static std::shared_ptr<UnboundPredicateImpl<B>> NotStartsWith(
       std::shared_ptr<UnboundTerm<B>> expr, std::string value);
 
   // Set predicates
 
   /// \brief Create an IN predicate for a field name.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> In(
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> In(
       std::string name, std::vector<Literal> values);
 
   /// \brief Create an IN predicate for an unbound term.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> 
In(std::shared_ptr<UnboundTerm<B>> expr,
-                                                 std::vector<Literal> values);
+  static std::shared_ptr<UnboundPredicateImpl<B>> 
In(std::shared_ptr<UnboundTerm<B>> expr,
+                                                     std::vector<Literal> 
values);
 
   /// \brief Create an IN predicate for a field name with initializer list.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> In(
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> In(
       std::string name, std::initializer_list<Literal> values);
 
   /// \brief Create an IN predicate for an unbound term with initializer list.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> 
In(std::shared_ptr<UnboundTerm<B>> expr,
-                                                 
std::initializer_list<Literal> values);
+  static std::shared_ptr<UnboundPredicateImpl<B>> In(
+      std::shared_ptr<UnboundTerm<B>> expr, std::initializer_list<Literal> 
values);
 
   /// \brief Create a NOT IN predicate for a field name.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> NotIn(
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> NotIn(
       std::string name, std::vector<Literal> values);
 
   /// \brief Create a NOT IN predicate for an unbound term.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> 
NotIn(std::shared_ptr<UnboundTerm<B>> expr,
-                                                    std::vector<Literal> 
values);
+  static std::shared_ptr<UnboundPredicateImpl<B>> NotIn(
+      std::shared_ptr<UnboundTerm<B>> expr, std::vector<Literal> values);
 
   /// \brief Create a NOT IN predicate for a field name with initializer list.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> NotIn(
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> NotIn(
       std::string name, std::initializer_list<Literal> values);
 
   /// \brief Create a NOT IN predicate for an unbound term with initializer 
list.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> NotIn(
+  static std::shared_ptr<UnboundPredicateImpl<B>> NotIn(
       std::shared_ptr<UnboundTerm<B>> expr, std::initializer_list<Literal> 
values);
 
   // Generic predicate factory
 
   /// \brief Create a predicate with operation and single value.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> Predicate(
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> Predicate(
       Expression::Operation op, std::string name, Literal value);
 
   /// \brief Create a predicate with operation and multiple values.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> Predicate(
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> Predicate(
       Expression::Operation op, std::string name, std::vector<Literal> values);
 
   /// \brief Create a predicate with operation and multiple values.
-  static std::shared_ptr<UnboundPredicate<BoundReference>> Predicate(
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> Predicate(
       Expression::Operation op, std::string name, 
std::initializer_list<Literal> values);
 
   /// \brief Create a unary predicate (no values).
-  static std::shared_ptr<UnboundPredicate<BoundReference>> Predicate(
+  static std::shared_ptr<UnboundPredicateImpl<BoundReference>> Predicate(
       Expression::Operation op, std::string name);
 
   /// \brief Create a predicate for unbound term with multiple values.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> Predicate(
+  static std::shared_ptr<UnboundPredicateImpl<B>> Predicate(
       Expression::Operation op, std::shared_ptr<UnboundTerm<B>> expr,
       std::vector<Literal> values);
 
   /// \brief Create a predicate with operation and multiple values.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> Predicate(
+  static std::shared_ptr<UnboundPredicateImpl<B>> Predicate(
       Expression::Operation op, std::shared_ptr<UnboundTerm<B>> expr,
       std::initializer_list<Literal> values);
 
   /// \brief Create a unary predicate for unbound term.
   template <typename B>
-  static std::shared_ptr<UnboundPredicate<B>> Predicate(
+  static std::shared_ptr<UnboundPredicateImpl<B>> Predicate(
       Expression::Operation op, std::shared_ptr<UnboundTerm<B>> expr);
 
   // Constants
diff --git a/src/iceberg/expression/meson.build 
b/src/iceberg/expression/meson.build
index 722057c..8300590 100644
--- a/src/iceberg/expression/meson.build
+++ b/src/iceberg/expression/meson.build
@@ -15,4 +15,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
-install_headers(['expression.h', 'literal.h'], subdir: 'iceberg/expression')
+install_headers(
+    [
+        'binder.h',
+        'expression.h',
+        'expression_visitor.h',
+        'expressions.h',
+        'literal.h',
+        'predicate.h',
+        'rewrite_not.h',
+        'term.h',
+    ],
+    subdir: 'iceberg/expression',
+)
diff --git a/src/iceberg/expression/predicate.cc 
b/src/iceberg/expression/predicate.cc
index c255d75..44543ec 100644
--- a/src/iceberg/expression/predicate.cc
+++ b/src/iceberg/expression/predicate.cc
@@ -45,67 +45,68 @@ Predicate<T>::~Predicate() = default;
 
 // UnboundPredicate template implementations
 template <typename B>
-Result<std::unique_ptr<UnboundPredicate<B>>> UnboundPredicate<B>::Make(
+Result<std::unique_ptr<UnboundPredicateImpl<B>>> UnboundPredicateImpl<B>::Make(
     Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term) {
   if (!term) [[unlikely]] {
     return InvalidExpression("UnboundPredicate cannot have null term");
   }
-  return std::unique_ptr<UnboundPredicate<B>>(
-      new UnboundPredicate<B>(op, std::move(term)));
+  return std::unique_ptr<UnboundPredicateImpl<B>>(
+      new UnboundPredicateImpl<B>(op, std::move(term)));
 }
 
 template <typename B>
-Result<std::unique_ptr<UnboundPredicate<B>>> UnboundPredicate<B>::Make(
+Result<std::unique_ptr<UnboundPredicateImpl<B>>> UnboundPredicateImpl<B>::Make(
     Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term, Literal 
value) {
   if (!term) [[unlikely]] {
     return InvalidExpression("UnboundPredicate cannot have null term");
   }
-  return std::unique_ptr<UnboundPredicate<B>>(
-      new UnboundPredicate<B>(op, std::move(term), std::move(value)));
+  return std::unique_ptr<UnboundPredicateImpl<B>>(
+      new UnboundPredicateImpl<B>(op, std::move(term), std::move(value)));
 }
 
 template <typename B>
-Result<std::unique_ptr<UnboundPredicate<B>>> UnboundPredicate<B>::Make(
+Result<std::unique_ptr<UnboundPredicateImpl<B>>> UnboundPredicateImpl<B>::Make(
     Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term,
     std::vector<Literal> values) {
   if (!term) [[unlikely]] {
     return InvalidExpression("UnboundPredicate cannot have null term");
   }
-  return std::unique_ptr<UnboundPredicate<B>>(
-      new UnboundPredicate<B>(op, std::move(term), std::move(values)));
+  return std::unique_ptr<UnboundPredicateImpl<B>>(
+      new UnboundPredicateImpl<B>(op, std::move(term), std::move(values)));
 }
 
 template <typename B>
-UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
-                                      std::shared_ptr<UnboundTerm<B>> term)
+UnboundPredicateImpl<B>::UnboundPredicateImpl(Expression::Operation op,
+                                              std::shared_ptr<UnboundTerm<B>> 
term)
     : BASE(op, std::move(term)) {
   ICEBERG_DCHECK(BASE::term() != nullptr, "UnboundPredicate cannot have null 
term");
 }
 
 template <typename B>
-UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
-                                      std::shared_ptr<UnboundTerm<B>> term, 
Literal value)
+UnboundPredicateImpl<B>::UnboundPredicateImpl(Expression::Operation op,
+                                              std::shared_ptr<UnboundTerm<B>> 
term,
+                                              Literal value)
     : BASE(op, std::move(term)), values_{std::move(value)} {
   ICEBERG_DCHECK(BASE::term() != nullptr, "UnboundPredicate cannot have null 
term");
 }
 
 template <typename B>
-UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
-                                      std::shared_ptr<UnboundTerm<B>> term,
-                                      std::vector<Literal> values)
+UnboundPredicateImpl<B>::UnboundPredicateImpl(Expression::Operation op,
+                                              std::shared_ptr<UnboundTerm<B>> 
term,
+                                              std::vector<Literal> values)
     : BASE(op, std::move(term)), values_(std::move(values)) {
   ICEBERG_DCHECK(BASE::term() != nullptr, "UnboundPredicate cannot have null 
term");
 }
 
 template <typename B>
-UnboundPredicate<B>::~UnboundPredicate() = default;
+UnboundPredicateImpl<B>::~UnboundPredicateImpl() = default;
 
 namespace {}
 
 template <typename B>
-std::string UnboundPredicate<B>::ToString() const {
+std::string UnboundPredicateImpl<B>::ToString() const {
   auto invalid_predicate_string = [](Expression::Operation op) {
-    return std::format("Invalid predicate: operation = {}", op);
+    return std::format("Invalid predicate: operation = {}", 
::iceberg::ToString(op));
   };
 
   const auto& term = *BASE::term();
@@ -154,14 +155,14 @@ std::string UnboundPredicate<B>::ToString() const {
 }
 
 template <typename B>
-Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Negate() const {
+Result<std::shared_ptr<Expression>> UnboundPredicateImpl<B>::Negate() const {
   ICEBERG_ASSIGN_OR_RAISE(auto negated_op, ::iceberg::Negate(BASE::op()));
-  return UnboundPredicate::Make(negated_op, BASE::term(), values_);
+  return UnboundPredicateImpl::Make(negated_op, BASE::term(), values_);
 }
 
 template <typename B>
-Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Bind(const Schema& 
schema,
-                                                              bool 
case_sensitive) const {
+Result<std::shared_ptr<Expression>> UnboundPredicateImpl<B>::Bind(
+    const Schema& schema, bool case_sensitive) const {
   ICEBERG_ASSIGN_OR_RAISE(auto bound_term, BASE::term()->Bind(schema, 
case_sensitive));
 
   if (values_.empty()) {
@@ -205,7 +206,7 @@ bool StartsWith(const Literal& lhs, const Literal& rhs) {
 }  // namespace
 
 template <typename B>
-Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindUnaryOperation(
+Result<std::shared_ptr<Expression>> 
UnboundPredicateImpl<B>::BindUnaryOperation(
     std::shared_ptr<B> bound_term) const {
   switch (BASE::op()) {
     case Expression::Operation::kIsNull:
@@ -235,7 +236,7 @@ Result<std::shared_ptr<Expression>> 
UnboundPredicate<B>::BindUnaryOperation(
 }
 
 template <typename B>
-Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindLiteralOperation(
+Result<std::shared_ptr<Expression>> 
UnboundPredicateImpl<B>::BindLiteralOperation(
     std::shared_ptr<B> bound_term) const {
   if (BASE::op() == Expression::Operation::kStartsWith ||
       BASE::op() == Expression::Operation::kNotStartsWith) {
@@ -292,7 +293,7 @@ Result<std::shared_ptr<Expression>> 
UnboundPredicate<B>::BindLiteralOperation(
 }
 
 template <typename B>
-Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindInOperation(
+Result<std::shared_ptr<Expression>> UnboundPredicateImpl<B>::BindInOperation(
     std::shared_ptr<B> bound_term) const {
   std::vector<Literal> converted_literals;
   for (const auto& literal : values_) {
@@ -638,7 +639,7 @@ template class Predicate<UnboundTerm<BoundReference>>;
 template class Predicate<UnboundTerm<BoundTransform>>;
 template class Predicate<BoundTerm>;
 
-template class UnboundPredicate<BoundReference>;
-template class UnboundPredicate<BoundTransform>;
+template class UnboundPredicateImpl<BoundReference>;
+template class UnboundPredicateImpl<BoundTransform>;
 
 }  // namespace iceberg
diff --git a/src/iceberg/expression/predicate.h 
b/src/iceberg/expression/predicate.h
index b35de04..2939376 100644
--- a/src/iceberg/expression/predicate.h
+++ b/src/iceberg/expression/predicate.h
@@ -38,7 +38,7 @@ concept TermType = std::derived_from<T, Term>;
 ///
 /// \tparam TermType The type of the term being tested
 template <TermType T>
-class ICEBERG_EXPORT Predicate : public Expression {
+class ICEBERG_EXPORT Predicate : public virtual Expression {
  public:
   ~Predicate() override;
 
@@ -58,13 +58,38 @@ class ICEBERG_EXPORT Predicate : public Expression {
   std::shared_ptr<T> term_;
 };
 
+/// \brief Non-template base class for all UnboundPredicate instances.
+///
+/// This class enables type erasure for template-based UnboundPredicate 
classes,
+/// allowing them to be used in non-template visitor interfaces.
+class ICEBERG_EXPORT UnboundPredicate : public virtual Expression,
+                                        public Unbound<Expression> {
+ public:
+  ~UnboundPredicate() override = default;
+
+  /// \brief Returns the reference of this UnboundPredicate.
+  std::shared_ptr<NamedReference> reference() override = 0;
+
+  /// \brief Bind this UnboundPredicate.
+  Result<std::shared_ptr<Expression>> Bind(const Schema& schema,
+                                           bool case_sensitive) const override 
= 0;
+
+  /// \brief Negate this UnboundPredicate.
+  Result<std::shared_ptr<Expression>> Negate() const override = 0;
+
+  bool is_unbound_predicate() const override { return true; }
+
+ protected:
+  UnboundPredicate() = default;
+};
+
 /// \brief Unbound predicates contain unbound terms and must be bound to a 
concrete schema
 /// before they can be evaluated.
 ///
 /// \tparam B The bound type this predicate produces when binding is successful
 template <typename B>
-class ICEBERG_EXPORT UnboundPredicate : public Predicate<UnboundTerm<B>>,
-                                        public Unbound<Expression> {
+class ICEBERG_EXPORT UnboundPredicateImpl : public UnboundPredicate,
+                                            public Predicate<UnboundTerm<B>> {
   using BASE = Predicate<UnboundTerm<B>>;
 
  public:
@@ -73,7 +98,7 @@ class ICEBERG_EXPORT UnboundPredicate : public 
Predicate<UnboundTerm<B>>,
   /// \param op The operation (kIsNull, kNotNull, kIsNan, kNotNan)
   /// \param term The unbound term
   /// \return Result containing the unbound predicate or an error
-  static Result<std::unique_ptr<UnboundPredicate>> Make(
+  static Result<std::unique_ptr<UnboundPredicateImpl<B>>> Make(
       Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term);
 
   /// \brief Create an unbound predicate with a single value.
@@ -82,7 +107,7 @@ class ICEBERG_EXPORT UnboundPredicate : public 
Predicate<UnboundTerm<B>>,
   /// \param term The unbound term
   /// \param value The literal value
   /// \return Result containing the unbound predicate or an error
-  static Result<std::unique_ptr<UnboundPredicate>> Make(
+  static Result<std::unique_ptr<UnboundPredicateImpl<B>>> Make(
       Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term, Literal 
value);
 
   /// \brief Create an unbound predicate with multiple values.
@@ -91,11 +116,11 @@ class ICEBERG_EXPORT UnboundPredicate : public 
Predicate<UnboundTerm<B>>,
   /// \param term The unbound term
   /// \param values Vector of literal values
   /// \return Result containing the unbound predicate or an error
-  static Result<std::unique_ptr<UnboundPredicate>> Make(
+  static Result<std::unique_ptr<UnboundPredicateImpl<B>>> Make(
       Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term,
       std::vector<Literal> values);
 
-  ~UnboundPredicate() override;
+  ~UnboundPredicateImpl() override;
 
   std::shared_ptr<NamedReference> reference() override {
     return BASE::term()->reference();
@@ -103,18 +128,17 @@ class ICEBERG_EXPORT UnboundPredicate : public 
Predicate<UnboundTerm<B>>,
 
   std::string ToString() const override;
 
-  /// \brief Bind this UnboundPredicate.
   Result<std::shared_ptr<Expression>> Bind(const Schema& schema,
                                            bool case_sensitive) const override;
 
   Result<std::shared_ptr<Expression>> Negate() const override;
 
  private:
-  UnboundPredicate(Expression::Operation op, std::shared_ptr<UnboundTerm<B>> 
term);
-  UnboundPredicate(Expression::Operation op, std::shared_ptr<UnboundTerm<B>> 
term,
-                   Literal value);
-  UnboundPredicate(Expression::Operation op, std::shared_ptr<UnboundTerm<B>> 
term,
-                   std::vector<Literal> values);
+  UnboundPredicateImpl(Expression::Operation op, 
std::shared_ptr<UnboundTerm<B>> term);
+  UnboundPredicateImpl(Expression::Operation op, 
std::shared_ptr<UnboundTerm<B>> term,
+                       Literal value);
+  UnboundPredicateImpl(Expression::Operation op, 
std::shared_ptr<UnboundTerm<B>> term,
+                       std::vector<Literal> values);
 
   Result<std::shared_ptr<Expression>> BindUnaryOperation(
       std::shared_ptr<B> bound_term) const;
@@ -140,6 +164,8 @@ class ICEBERG_EXPORT BoundPredicate : public 
Predicate<BoundTerm>, public Bound
 
   Result<Literal> Evaluate(const StructLike& data) const override;
 
+  bool is_bound_predicate() const override { return true; }
+
   /// \brief Test a value against this predicate.
   ///
   /// \param value The literal value to test
diff --git a/src/iceberg/expression/rewrite_not.cc 
b/src/iceberg/expression/rewrite_not.cc
new file mode 100644
index 0000000..b3aa99e
--- /dev/null
+++ b/src/iceberg/expression/rewrite_not.cc
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/rewrite_not.h"
+
+namespace iceberg {
+
+Result<std::shared_ptr<Expression>> 
RewriteNot::Visit(std::shared_ptr<Expression> expr) {
+  ICEBERG_DCHECK(expr != nullptr, "Expression cannot be null");
+  RewriteNot visitor;
+  return iceberg::Visit<std::shared_ptr<Expression>, RewriteNot>(expr, 
visitor);
+}
+
+Result<std::shared_ptr<Expression>> RewriteNot::AlwaysTrue() { return 
True::Instance(); }
+
+Result<std::shared_ptr<Expression>> RewriteNot::AlwaysFalse() {
+  return False::Instance();
+}
+
+Result<std::shared_ptr<Expression>> RewriteNot::Not(
+    const std::shared_ptr<Expression>& child_result) {
+  return child_result->Negate();
+}
+
+Result<std::shared_ptr<Expression>> RewriteNot::And(
+    const std::shared_ptr<Expression>& left_result,
+    const std::shared_ptr<Expression>& right_result) {
+  return And::MakeFolded(left_result, right_result);
+}
+
+Result<std::shared_ptr<Expression>> RewriteNot::Or(
+    const std::shared_ptr<Expression>& left_result,
+    const std::shared_ptr<Expression>& right_result) {
+  return Or::MakeFolded(left_result, right_result);
+}
+
+Result<std::shared_ptr<Expression>> RewriteNot::Predicate(
+    const std::shared_ptr<BoundPredicate>& pred) {
+  return pred;
+}
+
+Result<std::shared_ptr<Expression>> RewriteNot::Predicate(
+    const std::shared_ptr<UnboundPredicate>& pred) {
+  return pred;
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/expression/rewrite_not.h 
b/src/iceberg/expression/rewrite_not.h
new file mode 100644
index 0000000..9773920
--- /dev/null
+++ b/src/iceberg/expression/rewrite_not.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/expression/rewrite_not.h
+/// Rewrite NOT expression to negated predicate.
+
+#include <memory>
+
+#include "iceberg/expression/expression_visitor.h"
+
+namespace iceberg {
+
+class ICEBERG_EXPORT RewriteNot : public 
ExpressionVisitor<std::shared_ptr<Expression>> {
+ public:
+  static Result<std::shared_ptr<Expression>> Visit(std::shared_ptr<Expression> 
expr);
+
+  Result<std::shared_ptr<Expression>> AlwaysTrue() override;
+  Result<std::shared_ptr<Expression>> AlwaysFalse() override;
+  Result<std::shared_ptr<Expression>> Not(
+      const std::shared_ptr<Expression>& child_result) override;
+  Result<std::shared_ptr<Expression>> And(
+      const std::shared_ptr<Expression>& left_result,
+      const std::shared_ptr<Expression>& right_result) override;
+  Result<std::shared_ptr<Expression>> Or(
+      const std::shared_ptr<Expression>& left_result,
+      const std::shared_ptr<Expression>& right_result) override;
+  Result<std::shared_ptr<Expression>> Predicate(
+      const std::shared_ptr<BoundPredicate>& pred) override;
+  Result<std::shared_ptr<Expression>> Predicate(
+      const std::shared_ptr<UnboundPredicate>& pred) override;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 1b24f85..3a4f888 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -42,10 +42,12 @@ iceberg_include_dir = include_directories('..')
 iceberg_sources = files(
     'arrow_c_data_guard_internal.cc',
     'catalog/memory/in_memory_catalog.cc',
+    'expression/binder.cc',
     'expression/expression.cc',
     'expression/expressions.cc',
     'expression/literal.cc',
     'expression/predicate.cc',
+    'expression/rewrite_not.cc',
     'expression/term.cc',
     'file_reader.cc',
     'file_writer.cc',
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 544e92f..f1eb77e 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -88,6 +88,7 @@ add_iceberg_test(table_test
 add_iceberg_test(expression_test
                  SOURCES
                  expression_test.cc
+                 expression_visitor_test.cc
                  literal_test.cc
                  predicate_test.cc)
 
diff --git a/src/iceberg/test/expression_visitor_test.cc 
b/src/iceberg/test/expression_visitor_test.cc
new file mode 100644
index 0000000..f2bbe70
--- /dev/null
+++ b/src/iceberg/test/expression_visitor_test.cc
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "iceberg/expression/binder.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/expression/rewrite_not.h"
+#include "iceberg/schema.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+class ExpressionVisitorTest : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    schema_ = std::make_shared<Schema>(
+        std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int64()),
+                                 SchemaField::MakeOptional(2, "name", 
string()),
+                                 SchemaField::MakeRequired(3, "age", int32()),
+                                 SchemaField::MakeOptional(4, "salary", 
float64()),
+                                 SchemaField::MakeRequired(5, "active", 
boolean())},
+        /*schema_id=*/0);
+  }
+
+  Result<std::shared_ptr<Expression>> Bind(const std::shared_ptr<Expression>& 
expr,
+                                           bool case_sensitive = true) {
+    return Binder::Bind(*schema_, expr, case_sensitive);
+  }
+
+  std::shared_ptr<Schema> schema_;
+};
+
+class BinderTest : public ExpressionVisitorTest {};
+
+TEST_F(BinderTest, UnaryPredicates) {
+  // Test IsNull
+  auto unbound_is_null = Expressions::IsNull("name");
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_is_null, Bind(unbound_is_null));
+  EXPECT_EQ(bound_is_null->op(), Expression::Operation::kIsNull);
+  EXPECT_TRUE(bound_is_null->is_bound_predicate());
+  EXPECT_EQ(bound_is_null->ToString(), "is_null(ref(id=2, type=string))");
+
+  // Test NotNull
+  auto unbound_not_null = Expressions::NotNull("name");
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_not_null, Bind(unbound_not_null));
+  EXPECT_EQ(bound_not_null->op(), Expression::Operation::kNotNull);
+  EXPECT_TRUE(bound_not_null->is_bound_predicate());
+  EXPECT_EQ(bound_not_null->ToString(), "not_null(ref(id=2, type=string))");
+
+  // Test IsNaN
+  auto unbound_is_nan = Expressions::IsNaN("salary");
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_is_nan, Bind(unbound_is_nan));
+  EXPECT_EQ(bound_is_nan->op(), Expression::Operation::kIsNan);
+  EXPECT_TRUE(bound_is_nan->is_bound_predicate());
+  EXPECT_EQ(bound_is_nan->ToString(), "is_nan(ref(id=4, type=double))");
+
+  // Test NotNaN
+  auto unbound_not_nan = Expressions::NotNaN("salary");
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_not_nan, Bind(unbound_not_nan));
+  EXPECT_EQ(bound_not_nan->op(), Expression::Operation::kNotNan);
+  EXPECT_TRUE(bound_not_nan->is_bound_predicate());
+  EXPECT_EQ(bound_not_nan->ToString(), "not_nan(ref(id=4, type=double))");
+}
+
+TEST_F(BinderTest, ComparisonPredicates) {
+  // Test LessThan
+  auto unbound_lt = Expressions::LessThan("age", Literal::Int(30));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_lt, Bind(unbound_lt));
+  EXPECT_EQ(bound_lt->op(), Expression::Operation::kLt);
+  EXPECT_TRUE(bound_lt->is_bound_predicate());
+  EXPECT_EQ(bound_lt->ToString(), "ref(id=3, type=int) < 30");
+
+  // Test LessThanOrEqual
+  auto unbound_lte = Expressions::LessThanOrEqual("age", Literal::Int(30));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_lte, Bind(unbound_lte));
+  EXPECT_EQ(bound_lte->op(), Expression::Operation::kLtEq);
+  EXPECT_TRUE(bound_lte->is_bound_predicate());
+  EXPECT_EQ(bound_lte->ToString(), "ref(id=3, type=int) <= 30");
+
+  // Test GreaterThan
+  auto unbound_gt = Expressions::GreaterThan("salary", 
Literal::Double(50000.0));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_gt, Bind(unbound_gt));
+  EXPECT_EQ(bound_gt->op(), Expression::Operation::kGt);
+  EXPECT_TRUE(bound_gt->is_bound_predicate());
+  EXPECT_EQ(bound_gt->ToString(), "ref(id=4, type=double) > 50000.000000");
+
+  // Test GreaterThanOrEqual
+  auto unbound_gte = Expressions::GreaterThanOrEqual("salary", 
Literal::Double(50000.0));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_gte, Bind(unbound_gte));
+  EXPECT_EQ(bound_gte->op(), Expression::Operation::kGtEq);
+  EXPECT_TRUE(bound_gte->is_bound_predicate());
+  EXPECT_EQ(bound_gte->ToString(), "ref(id=4, type=double) >= 50000.000000");
+
+  // Test Equal
+  auto unbound_eq = Expressions::Equal("name", Literal::String("Alice"));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_eq, Bind(unbound_eq));
+  EXPECT_EQ(bound_eq->op(), Expression::Operation::kEq);
+  EXPECT_TRUE(bound_eq->is_bound_predicate());
+  EXPECT_EQ(bound_eq->ToString(), "ref(id=2, type=string) == \"Alice\"");
+
+  // Test NotEqual
+  auto unbound_neq = Expressions::NotEqual("name", Literal::String("Bob"));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_neq, Bind(unbound_neq));
+  EXPECT_EQ(bound_neq->op(), Expression::Operation::kNotEq);
+  EXPECT_TRUE(bound_neq->is_bound_predicate());
+  EXPECT_EQ(bound_neq->ToString(), "ref(id=2, type=string) != \"Bob\"");
+}
+
+TEST_F(BinderTest, StringPredicates) {
+  // Test StartsWith
+  auto unbound_starts = Expressions::StartsWith("name", "Al");
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_starts, Bind(unbound_starts));
+  EXPECT_EQ(bound_starts->op(), Expression::Operation::kStartsWith);
+  EXPECT_TRUE(bound_starts->is_bound_predicate());
+  EXPECT_EQ(bound_starts->ToString(), "ref(id=2, type=string) startsWith 
\"\"Al\"\"");
+
+  // Test NotStartsWith
+  auto unbound_not_starts = Expressions::NotStartsWith("name", "Bo");
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_not_starts, Bind(unbound_not_starts));
+  EXPECT_EQ(bound_not_starts->op(), Expression::Operation::kNotStartsWith);
+  EXPECT_TRUE(bound_not_starts->is_bound_predicate());
+  EXPECT_EQ(bound_not_starts->ToString(),
+            "ref(id=2, type=string) notStartsWith \"\"Bo\"\"");
+}
+
+TEST_F(BinderTest, SetPredicates) {
+  // Test In
+  auto unbound_in =
+      Expressions::In("age", {Literal::Int(25), Literal::Int(30), 
Literal::Int(35)});
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_in, Bind(unbound_in));
+  EXPECT_EQ(bound_in->op(), Expression::Operation::kIn);
+  EXPECT_TRUE(bound_in->is_bound_predicate());
+  EXPECT_THAT(bound_in->ToString(), testing::HasSubstr("ref(id=3, type=int) in 
("));
+
+  // Test NotIn
+  auto unbound_not_in = Expressions::NotIn("age", {Literal::Int(40), 
Literal::Int(45)});
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_not_in, Bind(unbound_not_in));
+  EXPECT_EQ(bound_not_in->op(), Expression::Operation::kNotIn);
+  EXPECT_TRUE(bound_not_in->is_bound_predicate());
+  EXPECT_THAT(bound_not_in->ToString(),
+              testing::HasSubstr("ref(id=3, type=int) not in ("));
+}
+
+TEST_F(BinderTest, Constants) {
+  // Test AlwaysTrue
+  auto true_expr = Expressions::AlwaysTrue();
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_true, Bind(true_expr));
+  EXPECT_EQ(bound_true->op(), Expression::Operation::kTrue);
+
+  // Test AlwaysFalse
+  auto false_expr = Expressions::AlwaysFalse();
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_false, Bind(false_expr));
+  EXPECT_EQ(bound_false->op(), Expression::Operation::kFalse);
+}
+
+TEST_F(BinderTest, AndExpression) {
+  auto pred1 = Expressions::Equal("name", Literal::String("Alice"));
+  auto pred2 = Expressions::GreaterThan("age", Literal::Int(25));
+  auto unbound_and = Expressions::And(pred1, pred2);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_and, Bind(unbound_and));
+  EXPECT_EQ(bound_and->op(), Expression::Operation::kAnd);
+  EXPECT_EQ(bound_and->ToString(),
+            "(ref(id=2, type=string) == \"Alice\" and ref(id=3, type=int) > 
25)");
+
+  // Verify both children are bound
+  auto result = IsBoundVisitor::IsBound(bound_and);
+  ASSERT_THAT(result, IsOk());
+  EXPECT_TRUE(result.value());
+}
+
+TEST_F(BinderTest, OrExpression) {
+  auto pred1 = Expressions::IsNull("name");
+  auto pred2 = Expressions::LessThan("salary", Literal::Double(30000.0));
+  auto unbound_or = Expressions::Or(pred1, pred2);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_or, Bind(unbound_or));
+  EXPECT_EQ(bound_or->op(), Expression::Operation::kOr);
+  EXPECT_EQ(bound_or->ToString(),
+            "(is_null(ref(id=2, type=string)) or ref(id=4, type=double) < 
30000.000000)");
+
+  // Verify both children are bound
+  auto result = IsBoundVisitor::IsBound(bound_or);
+  ASSERT_THAT(result, IsOk());
+  EXPECT_TRUE(result.value());
+}
+
+TEST_F(BinderTest, NotExpression) {
+  auto pred = Expressions::Equal("active", Literal::Boolean(true));
+  auto unbound_not = Expressions::Not(pred);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_not, Bind(unbound_not));
+  EXPECT_EQ(bound_not->op(), Expression::Operation::kNot);
+  EXPECT_EQ(bound_not->ToString(), "not(ref(id=5, type=boolean) == true)");
+
+  // Verify child is bound
+  auto result = IsBoundVisitor::IsBound(bound_not);
+  ASSERT_THAT(result, IsOk());
+  EXPECT_TRUE(result.value());
+}
+
+TEST_F(BinderTest, ComplexNestedExpression) {
+  // (name = 'Alice' AND age > 25) OR (salary < 30000 AND active = true)
+  auto pred1 = Expressions::Equal("name", Literal::String("Alice"));
+  auto pred2 = Expressions::GreaterThan("age", Literal::Int(25));
+  auto pred3 = Expressions::LessThan("salary", Literal::Double(30000.0));
+  auto pred4 = Expressions::Equal("active", Literal::Boolean(true));
+
+  auto and1 = Expressions::And(pred1, pred2);
+  auto and2 = Expressions::And(pred3, pred4);
+  auto complex_or = Expressions::Or(and1, and2);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_complex, Bind(complex_or));
+  EXPECT_EQ(bound_complex->op(), Expression::Operation::kOr);
+
+  // Verify entire tree is bound
+  auto result = IsBoundVisitor::IsBound(bound_complex);
+  ASSERT_THAT(result, IsOk());
+  EXPECT_TRUE(result.value());
+}
+
+TEST_F(BinderTest, CaseSensitive) {
+  // Create predicate with exact field name
+  auto pred_exact = Expressions::Equal("name", Literal::String("Alice"));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_exact, Bind(pred_exact, true));
+  EXPECT_EQ(bound_exact->op(), Expression::Operation::kEq);
+  EXPECT_TRUE(bound_exact->is_bound_predicate());
+
+  // Create predicate with different case - should fail with case-sensitive 
binding
+  auto pred_wrong_case = Expressions::Equal("NAME", Literal::String("Alice"));
+  auto result_case_sensitive = Bind(pred_wrong_case, true);
+  EXPECT_THAT(result_case_sensitive, HasErrorMessage("NAME"));
+}
+
+TEST_F(BinderTest, CaseInsensitive) {
+  // Create predicate with different case
+  auto pred_upper = Expressions::Equal("NAME", Literal::String("Alice"));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_upper, Bind(pred_upper, false));
+  EXPECT_EQ(bound_upper->op(), Expression::Operation::kEq);
+  EXPECT_TRUE(bound_upper->is_bound_predicate());
+
+  // Create predicate with mixed case
+  auto pred_mixed = Expressions::Equal("NaMe", Literal::String("Bob"));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_mixed, Bind(pred_mixed, false));
+  EXPECT_EQ(bound_mixed->op(), Expression::Operation::kEq);
+  EXPECT_TRUE(bound_mixed->is_bound_predicate());
+}
+
+TEST_F(BinderTest, ErrorFieldNotFound) {
+  // Try to bind with non-existent field
+  auto pred_nonexistent =
+      Expressions::Equal("nonexistent_field", Literal::String("value"));
+  auto result = Bind(pred_nonexistent);
+  EXPECT_THAT(result, HasErrorMessage("Cannot find field 
'nonexistent_field'"));
+}
+
+TEST_F(BinderTest, ErrorAlreadyBound) {
+  // First bind the predicate
+  auto unbound_pred = Expressions::Equal("name", Literal::String("Alice"));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_pred, Bind(unbound_pred));
+
+  // Try to bind it again - should fail
+  auto result = Bind(bound_pred);
+  EXPECT_THAT(result, HasErrorMessage("already bound"));
+}
+
+TEST_F(BinderTest, ErrorNestedUnboundField) {
+  // Create complex expression with one invalid field
+  auto pred1 = Expressions::Equal("name", Literal::String("Alice"));
+  auto pred2 = Expressions::Equal("invalid_field", Literal::String("value"));
+  auto complex_and = Expressions::And(pred1, pred2);
+
+  auto result = Bind(complex_and);
+  EXPECT_THAT(result, HasErrorMessage("invalid_field"));
+}
+
+class IsBoundVisitorTest : public ExpressionVisitorTest {};
+
+TEST_F(IsBoundVisitorTest, Constants) {
+  // True and False are always considered bound
+  auto true_expr = Expressions::AlwaysTrue();
+  ICEBERG_UNWRAP_OR_FAIL(auto is_bound_true, 
IsBoundVisitor::IsBound(true_expr));
+  EXPECT_TRUE(is_bound_true);
+
+  auto false_expr = Expressions::AlwaysFalse();
+  ICEBERG_UNWRAP_OR_FAIL(auto is_bound_false, 
IsBoundVisitor::IsBound(false_expr));
+  EXPECT_TRUE(is_bound_false);
+}
+
+TEST_F(IsBoundVisitorTest, UnboundPredicate) {
+  // Unbound predicates should return false
+  auto unbound_pred = Expressions::Equal("name", Literal::String("Alice"));
+  ICEBERG_UNWRAP_OR_FAIL(auto is_bound, IsBoundVisitor::IsBound(unbound_pred));
+  EXPECT_FALSE(is_bound);
+}
+
+TEST_F(IsBoundVisitorTest, BoundPredicate) {
+  // Bound predicates should return true
+  auto unbound_pred = Expressions::Equal("name", Literal::String("Alice"));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_pred, Bind(unbound_pred));
+  ICEBERG_UNWRAP_OR_FAIL(auto is_bound, IsBoundVisitor::IsBound(bound_pred));
+  EXPECT_TRUE(is_bound);
+}
+
+TEST_F(IsBoundVisitorTest, AndWithBoundChildren) {
+  // AND with all bound children should return true
+  auto pred1 = Expressions::Equal("name", Literal::String("Alice"));
+  auto pred2 = Expressions::GreaterThan("age", Literal::Int(25));
+  auto unbound_and = Expressions::And(pred1, pred2);
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_and, Bind(unbound_and));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto is_bound, IsBoundVisitor::IsBound(bound_and));
+  EXPECT_TRUE(is_bound);
+}
+
+TEST_F(IsBoundVisitorTest, AndWithUnboundChild) {
+  // AND with any unbound child should return false
+  auto bound_pred = Expressions::Equal("name", Literal::String("Alice"));
+  ICEBERG_UNWRAP_OR_FAIL(auto pred1, Bind(bound_pred));
+  auto pred2 = Expressions::Equal("age", Literal::Int(25));  // unbound
+  auto mixed_and = Expressions::And(pred1, pred2);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto is_bound, IsBoundVisitor::IsBound(mixed_and));
+  EXPECT_FALSE(is_bound);
+}
+
+TEST_F(IsBoundVisitorTest, OrWithBoundChildren) {
+  // OR with all bound children should return true
+  auto pred1 = Expressions::IsNull("name");
+  auto pred2 = Expressions::LessThan("salary", Literal::Double(30000.0));
+  auto unbound_or = Expressions::Or(pred1, pred2);
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_or, Bind(unbound_or));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto is_bound, IsBoundVisitor::IsBound(bound_or));
+  EXPECT_TRUE(is_bound);
+}
+
+TEST_F(IsBoundVisitorTest, OrWithUnboundChild) {
+  // OR with any unbound child should return false
+  auto pred1 = Expressions::IsNull("name");  // unbound
+  auto bound_pred2 = Expressions::Equal("age", Literal::Int(25));
+  ICEBERG_UNWRAP_OR_FAIL(auto pred2, Bind(bound_pred2));
+  auto mixed_or = Expressions::Or(pred1, pred2);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto is_bound, IsBoundVisitor::IsBound(mixed_or));
+  EXPECT_FALSE(is_bound);
+}
+
+TEST_F(IsBoundVisitorTest, NotWithBoundChild) {
+  // NOT with bound child should return true
+  auto unbound_pred = Expressions::Equal("active", Literal::Boolean(true));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_pred, Bind(unbound_pred));
+  auto not_expr = Expressions::Not(bound_pred);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto is_bound, IsBoundVisitor::IsBound(not_expr));
+  EXPECT_TRUE(is_bound);
+}
+
+TEST_F(IsBoundVisitorTest, NotWithUnboundChild) {
+  // NOT with unbound child should return false
+  auto unbound_pred = Expressions::Equal("active", Literal::Boolean(true));
+  auto not_expr = Expressions::Not(unbound_pred);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto is_bound, IsBoundVisitor::IsBound(not_expr));
+  EXPECT_FALSE(is_bound);
+}
+
+TEST_F(IsBoundVisitorTest, ComplexExpression) {
+  // Complex expression: all bound should return true
+  auto pred1 = Expressions::Equal("name", Literal::String("Alice"));
+  auto pred2 = Expressions::GreaterThan("age", Literal::Int(25));
+  auto pred3 = Expressions::LessThan("salary", Literal::Double(30000.0));
+  auto and_expr = Expressions::And(pred1, pred2);
+  auto complex_or = Expressions::Or(and_expr, pred3);
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_complex, Bind(complex_or));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto is_bound, 
IsBoundVisitor::IsBound(bound_complex));
+  EXPECT_TRUE(is_bound);
+
+  // Complex expression: one unbound should return false
+  auto unbound_pred = Expressions::Equal("name", Literal::String("Alice"));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_pred2, Bind(pred2));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_pred3, Bind(pred3));
+  auto mixed_and = Expressions::And(unbound_pred, bound_pred2);
+  auto mixed_complex = Expressions::Or(mixed_and, bound_pred3);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto is_bound_mixed, 
IsBoundVisitor::IsBound(mixed_complex));
+  EXPECT_FALSE(is_bound_mixed);
+}
+
+class RewriteNotTest : public ExpressionVisitorTest {};
+
+TEST_F(RewriteNotTest, Constants) {
+  // True remains True
+  auto true_expr = Expressions::AlwaysTrue();
+  ICEBERG_UNWRAP_OR_FAIL(auto rewritten_true, RewriteNot::Visit(true_expr));
+  EXPECT_EQ(rewritten_true->op(), Expression::Operation::kTrue);
+  EXPECT_TRUE(rewritten_true->Equals(*True::Instance()));
+
+  // False remains False
+  auto false_expr = Expressions::AlwaysFalse();
+  ICEBERG_UNWRAP_OR_FAIL(auto rewritten_false, RewriteNot::Visit(false_expr));
+  EXPECT_EQ(rewritten_false->op(), Expression::Operation::kFalse);
+  EXPECT_TRUE(rewritten_false->Equals(*False::Instance()));
+}
+
+TEST_F(RewriteNotTest, Predicates) {
+  // Bound predicates pass through unchanged
+  auto unbound_pred = Expressions::Equal("name", Literal::String("Alice"));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_pred, Bind(unbound_pred));
+  ICEBERG_UNWRAP_OR_FAIL(auto rewritten, RewriteNot::Visit(bound_pred));
+  EXPECT_EQ(rewritten->op(), Expression::Operation::kEq);
+  EXPECT_TRUE(rewritten->is_bound_predicate());
+
+  // Unbound predicates pass through unchanged
+  auto unbound_pred2 = Expressions::IsNull("salary");
+  ICEBERG_UNWRAP_OR_FAIL(auto rewritten_unbound, 
RewriteNot::Visit(unbound_pred2));
+  EXPECT_EQ(rewritten_unbound->op(), Expression::Operation::kIsNull);
+  EXPECT_TRUE(rewritten_unbound->is_unbound_predicate());
+}
+
+TEST_F(RewriteNotTest, NotExpression) {
+  // NOT(predicate) should be rewritten to negated predicate
+  auto unbound_pred = Expressions::Equal("name", Literal::String("Alice"));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_pred, Bind(unbound_pred));
+  auto not_expr = Expressions::Not(bound_pred);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto rewritten, RewriteNot::Visit(not_expr));
+  // Equal should be negated to NotEqual
+  EXPECT_EQ(rewritten->op(), Expression::Operation::kNotEq);
+  EXPECT_TRUE(rewritten->is_bound_predicate());
+  EXPECT_EQ(rewritten->ToString(), "ref(id=2, type=string) != \"Alice\"");
+}
+
+TEST_F(RewriteNotTest, DoubleNegation) {
+  // NOT(NOT(predicate)) should be rewritten back to predicate
+  auto unbound_pred = Expressions::Equal("age", Literal::Int(25));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_pred, Bind(unbound_pred));
+  auto not_expr = Expressions::Not(bound_pred);
+  auto double_not = Expressions::Not(not_expr);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto rewritten, RewriteNot::Visit(double_not));
+  // Should be back to Equal
+  EXPECT_EQ(rewritten->op(), Expression::Operation::kEq);
+  EXPECT_TRUE(rewritten->is_bound_predicate());
+  EXPECT_EQ(rewritten->ToString(), "ref(id=3, type=int) == 25");
+}
+
+TEST_F(RewriteNotTest, AndExpression) {
+  // AND expressions pass through (children are processed)
+  auto pred1 = Expressions::Equal("name", Literal::String("Alice"));
+  auto pred2 = Expressions::GreaterThan("age", Literal::Int(25));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_pred1, Bind(pred1));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_pred2, Bind(pred2));
+  auto and_expr = Expressions::And(bound_pred1, bound_pred2);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto rewritten, RewriteNot::Visit(and_expr));
+  EXPECT_EQ(rewritten->op(), Expression::Operation::kAnd);
+}
+
+TEST_F(RewriteNotTest, OrExpression) {
+  // OR expressions pass through (children are processed)
+  auto pred1 = Expressions::IsNull("name");
+  auto pred2 = Expressions::LessThan("salary", Literal::Double(30000.0));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_pred1, Bind(pred1));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_pred2, Bind(pred2));
+  auto or_expr = Expressions::Or(bound_pred1, bound_pred2);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto rewritten, RewriteNot::Visit(or_expr));
+  EXPECT_EQ(rewritten->op(), Expression::Operation::kOr);
+}
+
+TEST_F(RewriteNotTest, ComplexExpression) {
+  // Complex: NOT(pred1 AND NOT(pred2))
+  auto pred1 = Expressions::Equal("name", Literal::String("Alice"));
+  auto pred2 = Expressions::GreaterThan("age", Literal::Int(25));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_pred1, Bind(pred1));
+  ICEBERG_UNWRAP_OR_FAIL(auto bound_pred2, Bind(pred2));
+
+  auto not_pred2 = Expressions::Not(bound_pred2);
+  auto and_expr = Expressions::And(bound_pred1, not_pred2);
+  auto not_and = Expressions::Not(and_expr);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto rewritten, RewriteNot::Visit(not_and));
+  // The outer NOT should push down via negation
+  // NOT(pred1 AND NOT(pred2)) becomes NOT(pred1) OR pred2
+  EXPECT_EQ(rewritten->op(), Expression::Operation::kOr);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build
index 22ed4bd..6fbe82d 100644
--- a/src/iceberg/test/meson.build
+++ b/src/iceberg/test/meson.build
@@ -55,6 +55,7 @@ iceberg_tests = {
     'expression_test': {
         'sources': files(
             'expression_test.cc',
+            'expression_visitor_test.cc',
             'literal_test.cc',
             'predicate_test.cc',
         ),

Reply via email to