This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a1cb12  Make plan_files as asynchronous stream (#243)
7a1cb12 is described below

commit 7a1cb12d048ffd111b99fc1d53c1e5817e8e9117
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Mon Mar 11 02:56:23 2024 -0700

    Make plan_files as asynchronous stream (#243)
---
 crates/iceberg/src/scan.rs | 57 ++++++++++++++++++++++++++--------------------
 1 file changed, 32 insertions(+), 25 deletions(-)

diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 37fde8f..bd0e6ad 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -23,6 +23,7 @@ use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, Tab
 use crate::table::Table;
 use crate::{Error, ErrorKind};
 use arrow_array::RecordBatch;
+use async_stream::try_stream;
 use futures::stream::{iter, BoxStream};
 use futures::StreamExt;
 
@@ -143,37 +144,43 @@ pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
 impl TableScan {
     /// Returns a stream of file scan tasks.
     pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
-        let manifest_list = self
-            .snapshot
-            .load_manifest_list(&self.file_io, &self.table_metadata)
+        let snapshot = self.snapshot.clone();
+        let table_metadata = self.table_metadata.clone();
+        let file_io = self.file_io.clone();
+
+        Ok(try_stream! {
+            let manifest_list = snapshot
+            .clone()
+            .load_manifest_list(&file_io, &table_metadata)
             .await?;
 
-        // Generate data file stream
-        let mut file_scan_tasks = Vec::with_capacity(manifest_list.entries().len());
-        for manifest_list_entry in manifest_list.entries().iter() {
-            // Data file
-            let manifest = manifest_list_entry.load_manifest(&self.file_io).await?;
-
-            for manifest_entry in manifest.entries().iter().filter(|e| e.is_alive()) {
-                match manifest_entry.content_type() {
-                    DataContentType::EqualityDeletes | DataContentType::PositionDeletes => {
-                        return Err(Error::new(
-                            ErrorKind::FeatureUnsupported,
-                            "Delete files are not supported yet.",
-                        ));
-                    }
-                    DataContentType::Data => {
-                        file_scan_tasks.push(Ok(FileScanTask {
-                            data_file: manifest_entry.clone(),
-                            start: 0,
-                            length: manifest_entry.file_size_in_bytes(),
-                        }));
+            // Generate data file stream
+            let mut entries = iter(manifest_list.entries());
+            while let Some(entry) = entries.next().await {
+                let manifest = entry.load_manifest(&file_io).await?;
+
+                let mut manifest_entries = iter(manifest.entries().iter().filter(|e| e.is_alive()));
+                while let Some(manifest_entry) = manifest_entries.next().await {
+                    match manifest_entry.content_type() {
+                        DataContentType::EqualityDeletes | DataContentType::PositionDeletes => {
+                            yield Err(Error::new(
+                                ErrorKind::FeatureUnsupported,
+                                "Delete files are not supported yet.",
+                            ))?;
+                        }
+                        DataContentType::Data => {
+                            let scan_task: crate::Result<FileScanTask> = Ok(FileScanTask {
+                                data_file: manifest_entry.clone(),
+                                start: 0,
+                                length: manifest_entry.file_size_in_bytes(),
+                            });
+                            yield scan_task?;
+                        }
                     }
                 }
             }
         }
-
-        Ok(iter(file_scan_tasks).boxed())
+        .boxed())
     }
 
     pub async fn to_arrow(&self) -> crate::Result<ArrowRecordBatchStream> {

Reply via email to