QuakeWang commented on code in PR #225: URL: https://github.com/apache/paimon-rust/pull/225#discussion_r3046033069
########## crates/paimon/src/arrow/format/avro.rs: ########## @@ -0,0 +1,989 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{FilePredicates, FormatFileReader}; +use crate::arrow::build_target_arrow_schema; +use crate::io::FileRead; +use crate::spec::{DataField, DataType, MapType, RowType}; +use crate::table::{ArrowRecordBatchStream, RowRange}; +use crate::Error; +use arrow_array::{ + BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, + Int16Array, Int32Array, Int64Array, Int8Array, ListArray, MapArray, RecordBatch, StringArray, + StructArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, +}; +use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer}; +use arrow_schema::SchemaRef; +use async_stream::try_stream; +use async_trait::async_trait; +use futures::StreamExt; +use std::collections::HashMap; +use std::sync::Arc; + +// --------------------------------------------------------------------------- +// AvroValue: a serde_json::Value replacement that handles Avro bytes +// --------------------------------------------------------------------------- + +/// Lightweight value type that can represent all Avro primitives 
including bytes. +/// `serde_json::Value` rejects byte arrays, so we need our own. +#[derive(Debug, Clone, PartialEq)] +enum AvroValue { + Null, + Bool(bool), + Int(i64), + Float(f64), + String(String), + Bytes(Vec<u8>), + /// Avro array / sequence. + Array(Vec<AvroValue>), + /// Union wrapper or record: `{"type": value}` produced by serde_avro_fast. + Object(HashMap<String, AvroValue>), +} + +impl AvroValue { + fn as_bool(&self) -> Option<bool> { + match self { + AvroValue::Bool(b) => Some(*b), + _ => None, + } + } + + fn as_i64(&self) -> Option<i64> { + match self { + AvroValue::Int(n) => Some(*n), + _ => None, + } + } + + fn as_f64(&self) -> Option<f64> { + match self { + AvroValue::Float(f) => Some(*f), + AvroValue::Int(n) => Some(*n as f64), + _ => None, + } + } + + fn as_str(&self) -> Option<&str> { + match self { + AvroValue::String(s) => Some(s), + _ => None, + } + } + + fn as_bytes(&self) -> Option<&[u8]> { + match self { + AvroValue::Bytes(b) => Some(b), + AvroValue::String(s) => Some(s.as_bytes()), + _ => None, + } + } +} + +impl<'de> serde::Deserialize<'de> for AvroValue { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + struct AvroValueVisitor; + + impl<'de> serde::de::Visitor<'de> for AvroValueVisitor { + type Value = AvroValue; + + fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.write_str("any Avro value") + } + + fn visit_bool<E>(self, v: bool) -> Result<AvroValue, E> { + Ok(AvroValue::Bool(v)) + } + + fn visit_i8<E>(self, v: i8) -> Result<AvroValue, E> { + Ok(AvroValue::Int(v as i64)) + } + + fn visit_i16<E>(self, v: i16) -> Result<AvroValue, E> { + Ok(AvroValue::Int(v as i64)) + } + + fn visit_i32<E>(self, v: i32) -> Result<AvroValue, E> { + Ok(AvroValue::Int(v as i64)) + } + + fn visit_i64<E>(self, v: i64) -> Result<AvroValue, E> { + Ok(AvroValue::Int(v)) + } + + fn visit_u64<E>(self, v: u64) -> Result<AvroValue, E> { + Ok(AvroValue::Int(v as i64)) + } + + fn 
visit_f32<E>(self, v: f32) -> Result<AvroValue, E> { + Ok(AvroValue::Float(v as f64)) + } + + fn visit_f64<E>(self, v: f64) -> Result<AvroValue, E> { + Ok(AvroValue::Float(v)) + } + + fn visit_str<E>(self, v: &str) -> Result<AvroValue, E> { + Ok(AvroValue::String(v.to_owned())) + } + + fn visit_string<E>(self, v: String) -> Result<AvroValue, E> { + Ok(AvroValue::String(v)) + } + + fn visit_bytes<E>(self, v: &[u8]) -> Result<AvroValue, E> { + Ok(AvroValue::Bytes(v.to_vec())) + } + + fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<AvroValue, E> { + Ok(AvroValue::Bytes(v)) + } + + fn visit_none<E>(self) -> Result<AvroValue, E> { + Ok(AvroValue::Null) + } + + fn visit_unit<E>(self) -> Result<AvroValue, E> { + Ok(AvroValue::Null) + } + + fn visit_map<A>(self, mut map: A) -> Result<AvroValue, A::Error> + where + A: serde::de::MapAccess<'de>, + { + let mut m = HashMap::new(); + while let Some((k, v)) = map.next_entry::<String, AvroValue>()? { + m.insert(k, v); + } + Ok(AvroValue::Object(m)) + } + + fn visit_seq<A>(self, mut seq: A) -> Result<AvroValue, A::Error> + where + A: serde::de::SeqAccess<'de>, + { + let mut v = Vec::new(); + while let Some(elem) = seq.next_element::<AvroValue>()? { + v.push(elem); + } + Ok(AvroValue::Array(v)) + } + } + + deserializer.deserialize_any(AvroValueVisitor) + } +} + +pub(crate) struct AvroFormatReader; + +const DEFAULT_BATCH_SIZE: usize = 8192; + +#[async_trait] +impl FormatFileReader for AvroFormatReader { + async fn read_batch_stream( + &self, + reader: Box<dyn FileRead>, + file_size: u64, + read_fields: &[DataField], + _predicates: Option<&FilePredicates>, + batch_size: Option<usize>, + row_selection: Option<Vec<RowRange>>, + ) -> crate::Result<ArrowRecordBatchStream> { + // NOTE: Avro OCF requires sequential reading, so we load the entire file into memory. + // This is fine for typical Paimon data files but may be problematic for very large files. 
+ let file_bytes = reader.read(0..file_size).await?; + + let read_fields = read_fields.to_vec(); + let target_schema = build_target_arrow_schema(&read_fields)?; + let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE); + + // Deserialize all Avro records from the OCF file. + let mut reader = + serde_avro_fast::object_container_file_encoding::Reader::from_slice(&file_bytes) + .map_err(|e| Error::UnexpectedError { + message: format!("Failed to open Avro file: {e}"), + source: Some(Box::new(e)), + })?; + + let mut all_records: Vec<HashMap<String, AvroValue>> = Vec::new(); + for result in reader.deserialize_borrowed::<HashMap<String, AvroValue>>() { + let record = result.map_err(|e| Error::UnexpectedError { + message: format!("Failed to deserialize Avro record: {e}"), + source: Some(Box::new(e)), + })?; + all_records.push(record); + } + + // Apply row selection filtering. + let records: Vec<HashMap<String, AvroValue>> = match row_selection { + Some(ref ranges) => { + let total_rows = all_records.len(); + let mask = ranges_to_mask(total_rows, ranges); + all_records + .into_iter() + .enumerate() + .filter(|(i, _)| mask[*i]) + .map(|(_, r)| r) + .collect() + } + None => all_records, + }; + + Ok(try_stream! 
{ + for chunk in records.chunks(batch_size) { + let batch = records_to_batch(chunk, &read_fields, &target_schema)?; + yield batch; + } + } + .boxed()) + } +} + +// --------------------------------------------------------------------------- +// Row ranges → boolean mask +// --------------------------------------------------------------------------- + +fn ranges_to_mask(total_rows: usize, ranges: &[RowRange]) -> Vec<bool> { + let mut mask = vec![false; total_rows]; + let file_end = total_rows as i64 - 1; + for r in ranges { + let from = r.from().max(0) as usize; + let to = (r.to().min(file_end) as usize).min(total_rows - 1); + for item in mask.iter_mut().take(to + 1).skip(from) { + *item = true; + } + } + mask +} + +// --------------------------------------------------------------------------- +// Avro records → Arrow RecordBatch conversion +// --------------------------------------------------------------------------- + +fn records_to_batch( + records: &[HashMap<String, AvroValue>], + fields: &[DataField], + schema: &SchemaRef, +) -> crate::Result<RecordBatch> { + let num_rows = records.len(); + let mut columns: Vec<Arc<dyn arrow_array::Array>> = Vec::with_capacity(fields.len()); + + for field in fields { + let col = build_column(records, field.name(), field.data_type(), num_rows)?; + columns.push(col); + } + + if columns.is_empty() { + RecordBatch::try_new_with_options( + schema.clone(), + columns, + &arrow_array::RecordBatchOptions::new().with_row_count(Some(num_rows)), + ) + } else { + RecordBatch::try_new(schema.clone(), columns) + } + .map_err(|e| Error::UnexpectedError { + message: format!("Failed to build Avro RecordBatch: {e}"), + source: Some(Box::new(e)), + }) +} + +fn build_column( + records: &[HashMap<String, AvroValue>], + name: &str, + data_type: &DataType, + num_rows: usize, +) -> crate::Result<Arc<dyn arrow_array::Array>> { + Ok(match data_type { + DataType::Boolean(_) => { + let arr: BooleanArray = (0..num_rows) + .map(|i| get_field(&records[i], 
name).and_then(|v| v.as_bool())) + .collect(); + Arc::new(arr) + } + DataType::TinyInt(_) => { + let arr: Int8Array = (0..num_rows) + .map(|i| { + get_field(&records[i], name) + .and_then(|v| v.as_i64()) + .map(|v| v as i8) + }) + .collect(); + Arc::new(arr) + } + DataType::SmallInt(_) => { + let arr: Int16Array = (0..num_rows) + .map(|i| { + get_field(&records[i], name) + .and_then(|v| v.as_i64()) + .map(|v| v as i16) + }) + .collect(); + Arc::new(arr) + } + DataType::Int(_) => { + let arr: Int32Array = (0..num_rows) + .map(|i| { + get_field(&records[i], name) + .and_then(|v| v.as_i64()) + .map(|v| v as i32) + }) + .collect(); + Arc::new(arr) + } + DataType::BigInt(_) => { + let arr: Int64Array = (0..num_rows) + .map(|i| get_field(&records[i], name).and_then(|v| v.as_i64())) + .collect(); + Arc::new(arr) + } + DataType::Float(_) => { + let arr: Float32Array = (0..num_rows) + .map(|i| { + get_field(&records[i], name) + .and_then(|v| v.as_f64()) + .map(|v| v as f32) + }) + .collect(); + Arc::new(arr) + } + DataType::Double(_) => { + let arr: Float64Array = (0..num_rows) + .map(|i| get_field(&records[i], name).and_then(|v| v.as_f64())) + .collect(); + Arc::new(arr) + } + DataType::Char(_) | DataType::VarChar(_) => { + let arr: StringArray = (0..num_rows) + .map(|i| get_field(&records[i], name).and_then(|v| v.as_str())) + .collect(); + Arc::new(arr) + } + DataType::Binary(_) | DataType::VarBinary(_) => { + let values: Vec<Option<&[u8]>> = (0..num_rows) + .map(|i| get_field(&records[i], name).and_then(|v| v.as_bytes())) + .collect(); + let arr: BinaryArray = values.into_iter().collect(); + Arc::new(arr) + } + DataType::Date(_) => { + let arr: Date32Array = (0..num_rows) + .map(|i| { + get_field(&records[i], name) + .and_then(|v| v.as_i64()) + .map(|v| v as i32) + }) + .collect(); + Arc::new(arr) + } + DataType::Decimal(d) => { + let precision = u8::try_from(d.precision()).map_err(|_| Error::Unsupported { + message: "Decimal precision exceeds u8".to_string(), + })?; + 
let scale = i8::try_from(d.scale() as i32).map_err(|_| Error::Unsupported { + message: "Decimal scale out of i8 range".to_string(), + })?; + let arr: Decimal128Array = (0..num_rows) + .map(|i| { + get_field(&records[i], name).and_then(|v| match v { + // Avro decimal is encoded as big-endian two's complement bytes. + AvroValue::Bytes(b) => Some(bytes_to_i128_be(b)), + AvroValue::Int(n) => Some(*n as i128), + // serde_avro_fast may deserialize decimal as string representation. + AvroValue::String(s) => parse_decimal_string(s, scale), + _ => None, + }) + }) + .collect::<Decimal128Array>() + .with_precision_and_scale(precision, scale) + .map_err(|e| Error::UnexpectedError { + message: format!("Failed to build Decimal128Array: {e}"), + source: Some(Box::new(e)), + })?; + Arc::new(arr) + } + DataType::Timestamp(t) => { + build_timestamp_column(records, name, num_rows, t.precision(), None) + } + DataType::LocalZonedTimestamp(t) => build_timestamp_column( + records, + name, + num_rows, + t.precision(), + Some(Arc::from("UTC")), + ), + DataType::Array(arr_type) => { + build_array_column(records, name, arr_type.element_type(), num_rows)? 
+ } + DataType::Map(map_type) => build_map_column(records, name, map_type, num_rows)?, + DataType::Row(row_type) => build_row_column(records, name, row_type, num_rows)?, + other => { + return Err(Error::Unsupported { + message: format!("Avro reader does not support data type: {other:?}"), + }); + } + }) +} + +fn build_timestamp_column( + records: &[HashMap<String, AvroValue>], + name: &str, + num_rows: usize, + precision: u32, + tz: Option<Arc<str>>, +) -> Arc<dyn arrow_array::Array> { + let values: Vec<Option<i64>> = (0..num_rows) + .map(|i| get_field(&records[i], name).and_then(|v| v.as_i64())) + .collect(); + match precision { + 0..=3 => Arc::new(TimestampMillisecondArray::from(values).with_timezone_opt(tz)), + 4..=6 => Arc::new(TimestampMicrosecondArray::from(values).with_timezone_opt(tz)), + _ => Arc::new(TimestampNanosecondArray::from(values).with_timezone_opt(tz)), + } +} + +fn build_array_column( + records: &[HashMap<String, AvroValue>], + name: &str, + element_type: &DataType, + num_rows: usize, +) -> crate::Result<Arc<dyn arrow_array::Array>> { + let arrow_element_type = crate::arrow::paimon_type_to_arrow(element_type)?; + let arrow_element_field = + arrow_schema::Field::new("element", arrow_element_type, element_type.is_nullable()); + + let mut offsets = vec![0i32]; + let mut element_records: Vec<HashMap<String, AvroValue>> = Vec::new(); + + for record in records.iter().take(num_rows) { + match get_field(record, name) { + Some(AvroValue::Array(arr)) => { + for elem in arr { + let mut m = HashMap::new(); + m.insert("element".to_string(), elem.clone()); + element_records.push(m); + } + offsets.push(offsets.last().unwrap() + arr.len() as i32); + } + _ => { + offsets.push(*offsets.last().unwrap()); + } + } + } + + let element_col = build_column( + &element_records, + "element", + element_type, + element_records.len(), + )?; + + let offsets_buf = OffsetBuffer::new(ScalarBuffer::from(offsets)); + let nulls = NullBuffer::new(BooleanBuffer::from( + (0..num_rows) 
+ .map(|i| get_field(&records[i], name).is_some()) + .collect::<Vec<_>>(), + )); + + let list_arr = ListArray::try_new( + Arc::new(arrow_element_field), + offsets_buf, + element_col, + Some(nulls), + ) + .map_err(|e| Error::UnexpectedError { + message: format!("Failed to build ListArray: {e}"), + source: Some(Box::new(e)), + })?; + Ok(Arc::new(list_arr)) +} + +fn build_map_column( + records: &[HashMap<String, AvroValue>], + name: &str, + map_type: &MapType, + num_rows: usize, +) -> crate::Result<Arc<dyn arrow_array::Array>> { + let arrow_key_type = crate::arrow::paimon_type_to_arrow(map_type.key_type())?; + let arrow_value_type = crate::arrow::paimon_type_to_arrow(map_type.value_type())?; + + let mut offsets = vec![0i32]; + let mut key_records: Vec<HashMap<String, AvroValue>> = Vec::new(); + let mut value_records: Vec<HashMap<String, AvroValue>> = Vec::new(); + + for record in records.iter().take(num_rows) { + match get_field(record, name) { + Some(AvroValue::Object(map)) => { + for (k, v) in map { + let mut km = HashMap::new(); + km.insert("key".to_string(), AvroValue::String(k.clone())); + key_records.push(km); + let mut vm = HashMap::new(); + vm.insert("value".to_string(), v.clone()); + value_records.push(vm); + } + offsets.push(offsets.last().unwrap() + map.len() as i32); + } + _ => { + offsets.push(*offsets.last().unwrap()); + } + } + } + + let key_col = build_column(&key_records, "key", map_type.key_type(), key_records.len())?; + let value_col = build_column( + &value_records, + "value", + map_type.value_type(), + value_records.len(), + )?; + + let struct_arr = StructArray::try_new( + vec![ + Arc::new(arrow_schema::Field::new("key", arrow_key_type, false)), + Arc::new(arrow_schema::Field::new( + "value", + arrow_value_type.clone(), + map_type.value_type().is_nullable(), + )), + ] + .into(), + vec![key_col, value_col], + None, + ) + .map_err(|e| Error::UnexpectedError { + message: format!("Failed to build map StructArray: {e}"), + source: Some(Box::new(e)), + 
})?; + + let entries_field = arrow_schema::Field::new( + "entries", + arrow_schema::DataType::Struct(struct_arr.fields().clone()), + false, + ); + + let offsets_buf = OffsetBuffer::new(ScalarBuffer::from(offsets)); + let nulls = NullBuffer::new(BooleanBuffer::from( + (0..num_rows) + .map(|i| get_field(&records[i], name).is_some()) + .collect::<Vec<_>>(), + )); + + let map_arr = MapArray::try_new( + Arc::new(entries_field), + offsets_buf, + struct_arr, + Some(nulls), + false, + ) + .map_err(|e| Error::UnexpectedError { + message: format!("Failed to build MapArray: {e}"), + source: Some(Box::new(e)), + })?; + Ok(Arc::new(map_arr)) +} + +fn build_row_column( + records: &[HashMap<String, AvroValue>], + name: &str, + row_type: &RowType, + num_rows: usize, +) -> crate::Result<Arc<dyn arrow_array::Array>> { + let sub_records: Vec<HashMap<String, AvroValue>> = (0..num_rows) + .map(|i| match get_field(&records[i], name) { + Some(AvroValue::Object(obj)) => obj.clone(), + _ => HashMap::new(), + }) + .collect(); + + let mut child_columns: Vec<Arc<dyn arrow_array::Array>> = Vec::new(); + let mut arrow_fields: Vec<Arc<arrow_schema::Field>> = Vec::new(); + + for field in row_type.fields() { + let col = build_column(&sub_records, field.name(), field.data_type(), num_rows)?; + let arrow_type = crate::arrow::paimon_type_to_arrow(field.data_type())?; + arrow_fields.push(Arc::new(arrow_schema::Field::new( + field.name(), + arrow_type, + field.data_type().is_nullable(), + ))); + child_columns.push(col); + } + + let nulls = NullBuffer::new(BooleanBuffer::from( + (0..num_rows) + .map(|i| get_field(&records[i], name).is_some()) + .collect::<Vec<_>>(), + )); + + let struct_arr = StructArray::try_new(arrow_fields.into(), child_columns, Some(nulls)) + .map_err(|e| Error::UnexpectedError { + message: format!("Failed to build StructArray: {e}"), + source: Some(Box::new(e)), + })?; + Ok(Arc::new(struct_arr)) +} + +/// Parse a decimal string (e.g. 
"999.99") into unscaled i128 with the given scale. +/// For example, "999.99" with scale=2 → 99999; "0.000000000000000001" with scale=18 → 1. +fn parse_decimal_string(s: &str, scale: i8) -> Option<i128> { + let negative = s.starts_with('-'); + let s = s.strip_prefix('-').unwrap_or(s); + let (integer_part, frac_part) = match s.find('.') { + Some(pos) => (&s[..pos], &s[pos + 1..]), + None => (s, ""), + }; + let frac_len = frac_part.len() as i8; + let combined = format!("{}{}", integer_part, frac_part); + let unscaled: i128 = combined.parse().ok()?; + // Adjust if the fractional digits differ from the target scale. + let result = if frac_len < scale { + unscaled * 10i128.pow((scale - frac_len) as u32) + } else if frac_len > scale { + unscaled / 10i128.pow((frac_len - scale) as u32) + } else { + unscaled + }; + Some(if negative { -result } else { result }) +} + +/// Decode big-endian two's complement bytes into i128 (Avro decimal encoding). +fn bytes_to_i128_be(bytes: &[u8]) -> i128 { + if bytes.is_empty() { + return 0; + } + // Sign-extend: if the high bit is set, fill with 0xFF, otherwise 0x00. + let sign_byte = if bytes[0] & 0x80 != 0 { 0xFF } else { 0x00 }; + let mut buf = [sign_byte; 16]; + let start = 16 - bytes.len(); + buf[start..].copy_from_slice(bytes); + i128::from_be_bytes(buf) +} + +/// Look up a field in an Avro record, unwrapping union encoding. +fn get_field<'a>(record: &'a HashMap<String, AvroValue>, name: &str) -> Option<&'a AvroValue> { + record.get(name).and_then(unwrap_avro_union) +} + +/// Unwrap Avro union encoding: `{"type": value}` → `value`, or `"null"` → `None`. +fn unwrap_avro_union(v: &AvroValue) -> Option<&AvroValue> { + match v { + AvroValue::Null => None, + AvroValue::Object(map) if map.len() == 1 => { Review Comment: This `Object` path is no longer safe after adding map/row support. 
`unwrap_avro_union()` still treats any single-entry object as a union wrapper, but a valid Avro map with one entry or a row with one field has the same shape here. That means values like `{"c": 30}` can be unwrapped to `30`, and the later map/row builders will read them back as an empty (but non-null) map or as a non-null struct whose fields are all null. We need to distinguish union wrappers from genuine object values (maps and rows) instead of relying on `len() == 1` here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
