This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch refactor-reader
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git

commit 6cef43a540461c80245ff8ebd34ea553719a01c8
Author: Xuanwo <[email protected]>
AuthorDate: Mon Dec 16 15:50:05 2024 +0800

    refactor: Remove spawn and channel inside arrow reader
    
    Signed-off-by: Xuanwo <[email protected]>
---
 crates/iceberg/src/arrow/reader.rs | 80 ++++++++++++++------------------------
 crates/iceberg/src/scan.rs         |  7 +++-
 2 files changed, 35 insertions(+), 52 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs 
b/crates/iceberg/src/arrow/reader.rs
index 16b9468c..b4e15821 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -31,9 +31,8 @@ use arrow_schema::{
 use arrow_string::like::starts_with;
 use bytes::Bytes;
 use fnv::FnvHashSet;
-use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
-use futures::{try_join, FutureExt, SinkExt, StreamExt, TryFutureExt, 
TryStreamExt};
+use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, 
RowFilter, RowSelection};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, 
PARQUET_FIELD_ID_META_KEY};
@@ -48,7 +47,6 @@ use 
crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
 use 
crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::runtime::spawn;
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
 use crate::spec::{Datum, PrimitiveType, Schema};
 use crate::utils::available_parallelism;
@@ -130,62 +128,41 @@ pub struct ArrowReader {
 impl ArrowReader {
     /// Take a stream of FileScanTasks and reads all the files.
     /// Returns a stream of Arrow RecordBatches containing the data from the 
files
-    pub fn read(self, tasks: FileScanTaskStream) -> 
Result<ArrowRecordBatchStream> {
+    pub async fn read(self, tasks: FileScanTaskStream) -> 
Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();
         let batch_size = self.batch_size;
         let concurrency_limit_data_files = self.concurrency_limit_data_files;
         let row_group_filtering_enabled = self.row_group_filtering_enabled;
         let row_selection_enabled = self.row_selection_enabled;
 
-        let (tx, rx) = channel(concurrency_limit_data_files);
-        let mut channel_for_error = tx.clone();
-
-        spawn(async move {
-            let result = tasks
-                .map(|task| Ok((task, file_io.clone(), tx.clone())))
-                .try_for_each_concurrent(
-                    concurrency_limit_data_files,
-                    |(file_scan_task, file_io, tx)| async move {
-                        match file_scan_task {
-                            Ok(task) => {
-                                let file_path = 
task.data_file_path.to_string();
-
-                                spawn(async move {
-                                    Self::process_file_scan_task(
-                                        task,
-                                        batch_size,
-                                        file_io,
-                                        tx,
-                                        row_group_filtering_enabled,
-                                        row_selection_enabled,
-                                    )
-                                    .await
-                                })
-                                .await
-                                .map_err(|e| e.with_context("file_path", 
file_path))
-                            }
-                            Err(err) => Err(err),
-                        }
-                    },
-                )
-                .await;
+        let stream = tasks
+            .map_ok(move |task| {
+                let file_io = file_io.clone();
 
-            if let Err(error) = result {
-                let _ = channel_for_error.send(Err(error)).await;
-            }
-        });
+                Self::process_file_scan_task(
+                    task,
+                    batch_size,
+                    file_io,
+                    row_group_filtering_enabled,
+                    row_selection_enabled,
+                )
+            })
+            .map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "file scan task generate 
failed").with_source(err)
+            })
+            .try_buffer_unordered(concurrency_limit_data_files)
+            .try_flatten_unordered(concurrency_limit_data_files);
 
-        return Ok(rx.boxed());
+        Ok(Box::pin(stream) as ArrowRecordBatchStream)
     }
 
     async fn process_file_scan_task(
         task: FileScanTask,
         batch_size: Option<usize>,
         file_io: FileIO,
-        mut tx: Sender<Result<RecordBatch>>,
         row_group_filtering_enabled: bool,
         row_selection_enabled: bool,
-    ) -> Result<()> {
+    ) -> Result<ArrowRecordBatchStream> {
         // Get the metadata for the Parquet file we need to read and build
         // a reader for the data within
         let parquet_file = file_io.new_input(&task.data_file_path)?;
@@ -269,14 +246,15 @@ impl ArrowReader {
 
         // Build the batch stream and send all the RecordBatches that it 
generates
         // to the requester.
-        let mut record_batch_stream = record_batch_stream_builder.build()?;
-
-        while let Some(batch) = record_batch_stream.try_next().await? {
-            tx.send(record_batch_transformer.process_record_batch(batch))
-                .await?
-        }
-
-        Ok(())
+        let record_batch_stream =
+            record_batch_stream_builder
+                .build()?
+                .map(move |batch| match batch {
+                    Ok(batch) => 
record_batch_transformer.process_record_batch(batch),
+                    Err(err) => Err(err.into()),
+                });
+
+        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
 
     fn build_field_id_set_and_map(
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 89cc21bb..89df9752 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -422,7 +422,10 @@ impl TableScan {
             arrow_reader_builder = 
arrow_reader_builder.with_batch_size(batch_size);
         }
 
-        arrow_reader_builder.build().read(self.plan_files().await?)
+        arrow_reader_builder
+            .build()
+            .read(self.plan_files().await?)
+            .await
     }
 
     /// Returns a reference to the column names of the table scan.
@@ -1404,12 +1407,14 @@ mod tests {
         let batch_stream = reader
             .clone()
             .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
+            .await
             .unwrap();
         let batche1: Vec<_> = batch_stream.try_collect().await.unwrap();
 
         let reader = 
ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
         let batch_stream = reader
             .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
+            .await
             .unwrap();
         let batche2: Vec<_> = batch_stream.try_collect().await.unwrap();
 

Reply via email to