This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e4308d  Implement basic Parquet data file reading capability (#207)
8e4308d is described below

commit 8e4308d53fdc00195429261f8566e0bf19ef6849
Author: Scott Donnelly <[email protected]>
AuthorDate: Thu Mar 7 09:40:13 2024 +0000

    Implement basic Parquet data file reading capability (#207)
    
    * feat: TableScan parquet file read to RecordBatch stream
    
    * chore: add inline hinting and fix incorrect comment
    
    * refactor: extract record batch reader
    
    * refactor: rename `FileRecordBatchReader` to `ArrowReader`
    
    * refactor: rename file_record_batch_reader.rs to arrow.rs
    * refactor: move `batch_size` param to `TableScanBuilder`
    * refactor: rename `TableScan.execute` to `to_arrow`
    
    * refactor: use builder pattern to create `ArrowReader`
---
 Cargo.toml                          |   3 +
 crates/iceberg/Cargo.toml           |   4 +
 crates/iceberg/src/arrow.rs         | 108 ++++++++++++++
 crates/iceberg/src/error.rs         |   6 +
 crates/iceberg/src/io.rs            |   8 +-
 crates/iceberg/src/lib.rs           |   1 +
 crates/iceberg/src/scan.rs          | 271 +++++++++++++++++++++++-------------
 crates/iceberg/src/spec/manifest.rs |   8 ++
 8 files changed, 312 insertions(+), 97 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index dccc6bd..3234bd0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,9 +34,11 @@ apache-avro = "0.16"
 arrow-arith = { version = ">=46" }
 arrow-array = { version = ">=46" }
 arrow-schema = { version = ">=46" }
+async-stream = "0.3.5"
 async-trait = "0.1"
 bimap = "0.6"
 bitvec = "1.0.1"
+bytes = "1.5"
 chrono = "0.4"
 derive_builder = "0.20.0"
 either = "1"
@@ -52,6 +54,7 @@ murmur3 = "0.5.2"
 once_cell = "1"
 opendal = "0.45"
 ordered-float = "4.0.0"
+parquet = "50"
 pretty_assertions = "1.4.0"
 port_scanner = "0.1.5"
 reqwest = { version = "^0.11", features = ["json"] }
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 32288ee..181832c 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -34,9 +34,11 @@ apache-avro = { workspace = true }
 arrow-arith = { workspace = true }
 arrow-array = { workspace = true }
 arrow-schema = { workspace = true }
+async-stream = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }
 bitvec = { workspace = true }
+bytes = { workspace = true }
 chrono = { workspace = true }
 derive_builder = { workspace = true }
 either = { workspace = true }
@@ -48,6 +50,7 @@ murmur3 = { workspace = true }
 once_cell = { workspace = true }
 opendal = { workspace = true }
 ordered-float = { workspace = true }
+parquet = { workspace = true, features = ["async"] }
 reqwest = { workspace = true }
 rust_decimal = { workspace = true }
 serde = { workspace = true }
@@ -56,6 +59,7 @@ serde_derive = { workspace = true }
 serde_json = { workspace = true }
 serde_repr = { workspace = true }
 serde_with = { workspace = true }
+tokio = { workspace = true }
 typed-builder = { workspace = true }
 url = { workspace = true }
 urlencoding = { workspace = true }
diff --git a/crates/iceberg/src/arrow.rs b/crates/iceberg/src/arrow.rs
new file mode 100644
index 0000000..6de9b6a
--- /dev/null
+++ b/crates/iceberg/src/arrow.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Parquet file data reader
+
+use async_stream::try_stream;
+use futures::stream::StreamExt;
+use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+
+use crate::io::FileIO;
+use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
+use crate::spec::SchemaRef;
+
+/// Builder to create ArrowReader
+pub struct ArrowReaderBuilder {
+    batch_size: Option<usize>,
+    file_io: FileIO,
+    schema: SchemaRef,
+}
+
+impl ArrowReaderBuilder {
+    /// Create a new ArrowReaderBuilder
+    pub fn new(file_io: FileIO, schema: SchemaRef) -> Self {
+        ArrowReaderBuilder {
+            batch_size: None,
+            file_io,
+            schema,
+        }
+    }
+
+    /// Sets the desired size of batches in the response
+    /// to something other than the default
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = Some(batch_size);
+        self
+    }
+
+    /// Build the ArrowReader.
+    pub fn build(self) -> ArrowReader {
+        ArrowReader {
+            batch_size: self.batch_size,
+            schema: self.schema,
+            file_io: self.file_io,
+        }
+    }
+}
+
+/// Reads data from Parquet files
+pub struct ArrowReader {
+    batch_size: Option<usize>,
+    #[allow(dead_code)]
+    schema: SchemaRef,
+    file_io: FileIO,
+}
+
+impl ArrowReader {
+    /// Reads every file named by `tasks`, yielding its Arrow `RecordBatch`es in task order.
+    /// A failed task, file open or Parquet decode is yielded as an `Err` item, ending the stream.
+    pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+        let file_io = self.file_io.clone();
+
+        Ok(try_stream! {
+            while let Some(task) = tasks.next().await {
+                let task = task?; // propagate a failed task instead of silently ending the stream
+                let projection_mask = self.get_arrow_projection_mask(&task);
+
+                let parquet_reader = file_io
+                    .new_input(task.data_file().file_path())?
+                    .reader()
+                    .await?;
+
+                let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(parquet_reader)
+                    .await?
+                    .with_projection(projection_mask);
+
+                if let Some(batch_size) = self.batch_size {
+                    batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
+                }
+
+                let mut batch_stream = batch_stream_builder.build()?;
+
+                while let Some(batch) = batch_stream.next().await {
+                    yield batch?;
+                }
+            }
+        }
+        .boxed())
+    }
+
+    fn get_arrow_projection_mask(&self, _task: &FileScanTask) -> ProjectionMask {
+        // TODO: full implementation
+        ProjectionMask::all()
+    }
+}
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index c851402..f920fa9 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -325,6 +325,12 @@ define_from_err!(
     "Failed to convert decimal literal to rust decimal"
 );
 
+define_from_err!(
+    parquet::errors::ParquetError,
+    ErrorKind::Unexpected,
+    "Failed to read a Parquet file"
+);
+
 define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed");
 
 /// Helper macro to check arguments.
diff --git a/crates/iceberg/src/io.rs b/crates/iceberg/src/io.rs
index 3a7c85f..410d870 100644
--- a/crates/iceberg/src/io.rs
+++ b/crates/iceberg/src/io.rs
@@ -54,6 +54,7 @@ use crate::{error::Result, Error, ErrorKind};
 use futures::{AsyncRead, AsyncSeek, AsyncWrite};
 use once_cell::sync::Lazy;
 use opendal::{Operator, Scheme};
+use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek};
 use url::Url;
 
 /// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3).
@@ -215,9 +216,12 @@ pub struct InputFile {
 }
 
 /// Trait for reading file.
-pub trait FileRead: AsyncRead + AsyncSeek {}
+pub trait FileRead: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead + TokioAsyncSeek {}
 
-impl<T> FileRead for T where T: AsyncRead + AsyncSeek {}
+impl<T> FileRead for T where
+    T: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead + TokioAsyncSeek
+{
+}
 
 impl InputFile {
     /// Absolute path to root uri.
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 9ceadca..17a94d4 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -52,4 +52,5 @@ pub mod expr;
 pub mod transaction;
 pub mod transform;
 
+pub mod arrow;
 pub mod writer;
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 0a3b9a9..cca26b6 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -17,6 +17,7 @@
 
 //! Table scan api.
 
+use crate::arrow::ArrowReaderBuilder;
 use crate::io::FileIO;
 use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadataRef};
 use crate::table::Table;
@@ -31,6 +32,7 @@ pub struct TableScanBuilder<'a> {
     // Empty column names means to select all columns
     column_names: Vec<String>,
     snapshot_id: Option<i64>,
+    batch_size: Option<usize>,
 }
 
 impl<'a> TableScanBuilder<'a> {
@@ -39,9 +41,17 @@ impl<'a> TableScanBuilder<'a> {
             table,
             column_names: vec![],
             snapshot_id: None,
+            batch_size: None,
         }
     }
 
+    /// Sets the desired size of batches in the response
+    /// to something other than the default
+    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Select all columns.
     pub fn select_all(mut self) -> Self {
         self.column_names.clear();
@@ -110,6 +120,7 @@ impl<'a> TableScanBuilder<'a> {
             table_metadata: self.table.metadata_ref(),
             column_names: self.column_names,
             schema,
+            batch_size: self.batch_size,
         })
     }
 }
@@ -123,6 +134,7 @@ pub struct TableScan {
     file_io: FileIO,
     column_names: Vec<String>,
     schema: SchemaRef,
+    batch_size: Option<usize>,
 }
 
 /// A stream of [`FileScanTask`].
@@ -163,14 +175,26 @@ impl TableScan {
 
         Ok(iter(file_scan_tasks).boxed())
     }
+
+    pub async fn to_arrow(&self) -> crate::Result<ArrowRecordBatchStream> {
+        let mut arrow_reader_builder =
+            ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());
+
+        if let Some(batch_size) = self.batch_size {
+            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
+        }
+
+        arrow_reader_builder.build().read(self.plan_files().await?)
+    }
 }
 
 /// A task to scan part of file.
 #[derive(Debug)]
-#[allow(dead_code)]
 pub struct FileScanTask {
     data_file: ManifestEntryRef,
+    #[allow(dead_code)]
     start: u64,
+    #[allow(dead_code)]
     length: u64,
 }
 
@@ -178,9 +202,8 @@ pub struct FileScanTask {
 pub type ArrowRecordBatchStream = BoxStream<'static, crate::Result<RecordBatch>>;
 
 impl FileScanTask {
-    /// Returns a stream of arrow record batches.
-    pub async fn execute(&self) -> crate::Result<ArrowRecordBatchStream> {
-        todo!()
+    pub(crate) fn data_file(&self) -> ManifestEntryRef {
+        self.data_file.clone()
     }
 }
 
@@ -194,8 +217,15 @@ mod tests {
     };
     use crate::table::Table;
     use crate::TableIdent;
+    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
     use futures::TryStreamExt;
+    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
+    use parquet::basic::Compression;
+    use parquet::file::properties::WriterProperties;
+    use std::collections::HashMap;
     use std::fs;
+    use std::fs::File;
+    use std::sync::Arc;
     use tempfile::TempDir;
     use tera::{Context, Tera};
     use uuid::Uuid;
@@ -257,6 +287,128 @@ mod tests {
                 ))
                 .unwrap()
         }
+
+        async fn setup_manifest_files(&mut self) {
+            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
+            let parent_snapshot = current_snapshot
+                .parent_snapshot(self.table.metadata())
+                .unwrap();
+            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
+            let current_partition_spec = self.table.metadata().default_partition_spec().unwrap();
+
+            // Write data files
+            let data_file_manifest = ManifestWriter::new(
+                self.next_manifest_file(),
+                current_snapshot.snapshot_id(),
+                vec![],
+            )
+            .write(Manifest::new(
+                ManifestMetadata::builder()
+                    .schema((*current_schema).clone())
+                    .content(ManifestContentType::Data)
+                    .format_version(FormatVersion::V2)
+                    .partition_spec((**current_partition_spec).clone())
+                    .schema_id(current_schema.schema_id())
+                    .build(),
+                vec![
+                    ManifestEntry::builder()
+                        .status(ManifestStatus::Added)
+                        .data_file(
+                            DataFile::builder()
+                                .content(DataContentType::Data)
+                                .file_path(format!("{}/1.parquet", &self.table_location))
+                                .file_format(DataFileFormat::Parquet)
+                                .file_size_in_bytes(100)
+                                .record_count(1)
+                                .partition(Struct::from_iter([Some(Literal::long(100))]))
+                                .build(),
+                        )
+                        .build(),
+                    ManifestEntry::builder()
+                        .status(ManifestStatus::Deleted)
+                        .snapshot_id(parent_snapshot.snapshot_id())
+                        .sequence_number(parent_snapshot.sequence_number())
+                        .file_sequence_number(parent_snapshot.sequence_number())
+                        .data_file(
+                            DataFile::builder()
+                                .content(DataContentType::Data)
+                                .file_path(format!("{}/2.parquet", &self.table_location))
+                                .file_format(DataFileFormat::Parquet)
+                                .file_size_in_bytes(100)
+                                .record_count(1)
+                                .partition(Struct::from_iter([Some(Literal::long(200))]))
+                                .build(),
+                        )
+                        .build(),
+                    ManifestEntry::builder()
+                        .status(ManifestStatus::Existing)
+                        .snapshot_id(parent_snapshot.snapshot_id())
+                        .sequence_number(parent_snapshot.sequence_number())
+                        .file_sequence_number(parent_snapshot.sequence_number())
+                        .data_file(
+                            DataFile::builder()
+                                .content(DataContentType::Data)
+                                .file_path(format!("{}/3.parquet", &self.table_location))
+                                .file_format(DataFileFormat::Parquet)
+                                .file_size_in_bytes(100)
+                                .record_count(1)
+                                .partition(Struct::from_iter([Some(Literal::long(300))]))
+                                .build(),
+                        )
+                        .build(),
+                ],
+            ))
+            .await
+            .unwrap();
+
+            // Write to manifest list
+            let mut manifest_list_write = ManifestListWriter::v2(
+                self.table
+                    .file_io()
+                    .new_output(current_snapshot.manifest_list())
+                    .unwrap(),
+                current_snapshot.snapshot_id(),
+                current_snapshot
+                    .parent_snapshot_id()
+                    .unwrap_or(EMPTY_SNAPSHOT_ID),
+                current_snapshot.sequence_number(),
+            );
+            manifest_list_write
+                .add_manifest_entries(vec![data_file_manifest].into_iter())
+                .unwrap();
+            manifest_list_write.close().await.unwrap();
+
+            // prepare data
+            let schema = {
+                let fields =
+                    vec![
+                        arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true)
+                            .with_metadata(HashMap::from([(
+                                PARQUET_FIELD_ID_META_KEY.to_string(),
+                                "0".to_string(),
+                            )])),
+                    ];
+                Arc::new(arrow_schema::Schema::new(fields))
+            };
+            let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
+            let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
+
+            // Write the Parquet files
+            let props = WriterProperties::builder()
+                .set_compression(Compression::SNAPPY)
+                .build();
+
+            for n in 1..=3 {
+                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
+                let mut writer =
+                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
+
+                writer.write(&to_write).expect("Writing batch");
+
+                // writer must be closed to write footer
+                writer.close().unwrap();
+            }
+        }
     }
 
     #[test]
@@ -324,97 +476,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_plan_files_no_deletions() {
-        let fixture = TableTestFixture::new();
-
-        let current_snapshot = fixture.table.metadata().current_snapshot().unwrap();
-        let parent_snapshot = current_snapshot
-            .parent_snapshot(fixture.table.metadata())
-            .unwrap();
-        let current_schema = current_snapshot.schema(fixture.table.metadata()).unwrap();
-        let current_partition_spec = fixture.table.metadata().default_partition_spec().unwrap();
-
-        // Write data files
-        let data_file_manifest = ManifestWriter::new(
-            fixture.next_manifest_file(),
-            current_snapshot.snapshot_id(),
-            vec![],
-        )
-        .write(Manifest::new(
-            ManifestMetadata::builder()
-                .schema((*current_schema).clone())
-                .content(ManifestContentType::Data)
-                .format_version(FormatVersion::V2)
-                .partition_spec((**current_partition_spec).clone())
-                .schema_id(current_schema.schema_id())
-                .build(),
-            vec![
-                ManifestEntry::builder()
-                    .status(ManifestStatus::Added)
-                    .data_file(
-                        DataFile::builder()
-                            .content(DataContentType::Data)
-                            .file_path(format!("{}/1.parquet", &fixture.table_location))
-                            .file_format(DataFileFormat::Parquet)
-                            .file_size_in_bytes(100)
-                            .record_count(1)
-                            .partition(Struct::from_iter([Some(Literal::long(100))]))
-                            .build(),
-                    )
-                    .build(),
-                ManifestEntry::builder()
-                    .status(ManifestStatus::Deleted)
-                    .snapshot_id(parent_snapshot.snapshot_id())
-                    .sequence_number(parent_snapshot.sequence_number())
-                    .file_sequence_number(parent_snapshot.sequence_number())
-                    .data_file(
-                        DataFile::builder()
-                            .content(DataContentType::Data)
-                            .file_path(format!("{}/2.parquet", 
&fixture.table_location))
-                            .file_format(DataFileFormat::Parquet)
-                            .file_size_in_bytes(100)
-                            .record_count(1)
-                            .partition(Struct::from_iter([Some(Literal::long(200))]))
-                            .build(),
-                    )
-                    .build(),
-                ManifestEntry::builder()
-                    .status(ManifestStatus::Existing)
-                    .snapshot_id(parent_snapshot.snapshot_id())
-                    .sequence_number(parent_snapshot.sequence_number())
-                    .file_sequence_number(parent_snapshot.sequence_number())
-                    .data_file(
-                        DataFile::builder()
-                            .content(DataContentType::Data)
-                            .file_path(format!("{}/3.parquet", &fixture.table_location))
-                            .file_format(DataFileFormat::Parquet)
-                            .file_size_in_bytes(100)
-                            .record_count(1)
-                            .partition(Struct::from_iter([Some(Literal::long(300))]))
-                            .build(),
-                    )
-                    .build(),
-            ],
-        ))
-        .await
-        .unwrap();
-
-        // Write to manifest list
-        let mut manifest_list_write = ManifestListWriter::v2(
-            fixture
-                .table
-                .file_io()
-                .new_output(current_snapshot.manifest_list())
-                .unwrap(),
-            current_snapshot.snapshot_id(),
-            current_snapshot
-                .parent_snapshot_id()
-                .unwrap_or(EMPTY_SNAPSHOT_ID),
-            current_snapshot.sequence_number(),
-        );
-        manifest_list_write
-            .add_manifest_entries(vec![data_file_manifest].into_iter())
-            .unwrap();
-        manifest_list_write.close().await.unwrap();
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
 
         // Create table scan for current snapshot and plan files
         let table_scan = fixture.table.scan().build().unwrap();
@@ -445,4 +508,22 @@ mod tests {
             format!("{}/3.parquet", &fixture.table_location)
         );
     }
+
+    #[tokio::test]
+    async fn test_open_parquet_no_deletions() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Create table scan for current snapshot and plan files
+        let table_scan = fixture.table.scan().build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        let col = batches[0].column_by_name("col").unwrap();
+
+        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 1);
+    }
 }
diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index 7af8a1b..5a82007 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -851,11 +851,19 @@ impl ManifestEntry {
     }
 
     /// Content type of this manifest entry.
+    #[inline]
     pub fn content_type(&self) -> DataContentType {
         self.data_file.content
     }
 
+    /// File format of this manifest entry.
+    #[inline]
+    pub fn file_format(&self) -> DataFileFormat {
+        self.data_file.file_format
+    }
+
     /// Data file path of this manifest entry.
+    #[inline]
     pub fn file_path(&self) -> &str {
         &self.data_file.file_path
     }

Reply via email to