This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 1bf80e1 make file scan task serializable (#377)
1bf80e1 is described below
commit 1bf80e1894e03fe37cb47f5ccba00ccc0ebe510a
Author: ZENOTME <[email protected]>
AuthorDate: Mon May 27 22:34:47 2024 +0800
make file scan task serializable (#377)
Co-authored-by: ZENOTME <[email protected]>
---
crates/iceberg/src/arrow/reader.rs | 2 +-
crates/iceberg/src/lib.rs | 2 +-
crates/iceberg/src/scan.rs | 24 +++++++++++++-----------
3 files changed, 15 insertions(+), 13 deletions(-)
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 391239c..c10264b 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -125,7 +125,7 @@ impl ArrowReader {
Ok(try_stream! {
while let Some(Ok(task)) = tasks.next().await {
let parquet_file = file_io
- .new_input(task.data().data_file().file_path())?;
+ .new_input(task.data_file_path())?;
let (parquet_metadata, parquet_reader) =
try_join!(parquet_file.metadata(), parquet_file.reader())?;
let arrow_file_reader = ArrowFileReader::new(parquet_metadata,
parquet_reader);
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 17a94d4..b7e0b3b 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -45,7 +45,7 @@ mod avro;
pub mod io;
pub mod spec;
-mod scan;
+pub mod scan;
#[allow(dead_code)]
pub mod expr;
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 6641d67..397633d 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -24,8 +24,8 @@ use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::spec::{
- DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile,
Schema, SchemaRef,
- SnapshotRef, TableMetadataRef,
+ DataContentType, ManifestContentType, ManifestFile, Schema, SchemaRef,
SnapshotRef,
+ TableMetadataRef,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
@@ -33,6 +33,7 @@ use arrow_array::RecordBatch;
use async_stream::try_stream;
use futures::stream::BoxStream;
use futures::StreamExt;
+use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
@@ -55,7 +56,7 @@ pub struct TableScanBuilder<'a> {
}
impl<'a> TableScanBuilder<'a> {
- pub fn new(table: &'a Table) -> Self {
+ pub(crate) fn new(table: &'a Table) -> Self {
Self {
table,
column_names: vec![],
@@ -265,7 +266,7 @@ impl TableScan {
}
DataContentType::Data => {
let scan_task: Result<FileScanTask> =
Ok(FileScanTask {
- data_manifest_entry: manifest_entry.clone(),
+ data_file_path:
manifest_entry.data_file().file_path().to_string(),
start: 0,
length: manifest_entry.file_size_in_bytes(),
});
@@ -463,9 +464,9 @@ impl ManifestEvaluatorCache {
}
/// A task to scan part of file.
-#[derive(Debug)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileScanTask {
- data_manifest_entry: ManifestEntryRef,
+ data_file_path: String,
#[allow(dead_code)]
start: u64,
#[allow(dead_code)]
@@ -473,8 +474,9 @@ pub struct FileScanTask {
}
impl FileScanTask {
- pub fn data(&self) -> ManifestEntryRef {
- self.data_manifest_entry.clone()
+ /// Returns the data file path of this file scan task.
+ pub fn data_file_path(&self) -> &str {
+ &self.data_file_path
}
}
@@ -794,17 +796,17 @@ mod tests {
assert_eq!(tasks.len(), 2);
- tasks.sort_by_key(|t| t.data().data_file().file_path().to_string());
+ tasks.sort_by_key(|t| t.data_file_path().to_string());
// Check first task is added data file
assert_eq!(
- tasks[0].data().data_file().file_path(),
+ tasks[0].data_file_path(),
format!("{}/1.parquet", &fixture.table_location)
);
// Check second task is existing data file
assert_eq!(
- tasks[1].data().data_file().file_path(),
+ tasks[1].data_file_path(),
format!("{}/3.parquet", &fixture.table_location)
);
}