This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 15e61f2  feat: add `ExpressionEvaluator` (#363)
15e61f2 is described below

commit 15e61f23198c4cc5d320d631e22e2fbc02d167c8
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Mon Jun 10 13:04:57 2024 +0200

    feat: add `ExpressionEvaluator` (#363)
    
    * refactor: add partition_schema_cache
    
    * refactor: use context as param object
    
    * fix: test setup
    
    * refactor: clone only when cache miss
    
    * chore: move derive stmts
    
    * feat: add basic setup expression evaluator
    
    * refactor: remove unused case_sensitive parameter
    
    * chore: add doc
    
    * refactor: remove partition_schema_cache
    
    * refactor: move partition_filter into wider scope
    
    * feat: add expression_evaluator_cache and apply in scan.rs
    
    * chore: remove comment
    
    * refactor: remove unused test setup fn
    
    * feat: add basic test infr + simple predicate evaluation
    
    * fix: clippy
    
    * feat: impl `is_null` + `not_null`
    
    * feat: impl `is_nan` + `not_nan`
    
    * chore: change result type
    
    * feat: impl `less_than` + `greater_than`
    
    * chore: fix return type
    
    * feat: impl `eq` + `not_eq`
    
    * feat: impl `starts_with` + `not_starts_with`
    
    * feat: impl `in` + `not_in`
    
    * chore: add tests for `and` and `or` expr
    
    * chore: move test
    
    * chore: remove unused_vars
    
    * chore: update docs
    
    * chore: update docs
    
    * fix: typo
    
    * refactor: compare datum instead of primitive literal
    
    * refactor: use Result<Option> from accessor
    
    * chore: remove unused fn
    
    * fix: add sleep pattern matching
---
 .../src/expr/visitors/expression_evaluator.rs      | 804 +++++++++++++++++++++
 .../src/expr/visitors/manifest_evaluator.rs        |  24 -
 crates/iceberg/src/expr/visitors/mod.rs            |   1 +
 crates/iceberg/src/scan.rs                         |  34 +-
 4 files changed, 838 insertions(+), 25 deletions(-)

diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs b/crates/iceberg/src/expr/visitors/expression_evaluator.rs
new file mode 100644
index 0000000..b69d093
--- /dev/null
+++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs
@@ -0,0 +1,804 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use fnv::FnvHashSet;
+
+use crate::{
+    expr::{BoundPredicate, BoundReference},
+    spec::{DataFile, Datum, PrimitiveLiteral, Struct},
+    Error, ErrorKind, Result,
+};
+
+use super::bound_predicate_visitor::{visit, BoundPredicateVisitor};
+
+/// Evaluates a [`DataFile`]'s partition [`Struct`] to check
+/// if the partition tuples match the given [`BoundPredicate`].
+///
+/// Used within [`TableScan`](crate::scan::TableScan) to prune the list of
+/// [`DataFile`]s that could potentially match the scan's filter.
+#[derive(Debug)]
+pub(crate) struct ExpressionEvaluator {
+    /// The provided partition filter.
+    ///
+    /// Its references are resolved against a data file's partition [`Struct`],
+    /// so it is expected to be bound to the partition schema (not the table
+    /// schema) and to have had its `Not` nodes rewritten away beforehand.
+    partition_filter: BoundPredicate,
+}
+
+impl ExpressionEvaluator {
+    /// Creates a new [`ExpressionEvaluator`] for the given partition filter.
+    pub(crate) fn new(partition_filter: BoundPredicate) -> Self {
+        Self { partition_filter }
+    }
+
+    /// Evaluates this [`ExpressionEvaluator`]'s partition filter against
+    /// the provided [`DataFile`]'s partition [`Struct`].
+    ///
+    /// Used by [`TableScan`](crate::scan::TableScan) to see if this [`DataFile`]
+    /// could possibly contain data that matches the scan's filter; `Ok(false)`
+    /// means the file can be skipped.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the filter still contains a `Not` node, or if
+    /// reading a referenced value out of the partition fails.
+    pub(crate) fn eval(&self, data_file: &DataFile) -> Result<bool> {
+        let mut visitor = ExpressionEvaluatorVisitor::new(data_file.partition());
+
+        visit(&mut visitor, &self.partition_filter)
+    }
+}
+
+/// Acts as a visitor for [`ExpressionEvaluator`] to apply
+/// evaluation logic to different parts of a data structure,
+/// specifically for data file partitions.
+#[derive(Debug)]
+struct ExpressionEvaluatorVisitor<'a> {
+    /// Reference to a [`DataFile`]'s partition [`Struct`].
+    partition: &'a Struct,
+}
+
+impl<'a> ExpressionEvaluatorVisitor<'a> {
+    /// Creates a new [`ExpressionEvaluatorVisitor`] over the given partition.
+    fn new(partition: &'a Struct) -> Self {
+        Self { partition }
+    }
+}
+
+// Null semantics: a positive predicate over a null (missing) partition value
+// evaluates to `false`, and every `not_*` predicate is the exact negation of its
+// positive counterpart, so it evaluates to `true` for a null value.
+impl BoundPredicateVisitor for ExpressionEvaluatorVisitor<'_> {
+    type T = bool;
+
+    fn always_true(&mut self) -> Result<bool> {
+        Ok(true)
+    }
+
+    fn always_false(&mut self) -> Result<bool> {
+        Ok(false)
+    }
+
+    fn and(&mut self, lhs: bool, rhs: bool) -> Result<bool> {
+        Ok(lhs && rhs)
+    }
+
+    fn or(&mut self, lhs: bool, rhs: bool) -> Result<bool> {
+        Ok(lhs || rhs)
+    }
+
+    fn not(&mut self, _inner: bool) -> Result<bool> {
+        // `Not` cannot be evaluated directly here; the filter is expected to have
+        // gone through `rewrite_not` before it was bound.
+        Err(Error::new(ErrorKind::Unexpected, "The evaluation of expressions should not be performed against Predicates that contain a Not operator. Ensure that \"Rewrite Not\" gets applied to the originating Predicate before binding it."))
+    }
+
+    fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<bool> {
+        // `None` from the accessor represents a null partition value.
+        Ok(reference.accessor().get(self.partition)?.is_none())
+    }
+
+    fn not_null(
+        &mut self,
+        reference: &BoundReference,
+        predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        Ok(!self.is_null(reference, predicate)?)
+    }
+
+    fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<bool> {
+        let value = reference.accessor().get(self.partition)?;
+        Ok(value.is_some_and(|datum| datum.is_nan()))
+    }
+
+    fn not_nan(&mut self, reference: &BoundReference, predicate: &BoundPredicate) -> Result<bool> {
+        Ok(!self.is_nan(reference, predicate)?)
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        let value = reference.accessor().get(self.partition)?;
+        Ok(value.is_some_and(|datum| &datum < literal))
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        let value = reference.accessor().get(self.partition)?;
+        Ok(value.is_some_and(|datum| &datum <= literal))
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        let value = reference.accessor().get(self.partition)?;
+        Ok(value.is_some_and(|datum| &datum > literal))
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        let value = reference.accessor().get(self.partition)?;
+        Ok(value.is_some_and(|datum| &datum >= literal))
+    }
+
+    fn eq(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        let value = reference.accessor().get(self.partition)?;
+        Ok(value.is_some_and(|datum| &datum == literal))
+    }
+
+    fn not_eq(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        // Resolves to `BoundPredicateVisitor::eq` above (not `PartialEq::eq`).
+        Ok(!self.eq(reference, literal, predicate)?)
+    }
+
+    fn starts_with(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        let Some(datum) = reference.accessor().get(self.partition)? else {
+            return Ok(false);
+        };
+
+        // Only a string value can have a string prefix; any other pairing never matches.
+        match (datum.literal(), literal.literal()) {
+            (PrimitiveLiteral::String(d), PrimitiveLiteral::String(l)) => Ok(d.starts_with(l)),
+            _ => Ok(false),
+        }
+    }
+
+    fn not_starts_with(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        Ok(!self.starts_with(reference, literal, predicate)?)
+    }
+
+    fn r#in(
+        &mut self,
+        reference: &BoundReference,
+        literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        let value = reference.accessor().get(self.partition)?;
+        Ok(value.is_some_and(|datum| literals.contains(&datum)))
+    }
+
+    fn not_in(
+        &mut self,
+        reference: &BoundReference,
+        literals: &FnvHashSet<Datum>,
+        predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        Ok(!self.r#in(reference, literals, predicate)?)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{collections::HashMap, sync::Arc};
+
+    use fnv::FnvHashSet;
+    use predicate::SetExpression;
+
+    use crate::{
+        expr::{
+            predicate, visitors::inclusive_projection::InclusiveProjection, BinaryExpression, Bind,
+            BoundPredicate, Predicate, PredicateOperator, Reference, UnaryExpression,
+        },
+        spec::{
+            DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionField,
+            PartitionSpec, PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform,
+            Type,
+        },
+        Result,
+    };
+
+    use super::ExpressionEvaluator;
+
+    /// Creates a schema with a single optional column `a` of type `r#type`,
+    /// plus a partition spec that partitions on `a` with the identity transform.
+    fn create_schema_and_partition_spec(
+        r#type: PrimitiveType,
+    ) -> Result<(SchemaRef, PartitionSpecRef)> {
+        let schema = Schema::builder()
+            .with_fields(vec![Arc::new(NestedField::optional(
+                1,
+                "a",
+                Type::Primitive(r#type),
+            ))])
+            .build()?;
+
+        let spec = PartitionSpec::builder()
+            .with_spec_id(1)
+            .with_fields(vec![PartitionField::builder()
+                .source_id(1)
+                .name("a".to_string())
+                .field_id(1)
+                .transform(Transform::Identity)
+                .build()])
+            .build()
+            .unwrap();
+
+        Ok((Arc::new(schema), Arc::new(spec)))
+    }
+
+    /// Projects `predicate` onto `partition_spec`, rewrites away `Not` nodes and
+    /// binds the result to the spec's partition schema.
+    fn create_partition_filter(
+        schema: &Schema,
+        partition_spec: PartitionSpecRef,
+        predicate: &BoundPredicate,
+        case_sensitive: bool,
+    ) -> Result<BoundPredicate> {
+        let partition_type = partition_spec.partition_type(schema)?;
+        let partition_fields = partition_type.fields().to_owned();
+
+        let partition_schema = Schema::builder()
+            .with_schema_id(partition_spec.spec_id)
+            .with_fields(partition_fields)
+            .build()?;
+
+        let mut inclusive_projection = InclusiveProjection::new(partition_spec);
+
+        let partition_filter = inclusive_projection
+            .project(predicate)?
+            .rewrite_not()
+            .bind(Arc::new(partition_schema), case_sensitive)?;
+
+        Ok(partition_filter)
+    }
+
+    /// Builds an [`ExpressionEvaluator`] for the partition filter derived from `predicate`.
+    fn create_expression_evaluator(
+        schema: &Schema,
+        partition_spec: PartitionSpecRef,
+        predicate: &BoundPredicate,
+        case_sensitive: bool,
+    ) -> Result<ExpressionEvaluator> {
+        let partition_filter =
+            create_partition_filter(schema, partition_spec, predicate, case_sensitive)?;
+
+        Ok(ExpressionEvaluator::new(partition_filter))
+    }
+
+    /// Builds a [`DataFile`] with the given `partition` and placeholder values
+    /// for every other field.
+    fn create_data_file(partition: Struct) -> DataFile {
+        DataFile {
+            content: DataContentType::Data,
+            file_path: "/test/path".to_string(),
+            file_format: DataFileFormat::Parquet,
+            partition,
+            record_count: 1,
+            file_size_in_bytes: 1,
+            column_sizes: HashMap::new(),
+            value_counts: HashMap::new(),
+            null_value_counts: HashMap::new(),
+            nan_value_counts: HashMap::new(),
+            lower_bounds: HashMap::new(),
+            upper_bounds: HashMap::new(),
+            key_metadata: vec![],
+            split_offsets: vec![],
+            equality_ids: vec![],
+            sort_order_id: None,
+        }
+    }
+
+    /// Binds `predicate` to the single-column schema of type `r#type`, derives the
+    /// partition filter from it and evaluates that filter against `data_file`.
+    fn eval_predicate(
+        r#type: PrimitiveType,
+        predicate: Predicate,
+        data_file: &DataFile,
+    ) -> Result<bool> {
+        let case_sensitive = true;
+        let (schema, partition_spec) = create_schema_and_partition_spec(r#type)?;
+        let predicate = predicate.bind(schema.clone(), case_sensitive)?;
+
+        let expression_evaluator =
+            create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?;
+
+        expression_evaluator.eval(data_file)
+    }
+
+    /// Evaluates `predicate` against a float partition holding `1.0`.
+    fn eval_float(predicate: Predicate) -> Result<bool> {
+        let data_file = create_data_file(Struct::from_iter([Some(Literal::float(1.0))]));
+        eval_predicate(PrimitiveType::Float, predicate, &data_file)
+    }
+
+    /// Evaluates `predicate` against a string partition holding `"test str"`.
+    fn eval_string(predicate: Predicate) -> Result<bool> {
+        let data_file = create_data_file(Struct::from_iter([Some(Literal::string("test str"))]));
+        eval_predicate(PrimitiveType::String, predicate, &data_file)
+    }
+
+    /// Builds `a <op> literal`.
+    fn binary(op: PredicateOperator, literal: Datum) -> Predicate {
+        Predicate::Binary(BinaryExpression::new(op, Reference::new("a"), literal))
+    }
+
+    /// Builds `a <op>`, e.g. `a IS NULL`.
+    fn unary(op: PredicateOperator) -> Predicate {
+        Predicate::Unary(UnaryExpression::new(op, Reference::new("a")))
+    }
+
+    /// Builds `a <op> (literals...)`, e.g. `a IN (...)`.
+    fn set(op: PredicateOperator, literals: impl IntoIterator<Item = Datum>) -> Predicate {
+        Predicate::Set(SetExpression::new(
+            op,
+            Reference::new("a"),
+            FnvHashSet::from_iter(literals),
+        ))
+    }
+
+    #[test]
+    fn test_expr_or() -> Result<()> {
+        // 1.0 < 1.0 does not hold, but 1.0 >= 0.4 does.
+        let predicate = binary(PredicateOperator::LessThan, Datum::float(1.0))
+            .or(binary(PredicateOperator::GreaterThanOrEq, Datum::float(0.4)));
+        assert!(eval_float(predicate)?);
+
+        // Neither 1.0 < 0.5 nor 1.0 > 1.5 holds.
+        let predicate = binary(PredicateOperator::LessThan, Datum::float(0.5))
+            .or(binary(PredicateOperator::GreaterThan, Datum::float(1.5)));
+        assert!(!eval_float(predicate)?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_and() -> Result<()> {
+        // Both 1.0 < 1.1 and 1.0 >= 0.4 hold.
+        let predicate = binary(PredicateOperator::LessThan, Datum::float(1.1))
+            .and(binary(PredicateOperator::GreaterThanOrEq, Datum::float(0.4)));
+        assert!(eval_float(predicate)?);
+
+        // 1.0 >= 0.4 holds, but 1.0 < 0.9 does not.
+        let predicate = binary(PredicateOperator::LessThan, Datum::float(0.9))
+            .and(binary(PredicateOperator::GreaterThanOrEq, Datum::float(0.4)));
+        assert!(!eval_float(predicate)?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_not_in() -> Result<()> {
+        let predicate = set(
+            PredicateOperator::NotIn,
+            [Datum::float(0.9), Datum::float(1.2), Datum::float(2.4)],
+        );
+        assert!(eval_float(predicate)?);
+
+        let predicate = set(PredicateOperator::NotIn, [Datum::float(1.0), Datum::float(1.2)]);
+        assert!(!eval_float(predicate)?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_in() -> Result<()> {
+        let predicate = set(
+            PredicateOperator::In,
+            [Datum::float(1.0), Datum::float(1.2), Datum::float(2.4)],
+        );
+        assert!(eval_float(predicate)?);
+
+        let predicate = set(PredicateOperator::In, [Datum::float(0.9), Datum::float(1.2)]);
+        assert!(!eval_float(predicate)?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_not_starts_with() -> Result<()> {
+        assert!(eval_string(binary(PredicateOperator::NotStartsWith, Datum::string("not")))?);
+        assert!(!eval_string(binary(PredicateOperator::NotStartsWith, Datum::string("test")))?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_starts_with() -> Result<()> {
+        assert!(eval_string(binary(PredicateOperator::StartsWith, Datum::string("test")))?);
+        assert!(!eval_string(binary(PredicateOperator::StartsWith, Datum::string("not")))?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_not_eq() -> Result<()> {
+        assert!(eval_float(binary(PredicateOperator::NotEq, Datum::float(0.9)))?);
+        assert!(!eval_float(binary(PredicateOperator::NotEq, Datum::float(1.0)))?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_eq() -> Result<()> {
+        assert!(eval_float(binary(PredicateOperator::Eq, Datum::float(1.0)))?);
+        assert!(!eval_float(binary(PredicateOperator::Eq, Datum::float(0.9)))?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_greater_than_or_eq() -> Result<()> {
+        assert!(eval_float(binary(PredicateOperator::GreaterThanOrEq, Datum::float(1.0)))?);
+        assert!(!eval_float(binary(PredicateOperator::GreaterThanOrEq, Datum::float(1.1)))?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_greater_than() -> Result<()> {
+        assert!(eval_float(binary(PredicateOperator::GreaterThan, Datum::float(0.9)))?);
+        assert!(!eval_float(binary(PredicateOperator::GreaterThan, Datum::float(1.0)))?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_less_than_or_eq() -> Result<()> {
+        assert!(eval_float(binary(PredicateOperator::LessThanOrEq, Datum::float(1.0)))?);
+        assert!(!eval_float(binary(PredicateOperator::LessThanOrEq, Datum::float(0.9)))?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_less_than() -> Result<()> {
+        assert!(eval_float(binary(PredicateOperator::LessThan, Datum::float(1.1)))?);
+        assert!(!eval_float(binary(PredicateOperator::LessThan, Datum::float(1.0)))?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_is_not_nan() -> Result<()> {
+        assert!(eval_float(unary(PredicateOperator::NotNan))?);
+
+        let nan_file = create_data_file(Struct::from_iter([Some(Literal::float(f32::NAN))]));
+        let result = eval_predicate(
+            PrimitiveType::Float,
+            unary(PredicateOperator::NotNan),
+            &nan_file,
+        )?;
+        assert!(!result);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_is_nan() -> Result<()> {
+        assert!(!eval_float(unary(PredicateOperator::IsNan))?);
+
+        let nan_file = create_data_file(Struct::from_iter([Some(Literal::float(f32::NAN))]));
+        let result = eval_predicate(
+            PrimitiveType::Float,
+            unary(PredicateOperator::IsNan),
+            &nan_file,
+        )?;
+        assert!(result);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_is_not_null() -> Result<()> {
+        assert!(eval_float(unary(PredicateOperator::NotNull))?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_is_null() -> Result<()> {
+        assert!(!eval_float(unary(PredicateOperator::IsNull))?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_null_partition_value() -> Result<()> {
+        let null_file = create_data_file(Struct::from_iter([None::<Literal>]));
+        let eval = |predicate| eval_predicate(PrimitiveType::Float, predicate, &null_file);
+
+        assert!(eval(unary(PredicateOperator::IsNull))?);
+        assert!(!eval(unary(PredicateOperator::NotNull))?);
+        assert!(!eval(binary(PredicateOperator::Eq, Datum::float(1.0)))?);
+        assert!(eval(binary(PredicateOperator::NotEq, Datum::float(1.0)))?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_always_false() -> Result<()> {
+        assert!(!eval_float(Predicate::AlwaysFalse)?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_always_true() -> Result<()> {
+        assert!(eval_float(Predicate::AlwaysTrue)?);
+
+        Ok(())
+    }
+}
diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
index 03075d7..c1b6dbe 100644
--- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
@@ -260,30 +260,6 @@ mod test {
         Ok((Arc::new(schema), Arc::new(spec)))
     }
 
-    fn create_schema_and_partition_spec_with_id_mismatch() -> 
Result<(SchemaRef, PartitionSpecRef)>
-    {
-        let schema = Schema::builder()
-            .with_fields(vec![Arc::new(NestedField::optional(
-                1,
-                "a",
-                Type::Primitive(PrimitiveType::Float),
-            ))])
-            .build()?;
-
-        let spec = PartitionSpec::builder()
-            .with_spec_id(999)
-            .with_fields(vec![PartitionField::builder()
-                .source_id(1)
-                .name("a".to_string())
-                .field_id(1)
-                .transform(Transform::Identity)
-                .build()])
-            .build()
-            .unwrap();
-
-        Ok((Arc::new(schema), Arc::new(spec)))
-    }
-
     fn create_manifest_file(partitions: Vec<FieldSummary>) -> ManifestFile {
         ManifestFile {
             manifest_path: "/test/path".to_string(),
diff --git a/crates/iceberg/src/expr/visitors/mod.rs b/crates/iceberg/src/expr/visitors/mod.rs
index 805f7dd..d686b11 100644
--- a/crates/iceberg/src/expr/visitors/mod.rs
+++ b/crates/iceberg/src/expr/visitors/mod.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 pub(crate) mod bound_predicate_visitor;
+pub(crate) mod expression_evaluator;
 pub(crate) mod inclusive_metrics_evaluator;
 pub(crate) mod inclusive_projection;
 pub(crate) mod manifest_evaluator;
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 397633d..5f0922e 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -18,6 +18,7 @@
 //! Table scan api.
 
 use crate::arrow::ArrowReaderBuilder;
+use crate::expr::visitors::expression_evaluator::ExpressionEvaluator;
 use 
crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
 use crate::expr::visitors::inclusive_projection::InclusiveProjection;
 use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
@@ -209,6 +210,7 @@ impl TableScan {
 
         let mut partition_filter_cache = PartitionFilterCache::new();
         let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
+        let mut expression_evaluator_cache = ExpressionEvaluatorCache::new();
 
         Ok(try_stream! {
             let manifest_list = context
@@ -244,7 +246,16 @@ impl TableScan {
                     futures::stream::iter(manifest.entries().iter().filter(|e| 
e.is_alive()));
 
                 while let Some(manifest_entry) = 
manifest_entries_stream.next().await {
-                    // TODO: Apply ExpressionEvaluator
+                    let data_file = manifest_entry.data_file();
+
+                    if let Some(partition_filter) = partition_filter {
+                        let expression_evaluator = 
expression_evaluator_cache.get(partition_spec_id, partition_filter);
+
+                        if !expression_evaluator.eval(data_file)? {
+                            continue;
+                        }
+                    }
+
 
                     if let Some(bound_predicate) = context.bound_filter() {
                         // reject any manifest entries whose data file's 
metrics don't match the filter.
@@ -463,6 +474,27 @@ impl ManifestEvaluatorCache {
     }
 }
 
+/// Manages the caching of [`ExpressionEvaluator`] objects
+/// for [`PartitionSpec`]s based on partition spec id.
+#[derive(Debug)]
+struct ExpressionEvaluatorCache(HashMap<i32, ExpressionEvaluator>);
+
+impl ExpressionEvaluatorCache {
+    /// Creates a new [`ExpressionEvaluatorCache`]
+    /// with an empty internal HashMap.
+    fn new() -> Self {
+        Self(HashMap::new())
+    }
+
+    /// Retrieves the [`ExpressionEvaluator`] cached for `spec_id`, building one
+    /// from `partition_filter` if none is present yet.
+    ///
+    /// `partition_filter` is only cloned on a cache miss. On a hit the existing
+    /// evaluator is returned and `partition_filter` is ignored, so callers are
+    /// expected to pass the same filter for a given spec id.
+    fn get(&mut self, spec_id: i32, partition_filter: &BoundPredicate) -> &mut ExpressionEvaluator {
+        // `or_insert_with` defers the clone to the miss path; `or_insert` would clone
+        // the predicate on every lookup, i.e. once per manifest entry.
+        self.0
+            .entry(spec_id)
+            .or_insert_with(|| ExpressionEvaluator::new(partition_filter.clone()))
+    }
+}
+
 /// A task to scan part of file.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct FileScanTask {

Reply via email to