This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 7dfc548  Refactor: Extract `partition_filters` from `ManifestEvaluator` (#360)
7dfc548 is described below

commit 7dfc548ebe42569fd7ca2143a13afdaeff8a9883
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Tue Apr 30 10:18:44 2024 +0200

    Refactor: Extract `partition_filters` from `ManifestEvaluator` (#360)
    
    * refactor: extract inclusive_projection from manifest_evaluator
    
    * refactor: add FileScanStreamContext
    
    * refactor: create partition_spec and partition_schema
    
    * refactor: add cache structs
    
    * refactor: use entry in partition_file_cache
    
    * refactor: use result
    
    * chore: update docs + fmt
    
    * refactor: add bound_filter to FileScanStreamContext
    
    * refactor: return ref BoundPredicate
    
    * fix: return type PartitionSpecRef
    
    * refactor: remove spec_id runtime check
    
    * feat: add check for content_type data
---
 crates/iceberg/src/expr/predicate.rs               |  10 +-
 .../src/expr/visitors/manifest_evaluator.rs        | 347 ++++++++++-----------
 crates/iceberg/src/scan.rs                         | 275 ++++++++++++----
 3 files changed, 371 insertions(+), 261 deletions(-)

diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs
index 6cdf4fc..1457d5a 100644
--- a/crates/iceberg/src/expr/predicate.rs
+++ b/crates/iceberg/src/expr/predicate.rs
@@ -32,7 +32,7 @@ use crate::spec::{Datum, SchemaRef};
 use crate::{Error, ErrorKind};
 
 /// Logical expression, such as `AND`, `OR`, `NOT`.
-#[derive(PartialEq)]
+#[derive(PartialEq, Clone)]
 pub struct LogicalExpression<T, const N: usize> {
     inputs: [Box<T>; N],
 }
@@ -79,7 +79,7 @@ where
 }
 
 /// Unary predicate, for example, `a IS NULL`.
-#[derive(PartialEq)]
+#[derive(PartialEq, Clone)]
 pub struct UnaryExpression<T> {
     /// Operator of this predicate, must be single operand operator.
     op: PredicateOperator,
@@ -126,7 +126,7 @@ impl<T> UnaryExpression<T> {
 }
 
 /// Binary predicate, for example, `a > 10`.
-#[derive(PartialEq)]
+#[derive(PartialEq, Clone)]
 pub struct BinaryExpression<T> {
     /// Operator of this predicate, must be binary operator, such as `=`, `>`, 
`<`, etc.
     op: PredicateOperator,
@@ -184,7 +184,7 @@ impl<T: Bind> Bind for BinaryExpression<T> {
 }
 
 /// Set predicates, for example, `a in (1, 2, 3)`.
-#[derive(PartialEq)]
+#[derive(PartialEq, Clone)]
 pub struct SetExpression<T> {
     /// Operator of this predicate, must be set operator, such as `IN`, `NOT 
IN`, etc.
     op: PredicateOperator,
@@ -613,7 +613,7 @@ impl Not for Predicate {
 }
 
 /// Bound predicate expression after binding to a schema.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub enum BoundPredicate {
     /// An expression always evaluates to true.
     AlwaysTrue,
diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
index 16d6481..bcb5967 100644
--- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
@@ -16,83 +16,39 @@
 // under the License.
 
 use crate::expr::visitors::bound_predicate_visitor::{visit, 
BoundPredicateVisitor};
-use crate::expr::visitors::inclusive_projection::InclusiveProjection;
-use crate::expr::{Bind, BoundPredicate, BoundReference};
-use crate::spec::{Datum, FieldSummary, ManifestFile, PartitionSpecRef, Schema, 
SchemaRef};
-use crate::{Error, ErrorKind};
+use crate::expr::{BoundPredicate, BoundReference};
+use crate::spec::{Datum, FieldSummary, ManifestFile};
+use crate::Result;
 use fnv::FnvHashSet;
-use std::sync::Arc;
 
-/// Evaluates [`ManifestFile`]s to see if their partition summary matches a 
provided
-/// [`BoundPredicate`]. Used by [`TableScan`] to filter down the list of 
[`ManifestFile`]s
+#[derive(Debug)]
+/// Evaluates a [`ManifestFile`] to see if the partition summaries
+/// match a provided [`BoundPredicate`].
+///
+/// Used by [`TableScan`] to prune the list of [`ManifestFile`]s
 /// in which data might be found that matches the TableScan's filter.
 pub(crate) struct ManifestEvaluator {
-    partition_schema: SchemaRef,
     partition_filter: BoundPredicate,
     case_sensitive: bool,
 }
 
 impl ManifestEvaluator {
-    pub(crate) fn new(
-        partition_spec: PartitionSpecRef,
-        table_schema: SchemaRef,
-        filter: BoundPredicate,
-        case_sensitive: bool,
-    ) -> crate::Result<Self> {
-        let partition_type = partition_spec.partition_type(&table_schema)?;
-
-        // this is needed as SchemaBuilder.with_fields expects an iterator over
-        // Arc<NestedField> rather than &Arc<NestedField>
-        let cloned_partition_fields: Vec<_> =
-            partition_type.fields().iter().map(Arc::clone).collect();
-
-        // The partition_schema's schema_id is set to the partition
-        // spec's spec_id here, and used to perform a sanity check
-        // during eval to confirm that it matches the spec_id
-        // of the ManifestFile we're evaluating
-        let partition_schema = Schema::builder()
-            .with_schema_id(partition_spec.spec_id)
-            .with_fields(cloned_partition_fields)
-            .build()?;
-
-        let partition_schema_ref = Arc::new(partition_schema);
-
-        let mut inclusive_projection = 
InclusiveProjection::new(partition_spec.clone());
-        let unbound_partition_filter = inclusive_projection.project(&filter)?;
-
-        let partition_filter = unbound_partition_filter
-            .rewrite_not()
-            .bind(partition_schema_ref.clone(), case_sensitive)?;
-
-        Ok(Self {
-            partition_schema: partition_schema_ref,
+    pub(crate) fn new(partition_filter: BoundPredicate, case_sensitive: bool) 
-> Self {
+        Self {
             partition_filter,
             case_sensitive,
-        })
+        }
     }
 
     /// Evaluate this `ManifestEvaluator`'s filter predicate against the
     /// provided [`ManifestFile`]'s partitions. Used by [`TableScan`] to
     /// see if this `ManifestFile` could possibly contain data that matches
     /// the scan's filter.
-    pub(crate) fn eval(&self, manifest_file: &ManifestFile) -> 
crate::Result<bool> {
+    pub(crate) fn eval(&self, manifest_file: &ManifestFile) -> Result<bool> {
         if manifest_file.partitions.is_empty() {
             return Ok(true);
         }
 
-        // The schema_id of self.partition_schema is set to the
-        // spec_id of the partition spec that this ManifestEvaluator
-        // was created from in ManifestEvaluator::new
-        if self.partition_schema.schema_id() != 
manifest_file.partition_spec_id {
-            return Err(Error::new(
-                ErrorKind::Unexpected,
-                format!(
-                    "Partition ID for manifest file '{}' does not match 
partition ID for the Scan",
-                    &manifest_file.manifest_path
-                ),
-            ));
-        }
-
         let mut evaluator = ManifestFilterVisitor::new(self, 
&manifest_file.partitions);
 
         visit(&mut evaluator, &self.partition_filter)
@@ -273,194 +229,211 @@ impl ManifestFilterVisitor<'_> {
 
 #[cfg(test)]
 mod test {
+    use crate::expr::visitors::inclusive_projection::InclusiveProjection;
     use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
-    use crate::expr::{Bind, Predicate, PredicateOperator, Reference, 
UnaryExpression};
+    use crate::expr::{
+        Bind, BoundPredicate, Predicate, PredicateOperator, Reference, 
UnaryExpression,
+    };
     use crate::spec::{
         FieldSummary, ManifestContentType, ManifestFile, NestedField, 
PartitionField,
-        PartitionSpec, PrimitiveType, Schema, Transform, Type,
+        PartitionSpec, PartitionSpecRef, PrimitiveType, Schema, SchemaRef, 
Transform, Type,
     };
+    use crate::Result;
     use std::sync::Arc;
 
-    #[test]
-    fn test_manifest_file_no_partitions() {
-        let (table_schema_ref, partition_spec_ref) = 
create_test_schema_and_partition_spec();
+    fn create_schema_and_partition_spec() -> Result<(SchemaRef, 
PartitionSpecRef)> {
+        let schema = Schema::builder()
+            .with_fields(vec![Arc::new(NestedField::optional(
+                1,
+                "a",
+                Type::Primitive(PrimitiveType::Float),
+            ))])
+            .build()?;
 
-        let partition_filter = Predicate::AlwaysTrue
-            .bind(table_schema_ref.clone(), false)
+        let spec = PartitionSpec::builder()
+            .with_spec_id(1)
+            .with_fields(vec![PartitionField::builder()
+                .source_id(1)
+                .name("a".to_string())
+                .field_id(1)
+                .transform(Transform::Identity)
+                .build()])
+            .build()
             .unwrap();
 
-        let case_sensitive = false;
+        Ok((Arc::new(schema), Arc::new(spec)))
+    }
 
-        let manifest_file_partitions = vec![];
-        let manifest_file = 
create_test_manifest_file(manifest_file_partitions);
+    fn create_schema_and_partition_spec_with_id_mismatch() -> 
Result<(SchemaRef, PartitionSpecRef)>
+    {
+        let schema = Schema::builder()
+            .with_fields(vec![Arc::new(NestedField::optional(
+                1,
+                "a",
+                Type::Primitive(PrimitiveType::Float),
+            ))])
+            .build()?;
 
-        let manifest_evaluator = ManifestEvaluator::new(
-            partition_spec_ref,
-            table_schema_ref,
-            partition_filter,
-            case_sensitive,
-        )
-        .unwrap();
+        let spec = PartitionSpec::builder()
+            .with_spec_id(999)
+            .with_fields(vec![PartitionField::builder()
+                .source_id(1)
+                .name("a".to_string())
+                .field_id(1)
+                .transform(Transform::Identity)
+                .build()])
+            .build()
+            .unwrap();
 
-        let result = manifest_evaluator.eval(&manifest_file).unwrap();
+        Ok((Arc::new(schema), Arc::new(spec)))
+    }
 
-        assert!(result);
+    fn create_manifest_file(partitions: Vec<FieldSummary>) -> ManifestFile {
+        ManifestFile {
+            manifest_path: "/test/path".to_string(),
+            manifest_length: 0,
+            partition_spec_id: 1,
+            content: ManifestContentType::Data,
+            sequence_number: 0,
+            min_sequence_number: 0,
+            added_snapshot_id: 0,
+            added_data_files_count: None,
+            existing_data_files_count: None,
+            deleted_data_files_count: None,
+            added_rows_count: None,
+            existing_rows_count: None,
+            deleted_rows_count: None,
+            partitions,
+            key_metadata: vec![],
+        }
+    }
+
+    fn create_partition_schema(
+        partition_spec: &PartitionSpecRef,
+        schema: &SchemaRef,
+    ) -> Result<SchemaRef> {
+        let partition_type = partition_spec.partition_type(schema)?;
+
+        let partition_fields: Vec<_> = 
partition_type.fields().iter().map(Arc::clone).collect();
+
+        let partition_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(partition_spec.spec_id)
+                .with_fields(partition_fields)
+                .build()?,
+        );
+
+        Ok(partition_schema)
+    }
+
+    fn create_partition_filter(
+        partition_spec: PartitionSpecRef,
+        partition_schema: SchemaRef,
+        filter: &BoundPredicate,
+        case_sensitive: bool,
+    ) -> Result<BoundPredicate> {
+        let mut inclusive_projection = 
InclusiveProjection::new(partition_spec);
+
+        let partition_filter = inclusive_projection
+            .project(filter)?
+            .rewrite_not()
+            .bind(partition_schema, case_sensitive)?;
+
+        Ok(partition_filter)
+    }
+
+    fn create_manifest_evaluator(
+        schema: SchemaRef,
+        partition_spec: PartitionSpecRef,
+        filter: &BoundPredicate,
+        case_sensitive: bool,
+    ) -> Result<ManifestEvaluator> {
+        let partition_schema = create_partition_schema(&partition_spec, 
&schema)?;
+        let partition_filter = create_partition_filter(
+            partition_spec,
+            partition_schema.clone(),
+            filter,
+            case_sensitive,
+        )?;
+
+        Ok(ManifestEvaluator::new(partition_filter, case_sensitive))
     }
 
     #[test]
-    fn test_manifest_file_trivial_partition_passing_filter() {
-        let (table_schema_ref, partition_spec_ref) = 
create_test_schema_and_partition_spec();
+    fn test_manifest_file_empty_partitions() -> Result<()> {
+        let case_sensitive = false;
 
-        let partition_filter = Predicate::Unary(UnaryExpression::new(
-            PredicateOperator::IsNull,
-            Reference::new("a"),
-        ))
-        .bind(table_schema_ref.clone(), true)
-        .unwrap();
+        let (schema, partition_spec) = create_schema_and_partition_spec()?;
 
-        let manifest_file_partitions = vec![FieldSummary {
-            contains_null: true,
-            contains_nan: None,
-            lower_bound: None,
-            upper_bound: None,
-        }];
-        let manifest_file = 
create_test_manifest_file(manifest_file_partitions);
+        let filter = Predicate::AlwaysTrue.bind(schema.clone(), 
case_sensitive)?;
+
+        let manifest_file = create_manifest_file(vec![]);
 
         let manifest_evaluator =
-            ManifestEvaluator::new(partition_spec_ref, table_schema_ref, 
partition_filter, true)
-                .unwrap();
+            create_manifest_evaluator(schema, partition_spec, &filter, 
case_sensitive)?;
 
-        let result = manifest_evaluator.eval(&manifest_file).unwrap();
+        let result = manifest_evaluator.eval(&manifest_file)?;
 
         assert!(result);
+
+        Ok(())
     }
 
     #[test]
-    fn test_manifest_file_partition_id_mismatch_returns_error() {
-        let (table_schema_ref, partition_spec_ref) =
-            create_test_schema_and_partition_spec_with_id_mismatch();
+    fn test_manifest_file_trivial_partition_passing_filter() -> Result<()> {
+        let case_sensitive = true;
 
-        let partition_filter = Predicate::Unary(UnaryExpression::new(
+        let (schema, partition_spec) = create_schema_and_partition_spec()?;
+
+        let filter = Predicate::Unary(UnaryExpression::new(
             PredicateOperator::IsNull,
             Reference::new("a"),
         ))
-        .bind(table_schema_ref.clone(), true)
-        .unwrap();
+        .bind(schema.clone(), case_sensitive)?;
 
-        let manifest_file_partitions = vec![FieldSummary {
+        let manifest_file = create_manifest_file(vec![FieldSummary {
             contains_null: true,
             contains_nan: None,
             lower_bound: None,
             upper_bound: None,
-        }];
-        let manifest_file = 
create_test_manifest_file(manifest_file_partitions);
+        }]);
 
         let manifest_evaluator =
-            ManifestEvaluator::new(partition_spec_ref, table_schema_ref, 
partition_filter, true)
-                .unwrap();
+            create_manifest_evaluator(schema, partition_spec, &filter, 
case_sensitive)?;
 
-        let result = manifest_evaluator.eval(&manifest_file);
+        let result = manifest_evaluator.eval(&manifest_file)?;
 
-        assert!(result.is_err());
+        assert!(result);
+
+        Ok(())
     }
 
     #[test]
-    fn test_manifest_file_trivial_partition_rejected_filter() {
-        let (table_schema_ref, partition_spec_ref) = 
create_test_schema_and_partition_spec();
+    fn test_manifest_file_trivial_partition_rejected_filter() -> Result<()> {
+        let case_sensitive = true;
+
+        let (schema, partition_spec) = create_schema_and_partition_spec()?;
 
-        let partition_filter = Predicate::Unary(UnaryExpression::new(
+        let filter = Predicate::Unary(UnaryExpression::new(
             PredicateOperator::IsNan,
             Reference::new("a"),
         ))
-        .bind(table_schema_ref.clone(), true)
-        .unwrap();
+        .bind(schema.clone(), case_sensitive)?;
 
-        let manifest_file_partitions = vec![FieldSummary {
+        let manifest_file = create_manifest_file(vec![FieldSummary {
             contains_null: false,
             contains_nan: None,
             lower_bound: None,
             upper_bound: None,
-        }];
-        let manifest_file = 
create_test_manifest_file(manifest_file_partitions);
+        }]);
 
         let manifest_evaluator =
-            ManifestEvaluator::new(partition_spec_ref, table_schema_ref, 
partition_filter, true)
-                .unwrap();
+            create_manifest_evaluator(schema, partition_spec, &filter, 
case_sensitive)?;
 
         let result = manifest_evaluator.eval(&manifest_file).unwrap();
 
         assert!(!result);
-    }
-
-    fn create_test_schema_and_partition_spec() -> (Arc<Schema>, 
Arc<PartitionSpec>) {
-        let table_schema = Schema::builder()
-            .with_fields(vec![Arc::new(NestedField::optional(
-                1,
-                "a",
-                Type::Primitive(PrimitiveType::Float),
-            ))])
-            .build()
-            .unwrap();
-        let table_schema_ref = Arc::new(table_schema);
-
-        let partition_spec = PartitionSpec::builder()
-            .with_spec_id(1)
-            .with_fields(vec![PartitionField::builder()
-                .source_id(1)
-                .name("a".to_string())
-                .field_id(1)
-                .transform(Transform::Identity)
-                .build()])
-            .build()
-            .unwrap();
-        let partition_spec_ref = Arc::new(partition_spec);
-        (table_schema_ref, partition_spec_ref)
-    }
-
-    fn create_test_schema_and_partition_spec_with_id_mismatch() -> 
(Arc<Schema>, Arc<PartitionSpec>)
-    {
-        let table_schema = Schema::builder()
-            .with_fields(vec![Arc::new(NestedField::optional(
-                1,
-                "a",
-                Type::Primitive(PrimitiveType::Float),
-            ))])
-            .build()
-            .unwrap();
-        let table_schema_ref = Arc::new(table_schema);
 
-        let partition_spec = PartitionSpec::builder()
-            // Spec ID here deliberately doesn't match the one from 
create_test_manifest_file
-            .with_spec_id(999)
-            .with_fields(vec![PartitionField::builder()
-                .source_id(1)
-                .name("a".to_string())
-                .field_id(1)
-                .transform(Transform::Identity)
-                .build()])
-            .build()
-            .unwrap();
-        let partition_spec_ref = Arc::new(partition_spec);
-        (table_schema_ref, partition_spec_ref)
-    }
-
-    fn create_test_manifest_file(manifest_file_partitions: Vec<FieldSummary>) 
-> ManifestFile {
-        ManifestFile {
-            manifest_path: "/test/path".to_string(),
-            manifest_length: 0,
-            partition_spec_id: 1,
-            content: ManifestContentType::Data,
-            sequence_number: 0,
-            min_sequence_number: 0,
-            added_snapshot_id: 0,
-            added_data_files_count: None,
-            existing_data_files_count: None,
-            deleted_data_files_count: None,
-            added_rows_count: None,
-            existing_rows_count: None,
-            deleted_rows_count: None,
-            partitions: manifest_file_partitions,
-            key_metadata: vec![],
-        }
+        Ok(())
     }
 }
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 36f71c1..b842522 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -18,21 +18,29 @@
 //! Table scan api.
 
 use crate::arrow::ArrowReaderBuilder;
+use crate::expr::visitors::inclusive_projection::InclusiveProjection;
 use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
-use crate::expr::{Bind, Predicate};
+use crate::expr::{Bind, BoundPredicate, Predicate};
 use crate::io::FileIO;
 use crate::spec::{
-    DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadata, 
TableMetadataRef,
+    DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, 
PartitionSpecRef, Schema,
+    SchemaRef, SnapshotRef, TableMetadataRef,
 };
 use crate::table::Table;
-use crate::{Error, ErrorKind};
+use crate::{Error, ErrorKind, Result};
 use arrow_array::RecordBatch;
 use async_stream::try_stream;
-use futures::stream::{iter, BoxStream};
+use futures::stream::BoxStream;
 use futures::StreamExt;
+use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::sync::Arc;
 
+/// A stream of [`FileScanTask`].
+pub type FileScanTaskStream = BoxStream<'static, Result<FileScanTask>>;
+/// A stream of arrow [`RecordBatch`]es.
+pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
+
 /// Builder to create table scan.
 pub struct TableScanBuilder<'a> {
     table: &'a Table,
@@ -99,7 +107,7 @@ impl<'a> TableScanBuilder<'a> {
     }
 
     /// Build the table scan.
-    pub fn build(self) -> crate::Result<TableScan> {
+    pub fn build(self) -> Result<TableScan> {
         let snapshot = match self.snapshot_id {
             Some(snapshot_id) => self
                 .table
@@ -169,55 +177,67 @@ pub struct TableScan {
     filter: Option<Arc<Predicate>>,
 }
 
-/// A stream of [`FileScanTask`].
-pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
-
 impl TableScan {
-    /// Returns a stream of file scan tasks.
-
-    pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
-        // Cache `ManifestEvaluatorFactory`s created as part of this scan
-        let mut manifest_evaluator_cache: HashMap<i32, ManifestEvaluator> = 
HashMap::new();
-
-        // these variables needed to ensure that we don't need to pass a
-        // reference to self into `try_stream`, as it expects references
-        // passed in to outlive 'static
-        let schema = self.schema.clone();
-        let snapshot = self.snapshot.clone();
-        let table_metadata = self.table_metadata.clone();
-        let file_io = self.file_io.clone();
-        let case_sensitive = self.case_sensitive;
-        let filter = self.filter.clone();
+    /// Returns a stream of [`FileScanTask`]s.
+    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
+        let context = FileScanStreamContext::new(
+            self.schema.clone(),
+            self.snapshot.clone(),
+            self.table_metadata.clone(),
+            self.file_io.clone(),
+            self.filter.clone(),
+            self.case_sensitive,
+        )?;
+
+        let mut partition_filter_cache = PartitionFilterCache::new();
+        let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
 
         Ok(try_stream! {
-            let manifest_list = snapshot
-            .clone()
-            .load_manifest_list(&file_io, &table_metadata)
-            .await?;
+            let manifest_list = context
+                .snapshot
+                .load_manifest_list(&context.file_io, &context.table_metadata)
+                .await?;
 
-            // Generate data file stream
             for entry in manifest_list.entries() {
-                // If this scan has a filter, check the partition evaluator 
cache for an existing
-                // PartitionEvaluator that matches this manifest's partition 
spec ID.
-                // Use one from the cache if there is one. If not, create one, 
put it in
-                // the cache, and take a reference to it.
-                #[allow(clippy::map_entry)]
-                if let Some(filter) = filter.as_ref() {
-                    if 
!manifest_evaluator_cache.contains_key(&entry.partition_spec_id) {
-                        
manifest_evaluator_cache.insert(entry.partition_spec_id, 
Self::create_manifest_evaluator(entry.partition_spec_id, schema.clone(), 
table_metadata.clone(), case_sensitive, filter)?);
-                    }
-                    let manifest_evaluator = 
&manifest_evaluator_cache[&entry.partition_spec_id];
+                if !Self::content_type_is_data(entry) {
+                    continue;
+                }
+
+                if let Some(filter) = context.bound_filter() {
+                    let partition_spec_id = entry.partition_spec_id;
+
+                    let (partition_spec, partition_schema) =
+                        
context.create_partition_spec_and_schema(partition_spec_id)?;
+
+                    let partition_filter = partition_filter_cache.get(
+                        partition_spec_id,
+                        partition_spec,
+                        partition_schema.clone(),
+                        filter,
+                        context.case_sensitive,
+                    )?;
+
+                    let manifest_evaluator = manifest_evaluator_cache.get(
+                        partition_schema.schema_id(),
+                        partition_filter.clone(),
+                        context.case_sensitive,
+                    );
 
-                    // reject any manifest files whose partition values don't 
match the filter.
                     if !manifest_evaluator.eval(entry)? {
                         continue;
                     }
+
+                    // TODO: Create ExpressionEvaluator
                 }
 
-                let manifest = entry.load_manifest(&file_io).await?;
+                let manifest = entry.load_manifest(&context.file_io).await?;
+                let mut manifest_entries_stream =
+                    futures::stream::iter(manifest.entries().iter().filter(|e| 
e.is_alive()));
+
+                while let Some(manifest_entry) = 
manifest_entries_stream.next().await {
+                    // TODO: Apply ExpressionEvaluator
+                    // TODO: Apply InclusiveMetricsEvaluator::eval()
 
-                let mut manifest_entries = 
iter(manifest.entries().iter().filter(|e| e.is_alive()));
-                while let Some(manifest_entry) = manifest_entries.next().await 
{
                     match manifest_entry.content_type() {
                         DataContentType::EqualityDeletes | 
DataContentType::PositionDeletes => {
                             yield Err(Error::new(
@@ -226,7 +246,7 @@ impl TableScan {
                             ))?;
                         }
                         DataContentType::Data => {
-                            let scan_task: crate::Result<FileScanTask> = 
Ok(FileScanTask {
+                            let scan_task: Result<FileScanTask> = 
Ok(FileScanTask {
                                 data_manifest_entry: manifest_entry.clone(),
                                 start: 0,
                                 length: manifest_entry.file_size_in_bytes(),
@@ -240,29 +260,8 @@ impl TableScan {
         .boxed())
     }
 
-    fn create_manifest_evaluator(
-        id: i32,
-        schema: SchemaRef,
-        table_metadata: Arc<TableMetadata>,
-        case_sensitive: bool,
-        filter: &Predicate,
-    ) -> crate::Result<ManifestEvaluator> {
-        let bound_predicate = filter.bind(schema.clone(), case_sensitive)?;
-
-        let partition_spec = 
table_metadata.partition_spec_by_id(id).ok_or(Error::new(
-            ErrorKind::Unexpected,
-            format!("Could not find partition spec for id {id}"),
-        ))?;
-
-        ManifestEvaluator::new(
-            partition_spec.clone(),
-            schema.clone(),
-            bound_predicate,
-            case_sensitive,
-        )
-    }
-
-    pub async fn to_arrow(&self) -> crate::Result<ArrowRecordBatchStream> {
+    /// Returns an [`ArrowRecordBatchStream`].
+    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
         let mut arrow_reader_builder =
             ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());
 
@@ -312,6 +311,147 @@ impl TableScan {
 
         arrow_reader_builder.build().read(self.plan_files().await?)
     }
+
+    /// Checks whether the [`ManifestContentType`] is `Data` or not.
+    fn content_type_is_data(entry: &ManifestFile) -> bool {
+        if let ManifestContentType::Data = entry.content {
+            return true;
+        }
+        false
+    }
+}
+
+#[derive(Debug)]
+/// Holds the context necessary for file scanning operations
+/// in a streaming environment.
+struct FileScanStreamContext {
+    schema: SchemaRef,
+    snapshot: SnapshotRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+    bound_filter: Option<BoundPredicate>,
+    case_sensitive: bool,
+}
+
+impl FileScanStreamContext {
+    /// Creates a new [`FileScanStreamContext`].
+    fn new(
+        schema: SchemaRef,
+        snapshot: SnapshotRef,
+        table_metadata: TableMetadataRef,
+        file_io: FileIO,
+        filter: Option<Arc<Predicate>>,
+        case_sensitive: bool,
+    ) -> Result<Self> {
+        let bound_filter = match filter {
+            Some(ref filter) => Some(filter.bind(schema.clone(), 
case_sensitive)?),
+            None => None,
+        };
+
+        Ok(Self {
+            schema,
+            snapshot,
+            table_metadata,
+            file_io,
+            bound_filter,
+            case_sensitive,
+        })
+    }
+
+    /// Returns a reference to the [`BoundPredicate`] filter.
+    fn bound_filter(&self) -> Option<&BoundPredicate> {
+        self.bound_filter.as_ref()
+    }
+
+    /// Creates a reference-counted [`PartitionSpec`] and a
+    /// corresponding [`Schema`] based on the specified partition spec id.
+    fn create_partition_spec_and_schema(
+        &self,
+        spec_id: i32,
+    ) -> Result<(PartitionSpecRef, SchemaRef)> {
+        let partition_spec =
+            self.table_metadata
+                .partition_spec_by_id(spec_id)
+                .ok_or(Error::new(
+                    ErrorKind::Unexpected,
+                    format!("Could not find partition spec for id {}", 
spec_id),
+                ))?;
+
+        let partition_type = partition_spec.partition_type(&self.schema)?;
+        let partition_fields = partition_type.fields().to_owned();
+        let partition_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(partition_spec.spec_id)
+                .with_fields(partition_fields)
+                .build()?,
+        );
+
+        Ok((partition_spec.clone(), partition_schema))
+    }
+}
+
+#[derive(Debug)]
+/// Manages the caching of [`BoundPredicate`] objects
+/// for [`PartitionSpec`]s based on partition spec id.
+struct PartitionFilterCache(HashMap<i32, BoundPredicate>);
+
+impl PartitionFilterCache {
+    /// Creates a new [`PartitionFilterCache`]
+    /// with an empty internal HashMap.
+    fn new() -> Self {
+        Self(HashMap::new())
+    }
+
+    /// Retrieves a [`BoundPredicate`] from the cache
+    /// or computes it if not present.
+    fn get(
+        &mut self,
+        spec_id: i32,
+        partition_spec: PartitionSpecRef,
+        partition_schema: SchemaRef,
+        filter: &BoundPredicate,
+        case_sensitive: bool,
+    ) -> Result<&BoundPredicate> {
+        match self.0.entry(spec_id) {
+            Entry::Occupied(e) => Ok(e.into_mut()),
+            Entry::Vacant(e) => {
+                let mut inclusive_projection = 
InclusiveProjection::new(partition_spec);
+
+                let partition_filter = inclusive_projection
+                    .project(filter)?
+                    .rewrite_not()
+                    .bind(partition_schema, case_sensitive)?;
+
+                Ok(e.insert(partition_filter))
+            }
+        }
+    }
+}
+
+#[derive(Debug)]
+/// Manages the caching of [`ManifestEvaluator`] objects
+/// for [`PartitionSpec`]s based on partition spec id.
+struct ManifestEvaluatorCache(HashMap<i32, ManifestEvaluator>);
+
+impl ManifestEvaluatorCache {
+    /// Creates a new [`ManifestEvaluatorCache`]
+    /// with an empty internal HashMap.
+    fn new() -> Self {
+        Self(HashMap::new())
+    }
+
+    /// Retrieves a [`ManifestEvaluator`] from the cache
+    /// or computes it if not present.
+    fn get(
+        &mut self,
+        spec_id: i32,
+        partition_filter: BoundPredicate,
+        case_sensitive: bool,
+    ) -> &mut ManifestEvaluator {
+        self.0
+            .entry(spec_id)
+            .or_insert(ManifestEvaluator::new(partition_filter, 
case_sensitive))
+    }
 }
 
 /// A task to scan part of file.
@@ -324,9 +464,6 @@ pub struct FileScanTask {
     length: u64,
 }
 
-/// A stream of arrow record batches.
-pub type ArrowRecordBatchStream = BoxStream<'static, 
crate::Result<RecordBatch>>;
-
 impl FileScanTask {
     pub fn data(&self) -> ManifestEntryRef {
         self.data_manifest_entry.clone()

Reply via email to