This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 42aa1d0b feat: add residual evaluator (#402)
42aa1d0b is described below

commit 42aa1d0b71940abac3270b74ce2fff0f1ef508eb
Author: Junwang Zhao <[email protected]>
AuthorDate: Thu Dec 11 12:26:14 2025 +0800

    feat: add residual evaluator (#402)
---
 src/iceberg/CMakeLists.txt                   |   1 +
 src/iceberg/expression/expression_visitor.h  |   4 +-
 src/iceberg/expression/meson.build           |   1 +
 src/iceberg/expression/residual_evaluator.cc | 352 +++++++++++++++
 src/iceberg/expression/residual_evaluator.h  |  89 ++++
 src/iceberg/meson.build                      |   1 +
 src/iceberg/partition_spec.cc                |  23 +-
 src/iceberg/partition_spec.h                 |  15 +-
 src/iceberg/test/CMakeLists.txt              |   1 +
 src/iceberg/test/meson.build                 |   1 +
 src/iceberg/test/residual_evaluator_test.cc  | 614 +++++++++++++++++++++++++++
 11 files changed, 1097 insertions(+), 5 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 275d71fc..7218d76e 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -28,6 +28,7 @@ set(ICEBERG_SOURCES
     expression/inclusive_metrics_evaluator.cc
     expression/literal.cc
     expression/predicate.cc
+    expression/residual_evaluator.cc
     expression/rewrite_not.cc
     expression/strict_metrics_evaluator.cc
     expression/term.cc
diff --git a/src/iceberg/expression/expression_visitor.h 
b/src/iceberg/expression/expression_visitor.h
index d6638245..27cfb99c 100644
--- a/src/iceberg/expression/expression_visitor.h
+++ b/src/iceberg/expression/expression_visitor.h
@@ -260,10 +260,8 @@ class ICEBERG_EXPORT BoundVisitor : public 
ExpressionVisitor<R> {
 
   /// \brief Visit an unbound predicate.
   ///
-  /// Bound visitors do not support unbound predicates.
-  ///
   /// \param pred The unbound predicate
-  Result<R> Predicate(const std::shared_ptr<UnboundPredicate>& pred) final {
+  Result<R> Predicate(const std::shared_ptr<UnboundPredicate>& pred) override {
     ICEBERG_DCHECK(pred != nullptr, "UnboundPredicate cannot be null");
     return NotSupported("Not a bound predicate: {}", pred->ToString());
   }
diff --git a/src/iceberg/expression/meson.build 
b/src/iceberg/expression/meson.build
index 8e312791..f3b74848 100644
--- a/src/iceberg/expression/meson.build
+++ b/src/iceberg/expression/meson.build
@@ -26,6 +26,7 @@ install_headers(
         'inclusive_metrics_evaluator.h',
         'literal.h',
         'predicate.h',
+        'residual_evaluator.h',
         'rewrite_not.h',
         'strict_metrics_evaluator.h',
         'term.h',
diff --git a/src/iceberg/expression/residual_evaluator.cc 
b/src/iceberg/expression/residual_evaluator.cc
new file mode 100644
index 00000000..e818199e
--- /dev/null
+++ b/src/iceberg/expression/residual_evaluator.cc
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/residual_evaluator.h"
+
+#include "iceberg/expression/expression.h"
+#include "iceberg/expression/expression_visitor.h"
+#include "iceberg/expression/predicate.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/struct_like.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/transform.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+std::shared_ptr<Expression> always_true() { return True::Instance(); }
+std::shared_ptr<Expression> always_false() { return False::Instance(); }
+
+class ResidualVisitor : public BoundVisitor<std::shared_ptr<Expression>> {
+ public:
+  static Result<ResidualVisitor> Make(const PartitionSpec& spec, const Schema& 
schema,
+                                      const StructLike& partition_data,
+                                      bool case_sensitive) {
+    ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec.PartitionType(schema));
+    auto partition_schema = FromStructType(std::move(*partition_type), 
std::nullopt);
+    return ResidualVisitor(spec, schema, std::move(partition_schema), 
partition_data,
+                           case_sensitive);
+  }
+
+  Result<std::shared_ptr<Expression>> AlwaysTrue() override { return 
always_true(); }
+
+  Result<std::shared_ptr<Expression>> AlwaysFalse() override { return 
always_false(); }
+
+  Result<std::shared_ptr<Expression>> Not(
+      const std::shared_ptr<Expression>& child_result) override {
+    return Not::MakeFolded(child_result);
+  }
+
+  Result<std::shared_ptr<Expression>> And(
+      const std::shared_ptr<Expression>& left_result,
+      const std::shared_ptr<Expression>& right_result) override {
+    return And::MakeFolded(left_result, right_result);
+  }
+
+  Result<std::shared_ptr<Expression>> Or(
+      const std::shared_ptr<Expression>& left_result,
+      const std::shared_ptr<Expression>& right_result) override {
+    return Or::MakeFolded(left_result, right_result);
+  }
+
+  Result<std::shared_ptr<Expression>> IsNull(
+      const std::shared_ptr<Bound>& expr) override {
+    return expr->Evaluate(partition_data_).transform([](const auto& value) {
+      return value.IsNull() ? always_true() : always_false();
+    });
+  }
+
+  Result<std::shared_ptr<Expression>> NotNull(
+      const std::shared_ptr<Bound>& expr) override {
+    return expr->Evaluate(partition_data_).transform([](const auto& value) {
+      return value.IsNull() ? always_false() : always_true();
+    });
+  }
+
+  Result<std::shared_ptr<Expression>> IsNaN(const std::shared_ptr<Bound>& 
expr) override {
+    return expr->Evaluate(partition_data_).transform([](const auto& value) {
+      return value.IsNaN() ? always_true() : always_false();
+    });
+  }
+
+  Result<std::shared_ptr<Expression>> NotNaN(
+      const std::shared_ptr<Bound>& expr) override {
+    return expr->Evaluate(partition_data_).transform([](const auto& value) {
+      return value.IsNaN() ? always_false() : always_true();
+    });
+  }
+
+  Result<std::shared_ptr<Expression>> Lt(const std::shared_ptr<Bound>& expr,
+                                         const Literal& lit) override {
+    return expr->Evaluate(partition_data_).transform([&lit](const auto& value) 
{
+      return value < lit ? always_true() : always_false();
+    });
+  }
+
+  Result<std::shared_ptr<Expression>> LtEq(const std::shared_ptr<Bound>& expr,
+                                           const Literal& lit) override {
+    return expr->Evaluate(partition_data_).transform([&lit](const auto& value) 
{
+      return value <= lit ? always_true() : always_false();
+    });
+  }
+
+  Result<std::shared_ptr<Expression>> Gt(const std::shared_ptr<Bound>& expr,
+                                         const Literal& lit) override {
+    return expr->Evaluate(partition_data_).transform([&lit](const auto& value) 
{
+      return value > lit ? always_true() : always_false();
+    });
+  }
+
+  Result<std::shared_ptr<Expression>> GtEq(const std::shared_ptr<Bound>& expr,
+                                           const Literal& lit) override {
+    return expr->Evaluate(partition_data_).transform([&lit](const auto& value) 
{
+      return value >= lit ? always_true() : always_false();
+    });
+  }
+
+  Result<std::shared_ptr<Expression>> Eq(const std::shared_ptr<Bound>& expr,
+                                         const Literal& lit) override {
+    return expr->Evaluate(partition_data_).transform([&lit](const auto& value) 
{
+      return value == lit ? always_true() : always_false();
+    });
+  }
+
+  Result<std::shared_ptr<Expression>> NotEq(const std::shared_ptr<Bound>& expr,
+                                            const Literal& lit) override {
+    return expr->Evaluate(partition_data_).transform([&lit](const auto& value) 
{
+      return value != lit ? always_true() : always_false();
+    });
+  }
+
+  Result<std::shared_ptr<Expression>> StartsWith(const std::shared_ptr<Bound>& 
expr,
+                                                 const Literal& lit) override {
+    ICEBERG_ASSIGN_OR_RAISE(auto value, expr->Evaluate(partition_data_));
+
+    if (!std::holds_alternative<std::string>(value.value()) ||
+        !std::holds_alternative<std::string>(lit.value())) {
+      return InvalidExpression("Both value and literal should be strings");
+    }
+
+    const auto& str_value = std::get<std::string>(value.value());
+    const auto& str_prefix = std::get<std::string>(lit.value());
+    return str_value.starts_with(str_prefix) ? always_true() : always_false();
+  }
+
+  Result<std::shared_ptr<Expression>> NotStartsWith(const 
std::shared_ptr<Bound>& expr,
+                                                    const Literal& lit) 
override {
+    ICEBERG_ASSIGN_OR_RAISE(auto value, expr->Evaluate(partition_data_));
+
+    if (!std::holds_alternative<std::string>(value.value()) ||
+        !std::holds_alternative<std::string>(lit.value())) {
+      return InvalidExpression("Both value and literal should be strings");
+    }
+
+    const auto& str_value = std::get<std::string>(value.value());
+    const auto& str_prefix = std::get<std::string>(lit.value());
+    return str_value.starts_with(str_prefix) ? always_false() : always_true();
+  }
+
+  Result<std::shared_ptr<Expression>> In(
+      const std::shared_ptr<Bound>& expr,
+      const BoundSetPredicate::LiteralSet& literal_set) override {
+    return expr->Evaluate(partition_data_).transform([&literal_set](const 
auto& value) {
+      return literal_set.contains(value) ? always_true() : always_false();
+    });
+  }
+
+  Result<std::shared_ptr<Expression>> NotIn(
+      const std::shared_ptr<Bound>& expr,
+      const BoundSetPredicate::LiteralSet& literal_set) override {
+    return expr->Evaluate(partition_data_).transform([&literal_set](const 
auto& value) {
+      return literal_set.contains(value) ? always_false() : always_true();
+    });
+  }
+
+  Result<std::shared_ptr<Expression>> Predicate(
+      const std::shared_ptr<BoundPredicate>& pred) override;
+
+  Result<std::shared_ptr<Expression>> Predicate(
+      const std::shared_ptr<UnboundPredicate>& pred) override {
+    ICEBERG_ASSIGN_OR_RAISE(auto bound, pred->Bind(schema_, case_sensitive_));
+    if (bound->is_bound_predicate()) {
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto residual, 
Predicate(std::dynamic_pointer_cast<BoundPredicate>(bound)));
+      if (residual->is_bound_predicate()) {
+        // Still a predicate: hand back the original unbound predicate.
+        return pred;
+      }
+      return residual;
+    }
+    // if binding didn't result in a Predicate, return the expression
+    return bound;
+  }
+
+ private:
+  ResidualVisitor(const PartitionSpec& spec, const Schema& schema,
+                  std::unique_ptr<Schema> partition_schema,
+                  const StructLike& partition_data, bool case_sensitive)
+      : spec_(spec),
+        schema_(schema),
+        partition_schema_(std::move(partition_schema)),
+        partition_data_(partition_data),
+        case_sensitive_(case_sensitive) {}
+
+  const PartitionSpec& spec_;
+  const Schema& schema_;
+  std::unique_ptr<Schema> partition_schema_;
+  const StructLike& partition_data_;
+  bool case_sensitive_;
+};
+
+Result<std::shared_ptr<Expression>> ResidualVisitor::Predicate(
+    const std::shared_ptr<BoundPredicate>& pred) {
+  // Get the strict projection and inclusive projection of this predicate in
+  // partition data, then use them to decide whether the original predicate must
+  // be kept. If the strict projection evaluates to true, the original predicate
+  // is true for every row in the partition, so the residual is alwaysTrue.
+  // Similarly, if the inclusive projection evaluates to false, the original
+  // predicate is false for every row in the partition, so the residual is
+  // alwaysFalse.
+
+  // If there is no strict projection or if it evaluates to false, then return 
the
+  // predicate.
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto parts, 
spec_.GetFieldsBySourceId(pred->reference()->field().field_id()));
+  if (parts.empty()) {
+    // Not associated with a partition field, can't be evaluated
+    return pred;
+  }
+
+  for (const auto& part : parts) {
+    // Check the strict projection
+    ICEBERG_ASSIGN_OR_RAISE(auto strict_projection, 
part.get().transform()->ProjectStrict(
+                                                        part.get().name(), 
pred));
+    std::shared_ptr<Expression> strict_result = nullptr;
+
+    if (strict_projection != nullptr) {
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto bound_strict,
+          strict_projection->Bind(*partition_schema_, case_sensitive_));
+      if (bound_strict->is_bound_predicate()) {
+        ICEBERG_ASSIGN_OR_RAISE(
+            strict_result, BoundVisitor::Predicate(
+                               
std::dynamic_pointer_cast<BoundPredicate>(bound_strict)));
+      } else {
+        // If the result is not a predicate, then it must be a constant like 
alwaysTrue
+        // or alwaysFalse
+        strict_result = std::move(bound_strict);
+      }
+    }
+
+    if (strict_result != nullptr && strict_result->op() == 
Expression::Operation::kTrue) {
+      // If strict is true, returning true
+      return always_true();
+    }
+
+    // Check the inclusive projection
+    ICEBERG_ASSIGN_OR_RAISE(auto inclusive_projection,
+                            part.get().transform()->Project(part.get().name(), 
pred));
+    std::shared_ptr<Expression> inclusive_result = nullptr;
+
+    if (inclusive_projection != nullptr) {
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto bound_inclusive,
+          inclusive_projection->Bind(*partition_schema_, case_sensitive_));
+
+      if (bound_inclusive->is_bound_predicate()) {
+        ICEBERG_ASSIGN_OR_RAISE(
+            inclusive_result,
+            BoundVisitor::Predicate(
+                std::dynamic_pointer_cast<BoundPredicate>(bound_inclusive)));
+      } else {
+        // If the result is not a predicate, then it must be a constant like 
alwaysTrue
+        // or alwaysFalse
+        inclusive_result = std::move(bound_inclusive);
+      }
+    }
+
+    if (inclusive_result != nullptr &&
+        inclusive_result->op() == Expression::Operation::kFalse) {
+      // If inclusive is false, returning false
+      return always_false();
+    }
+  }
+
+  // Neither strict nor inclusive predicate was conclusive, returning the 
original pred
+  return pred;
+}
+
+// Unpartitioned residual evaluator that always returns the original expression
+class UnpartitionedResidualEvaluator : public ResidualEvaluator {
+ public:
+  explicit UnpartitionedResidualEvaluator(std::shared_ptr<Expression> expr)
+      : ResidualEvaluator(std::move(expr), *PartitionSpec::Unpartitioned(),
+                          *kEmptySchema_, true) {}
+
+  Result<std::shared_ptr<Expression>> ResidualFor(
+      const StructLike& /*partition_data*/) const override {
+    return expr_;
+  }
+
+ private:
+  // Store an empty schema to avoid dangling reference when passing to base 
class
+  inline static const std::shared_ptr<Schema> kEmptySchema_ =
+      std::make_shared<Schema>(std::vector<SchemaField>{}, std::nullopt);
+};
+
+}  // namespace
+
+ResidualEvaluator::ResidualEvaluator(std::shared_ptr<Expression> expr,
+                                     const PartitionSpec& spec, const Schema& 
schema,
+                                     bool case_sensitive)
+    : expr_(std::move(expr)),
+      spec_(spec),
+      schema_(schema),
+      case_sensitive_(case_sensitive) {}
+
+ResidualEvaluator::~ResidualEvaluator() = default;
+
+Result<std::unique_ptr<ResidualEvaluator>> ResidualEvaluator::Unpartitioned(
+    std::shared_ptr<Expression> expr) {
+  return std::unique_ptr<ResidualEvaluator>(
+      new UnpartitionedResidualEvaluator(std::move(expr)));
+}
+
+Result<std::unique_ptr<ResidualEvaluator>> ResidualEvaluator::Make(
+    std::shared_ptr<Expression> expr, const PartitionSpec& spec, const Schema& 
schema,
+    bool case_sensitive) {
+  if (spec.fields().empty()) {
+    return Unpartitioned(std::move(expr));
+  }
+  return std::unique_ptr<ResidualEvaluator>(
+      new ResidualEvaluator(std::move(expr), spec, schema, case_sensitive));
+}
+
+Result<std::shared_ptr<Expression>> ResidualEvaluator::ResidualFor(
+    const StructLike& partition_data) const {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto visitor,
+      ResidualVisitor::Make(spec_, schema_, partition_data, case_sensitive_));
+  return Visit<std::shared_ptr<Expression>, ResidualVisitor>(expr_, visitor);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/expression/residual_evaluator.h 
b/src/iceberg/expression/residual_evaluator.h
new file mode 100644
index 00000000..60bf67f2
--- /dev/null
+++ b/src/iceberg/expression/residual_evaluator.h
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/expression/residual_evaluator.h
+/// Residual evaluator for finding residual expressions after partition 
evaluation.
+
+#include <memory>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Finds the residuals for an Expression using the partitions in the 
given
+/// PartitionSpec.
+///
+/// A residual expression is made by partially evaluating an expression using 
partition
+/// values. For example, if a table is partitioned by day(utc_timestamp) and 
is read
+/// with a filter expression utc_timestamp >= a and utc_timestamp <= b, then 
there are
+/// 4 possible residual expressions for the partition data, d:
+///
+/// - If d > day(a) and d < day(b), the residual is always true
+/// - If d == day(a) and d != day(b), the residual is utc_timestamp >= a
+/// - If d == day(b) and d != day(a), the residual is utc_timestamp <= b
+/// - If d == day(a) == day(b), the residual is utc_timestamp >= a and 
utc_timestamp <= b
+///
+/// Partition data is passed using StructLike. Residuals are returned by 
ResidualFor().
+class ICEBERG_EXPORT ResidualEvaluator {
+ public:
+  /// \brief Return a residual evaluator for an unpartitioned PartitionSpec.
+  ///
+  /// \param expr An expression
+  /// \return A residual evaluator that always returns the expression
+  static Result<std::unique_ptr<ResidualEvaluator>> Unpartitioned(
+      std::shared_ptr<Expression> expr);
+
+  /// \brief Return a residual evaluator for a PartitionSpec and Expression.
+  ///
+  /// \param expr An expression
+  /// \param spec A partition spec (held by reference; must outlive the evaluator)
+  /// \param schema Schema to bind against (held by reference; must outlive it too)
+  /// \param case_sensitive Whether field name matching is case-sensitive
+  /// \return A residual evaluator for the expression
+  static Result<std::unique_ptr<ResidualEvaluator>> Make(std::shared_ptr<Expression> expr,
+                                                         const PartitionSpec& spec,
+                                                         const Schema& schema,
+                                                         bool case_sensitive = true);
+
+  virtual ~ResidualEvaluator();  // virtual: subclasses are destroyed via base pointers
+
+  /// \brief Returns a residual expression for the given partition values.
+  ///
+  /// \param partition_data Partition data values
+  /// \return The residual of this evaluator's expression from the partition values
+  virtual Result<std::shared_ptr<Expression>> ResidualFor(
+      const StructLike& partition_data) const;
+
+ protected:
+  ResidualEvaluator(std::shared_ptr<Expression> expr, const PartitionSpec& spec,
+                    const Schema& schema, bool case_sensitive);
+
+  std::shared_ptr<Expression> expr_;
+
+ private:
+  const PartitionSpec& spec_;  // not owned
+  const Schema& schema_;       // not owned
+  bool case_sensitive_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index c139c66b..c10c5a82 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -50,6 +50,7 @@ iceberg_sources = files(
     'expression/inclusive_metrics_evaluator.cc',
     'expression/literal.cc',
     'expression/predicate.cc',
+    'expression/residual_evaluator.cc',
     'expression/rewrite_not.cc',
     'expression/strict_metrics_evaluator.cc',
     'expression/term.cc',
diff --git a/src/iceberg/partition_spec.cc b/src/iceberg/partition_spec.cc
index 0c2dda12..b0f1144c 100644
--- a/src/iceberg/partition_spec.cc
+++ b/src/iceberg/partition_spec.cc
@@ -60,7 +60,8 @@ int32_t PartitionSpec::spec_id() const { return spec_id_; }
 
 std::span<const PartitionField> PartitionSpec::fields() const { return 
fields_; }
 
-Result<std::unique_ptr<StructType>> PartitionSpec::PartitionType(const Schema& 
schema) {
+Result<std::unique_ptr<StructType>> PartitionSpec::PartitionType(
+    const Schema& schema) const {
   if (fields_.empty()) {
     return std::make_unique<StructType>(std::vector<SchemaField>{});
   }
@@ -154,6 +155,26 @@ Status PartitionSpec::Validate(const Schema& schema, bool 
allow_missing_fields)
   return {};
 }
 
+Result<std::vector<std::reference_wrapper<const PartitionField>>>
+PartitionSpec::GetFieldsBySourceId(int32_t source_id) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto source_id_to_fields, 
source_id_to_fields_.Get(*this));
+  if (auto it = source_id_to_fields.get().find(source_id);
+      it != source_id_to_fields.get().cend()) {
+    return it->second;
+  }
+  // Note that it is not an error to not find any partition fields for a 
source id.
+  return std::vector<PartitionFieldRef>{};
+}
+
+Result<PartitionSpec::SourceIdToFieldsMap> 
PartitionSpec::InitSourceIdToFieldsMap(
+    const PartitionSpec& self) {
+  SourceIdToFieldsMap source_id_to_fields;
+  for (const auto& field : self.fields_) {
+    source_id_to_fields[field.source_id()].emplace_back(std::cref(field));
+  }
+  return source_id_to_fields;
+}
+
 Result<std::unique_ptr<PartitionSpec>> PartitionSpec::Make(
     const Schema& schema, int32_t spec_id, std::vector<PartitionField> fields,
     bool allow_missing_fields, std::optional<int32_t> last_assigned_field_id) {
diff --git a/src/iceberg/partition_spec.h b/src/iceberg/partition_spec.h
index 12beb9c9..7f8f6782 100644
--- a/src/iceberg/partition_spec.h
+++ b/src/iceberg/partition_spec.h
@@ -27,6 +27,7 @@
 #include <optional>
 #include <span>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include "iceberg/iceberg_export.h"
@@ -34,6 +35,7 @@
 #include "iceberg/result.h"
 #include "iceberg/type_fwd.h"
 #include "iceberg/util/formattable.h"
+#include "iceberg/util/lazy.h"
 
 namespace iceberg {
 
@@ -60,7 +62,7 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable 
{
   std::span<const PartitionField> fields() const;
 
   /// \brief Get the partition type binding to the input schema.
-  Result<std::unique_ptr<StructType>> PartitionType(const Schema&);
+  Result<std::unique_ptr<StructType>> PartitionType(const Schema& schema) 
const;
 
   std::string ToString() const override;
 
@@ -77,6 +79,13 @@ class ICEBERG_EXPORT PartitionSpec : public 
util::Formattable {
   /// \return Error status if the partition spec is invalid.
   Status Validate(const Schema& schema, bool allow_missing_fields) const;
 
+  /// \brief Get the partition fields by source ID.
+  /// \param source_id The id of the source field.
+  /// \return The partition fields whose source is `source_id`; the vector is
+  /// empty (not an error) when no partition field uses that source field.
+  using PartitionFieldRef = std::reference_wrapper<const PartitionField>;
+  Result<std::vector<PartitionFieldRef>> GetFieldsBySourceId(int32_t 
source_id) const;
+
   /// \brief Create a PartitionSpec binding to a schema.
   /// \param schema The schema to bind the partition spec to.
   /// \param spec_id The spec ID.
@@ -116,9 +125,13 @@ class ICEBERG_EXPORT PartitionSpec : public 
util::Formattable {
   /// \brief Compare two partition specs for equality.
   bool Equals(const PartitionSpec& other) const;
 
+  using SourceIdToFieldsMap = std::unordered_map<int32_t, 
std::vector<PartitionFieldRef>>;
+  static Result<SourceIdToFieldsMap> InitSourceIdToFieldsMap(const 
PartitionSpec&);
+
   const int32_t spec_id_;
   std::vector<PartitionField> fields_;
   int32_t last_assigned_field_id_;
+  Lazy<InitSourceIdToFieldsMap> source_id_to_fields_;
 };
 
 }  // namespace iceberg
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 9892e3d4..f9cfb848 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -88,6 +88,7 @@ add_iceberg_test(expression_test
                  inclusive_metrics_evaluator_test.cc
                  inclusive_metrics_evaluator_with_transform_test.cc
                  predicate_test.cc
+                 residual_evaluator_test.cc
                  strict_metrics_evaluator_test.cc)
 
 add_iceberg_test(json_serde_test
diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build
index c73abe18..0f8b9291 100644
--- a/src/iceberg/test/meson.build
+++ b/src/iceberg/test/meson.build
@@ -65,6 +65,7 @@ iceberg_tests = {
             'inclusive_metrics_evaluator_with_transform_test.cc',
             'literal_test.cc',
             'predicate_test.cc',
+            'residual_evaluator_test.cc',
             'strict_metrics_evaluator_test.cc',
         ),
     },
diff --git a/src/iceberg/test/residual_evaluator_test.cc 
b/src/iceberg/test/residual_evaluator_test.cc
new file mode 100644
index 00000000..bef17d2b
--- /dev/null
+++ b/src/iceberg/test/residual_evaluator_test.cc
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/expression/residual_evaluator.h"
+
+#include <cmath>
+#include <memory>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/expression/expressions.h"
+#include "iceberg/expression/literal.h"
+#include "iceberg/expression/predicate.h"
+#include "iceberg/partition_field.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+class ResidualEvaluatorTest : public ::testing::Test {
+ protected:
+  void SetUp() override {}
+
+  // Checks only the operation of the residual computed for one partition value.
+  //
+  // Builds a ResidualEvaluator for `pred` over `spec`/`schema` (the trailing `true`
+  // requests case-sensitive name resolution), evaluates it against a partition
+  // tuple holding just `partition_value`, and expects the residual's op() to be
+  // `expected_op`. Failures inside the helper abort the helper, not the caller.
+  void AssertResidualOp(const std::shared_ptr<PartitionSpec>& spec,
+                        const std::shared_ptr<Schema>& schema,
+                        const std::shared_ptr<Expression>& pred,
+                        const Literal& partition_value,
+                        Expression::Operation expected_op) {
+    ICEBERG_UNWRAP_OR_FAIL(auto evaluator,
+                           ResidualEvaluator::Make(pred, *spec, *schema, true));
+    PartitionValues partition_data(partition_value);
+    ICEBERG_UNWRAP_OR_FAIL(auto residual, evaluator->ResidualFor(partition_data));
+    EXPECT_EQ(residual->op(), expected_op);
+  }
+
+  // Expects the residual for `partition_value` to be an unbound predicate that is
+  // equivalent to `pred`: same operation, same referenced column name and the same
+  // literal values.
+  //
+  // Every literal is compared position by position (not just the first one), so
+  // predicates carrying several literals are verified in full.
+  void AssertResidualPredicate(const std::shared_ptr<PartitionSpec>& spec,
+                               const std::shared_ptr<Schema>& schema,
+                               const std::shared_ptr<Expression>& pred,
+                               const Literal& partition_value) {
+    ICEBERG_UNWRAP_OR_FAIL(auto evaluator,
+                           ResidualEvaluator::Make(pred, *spec, *schema, true));
+    PartitionValues partition_data(partition_value);
+    ICEBERG_UNWRAP_OR_FAIL(auto residual, evaluator->ResidualFor(partition_data));
+
+    // Both the residual and the expected predicate must be unbound predicates.
+    ASSERT_TRUE(residual->is_unbound_predicate());
+    auto unbound_residual = std::dynamic_pointer_cast<UnboundPredicate>(residual);
+    ASSERT_NE(unbound_residual, nullptr);
+    auto unbound_original = std::dynamic_pointer_cast<UnboundPredicate>(pred);
+    ASSERT_NE(unbound_original, nullptr);
+    EXPECT_EQ(unbound_residual->op(), unbound_original->op());
+    EXPECT_EQ(unbound_residual->reference()->name(),
+              unbound_original->reference()->name());
+
+    // Literals are only reachable through the concrete implementation type.
+    auto residual_impl =
+        std::dynamic_pointer_cast<UnboundPredicateImpl<BoundReference>>(
+            unbound_residual);
+    auto original_impl =
+        std::dynamic_pointer_cast<UnboundPredicateImpl<BoundReference>>(
+            unbound_original);
+    ASSERT_NE(residual_impl, nullptr);
+    ASSERT_NE(original_impl, nullptr);
+
+    const auto& residual_literals = residual_impl->literals();
+    const auto& original_literals = original_impl->literals();
+    // Sizes must match before walking the two sequences in lock-step.
+    ASSERT_EQ(residual_literals.size(), original_literals.size());
+    auto residual_it = residual_literals.begin();
+    for (const auto& original_literal : original_literals) {
+      EXPECT_EQ(residual_it->value(), original_literal.value());
+      ++residual_it;
+    }
+  }
+};
+
+TEST_F(ResidualEvaluatorTest, IdentityTransformResiduals) {
+  using Op = Expression::Operation;
+
+  // Two optional int columns; only `dateint` is partitioned (identity transform).
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeOptional(50, "dateint", int32()),
+                               SchemaField::MakeOptional(51, "hour", int32())},
+      std::nullopt);
+  auto identity = Transform::Identity();
+  PartitionField dateint_field(50, 1000, "dateint", identity);
+  ICEBERG_UNWRAP_OR_FAIL(auto owned_spec,
+                         PartitionSpec::Make(*schema, 0, {dateint_field}, false));
+  std::shared_ptr<PartitionSpec> spec(owned_spec.release());
+
+  // Filter under test:
+  //   (dateint < 20170815 AND dateint > 20170801) OR
+  //   (dateint == 20170815 AND hour < 12) OR
+  //   (dateint == 20170801 AND hour > 11)
+  auto filter = Expressions::Or(
+      Expressions::Or(
+          Expressions::And(Expressions::LessThan("dateint", Literal::Int(20170815)),
+                           Expressions::GreaterThan("dateint", Literal::Int(20170801))),
+          Expressions::And(Expressions::Equal("dateint", Literal::Int(20170815)),
+                           Expressions::LessThan("hour", Literal::Int(12)))),
+      Expressions::And(Expressions::Equal("dateint", Literal::Int(20170801)),
+                       Expressions::GreaterThan("hour", Literal::Int(11))));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto evaluator,
+                         ResidualEvaluator::Make(filter, *spec, *schema, true));
+
+  // Partition sits on the upper date bound: only `hour < 12` remains.
+  {
+    PartitionValues partition(Literal::Int(20170815));
+    ICEBERG_UNWRAP_OR_FAIL(auto residual, evaluator->ResidualFor(partition));
+    ASSERT_TRUE(residual->is_unbound_predicate());
+    auto predicate = std::dynamic_pointer_cast<UnboundPredicate>(residual);
+    ASSERT_NE(predicate, nullptr);
+    EXPECT_EQ(predicate->op(), Op::kLt);
+    EXPECT_EQ(predicate->reference()->name(), "hour");
+    // The literal is exposed by the concrete predicate implementation.
+    auto impl =
+        std::dynamic_pointer_cast<UnboundPredicateImpl<BoundReference>>(predicate);
+    ASSERT_NE(impl, nullptr);
+    ASSERT_EQ(impl->literals().size(), 1);
+    EXPECT_EQ(impl->literals()[0].value(), Literal::Int(12).value());
+  }
+
+  // Partition sits on the lower date bound: only `hour > 11` remains.
+  {
+    PartitionValues partition(Literal::Int(20170801));
+    ICEBERG_UNWRAP_OR_FAIL(auto residual, evaluator->ResidualFor(partition));
+    ASSERT_TRUE(residual->is_unbound_predicate());
+    auto predicate = std::dynamic_pointer_cast<UnboundPredicate>(residual);
+    ASSERT_NE(predicate, nullptr);
+    EXPECT_EQ(predicate->op(), Op::kGt);
+    EXPECT_EQ(predicate->reference()->name(), "hour");
+    // The literal is exposed by the concrete predicate implementation.
+    auto impl =
+        std::dynamic_pointer_cast<UnboundPredicateImpl<BoundReference>>(predicate);
+    ASSERT_NE(impl, nullptr);
+    ASSERT_EQ(impl->literals().size(), 1);
+    EXPECT_EQ(impl->literals()[0].value(), Literal::Int(11).value());
+  }
+
+  // Strictly inside the date range: nothing is left to evaluate.
+  {
+    PartitionValues partition(Literal::Int(20170812));
+    ICEBERG_UNWRAP_OR_FAIL(auto residual, evaluator->ResidualFor(partition));
+    EXPECT_EQ(residual->op(), Op::kTrue);
+  }
+
+  // Beyond the date range: no row of the partition can match.
+  {
+    PartitionValues partition(Literal::Int(20170817));
+    ICEBERG_UNWRAP_OR_FAIL(auto residual, evaluator->ResidualFor(partition));
+    EXPECT_EQ(residual->op(), Op::kFalse);
+  }
+}
+
+TEST_F(ResidualEvaluatorTest, CaseInsensitiveIdentityTransformResiduals) {
+  using Op = Expression::Operation;
+
+  // Same layout as the case-sensitive test: `dateint` is identity-partitioned.
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeOptional(50, "dateint", int32()),
+                               SchemaField::MakeOptional(51, "hour", int32())},
+      std::nullopt);
+  auto identity = Transform::Identity();
+  PartitionField dateint_field(50, 1000, "dateint", identity);
+  ICEBERG_UNWRAP_OR_FAIL(auto owned_spec,
+                         PartitionSpec::Make(*schema, 0, {dateint_field}, false));
+  std::shared_ptr<PartitionSpec> spec(owned_spec.release());
+
+  // Column names are deliberately spelled with inconsistent casing.
+  auto filter = Expressions::Or(
+      Expressions::Or(
+          Expressions::And(Expressions::LessThan("DATEINT", Literal::Int(20170815)),
+                           Expressions::GreaterThan("dateint", Literal::Int(20170801))),
+          Expressions::And(Expressions::Equal("dateint", Literal::Int(20170815)),
+                           Expressions::LessThan("HOUR", Literal::Int(12)))),
+      Expressions::And(Expressions::Equal("DateInt", Literal::Int(20170801)),
+                       Expressions::GreaterThan("hOUr", Literal::Int(11))));
+
+  // `false` turns off case-sensitive name resolution.
+  ICEBERG_UNWRAP_OR_FAIL(auto evaluator,
+                         ResidualEvaluator::Make(filter, *spec, *schema, false));
+
+  // Upper date bound: residual is the `HOUR < 12` predicate, spelling preserved.
+  {
+    PartitionValues partition(Literal::Int(20170815));
+    ICEBERG_UNWRAP_OR_FAIL(auto residual, evaluator->ResidualFor(partition));
+    ASSERT_TRUE(residual->is_unbound_predicate());
+    auto predicate = std::dynamic_pointer_cast<UnboundPredicate>(residual);
+    ASSERT_NE(predicate, nullptr);
+    EXPECT_EQ(predicate->op(), Op::kLt);
+    EXPECT_EQ(predicate->reference()->name(), "HOUR");
+    // The literal is exposed by the concrete predicate implementation.
+    auto impl =
+        std::dynamic_pointer_cast<UnboundPredicateImpl<BoundReference>>(predicate);
+    ASSERT_NE(impl, nullptr);
+    ASSERT_EQ(impl->literals().size(), 1);
+    EXPECT_EQ(impl->literals()[0].value(), Literal::Int(12).value());
+  }
+
+  // Lower date bound: residual is the `hOUr > 11` predicate, spelling preserved.
+  {
+    PartitionValues partition(Literal::Int(20170801));
+    ICEBERG_UNWRAP_OR_FAIL(auto residual, evaluator->ResidualFor(partition));
+    ASSERT_TRUE(residual->is_unbound_predicate());
+    auto predicate = std::dynamic_pointer_cast<UnboundPredicate>(residual);
+    ASSERT_NE(predicate, nullptr);
+    EXPECT_EQ(predicate->op(), Op::kGt);
+    EXPECT_EQ(predicate->reference()->name(), "hOUr");
+    // The literal is exposed by the concrete predicate implementation.
+    auto impl =
+        std::dynamic_pointer_cast<UnboundPredicateImpl<BoundReference>>(predicate);
+    ASSERT_NE(impl, nullptr);
+    ASSERT_EQ(impl->literals().size(), 1);
+    EXPECT_EQ(impl->literals()[0].value(), Literal::Int(11).value());
+  }
+
+  // Strictly inside the date range: nothing is left to evaluate.
+  {
+    PartitionValues partition(Literal::Int(20170812));
+    ICEBERG_UNWRAP_OR_FAIL(auto residual, evaluator->ResidualFor(partition));
+    EXPECT_EQ(residual->op(), Op::kTrue);
+  }
+
+  // Beyond the date range: no row of the partition can match.
+  {
+    PartitionValues partition(Literal::Int(20170817));
+    ICEBERG_UNWRAP_OR_FAIL(auto residual, evaluator->ResidualFor(partition));
+    EXPECT_EQ(residual->op(), Op::kFalse);
+  }
+}
+
+TEST_F(ResidualEvaluatorTest, UnpartitionedResiduals) {
+  // One sample of each expression kind the evaluator is exercised with here.
+  std::vector<std::shared_ptr<Expression>> samples = {
+      Expressions::AlwaysTrue(),
+      Expressions::AlwaysFalse(),
+      Expressions::LessThan("a", Literal::Int(5)),
+      Expressions::GreaterThanOrEqual("b", Literal::Int(16)),
+      Expressions::NotNull("c"),
+      Expressions::IsNull("d"),
+      Expressions::In("e", {Literal::Int(1), Literal::Int(2), Literal::Int(3)}),
+      Expressions::NotIn("f", {Literal::Int(1), Literal::Int(2), Literal::Int(3)}),
+      Expressions::NotNaN("g"),
+      Expressions::IsNaN("h"),
+      Expressions::StartsWith("data", "abcd"),
+      Expressions::NotStartsWith("data", "abcd")};
+
+  PartitionValues no_partition;
+
+  for (const auto& sample : samples) {
+    ICEBERG_UNWRAP_OR_FAIL(auto evaluator, ResidualEvaluator::Unpartitioned(sample));
+    ICEBERG_UNWRAP_OR_FAIL(auto residual, evaluator->ResidualFor(no_partition));
+    // Without partitioning the residual keeps the operation of its input.
+    EXPECT_EQ(residual->op(), sample->op());
+  }
+}
+
+TEST_F(ResidualEvaluatorTest, In) {
+  using Op = Expression::Operation;
+
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeOptional(50, "dateint", int32()),
+                               SchemaField::MakeOptional(51, "hour", int32())},
+      std::nullopt);
+  auto identity = Transform::Identity();
+  PartitionField dateint_field(50, 1000, "dateint", identity);
+  ICEBERG_UNWRAP_OR_FAIL(auto owned_spec,
+                         PartitionSpec::Make(*schema, 0, {dateint_field}, false));
+  std::shared_ptr<PartitionSpec> spec(owned_spec.release());
+
+  // dateint IN (20170815, 20170816, 20170817)
+  auto filter = Expressions::In(
+      "dateint",
+      {Literal::Int(20170815), Literal::Int(20170816), Literal::Int(20170817)});
+
+  ICEBERG_UNWRAP_OR_FAIL(auto evaluator,
+                         ResidualEvaluator::Make(filter, *spec, *schema, true));
+
+  // A partition value that is a member of the set satisfies the filter outright.
+  PartitionValues member(Literal::Int(20170815));
+  ICEBERG_UNWRAP_OR_FAIL(auto member_residual, evaluator->ResidualFor(member));
+  EXPECT_EQ(member_residual->op(), Op::kTrue);
+
+  // A partition value outside the set can never match.
+  PartitionValues outsider(Literal::Int(20180815));
+  ICEBERG_UNWRAP_OR_FAIL(auto outsider_residual, evaluator->ResidualFor(outsider));
+  EXPECT_EQ(outsider_residual->op(), Op::kFalse);
+}
+
+TEST_F(ResidualEvaluatorTest, NotIn) {
+  using Op = Expression::Operation;
+
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeOptional(50, "dateint", int32()),
+                               SchemaField::MakeOptional(51, "hour", int32())},
+      std::nullopt);
+  auto identity = Transform::Identity();
+  PartitionField dateint_field(50, 1000, "dateint", identity);
+  ICEBERG_UNWRAP_OR_FAIL(auto owned_spec,
+                         PartitionSpec::Make(*schema, 0, {dateint_field}, false));
+  std::shared_ptr<PartitionSpec> spec(owned_spec.release());
+
+  // dateint NOT IN (20170815, 20170816, 20170817)
+  auto filter = Expressions::NotIn(
+      "dateint",
+      {Literal::Int(20170815), Literal::Int(20170816), Literal::Int(20170817)});
+
+  ICEBERG_UNWRAP_OR_FAIL(auto evaluator,
+                         ResidualEvaluator::Make(filter, *spec, *schema, true));
+
+  // A partition value outside the excluded set satisfies the filter outright.
+  PartitionValues outsider(Literal::Int(20180815));
+  ICEBERG_UNWRAP_OR_FAIL(auto outsider_residual, evaluator->ResidualFor(outsider));
+  EXPECT_EQ(outsider_residual->op(), Op::kTrue);
+
+  // A partition value inside the excluded set can never match.
+  PartitionValues member(Literal::Int(20170815));
+  ICEBERG_UNWRAP_OR_FAIL(auto member_residual, evaluator->ResidualFor(member));
+  EXPECT_EQ(member_residual->op(), Op::kFalse);
+}
+
+TEST_F(ResidualEvaluatorTest, IsNaN) {
+  using Op = Expression::Operation;
+
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeOptional(50, "double", float64()),
+                               SchemaField::MakeOptional(51, "float", float32())},
+      std::nullopt);
+  auto identity = Transform::Identity();
+
+  // float64 column, identity-partitioned.
+  {
+    PartitionField field(50, 1000, "double", identity);
+    ICEBERG_UNWRAP_OR_FAIL(auto owned_spec,
+                           PartitionSpec::Make(*schema, 0, {field}, false));
+    std::shared_ptr<PartitionSpec> spec(owned_spec.release());
+
+    auto filter = Expressions::IsNaN("double");
+    ICEBERG_UNWRAP_OR_FAIL(auto evaluator,
+                           ResidualEvaluator::Make(filter, *spec, *schema, true));
+
+    // NaN partition value: the filter is fully satisfied.
+    PartitionValues nan_partition(Literal::Double(std::nan("")));
+    ICEBERG_UNWRAP_OR_FAIL(auto nan_residual, evaluator->ResidualFor(nan_partition));
+    EXPECT_EQ(nan_residual->op(), Op::kTrue);
+
+    // Ordinary partition value: the filter can never match.
+    PartitionValues plain_partition(Literal::Double(2.0));
+    ICEBERG_UNWRAP_OR_FAIL(auto plain_residual,
+                           evaluator->ResidualFor(plain_partition));
+    EXPECT_EQ(plain_residual->op(), Op::kFalse);
+  }
+
+  // float32 column, identity-partitioned.
+  {
+    PartitionField field(51, 1001, "float", identity);
+    ICEBERG_UNWRAP_OR_FAIL(auto owned_spec,
+                           PartitionSpec::Make(*schema, 0, {field}, false));
+    std::shared_ptr<PartitionSpec> spec(owned_spec.release());
+
+    auto filter = Expressions::IsNaN("float");
+    ICEBERG_UNWRAP_OR_FAIL(auto evaluator,
+                           ResidualEvaluator::Make(filter, *spec, *schema, true));
+
+    // NaN partition value: the filter is fully satisfied.
+    PartitionValues nan_partition(Literal::Float(std::nanf("")));
+    ICEBERG_UNWRAP_OR_FAIL(auto nan_residual, evaluator->ResidualFor(nan_partition));
+    EXPECT_EQ(nan_residual->op(), Op::kTrue);
+
+    // Ordinary partition value: the filter can never match.
+    PartitionValues plain_partition(Literal::Float(3.0f));
+    ICEBERG_UNWRAP_OR_FAIL(auto plain_residual,
+                           evaluator->ResidualFor(plain_partition));
+    EXPECT_EQ(plain_residual->op(), Op::kFalse);
+  }
+}
+
+TEST_F(ResidualEvaluatorTest, NotNaN) {
+  using Op = Expression::Operation;
+
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeOptional(50, "double", float64()),
+                               SchemaField::MakeOptional(51, "float", float32())},
+      std::nullopt);
+  auto identity = Transform::Identity();
+
+  // float64 column, identity-partitioned.
+  {
+    PartitionField field(50, 1000, "double", identity);
+    ICEBERG_UNWRAP_OR_FAIL(auto owned_spec,
+                           PartitionSpec::Make(*schema, 0, {field}, false));
+    std::shared_ptr<PartitionSpec> spec(owned_spec.release());
+
+    auto filter = Expressions::NotNaN("double");
+    ICEBERG_UNWRAP_OR_FAIL(auto evaluator,
+                           ResidualEvaluator::Make(filter, *spec, *schema, true));
+
+    // NaN partition value: the filter can never match.
+    PartitionValues nan_partition(Literal::Double(std::nan("")));
+    ICEBERG_UNWRAP_OR_FAIL(auto nan_residual, evaluator->ResidualFor(nan_partition));
+    EXPECT_EQ(nan_residual->op(), Op::kFalse);
+
+    // Ordinary partition value: the filter is fully satisfied.
+    PartitionValues plain_partition(Literal::Double(2.0));
+    ICEBERG_UNWRAP_OR_FAIL(auto plain_residual,
+                           evaluator->ResidualFor(plain_partition));
+    EXPECT_EQ(plain_residual->op(), Op::kTrue);
+  }
+
+  // float32 column, identity-partitioned.
+  {
+    PartitionField field(51, 1001, "float", identity);
+    ICEBERG_UNWRAP_OR_FAIL(auto owned_spec,
+                           PartitionSpec::Make(*schema, 0, {field}, false));
+    std::shared_ptr<PartitionSpec> spec(owned_spec.release());
+
+    auto filter = Expressions::NotNaN("float");
+    ICEBERG_UNWRAP_OR_FAIL(auto evaluator,
+                           ResidualEvaluator::Make(filter, *spec, *schema, true));
+
+    // NaN partition value: the filter can never match.
+    PartitionValues nan_partition(Literal::Float(std::nanf("")));
+    ICEBERG_UNWRAP_OR_FAIL(auto nan_residual, evaluator->ResidualFor(nan_partition));
+    EXPECT_EQ(nan_residual->op(), Op::kFalse);
+
+    // Ordinary partition value: the filter is fully satisfied.
+    PartitionValues plain_partition(Literal::Float(3.0f));
+    ICEBERG_UNWRAP_OR_FAIL(auto plain_residual,
+                           evaluator->ResidualFor(plain_partition));
+    EXPECT_EQ(plain_residual->op(), Op::kTrue);
+  }
+}
+
+TEST_F(ResidualEvaluatorTest, IntegerTruncateTransformResiduals) {
+  using Op = Expression::Operation;
+
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeOptional(50, "value", int32())},
+      std::nullopt);
+
+  // truncate[10]: partition values are multiples of ten (0, 10, ..., 90, 100, ...).
+  auto truncate10 = Transform::Truncate(10);
+  PartitionField value_field(50, 1000, "value", truncate10);
+  ICEBERG_UNWRAP_OR_FAIL(auto owned_spec,
+                         PartitionSpec::Make(*schema, 0, {value_field}, false));
+  std::shared_ptr<PartitionSpec> spec(owned_spec.release());
+
+  // Predicate factories. Each call produces a brand-new expression, so every
+  // assertion below works on its own predicate instance.
+  const auto lt = [](int bound) {
+    return Expressions::LessThan("value", Literal::Int(bound));
+  };
+  const auto lt_eq = [](int bound) {
+    return Expressions::LessThanOrEqual("value", Literal::Int(bound));
+  };
+  const auto gt = [](int bound) {
+    return Expressions::GreaterThan("value", Literal::Int(bound));
+  };
+  const auto gt_eq = [](int bound) {
+    return Expressions::GreaterThanOrEqual("value", Literal::Int(bound));
+  };
+  const auto eq = [](int bound) {
+    return Expressions::Equal("value", Literal::Int(bound));
+  };
+  const auto not_eq_to = [](int bound) {
+    return Expressions::NotEqual("value", Literal::Int(bound));
+  };
+
+  // value < 100 (bound on the lower edge of a partition)
+  AssertResidualOp(spec, schema, lt(100), Literal::Int(110), Op::kFalse);
+  AssertResidualOp(spec, schema, lt(100), Literal::Int(100), Op::kFalse);
+  AssertResidualOp(spec, schema, lt(100), Literal::Int(90), Op::kTrue);
+
+  // value < 99 (bound on the upper edge of a partition)
+  AssertResidualOp(spec, schema, lt(99), Literal::Int(100), Op::kFalse);
+  AssertResidualPredicate(spec, schema, lt(99), Literal::Int(90));
+  AssertResidualOp(spec, schema, lt(99), Literal::Int(80), Op::kTrue);
+
+  // value <= 100 (lower edge)
+  AssertResidualOp(spec, schema, lt_eq(100), Literal::Int(110), Op::kFalse);
+  AssertResidualPredicate(spec, schema, lt_eq(100), Literal::Int(100));
+  AssertResidualOp(spec, schema, lt_eq(100), Literal::Int(90), Op::kTrue);
+
+  // value <= 99 (upper edge)
+  AssertResidualOp(spec, schema, lt_eq(99), Literal::Int(100), Op::kFalse);
+  AssertResidualOp(spec, schema, lt_eq(99), Literal::Int(90), Op::kTrue);
+  AssertResidualOp(spec, schema, lt_eq(99), Literal::Int(80), Op::kTrue);
+
+  // value > 100 (lower edge)
+  AssertResidualOp(spec, schema, gt(100), Literal::Int(110), Op::kTrue);
+  AssertResidualPredicate(spec, schema, gt(100), Literal::Int(100));
+  AssertResidualOp(spec, schema, gt(100), Literal::Int(90), Op::kFalse);
+
+  // value > 99 (upper edge)
+  AssertResidualOp(spec, schema, gt(99), Literal::Int(100), Op::kTrue);
+  AssertResidualOp(spec, schema, gt(99), Literal::Int(90), Op::kFalse);
+  AssertResidualOp(spec, schema, gt(99), Literal::Int(80), Op::kFalse);
+
+  // value >= 100 (lower edge)
+  AssertResidualOp(spec, schema, gt_eq(100), Literal::Int(110), Op::kTrue);
+  AssertResidualOp(spec, schema, gt_eq(100), Literal::Int(100), Op::kTrue);
+  AssertResidualOp(spec, schema, gt_eq(100), Literal::Int(90), Op::kFalse);
+
+  // value >= 99 (upper edge)
+  AssertResidualOp(spec, schema, gt_eq(99), Literal::Int(100), Op::kTrue);
+  AssertResidualPredicate(spec, schema, gt_eq(99), Literal::Int(90));
+  AssertResidualOp(spec, schema, gt_eq(99), Literal::Int(80), Op::kFalse);
+
+  // value == 100 (lower edge)
+  AssertResidualOp(spec, schema, eq(100), Literal::Int(110), Op::kFalse);
+  AssertResidualPredicate(spec, schema, eq(100), Literal::Int(100));
+  AssertResidualOp(spec, schema, eq(100), Literal::Int(90), Op::kFalse);
+
+  // value == 99 (upper edge)
+  AssertResidualOp(spec, schema, eq(99), Literal::Int(100), Op::kFalse);
+  AssertResidualPredicate(spec, schema, eq(99), Literal::Int(90));
+  AssertResidualOp(spec, schema, eq(99), Literal::Int(80), Op::kFalse);
+
+  // value != 100 (lower edge)
+  AssertResidualOp(spec, schema, not_eq_to(100), Literal::Int(110), Op::kTrue);
+  AssertResidualPredicate(spec, schema, not_eq_to(100), Literal::Int(100));
+  AssertResidualOp(spec, schema, not_eq_to(100), Literal::Int(90), Op::kTrue);
+
+  // value != 99 (upper edge)
+  AssertResidualOp(spec, schema, not_eq_to(99), Literal::Int(100), Op::kTrue);
+  AssertResidualPredicate(spec, schema, not_eq_to(99), Literal::Int(90));
+  AssertResidualOp(spec, schema, not_eq_to(99), Literal::Int(80), Op::kTrue);
+}
+
+TEST_F(ResidualEvaluatorTest, StringTruncateTransformResiduals) {
+  using Op = Expression::Operation;
+
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeOptional(50, "value", string())},
+      std::nullopt);
+
+  // truncate[2]: partition values are two-letter prefixes such as "ab" or "bc".
+  auto truncate2 = Transform::Truncate(2);
+  PartitionField value_field(50, 1000, "value", truncate2);
+  ICEBERG_UNWRAP_OR_FAIL(auto owned_spec,
+                         PartitionSpec::Make(*schema, 0, {value_field}, false));
+  std::shared_ptr<PartitionSpec> spec(owned_spec.release());
+
+  // Predicate factories, all against the literal "bcd". Each call produces a
+  // brand-new expression, so every assertion below works on its own instance.
+  const auto lt = [] {
+    return Expressions::LessThan("value", Literal::String("bcd"));
+  };
+  const auto lt_eq = [] {
+    return Expressions::LessThanOrEqual("value", Literal::String("bcd"));
+  };
+  const auto gt = [] {
+    return Expressions::GreaterThan("value", Literal::String("bcd"));
+  };
+  const auto gt_eq = [] {
+    return Expressions::GreaterThanOrEqual("value", Literal::String("bcd"));
+  };
+  const auto eq = [] {
+    return Expressions::Equal("value", Literal::String("bcd"));
+  };
+  const auto not_eq_to = [] {
+    return Expressions::NotEqual("value", Literal::String("bcd"));
+  };
+  const auto starts_with = [] { return Expressions::StartsWith("value", "bcd"); };
+  const auto not_starts_with = [] {
+    return Expressions::NotStartsWith("value", "bcd");
+  };
+
+  // value < "bcd"
+  AssertResidualOp(spec, schema, lt(), Literal::String("ab"), Op::kTrue);
+  AssertResidualPredicate(spec, schema, lt(), Literal::String("bc"));
+  AssertResidualOp(spec, schema, lt(), Literal::String("cd"), Op::kFalse);
+
+  // value <= "bcd"
+  AssertResidualOp(spec, schema, lt_eq(), Literal::String("ab"), Op::kTrue);
+  AssertResidualPredicate(spec, schema, lt_eq(), Literal::String("bc"));
+  AssertResidualOp(spec, schema, lt_eq(), Literal::String("cd"), Op::kFalse);
+
+  // value > "bcd"
+  AssertResidualOp(spec, schema, gt(), Literal::String("ab"), Op::kFalse);
+  AssertResidualPredicate(spec, schema, gt(), Literal::String("bc"));
+  AssertResidualOp(spec, schema, gt(), Literal::String("cd"), Op::kTrue);
+
+  // value >= "bcd"
+  AssertResidualOp(spec, schema, gt_eq(), Literal::String("ab"), Op::kFalse);
+  AssertResidualPredicate(spec, schema, gt_eq(), Literal::String("bc"));
+  AssertResidualOp(spec, schema, gt_eq(), Literal::String("cd"), Op::kTrue);
+
+  // value == "bcd"
+  AssertResidualOp(spec, schema, eq(), Literal::String("ab"), Op::kFalse);
+  AssertResidualPredicate(spec, schema, eq(), Literal::String("bc"));
+  AssertResidualOp(spec, schema, eq(), Literal::String("cd"), Op::kFalse);
+
+  // value != "bcd"
+  AssertResidualOp(spec, schema, not_eq_to(), Literal::String("ab"), Op::kTrue);
+  AssertResidualPredicate(spec, schema, not_eq_to(), Literal::String("bc"));
+  AssertResidualOp(spec, schema, not_eq_to(), Literal::String("cd"), Op::kTrue);
+
+  // value STARTS WITH "bcd"
+  AssertResidualOp(spec, schema, starts_with(), Literal::String("ab"), Op::kFalse);
+  AssertResidualPredicate(spec, schema, starts_with(), Literal::String("bc"));
+  AssertResidualOp(spec, schema, starts_with(), Literal::String("cd"), Op::kFalse);
+  AssertResidualPredicate(spec, schema, starts_with(), Literal::String("bcdd"));
+
+  // value NOT STARTS WITH "bcd"
+  AssertResidualOp(spec, schema, not_starts_with(), Literal::String("ab"), Op::kTrue);
+  AssertResidualPredicate(spec, schema, not_starts_with(), Literal::String("bc"));
+  AssertResidualOp(spec, schema, not_starts_with(), Literal::String("cd"), Op::kTrue);
+  AssertResidualPredicate(spec, schema, not_starts_with(), Literal::String("bcd"));
+  AssertResidualPredicate(spec, schema, not_starts_with(), Literal::String("bcdd"));
+}
+
+}  // namespace iceberg

Reply via email to