This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 070576b  Implement `BoundPredicateVisitor` for `ManifestFilterVisitor` (#367)
070576b is described below

commit 070576bfb47a6a7205026c9dbfb3397a06cc2986
Author: Shabana Baig <[email protected]>
AuthorDate: Thu Jun 13 15:01:34 2024 -0400

    Implement `BoundPredicateVisitor` for `ManifestFilterVisitor` (#367)
    
    * Implement all functions of BoundPredicateVisitor for ManifestFilterVisitor
    
    * Fix code comments
    
    * Refactor code and fix predicate for is_some_and
    
    * Refactor code
    
    * Handle review comments
    
    * Handle review comments
    
    * Handle review comments
    
    * Refactor code
---
 .../src/expr/visitors/manifest_evaluator.rs        | 1144 +++++++++++++++++---
 1 file changed, 1014 insertions(+), 130 deletions(-)

diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
index c1b6dbe..30ae58f 100644
--- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
@@ -17,8 +17,9 @@
 
 use crate::expr::visitors::bound_predicate_visitor::{visit, 
BoundPredicateVisitor};
 use crate::expr::{BoundPredicate, BoundReference};
-use crate::spec::{Datum, FieldSummary, ManifestFile};
+use crate::spec::{Datum, FieldSummary, ManifestFile, PrimitiveLiteral, Type};
 use crate::Result;
+use crate::{Error, ErrorKind};
 use fnv::FnvHashSet;
 
 /// Evaluates a [`ManifestFile`] to see if the partition summaries
@@ -65,17 +66,19 @@ impl<'a> ManifestFilterVisitor<'a> {
     }
 }
 
-// Remove this annotation once all todos have been removed
-#[allow(unused_variables)]
+const ROWS_MIGHT_MATCH: Result<bool> = Ok(true);
+const ROWS_CANNOT_MATCH: Result<bool> = Ok(false);
+const IN_PREDICATE_LIMIT: usize = 200;
+
 impl BoundPredicateVisitor for ManifestFilterVisitor<'_> {
     type T = bool;
 
     fn always_true(&mut self) -> crate::Result<bool> {
-        Ok(true)
+        ROWS_MIGHT_MATCH
     }
 
     fn always_false(&mut self) -> crate::Result<bool> {
-        Ok(false)
+        ROWS_CANNOT_MATCH
     }
 
     fn and(&mut self, lhs: bool, rhs: bool) -> crate::Result<bool> {
@@ -103,7 +106,15 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> {
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field = self.field_summary_for_reference(reference);
+
+        // contains_null encodes whether at least one partition value is null,
+        // lowerBound is null if all partition values are null
+        if ManifestFilterVisitor::are_all_null(field, 
&reference.field().field_type) {
+            ROWS_CANNOT_MATCH
+        } else {
+            ROWS_MIGHT_MATCH
+        }
     }
 
     fn is_nan(
@@ -111,10 +122,18 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> {
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        Ok(self
-            .field_summary_for_reference(reference)
-            .contains_nan
-            .is_some())
+        let field = self.field_summary_for_reference(reference);
+        if let Some(contains_nan) = field.contains_nan {
+            if !contains_nan {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        if ManifestFilterVisitor::are_all_null(field, 
&reference.field().field_type) {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        ROWS_MIGHT_MATCH
     }
 
     fn not_nan(
@@ -122,79 +141,210 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> {
         reference: &BoundReference,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field = self.field_summary_for_reference(reference);
+        if let Some(contains_nan) = field.contains_nan {
+            // check if all values are nan
+            if contains_nan && !field.contains_null && 
field.lower_bound.is_none() {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+        ROWS_MIGHT_MATCH
     }
 
     fn less_than(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field = self.field_summary_for_reference(reference);
+        match &field.lower_bound {
+            Some(bound) if datum <= bound => ROWS_CANNOT_MATCH,
+            Some(_) => ROWS_MIGHT_MATCH,
+            None => ROWS_CANNOT_MATCH,
+        }
     }
 
     fn less_than_or_eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field = self.field_summary_for_reference(reference);
+        match &field.lower_bound {
+            Some(bound) if datum < bound => ROWS_CANNOT_MATCH,
+            Some(_) => ROWS_MIGHT_MATCH,
+            None => ROWS_CANNOT_MATCH,
+        }
     }
 
     fn greater_than(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field = self.field_summary_for_reference(reference);
+        match &field.upper_bound {
+            Some(bound) if datum >= bound => ROWS_CANNOT_MATCH,
+            Some(_) => ROWS_MIGHT_MATCH,
+            None => ROWS_CANNOT_MATCH,
+        }
     }
 
     fn greater_than_or_eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field = self.field_summary_for_reference(reference);
+        match &field.upper_bound {
+            Some(bound) if datum > bound => ROWS_CANNOT_MATCH,
+            Some(_) => ROWS_MIGHT_MATCH,
+            None => ROWS_CANNOT_MATCH,
+        }
     }
 
     fn eq(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field = self.field_summary_for_reference(reference);
+
+        if field.lower_bound.is_none() || field.upper_bound.is_none() {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        if let Some(lower_bound) = &field.lower_bound {
+            if lower_bound > datum {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        if let Some(upper_bound) = &field.upper_bound {
+            if upper_bound < datum {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        ROWS_MIGHT_MATCH
     }
 
     fn not_eq(
         &mut self,
-        reference: &BoundReference,
-        literal: &Datum,
+        _reference: &BoundReference,
+        _datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        // because the bounds are not necessarily a min or max value, this cannot be answered using
+        // them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value in col.
+        ROWS_MIGHT_MATCH
     }
 
     fn starts_with(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field = self.field_summary_for_reference(reference);
+
+        if field.lower_bound.is_none() || field.upper_bound.is_none() {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        let prefix = ManifestFilterVisitor::datum_as_str(
+            datum,
+            "Cannot perform starts_with on non-string value",
+        )?;
+        let prefix_len = prefix.len();
+
+        if let Some(lower_bound) = &field.lower_bound {
+            let lower_bound_str = ManifestFilterVisitor::datum_as_str(
+                lower_bound,
+                "Cannot perform starts_with on non-string lower bound",
+            )?;
+            let min_len = lower_bound_str.len().min(prefix_len);
+            if prefix.as_bytes().lt(&lower_bound_str.as_bytes()[..min_len]) {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        if let Some(upper_bound) = &field.upper_bound {
+            let upper_bound_str = ManifestFilterVisitor::datum_as_str(
+                upper_bound,
+                "Cannot perform starts_with on non-string upper bound",
+            )?;
+            let min_len = upper_bound_str.len().min(prefix_len);
+            if prefix.as_bytes().gt(&upper_bound_str.as_bytes()[..min_len]) {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        ROWS_MIGHT_MATCH
     }
 
     fn not_starts_with(
         &mut self,
         reference: &BoundReference,
-        literal: &Datum,
+        datum: &Datum,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field = self.field_summary_for_reference(reference);
+
+        if field.contains_null || field.lower_bound.is_none() || 
field.upper_bound.is_none() {
+            return ROWS_MIGHT_MATCH;
+        }
+
+        let prefix = ManifestFilterVisitor::datum_as_str(
+            datum,
+            "Cannot perform not_starts_with on non-string value",
+        )?;
+        let prefix_len = prefix.len();
+
+        // not_starts_with will match unless all values must start with the prefix. This happens when
+        // the lower and upper bounds both start with the prefix.
+        if let Some(lower_bound) = &field.lower_bound {
+            let lower_bound_str = ManifestFilterVisitor::datum_as_str(
+                lower_bound,
+                "Cannot perform not_starts_with on non-string lower bound",
+            )?;
+
+            // if lower is shorter than the prefix then lower doesn't start with the prefix
+            if prefix_len > lower_bound_str.len() {
+                return ROWS_MIGHT_MATCH;
+            }
+
+            if prefix
+                .as_bytes()
+                .eq(&lower_bound_str.as_bytes()[..prefix_len])
+            {
+                if let Some(upper_bound) = &field.upper_bound {
+                    let upper_bound_str = ManifestFilterVisitor::datum_as_str(
+                        upper_bound,
+                        "Cannot perform not_starts_with on non-string upper 
bound",
+                    )?;
+
+                    // if upper is shorter than the prefix then upper can't start with the prefix
+                    if prefix_len > upper_bound_str.len() {
+                        return ROWS_MIGHT_MATCH;
+                    }
+
+                    if prefix
+                        .as_bytes()
+                        .eq(&upper_bound_str.as_bytes()[..prefix_len])
+                    {
+                        return ROWS_CANNOT_MATCH;
+                    }
+                }
+            }
+        }
+
+        ROWS_MIGHT_MATCH
     }
 
     fn r#in(
@@ -203,16 +353,39 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> {
         literals: &FnvHashSet<Datum>,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        let field = self.field_summary_for_reference(reference);
+        if field.lower_bound.is_none() {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        if literals.len() > IN_PREDICATE_LIMIT {
+            return ROWS_MIGHT_MATCH;
+        }
+
+        if let Some(lower_bound) = &field.lower_bound {
+            if literals.iter().all(|datum| lower_bound > datum) {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        if let Some(upper_bound) = &field.upper_bound {
+            if literals.iter().all(|datum| upper_bound < datum) {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        ROWS_MIGHT_MATCH
     }
 
     fn not_in(
         &mut self,
-        reference: &BoundReference,
-        literals: &FnvHashSet<Datum>,
+        _reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
         _predicate: &BoundPredicate,
     ) -> crate::Result<bool> {
-        todo!()
+        // because the bounds are not necessarily a min or max value, this cannot be answered using
+        // them. notIn(col, {X, ...}) with (X, Y) doesn't guarantee that X is a value in col.
+        ROWS_MIGHT_MATCH
     }
 }
 
@@ -221,43 +394,222 @@ impl ManifestFilterVisitor<'_> {
         let pos = reference.accessor().position();
         &self.partitions[pos]
     }
+
+    fn are_all_null(field: &FieldSummary, r#type: &Type) -> bool {
+        // contains_null encodes whether at least one partition value is null,
+        // lowerBound is null if all partition values are null
+        let mut all_null: bool = field.contains_null && 
field.lower_bound.is_none();
+
+        if all_null && r#type.is_floating_type() {
+            // floating point types may include NaN values, which we check separately.
+            // In case bounds don't include NaN value, contains_nan needs to be checked against.
+            all_null = match field.contains_nan {
+                Some(val) => !val,
+                None => false,
+            }
+        }
+
+        all_null
+    }
+
+    fn datum_as_str<'a>(bound: &'a Datum, err_msg: &str) -> crate::Result<&'a 
String> {
+        let PrimitiveLiteral::String(bound) = bound.literal() else {
+            return Err(Error::new(ErrorKind::Unexpected, err_msg));
+        };
+        Ok(bound)
+    }
 }
 
 #[cfg(test)]
 mod test {
-    use crate::expr::visitors::inclusive_projection::InclusiveProjection;
     use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
     use crate::expr::{
-        Bind, BoundPredicate, Predicate, PredicateOperator, Reference, 
UnaryExpression,
+        BinaryExpression, Bind, Predicate, PredicateOperator, Reference, 
SetExpression,
+        UnaryExpression,
     };
     use crate::spec::{
-        FieldSummary, ManifestContentType, ManifestFile, NestedField, 
PartitionField,
-        PartitionSpec, PartitionSpecRef, PrimitiveType, Schema, SchemaRef, 
Transform, Type,
+        Datum, FieldSummary, ManifestContentType, ManifestFile, NestedField, 
PrimitiveType, Schema,
+        SchemaRef, Type,
     };
     use crate::Result;
+    use fnv::FnvHashSet;
+    use std::ops::Not;
     use std::sync::Arc;
 
-    fn create_schema_and_partition_spec() -> Result<(SchemaRef, 
PartitionSpecRef)> {
+    const INT_MIN_VALUE: i32 = 30;
+    const INT_MAX_VALUE: i32 = 79;
+
+    const STRING_MIN_VALUE: &str = "a";
+    const STRING_MAX_VALUE: &str = "z";
+
+    fn create_schema() -> Result<SchemaRef> {
         let schema = Schema::builder()
-            .with_fields(vec![Arc::new(NestedField::optional(
-                1,
-                "a",
-                Type::Primitive(PrimitiveType::Float),
-            ))])
+            .with_fields(vec![
+                Arc::new(NestedField::required(
+                    1,
+                    "id",
+                    Type::Primitive(PrimitiveType::Int),
+                )),
+                Arc::new(NestedField::optional(
+                    2,
+                    "all_nulls_missing_nan",
+                    Type::Primitive(PrimitiveType::String),
+                )),
+                Arc::new(NestedField::optional(
+                    3,
+                    "some_nulls",
+                    Type::Primitive(PrimitiveType::String),
+                )),
+                Arc::new(NestedField::optional(
+                    4,
+                    "no_nulls",
+                    Type::Primitive(PrimitiveType::String),
+                )),
+                Arc::new(NestedField::optional(
+                    5,
+                    "float",
+                    Type::Primitive(PrimitiveType::Float),
+                )),
+                Arc::new(NestedField::optional(
+                    6,
+                    "all_nulls_double",
+                    Type::Primitive(PrimitiveType::Double),
+                )),
+                Arc::new(NestedField::optional(
+                    7,
+                    "all_nulls_no_nans",
+                    Type::Primitive(PrimitiveType::Float),
+                )),
+                Arc::new(NestedField::optional(
+                    8,
+                    "all_nans",
+                    Type::Primitive(PrimitiveType::Double),
+                )),
+                Arc::new(NestedField::optional(
+                    9,
+                    "both_nan_and_null",
+                    Type::Primitive(PrimitiveType::Float),
+                )),
+                Arc::new(NestedField::optional(
+                    10,
+                    "no_nan_or_null",
+                    Type::Primitive(PrimitiveType::Double),
+                )),
+                Arc::new(NestedField::optional(
+                    11,
+                    "all_nulls_missing_nan_float",
+                    Type::Primitive(PrimitiveType::Float),
+                )),
+                Arc::new(NestedField::optional(
+                    12,
+                    "all_same_value_or_null",
+                    Type::Primitive(PrimitiveType::String),
+                )),
+                Arc::new(NestedField::optional(
+                    13,
+                    "no_nulls_same_value_a",
+                    Type::Primitive(PrimitiveType::String),
+                )),
+            ])
             .build()?;
 
-        let spec = PartitionSpec::builder()
-            .with_spec_id(1)
-            .with_fields(vec![PartitionField::builder()
-                .source_id(1)
-                .name("a".to_string())
-                .field_id(1)
-                .transform(Transform::Identity)
-                .build()])
-            .build()
-            .unwrap();
+        Ok(Arc::new(schema))
+    }
 
-        Ok((Arc::new(schema), Arc::new(spec)))
+    fn create_partitions() -> Vec<FieldSummary> {
+        vec![
+            // id
+            FieldSummary {
+                contains_null: false,
+                contains_nan: None,
+                lower_bound: Some(Datum::int(INT_MIN_VALUE)),
+                upper_bound: Some(Datum::int(INT_MAX_VALUE)),
+            },
+            // all_nulls_missing_nan
+            FieldSummary {
+                contains_null: true,
+                contains_nan: None,
+                lower_bound: None,
+                upper_bound: None,
+            },
+            // some_nulls
+            FieldSummary {
+                contains_null: true,
+                contains_nan: None,
+                lower_bound: Some(Datum::string(STRING_MIN_VALUE)),
+                upper_bound: Some(Datum::string(STRING_MAX_VALUE)),
+            },
+            // no_nulls
+            FieldSummary {
+                contains_null: false,
+                contains_nan: None,
+                lower_bound: Some(Datum::string(STRING_MIN_VALUE)),
+                upper_bound: Some(Datum::string(STRING_MAX_VALUE)),
+            },
+            // float
+            FieldSummary {
+                contains_null: true,
+                contains_nan: None,
+                lower_bound: Some(Datum::float(0.0)),
+                upper_bound: Some(Datum::float(20.0)),
+            },
+            // all_nulls_double
+            FieldSummary {
+                contains_null: true,
+                contains_nan: None,
+                lower_bound: None,
+                upper_bound: None,
+            },
+            // all_nulls_no_nans
+            FieldSummary {
+                contains_null: true,
+                contains_nan: Some(false),
+                lower_bound: None,
+                upper_bound: None,
+            },
+            // all_nans
+            FieldSummary {
+                contains_null: false,
+                contains_nan: Some(true),
+                lower_bound: None,
+                upper_bound: None,
+            },
+            // both_nan_and_null
+            FieldSummary {
+                contains_null: true,
+                contains_nan: Some(true),
+                lower_bound: None,
+                upper_bound: None,
+            },
+            // no_nan_or_null
+            FieldSummary {
+                contains_null: false,
+                contains_nan: Some(false),
+                lower_bound: Some(Datum::float(0.0)),
+                upper_bound: Some(Datum::float(20.0)),
+            },
+            // all_nulls_missing_nan_float
+            FieldSummary {
+                contains_null: true,
+                contains_nan: None,
+                lower_bound: None,
+                upper_bound: None,
+            },
+            // all_same_value_or_null
+            FieldSummary {
+                contains_null: true,
+                contains_nan: None,
+                lower_bound: Some(Datum::string(STRING_MIN_VALUE)),
+                upper_bound: Some(Datum::string(STRING_MIN_VALUE)),
+            },
+            // no_nulls_same_value_a
+            FieldSummary {
+                contains_null: false,
+                contains_nan: None,
+                lower_bound: Some(Datum::string(STRING_MIN_VALUE)),
+                upper_bound: Some(Datum::string(STRING_MIN_VALUE)),
+            },
+        ]
     }
 
     fn create_manifest_file(partitions: Vec<FieldSummary>) -> ManifestFile {
@@ -280,131 +632,663 @@ mod test {
         }
     }
 
-    fn create_partition_schema(
-        partition_spec: &PartitionSpecRef,
-        schema: &Schema,
-    ) -> Result<SchemaRef> {
-        let partition_type = partition_spec.partition_type(schema)?;
+    #[test]
+    fn test_always_true() -> Result<()> {
+        let case_sensitive = false;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
 
-        let partition_fields: Vec<_> = 
partition_type.fields().iter().map(Arc::clone).collect();
+        let filter = Predicate::AlwaysTrue.bind(schema.clone(), 
case_sensitive)?;
+
+        assert!(ManifestEvaluator::new(filter).eval(&manifest_file)?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_always_false() -> Result<()> {
+        let case_sensitive = false;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        let filter = Predicate::AlwaysFalse.bind(schema.clone(), 
case_sensitive)?;
 
-        let partition_schema = Arc::new(
-            Schema::builder()
-                .with_schema_id(partition_spec.spec_id)
-                .with_fields(partition_fields)
-                .build()?,
+        assert!(!ManifestEvaluator::new(filter).eval(&manifest_file)?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_all_nulls() -> Result<()> {
+        let case_sensitive = true;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        // all_nulls_missing_nan
+        let all_nulls_missing_nan_filter = 
Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::NotNull,
+            Reference::new("all_nulls_missing_nan"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            
!ManifestEvaluator::new(all_nulls_missing_nan_filter).eval(&manifest_file)?,
+            "Should skip: all nulls column with non-floating type contains all 
null"
         );
 
-        Ok(partition_schema)
+        // all_nulls_missing_nan_float
+        let all_nulls_missing_nan_float_filter = 
Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::NotNull,
+            Reference::new("all_nulls_missing_nan_float"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            
ManifestEvaluator::new(all_nulls_missing_nan_float_filter).eval(&manifest_file)?,
+            "Should read: no NaN information may indicate presence of NaN 
value"
+        );
+
+        // some_nulls
+        let some_nulls_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::NotNull,
+            Reference::new("some_nulls"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(some_nulls_filter).eval(&manifest_file)?,
+            "Should read: column with some nulls contains a non-null value"
+        );
+
+        // no_nulls
+        let no_nulls_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::NotNull,
+            Reference::new("no_nulls"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+
+        assert!(
+            ManifestEvaluator::new(no_nulls_filter).eval(&manifest_file)?,
+            "Should read: non-null column contains a non-null value"
+        );
+
+        Ok(())
     }
 
-    fn create_partition_filter(
-        partition_spec: PartitionSpecRef,
-        partition_schema: SchemaRef,
-        filter: &BoundPredicate,
-        case_sensitive: bool,
-    ) -> Result<BoundPredicate> {
-        let mut inclusive_projection = 
InclusiveProjection::new(partition_spec);
+    #[test]
+    fn test_no_nulls() -> Result<()> {
+        let case_sensitive = true;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
 
-        let partition_filter = inclusive_projection
-            .project(filter)?
-            .rewrite_not()
-            .bind(partition_schema, case_sensitive)?;
+        // all_nulls_missing_nan
+        let all_nulls_missing_nan_filter = 
Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::IsNull,
+            Reference::new("all_nulls_missing_nan"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            
ManifestEvaluator::new(all_nulls_missing_nan_filter).eval(&manifest_file)?,
+            "Should read: at least one null value in all null column"
+        );
+
+        // some_nulls
+        let some_nulls_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::IsNull,
+            Reference::new("some_nulls"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(some_nulls_filter).eval(&manifest_file)?,
+            "Should read: column with some nulls contains a null value"
+        );
+
+        // no_nulls
+        let no_nulls_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::IsNull,
+            Reference::new("no_nulls"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+
+        assert!(
+            !ManifestEvaluator::new(no_nulls_filter).eval(&manifest_file)?,
+            "Should skip: non-null column contains no null values"
+        );
+
+        // both_nan_and_null
+        let both_nan_and_null_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::IsNull,
+            Reference::new("both_nan_and_null"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            
ManifestEvaluator::new(both_nan_and_null_filter).eval(&manifest_file)?,
+            "Should read: both_nan_and_null column contains no null values"
+        );
 
-        Ok(partition_filter)
+        Ok(())
     }
 
-    fn create_manifest_evaluator(
-        schema: SchemaRef,
-        partition_spec: PartitionSpecRef,
-        filter: &BoundPredicate,
-        case_sensitive: bool,
-    ) -> Result<ManifestEvaluator> {
-        let partition_schema = create_partition_schema(&partition_spec, 
&schema)?;
-        let partition_filter = create_partition_filter(
-            partition_spec,
-            partition_schema.clone(),
-            filter,
-            case_sensitive,
-        )?;
+    #[test]
+    fn test_is_nan() -> Result<()> {
+        let case_sensitive = true;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        // float
+        let float_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::IsNan,
+            Reference::new("float"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(float_filter).eval(&manifest_file)?,
+            "Should read: no information on if there are nan value in float 
column"
+        );
+
+        // all_nulls_double
+        let all_nulls_double_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::IsNan,
+            Reference::new("all_nulls_double"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            
ManifestEvaluator::new(all_nulls_double_filter).eval(&manifest_file)?,
+            "Should read: no NaN information may indicate presence of NaN 
value"
+        );
+
+        // all_nulls_missing_nan_float
+        let all_nulls_missing_nan_float_filter = 
Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::IsNan,
+            Reference::new("all_nulls_missing_nan_float"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            
ManifestEvaluator::new(all_nulls_missing_nan_float_filter).eval(&manifest_file)?,
+            "Should read: no NaN information may indicate presence of NaN 
value"
+        );
+
+        // all_nulls_no_nans
+        let all_nulls_no_nans_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::IsNan,
+            Reference::new("all_nulls_no_nans"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            
!ManifestEvaluator::new(all_nulls_no_nans_filter).eval(&manifest_file)?,
+            "Should skip: no nan column doesn't contain nan value"
+        );
+
+        // all_nans
+        let all_nans_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::IsNan,
+            Reference::new("all_nans"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(all_nans_filter).eval(&manifest_file)?,
+            "Should read: all_nans column contains nan value"
+        );
+
+        // both_nan_and_null
+        let both_nan_and_null_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::IsNan,
+            Reference::new("both_nan_and_null"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            
ManifestEvaluator::new(both_nan_and_null_filter).eval(&manifest_file)?,
+            "Should read: both_nan_and_null column contains nan value"
+        );
 
-        Ok(ManifestEvaluator::new(partition_filter))
+        // no_nan_or_null
+        let no_nan_or_null_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::IsNan,
+            Reference::new("no_nan_or_null"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            
!ManifestEvaluator::new(no_nan_or_null_filter).eval(&manifest_file)?,
+            "Should skip: no_nan_or_null column doesn't contain nan value"
+        );
+
+        Ok(())
     }
 
     #[test]
-    fn test_manifest_file_empty_partitions() -> Result<()> {
-        let case_sensitive = false;
+    fn test_not_nan() -> Result<()> { // NotNan: only the all_nans column is expected to be pruned
+        let case_sensitive = true;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        // float
+        let float_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::NotNan,
+            Reference::new("float"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(float_filter).eval(&manifest_file)?,
+            "Should read: no information on if there are nan value in float column"
+        );
 
-        let (schema, partition_spec) = create_schema_and_partition_spec()?;
+        // all_nulls_double
+        let all_nulls_double_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::NotNan,
+            Reference::new("all_nulls_double"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(all_nulls_double_filter).eval(&manifest_file)?,
+            "Should read: all null column contains non nan value"
+        );
 
-        let filter = Predicate::AlwaysTrue.bind(schema.clone(), case_sensitive)?;
+        // all_nulls_no_nans
+        let all_nulls_no_nans_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::NotNan,
+            Reference::new("all_nulls_no_nans"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(all_nulls_no_nans_filter).eval(&manifest_file)?,
+            "Should read: no_nans column contains non nan value"
+        );
 
-        let manifest_file = create_manifest_file(vec![]);
+        // all_nans
+        let all_nans_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::NotNan,
+            Reference::new("all_nans"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            !ManifestEvaluator::new(all_nans_filter).eval(&manifest_file)?,
+            "Should skip: all nans column doesn't contain non nan value"
+        );
 
-        let manifest_evaluator =
-            create_manifest_evaluator(schema, partition_spec, &filter, case_sensitive)?;
+        // both_nan_and_null
+        let both_nan_and_null_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::NotNan,
+            Reference::new("both_nan_and_null"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(both_nan_and_null_filter).eval(&manifest_file)?,
+            "Should read: both_nan_and_null nans column contains non nan value"
+        );
+
+        // no_nan_or_null
+        let no_nan_or_null_filter = Predicate::Unary(UnaryExpression::new(
+            PredicateOperator::NotNan,
+            Reference::new("no_nan_or_null"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(no_nan_or_null_filter).eval(&manifest_file)?,
+            "Should read: no_nan_or_null column contains non nan value"
+        );
 
-        let result = manifest_evaluator.eval(&manifest_file)?;
+        Ok(())
+    }
 
-        assert!(result);
+    #[test]
+    fn test_and() -> Result<()> { // And of a non-matching and a matching "id" bound must prune
+        let case_sensitive = true;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        let filter = Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::LessThan,
+            Reference::new("id"),
+            Datum::int(INT_MIN_VALUE - 25),
+        ))
+        .and(Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::GreaterThanOrEq,
+            Reference::new("id"),
+            Datum::int(INT_MIN_VALUE - 30),
+        )))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            !ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should skip: and(false, true)"
+        );
 
         Ok(())
     }
 
     #[test]
-    fn test_manifest_file_trivial_partition_passing_filter() -> Result<()> {
+    fn test_or() -> Result<()> { // Or of two "id" bounds that both fail to match must prune
         let case_sensitive = true;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        let filter = Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::LessThan,
+            Reference::new("id"),
+            Datum::int(INT_MIN_VALUE - 25),
+        ))
+        .or(Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::GreaterThanOrEq,
+            Reference::new("id"),
+            Datum::int(INT_MAX_VALUE + 1),
+        )))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            !ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should skip: or(false, false)"
+        );
 
-        let (schema, partition_spec) = create_schema_and_partition_spec()?;
+        Ok(())
+    }
 
-        let filter = Predicate::Unary(UnaryExpression::new(
-            PredicateOperator::IsNull,
-            Reference::new("a"),
+    #[test]
+    fn test_not() -> Result<()> { // Not inverts the bound check: not(false) reads, not(true) prunes
+        let case_sensitive = true;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        let filter = Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::LessThan,
+            Reference::new("id"),
+            Datum::int(INT_MIN_VALUE - 25),
         ))
+        .not()
         .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should read: not(false)"
+        );
 
-        let manifest_file = create_manifest_file(vec![FieldSummary {
-            contains_null: true,
-            contains_nan: None,
-            lower_bound: None,
-            upper_bound: None,
-        }]);
+        let filter = Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::GreaterThan,
+            Reference::new("id"),
+            Datum::int(INT_MIN_VALUE - 25),
+        ))
+        .not()
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            !ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should skip: not(true)"
+        );
 
-        let manifest_evaluator =
-            create_manifest_evaluator(schema, partition_spec, &filter, case_sensitive)?;
+        Ok(())
+    }
 
-        let result = manifest_evaluator.eval(&manifest_file)?;
+    #[test]
+    fn test_less_than() -> Result<()> { // a literal under the "id" lower bound must prune
+        let case_sensitive = true;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        let filter = Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::LessThan,
+            Reference::new("id"),
+            Datum::int(INT_MIN_VALUE - 25),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            !ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should not read: id range below lower bound (5 < 30)"
+        );
 
-        assert!(result);
+        Ok(())
+    }
+
+    #[test]
+    fn test_less_than_or_eq() -> Result<()> { // a literal under the "id" lower bound must prune
+        let case_sensitive = true;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        let filter = Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::LessThanOrEq,
+            Reference::new("id"),
+            Datum::int(INT_MIN_VALUE - 25),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            !ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should not read: id range below lower bound (5 < 30)"
+        );
 
         Ok(())
     }
 
     #[test]
-    fn test_manifest_file_trivial_partition_rejected_filter() -> Result<()> {
+    fn test_greater_than() -> Result<()> { // a literal over the "id" upper bound must prune
         let case_sensitive = true;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        let filter = Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::GreaterThan,
+            Reference::new("id"),
+            Datum::int(INT_MAX_VALUE + 6),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            !ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should not read: id range above upper bound (85 > 79)"
+        );
 
-        let (schema, partition_spec) = create_schema_and_partition_spec()?;
+        Ok(())
+    }
 
-        let filter = Predicate::Unary(UnaryExpression::new(
-            PredicateOperator::IsNan,
-            Reference::new("a"),
+    #[test]
+    fn test_greater_than_or_eq() -> Result<()> { // prunes above the "id" upper bound, reads at the bound
+        let case_sensitive = true;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        let filter = Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::GreaterThanOrEq,
+            Reference::new("id"),
+            Datum::int(INT_MAX_VALUE + 6),
         ))
         .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            !ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should not read: id range above upper bound (85 > 79)"
+        );
 
-        let manifest_file = create_manifest_file(vec![FieldSummary {
-            contains_null: false,
-            contains_nan: None,
-            lower_bound: None,
-            upper_bound: None,
-        }]);
+        let filter = Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::GreaterThanOrEq,
+            Reference::new("id"),
+            Datum::int(INT_MAX_VALUE),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should read: one possible id"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_eq() -> Result<()> { // prunes below the "id" lower bound, reads at the bound itself
+        let case_sensitive = true;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        let filter = Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::Eq,
+            Reference::new("id"),
+            Datum::int(INT_MIN_VALUE - 25),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            !ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should not read: id below lower bound"
+        );
+
+        let filter = Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::Eq,
+            Reference::new("id"),
+            Datum::int(INT_MIN_VALUE),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should read: id equal to lower bound"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_not_eq() -> Result<()> { // still read when the literal lies below the "id" lower bound
+        let case_sensitive = true;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        let filter = Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::NotEq,
+            Reference::new("id"),
+            Datum::int(INT_MIN_VALUE - 25),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should read: id below lower bound"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_in() -> Result<()> { // prunes when all literals are below the "id" lower bound, reads when one equals it
+        let case_sensitive = true;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        let filter = Predicate::Set(SetExpression::new(
+            PredicateOperator::In,
+            Reference::new("id"),
+            FnvHashSet::from_iter(vec![
+                Datum::int(INT_MIN_VALUE - 25),
+                Datum::int(INT_MIN_VALUE - 24),
+            ]),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            !ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should not read: id below lower bound (5 < 30, 6 < 30)"
+        );
+
+        let filter = Predicate::Set(SetExpression::new(
+            PredicateOperator::In,
+            Reference::new("id"),
+            FnvHashSet::from_iter(vec![
+                Datum::int(INT_MIN_VALUE - 1),
+                Datum::int(INT_MIN_VALUE),
+            ]),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should read: id equal to lower bound (30 == 30)"
+        );
 
-        let manifest_evaluator =
-            create_manifest_evaluator(schema, partition_spec, &filter, case_sensitive)?;
+        Ok(())
+    }
+
+    #[test]
+    fn test_not_in() -> Result<()> { // still read when every listed literal is below the "id" lower bound
+        let case_sensitive = true;
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        let filter = Predicate::Set(SetExpression::new(
+            PredicateOperator::NotIn,
+            Reference::new("id"),
+            FnvHashSet::from_iter(vec![
+                Datum::int(INT_MIN_VALUE - 25),
+                Datum::int(INT_MIN_VALUE - 24),
+            ]),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should read: id below lower bound (5 < 30, 6 < 30)"
+        );
 
-        let result = manifest_evaluator.eval(&manifest_file).unwrap();
+        Ok(())
+    }
 
-        assert!(!result);
+    #[test]
+    fn test_starts_with() -> Result<()> { // on "some_nulls": prefix "a" is read, prefix "zzzz" is pruned
+        let case_sensitive = false; // NOTE(review): the non-prefix tests bind with `true`; confirm `false` is intended
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        let filter = Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::StartsWith,
+            Reference::new("some_nulls"),
+            Datum::string("a"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should read: range matches"
+        );
+
+        let filter = Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::StartsWith,
+            Reference::new("some_nulls"),
+            Datum::string("zzzz"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            !ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should skip: range doesn't match"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_not_starts_with() -> Result<()> { // read for "some_nulls"; pruned when all values share the prefix
+        let case_sensitive = false; // NOTE(review): the non-prefix tests bind with `true`; confirm `false` is intended
+        let schema = create_schema()?;
+        let partitions = create_partitions();
+        let manifest_file = create_manifest_file(partitions);
+
+        let filter = Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::NotStartsWith,
+            Reference::new("some_nulls"),
+            Datum::string("a"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should read: range matches"
+        );
+
+        let filter = Predicate::Binary(BinaryExpression::new(
+            PredicateOperator::NotStartsWith,
+            Reference::new("no_nulls_same_value_a"),
+            Datum::string("a"),
+        ))
+        .bind(schema.clone(), case_sensitive)?;
+        assert!(
+            !ManifestEvaluator::new(filter).eval(&manifest_file)?,
+            "Should not read: all values start with the prefix"
+        );
 
         Ok(())
     }


Reply via email to