This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 865f774 refine: separate parquet reader and arrow convert (#313)
865f774 is described below
commit 865f774d5dbd4fa16bce1f58f81f87ef3ffe5f2b
Author: ZENOTME <[email protected]>
AuthorDate: Wed Apr 3 22:52:12 2024 +0900
refine: separate parquet reader and arrow convert (#313)
---
crates/iceberg/src/arrow/mod.rs | 23 +++
crates/iceberg/src/arrow/reader.rs | 189 +++++++++++++++++++++++
crates/iceberg/src/{arrow.rs => arrow/schema.rs} | 171 +-------------------
3 files changed, 213 insertions(+), 170 deletions(-)
diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs
new file mode 100644
index 0000000..2076a95
--- /dev/null
+++ b/crates/iceberg/src/arrow/mod.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Arrow integration: Iceberg/Arrow schema conversion and a Parquet-to-Arrow file reader.
+
+mod schema;
+pub use schema::*;
+mod reader;
+pub use reader::*;
diff --git a/crates/iceberg/src/arrow/reader.rs
b/crates/iceberg/src/arrow/reader.rs
new file mode 100644
index 0000000..e3f30f8
--- /dev/null
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Parquet file data reader
+
+use arrow_schema::SchemaRef as ArrowSchemaRef;
+use async_stream::try_stream;
+use futures::stream::StreamExt;
+use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask,
PARQUET_FIELD_ID_META_KEY};
+use parquet::schema::types::SchemaDescriptor;
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use crate::arrow::arrow_schema_to_schema;
+use crate::io::FileIO;
+use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
+use crate::spec::SchemaRef;
+use crate::{Error, ErrorKind};
+
+/// Builder that configures and produces an [`ArrowReader`].
+pub struct ArrowReaderBuilder {
+    batch_size: Option<usize>,
+    field_ids: Vec<usize>,
+    file_io: FileIO,
+    schema: SchemaRef,
+}
+
+impl ArrowReaderBuilder {
+    /// Starts a builder with no batch-size override and no column projection.
+    pub fn new(file_io: FileIO, schema: SchemaRef) -> Self {
+        Self {
+            file_io,
+            schema,
+            batch_size: None,
+            field_ids: Vec::new(),
+        }
+    }
+
+    /// Overrides the number of rows per emitted record batch;
+    /// without this call the Parquet reader's default batch size applies.
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = Some(batch_size);
+        self
+    }
+
+    /// Restricts reading to the given Iceberg field ids; an empty list reads every column.
+    pub fn with_field_ids(mut self, field_ids: impl IntoIterator<Item = usize>) -> Self {
+        self.field_ids = field_ids.into_iter().collect::<Vec<_>>();
+        self
+    }
+
+    /// Consumes the builder and returns the configured [`ArrowReader`].
+    pub fn build(self) -> ArrowReader {
+        ArrowReader {
+            file_io: self.file_io,
+            schema: self.schema,
+            field_ids: self.field_ids,
+            batch_size: self.batch_size,
+        }
+    }
+}
+
+/// Reads data from Parquet files
+pub struct ArrowReader {
+    batch_size: Option<usize>,
+    field_ids: Vec<usize>,
+    /// Table schema that projected Parquet columns are type-checked against.
+    schema: SchemaRef,
+    file_io: FileIO,
+}
+
+impl ArrowReader {
+    /// Takes a stream of FileScanTasks and reads all the files into Arrow RecordBatches;
+    /// a task-stream or file-read error is yielded as an `Err` item and ends the stream.
+    pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+        let file_io = self.file_io.clone();
+
+        Ok(try_stream! {
+            // `transpose()?` surfaces a failed task as an error rather than quietly stopping the scan.
+            while let Some(task) = tasks.next().await.transpose()? {
+                let parquet_reader = file_io
+                    .new_input(task.data().data_file().file_path())?
+                    .reader()
+                    .await?;
+
+                let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(parquet_reader)
+                    .await?;
+
+                let parquet_schema = batch_stream_builder.parquet_schema();
+                let arrow_schema = batch_stream_builder.schema();
+                let projection_mask = self.get_arrow_projection_mask(parquet_schema, arrow_schema)?;
+                batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
+
+                if let Some(batch_size) = self.batch_size {
+                    batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
+                }
+
+                let mut batch_stream = batch_stream_builder.build()?;
+
+                while let Some(batch) = batch_stream.next().await {
+                    yield batch?;
+                }
+            }
+        }
+        .boxed())
+    }
+
+    /// Maps `self.field_ids` to a Parquet leaf projection; an empty list selects all columns.
+    /// Each requested id must match a leaf's `PARQUET_FIELD_ID_META_KEY` metadata and have
+    /// the same type in `self.schema` as in the file's schema, otherwise an error is returned.
+    fn get_arrow_projection_mask(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        arrow_schema: &ArrowSchemaRef,
+    ) -> crate::Result<ProjectionMask> {
+        if self.field_ids.is_empty() {
+            Ok(ProjectionMask::all())
+        } else {
+            // Build the map between field id and column index in Parquet schema.
+            let mut column_map = HashMap::new();
+
+            let fields = arrow_schema.fields();
+            let iceberg_schema = arrow_schema_to_schema(arrow_schema)?;
+            fields.filter_leaves(|idx, field| {
+                // Leaves without a parseable field-id annotation cannot be projected.
+                let field_id = match field
+                    .metadata()
+                    .get(PARQUET_FIELD_ID_META_KEY)
+                    .and_then(|id| i32::from_str(id).ok())
+                {
+                    Some(field_id) => field_id,
+                    None => return false,
+                };
+
+                if !self.field_ids.contains(&(field_id as usize)) {
+                    return false;
+                }
+
+                // Keep the leaf only if the table schema and the file schema agree on its
+                // type; skipped leaves are reported by the length check below.
+                match (self.schema.field_by_id(field_id), iceberg_schema.field_by_id(field_id)) {
+                    (Some(iceberg_field), Some(parquet_field))
+                        if iceberg_field.field_type == parquet_field.field_type => {}
+                    _ => return false,
+                }
+
+                column_map.insert(field_id, idx);
+                true
+            });
+
+            if column_map.len() != self.field_ids.len() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Parquet schema {} and Iceberg schema {} do not match.",
+                        iceberg_schema, self.schema
+                    ),
+                ));
+            }
+
+            let mut indices = vec![];
+            for field_id in &self.field_ids {
+                if let Some(col_idx) = column_map.get(&(*field_id as i32)) {
+                    indices.push(*col_idx);
+                } else {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Field {} is not found in Parquet schema.", field_id),
+                    ));
+                }
+            }
+            Ok(ProjectionMask::leaves(parquet_schema, indices))
+        }
+    }
+}
diff --git a/crates/iceberg/src/arrow.rs b/crates/iceberg/src/arrow/schema.rs
similarity index 82%
rename from crates/iceberg/src/arrow.rs
rename to crates/iceberg/src/arrow/schema.rs
index 4b23df8..7e01b20 100644
--- a/crates/iceberg/src/arrow.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -15,19 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Parquet file data reader
-
-use arrow_schema::SchemaRef as ArrowSchemaRef;
-use async_stream::try_stream;
-use futures::stream::StreamExt;
-use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask,
PARQUET_FIELD_ID_META_KEY};
-use parquet::schema::types::SchemaDescriptor;
-use std::collections::HashMap;
-use std::str::FromStr;
-
-use crate::io::FileIO;
-use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
-use crate::spec::SchemaRef;
+//! Conversion between Arrow schema and Iceberg schema.
use crate::error::Result;
use crate::spec::{
@@ -37,163 +25,6 @@ use crate::{Error, ErrorKind};
use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
use std::sync::Arc;
-/// Builder to create ArrowReader
-pub struct ArrowReaderBuilder {
- batch_size: Option<usize>,
- field_ids: Vec<usize>,
- file_io: FileIO,
- schema: SchemaRef,
-}
-
-impl ArrowReaderBuilder {
- /// Create a new ArrowReaderBuilder
- pub fn new(file_io: FileIO, schema: SchemaRef) -> Self {
- ArrowReaderBuilder {
- batch_size: None,
- field_ids: vec![],
- file_io,
- schema,
- }
- }
-
- /// Sets the desired size of batches in the response
- /// to something other than the default
- pub fn with_batch_size(mut self, batch_size: usize) -> Self {
- self.batch_size = Some(batch_size);
- self
- }
-
- /// Sets the desired column projection with a list of field ids.
- pub fn with_field_ids(mut self, field_ids: impl IntoIterator<Item =
usize>) -> Self {
- self.field_ids = field_ids.into_iter().collect();
- self
- }
-
- /// Build the ArrowReader.
- pub fn build(self) -> ArrowReader {
- ArrowReader {
- batch_size: self.batch_size,
- field_ids: self.field_ids,
- schema: self.schema,
- file_io: self.file_io,
- }
- }
-}
-
-/// Reads data from Parquet files
-pub struct ArrowReader {
- batch_size: Option<usize>,
- field_ids: Vec<usize>,
- #[allow(dead_code)]
- schema: SchemaRef,
- file_io: FileIO,
-}
-
-impl ArrowReader {
- /// Take a stream of FileScanTasks and reads all the files.
- /// Returns a stream of Arrow RecordBatches containing the data from the
files
- pub fn read(self, mut tasks: FileScanTaskStream) ->
crate::Result<ArrowRecordBatchStream> {
- let file_io = self.file_io.clone();
-
- Ok(try_stream! {
- while let Some(Ok(task)) = tasks.next().await {
- let parquet_reader = file_io
- .new_input(task.data().data_file().file_path())?
- .reader()
- .await?;
-
- let mut batch_stream_builder =
ParquetRecordBatchStreamBuilder::new(parquet_reader)
- .await?;
-
- let parquet_schema = batch_stream_builder.parquet_schema();
- let arrow_schema = batch_stream_builder.schema();
- let projection_mask =
self.get_arrow_projection_mask(parquet_schema, arrow_schema)?;
- batch_stream_builder =
batch_stream_builder.with_projection(projection_mask);
-
- if let Some(batch_size) = self.batch_size {
- batch_stream_builder =
batch_stream_builder.with_batch_size(batch_size);
- }
-
- let mut batch_stream = batch_stream_builder.build()?;
-
- while let Some(batch) = batch_stream.next().await {
- yield batch?;
- }
- }
- }
- .boxed())
- }
-
- fn get_arrow_projection_mask(
- &self,
- parquet_schema: &SchemaDescriptor,
- arrow_schema: &ArrowSchemaRef,
- ) -> crate::Result<ProjectionMask> {
- if self.field_ids.is_empty() {
- Ok(ProjectionMask::all())
- } else {
- // Build the map between field id and column index in Parquet
schema.
- let mut column_map = HashMap::new();
-
- let fields = arrow_schema.fields();
- let iceberg_schema = arrow_schema_to_schema(arrow_schema)?;
- fields.filter_leaves(|idx, field| {
- let field_id = field.metadata().get(PARQUET_FIELD_ID_META_KEY);
- if field_id.is_none() {
- return false;
- }
-
- let field_id = i32::from_str(field_id.unwrap());
- if field_id.is_err() {
- return false;
- }
- let field_id = field_id.unwrap();
-
- if !self.field_ids.contains(&(field_id as usize)) {
- return false;
- }
-
- let iceberg_field = self.schema.field_by_id(field_id);
- let parquet_iceberg_field =
iceberg_schema.field_by_id(field_id);
-
- if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
- return false;
- }
-
- if iceberg_field.unwrap().field_type !=
parquet_iceberg_field.unwrap().field_type {
- return false;
- }
-
- column_map.insert(field_id, idx);
- true
- });
-
- if column_map.len() != self.field_ids.len() {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- format!(
- "Parquet schema {} and Iceberg schema {} do not
match.",
- iceberg_schema, self.schema
- ),
- ));
- }
-
- let mut indices = vec![];
- for field_id in &self.field_ids {
- if let Some(col_idx) = column_map.get(&(*field_id as i32)) {
- indices.push(*col_idx);
- } else {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- format!("Field {} is not found in Parquet schema.",
field_id),
- ));
- }
- }
- Ok(ProjectionMask::leaves(parquet_schema, indices))
- }
- }
-}
-
/// A post order arrow schema visitor.
///
/// For order of methods called, please refer to [`visit_schema`].