luoyuxia commented on code in PR #225: URL: https://github.com/apache/paimon-rust/pull/225#discussion_r3048788740
########## crates/paimon/src/arrow/format/avro.rs: ########## @@ -0,0 +1,998 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{FilePredicates, FormatFileReader}; +use crate::arrow::build_target_arrow_schema; +use crate::io::FileRead; +use crate::spec::{DataField, DataType, MapType, RowType}; +use crate::table::{ArrowRecordBatchStream, RowRange}; +use crate::Error; +use arrow_array::{ + BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, + Int16Array, Int32Array, Int64Array, Int8Array, ListArray, MapArray, RecordBatch, StringArray, + StructArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, +}; +use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer}; +use arrow_schema::SchemaRef; +use async_stream::try_stream; +use async_trait::async_trait; +use futures::StreamExt; +use std::collections::HashMap; +use std::sync::Arc; + +// --------------------------------------------------------------------------- +// AvroValue: a serde_json::Value replacement that handles Avro bytes +// --------------------------------------------------------------------------- + +/// Lightweight value type that can represent all Avro primitives 
including bytes. +/// `serde_json::Value` rejects byte arrays, so we need our own. +#[derive(Debug, Clone, PartialEq)] +enum AvroValue { + Null, + Bool(bool), + Int(i64), + Float(f64), + String(String), + Bytes(Vec<u8>), + /// Avro array / sequence. + Array(Vec<AvroValue>), + /// Union wrapper or record: `{"type": value}` produced by serde_avro_fast. + Object(HashMap<String, AvroValue>), +} + +impl AvroValue { + fn as_bool(&self) -> Option<bool> { + match self { + AvroValue::Bool(b) => Some(*b), + _ => None, + } + } + + fn as_i64(&self) -> Option<i64> { + match self { + AvroValue::Int(n) => Some(*n), + _ => None, + } + } + + fn as_f64(&self) -> Option<f64> { + match self { + AvroValue::Float(f) => Some(*f), + AvroValue::Int(n) => Some(*n as f64), + _ => None, + } + } + + fn as_str(&self) -> Option<&str> { + match self { + AvroValue::String(s) => Some(s), + _ => None, + } + } + + fn as_bytes(&self) -> Option<&[u8]> { + match self { + AvroValue::Bytes(b) => Some(b), + AvroValue::String(s) => Some(s.as_bytes()), + _ => None, + } + } +} + +impl<'de> serde::Deserialize<'de> for AvroValue { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + struct AvroValueVisitor; + + impl<'de> serde::de::Visitor<'de> for AvroValueVisitor { + type Value = AvroValue; + + fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.write_str("any Avro value") + } + + fn visit_bool<E>(self, v: bool) -> Result<AvroValue, E> { + Ok(AvroValue::Bool(v)) + } + + fn visit_i8<E>(self, v: i8) -> Result<AvroValue, E> { + Ok(AvroValue::Int(v as i64)) + } + + fn visit_i16<E>(self, v: i16) -> Result<AvroValue, E> { + Ok(AvroValue::Int(v as i64)) + } + + fn visit_i32<E>(self, v: i32) -> Result<AvroValue, E> { + Ok(AvroValue::Int(v as i64)) + } + + fn visit_i64<E>(self, v: i64) -> Result<AvroValue, E> { + Ok(AvroValue::Int(v)) + } + + fn visit_u64<E>(self, v: u64) -> Result<AvroValue, E> { + Ok(AvroValue::Int(v as i64)) + } + + fn 
visit_f32<E>(self, v: f32) -> Result<AvroValue, E> { + Ok(AvroValue::Float(v as f64)) + } + + fn visit_f64<E>(self, v: f64) -> Result<AvroValue, E> { + Ok(AvroValue::Float(v)) + } + + fn visit_str<E>(self, v: &str) -> Result<AvroValue, E> { + Ok(AvroValue::String(v.to_owned())) + } + + fn visit_string<E>(self, v: String) -> Result<AvroValue, E> { + Ok(AvroValue::String(v)) + } + + fn visit_bytes<E>(self, v: &[u8]) -> Result<AvroValue, E> { + Ok(AvroValue::Bytes(v.to_vec())) + } + + fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<AvroValue, E> { + Ok(AvroValue::Bytes(v)) + } + + fn visit_none<E>(self) -> Result<AvroValue, E> { + Ok(AvroValue::Null) + } + + fn visit_unit<E>(self) -> Result<AvroValue, E> { + Ok(AvroValue::Null) + } + + fn visit_map<A>(self, mut map: A) -> Result<AvroValue, A::Error> + where + A: serde::de::MapAccess<'de>, + { + let mut m = HashMap::new(); + while let Some((k, v)) = map.next_entry::<String, AvroValue>()? { + m.insert(k, v); + } + Ok(AvroValue::Object(m)) + } + + fn visit_seq<A>(self, mut seq: A) -> Result<AvroValue, A::Error> + where + A: serde::de::SeqAccess<'de>, + { + let mut v = Vec::new(); + while let Some(elem) = seq.next_element::<AvroValue>()? { + v.push(elem); + } + Ok(AvroValue::Array(v)) + } + } + + deserializer.deserialize_any(AvroValueVisitor) + } +} + +pub(crate) struct AvroFormatReader; + +const DEFAULT_BATCH_SIZE: usize = 8192; + +#[async_trait] +impl FormatFileReader for AvroFormatReader { + async fn read_batch_stream( + &self, + reader: Box<dyn FileRead>, + file_size: u64, + read_fields: &[DataField], + _predicates: Option<&FilePredicates>, + batch_size: Option<usize>, + row_selection: Option<Vec<RowRange>>, + ) -> crate::Result<ArrowRecordBatchStream> { + // NOTE: Avro OCF requires sequential reading, so we load the entire file into memory. + // This is fine for typical Paimon data files but may be problematic for very large files. 
+ let file_bytes = reader.read(0..file_size).await?; + + let read_fields = read_fields.to_vec(); + let target_schema = build_target_arrow_schema(&read_fields)?; + let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE); + + // Deserialize all Avro records from the OCF file. + let mut reader = + serde_avro_fast::object_container_file_encoding::Reader::from_slice(&file_bytes) + .map_err(|e| Error::UnexpectedError { + message: format!("Failed to open Avro file: {e}"), + source: Some(Box::new(e)), + })?; + + let mut all_records: Vec<HashMap<String, AvroValue>> = Vec::new(); + for result in reader.deserialize_borrowed::<HashMap<String, AvroValue>>() { + let record = result.map_err(|e| Error::UnexpectedError { + message: format!("Failed to deserialize Avro record: {e}"), + source: Some(Box::new(e)), + })?; + all_records.push(record); + } + + // Apply row selection filtering. + let records: Vec<HashMap<String, AvroValue>> = match row_selection { + Some(ref ranges) => { + let total_rows = all_records.len(); + let mask = ranges_to_mask(total_rows, ranges); + all_records + .into_iter() + .enumerate() + .filter(|(i, _)| mask[*i]) + .map(|(_, r)| r) + .collect() + } + None => all_records, + }; + + Ok(try_stream! { + for chunk in records.chunks(batch_size) { + let batch = records_to_batch(chunk, &read_fields, &target_schema)?; + yield batch; + } + } + .boxed()) + } +} + +// --------------------------------------------------------------------------- +// Row ranges → boolean mask +// --------------------------------------------------------------------------- + +fn ranges_to_mask(total_rows: usize, ranges: &[RowRange]) -> Vec<bool> { + let mut mask = vec![false; total_rows]; + let file_end = total_rows as i64 - 1; + for r in ranges { + let from = r.from().max(0) as usize; + let to = (r.to().min(file_end) as usize).min(total_rows - 1); Review Comment: I'm not sure whether `total_rows` can ever be 0. If it can, `total_rows - 1` underflows because `total_rows` is a `usize`: it panics in debug builds and wraps to `usize::MAX` in release builds.
I noticed the ORC reader simply returns early when `total_rows` is 0; maybe do the same here. ########## crates/paimon/src/arrow/format/orc.rs: ########## @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{FilePredicates, FormatFileReader}; +use crate::io::FileRead; +use crate::spec::DataField; +use crate::table::{ArrowRecordBatchStream, RowRange}; +use crate::Error; +use async_trait::async_trait; +use bytes::Bytes; +use futures::{future::BoxFuture, StreamExt}; +use orc_rust::projection::ProjectionMask; +use orc_rust::reader::AsyncChunkReader; +use orc_rust::ArrowReaderBuilder; + +pub(crate) struct OrcFormatReader; + +#[async_trait] +impl FormatFileReader for OrcFormatReader { + async fn read_batch_stream( + &self, + reader: Box<dyn FileRead>, + file_size: u64, + read_fields: &[DataField], + _predicates: Option<&FilePredicates>, Review Comment: nit: maybe add a TODO comment for predicate pushdown? ########## crates/paimon/src/arrow/format/orc.rs: ########## @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{FilePredicates, FormatFileReader}; +use crate::io::FileRead; +use crate::spec::DataField; +use crate::table::{ArrowRecordBatchStream, RowRange}; +use crate::Error; +use async_trait::async_trait; +use bytes::Bytes; +use futures::{future::BoxFuture, StreamExt}; +use orc_rust::projection::ProjectionMask; +use orc_rust::reader::AsyncChunkReader; +use orc_rust::ArrowReaderBuilder; + +pub(crate) struct OrcFormatReader; + +#[async_trait] +impl FormatFileReader for OrcFormatReader { + async fn read_batch_stream( + &self, + reader: Box<dyn FileRead>, + file_size: u64, + read_fields: &[DataField], + _predicates: Option<&FilePredicates>, + batch_size: Option<usize>, + row_selection: Option<Vec<RowRange>>, + ) -> crate::Result<ArrowRecordBatchStream> { + let orc_reader = OrcFileReader::new(file_size, reader); + + let builder = ArrowReaderBuilder::try_new_async(orc_reader) + .await + .map_err(|e| Error::UnexpectedError { + message: format!("Failed to open ORC file: {e}"), + source: Some(Box::new(e)), + })?; + + let root_data_type = builder.file_metadata().root_data_type().clone(); + let projected_names: Vec<&str> = read_fields.iter().map(|f| f.name()).collect(); + let projection = ProjectionMask::named_roots(&root_data_type, &projected_names); Review Comment: nit: ```suggestion let projection = ProjectionMask::named_roots(builder.file_metadata().root_data_type(), &projected_names); 
``` Then we can drop the `root_data_type` clone. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
