This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new abe3a64  Concurrent table scans (#373)
abe3a64 is described below

commit abe3a64a4ed4c216538cbe1ab4d3dfe30469ced3
Author: Scott Donnelly <[email protected]>
AuthorDate: Wed Aug 7 01:47:27 2024 +0100

    Concurrent table scans (#373)
    
    * feat: concurrent table scans
    
    * refactor: remove TableScanConfig.
---
 Cargo.toml                               |   1 +
 crates/iceberg/Cargo.toml                |   1 +
 crates/iceberg/src/error.rs              |   6 +
 crates/iceberg/src/scan.rs               | 734 ++++++++++++++++++++++---------
 crates/iceberg/src/spec/manifest.rs      |   6 +
 crates/iceberg/src/spec/manifest_list.rs |   5 +
 6 files changed, 544 insertions(+), 209 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 642c99b..bc7a083 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -66,6 +66,7 @@ itertools = "0.13"
 log = "^0.4"
 mockito = "^1"
 murmur3 = "0.5.2"
+num_cpus = "1"
 once_cell = "1"
 opendal = "0.48"
 ordered-float = "4.0.0"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index de5b7cd..18e25e9 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -61,6 +61,7 @@ fnv = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
 murmur3 = { workspace = true }
+num_cpus = { workspace = true }
 once_cell = { workspace = true }
 opendal = { workspace = true }
 ordered-float = { workspace = true }
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index 6f7fd7c..2b69b47 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -331,6 +331,12 @@ define_from_err!(
     "Failed to read a Parquet file"
 );
 
+define_from_err!(
+    futures::channel::mpsc::SendError,
+    ErrorKind::Unexpected,
+    "Failed to send a message to a channel"
+);
+
 define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed");
 
 /// Converts a timestamp in milliseconds to `DateTime<Utc>`, handling errors.
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 18489b7..ebb3b00 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -17,14 +17,13 @@
 
 //! Table scan api.
 
-use std::collections::hash_map::Entry;
 use std::collections::HashMap;
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
 
 use arrow_array::RecordBatch;
-use async_stream::try_stream;
+use futures::channel::mpsc::{channel, Sender};
 use futures::stream::BoxStream;
-use futures::StreamExt;
+use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
 use serde::{Deserialize, Serialize};
 
 use crate::arrow::ArrowReaderBuilder;
@@ -34,9 +33,10 @@ use crate::expr::visitors::inclusive_projection::InclusiveProjection;
 use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
 use crate::expr::{Bind, BoundPredicate, Predicate};
 use crate::io::FileIO;
+use crate::runtime::spawn;
 use crate::spec::{
-    DataContentType, ManifestContentType, ManifestFile, Schema, SchemaRef, SnapshotRef,
-    TableMetadataRef,
+    DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, Schema,
+    SchemaRef, SnapshotRef, TableMetadataRef,
 };
 use crate::table::Table;
 use crate::{Error, ErrorKind, Result};
@@ -55,10 +55,14 @@ pub struct TableScanBuilder<'a> {
     batch_size: Option<usize>,
     case_sensitive: bool,
     filter: Option<Predicate>,
+    concurrency_limit_manifest_files: usize,
+    concurrency_limit_manifest_entries: usize,
 }
 
 impl<'a> TableScanBuilder<'a> {
     pub(crate) fn new(table: &'a Table) -> Self {
+        let num_cpus = num_cpus::get();
+
         Self {
             table,
             column_names: vec![],
@@ -66,6 +70,8 @@ impl<'a> TableScanBuilder<'a> {
             batch_size: None,
             case_sensitive: true,
             filter: None,
+            concurrency_limit_manifest_files: num_cpus,
+            concurrency_limit_manifest_entries: num_cpus,
         }
     }
 
@@ -111,6 +117,26 @@ impl<'a> TableScanBuilder<'a> {
         self
     }
 
+    /// Sets the concurrency limit for both manifest files and manifest
+    /// entries for this scan
+    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
+        self.concurrency_limit_manifest_files = limit;
+        self.concurrency_limit_manifest_entries = limit;
+        self
+    }
+
+    /// Sets the manifest file concurrency limit for this scan
+    pub fn with_manifest_file_concurrency_limit(mut self, limit: usize) -> Self {
+        self.concurrency_limit_manifest_files = limit;
+        self
+    }
+
+    /// Sets the manifest entry concurrency limit for this scan
+    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
+        self.concurrency_limit_manifest_entries = limit;
+        self
+    }
+
     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
         let snapshot = match self.snapshot_id {
@@ -155,12 +181,6 @@ impl<'a> TableScanBuilder<'a> {
             }
         }
 
-        let bound_predicates = if let Some(ref predicates) = self.filter {
-            Some(predicates.bind(schema.clone(), true)?)
-        } else {
-            None
-        };
-
         let mut field_ids = vec![];
         for column_name in &self.column_names {
             let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
@@ -199,17 +219,33 @@ impl<'a> TableScanBuilder<'a> {
             field_ids.push(field_id);
         }
 
-        Ok(TableScan {
+        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
+            Some(predicates.bind(schema.clone(), true)?)
+        } else {
+            None
+        };
+
+        let plan_context = PlanContext {
             snapshot,
-            file_io: self.table.file_io().clone(),
             table_metadata: self.table.metadata_ref(),
-            column_names: self.column_names,
-            field_ids,
-            bound_predicates,
-            schema,
-            batch_size: self.batch_size,
+            snapshot_schema: schema,
             case_sensitive: self.case_sensitive,
-            filter: self.filter.map(Arc::new),
+            predicate: self.filter.map(Arc::new),
+            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
+            file_io: self.table.file_io().clone(),
+            field_ids: Arc::new(field_ids),
+            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
+            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
+            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
+        };
+
+        Ok(TableScan {
+            batch_size: self.batch_size,
+            column_names: self.column_names,
+            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
+            file_io: self.table.file_io().clone(),
+            plan_context,
+            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
         })
     }
 }
@@ -217,116 +253,85 @@ impl<'a> TableScanBuilder<'a> {
 /// Table scan.
 #[derive(Debug)]
 pub struct TableScan {
-    snapshot: SnapshotRef,
-    table_metadata: TableMetadataRef,
+    plan_context: PlanContext,
+    batch_size: Option<usize>,
     file_io: FileIO,
     column_names: Vec<String>,
-    field_ids: Vec<i32>,
-    bound_predicates: Option<BoundPredicate>,
-    schema: SchemaRef,
-    batch_size: Option<usize>,
+    /// The maximum number of manifest files that will be
+    /// retrieved from [`FileIO`] concurrently
+    concurrency_limit_manifest_files: usize,
+
+    /// The maximum number of [`ManifestEntry`]s that will
+    /// be processed in parallel
+    concurrency_limit_manifest_entries: usize,
+}
+
+/// PlanContext wraps a [`SnapshotRef`] alongside all the other
+/// objects that are required to perform a scan file plan.
+#[derive(Debug)]
+struct PlanContext {
+    snapshot: SnapshotRef,
+
+    table_metadata: TableMetadataRef,
+    snapshot_schema: SchemaRef,
     case_sensitive: bool,
-    filter: Option<Arc<Predicate>>,
+    predicate: Option<Arc<Predicate>>,
+    snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
+    file_io: FileIO,
+    field_ids: Arc<Vec<i32>>,
+
+    partition_filter_cache: Arc<PartitionFilterCache>,
+    manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
+    expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
 }
 
 impl TableScan {
     /// Returns a stream of [`FileScanTask`]s.
     pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
-        let context = FileScanStreamContext::new(
-            self.schema.clone(),
-            self.snapshot.clone(),
-            self.table_metadata.clone(),
-            self.file_io.clone(),
-            self.filter.clone(),
-            self.case_sensitive,
-        )?;
-
-        let mut partition_filter_cache = PartitionFilterCache::new();
-        let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
-        let mut expression_evaluator_cache = ExpressionEvaluatorCache::new();
-
-        let field_ids = self.field_ids.clone();
-        let bound_predicates = self.bound_predicates.clone();
-
-        Ok(try_stream! {
-            let manifest_list = context
-                .snapshot
-                .load_manifest_list(&context.file_io, &context.table_metadata)
-                .await?;
-
-            for entry in manifest_list.entries() {
-                if !Self::content_type_is_data(entry) {
-                    continue;
-                }
-
-                let partition_spec_id = entry.partition_spec_id;
-
-                let partition_filter = partition_filter_cache.get(
-                    partition_spec_id,
-                    &context,
-                )?;
-
-                if let Some(partition_filter) = partition_filter {
-                    let manifest_evaluator = manifest_evaluator_cache.get(
-                        partition_spec_id,
-                        partition_filter,
-                    );
-
-                    if !manifest_evaluator.eval(entry)? {
-                        continue;
-                    }
-                }
+        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
+        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;
+
+        // used to stream ManifestEntryContexts between stages of the file 
plan operation
+        let (manifest_entry_ctx_tx, manifest_entry_ctx_rx) =
+            channel(concurrency_limit_manifest_files);
+        // used to stream the results back to the caller
+        let (file_scan_task_tx, file_scan_task_rx) = 
channel(concurrency_limit_manifest_entries);
+
+        let manifest_list = self.plan_context.get_manifest_list().await?;
+
+        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out 
any
+        // whose content type is not Data or whose partitions cannot match this
+        // scan's filter
+        let manifest_file_contexts = self
+            .plan_context
+            .build_manifest_file_contexts(manifest_list, 
manifest_entry_ctx_tx)?;
+
+        // Concurrently load all [`Manifest`]s and stream their 
[`ManifestEntry`]s
+        spawn(async move {
+            futures::stream::iter(manifest_file_contexts)
+                .try_for_each_concurrent(concurrency_limit_manifest_files, 
|ctx| async move {
+                    ctx.fetch_manifest_and_stream_manifest_entries().await
+                })
+                .await
+        });
+
+        // Process the [`ManifestEntry`] stream in parallel
+        spawn(async move {
+            manifest_entry_ctx_rx
+                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
+                .try_for_each_concurrent(
+                    concurrency_limit_manifest_entries,
+                    |(manifest_entry_context, tx)| async move {
+                        crate::runtime::spawn(async move {
+                            
Self::process_manifest_entry(manifest_entry_context, tx).await
+                        })
+                        .await
+                    },
+                )
+                .await
+        });
 
-                let manifest = entry.load_manifest(&context.file_io).await?;
-                let mut manifest_entries_stream =
-                    futures::stream::iter(manifest.entries().iter().filter(|e| 
e.is_alive()));
-
-                while let Some(manifest_entry) = 
manifest_entries_stream.next().await {
-                    let data_file = manifest_entry.data_file();
-
-                    if let Some(partition_filter) = partition_filter {
-                        let expression_evaluator = 
expression_evaluator_cache.get(partition_spec_id, partition_filter);
-
-                        if !expression_evaluator.eval(data_file)? {
-                            continue;
-                        }
-                    }
-
-
-                    if let Some(bound_predicate) = context.bound_filter() {
-                        // reject any manifest entries whose data file's 
metrics don't match the filter.
-                        if !InclusiveMetricsEvaluator::eval(
-                            bound_predicate,
-                            manifest_entry.data_file(),
-                            false
-                        )? {
-                            continue;
-                        }
-                    }
-
-                    match manifest_entry.content_type() {
-                        DataContentType::EqualityDeletes | 
DataContentType::PositionDeletes => {
-                            yield Err(Error::new(
-                                ErrorKind::FeatureUnsupported,
-                                "Delete files are not supported yet.",
-                            ))?;
-                        }
-                        DataContentType::Data => {
-                            let scan_task: Result<FileScanTask> = 
Ok(FileScanTask {
-                                data_file_path: 
manifest_entry.data_file().file_path().to_string(),
-                                start: 0,
-                                length: manifest_entry.file_size_in_bytes(),
-                                project_field_ids: field_ids.clone(),
-                                predicate: bound_predicates.clone(),
-                                schema: context.schema.clone(),
-                            });
-                            yield scan_task?;
-                        }
-                    }
-                }
-            }
-        }
-        .boxed())
+        return Ok(file_scan_task_rx.boxed());
     }
 
     /// Returns an [`ArrowRecordBatchStream`].
@@ -340,157 +345,468 @@ impl TableScan {
         arrow_reader_builder.build().read(self.plan_files().await?)
     }
 
-    /// Checks whether the [`ManifestContentType`] is `Data` or not.
-    fn content_type_is_data(entry: &ManifestFile) -> bool {
-        if let ManifestContentType::Data = entry.content {
-            return true;
-        }
-        false
-    }
-
     /// Returns a reference to the column names of the table scan.
     pub fn column_names(&self) -> &[String] {
         &self.column_names
     }
+    /// Returns a reference to the snapshot of the table scan.
+    pub fn snapshot(&self) -> &SnapshotRef {
+        &self.plan_context.snapshot
+    }
+
+    async fn process_manifest_entry(
+        manifest_entry_context: ManifestEntryContext,
+        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
+    ) -> Result<()> {
+        // skip processing this manifest entry if it has been marked as deleted
+        if !manifest_entry_context.manifest_entry.is_alive() {
+            return Ok(());
+        }
+
+        // abort the plan if we encounter a manifest entry whose data file's
+        // content type is currently unsupported
+        if manifest_entry_context.manifest_entry.content_type() != 
DataContentType::Data {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Only Data files currently supported",
+            ));
+        }
+
+        if let Some(ref bound_predicates) = 
manifest_entry_context.bound_predicates {
+            let BoundPredicates {
+                ref snapshot_bound_predicate,
+                ref partition_bound_predicate,
+            } = bound_predicates.as_ref();
+
+            let expression_evaluator_cache =
+                manifest_entry_context.expression_evaluator_cache.as_ref();
+
+            let expression_evaluator = expression_evaluator_cache.get(
+                manifest_entry_context.partition_spec_id,
+                partition_bound_predicate,
+            )?;
+
+            // skip any data file whose partition data indicates that it can't 
contain
+            // any data that matches this scan's filter
+            if 
!expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
+                return Ok(());
+            }
+
+            // skip any data file whose metrics don't match this scan's filter
+            if !InclusiveMetricsEvaluator::eval(
+                snapshot_bound_predicate,
+                manifest_entry_context.manifest_entry.data_file(),
+                false,
+            )? {
+                return Ok(());
+            }
+        }
+
+        // congratulations! the manifest entry has made its way through the
+        // entire plan without getting filtered out. Create a corresponding
+        // FileScanTask and push it to the result stream
+        file_scan_task_tx
+            .send(Ok(manifest_entry_context.into_file_scan_task()))
+            .await?;
+
+        Ok(())
+    }
 }
 
-/// Holds the context necessary for file scanning operations
-/// in a streaming environment.
-#[derive(Debug)]
-struct FileScanStreamContext {
-    schema: SchemaRef,
-    snapshot: SnapshotRef,
-    table_metadata: TableMetadataRef,
+struct BoundPredicates {
+    partition_bound_predicate: BoundPredicate,
+    snapshot_bound_predicate: BoundPredicate,
+}
+
+/// Wraps a [`ManifestFile`] alongside the objects that are needed
+/// to process it in a thread-safe manner
+struct ManifestFileContext {
+    manifest_file: ManifestFile,
+
+    sender: Sender<ManifestEntryContext>,
+
+    field_ids: Arc<Vec<i32>>,
     file_io: FileIO,
-    bound_filter: Option<BoundPredicate>,
-    case_sensitive: bool,
+    bound_predicates: Option<Arc<BoundPredicates>>,
+    snapshot_schema: SchemaRef,
+    expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
 }
 
-impl FileScanStreamContext {
-    /// Creates a new [`FileScanStreamContext`].
-    fn new(
-        schema: SchemaRef,
-        snapshot: SnapshotRef,
-        table_metadata: TableMetadataRef,
-        file_io: FileIO,
-        filter: Option<Arc<Predicate>>,
-        case_sensitive: bool,
-    ) -> Result<Self> {
-        let bound_filter = match filter {
-            Some(ref filter) => Some(filter.bind(schema.clone(), 
case_sensitive)?),
-            None => None,
-        };
+/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
+/// to process it in a thread-safe manner
+struct ManifestEntryContext {
+    manifest_entry: ManifestEntryRef,
 
-        Ok(Self {
-            schema,
-            snapshot,
-            table_metadata,
+    expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
+    field_ids: Arc<Vec<i32>>,
+    bound_predicates: Option<Arc<BoundPredicates>>,
+    partition_spec_id: i32,
+    snapshot_schema: SchemaRef,
+}
+
+impl ManifestFileContext {
+    /// Consumes this [`ManifestFileContext`], fetching its Manifest from 
FileIO and then
+    /// streaming its constituent [`ManifestEntries`] to the channel provided 
in the context
+    async fn fetch_manifest_and_stream_manifest_entries(self) -> Result<()> {
+        let ManifestFileContext {
             file_io,
-            bound_filter,
-            case_sensitive,
-        })
+            manifest_file,
+            bound_predicates,
+            snapshot_schema,
+            field_ids,
+            expression_evaluator_cache,
+            mut sender,
+            ..
+        } = self;
+
+        let file_io_cloned = file_io.clone();
+        let manifest = manifest_file.load_manifest(&file_io_cloned).await?;
+
+        let (entries, _) = manifest.consume();
+
+        for manifest_entry in entries.into_iter() {
+            let manifest_entry_context = ManifestEntryContext {
+                manifest_entry,
+                expression_evaluator_cache: expression_evaluator_cache.clone(),
+                field_ids: field_ids.clone(),
+                partition_spec_id: manifest_file.partition_spec_id,
+                bound_predicates: bound_predicates.clone(),
+                snapshot_schema: snapshot_schema.clone(),
+            };
+
+            sender
+                .send(manifest_entry_context)
+                .map_err(|_| Error::new(ErrorKind::Unexpected, "mpsc channel 
SendError"))
+                .await?;
+        }
+
+        Ok(())
     }
+}
 
-    /// Returns a reference to the [`BoundPredicate`] filter.
-    fn bound_filter(&self) -> Option<&BoundPredicate> {
-        self.bound_filter.as_ref()
+impl ManifestEntryContext {
+    /// consume this `ManifestEntryContext`, returning a `FileScanTask`
+    /// created from it
+    fn into_file_scan_task(self) -> FileScanTask {
+        FileScanTask {
+            data_file_path: self.manifest_entry.file_path().to_string(),
+            start: 0,
+            length: self.manifest_entry.file_size_in_bytes(),
+            project_field_ids: self.field_ids.to_vec(),
+            predicate: self
+                .bound_predicates
+                .map(|x| x.as_ref().snapshot_bound_predicate.clone()),
+            schema: self.snapshot_schema,
+        }
+    }
+}
+
+impl PlanContext {
+    async fn get_manifest_list(&self) -> Result<ManifestList> {
+        self.snapshot
+            .load_manifest_list(&self.file_io, &self.table_metadata)
+            .await
+    }
+
+    fn get_partition_filter(&self, manifest_file: &ManifestFile) -> Result<Arc<BoundPredicate>> {
+        let partition_spec_id = manifest_file.partition_spec_id;
+
+        let partition_filter = self.partition_filter_cache.get(
+            partition_spec_id,
+            &self.table_metadata,
+            &self.snapshot_schema,
+            self.case_sensitive,
+            self.predicate
+                .as_ref()
+                .ok_or(Error::new(
+                    ErrorKind::Unexpected,
+                    "Expected a predicate but none present",
+                ))?
+                .as_ref()
+                .bind(self.snapshot_schema.clone(), self.case_sensitive)?,
+        )?;
+
+        Ok(partition_filter)
+    }
+
+    fn build_manifest_file_contexts(
+        &self,
+        manifest_list: ManifestList,
+        sender: Sender<ManifestEntryContext>,
+    ) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>>>> {
+        let filtered_entries = manifest_list
+            .consume_entries()
+            .into_iter()
+            .filter(|manifest_file| manifest_file.content == 
ManifestContentType::Data);
+
+        // TODO: Ideally we could ditch this intermediate Vec as we return an 
iterator.
+        let mut filtered_mfcs = vec![];
+        if self.predicate.is_some() {
+            for manifest_file in filtered_entries {
+                let partition_bound_predicate = 
self.get_partition_filter(&manifest_file)?;
+
+                // evaluate the ManifestFile against the partition filter. Skip
+                // if it cannot contain any matching rows
+                if self
+                    .manifest_evaluator_cache
+                    .get(
+                        manifest_file.partition_spec_id,
+                        partition_bound_predicate.clone(),
+                    )
+                    .eval(&manifest_file)?
+                {
+                    let mfc = self.create_manifest_file_context(
+                        manifest_file,
+                        Some(partition_bound_predicate),
+                        sender.clone(),
+                    );
+                    filtered_mfcs.push(Ok(mfc));
+                }
+            }
+        } else {
+            for manifest_file in filtered_entries {
+                let mfc = self.create_manifest_file_context(manifest_file, 
None, sender.clone());
+                filtered_mfcs.push(Ok(mfc));
+            }
+        }
+
+        Ok(Box::new(filtered_mfcs.into_iter()))
+    }
+
+    fn create_manifest_file_context(
+        &self,
+        manifest_file: ManifestFile,
+        partition_filter: Option<Arc<BoundPredicate>>,
+        sender: Sender<ManifestEntryContext>,
+    ) -> ManifestFileContext {
+        let bound_predicates =
+            if let (Some(ref partition_bound_predicate), 
Some(snapshot_bound_predicate)) =
+                (partition_filter, &self.snapshot_bound_predicate)
+            {
+                Some(Arc::new(BoundPredicates {
+                    partition_bound_predicate: 
partition_bound_predicate.as_ref().clone(),
+                    snapshot_bound_predicate: 
snapshot_bound_predicate.as_ref().clone(),
+                }))
+            } else {
+                None
+            };
+
+        ManifestFileContext {
+            manifest_file,
+            bound_predicates,
+            sender,
+            file_io: self.file_io.clone(),
+            snapshot_schema: self.snapshot_schema.clone(),
+            field_ids: self.field_ids.clone(),
+            expression_evaluator_cache: 
self.expression_evaluator_cache.clone(),
+        }
     }
 }
 
 /// Manages the caching of [`BoundPredicate`] objects
 /// for [`PartitionSpec`]s based on partition spec id.
 #[derive(Debug)]
-struct PartitionFilterCache(HashMap<i32, BoundPredicate>);
+struct PartitionFilterCache(RwLock<HashMap<i32, Arc<BoundPredicate>>>);
 
 impl PartitionFilterCache {
     /// Creates a new [`PartitionFilterCache`]
     /// with an empty internal HashMap.
     fn new() -> Self {
-        Self(HashMap::new())
+        Self(RwLock::new(HashMap::new()))
     }
 
     /// Retrieves a [`BoundPredicate`] from the cache
     /// or computes it if not present.
     fn get(
-        &mut self,
+        &self,
         spec_id: i32,
-        context: &FileScanStreamContext,
-    ) -> Result<Option<&BoundPredicate>> {
-        match context.bound_filter() {
-            None => Ok(None),
-            Some(filter) => match self.0.entry(spec_id) {
-                Entry::Occupied(e) => Ok(Some(e.into_mut())),
-                Entry::Vacant(e) => {
-                    let partition_spec = context
-                        .table_metadata
-                        .partition_spec_by_id(spec_id)
-                        .ok_or(Error::new(
-                            ErrorKind::Unexpected,
-                            format!("Could not find partition spec for id {}", 
spec_id),
-                        ))?;
-
-                    let partition_type = 
partition_spec.partition_type(context.schema.as_ref())?;
-                    let partition_fields = partition_type.fields().to_owned();
-                    let partition_schema = Arc::new(
-                        Schema::builder()
-                            .with_schema_id(partition_spec.spec_id)
-                            .with_fields(partition_fields)
-                            .build()?,
-                    );
+        table_metadata: &TableMetadataRef,
+        schema: &SchemaRef,
+        case_sensitive: bool,
+        filter: BoundPredicate,
+    ) -> Result<Arc<BoundPredicate>> {
+        // we need a block here to ensure that the `read()` gets dropped 
before we hit the `write()`
+        // below, otherwise we hit deadlock
+        {
+            let read = self.0.read().map_err(|_| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "PartitionFilterCache RwLock was poisoned",
+                )
+            })?;
 
-                    let mut inclusive_projection = 
InclusiveProjection::new(partition_spec.clone());
+            if read.contains_key(&spec_id) {
+                return Ok(read.get(&spec_id).unwrap().clone());
+            }
+        }
 
-                    let partition_filter = inclusive_projection
-                        .project(filter)?
-                        .rewrite_not()
-                        .bind(partition_schema.clone(), 
context.case_sensitive)?;
+        let partition_spec = table_metadata
+            .partition_spec_by_id(spec_id)
+            .ok_or(Error::new(
+                ErrorKind::Unexpected,
+                format!("Could not find partition spec for id {}", spec_id),
+            ))?;
 
-                    Ok(Some(e.insert(partition_filter)))
-                }
-            },
-        }
+        let partition_type = partition_spec.partition_type(schema.as_ref())?;
+        let partition_fields = partition_type.fields().to_owned();
+        let partition_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(partition_spec.spec_id)
+                .with_fields(partition_fields)
+                .build()?,
+        );
+
+        let mut inclusive_projection = 
InclusiveProjection::new(partition_spec.clone());
+
+        let partition_filter = inclusive_projection
+            .project(&filter)?
+            .rewrite_not()
+            .bind(partition_schema.clone(), case_sensitive)?;
+
+        self.0
+            .write()
+            .map_err(|_| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "PartitionFilterCache RwLock was poisoned",
+                )
+            })?
+            .insert(spec_id, Arc::new(partition_filter));
+
+        let read = self.0.read().map_err(|_| {
+            Error::new(
+                ErrorKind::Unexpected,
+                "PartitionFilterCache RwLock was poisoned",
+            )
+        })?;
+
+        Ok(read.get(&spec_id).unwrap().clone())
     }
 }
 
 /// Manages the caching of [`ManifestEvaluator`] objects
 /// for [`PartitionSpec`]s based on partition spec id.
 #[derive(Debug)]
-struct ManifestEvaluatorCache(HashMap<i32, ManifestEvaluator>);
+struct ManifestEvaluatorCache(RwLock<HashMap<i32, Arc<ManifestEvaluator>>>);
 
 impl ManifestEvaluatorCache {
     /// Creates a new [`ManifestEvaluatorCache`]
     /// with an empty internal HashMap.
     fn new() -> Self {
-        Self(HashMap::new())
+        Self(RwLock::new(HashMap::new()))
     }
 
     /// Retrieves a [`ManifestEvaluator`] from the cache
     /// or computes it if not present.
-    fn get(&mut self, spec_id: i32, partition_filter: &BoundPredicate) -> &mut 
ManifestEvaluator {
+    fn get(&self, spec_id: i32, partition_filter: Arc<BoundPredicate>) -> Arc<ManifestEvaluator> {
+        // we need a block here to ensure that the `read()` gets dropped 
before we hit the `write()`
+        // below, otherwise we hit deadlock
+        {
+            let read = self
+                .0
+                .read()
+                .map_err(|_| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "ManifestEvaluatorCache RwLock was poisoned",
+                    )
+                })
+                .unwrap();
+
+            if read.contains_key(&spec_id) {
+                return read.get(&spec_id).unwrap().clone();
+            }
+        }
+
         self.0
-            .entry(spec_id)
-            .or_insert(ManifestEvaluator::new(partition_filter.clone()))
+            .write()
+            .map_err(|_| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "ManifestEvaluatorCache RwLock was poisoned",
+                )
+            })
+            .unwrap()
+            .insert(
+                spec_id,
+                
Arc::new(ManifestEvaluator::new(partition_filter.as_ref().clone())),
+            );
+
+        let read = self
+            .0
+            .read()
+            .map_err(|_| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "ManifestEvaluatorCache RwLock was poisoned",
+                )
+            })
+            .unwrap();
+
+        read.get(&spec_id).unwrap().clone()
     }
 }
 
 /// Manages the caching of [`ExpressionEvaluator`] objects
 /// for [`PartitionSpec`]s based on partition spec id.
 #[derive(Debug)]
-struct ExpressionEvaluatorCache(HashMap<i32, ExpressionEvaluator>);
+struct ExpressionEvaluatorCache(RwLock<HashMap<i32, Arc<ExpressionEvaluator>>>);
 
 impl ExpressionEvaluatorCache {
     /// Creates a new [`ExpressionEvaluatorCache`]
     /// with an empty internal HashMap.
     fn new() -> Self {
-        Self(HashMap::new())
+        Self(RwLock::new(HashMap::new()))
     }
 
     /// Retrieves a [`ExpressionEvaluator`] from the cache
     /// or computes it if not present.
-    fn get(&mut self, spec_id: i32, partition_filter: &BoundPredicate) -> &mut 
ExpressionEvaluator {
+    fn get(
+        &self,
+        spec_id: i32,
+        partition_filter: &BoundPredicate,
+    ) -> Result<Arc<ExpressionEvaluator>> {
+        // we need a block here to ensure that the `read()` gets dropped 
before we hit the `write()`
+        // below, otherwise we hit deadlock
+        {
+            let read = self.0.read().map_err(|_| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "PartitionFilterCache RwLock was poisoned",
+                )
+            })?;
+
+            if read.contains_key(&spec_id) {
+                return Ok(read.get(&spec_id).unwrap().clone());
+            }
+        }
+
         self.0
-            .entry(spec_id)
-            .or_insert(ExpressionEvaluator::new(partition_filter.clone()))
+            .write()
+            .map_err(|_| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "ManifestEvaluatorCache RwLock was poisoned",
+                )
+            })
+            .unwrap()
+            .insert(
+                spec_id,
+                Arc::new(ExpressionEvaluator::new(partition_filter.clone())),
+            );
+
+        let read = self
+            .0
+            .read()
+            .map_err(|_| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "ManifestEvaluatorCache RwLock was poisoned",
+                )
+            })
+            .unwrap();
+
+        Ok(read.get(&spec_id).unwrap().clone())
     }
 }
 
@@ -817,7 +1133,7 @@ mod tests {
         let table_scan = table.scan().build().unwrap();
         assert_eq!(
             table.metadata().current_snapshot().unwrap().snapshot_id(),
-            table_scan.snapshot.snapshot_id()
+            table_scan.snapshot().snapshot_id()
         );
     }
 
@@ -838,7 +1154,7 @@ mod tests {
             .snapshot_id(3051729675574597004)
             .build()
             .unwrap();
-        assert_eq!(table_scan.snapshot.snapshot_id(), 3051729675574597004);
+        assert_eq!(table_scan.snapshot().snapshot_id(), 3051729675574597004);
     }
 
     #[tokio::test]
diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index e08591f..e2f8251 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -94,6 +94,12 @@ impl Manifest {
         &self.entries
     }
 
+    /// Consume this Manifest, returning its constituent parts
+    pub fn consume(self) -> (Vec<ManifestEntryRef>, ManifestMetadata) {
+        let Self { entries, metadata } = self;
+        (entries, metadata)
+    }
+
     /// Constructor from [`ManifestMetadata`] and [`ManifestEntry`]s.
     pub fn new(metadata: ManifestMetadata, entries: Vec<ManifestEntry>) -> 
Self {
         Self {
diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs
index e818890..3aaecf1 100644
--- a/crates/iceberg/src/spec/manifest_list.rs
+++ b/crates/iceberg/src/spec/manifest_list.rs
@@ -78,6 +78,11 @@ impl ManifestList {
     pub fn entries(&self) -> &[ManifestFile] {
         &self.entries
     }
+
+    /// Take ownership of the entries in the manifest list, consuming it
+    pub fn consume_entries(self) -> impl IntoIterator<Item = ManifestFile> {
+        Box::new(self.entries.into_iter())
+    }
 }
 
 /// A manifest list writer.


Reply via email to