zeroshade commented on code in PR #177:
URL: https://github.com/apache/iceberg-cpp/pull/177#discussion_r2372848744


##########
src/iceberg/expression/expression.cc:
##########
@@ -87,4 +91,100 @@ bool Or::Equals(const Expression& expr) const {
   return false;
 }
 
+std::string_view ToString(Expression::Operation op) {
+  switch (op) {
+    case Expression::Operation::kAnd:
+      return "AND";
+    case Expression::Operation::kOr:
+      return "OR";
+    case Expression::Operation::kTrue:
+      return "TRUE";
+    case Expression::Operation::kFalse:
+      return "FALSE";
+    case Expression::Operation::kIsNull:
+      return "IS_NULL";
+    case Expression::Operation::kNotNull:
+      return "NOT_NULL";
+    case Expression::Operation::kIsNan:
+      return "IS_NAN";
+    case Expression::Operation::kNotNan:
+      return "NOT_NAN";
+    case Expression::Operation::kLt:
+      return "LT";
+    case Expression::Operation::kLtEq:
+      return "LT_EQ";
+    case Expression::Operation::kGt:
+      return "GT";
+    case Expression::Operation::kGtEq:
+      return "GT_EQ";
+    case Expression::Operation::kEq:
+      return "EQ";
+    case Expression::Operation::kNotEq:
+      return "NOT_EQ";
+    case Expression::Operation::kIn:
+      return "IN";
+    case Expression::Operation::kNotIn:
+      return "NOT_IN";
+    case Expression::Operation::kStartsWith:
+      return "STARTS_WITH";
+    case Expression::Operation::kNotStartsWith:
+      return "NOT_STARTS_WITH";
+    case Expression::Operation::kCount:
+      return "COUNT";
+    case Expression::Operation::kNot:
+      return "NOT";
+    case Expression::Operation::kCountStar:
+      return "COUNT_STAR";
+    case Expression::Operation::kMax:
+      return "MAX";
+    case Expression::Operation::kMin:
+      return "MIN";
+  }
+  std::unreachable();
+}
+
+Result<Expression::Operation> Negate(Expression::Operation op) {
+  switch (op) {
+    case Expression::Operation::kIsNull:
+      return Expression::Operation::kNotNull;
+    case Expression::Operation::kNotNull:
+      return Expression::Operation::kIsNull;
+    case Expression::Operation::kIsNan:
+      return Expression::Operation::kNotNan;
+    case Expression::Operation::kNotNan:
+      return Expression::Operation::kIsNan;
+    case Expression::Operation::kLt:
+      return Expression::Operation::kGtEq;
+    case Expression::Operation::kLtEq:
+      return Expression::Operation::kGt;
+    case Expression::Operation::kGt:
+      return Expression::Operation::kLtEq;
+    case Expression::Operation::kGtEq:
+      return Expression::Operation::kLt;
+    case Expression::Operation::kEq:
+      return Expression::Operation::kNotEq;
+    case Expression::Operation::kNotEq:
+      return Expression::Operation::kEq;
+    case Expression::Operation::kIn:
+      return Expression::Operation::kNotIn;
+    case Expression::Operation::kNotIn:
+      return Expression::Operation::kIn;
+    case Expression::Operation::kStartsWith:
+      return Expression::Operation::kNotStartsWith;
+    case Expression::Operation::kNotStartsWith:
+      return Expression::Operation::kStartsWith;
+    case Expression::Operation::kTrue:
+    case Expression::Operation::kFalse:
+    case Expression::Operation::kNot:

Review Comment:
   Shouldn't there be negations for these (`kTrue`, `kFalse`, `kNot`)?



##########
src/iceberg/expression/expression.cc:
##########
@@ -87,4 +91,100 @@ bool Or::Equals(const Expression& expr) const {
   return false;
 }
 
+std::string_view ToString(Expression::Operation op) {
+  switch (op) {
+    case Expression::Operation::kAnd:
+      return "AND";
+    case Expression::Operation::kOr:
+      return "OR";
+    case Expression::Operation::kTrue:
+      return "TRUE";
+    case Expression::Operation::kFalse:
+      return "FALSE";
+    case Expression::Operation::kIsNull:
+      return "IS_NULL";
+    case Expression::Operation::kNotNull:
+      return "NOT_NULL";
+    case Expression::Operation::kIsNan:
+      return "IS_NAN";
+    case Expression::Operation::kNotNan:
+      return "NOT_NAN";
+    case Expression::Operation::kLt:
+      return "LT";
+    case Expression::Operation::kLtEq:
+      return "LT_EQ";
+    case Expression::Operation::kGt:
+      return "GT";
+    case Expression::Operation::kGtEq:
+      return "GT_EQ";
+    case Expression::Operation::kEq:
+      return "EQ";
+    case Expression::Operation::kNotEq:
+      return "NOT_EQ";
+    case Expression::Operation::kIn:
+      return "IN";
+    case Expression::Operation::kNotIn:
+      return "NOT_IN";
+    case Expression::Operation::kStartsWith:
+      return "STARTS_WITH";
+    case Expression::Operation::kNotStartsWith:
+      return "NOT_STARTS_WITH";
+    case Expression::Operation::kCount:
+      return "COUNT";
+    case Expression::Operation::kNot:
+      return "NOT";
+    case Expression::Operation::kCountStar:
+      return "COUNT_STAR";
+    case Expression::Operation::kMax:
+      return "MAX";
+    case Expression::Operation::kMin:
+      return "MIN";

Review Comment:
   I don't think these are in the spec, are they?



##########
src/iceberg/expression/expressions.cc:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/expressions.h"
+
+#include "iceberg/exception.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+// Logical operations
+
+std::shared_ptr<Expression> Expressions::And(std::shared_ptr<Expression> left,
+                                             std::shared_ptr<Expression> 
right) {

Review Comment:
   It's not necessary, but it would be convenient (and would come in handy) to
   have a version of this that takes an arbitrary number of arguments and
   collapses them into nested calls: `And(And(And(arg1, arg2), arg3), ...)`



##########
src/iceberg/expression/expressions.cc:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/expressions.h"
+
+#include "iceberg/exception.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+// Logical operations
+
+std::shared_ptr<Expression> Expressions::And(std::shared_ptr<Expression> left,
+                                             std::shared_ptr<Expression> 
right) {
+  if (left->op() == Expression::Operation::kFalse ||
+      right->op() == Expression::Operation::kFalse) {
+    return AlwaysFalse();
+  }
+
+  if (left->op() == Expression::Operation::kTrue) {
+    return right;
+  }
+
+  if (right->op() == Expression::Operation::kTrue) {
+    return left;
+  }
+
+  return std::make_shared<::iceberg::And>(std::move(left), std::move(right));
+}
+
+std::shared_ptr<Expression> Expressions::Or(std::shared_ptr<Expression> left,
+                                            std::shared_ptr<Expression> right) 
{
+  if (left->op() == Expression::Operation::kTrue ||
+      right->op() == Expression::Operation::kTrue) {
+    return AlwaysTrue();
+  }
+
+  if (left->op() == Expression::Operation::kFalse) {
+    return right;
+  }
+
+  if (right->op() == Expression::Operation::kFalse) {
+    return left;
+  }
+
+  return std::make_shared<::iceberg::Or>(std::move(left), std::move(right));
+}
+
+// Transform functions
+
+std::shared_ptr<UnboundTransform> Expressions::Bucket(std::string name,
+                                                      int32_t num_buckets) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)),
+                                            Transform::Bucket(num_buckets));
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Year(std::string name) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)), 
Transform::Year());
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Month(std::string name) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)), 
Transform::Month());
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Day(std::string name) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)), 
Transform::Day());
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Hour(std::string name) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)), 
Transform::Hour());
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Truncate(std::string name, 
int32_t width) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)),
+                                            Transform::Truncate(width));
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Transform(
+    std::string name, std::shared_ptr<::iceberg::Transform> transform) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)), 
std::move(transform));
+}
+
+// Template implementations for unary predicates
+
+std::shared_ptr<UnboundPredicate<BoundReference>> 
Expressions::IsNull(std::string name) {
+  return IsNull<BoundReference>(Ref(std::move(name)));
+}

Review Comment:
   This isn't a bound reference; shouldn't this be `UnboundReference`?



##########
src/iceberg/expression/predicate.cc:
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/predicate.h"
+
+#include <algorithm>
+#include <format>
+
+#include "iceberg/exception.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/result.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+// Predicate template implementations
+template <TermType T>
+Predicate<T>::Predicate(Expression::Operation op, std::shared_ptr<T> term)
+    : operation_(op), term_(std::move(term)) {}
+
+template <TermType T>
+Predicate<T>::~Predicate() = default;
+
+// UnboundPredicate template implementations
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term)
+    : BASE(op, std::move(term)) {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term, 
Literal value)
+    : BASE(op, std::move(term)), values_{std::move(value)} {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term,
+                                      std::vector<Literal> values)
+    : BASE(op, std::move(term)), values_(std::move(values)) {}
+
+template <typename B>
+UnboundPredicate<B>::~UnboundPredicate() = default;
+
+namespace {}
+
+template <typename B>
+std::string UnboundPredicate<B>::ToString() const {
+  auto invalid_predicate_string = [](Expression::Operation op) {
+    return std::format("Invalid predicate: operation = {}", op);
+  };
+
+  const auto& term = *BASE::term();
+  const auto op = BASE::op();
+
+  switch (op) {
+    case Expression::Operation::kIsNull:
+      return std::format("is_null({})", term);
+    case Expression::Operation::kNotNull:
+      return std::format("not_null({})", term);
+    case Expression::Operation::kIsNan:
+      return std::format("is_nan({})", term);
+    case Expression::Operation::kNotNan:
+      return std::format("not_nan({})", term);
+    case Expression::Operation::kLt:
+      return values_.size() == 1 ? std::format("{} < {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kLtEq:
+      return values_.size() == 1 ? std::format("{} <= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGt:
+      return values_.size() == 1 ? std::format("{} > {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGtEq:
+      return values_.size() == 1 ? std::format("{} >= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kEq:
+      return values_.size() == 1 ? std::format("{} == {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotEq:
+      return values_.size() == 1 ? std::format("{} != {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kStartsWith:
+      return values_.size() == 1 ? std::format("{} startsWith \"{}\"", term, 
values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotStartsWith:
+      return values_.size() == 1
+                 ? std::format("{} notStartsWith \"{}\"", term, values_[0])
+                 : invalid_predicate_string(op);
+    case Expression::Operation::kIn:
+      return std::format("{} in {}", term, values_);
+    case Expression::Operation::kNotIn:
+      return std::format("{} not in {}", term, values_);
+    default:
+      return invalid_predicate_string(op);
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Negate() const {
+  ICEBERG_ASSIGN_OR_RAISE(auto negated_op, ::iceberg::Negate(BASE::op()));
+  return std::make_shared<UnboundPredicate>(negated_op, BASE::term(), values_);
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Bind(const Schema& 
schema,
+                                                              bool 
case_sensitive) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto bound_term, BASE::term()->Bind(schema, 
case_sensitive));
+
+  if (values_.empty()) {
+    return BindUnaryOperation(std::move(bound_term));
+  }
+
+  if (BASE::op() == Expression::Operation::kIn ||
+      BASE::op() == Expression::Operation::kNotIn) {
+    return BindInOperation(std::move(bound_term));
+  }
+
+  return BindLiteralOperation(std::move(bound_term));
+}
+
+namespace {
+
+bool IsFloatingType(TypeId type) {
+  return type == TypeId::kFloat || type == TypeId::kDouble;
+}
+
+}  // namespace
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindUnaryOperation(
+    std::shared_ptr<B> bound_term) const {
+  switch (BASE::op()) {
+    case Expression::Operation::kIsNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysFalse();
+      }
+      // TODO(gangwu): deal with UnknownType
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kIsNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kNotNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysTrue();
+      }
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kNotNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kIsNan:
+    case Expression::Operation::kNotNan:
+      if (!IsFloatingType(bound_term->type()->type_id())) {
+        return InvalidExpression("{} cannot be used with a non-floating-point 
column",
+                                 BASE::op());

Review Comment:
   I believe other implementations just return Null in this case rather than
   returning an error.



##########
src/iceberg/expression/expressions.cc:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/expressions.h"
+
+#include "iceberg/exception.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+// Logical operations
+
+std::shared_ptr<Expression> Expressions::And(std::shared_ptr<Expression> left,
+                                             std::shared_ptr<Expression> 
right) {
+  if (left->op() == Expression::Operation::kFalse ||
+      right->op() == Expression::Operation::kFalse) {
+    return AlwaysFalse();
+  }
+
+  if (left->op() == Expression::Operation::kTrue) {
+    return right;
+  }
+
+  if (right->op() == Expression::Operation::kTrue) {
+    return left;
+  }
+
+  return std::make_shared<::iceberg::And>(std::move(left), std::move(right));
+}
+
+std::shared_ptr<Expression> Expressions::Or(std::shared_ptr<Expression> left,
+                                            std::shared_ptr<Expression> right) 
{
+  if (left->op() == Expression::Operation::kTrue ||
+      right->op() == Expression::Operation::kTrue) {
+    return AlwaysTrue();
+  }
+
+  if (left->op() == Expression::Operation::kFalse) {
+    return right;
+  }
+
+  if (right->op() == Expression::Operation::kFalse) {
+    return left;
+  }
+
+  return std::make_shared<::iceberg::Or>(std::move(left), std::move(right));
+}
+
+// Transform functions
+
+std::shared_ptr<UnboundTransform> Expressions::Bucket(std::string name,
+                                                      int32_t num_buckets) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)),
+                                            Transform::Bucket(num_buckets));
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Year(std::string name) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)), 
Transform::Year());
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Month(std::string name) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)), 
Transform::Month());
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Day(std::string name) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)), 
Transform::Day());
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Hour(std::string name) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)), 
Transform::Hour());
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Truncate(std::string name, 
int32_t width) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)),
+                                            Transform::Truncate(width));
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Transform(
+    std::string name, std::shared_ptr<::iceberg::Transform> transform) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)), 
std::move(transform));
+}
+
+// Template implementations for unary predicates
+
+std::shared_ptr<UnboundPredicate<BoundReference>> 
Expressions::IsNull(std::string name) {
+  return IsNull<BoundReference>(Ref(std::move(name)));
+}
+
+template <typename B>
+std::shared_ptr<UnboundPredicate<B>> Expressions::IsNull(
+    std::shared_ptr<UnboundTerm<B>> expr) {
+  return std::make_shared<UnboundPredicate<B>>(Expression::Operation::kIsNull,
+                                               std::move(expr));
+}
+
+std::shared_ptr<UnboundPredicate<BoundReference>> 
Expressions::NotNull(std::string name) {
+  return NotNull<BoundReference>(Ref(std::move(name)));
+}
+
+template <typename B>
+std::shared_ptr<UnboundPredicate<B>> Expressions::NotNull(
+    std::shared_ptr<UnboundTerm<B>> expr) {
+  return std::make_shared<UnboundPredicate<B>>(Expression::Operation::kNotNull,
+                                               std::move(expr));
+}
+
+std::shared_ptr<UnboundPredicate<BoundReference>> 
Expressions::IsNaN(std::string name) {
+  return IsNaN<BoundReference>(Ref(std::move(name)));
+}

Review Comment:
   The same applies here, and so on for the rest.



##########
src/iceberg/expression/expressions.h:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/expression/expressions.h
+/// Factory methods for creating expressions.
+
+#include <initializer_list>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/expression/predicate.h"
+#include "iceberg/expression/term.h"
+#include "iceberg/iceberg_export.h"
+
+namespace iceberg {
+
+/// \brief Factory methods for creating expressions.
+class ICEBERG_EXPORT Expressions {
+ public:
+  // Logical operations
+
+  /// \brief Create an AND expression.
+  static std::shared_ptr<Expression> And(std::shared_ptr<Expression> left,
+                                         std::shared_ptr<Expression> right);
+
+  /// \brief Create an OR expression.
+  static std::shared_ptr<Expression> Or(std::shared_ptr<Expression> left,
+                                        std::shared_ptr<Expression> right);
+
+  // Transform functions
+
+  /// \brief Create a bucket transform term.
+  static std::shared_ptr<UnboundTransform> Bucket(std::string name, int32_t 
num_buckets);
+
+  /// \brief Create a year transform term.
+  static std::shared_ptr<UnboundTransform> Year(std::string name);
+
+  /// \brief Create a month transform term.
+  static std::shared_ptr<UnboundTransform> Month(std::string name);
+
+  /// \brief Create a day transform term.
+  static std::shared_ptr<UnboundTransform> Day(std::string name);
+
+  /// \brief Create an hour transform term.
+  static std::shared_ptr<UnboundTransform> Hour(std::string name);
+
+  /// \brief Create a truncate transform term.
+  static std::shared_ptr<UnboundTransform> Truncate(std::string name, int32_t 
width);
+
+  /// \brief Create a transform expression.
+  static std::shared_ptr<UnboundTransform> Transform(
+      std::string name, std::shared_ptr<Transform> transform);
+
+  // Unary predicates
+
+  /// \brief Create an IS NULL predicate for a field name.
+  static std::shared_ptr<UnboundPredicate<BoundReference>> IsNull(std::string 
name);
+
+  /// \brief Create an IS NULL predicate for an unbound term.
+  template <typename B>
+  static std::shared_ptr<UnboundPredicate<B>> IsNull(
+      std::shared_ptr<UnboundTerm<B>> expr);
+
+  /// \brief Create a NOT NULL predicate for a field name.
+  static std::shared_ptr<UnboundPredicate<BoundReference>> NotNull(std::string 
name);

Review Comment:
   A field name in and of itself is not a bound reference; it only becomes one
   once you bind it to a schema and have the bound type, etc.



##########
src/iceberg/expression/predicate.cc:
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/predicate.h"
+
+#include <algorithm>
+#include <format>
+
+#include "iceberg/exception.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/result.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+// Predicate template implementations
+template <TermType T>
+Predicate<T>::Predicate(Expression::Operation op, std::shared_ptr<T> term)
+    : operation_(op), term_(std::move(term)) {}
+
+template <TermType T>
+Predicate<T>::~Predicate() = default;
+
+// UnboundPredicate template implementations
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term)
+    : BASE(op, std::move(term)) {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term, 
Literal value)
+    : BASE(op, std::move(term)), values_{std::move(value)} {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term,
+                                      std::vector<Literal> values)
+    : BASE(op, std::move(term)), values_(std::move(values)) {}
+
+template <typename B>
+UnboundPredicate<B>::~UnboundPredicate() = default;
+
+namespace {}
+
+template <typename B>
+std::string UnboundPredicate<B>::ToString() const {
+  auto invalid_predicate_string = [](Expression::Operation op) {
+    return std::format("Invalid predicate: operation = {}", op);
+  };
+
+  const auto& term = *BASE::term();
+  const auto op = BASE::op();
+
+  switch (op) {
+    case Expression::Operation::kIsNull:
+      return std::format("is_null({})", term);
+    case Expression::Operation::kNotNull:
+      return std::format("not_null({})", term);
+    case Expression::Operation::kIsNan:
+      return std::format("is_nan({})", term);
+    case Expression::Operation::kNotNan:
+      return std::format("not_nan({})", term);
+    case Expression::Operation::kLt:
+      return values_.size() == 1 ? std::format("{} < {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kLtEq:
+      return values_.size() == 1 ? std::format("{} <= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGt:
+      return values_.size() == 1 ? std::format("{} > {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGtEq:
+      return values_.size() == 1 ? std::format("{} >= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kEq:
+      return values_.size() == 1 ? std::format("{} == {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotEq:
+      return values_.size() == 1 ? std::format("{} != {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kStartsWith:
+      return values_.size() == 1 ? std::format("{} startsWith \"{}\"", term, 
values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotStartsWith:
+      return values_.size() == 1
+                 ? std::format("{} notStartsWith \"{}\"", term, values_[0])
+                 : invalid_predicate_string(op);
+    case Expression::Operation::kIn:
+      return std::format("{} in {}", term, values_);
+    case Expression::Operation::kNotIn:
+      return std::format("{} not in {}", term, values_);
+    default:
+      return invalid_predicate_string(op);
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Negate() const {
+  ICEBERG_ASSIGN_OR_RAISE(auto negated_op, ::iceberg::Negate(BASE::op()));
+  return std::make_shared<UnboundPredicate>(negated_op, BASE::term(), values_);
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Bind(const Schema& 
schema,
+                                                              bool 
case_sensitive) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto bound_term, BASE::term()->Bind(schema, 
case_sensitive));
+
+  if (values_.empty()) {
+    return BindUnaryOperation(std::move(bound_term));
+  }
+
+  if (BASE::op() == Expression::Operation::kIn ||
+      BASE::op() == Expression::Operation::kNotIn) {
+    return BindInOperation(std::move(bound_term));
+  }
+
+  return BindLiteralOperation(std::move(bound_term));
+}
+
+namespace {
+
+bool IsFloatingType(TypeId type) {
+  return type == TypeId::kFloat || type == TypeId::kDouble;
+}
+
+}  // namespace
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindUnaryOperation(
+    std::shared_ptr<B> bound_term) const {
+  switch (BASE::op()) {
+    case Expression::Operation::kIsNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysFalse();
+      }
+      // TODO(gangwu): deal with UnknownType
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kIsNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kNotNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysTrue();
+      }
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kNotNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kIsNan:
+    case Expression::Operation::kNotNan:
+      if (!IsFloatingType(bound_term->type()->type_id())) {
+        return InvalidExpression("{} cannot be used with a non-floating-point 
column",
+                                 BASE::op());
+      }
+      return std::make_shared<BoundUnaryPredicate>(BASE::op(), 
std::move(bound_term));
+    default:
+      return InvalidExpression("Operation must be IS_NULL, NOT_NULL, IS_NAN, 
or NOT_NAN");
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindLiteralOperation(
+    std::shared_ptr<B> bound_term) const {
+  if (BASE::op() == Expression::Operation::kStartsWith ||
+      BASE::op() == Expression::Operation::kNotStartsWith) {
+    if (bound_term->type()->type_id() != TypeId::kString) {
+      return InvalidExpression(
+          "Term for STARTS_WITH or NOT_STARTS_WITH must produce a string: {}: 
{}",
+          *bound_term, *bound_term->type());
+    }
+  }
+
+  if (values_.size() != 1) {
+    return InvalidExpression("Literal operation requires a single value but 
got {}",
+                             values_.size());
+  }
+
+  const auto& literal = values_[0];
+
+  if (literal.IsNull()) {
+    return InvalidExpression("Invalid value for conversion to type {}: {} 
({})",
+                             *bound_term->type(), literal.ToString(), 
*literal.type());

Review Comment:
   Why is it invalid to create a null for a type?



##########
src/iceberg/expression/predicate.cc:
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/predicate.h"
+
+#include <algorithm>
+#include <format>
+
+#include "iceberg/exception.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/result.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+// Predicate template implementations
+template <TermType T>
+Predicate<T>::Predicate(Expression::Operation op, std::shared_ptr<T> term)
+    : operation_(op), term_(std::move(term)) {}
+
+template <TermType T>
+Predicate<T>::~Predicate() = default;
+
+// UnboundPredicate template implementations
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term)
+    : BASE(op, std::move(term)) {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term, 
Literal value)
+    : BASE(op, std::move(term)), values_{std::move(value)} {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term,
+                                      std::vector<Literal> values)
+    : BASE(op, std::move(term)), values_(std::move(values)) {}
+
+template <typename B>
+UnboundPredicate<B>::~UnboundPredicate() = default;
+
+namespace {}
+
+template <typename B>
+std::string UnboundPredicate<B>::ToString() const {
+  auto invalid_predicate_string = [](Expression::Operation op) {
+    return std::format("Invalid predicate: operation = {}", op);
+  };
+
+  const auto& term = *BASE::term();
+  const auto op = BASE::op();
+
+  switch (op) {
+    case Expression::Operation::kIsNull:
+      return std::format("is_null({})", term);
+    case Expression::Operation::kNotNull:
+      return std::format("not_null({})", term);
+    case Expression::Operation::kIsNan:
+      return std::format("is_nan({})", term);
+    case Expression::Operation::kNotNan:
+      return std::format("not_nan({})", term);
+    case Expression::Operation::kLt:
+      return values_.size() == 1 ? std::format("{} < {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kLtEq:
+      return values_.size() == 1 ? std::format("{} <= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGt:
+      return values_.size() == 1 ? std::format("{} > {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGtEq:
+      return values_.size() == 1 ? std::format("{} >= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kEq:
+      return values_.size() == 1 ? std::format("{} == {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotEq:
+      return values_.size() == 1 ? std::format("{} != {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kStartsWith:
+      return values_.size() == 1 ? std::format("{} startsWith \"{}\"", term, 
values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotStartsWith:
+      return values_.size() == 1
+                 ? std::format("{} notStartsWith \"{}\"", term, values_[0])
+                 : invalid_predicate_string(op);
+    case Expression::Operation::kIn:
+      return std::format("{} in {}", term, values_);
+    case Expression::Operation::kNotIn:
+      return std::format("{} not in {}", term, values_);
+    default:
+      return invalid_predicate_string(op);
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Negate() const {
+  ICEBERG_ASSIGN_OR_RAISE(auto negated_op, ::iceberg::Negate(BASE::op()));
+  return std::make_shared<UnboundPredicate>(negated_op, BASE::term(), values_);
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Bind(const Schema& 
schema,
+                                                              bool 
case_sensitive) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto bound_term, BASE::term()->Bind(schema, 
case_sensitive));
+
+  if (values_.empty()) {
+    return BindUnaryOperation(std::move(bound_term));
+  }
+
+  if (BASE::op() == Expression::Operation::kIn ||
+      BASE::op() == Expression::Operation::kNotIn) {
+    return BindInOperation(std::move(bound_term));
+  }
+
+  return BindLiteralOperation(std::move(bound_term));
+}
+
+namespace {
+
+bool IsFloatingType(TypeId type) {
+  return type == TypeId::kFloat || type == TypeId::kDouble;
+}
+
+}  // namespace
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindUnaryOperation(
+    std::shared_ptr<B> bound_term) const {
+  switch (BASE::op()) {
+    case Expression::Operation::kIsNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysFalse();
+      }
+      // TODO(gangwu): deal with UnknownType
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kIsNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kNotNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysTrue();
+      }
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kNotNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kIsNan:
+    case Expression::Operation::kNotNan:
+      if (!IsFloatingType(bound_term->type()->type_id())) {
+        return InvalidExpression("{} cannot be used with a non-floating-point 
column",
+                                 BASE::op());
+      }
+      return std::make_shared<BoundUnaryPredicate>(BASE::op(), 
std::move(bound_term));
+    default:
+      return InvalidExpression("Operation must be IS_NULL, NOT_NULL, IS_NAN, 
or NOT_NAN");
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindLiteralOperation(
+    std::shared_ptr<B> bound_term) const {
+  if (BASE::op() == Expression::Operation::kStartsWith ||
+      BASE::op() == Expression::Operation::kNotStartsWith) {
+    if (bound_term->type()->type_id() != TypeId::kString) {
+      return InvalidExpression(
+          "Term for STARTS_WITH or NOT_STARTS_WITH must produce a string: {}: 
{}",
+          *bound_term, *bound_term->type());
+    }
+  }
+
+  if (values_.size() != 1) {
+    return InvalidExpression("Literal operation requires a single value but 
got {}",
+                             values_.size());
+  }
+
+  const auto& literal = values_[0];
+
+  if (literal.IsNull()) {
+    return InvalidExpression("Invalid value for conversion to type {}: {} 
({})",
+                             *bound_term->type(), literal.ToString(), 
*literal.type());
+  } else if (literal.IsAboveMax()) {
+    switch (BASE::op()) {
+      case Expression::Operation::kLt:
+      case Expression::Operation::kLtEq:
+      case Expression::Operation::kNotEq:
+        return Expressions::AlwaysTrue();
+      case Expression::Operation::kGt:
+      case Expression::Operation::kGtEq:
+      case Expression::Operation::kEq:
+        return Expressions::AlwaysFalse();
+      default:
+        break;
+    }
+  } else if (literal.IsBelowMin()) {
+    switch (BASE::op()) {
+      case Expression::Operation::kGt:
+      case Expression::Operation::kGtEq:
+      case Expression::Operation::kNotEq:
+        return Expressions::AlwaysTrue();
+      case Expression::Operation::kLt:
+      case Expression::Operation::kLtEq:
+      case Expression::Operation::kEq:
+        return Expressions::AlwaysFalse();
+      default:
+        break;
+    }
+  }
+
+  // TODO(gangwu): translate truncate(col) == value to startsWith(value)
+  return std::make_shared<BoundLiteralPredicate>(BASE::op(), 
std::move(bound_term),
+                                                 literal);
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindInOperation(
+    std::shared_ptr<B> bound_term) const {
+  std::vector<Literal> converted_literals;
+  for (const auto& literal : values_) {
+    auto primitive_type =
+        internal::checked_pointer_cast<PrimitiveType>(bound_term->type());
+    ICEBERG_ASSIGN_OR_RAISE(auto converted, literal.CastTo(primitive_type));
+    if (converted.IsNull()) {
+      return InvalidExpression("Invalid value for conversion to type {}: {} 
({})",
+                               *bound_term->type(), literal.ToString(), 
*literal.type());
+    }

Review Comment:
   Why filter out `BelowMin` and `AboveMax` literals but return an error when the cast yields a null?



##########
src/iceberg/expression/predicate.cc:
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/predicate.h"
+
+#include <algorithm>
+#include <format>
+
+#include "iceberg/exception.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/result.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+// Predicate template implementations
+template <TermType T>
+Predicate<T>::Predicate(Expression::Operation op, std::shared_ptr<T> term)
+    : operation_(op), term_(std::move(term)) {}
+
+template <TermType T>
+Predicate<T>::~Predicate() = default;
+
+// UnboundPredicate template implementations
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term)
+    : BASE(op, std::move(term)) {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term, 
Literal value)
+    : BASE(op, std::move(term)), values_{std::move(value)} {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term,
+                                      std::vector<Literal> values)
+    : BASE(op, std::move(term)), values_(std::move(values)) {}
+
+template <typename B>
+UnboundPredicate<B>::~UnboundPredicate() = default;
+
+namespace {}
+
+template <typename B>
+std::string UnboundPredicate<B>::ToString() const {
+  auto invalid_predicate_string = [](Expression::Operation op) {
+    return std::format("Invalid predicate: operation = {}", op);
+  };
+
+  const auto& term = *BASE::term();
+  const auto op = BASE::op();
+
+  switch (op) {
+    case Expression::Operation::kIsNull:
+      return std::format("is_null({})", term);
+    case Expression::Operation::kNotNull:
+      return std::format("not_null({})", term);
+    case Expression::Operation::kIsNan:
+      return std::format("is_nan({})", term);
+    case Expression::Operation::kNotNan:
+      return std::format("not_nan({})", term);
+    case Expression::Operation::kLt:
+      return values_.size() == 1 ? std::format("{} < {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kLtEq:
+      return values_.size() == 1 ? std::format("{} <= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGt:
+      return values_.size() == 1 ? std::format("{} > {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGtEq:
+      return values_.size() == 1 ? std::format("{} >= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kEq:
+      return values_.size() == 1 ? std::format("{} == {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotEq:
+      return values_.size() == 1 ? std::format("{} != {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kStartsWith:
+      return values_.size() == 1 ? std::format("{} startsWith \"{}\"", term, 
values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotStartsWith:
+      return values_.size() == 1
+                 ? std::format("{} notStartsWith \"{}\"", term, values_[0])
+                 : invalid_predicate_string(op);
+    case Expression::Operation::kIn:
+      return std::format("{} in {}", term, values_);
+    case Expression::Operation::kNotIn:
+      return std::format("{} not in {}", term, values_);
+    default:
+      return invalid_predicate_string(op);
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Negate() const {
+  ICEBERG_ASSIGN_OR_RAISE(auto negated_op, ::iceberg::Negate(BASE::op()));
+  return std::make_shared<UnboundPredicate>(negated_op, BASE::term(), values_);
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Bind(const Schema& 
schema,
+                                                              bool 
case_sensitive) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto bound_term, BASE::term()->Bind(schema, 
case_sensitive));
+
+  if (values_.empty()) {
+    return BindUnaryOperation(std::move(bound_term));
+  }
+
+  if (BASE::op() == Expression::Operation::kIn ||
+      BASE::op() == Expression::Operation::kNotIn) {
+    return BindInOperation(std::move(bound_term));
+  }
+
+  return BindLiteralOperation(std::move(bound_term));
+}
+
+namespace {
+
+bool IsFloatingType(TypeId type) {
+  return type == TypeId::kFloat || type == TypeId::kDouble;
+}
+
+}  // namespace
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindUnaryOperation(
+    std::shared_ptr<B> bound_term) const {
+  switch (BASE::op()) {
+    case Expression::Operation::kIsNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysFalse();
+      }
+      // TODO(gangwu): deal with UnknownType
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kIsNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kNotNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysTrue();
+      }
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kNotNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kIsNan:
+    case Expression::Operation::kNotNan:
+      if (!IsFloatingType(bound_term->type()->type_id())) {
+        return InvalidExpression("{} cannot be used with a non-floating-point 
column",
+                                 BASE::op());
+      }
+      return std::make_shared<BoundUnaryPredicate>(BASE::op(), 
std::move(bound_term));
+    default:
+      return InvalidExpression("Operation must be IS_NULL, NOT_NULL, IS_NAN, 
or NOT_NAN");
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindLiteralOperation(
+    std::shared_ptr<B> bound_term) const {
+  if (BASE::op() == Expression::Operation::kStartsWith ||
+      BASE::op() == Expression::Operation::kNotStartsWith) {
+    if (bound_term->type()->type_id() != TypeId::kString) {
+      return InvalidExpression(
+          "Term for STARTS_WITH or NOT_STARTS_WITH must produce a string: {}: 
{}",
+          *bound_term, *bound_term->type());
+    }
+  }
+
+  if (values_.size() != 1) {
+    return InvalidExpression("Literal operation requires a single value but 
got {}",
+                             values_.size());
+  }
+
+  const auto& literal = values_[0];
+
+  if (literal.IsNull()) {
+    return InvalidExpression("Invalid value for conversion to type {}: {} 
({})",
+                             *bound_term->type(), literal.ToString(), 
*literal.type());
+  } else if (literal.IsAboveMax()) {
+    switch (BASE::op()) {
+      case Expression::Operation::kLt:
+      case Expression::Operation::kLtEq:
+      case Expression::Operation::kNotEq:
+        return Expressions::AlwaysTrue();
+      case Expression::Operation::kGt:
+      case Expression::Operation::kGtEq:
+      case Expression::Operation::kEq:
+        return Expressions::AlwaysFalse();
+      default:
+        break;
+    }
+  } else if (literal.IsBelowMin()) {
+    switch (BASE::op()) {
+      case Expression::Operation::kGt:
+      case Expression::Operation::kGtEq:
+      case Expression::Operation::kNotEq:
+        return Expressions::AlwaysTrue();
+      case Expression::Operation::kLt:
+      case Expression::Operation::kLtEq:
+      case Expression::Operation::kEq:
+        return Expressions::AlwaysFalse();
+      default:
+        break;
+    }
+  }
+
+  // TODO(gangwu): translate truncate(col) == value to startsWith(value)
+  return std::make_shared<BoundLiteralPredicate>(BASE::op(), 
std::move(bound_term),
+                                                 literal);
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindInOperation(
+    std::shared_ptr<B> bound_term) const {
+  std::vector<Literal> converted_literals;
+  for (const auto& literal : values_) {
+    auto primitive_type =
+        internal::checked_pointer_cast<PrimitiveType>(bound_term->type());
+    ICEBERG_ASSIGN_OR_RAISE(auto converted, literal.CastTo(primitive_type));
+    if (converted.IsNull()) {
+      return InvalidExpression("Invalid value for conversion to type {}: {} 
({})",
+                               *bound_term->type(), literal.ToString(), 
*literal.type());
+    }
+    // Filter out literals that are out of range after conversion.
+    if (!converted.IsBelowMin() && !converted.IsAboveMax()) {
+      converted_literals.push_back(std::move(converted));
+    }
+  }
+
+  // If no valid literals remain after conversion and filtering
+  if (converted_literals.empty()) {
+    switch (BASE::op()) {
+      case Expression::Operation::kIn:
+        return Expressions::AlwaysFalse();
+      case Expression::Operation::kNotIn:
+        return Expressions::AlwaysTrue();
+      default:
+        return InvalidExpression("Operation must be IN or NOT_IN");
+    }
+  }
+
+  // If only one unique literal remains, convert to equality/inequality
+  if (converted_literals.size() == 1) {
+    const auto& single_literal = converted_literals[0];
+    switch (BASE::op()) {
+      case Expression::Operation::kIn:
+        return std::make_shared<BoundLiteralPredicate>(
+            Expression::Operation::kEq, std::move(bound_term), single_literal);
+      case Expression::Operation::kNotIn:
+        return std::make_shared<BoundLiteralPredicate>(
+            Expression::Operation::kNotEq, std::move(bound_term), 
single_literal);
+      default:
+        return InvalidExpression("Operation must be IN or NOT_IN");
+    }
+  }
+
+  // Multiple literals - create a set predicate
+  return std::make_shared<BoundSetPredicate>(
+      BASE::op(), std::move(bound_term), std::span<const 
Literal>(converted_literals));
+}
+
+// BoundPredicate implementation
+BoundPredicate::BoundPredicate(Expression::Operation op, 
std::shared_ptr<BoundTerm> term)
+    : Predicate<BoundTerm>(op, std::move(term)) {}
+
+BoundPredicate::~BoundPredicate() = default;
+
+Result<Literal::Value> BoundPredicate::Evaluate(const StructLike& data) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto eval_result, term_->Evaluate(data));
+  ICEBERG_ASSIGN_OR_RAISE(auto test_result, Test(eval_result));
+  return Literal::Value{test_result};
+}
+
+// BoundUnaryPredicate implementation
+BoundUnaryPredicate::BoundUnaryPredicate(Expression::Operation op,
+                                         std::shared_ptr<BoundTerm> term)
+    : BoundPredicate(op, std::move(term)) {}
+
+BoundUnaryPredicate::~BoundUnaryPredicate() = default;
+
+Result<bool> BoundUnaryPredicate::Test(const Literal::Value& value) const {
+  return NotImplemented("BoundUnaryPredicate::Test not implemented");
+}
+
+bool BoundUnaryPredicate::Equals(const Expression& other) const {
+  throw IcebergError("BoundUnaryPredicate::Equals not implemented");
+}
+
+std::string BoundUnaryPredicate::ToString() const {
+  switch (op()) {
+    case Expression::Operation::kIsNull:
+      return std::format("is_null({})", *term());
+    case Expression::Operation::kNotNull:
+      return std::format("not_null({})", *term());
+    case Expression::Operation::kIsNan:
+      return std::format("is_nan({})", *term());
+    case Expression::Operation::kNotNan:
+      return std::format("not_nan({})", *term());
+    default:
+      return std::format("Invalid unary predicate: operation = {}", op());
+  }
+}
+
+// BoundLiteralPredicate implementation
+BoundLiteralPredicate::BoundLiteralPredicate(Expression::Operation op,
+                                             std::shared_ptr<BoundTerm> term,
+                                             Literal literal)
+    : BoundPredicate(op, std::move(term)), literal_(std::move(literal)) {}
+
+BoundLiteralPredicate::~BoundLiteralPredicate() = default;
+
+Result<bool> BoundLiteralPredicate::Test(const Literal::Value& value) const {
+  return NotImplemented("BoundLiteralPredicate::Test not implemented");
+}
+
+bool BoundLiteralPredicate::Equals(const Expression& other) const {
+  throw IcebergError("BoundLiteralPredicate::Equals not implemented");
+}
+
+std::string BoundLiteralPredicate::ToString() const {
+  switch (op()) {
+    case Expression::Operation::kLt:
+      return std::format("{} < {}", *term(), literal());
+    case Expression::Operation::kLtEq:
+      return std::format("{} <= {}", *term(), literal());
+    case Expression::Operation::kGt:
+      return std::format("{} > {}", *term(), literal());
+    case Expression::Operation::kGtEq:
+      return std::format("{} >= {}", *term(), literal());
+    case Expression::Operation::kEq:
+      return std::format("{} == {}", *term(), literal());
+    case Expression::Operation::kNotEq:
+      return std::format("{} != {}", *term(), literal());
+    case Expression::Operation::kStartsWith:
+      return std::format("{} startsWith \"{}\"", *term(), literal());
+    case Expression::Operation::kNotStartsWith:
+      return std::format("{} notStartsWith \"{}\"", *term(), literal());
+    case Expression::Operation::kIn:
+      return std::format("{} in ({})", *term(), literal());
+    case Expression::Operation::kNotIn:
+      return std::format("{} not in ({})", *term(), literal());
+    default:
+      return std::format("Invalid literal predicate: operation = {}", op());
+  }
+}
+
+// BoundSetPredicate implementation
+BoundSetPredicate::BoundSetPredicate(Expression::Operation op,
+                                     std::shared_ptr<BoundTerm> term,
+                                     std::span<const Literal> literals)
+    : BoundPredicate(op, std::move(term)) {
+  for (const auto& literal : literals) {
+    value_set_.push_back(literal.value());
+  }

Review Comment:
   If we're saying this is bound, shouldn't we validate that the literals are 
all of the correct type, and cast them if they aren't?



##########
src/iceberg/expression/expressions.cc:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/expressions.h"
+
+#include "iceberg/exception.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+// Logical operations
+
+std::shared_ptr<Expression> Expressions::And(std::shared_ptr<Expression> left,
+                                             std::shared_ptr<Expression> 
right) {
+  if (left->op() == Expression::Operation::kFalse ||
+      right->op() == Expression::Operation::kFalse) {
+    return AlwaysFalse();
+  }
+
+  if (left->op() == Expression::Operation::kTrue) {
+    return right;
+  }
+
+  if (right->op() == Expression::Operation::kTrue) {
+    return left;
+  }
+
+  return std::make_shared<::iceberg::And>(std::move(left), std::move(right));
+}
+
+std::shared_ptr<Expression> Expressions::Or(std::shared_ptr<Expression> left,
+                                            std::shared_ptr<Expression> right) 
{

Review Comment:
   Same as above



##########
src/iceberg/expression/term.cc:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/term.h"
+
+#include <format>
+
+#include "iceberg/exception.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/transform.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Bound::~Bound() = default;
+
+BoundTerm::~BoundTerm() = default;
+
+Reference::~Reference() = default;
+
+template <typename B>
+Result<std::shared_ptr<B>> Unbound<B>::Bind(const Schema& schema) const {
+  return Bind(schema, /*case_sensitive=*/true);
+}
+
+// NamedReference implementation
+NamedReference::NamedReference(std::string field_name)
+    : field_name_(std::move(field_name)) {}
+
+NamedReference::~NamedReference() = default;
+
+Result<std::shared_ptr<BoundReference>> NamedReference::Bind(const Schema& 
schema,
+                                                             bool 
case_sensitive) const {
+  if (!case_sensitive) {
+    return NotImplemented("case-insensitive lookup is not implemented");
+  }
+
+  auto field_opt = schema.GetFieldByName(field_name_);
+  if (!field_opt.has_value()) {
+    return InvalidExpression("Cannot find field '{}' in struct: {}", 
field_name_,
+                             schema.ToString());
+  }
+
+  return std::make_shared<BoundReference>(field_opt->get());
+}
+
+std::string NamedReference::ToString() const {
+  return std::format("ref(name=\"{}\")", field_name_);
+}
+
+// BoundReference implementation
+BoundReference::BoundReference(SchemaField field) : field_(std::move(field)) {}
+
+BoundReference::~BoundReference() = default;
+
+std::string BoundReference::ToString() const {
+  return std::format("ref(id={}, type={})", field_.field_id(), 
field_.type()->ToString());
+}
+
+Result<Literal::Value> BoundReference::Evaluate(const StructLike& data) const {
+  return NotImplemented("BoundReference::Evaluate(StructLike) not 
implemented");
+}
+
+bool BoundReference::Equals(const BoundTerm& other) const {
+  if (other.kind() != Term::Kind::kReference) {
+    return false;
+  }
+
+  const auto& other_ref = internal::checked_cast<const BoundReference&>(other);
+  return field_.field_id() == other_ref.field_.field_id() &&
+         field_.optional() == other_ref.field_.optional() &&
+         *field_.type() == *other_ref.field_.type();
+}
+
+// UnboundTransform implementation
+UnboundTransform::UnboundTransform(std::shared_ptr<NamedReference> ref,
+                                   std::shared_ptr<Transform> transform)
+    : ref_(std::move(ref)), transform_(std::move(transform)) {}
+
+UnboundTransform::~UnboundTransform() = default;
+
+std::string UnboundTransform::ToString() const {
+  return std::format("{}({})", transform_->ToString(), ref_->ToString());
+}
+
+Result<std::shared_ptr<BoundTransform>> UnboundTransform::Bind(
+    const Schema& schema, bool case_sensitive) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto bound_ref, ref_->Bind(schema, case_sensitive));
+  ICEBERG_ASSIGN_OR_RAISE(auto transform_func, 
transform_->Bind(bound_ref->type()));
+  return std::make_shared<BoundTransform>(std::move(bound_ref), transform_,
+                                          std::move(transform_func));
+}
+
+// BoundTransform implementation
+BoundTransform::BoundTransform(std::shared_ptr<BoundReference> ref,
+                               std::shared_ptr<Transform> transform,
+                               std::shared_ptr<TransformFunction> 
transform_func)
+    : ref_(std::move(ref)),
+      transform_(std::move(transform)),
+      transform_func_(std::move(transform_func)) {}
+
+BoundTransform::~BoundTransform() = default;
+
+std::string BoundTransform::ToString() const {
+  return std::format("{}({})", transform_->ToString(), ref_->ToString());
+}
+
+Result<Literal::Value> BoundTransform::Evaluate(const StructLike& data) const {
+  throw IcebergError("BoundTransform::Evaluate(StructLike) not implemented");
+}
+
+bool BoundTransform::MayProduceNull() const {
+  // transforms must produce null for null input values
+  // transforms may produce null for non-null inputs when not order-preserving
+  // FIXME: add Transform::is_order_preserving()
+  return ref_->MayProduceNull();  // || !transform_->is_order_preserving();
+}
+
+std::shared_ptr<Type> BoundTransform::type() const {
+  return transform_func_->ResultType();
+}
+
+bool BoundTransform::Equals(const BoundTerm& other) const {
+  if (other.kind() == Term::Kind::kTransform) {
+    const auto& other_transform = internal::checked_cast<const 
BoundTransform&>(other);
+    return *ref_ == *other_transform.ref_ && *transform_ == 
*other_transform.transform_;

Review Comment:
   Can we settle on either overloading `operator==` or having an `Equals` 
method, and not mix them?



##########
src/iceberg/expression/expressions.cc:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/expressions.h"
+
+#include "iceberg/exception.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+// Logical operations
+
+std::shared_ptr<Expression> Expressions::And(std::shared_ptr<Expression> left,
+                                             std::shared_ptr<Expression> 
right) {
+  if (left->op() == Expression::Operation::kFalse ||
+      right->op() == Expression::Operation::kFalse) {
+    return AlwaysFalse();
+  }
+
+  if (left->op() == Expression::Operation::kTrue) {
+    return right;
+  }
+
+  if (right->op() == Expression::Operation::kTrue) {
+    return left;
+  }
+
+  return std::make_shared<::iceberg::And>(std::move(left), std::move(right));
+}
+
+std::shared_ptr<Expression> Expressions::Or(std::shared_ptr<Expression> left,
+                                            std::shared_ptr<Expression> right) 
{
+  if (left->op() == Expression::Operation::kTrue ||
+      right->op() == Expression::Operation::kTrue) {
+    return AlwaysTrue();
+  }
+
+  if (left->op() == Expression::Operation::kFalse) {
+    return right;
+  }
+
+  if (right->op() == Expression::Operation::kFalse) {
+    return left;
+  }
+
+  return std::make_shared<::iceberg::Or>(std::move(left), std::move(right));
+}
+
+// Transform functions
+
+std::shared_ptr<UnboundTransform> Expressions::Bucket(std::string name,
+                                                      int32_t num_buckets) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)),
+                                            Transform::Bucket(num_buckets));
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Year(std::string name) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)), 
Transform::Year());
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Month(std::string name) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)), 
Transform::Month());
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Day(std::string name) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)), 
Transform::Day());
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Hour(std::string name) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)), 
Transform::Hour());
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Truncate(std::string name, 
int32_t width) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)),
+                                            Transform::Truncate(width));
+}
+
+std::shared_ptr<UnboundTransform> Expressions::Transform(
+    std::string name, std::shared_ptr<::iceberg::Transform> transform) {
+  return std::make_shared<UnboundTransform>(Ref(std::move(name)), 
std::move(transform));
+}
+
+// Template implementations for unary predicates
+
+std::shared_ptr<UnboundPredicate<BoundReference>> 
Expressions::IsNull(std::string name) {
+  return IsNull<BoundReference>(Ref(std::move(name)));
+}
+
+template <typename B>
+std::shared_ptr<UnboundPredicate<B>> Expressions::IsNull(
+    std::shared_ptr<UnboundTerm<B>> expr) {
+  return std::make_shared<UnboundPredicate<B>>(Expression::Operation::kIsNull,
+                                               std::move(expr));
+}
+
+std::shared_ptr<UnboundPredicate<BoundReference>> 
Expressions::NotNull(std::string name) {
+  return NotNull<BoundReference>(Ref(std::move(name)));
+}

Review Comment:
   same as above



##########
src/iceberg/expression/predicate.cc:
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/predicate.h"
+
+#include <algorithm>
+#include <format>
+
+#include "iceberg/exception.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/result.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+// Predicate template implementations
+template <TermType T>
+Predicate<T>::Predicate(Expression::Operation op, std::shared_ptr<T> term)
+    : operation_(op), term_(std::move(term)) {}
+
+template <TermType T>
+Predicate<T>::~Predicate() = default;
+
+// UnboundPredicate template implementations
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term)
+    : BASE(op, std::move(term)) {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term, 
Literal value)
+    : BASE(op, std::move(term)), values_{std::move(value)} {}
+
+template <typename B>
+UnboundPredicate<B>::UnboundPredicate(Expression::Operation op,
+                                      std::shared_ptr<UnboundTerm<B>> term,
+                                      std::vector<Literal> values)
+    : BASE(op, std::move(term)), values_(std::move(values)) {}
+
+template <typename B>
+UnboundPredicate<B>::~UnboundPredicate() = default;
+
+namespace {}
+
+template <typename B>
+std::string UnboundPredicate<B>::ToString() const {
+  auto invalid_predicate_string = [](Expression::Operation op) {
+    return std::format("Invalid predicate: operation = {}", op);
+  };
+
+  const auto& term = *BASE::term();
+  const auto op = BASE::op();
+
+  switch (op) {
+    case Expression::Operation::kIsNull:
+      return std::format("is_null({})", term);
+    case Expression::Operation::kNotNull:
+      return std::format("not_null({})", term);
+    case Expression::Operation::kIsNan:
+      return std::format("is_nan({})", term);
+    case Expression::Operation::kNotNan:
+      return std::format("not_nan({})", term);
+    case Expression::Operation::kLt:
+      return values_.size() == 1 ? std::format("{} < {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kLtEq:
+      return values_.size() == 1 ? std::format("{} <= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGt:
+      return values_.size() == 1 ? std::format("{} > {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kGtEq:
+      return values_.size() == 1 ? std::format("{} >= {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kEq:
+      return values_.size() == 1 ? std::format("{} == {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotEq:
+      return values_.size() == 1 ? std::format("{} != {}", term, values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kStartsWith:
+      return values_.size() == 1 ? std::format("{} startsWith \"{}\"", term, 
values_[0])
+                                 : invalid_predicate_string(op);
+    case Expression::Operation::kNotStartsWith:
+      return values_.size() == 1
+                 ? std::format("{} notStartsWith \"{}\"", term, values_[0])
+                 : invalid_predicate_string(op);
+    case Expression::Operation::kIn:
+      return std::format("{} in {}", term, values_);
+    case Expression::Operation::kNotIn:
+      return std::format("{} not in {}", term, values_);
+    default:
+      return invalid_predicate_string(op);
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Negate() const {
+  ICEBERG_ASSIGN_OR_RAISE(auto negated_op, ::iceberg::Negate(BASE::op()));
+  return std::make_shared<UnboundPredicate>(negated_op, BASE::term(), values_);
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::Bind(const Schema& 
schema,
+                                                              bool 
case_sensitive) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto bound_term, BASE::term()->Bind(schema, 
case_sensitive));
+
+  if (values_.empty()) {
+    return BindUnaryOperation(std::move(bound_term));
+  }
+
+  if (BASE::op() == Expression::Operation::kIn ||
+      BASE::op() == Expression::Operation::kNotIn) {
+    return BindInOperation(std::move(bound_term));
+  }
+
+  return BindLiteralOperation(std::move(bound_term));
+}
+
+namespace {
+
+bool IsFloatingType(TypeId type) {
+  return type == TypeId::kFloat || type == TypeId::kDouble;
+}
+
+}  // namespace
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindUnaryOperation(
+    std::shared_ptr<B> bound_term) const {
+  switch (BASE::op()) {
+    case Expression::Operation::kIsNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysFalse();
+      }
+      // TODO(gangwu): deal with UnknownType
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kIsNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kNotNull:
+      if (!bound_term->MayProduceNull()) {
+        return Expressions::AlwaysTrue();
+      }
+      return 
std::make_shared<BoundUnaryPredicate>(Expression::Operation::kNotNull,
+                                                   std::move(bound_term));
+    case Expression::Operation::kIsNan:
+    case Expression::Operation::kNotNan:
+      if (!IsFloatingType(bound_term->type()->type_id())) {
+        return InvalidExpression("{} cannot be used with a non-floating-point 
column",
+                                 BASE::op());
+      }
+      return std::make_shared<BoundUnaryPredicate>(BASE::op(), 
std::move(bound_term));
+    default:
+      return InvalidExpression("Operation must be IS_NULL, NOT_NULL, IS_NAN, 
or NOT_NAN");
+  }
+}
+
+template <typename B>
+Result<std::shared_ptr<Expression>> UnboundPredicate<B>::BindLiteralOperation(
+    std::shared_ptr<B> bound_term) const {
+  if (BASE::op() == Expression::Operation::kStartsWith ||
+      BASE::op() == Expression::Operation::kNotStartsWith) {
+    if (bound_term->type()->type_id() != TypeId::kString) {
+      return InvalidExpression(
+          "Term for STARTS_WITH or NOT_STARTS_WITH must produce a string: {}: 
{}",
+          *bound_term, *bound_term->type());
+    }
+  }
+
+  if (values_.size() != 1) {
+    return InvalidExpression("Literal operation requires a single value but 
got {}",
+                             values_.size());
+  }
+
+  const auto& literal = values_[0];

Review Comment:
   Should we cast the literal to the `bound_term`'s type?



##########
src/iceberg/expression/term.cc:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/term.h"
+
+#include <format>
+
+#include "iceberg/exception.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/transform.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Bound::~Bound() = default;
+
+BoundTerm::~BoundTerm() = default;
+
+Reference::~Reference() = default;
+
+template <typename B>
+Result<std::shared_ptr<B>> Unbound<B>::Bind(const Schema& schema) const {
+  return Bind(schema, /*case_sensitive=*/true);
+}
+
+// NamedReference implementation
+NamedReference::NamedReference(std::string field_name)
+    : field_name_(std::move(field_name)) {}
+
+NamedReference::~NamedReference() = default;
+
+Result<std::shared_ptr<BoundReference>> NamedReference::Bind(const Schema& 
schema,
+                                                             bool 
case_sensitive) const {
+  if (!case_sensitive) {
+    return NotImplemented("case-insensitive lookup is not implemented");
+  }
+
+  auto field_opt = schema.GetFieldByName(field_name_);
+  if (!field_opt.has_value()) {
+    return InvalidExpression("Cannot find field '{}' in struct: {}", 
field_name_,
+                             schema.ToString());
+  }
+
+  return std::make_shared<BoundReference>(field_opt->get());
+}
+
+std::string NamedReference::ToString() const {
+  return std::format("ref(name=\"{}\")", field_name_);
+}
+
+// BoundReference implementation
+BoundReference::BoundReference(SchemaField field) : field_(std::move(field)) {}
+
+BoundReference::~BoundReference() = default;
+
+std::string BoundReference::ToString() const {
+  return std::format("ref(id={}, type={})", field_.field_id(), 
field_.type()->ToString());
+}
+
+Result<Literal::Value> BoundReference::Evaluate(const StructLike& data) const {
+  return NotImplemented("BoundReference::Evaluate(StructLike) not 
implemented");
+}
+
+bool BoundReference::Equals(const BoundTerm& other) const {
+  if (other.kind() != Term::Kind::kReference) {
+    return false;
+  }
+
+  const auto& other_ref = internal::checked_cast<const BoundReference&>(other);
+  return field_.field_id() == other_ref.field_.field_id() &&
+         field_.optional() == other_ref.field_.optional() &&
+         *field_.type() == *other_ref.field_.type();
+}
+
+// UnboundTransform implementation
+UnboundTransform::UnboundTransform(std::shared_ptr<NamedReference> ref,
+                                   std::shared_ptr<Transform> transform)
+    : ref_(std::move(ref)), transform_(std::move(transform)) {}
+
+UnboundTransform::~UnboundTransform() = default;
+
+std::string UnboundTransform::ToString() const {
+  return std::format("{}({})", transform_->ToString(), ref_->ToString());
+}
+
+Result<std::shared_ptr<BoundTransform>> UnboundTransform::Bind(
+    const Schema& schema, bool case_sensitive) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto bound_ref, ref_->Bind(schema, case_sensitive));
+  ICEBERG_ASSIGN_OR_RAISE(auto transform_func, 
transform_->Bind(bound_ref->type()));
+  return std::make_shared<BoundTransform>(std::move(bound_ref), transform_,
+                                          std::move(transform_func));
+}
+
+// BoundTransform implementation
+BoundTransform::BoundTransform(std::shared_ptr<BoundReference> ref,
+                               std::shared_ptr<Transform> transform,
+                               std::shared_ptr<TransformFunction> 
transform_func)
+    : ref_(std::move(ref)),
+      transform_(std::move(transform)),
+      transform_func_(std::move(transform_func)) {}
+
+BoundTransform::~BoundTransform() = default;
+
+std::string BoundTransform::ToString() const {
+  return std::format("{}({})", transform_->ToString(), ref_->ToString());
+}
+
+Result<Literal::Value> BoundTransform::Evaluate(const StructLike& data) const {
+  throw IcebergError("BoundTransform::Evaluate(StructLike) not implemented");
+}
+
+bool BoundTransform::MayProduceNull() const {
+  // transforms must produce null for null input values
+  // transforms may produce null for non-null inputs when not order-preserving
+  // FIXME: add Transform::is_order_preserving()
+  return ref_->MayProduceNull();  // || !transform_->is_order_preserving();
+}
+
+std::shared_ptr<Type> BoundTransform::type() const {
+  return transform_func_->ResultType();
+}
+
+bool BoundTransform::Equals(const BoundTerm& other) const {
+  if (other.kind() == Term::Kind::kTransform) {
+    const auto& other_transform = internal::checked_cast<const 
BoundTransform&>(other);
+    return *ref_ == *other_transform.ref_ && *transform_ == 
*other_transform.transform_;
+  }
+
+  if (transform_->transform_type() == TransformType::kIdentity &&
+      other.kind() == Term::Kind::kReference) {
+    return ref_->Equals(other);

Review Comment:
   Either this should be `*ref_ == other`, or the comparison above should be
`ref_->Equals(*other_transform.ref_)`, right?



##########
src/iceberg/expression/term.cc:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/term.h"
+
+#include <format>
+
+#include "iceberg/exception.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/transform.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Bound::~Bound() = default;
+
+BoundTerm::~BoundTerm() = default;
+
+Reference::~Reference() = default;
+
+template <typename B>
+Result<std::shared_ptr<B>> Unbound<B>::Bind(const Schema& schema) const {
+  return Bind(schema, /*case_sensitive=*/true);

Review Comment:
   Shouldn't `case_sensitive` be a parameter rather than always being true?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to