This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 7a1cb12 Make plan_files as asynchronous stream (#243)
7a1cb12 is described below
commit 7a1cb12d048ffd111b99fc1d53c1e5817e8e9117
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Mon Mar 11 02:56:23 2024 -0700
Make plan_files as asynchronous stream (#243)
---
crates/iceberg/src/scan.rs | 57 ++++++++++++++++++++++++++--------------------
1 file changed, 32 insertions(+), 25 deletions(-)
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 37fde8f..bd0e6ad 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -23,6 +23,7 @@ use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, Tab
use crate::table::Table;
use crate::{Error, ErrorKind};
use arrow_array::RecordBatch;
+use async_stream::try_stream;
use futures::stream::{iter, BoxStream};
use futures::StreamExt;
@@ -143,37 +144,43 @@ pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
impl TableScan {
/// Returns a stream of file scan tasks.
pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
- let manifest_list = self
- .snapshot
- .load_manifest_list(&self.file_io, &self.table_metadata)
+ let snapshot = self.snapshot.clone();
+ let table_metadata = self.table_metadata.clone();
+ let file_io = self.file_io.clone();
+
+ Ok(try_stream! {
+ let manifest_list = snapshot
+ .clone()
+ .load_manifest_list(&file_io, &table_metadata)
.await?;
- // Generate data file stream
- let mut file_scan_tasks =
Vec::with_capacity(manifest_list.entries().len());
- for manifest_list_entry in manifest_list.entries().iter() {
- // Data file
- let manifest =
manifest_list_entry.load_manifest(&self.file_io).await?;
-
- for manifest_entry in manifest.entries().iter().filter(|e|
e.is_alive()) {
- match manifest_entry.content_type() {
- DataContentType::EqualityDeletes |
DataContentType::PositionDeletes => {
- return Err(Error::new(
- ErrorKind::FeatureUnsupported,
- "Delete files are not supported yet.",
- ));
- }
- DataContentType::Data => {
- file_scan_tasks.push(Ok(FileScanTask {
- data_file: manifest_entry.clone(),
- start: 0,
- length: manifest_entry.file_size_in_bytes(),
- }));
+ // Generate data file stream
+ let mut entries = iter(manifest_list.entries());
+ while let Some(entry) = entries.next().await {
+ let manifest = entry.load_manifest(&file_io).await?;
+
+ let mut manifest_entries =
iter(manifest.entries().iter().filter(|e| e.is_alive()));
+ while let Some(manifest_entry) = manifest_entries.next().await
{
+ match manifest_entry.content_type() {
+ DataContentType::EqualityDeletes |
DataContentType::PositionDeletes => {
+ yield Err(Error::new(
+ ErrorKind::FeatureUnsupported,
+ "Delete files are not supported yet.",
+ ))?;
+ }
+ DataContentType::Data => {
+ let scan_task: crate::Result<FileScanTask> =
Ok(FileScanTask {
+ data_file: manifest_entry.clone(),
+ start: 0,
+ length: manifest_entry.file_size_in_bytes(),
+ });
+ yield scan_task?;
+ }
}
}
}
}
-
- Ok(iter(file_scan_tasks).boxed())
+ .boxed())
}
pub async fn to_arrow(&self) -> crate::Result<ArrowRecordBatchStream> {