This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 1bf80e1  make file scan task serializable (#377)
1bf80e1 is described below

commit 1bf80e1894e03fe37cb47f5ccba00ccc0ebe510a
Author: ZENOTME <[email protected]>
AuthorDate: Mon May 27 22:34:47 2024 +0800

    make file scan task serializable (#377)
    
    Co-authored-by: ZENOTME <[email protected]>
---
 crates/iceberg/src/arrow/reader.rs |  2 +-
 crates/iceberg/src/lib.rs          |  2 +-
 crates/iceberg/src/scan.rs         | 24 +++++++++++++-----------
 3 files changed, 15 insertions(+), 13 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 391239c..c10264b 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -125,7 +125,7 @@ impl ArrowReader {
         Ok(try_stream! {
             while let Some(Ok(task)) = tasks.next().await {
                 let parquet_file = file_io
-                    .new_input(task.data().data_file().file_path())?;
+                    .new_input(task.data_file_path())?;
                 let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
                 let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
 
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 17a94d4..b7e0b3b 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -45,7 +45,7 @@ mod avro;
 pub mod io;
 pub mod spec;
 
-mod scan;
+pub mod scan;
 
 #[allow(dead_code)]
 pub mod expr;
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 6641d67..397633d 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -24,8 +24,8 @@ use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
 use crate::expr::{Bind, BoundPredicate, Predicate};
 use crate::io::FileIO;
 use crate::spec::{
-    DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, Schema, SchemaRef,
-    SnapshotRef, TableMetadataRef,
+    DataContentType, ManifestContentType, ManifestFile, Schema, SchemaRef, SnapshotRef,
+    TableMetadataRef,
 };
 use crate::table::Table;
 use crate::{Error, ErrorKind, Result};
@@ -33,6 +33,7 @@ use arrow_array::RecordBatch;
 use async_stream::try_stream;
 use futures::stream::BoxStream;
 use futures::StreamExt;
+use serde::{Deserialize, Serialize};
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -55,7 +56,7 @@ pub struct TableScanBuilder<'a> {
 }
 
 impl<'a> TableScanBuilder<'a> {
-    pub fn new(table: &'a Table) -> Self {
+    pub(crate) fn new(table: &'a Table) -> Self {
         Self {
             table,
             column_names: vec![],
@@ -265,7 +266,7 @@ impl TableScan {
                         }
                         DataContentType::Data => {
                             let scan_task: Result<FileScanTask> = Ok(FileScanTask {
-                                data_manifest_entry: manifest_entry.clone(),
+                                data_file_path: manifest_entry.data_file().file_path().to_string(),
                                 start: 0,
                                 length: manifest_entry.file_size_in_bytes(),
                             });
@@ -463,9 +464,9 @@ impl ManifestEvaluatorCache {
 }
 
 /// A task to scan part of file.
-#[derive(Debug)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct FileScanTask {
-    data_manifest_entry: ManifestEntryRef,
+    data_file_path: String,
     #[allow(dead_code)]
     start: u64,
     #[allow(dead_code)]
@@ -473,8 +474,9 @@ pub struct FileScanTask {
 }
 
 impl FileScanTask {
-    pub fn data(&self) -> ManifestEntryRef {
-        self.data_manifest_entry.clone()
+    /// Returns the data file path of this file scan task.
+    pub fn data_file_path(&self) -> &str {
+        &self.data_file_path
     }
 }
 
@@ -794,17 +796,17 @@ mod tests {
 
         assert_eq!(tasks.len(), 2);
 
-        tasks.sort_by_key(|t| t.data().data_file().file_path().to_string());
+        tasks.sort_by_key(|t| t.data_file_path().to_string());
 
         // Check first task is added data file
         assert_eq!(
-            tasks[0].data().data_file().file_path(),
+            tasks[0].data_file_path(),
             format!("{}/1.parquet", &fixture.table_location)
         );
 
         // Check second task is existing data file
         assert_eq!(
-            tasks[1].data().data_file().file_path(),
+            tasks[1].data_file_path(),
             format!("{}/3.parquet", &fixture.table_location)
         );
     }

Reply via email to