This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 8e4308d Implement basic Parquet data file reading capability (#207)
8e4308d is described below
commit 8e4308d53fdc00195429261f8566e0bf19ef6849
Author: Scott Donnelly <[email protected]>
AuthorDate: Thu Mar 7 09:40:13 2024 +0000
Implement basic Parquet data file reading capability (#207)
* feat: TableScan parquet file read to RecordBatch stream
* chore: add inline hinting and fix incorrect comment
* refactor: extract record batch reader
* refactor: rename `FileRecordBatchReader` to `ArrowReader`
* refactor: rename file_record_batch_reader.rs to arrow.rs
* refactor: move `batch_size` param to `TableScanBuilder`
* refactor: rename `TableScan.execute` to `to_arrow`
* refactor: use builder pattern to create `ArrowReader`
---
Cargo.toml | 3 +
crates/iceberg/Cargo.toml | 4 +
crates/iceberg/src/arrow.rs | 108 ++++++++++++++
crates/iceberg/src/error.rs | 6 +
crates/iceberg/src/io.rs | 8 +-
crates/iceberg/src/lib.rs | 1 +
crates/iceberg/src/scan.rs | 271 +++++++++++++++++++++++-------------
crates/iceberg/src/spec/manifest.rs | 8 ++
8 files changed, 312 insertions(+), 97 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index dccc6bd..3234bd0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,9 +34,11 @@ apache-avro = "0.16"
arrow-arith = { version = ">=46" }
arrow-array = { version = ">=46" }
arrow-schema = { version = ">=46" }
+async-stream = "0.3.5"
async-trait = "0.1"
bimap = "0.6"
bitvec = "1.0.1"
+bytes = "1.5"
chrono = "0.4"
derive_builder = "0.20.0"
either = "1"
@@ -52,6 +54,7 @@ murmur3 = "0.5.2"
once_cell = "1"
opendal = "0.45"
ordered-float = "4.0.0"
+parquet = "50"
pretty_assertions = "1.4.0"
port_scanner = "0.1.5"
reqwest = { version = "^0.11", features = ["json"] }
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 32288ee..181832c 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -34,9 +34,11 @@ apache-avro = { workspace = true }
arrow-arith = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
+async-stream = { workspace = true }
async-trait = { workspace = true }
bimap = { workspace = true }
bitvec = { workspace = true }
+bytes = { workspace = true }
chrono = { workspace = true }
derive_builder = { workspace = true }
either = { workspace = true }
@@ -48,6 +50,7 @@ murmur3 = { workspace = true }
once_cell = { workspace = true }
opendal = { workspace = true }
ordered-float = { workspace = true }
+parquet = { workspace = true, features = ["async"] }
reqwest = { workspace = true }
rust_decimal = { workspace = true }
serde = { workspace = true }
@@ -56,6 +59,7 @@ serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_repr = { workspace = true }
serde_with = { workspace = true }
+tokio = { workspace = true }
typed-builder = { workspace = true }
url = { workspace = true }
urlencoding = { workspace = true }
diff --git a/crates/iceberg/src/arrow.rs b/crates/iceberg/src/arrow.rs
new file mode 100644
index 0000000..6de9b6a
--- /dev/null
+++ b/crates/iceberg/src/arrow.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Parquet file data reader
+
+use async_stream::try_stream;
+use futures::stream::StreamExt;
+use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+
+use crate::io::FileIO;
+use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
+use crate::spec::SchemaRef;
+
+/// Builder for [`ArrowReader`].
+///
+/// Collects the [`FileIO`] used to open data files, the table schema and an
+/// optional batch size; call [`ArrowReaderBuilder::build`] to obtain the reader.
+pub struct ArrowReaderBuilder {
+    batch_size: Option<usize>,
+    file_io: FileIO,
+    schema: SchemaRef,
+}
+
+impl ArrowReaderBuilder {
+    /// Creates a builder with no explicit batch size, so the Parquet reader's own
+    /// default is used unless [`ArrowReaderBuilder::with_batch_size`] is called.
+    pub fn new(file_io: FileIO, schema: SchemaRef) -> Self {
+        Self {
+            file_io,
+            schema,
+            batch_size: None,
+        }
+    }
+
+    /// Overrides the number of rows per emitted Arrow `RecordBatch`.
+    pub fn with_batch_size(self, batch_size: usize) -> Self {
+        Self {
+            batch_size: Some(batch_size),
+            ..self
+        }
+    }
+
+    /// Consumes the builder and returns the configured [`ArrowReader`].
+    pub fn build(self) -> ArrowReader {
+        let Self {
+            batch_size,
+            file_io,
+            schema,
+        } = self;
+
+        ArrowReader {
+            batch_size,
+            schema,
+            file_io,
+        }
+    }
+}
+
+/// Reads data from Parquet files
+pub struct ArrowReader {
+    batch_size: Option<usize>,
+    // Not consulted yet: column projection is still a TODO (see
+    // `get_arrow_projection_mask`), so every column is currently read.
+    #[allow(dead_code)]
+    schema: SchemaRef,
+    file_io: FileIO,
+}
+
+impl ArrowReader {
+    /// Takes a stream of [`FileScanTask`]s and reads every referenced Parquet file in order.
+    ///
+    /// Returns a stream of Arrow `RecordBatch`es containing the data from the files.
+    /// The outer `Result` is currently always `Ok`. If the task stream yields an error,
+    /// or a file cannot be opened or decoded, that error is emitted as an `Err` item
+    /// and the returned stream then ends.
+    pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+        Ok(try_stream! {
+            while let Some(task) = tasks.next().await {
+                // Surface planning errors instead of silently ending the stream, which
+                // would make a failed scan indistinguishable from a shorter successful one.
+                let task = task?;
+
+                let projection_mask = self.get_arrow_projection_mask(&task);
+
+                let parquet_reader = self
+                    .file_io
+                    .new_input(task.data_file().file_path())?
+                    .reader()
+                    .await?;
+
+                let mut batch_stream_builder =
+                    ParquetRecordBatchStreamBuilder::new(parquet_reader)
+                        .await?
+                        .with_projection(projection_mask);
+
+                // Only override the Parquet reader's default when a size was configured.
+                if let Some(batch_size) = self.batch_size {
+                    batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
+                }
+
+                let mut batch_stream = batch_stream_builder.build()?;
+
+                while let Some(batch) = batch_stream.next().await {
+                    yield batch?;
+                }
+            }
+        }
+        .boxed())
+    }
+
+    /// Builds the column projection applied when reading the task's data file.
+    ///
+    /// Currently always selects every column; the task is not yet consulted.
+    fn get_arrow_projection_mask(&self, _task: &FileScanTask) -> ProjectionMask {
+        // TODO: full implementation
+        ProjectionMask::all()
+    }
+}
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index c851402..f920fa9 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -325,6 +325,12 @@ define_from_err!(
"Failed to convert decimal literal to rust decimal"
);
+// Lets `?` convert `parquet` errors (e.g. from `ParquetRecordBatchStreamBuilder` in
+// `arrow.rs`) into this crate's `Error`, classified as `ErrorKind::Unexpected`.
+// NOTE(review): assumes `define_from_err!` generates a `From` impl — confirm against
+// the macro definition.
+define_from_err!(
+ parquet::errors::ParquetError,
+ ErrorKind::Unexpected,
+ "Failed to read a Parquet file"
+);
+
define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed");
/// Helper macro to check arguments.
diff --git a/crates/iceberg/src/io.rs b/crates/iceberg/src/io.rs
index 3a7c85f..410d870 100644
--- a/crates/iceberg/src/io.rs
+++ b/crates/iceberg/src/io.rs
@@ -54,6 +54,7 @@ use crate::{error::Result, Error, ErrorKind};
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
use once_cell::sync::Lazy;
use opendal::{Operator, Scheme};
+use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek};
use url::Url;
/// Following are arguments for [s3 file
io](https://py.iceberg.apache.org/configuration/#s3).
@@ -215,9 +216,12 @@ pub struct InputFile {
}
/// Trait for reading file.
-pub trait FileRead: AsyncRead + AsyncSeek {}
+///
+/// In addition to the `futures` `AsyncRead`/`AsyncSeek` traits, implementors must
+/// also implement Tokio's `AsyncRead`/`AsyncSeek` and be `Send + Unpin`.
+/// NOTE(review): the Tokio bounds appear to exist so readers can be handed to
+/// parquet's async `ParquetRecordBatchStreamBuilder::new` (see `arrow.rs`);
+/// confirm before relaxing them.
+pub trait FileRead: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead +
TokioAsyncSeek {}
-impl<T> FileRead for T where T: AsyncRead + AsyncSeek {}
+// Blanket impl: every type satisfying the bounds above is a `FileRead`.
+impl<T> FileRead for T where
+ T: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead + TokioAsyncSeek
+{
+}
impl InputFile {
/// Absolute path to root uri.
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 9ceadca..17a94d4 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -52,4 +52,5 @@ pub mod expr;
pub mod transaction;
pub mod transform;
+pub mod arrow;
pub mod writer;
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 0a3b9a9..cca26b6 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -17,6 +17,7 @@
//! Table scan api.
+use crate::arrow::ArrowReaderBuilder;
use crate::io::FileIO;
use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef,
TableMetadataRef};
use crate::table::Table;
@@ -31,6 +32,7 @@ pub struct TableScanBuilder<'a> {
// Empty column names means to select all columns
column_names: Vec<String>,
snapshot_id: Option<i64>,
+ batch_size: Option<usize>,
}
impl<'a> TableScanBuilder<'a> {
@@ -39,9 +41,17 @@ impl<'a> TableScanBuilder<'a> {
table,
column_names: vec![],
snapshot_id: None,
+ batch_size: None,
}
}
+    /// Sets the number of rows per Arrow `RecordBatch` produced by
+    /// [`TableScan::to_arrow`]; passing `None` keeps the reader's default.
+    pub fn with_batch_size(self, batch_size: Option<usize>) -> Self {
+        Self { batch_size, ..self }
+    }
+
/// Select all columns.
pub fn select_all(mut self) -> Self {
self.column_names.clear();
@@ -110,6 +120,7 @@ impl<'a> TableScanBuilder<'a> {
table_metadata: self.table.metadata_ref(),
column_names: self.column_names,
schema,
+ batch_size: self.batch_size,
})
}
}
@@ -123,6 +134,7 @@ pub struct TableScan {
file_io: FileIO,
column_names: Vec<String>,
schema: SchemaRef,
+ batch_size: Option<usize>,
}
/// A stream of [`FileScanTask`].
@@ -163,14 +175,26 @@ impl TableScan {
Ok(iter(file_scan_tasks).boxed())
}
+
+    /// Plans the files for this scan and reads them into a stream of Arrow
+    /// [`RecordBatch`]es.
+    ///
+    /// A batch size set through [`TableScanBuilder::with_batch_size`] is forwarded to
+    /// the reader; otherwise the Parquet reader's default batch size applies.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if file planning fails. Errors hit while reading individual
+    /// files are reported as `Err` items inside the returned stream.
+    pub async fn to_arrow(&self) -> crate::Result<ArrowRecordBatchStream> {
+        let mut arrow_reader_builder =
+            ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());
+
+        if let Some(batch_size) = self.batch_size {
+            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
+        }
+
+        arrow_reader_builder.build().read(self.plan_files().await?)
+    }
}
/// A task to scan part of file.
#[derive(Debug)]
-#[allow(dead_code)]
pub struct FileScanTask {
+ // Manifest entry for the data file this task reads; exposed via `data_file()`.
data_file: ManifestEntryRef,
+ // `start` and `length` are not read yet (the Arrow reader currently consumes the
+ // whole file), hence the field-level `dead_code` allowances.
+ // NOTE(review): presumably a byte range within the data file — confirm when
+ // split planning is implemented.
+ #[allow(dead_code)]
start: u64,
+ #[allow(dead_code)]
length: u64,
}
@@ -178,9 +202,8 @@ pub struct FileScanTask {
pub type ArrowRecordBatchStream = BoxStream<'static,
crate::Result<RecordBatch>>;
impl FileScanTask {
- /// Returns a stream of arrow record batches.
- pub async fn execute(&self) -> crate::Result<ArrowRecordBatchStream> {
- todo!()
+ /// Returns the manifest entry of the data file this task scans.
+ ///
+ /// Returns an owned `ManifestEntryRef` by cloning the stored handle.
+ /// NOTE(review): presumably an `Arc`, making this a cheap refcount bump — confirm.
+ pub(crate) fn data_file(&self) -> ManifestEntryRef {
+ self.data_file.clone()
}
}
@@ -194,8 +217,15 @@ mod tests {
};
use crate::table::Table;
use crate::TableIdent;
+ use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use futures::TryStreamExt;
+ use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
+ use parquet::basic::Compression;
+ use parquet::file::properties::WriterProperties;
+ use std::collections::HashMap;
use std::fs;
+ use std::fs::File;
+ use std::sync::Arc;
use tempfile::TempDir;
use tera::{Context, Tera};
use uuid::Uuid;
@@ -257,6 +287,128 @@ mod tests {
))
.unwrap()
}
+
+ /// Populates the fixture's current snapshot with a manifest and data on disk.
+ ///
+ /// Writes one data manifest with three entries — `1.parquet` (Added),
+ /// `2.parquet` (Deleted) and `3.parquet` (Existing) — registers it in the
+ /// current snapshot's manifest list, then writes the three Parquet files under
+ /// `table_location`, each holding 1024 rows of a single Int64 column `col`
+ /// whose values are all 1.
+ async fn setup_manifest_files(&mut self) {
+ let current_snapshot =
self.table.metadata().current_snapshot().unwrap();
+ let parent_snapshot = current_snapshot
+ .parent_snapshot(self.table.metadata())
+ .unwrap();
+ let current_schema =
current_snapshot.schema(self.table.metadata()).unwrap();
+ let current_partition_spec =
self.table.metadata().default_partition_spec().unwrap();
+
+ // Write a data manifest describing the three data files (one per entry status).
+ // NOTE(review): `file_size_in_bytes(100)` and `record_count(1)` do not match the
+ // files written at the end of this method (1024 rows each); this appears harmless
+ // because the reader in arrow.rs ignores these stats, but it is worth aligning.
+ let data_file_manifest = ManifestWriter::new(
+ self.next_manifest_file(),
+ current_snapshot.snapshot_id(),
+ vec![],
+ )
+ .write(Manifest::new(
+ ManifestMetadata::builder()
+ .schema((*current_schema).clone())
+ .content(ManifestContentType::Data)
+ .format_version(FormatVersion::V2)
+ .partition_spec((**current_partition_spec).clone())
+ .schema_id(current_schema.schema_id())
+ .build(),
+ vec![
+ ManifestEntry::builder()
+ .status(ManifestStatus::Added)
+ .data_file(
+ DataFile::builder()
+ .content(DataContentType::Data)
+ .file_path(format!("{}/1.parquet",
&self.table_location))
+ .file_format(DataFileFormat::Parquet)
+ .file_size_in_bytes(100)
+ .record_count(1)
+
.partition(Struct::from_iter([Some(Literal::long(100))]))
+ .build(),
+ )
+ .build(),
+ ManifestEntry::builder()
+ .status(ManifestStatus::Deleted)
+ .snapshot_id(parent_snapshot.snapshot_id())
+ .sequence_number(parent_snapshot.sequence_number())
+
.file_sequence_number(parent_snapshot.sequence_number())
+ .data_file(
+ DataFile::builder()
+ .content(DataContentType::Data)
+ .file_path(format!("{}/2.parquet",
&self.table_location))
+ .file_format(DataFileFormat::Parquet)
+ .file_size_in_bytes(100)
+ .record_count(1)
+
.partition(Struct::from_iter([Some(Literal::long(200))]))
+ .build(),
+ )
+ .build(),
+ ManifestEntry::builder()
+ .status(ManifestStatus::Existing)
+ .snapshot_id(parent_snapshot.snapshot_id())
+ .sequence_number(parent_snapshot.sequence_number())
+
.file_sequence_number(parent_snapshot.sequence_number())
+ .data_file(
+ DataFile::builder()
+ .content(DataContentType::Data)
+ .file_path(format!("{}/3.parquet",
&self.table_location))
+ .file_format(DataFileFormat::Parquet)
+ .file_size_in_bytes(100)
+ .record_count(1)
+
.partition(Struct::from_iter([Some(Literal::long(300))]))
+ .build(),
+ )
+ .build(),
+ ],
+ ))
+ .await
+ .unwrap();
+
+ // Register the manifest in the current snapshot's manifest list.
+ let mut manifest_list_write = ManifestListWriter::v2(
+ self.table
+ .file_io()
+ .new_output(current_snapshot.manifest_list())
+ .unwrap(),
+ current_snapshot.snapshot_id(),
+ current_snapshot
+ .parent_snapshot_id()
+ .unwrap_or(EMPTY_SNAPSHOT_ID),
+ current_snapshot.sequence_number(),
+ );
+ manifest_list_write
+ .add_manifest_entries(vec![data_file_manifest].into_iter())
+ .unwrap();
+ manifest_list_write.close().await.unwrap();
+
+ // Build a one-column Int64 batch (1024 rows of value 1) for every data file.
+ // The Parquet field id is attached through `PARQUET_FIELD_ID_META_KEY`.
+ // NOTE(review): confirm `"0"` matches the Iceberg field id of `col` in the fixture schema.
+ let schema = {
+ let fields =
+ vec![
+ arrow_schema::Field::new("col",
arrow_schema::DataType::Int64, true)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "0".to_string(),
+ )])),
+ ];
+ Arc::new(arrow_schema::Schema::new(fields))
+ };
+ let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as
ArrayRef;
+ let to_write = RecordBatch::try_new(schema.clone(),
vec![col]).unwrap();
+
+ // Write the same batch to 1.parquet, 2.parquet and 3.parquet. 2.parquet is written
+ // even though its entry is Deleted, so a reader that wrongly included it would
+ // return extra rows.
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+
+ for n in 1..=3 {
+ let file = File::create(format!("{}/{}.parquet",
&self.table_location, n)).unwrap();
+ let mut writer =
+ ArrowWriter::try_new(file, to_write.schema(),
Some(props.clone())).unwrap();
+
+ writer.write(&to_write).expect("Writing batch");
+
+ // writer must be closed to write footer
+ writer.close().unwrap();
+ }
+ }
}
#[test]
@@ -324,97 +476,8 @@ mod tests {
#[tokio::test]
async fn test_plan_files_no_deletions() {
- let fixture = TableTestFixture::new();
-
- let current_snapshot =
fixture.table.metadata().current_snapshot().unwrap();
- let parent_snapshot = current_snapshot
- .parent_snapshot(fixture.table.metadata())
- .unwrap();
- let current_schema =
current_snapshot.schema(fixture.table.metadata()).unwrap();
- let current_partition_spec =
fixture.table.metadata().default_partition_spec().unwrap();
-
- // Write data files
- let data_file_manifest = ManifestWriter::new(
- fixture.next_manifest_file(),
- current_snapshot.snapshot_id(),
- vec![],
- )
- .write(Manifest::new(
- ManifestMetadata::builder()
- .schema((*current_schema).clone())
- .content(ManifestContentType::Data)
- .format_version(FormatVersion::V2)
- .partition_spec((**current_partition_spec).clone())
- .schema_id(current_schema.schema_id())
- .build(),
- vec![
- ManifestEntry::builder()
- .status(ManifestStatus::Added)
- .data_file(
- DataFile::builder()
- .content(DataContentType::Data)
- .file_path(format!("{}/1.parquet",
&fixture.table_location))
- .file_format(DataFileFormat::Parquet)
- .file_size_in_bytes(100)
- .record_count(1)
-
.partition(Struct::from_iter([Some(Literal::long(100))]))
- .build(),
- )
- .build(),
- ManifestEntry::builder()
- .status(ManifestStatus::Deleted)
- .snapshot_id(parent_snapshot.snapshot_id())
- .sequence_number(parent_snapshot.sequence_number())
- .file_sequence_number(parent_snapshot.sequence_number())
- .data_file(
- DataFile::builder()
- .content(DataContentType::Data)
- .file_path(format!("{}/2.parquet",
&fixture.table_location))
- .file_format(DataFileFormat::Parquet)
- .file_size_in_bytes(100)
- .record_count(1)
-
.partition(Struct::from_iter([Some(Literal::long(200))]))
- .build(),
- )
- .build(),
- ManifestEntry::builder()
- .status(ManifestStatus::Existing)
- .snapshot_id(parent_snapshot.snapshot_id())
- .sequence_number(parent_snapshot.sequence_number())
- .file_sequence_number(parent_snapshot.sequence_number())
- .data_file(
- DataFile::builder()
- .content(DataContentType::Data)
- .file_path(format!("{}/3.parquet",
&fixture.table_location))
- .file_format(DataFileFormat::Parquet)
- .file_size_in_bytes(100)
- .record_count(1)
-
.partition(Struct::from_iter([Some(Literal::long(300))]))
- .build(),
- )
- .build(),
- ],
- ))
- .await
- .unwrap();
-
- // Write to manifest list
- let mut manifest_list_write = ManifestListWriter::v2(
- fixture
- .table
- .file_io()
- .new_output(current_snapshot.manifest_list())
- .unwrap(),
- current_snapshot.snapshot_id(),
- current_snapshot
- .parent_snapshot_id()
- .unwrap_or(EMPTY_SNAPSHOT_ID),
- current_snapshot.sequence_number(),
- );
- manifest_list_write
- .add_manifest_entries(vec![data_file_manifest].into_iter())
- .unwrap();
- manifest_list_write.close().await.unwrap();
+ let mut fixture = TableTestFixture::new();
+ fixture.setup_manifest_files().await;
// Create table scan for current snapshot and plan files
let table_scan = fixture.table.scan().build().unwrap();
@@ -445,4 +508,22 @@ mod tests {
format!("{}/3.parquet", &fixture.table_location)
);
}
+
+    // End-to-end read through `TableScan::to_arrow`: only live data files must be read.
+    #[tokio::test]
+    async fn test_open_parquet_no_deletions() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Create table scan for current snapshot and plan files
+        let table_scan = fixture.table.scan().build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+        assert!(!batches.is_empty(), "expected at least one record batch");
+
+        // Only 1.parquet (Added) and 3.parquet (Existing) are live; 2.parquet is
+        // marked Deleted and must not be read. Each file holds 1024 rows.
+        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
+        assert_eq!(total_rows, 2 * 1024);
+
+        // Every value written by the fixture is 1, in every batch.
+        for batch in &batches {
+            let col = batch.column_by_name("col").unwrap();
+            let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+            assert!(int64_arr.values().iter().all(|&v| v == 1));
+        }
+    }
}
diff --git a/crates/iceberg/src/spec/manifest.rs
b/crates/iceberg/src/spec/manifest.rs
index 7af8a1b..5a82007 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -851,11 +851,19 @@ impl ManifestEntry {
}
/// Content type of this manifest entry.
+ // Trivial field accessor; `#[inline]` lets callers in other crates inline it.
+ #[inline]
pub fn content_type(&self) -> DataContentType {
self.data_file.content
}
+ /// File format (e.g. Parquet) of the data file referenced by this manifest entry.
+ #[inline]
+ pub fn file_format(&self) -> DataFileFormat {
+ self.data_file.file_format
+ }
+
/// Data file path of this manifest entry.
+ // Borrows from the entry's `data_file`; callers clone it if it must outlive the entry.
+ #[inline]
pub fn file_path(&self) -> &str {
&self.data_file.file_path
}