This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 15e61f2 feat: add `ExpressionEvaluator` (#363)
15e61f2 is described below
commit 15e61f23198c4cc5d320d631e22e2fbc02d167c8
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Mon Jun 10 13:04:57 2024 +0200
feat: add `ExpressionEvaluator` (#363)
* refactor: add partition_schema_cache
* refactor: use context as param object
* fix: test setup
* refactor: clone only when cache miss
* chore: move derive stmts
* feat: add basic setup expression evaluator
* refactor: remove unused case_sensitive parameter
* chore: add doc
* refactor: remove partition_schema_cache
* refactor: move partition_filter into wider scope
* feat: add expression_evaluator_cache and apply in scan.rs
* chore: remove comment
* refactor: remove unused test setup fn
* feat: add basic test infr + simple predicate evaluation
* fix: clippy
* feat: impl `is_null` + `not_null`
* feat: impl `is_nan` + `not_nan`
* chore: change result type
* feat: impl `less_than` + `greater_than`
* chore: fix return type
* feat: impl `eq` + `not_eq`
* feat: impl `starts_with` + `not_starts_with`
* feat: impl `in` + `not_in`
* chore: add tests for `and` and `or` expr
* chore: move test
* chore: remove unused_vars
* chore: update docs
* chore: update docs
* fix: typo
* refactor: compare datum instead of primitive literal
* refactor: use Result<Option> from accessor
* chore: remove unused fn
* fix: sdd sleep pattern matching
---
.../src/expr/visitors/expression_evaluator.rs | 804 +++++++++++++++++++++
.../src/expr/visitors/manifest_evaluator.rs | 24 -
crates/iceberg/src/expr/visitors/mod.rs | 1 +
crates/iceberg/src/scan.rs | 34 +-
4 files changed, 838 insertions(+), 25 deletions(-)
diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs
b/crates/iceberg/src/expr/visitors/expression_evaluator.rs
new file mode 100644
index 0000000..b69d093
--- /dev/null
+++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs
@@ -0,0 +1,804 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use fnv::FnvHashSet;
+
+use crate::{
+ expr::{BoundPredicate, BoundReference},
+ spec::{DataFile, Datum, PrimitiveLiteral, Struct},
+ Error, ErrorKind, Result,
+};
+
+use super::bound_predicate_visitor::{visit, BoundPredicateVisitor};
+
+/// Evaluates a [`DataFile`]'s partition [`Struct`] to check
+/// if the partition tuples match the given [`BoundPredicate`].
+///
+/// Use within [`TableScan`] to prune the list of [`DataFile`]s
+/// that could potentially match the TableScan's filter.
+#[derive(Debug)]
+pub(crate) struct ExpressionEvaluator {
+ /// The provided partition filter.
+ partition_filter: BoundPredicate,
+}
+
+impl ExpressionEvaluator {
+ /// Creates a new [`ExpressionEvaluator`].
+ pub(crate) fn new(partition_filter: BoundPredicate) -> Self {
+ Self { partition_filter }
+ }
+
+ /// Evaluate this [`ExpressionEvaluator`]'s partition filter against
+ /// the provided [`DataFile`]'s partition [`Struct`]. Used by [`TableScan`]
+ /// to see if this [`DataFile`] could possibly contain data that matches
+ /// the scan's filter.
+ pub(crate) fn eval(&self, data_file: &DataFile) -> Result<bool> {
+ let mut visitor = ExpressionEvaluatorVisitor::new(self,
data_file.partition());
+
+ visit(&mut visitor, &self.partition_filter)
+ }
+}
+
+/// Acts as a visitor for [`ExpressionEvaluator`] to apply
+/// evaluation logic to different parts of a data structure,
+/// specifically for data file partitions.
+#[derive(Debug)]
+struct ExpressionEvaluatorVisitor<'a> {
+ /// Reference to an [`ExpressionEvaluator`].
+ expression_evaluator: &'a ExpressionEvaluator,
+ /// Reference to a [`DataFile`]'s partition [`Struct`].
+ partition: &'a Struct,
+}
+
+impl<'a> ExpressionEvaluatorVisitor<'a> {
+ /// Creates a new [`ExpressionEvaluatorVisitor`].
+ fn new(expression_evaluator: &'a ExpressionEvaluator, partition: &'a
Struct) -> Self {
+ Self {
+ expression_evaluator,
+ partition,
+ }
+ }
+}
+
+impl BoundPredicateVisitor for ExpressionEvaluatorVisitor<'_> {
+ type T = bool;
+
+ fn always_true(&mut self) -> Result<bool> {
+ Ok(true)
+ }
+
+ fn always_false(&mut self) -> Result<bool> {
+ Ok(false)
+ }
+
+ fn and(&mut self, lhs: bool, rhs: bool) -> Result<bool> {
+ Ok(lhs && rhs)
+ }
+
+ fn or(&mut self, lhs: bool, rhs: bool) -> Result<bool> {
+ Ok(lhs || rhs)
+ }
+
+ fn not(&mut self, _inner: bool) -> Result<bool> {
+ Err(Error::new(ErrorKind::Unexpected, "The evaluation of expressions
should not be performed against Predicates that contain a Not operator. Ensure
that \"Rewrite Not\" gets applied to the originating Predicate before binding
it."))
+ }
+
+ fn is_null(&mut self, reference: &BoundReference, _predicate:
&BoundPredicate) -> Result<bool> {
+ match reference.accessor().get(self.partition)? {
+ Some(_) => Ok(false),
+ None => Ok(true),
+ }
+ }
+
+ fn not_null(
+ &mut self,
+ reference: &BoundReference,
+ _predicate: &BoundPredicate,
+ ) -> Result<bool> {
+ match reference.accessor().get(self.partition)? {
+ Some(_) => Ok(true),
+ None => Ok(false),
+ }
+ }
+
+ fn is_nan(&mut self, reference: &BoundReference, _predicate:
&BoundPredicate) -> Result<bool> {
+ match reference.accessor().get(self.partition)? {
+ Some(datum) => Ok(datum.is_nan()),
+ None => Ok(false),
+ }
+ }
+
+ fn not_nan(&mut self, reference: &BoundReference, _predicate:
&BoundPredicate) -> Result<bool> {
+ match reference.accessor().get(self.partition)? {
+ Some(datum) => Ok(!datum.is_nan()),
+ None => Ok(true),
+ }
+ }
+
+ fn less_than(
+ &mut self,
+ reference: &BoundReference,
+ literal: &Datum,
+ _predicate: &BoundPredicate,
+ ) -> Result<bool> {
+ match reference.accessor().get(self.partition)? {
+ Some(datum) => Ok(&datum < literal),
+ None => Ok(false),
+ }
+ }
+
+ fn less_than_or_eq(
+ &mut self,
+ reference: &BoundReference,
+ literal: &Datum,
+ _predicate: &BoundPredicate,
+ ) -> Result<bool> {
+ match reference.accessor().get(self.partition)? {
+ Some(datum) => Ok(&datum <= literal),
+ None => Ok(false),
+ }
+ }
+
+ fn greater_than(
+ &mut self,
+ reference: &BoundReference,
+ literal: &Datum,
+ _predicate: &BoundPredicate,
+ ) -> Result<bool> {
+ match reference.accessor().get(self.partition)? {
+ Some(datum) => Ok(&datum > literal),
+ None => Ok(false),
+ }
+ }
+
+ fn greater_than_or_eq(
+ &mut self,
+ reference: &BoundReference,
+ literal: &Datum,
+ _predicate: &BoundPredicate,
+ ) -> Result<bool> {
+ match reference.accessor().get(self.partition)? {
+ Some(datum) => Ok(&datum >= literal),
+ None => Ok(false),
+ }
+ }
+
+ fn eq(
+ &mut self,
+ reference: &BoundReference,
+ literal: &Datum,
+ _predicate: &BoundPredicate,
+ ) -> Result<bool> {
+ match reference.accessor().get(self.partition)? {
+ Some(datum) => Ok(&datum == literal),
+ None => Ok(false),
+ }
+ }
+
+ fn not_eq(
+ &mut self,
+ reference: &BoundReference,
+ literal: &Datum,
+ _predicate: &BoundPredicate,
+ ) -> Result<bool> {
+ match reference.accessor().get(self.partition)? {
+ Some(datum) => Ok(&datum != literal),
+ None => Ok(true),
+ }
+ }
+
+ fn starts_with(
+ &mut self,
+ reference: &BoundReference,
+ literal: &Datum,
+ _predicate: &BoundPredicate,
+ ) -> Result<bool> {
+ let Some(datum) = reference.accessor().get(self.partition)? else {
+ return Ok(false);
+ };
+
+ match (datum.literal(), literal.literal()) {
+ (PrimitiveLiteral::String(d), PrimitiveLiteral::String(l)) =>
Ok(d.starts_with(l)),
+ _ => Ok(false),
+ }
+ }
+
+ fn not_starts_with(
+ &mut self,
+ reference: &BoundReference,
+ literal: &Datum,
+ _predicate: &BoundPredicate,
+ ) -> Result<bool> {
+ Ok(!self.starts_with(reference, literal, _predicate)?)
+ }
+
+ fn r#in(
+ &mut self,
+ reference: &BoundReference,
+ literals: &FnvHashSet<Datum>,
+ _predicate: &BoundPredicate,
+ ) -> Result<bool> {
+ match reference.accessor().get(self.partition)? {
+ Some(datum) => Ok(literals.contains(&datum)),
+ None => Ok(false),
+ }
+ }
+
+ fn not_in(
+ &mut self,
+ reference: &BoundReference,
+ literals: &FnvHashSet<Datum>,
+ _predicate: &BoundPredicate,
+ ) -> Result<bool> {
+ match reference.accessor().get(self.partition)? {
+ Some(datum) => Ok(!literals.contains(&datum)),
+ None => Ok(true),
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use fnv::FnvHashSet;
    use predicate::SetExpression;

    use crate::{
        expr::{
            predicate, visitors::inclusive_projection::InclusiveProjection, BinaryExpression, Bind,
            BoundPredicate, Predicate, PredicateOperator, Reference, UnaryExpression,
        },
        spec::{
            DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionField,
            PartitionSpec, PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform,
            Type,
        },
        Result,
    };

    use super::ExpressionEvaluator;

    /// Builds a table schema with a single optional column `a` (field id 1) of
    /// type `r#type`, and a partition spec (id 1) with an identity partition on `a`.
    fn create_schema_and_partition_spec(
        r#type: PrimitiveType,
    ) -> Result<(SchemaRef, PartitionSpecRef)> {
        let schema = Schema::builder()
            .with_fields(vec![Arc::new(NestedField::optional(
                1,
                "a",
                Type::Primitive(r#type),
            ))])
            .build()?;

        // NOTE(review): `unwrap` presumably because the builder's error type does
        // not convert into `crate::Error`; confirm before switching to `?`.
        let spec = PartitionSpec::builder()
            .with_spec_id(1)
            .with_fields(vec![PartitionField::builder()
                .source_id(1)
                .name("a".to_string())
                .field_id(1)
                .transform(Transform::Identity)
                .build()])
            .build()
            .unwrap();

        Ok((Arc::new(schema), Arc::new(spec)))
    }

    /// Projects `predicate` onto `partition_spec` (inclusive projection), removes
    /// `Not` nodes and binds the result against the spec's partition schema.
    fn create_partition_filter(
        schema: &Schema,
        partition_spec: PartitionSpecRef,
        predicate: &BoundPredicate,
        case_sensitive: bool,
    ) -> Result<BoundPredicate> {
        let partition_type = partition_spec.partition_type(schema)?;
        let partition_fields = partition_type.fields().to_owned();

        let partition_schema = Schema::builder()
            .with_schema_id(partition_spec.spec_id)
            .with_fields(partition_fields)
            .build()?;

        let mut inclusive_projection = InclusiveProjection::new(partition_spec);

        let partition_filter = inclusive_projection
            .project(predicate)?
            .rewrite_not()
            .bind(Arc::new(partition_schema), case_sensitive)?;

        Ok(partition_filter)
    }

    /// Builds an [`ExpressionEvaluator`] for the partition filter derived from `predicate`.
    fn create_expression_evaluator(
        schema: &Schema,
        partition_spec: PartitionSpecRef,
        predicate: &BoundPredicate,
        case_sensitive: bool,
    ) -> Result<ExpressionEvaluator> {
        let partition_filter =
            create_partition_filter(schema, partition_spec, predicate, case_sensitive)?;

        Ok(ExpressionEvaluator::new(partition_filter))
    }

    /// Builds a minimal [`DataFile`]; only `partition` matters to the evaluator.
    fn create_data_file(partition: Struct) -> DataFile {
        DataFile {
            content: DataContentType::Data,
            file_path: "/test/path".to_string(),
            file_format: DataFileFormat::Parquet,
            partition,
            record_count: 1,
            file_size_in_bytes: 1,
            column_sizes: HashMap::new(),
            value_counts: HashMap::new(),
            null_value_counts: HashMap::new(),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::new(),
            upper_bounds: HashMap::new(),
            key_metadata: vec![],
            split_offsets: vec![],
            equality_ids: vec![],
            sort_order_id: None,
        }
    }

    /// Data file whose partition tuple is `a = 1.0` (float).
    fn create_data_file_float() -> DataFile {
        create_data_file(Struct::from_iter([Some(Literal::float(1.0))]))
    }

    /// Data file whose partition tuple is `a = "test str"`.
    fn create_data_file_string() -> DataFile {
        create_data_file(Struct::from_iter([Some(Literal::string("test str"))]))
    }

    /// Binds `predicate` to a one-column schema of type `r#type`, derives the
    /// partition filter from it and evaluates that filter against `data_file`.
    fn eval_predicate(
        r#type: PrimitiveType,
        predicate: Predicate,
        data_file: &DataFile,
    ) -> Result<bool> {
        let case_sensitive = true;
        let (schema, partition_spec) = create_schema_and_partition_spec(r#type)?;
        let predicate = predicate.bind(schema.clone(), case_sensitive)?;

        let expression_evaluator =
            create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?;

        expression_evaluator.eval(data_file)
    }

    /// Evaluates `predicate` against the float data file (`a = 1.0`).
    fn eval_float(predicate: Predicate) -> Result<bool> {
        eval_predicate(PrimitiveType::Float, predicate, &create_data_file_float())
    }

    /// Evaluates `predicate` against the string data file (`a = "test str"`).
    fn eval_string(predicate: Predicate) -> Result<bool> {
        eval_predicate(PrimitiveType::String, predicate, &create_data_file_string())
    }

    /// Builds `a <op>`.
    fn unary(op: PredicateOperator) -> Predicate {
        Predicate::Unary(UnaryExpression::new(op, Reference::new("a")))
    }

    /// Builds `a <op> literal`.
    fn binary(op: PredicateOperator, literal: Datum) -> Predicate {
        Predicate::Binary(BinaryExpression::new(op, Reference::new("a"), literal))
    }

    /// Builds `a <op> {literals}`.
    fn set(op: PredicateOperator, literals: impl IntoIterator<Item = Datum>) -> Predicate {
        Predicate::Set(SetExpression::new(
            op,
            Reference::new("a"),
            FnvHashSet::from_iter(literals),
        ))
    }

    #[test]
    fn test_expr_or() -> Result<()> {
        let matching = binary(PredicateOperator::LessThan, Datum::float(1.0))
            .or(binary(PredicateOperator::GreaterThanOrEq, Datum::float(0.4)));
        assert!(eval_float(matching)?);

        // 1.0 is neither < 0.5 nor >= 2.0.
        let non_matching = binary(PredicateOperator::LessThan, Datum::float(0.5))
            .or(binary(PredicateOperator::GreaterThanOrEq, Datum::float(2.0)));
        assert!(!eval_float(non_matching)?);

        Ok(())
    }

    #[test]
    fn test_expr_and() -> Result<()> {
        let matching = binary(PredicateOperator::LessThan, Datum::float(1.1))
            .and(binary(PredicateOperator::GreaterThanOrEq, Datum::float(0.4)));
        assert!(eval_float(matching)?);

        // 1.0 < 1.1 holds but 1.0 >= 1.5 does not.
        let non_matching = binary(PredicateOperator::LessThan, Datum::float(1.1))
            .and(binary(PredicateOperator::GreaterThanOrEq, Datum::float(1.5)));
        assert!(!eval_float(non_matching)?);

        Ok(())
    }

    #[test]
    fn test_expr_not_in() -> Result<()> {
        let predicate = set(
            PredicateOperator::NotIn,
            [Datum::float(0.9), Datum::float(1.2), Datum::float(2.4)],
        );
        assert!(eval_float(predicate)?);

        Ok(())
    }

    #[test]
    fn test_expr_in() -> Result<()> {
        let matching = set(
            PredicateOperator::In,
            [Datum::float(1.0), Datum::float(1.2), Datum::float(2.4)],
        );
        assert!(eval_float(matching)?);

        let non_matching = set(
            PredicateOperator::In,
            [Datum::float(0.9), Datum::float(1.2), Datum::float(2.4)],
        );
        assert!(!eval_float(non_matching)?);

        Ok(())
    }

    #[test]
    fn test_expr_not_starts_with() -> Result<()> {
        let predicate = binary(PredicateOperator::NotStartsWith, Datum::string("not"));
        assert!(eval_string(predicate)?);

        Ok(())
    }

    #[test]
    fn test_expr_starts_with() -> Result<()> {
        let matching = binary(PredicateOperator::StartsWith, Datum::string("test"));
        assert!(eval_string(matching)?);

        let non_matching = binary(PredicateOperator::StartsWith, Datum::string("nope"));
        assert!(!eval_string(non_matching)?);

        Ok(())
    }

    #[test]
    fn test_expr_not_eq() -> Result<()> {
        let predicate = binary(PredicateOperator::NotEq, Datum::float(0.9));
        assert!(eval_float(predicate)?);

        Ok(())
    }

    #[test]
    fn test_expr_eq() -> Result<()> {
        assert!(eval_float(binary(PredicateOperator::Eq, Datum::float(1.0)))?);
        assert!(!eval_float(binary(PredicateOperator::Eq, Datum::float(2.0)))?);

        Ok(())
    }

    #[test]
    fn test_expr_greater_than_or_eq() -> Result<()> {
        let predicate = binary(PredicateOperator::GreaterThanOrEq, Datum::float(1.0));
        assert!(eval_float(predicate)?);

        Ok(())
    }

    #[test]
    fn test_expr_greater_than() -> Result<()> {
        assert!(eval_float(binary(PredicateOperator::GreaterThan, Datum::float(0.9)))?);
        // Strict comparison: 1.0 > 1.0 is false.
        assert!(!eval_float(binary(PredicateOperator::GreaterThan, Datum::float(1.0)))?);

        Ok(())
    }

    #[test]
    fn test_expr_less_than_or_eq() -> Result<()> {
        let predicate = binary(PredicateOperator::LessThanOrEq, Datum::float(1.0));
        assert!(eval_float(predicate)?);

        Ok(())
    }

    #[test]
    fn test_expr_less_than() -> Result<()> {
        assert!(eval_float(binary(PredicateOperator::LessThan, Datum::float(1.1)))?);
        // Strict comparison: 1.0 < 1.0 is false.
        assert!(!eval_float(binary(PredicateOperator::LessThan, Datum::float(1.0)))?);

        Ok(())
    }

    #[test]
    fn test_expr_is_not_nan() -> Result<()> {
        assert!(eval_float(unary(PredicateOperator::NotNan))?);

        Ok(())
    }

    #[test]
    fn test_expr_is_nan() -> Result<()> {
        assert!(!eval_float(unary(PredicateOperator::IsNan))?);

        Ok(())
    }

    #[test]
    fn test_expr_is_not_null() -> Result<()> {
        assert!(eval_float(unary(PredicateOperator::NotNull))?);

        Ok(())
    }

    #[test]
    fn test_expr_is_null() -> Result<()> {
        assert!(!eval_float(unary(PredicateOperator::IsNull))?);

        Ok(())
    }

    #[test]
    fn test_expr_always_false() -> Result<()> {
        assert!(!eval_float(Predicate::AlwaysFalse)?);

        Ok(())
    }

    #[test]
    fn test_expr_always_true() -> Result<()> {
        assert!(eval_float(Predicate::AlwaysTrue)?);

        Ok(())
    }
}
diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
index 03075d7..c1b6dbe 100644
--- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
@@ -260,30 +260,6 @@ mod test {
Ok((Arc::new(schema), Arc::new(spec)))
}
- fn create_schema_and_partition_spec_with_id_mismatch() ->
Result<(SchemaRef, PartitionSpecRef)>
- {
- let schema = Schema::builder()
- .with_fields(vec![Arc::new(NestedField::optional(
- 1,
- "a",
- Type::Primitive(PrimitiveType::Float),
- ))])
- .build()?;
-
- let spec = PartitionSpec::builder()
- .with_spec_id(999)
- .with_fields(vec![PartitionField::builder()
- .source_id(1)
- .name("a".to_string())
- .field_id(1)
- .transform(Transform::Identity)
- .build()])
- .build()
- .unwrap();
-
- Ok((Arc::new(schema), Arc::new(spec)))
- }
-
fn create_manifest_file(partitions: Vec<FieldSummary>) -> ManifestFile {
ManifestFile {
manifest_path: "/test/path".to_string(),
diff --git a/crates/iceberg/src/expr/visitors/mod.rs
b/crates/iceberg/src/expr/visitors/mod.rs
index 805f7dd..d686b11 100644
--- a/crates/iceberg/src/expr/visitors/mod.rs
+++ b/crates/iceberg/src/expr/visitors/mod.rs
@@ -16,6 +16,7 @@
// under the License.
pub(crate) mod bound_predicate_visitor;
+pub(crate) mod expression_evaluator;
pub(crate) mod inclusive_metrics_evaluator;
pub(crate) mod inclusive_projection;
pub(crate) mod manifest_evaluator;
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 397633d..5f0922e 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -18,6 +18,7 @@
//! Table scan api.
use crate::arrow::ArrowReaderBuilder;
+use crate::expr::visitors::expression_evaluator::ExpressionEvaluator;
use
crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
use crate::expr::visitors::inclusive_projection::InclusiveProjection;
use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
@@ -209,6 +210,7 @@ impl TableScan {
let mut partition_filter_cache = PartitionFilterCache::new();
let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
+ let mut expression_evaluator_cache = ExpressionEvaluatorCache::new();
Ok(try_stream! {
let manifest_list = context
@@ -244,7 +246,16 @@ impl TableScan {
futures::stream::iter(manifest.entries().iter().filter(|e|
e.is_alive()));
while let Some(manifest_entry) =
manifest_entries_stream.next().await {
- // TODO: Apply ExpressionEvaluator
+ let data_file = manifest_entry.data_file();
+
+ if let Some(partition_filter) = partition_filter {
+ let expression_evaluator =
expression_evaluator_cache.get(partition_spec_id, partition_filter);
+
+ if !expression_evaluator.eval(data_file)? {
+ continue;
+ }
+ }
+
if let Some(bound_predicate) = context.bound_filter() {
// reject any manifest entries whose data file's
metrics don't match the filter.
@@ -463,6 +474,27 @@ impl ManifestEvaluatorCache {
}
}
+/// Manages the caching of [`ExpressionEvaluator`] objects
+/// for [`PartitionSpec`]s based on partition spec id.
+#[derive(Debug)]
+struct ExpressionEvaluatorCache(HashMap<i32, ExpressionEvaluator>);
+
+impl ExpressionEvaluatorCache {
+ /// Creates a new [`ExpressionEvaluatorCache`]
+ /// with an empty internal HashMap.
+ fn new() -> Self {
+ Self(HashMap::new())
+ }
+
+ /// Retrieves a [`ExpressionEvaluator`] from the cache
+ /// or computes it if not present.
+ fn get(&mut self, spec_id: i32, partition_filter: &BoundPredicate) -> &mut
ExpressionEvaluator {
+ self.0
+ .entry(spec_id)
+ .or_insert(ExpressionEvaluator::new(partition_filter.clone()))
+ }
+}
+
/// A task to scan part of file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileScanTask {