alamb commented on code in PR #8123: URL: https://github.com/apache/arrow-rs/pull/8123#discussion_r2274319601
##########  arrow-avro/src/writer/encoder.rs: ##########
@@ -0,0 +1,277 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Avro Encoder for Arrow types.
+
+use arrow_array::cast::AsArray;
+use arrow_array::types::{
+    ArrowPrimitiveType, Float32Type, Float64Type, Int32Type, Int64Type, TimestampMicrosecondType,
+};
+use arrow_array::OffsetSizeTrait;
+use arrow_array::{Array, GenericBinaryArray, PrimitiveArray, RecordBatch};
+use arrow_buffer::NullBuffer;
+use arrow_schema::{ArrowError, DataType, FieldRef, TimeUnit};
+use std::io::Write;
+
+/// Behavior knobs for the Avro encoder.
+///
+/// When `impala_mode` is `true`, optional/nullable values are encoded
+/// as Avro unions with **null second** (`[T, "null"]`). When `false`
+/// (default), we use **null first** (`["null", T]`).
+#[derive(Debug, Clone, Copy, Default)]
+pub struct EncoderOptions {

Review Comment:
   I think we found on other APIs in this crate that using a builder style was good for these option structs, because it meant we could add new fields without breaking the API. So, for example, in this case (eventually):
   ```rust
   let options = EncoderOptions::new()
       .with_impala_mode(true);
   ```
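   A rough sketch of what that could look like (`with_impala_mode` here is the eventual accessor from the example above, not something this PR has to add yet):
   ```rust
   impl EncoderOptions {
       /// Create options with the defaults (null-first unions).
       pub fn new() -> Self {
           Self::default()
       }

       /// Builder-style setter: consumes and returns `self`, so new fields
       /// can be added later without breaking existing callers.
       pub fn with_impala_mode(mut self, impala_mode: bool) -> Self {
           self.impala_mode = impala_mode;
           self
       }
   }
   ```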
##########  arrow-avro/src/writer/mod.rs: ##########
@@ -0,0 +1,350 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Avro writer implementation for the `arrow-avro` crate.
+//!
+//! # Overview
+//!
+//! * Use **`AvroWriter`** (Object Container File) when you want a
+//!   self‑contained Avro file with header, schema JSON, optional compression,
+//!   blocks, and sync markers.
+//! * Use **`AvroStreamWriter`** (raw binary stream) when you already know the
+//!   schema out‑of‑band (e.g., via a schema registry) and need a stream
+//!   of Avro‑encoded records with minimal framing.
+//!
+
+/// Encodes `RecordBatch` into the Avro binary format.
+pub mod encoder;
+/// Logic for different Avro container file formats.
+pub mod format;
+
+use crate::compression::CompressionCodec;
+use crate::schema::AvroSchema;
+use crate::writer::encoder::{encode_record_batch, write_long};
+use crate::writer::format::{AvroBinaryFormat, AvroFormat, AvroOcfFormat};
+use arrow_array::RecordBatch;
+use arrow_schema::{ArrowError, Schema};
+use std::io::{self, Write};
+use std::sync::Arc;
+
+/// Builder to configure and create a `Writer`.
+#[derive(Debug, Clone)]
+pub struct WriterBuilder {
+    schema: Schema,
+    codec: Option<CompressionCodec>,
+}
+
+impl WriterBuilder {
+    /// Create a new builder with default settings.
+    pub fn new(schema: Schema) -> Self {
+        Self {
+            schema,
+            codec: None,
+        }
+    }
+
+    /// Change the compression codec.
+    pub fn with_compression(mut self, codec: Option<CompressionCodec>) -> Self {
+        self.codec = codec;
+        self
+    }
+
+    /// Create a new `Writer` with specified `AvroFormat` and builder options.
+    pub fn build<W, F>(self, writer: W) -> Writer<W, F>
+    where
+        W: Write,
+        F: AvroFormat,
+    {
+        Writer {
+            writer,
+            schema: Arc::from(self.schema),
+            format: F::default(),
+            compression: self.codec,
+            started: false,
+        }
+    }
+}
+
+/// Generic Avro writer.
+#[derive(Debug)]
+pub struct Writer<W: Write, F: AvroFormat> {
+    writer: W,
+    schema: Arc<Schema>,
+    format: F,
+    compression: Option<CompressionCodec>,
+    started: bool,
+}
+
+/// Alias for an Avro **Object Container File** writer.
+pub type AvroWriter<W> = Writer<W, AvroOcfFormat>;
+/// Alias for a raw Avro **binary stream** writer.
+pub type AvroStreamWriter<W> = Writer<W, AvroBinaryFormat>;
+
+impl<W: Write> Writer<W, AvroOcfFormat> {
+    /// Convenience constructor – same as `WriterBuilder::new(schema).build::<W, AvroOcfFormat>(writer)`.
+    pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
+        Ok(WriterBuilder::new(schema).build::<W, AvroOcfFormat>(writer))
+    }
+
+    /// Change the compression codec after construction.
+    pub fn with_compression(mut self, codec: Option<CompressionCodec>) -> Self {
+        self.compression = codec;
+        self
+    }
+
+    /// Return a reference to the 16‑byte sync marker generated for this file.
+    pub fn sync_marker(&self) -> Option<&[u8; 16]> {
+        self.format.sync_marker()
+    }
+}
+
+impl<W: Write> Writer<W, AvroBinaryFormat> {
+    /// Convenience constructor to create a new [`AvroStreamWriter`].
+    pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
+        Ok(WriterBuilder::new(schema).build::<W, AvroBinaryFormat>(writer))
+    }
+}
+
+impl<W: Write, F: AvroFormat> Writer<W, F> {
+    /// Serialize one [`RecordBatch`] to the output.
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
+        if !self.started {
+            self.format
+                .start_stream(&mut self.writer, &self.schema, self.compression)?;
+            self.started = true;
+        }
+        if batch.schema() != self.schema {
+            return Err(ArrowError::SchemaError(
+                "Schema of RecordBatch differs from Writer schema".to_string(),
+            ));
+        }
+        match self.format.sync_marker() {
+            Some(&sync) => self.write_ocf_block(batch, &sync),
+            None => self.write_stream(batch),
+        }
+    }
+
+    /// A convenience method to write a slice of [`RecordBatch`].
+    ///
+    /// This is equivalent to calling `write` for each batch in the slice.
+    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
+        for b in batches {
+            self.write(b)?;
+        }
+        Ok(())
+    }
+
+    /// Flush remaining buffered data and (for OCF) ensure the header is present.
+    pub fn finish(&mut self) -> Result<(), ArrowError> {
+        if !self.started {
+            self.format
+                .start_stream(&mut self.writer, &self.schema, self.compression)?;
+            self.started = true;
+        }
+        self.writer
+            .flush()
+            .map_err(|e| ArrowError::IoError(format!("Error flushing writer: {e}"), e))
+    }
+
+    /// Consume the writer, returning the underlying output object.
+    pub fn into_inner(self) -> W {
+        self.writer
+    }
+
+    fn write_ocf_block(&mut self, batch: &RecordBatch, sync: &[u8; 16]) -> Result<(), ArrowError> {
+        let mut buf = Vec::<u8>::with_capacity(1024);
+        encode_record_batch(batch, &mut buf)?;
+        let encoded = match self.compression {
+            Some(codec) => codec.compress(&buf)?,
+            None => buf,
+        };
+        write_long(&mut self.writer, batch.num_rows() as i64)?;
+        write_long(&mut self.writer, encoded.len() as i64)?;
+        self.writer
+            .write_all(&encoded)
+            .map_err(|e| ArrowError::IoError(format!("Error writing Avro block: {e}"), e))?;
+        self.writer
+            .write_all(sync)
+            .map_err(|e| ArrowError::IoError(format!("Error writing Avro sync: {e}"), e))?;
+        Ok(())
+    }
+
+    fn write_stream(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
+        encode_record_batch(batch, &mut self.writer)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::reader::ReaderBuilder;
+    use crate::test_util::arrow_test_data;
+    use arrow_array::{ArrayRef, BinaryArray, Int32Array, RecordBatch, StringArray};
+    use arrow_schema::{DataType, Field, Schema};
+    use std::fs::{remove_file, File};
+    use std::io::BufReader;
+    use std::sync::Arc;
+
+    fn make_schema() -> Schema {
+        Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Binary, false),
+        ])
+    }
+
+    fn make_batch() -> RecordBatch {
+        let ids = Int32Array::from(vec![1, 2, 3]);
+        let names = BinaryArray::from_vec(vec![b"a".as_ref(), b"b".as_ref(), b"c".as_ref()]);
+        RecordBatch::try_new(
+            Arc::new(make_schema()),
+            vec![Arc::new(ids) as ArrayRef, Arc::new(names) as ArrayRef],
+        )
+        .expect("failed to build test RecordBatch")
+    }
+
+    fn contains_ascii(haystack: &[u8], needle: &[u8]) -> bool {
+        haystack.windows(needle.len()).any(|w| w == needle)
+    }
+
+    fn unique_temp_path(prefix: &str) -> std::path::PathBuf {

Review Comment:
   I think it would be better to use methods from the `tempfile` crate here, as is done elsewhere in this repo. For example:
   https://github.com/apache/arrow-rs/blob/7f0aae9b9d20c49c86bfdaf53f689ae43d0237ac/arrow-ipc/src/reader.rs#L1847-L1846
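   For instance, the roundtrip test below could look roughly like this (an untested sketch, assuming `tempfile` is added as a dev-dependency as in the other crates; `NamedTempFile` deletes the file when dropped, so the manual `remove_file` cleanup goes away too):
   ```rust
   // Create a named temp file that is cleaned up automatically on drop.
   let file = tempfile::NamedTempFile::new().expect("create temp file");
   let mut writer = AvroWriter::new(file.reopen().expect("reopen"), make_schema())?;
   writer.write(&make_batch())?;
   writer.finish()?;
   // Reopen by path to read the roundtripped bytes back.
   let rt_file = std::fs::File::open(file.path()).expect("open roundtrip avro");
   ```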
##########  arrow-avro/src/writer/encoder.rs: ##########
@@ -0,0 +1,277 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Avro Encoder for Arrow types.
+
+use arrow_array::cast::AsArray;
+use arrow_array::types::{
+    ArrowPrimitiveType, Float32Type, Float64Type, Int32Type, Int64Type, TimestampMicrosecondType,
+};
+use arrow_array::OffsetSizeTrait;
+use arrow_array::{Array, GenericBinaryArray, PrimitiveArray, RecordBatch};
+use arrow_buffer::NullBuffer;
+use arrow_schema::{ArrowError, DataType, FieldRef, TimeUnit};
+use std::io::Write;
+
+/// Behavior knobs for the Avro encoder.
+///
+/// When `impala_mode` is `true`, optional/nullable values are encoded
+/// as Avro unions with **null second** (`[T, "null"]`). When `false`
+/// (default), we use **null first** (`["null", T]`).
+#[derive(Debug, Clone, Copy, Default)]
+pub struct EncoderOptions {
+    impala_mode: bool, // Will be fully implemented in a follow-up PR
+}
+
+/// Encode a single Avro-`long` using ZigZag + variable length, buffered.
+///
+/// Spec: <https://avro.apache.org/docs/1.11.1/specification/#binary-encoding>
+#[inline]
+pub fn write_long<W: Write + ?Sized>(writer: &mut W, value: i64) -> Result<(), ArrowError> {
+    let mut zz = ((value << 1) ^ (value >> 63)) as u64;
+    // At most 10 bytes for 64-bit varint
+    let mut buf = [0u8; 10];
+    let mut i = 0;
+    while (zz & !0x7F) != 0 {
+        buf[i] = ((zz & 0x7F) as u8) | 0x80;
+        i += 1;
+        zz >>= 7;
+    }
+    buf[i] = (zz & 0x7F) as u8;
+    i += 1;
+    writer
+        .write_all(&buf[..i])
+        .map_err(|e| ArrowError::IoError(format!("write long: {e}"), e))
+}
+
+#[inline]
+fn write_int<W: Write + ?Sized>(writer: &mut W, value: i32) -> Result<(), ArrowError> {
+    write_long(writer, value as i64)
+}
+
+#[inline]
+fn write_len_prefixed<W: Write + ?Sized>(writer: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
+    write_long(writer, bytes.len() as i64)?;
+    writer
+        .write_all(bytes)
+        .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
+}
+
+#[inline]
+fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool) -> Result<(), ArrowError> {
+    writer
+        .write_all(&[if v { 1 } else { 0 }])
+        .map_err(|e| ArrowError::IoError(format!("write bool: {e}"), e))
+}
+
+/// Write the union branch index for an optional field.
+///
+/// Branch index is 0-based per Avro unions:
+/// - Null-first (default): null => 0, value => 1
+/// - Null-second (Impala): value => 0, null => 1
+#[inline]
+fn write_optional_branch<W: Write + ?Sized>(
+    writer: &mut W,
+    is_null: bool,
+    impala_mode: bool,
+) -> Result<(), ArrowError> {
+    let branch = if impala_mode == is_null { 1 } else { 0 };
+    write_int(writer, branch)
+}
+
+/// Encode a `RecordBatch` in Avro binary format using **default options**.
+pub fn encode_record_batch<W: Write>(batch: &RecordBatch, out: &mut W) -> Result<(), ArrowError> {
+    encode_record_batch_with_options(batch, out, &EncoderOptions::default())
+}
+
+/// Encode a `RecordBatch` with explicit `EncoderOptions`.
+pub fn encode_record_batch_with_options<W: Write>(
+    batch: &RecordBatch,
+    out: &mut W,
+    opts: &EncoderOptions,
+) -> Result<(), ArrowError> {
+    let mut encoders = batch
+        .schema()
+        .fields()
+        .iter()
+        .zip(batch.columns())
+        .map(|(field, array)| Ok((field.is_nullable(), make_encoder(array.as_ref())?)))
+        .collect::<Result<Vec<_>, ArrowError>>()?;
+    (0..batch.num_rows()).try_for_each(|row| {
+        encoders.iter_mut().try_for_each(|(is_nullable, enc)| {
+            if *is_nullable {
+                let is_null = enc.is_null(row);
+                write_optional_branch(out, is_null, opts.impala_mode)?;
+                if is_null {
+                    return Ok(());
+                }
+            }
+            enc.encode(row, out)
+        })
+    })
+}
+
+/// Enum for static dispatch of concrete encoders.
+enum Encoder<'a> {
+    Boolean(BooleanEncoder<'a>),
+    Int(IntEncoder<'a, Int32Type>),
+    Long(LongEncoder<'a, Int64Type>),
+    Timestamp(LongEncoder<'a, TimestampMicrosecondType>),
+    Float32(F32Encoder<'a>),
+    Float64(F64Encoder<'a>),
+    Binary(BinaryEncoder<'a, i32>),
+}
+
+impl<'a> Encoder<'a> {
+    /// Encode the value at `idx`.
+    #[inline]
+    fn encode(&mut self, idx: usize, out: &mut dyn Write) -> Result<(), ArrowError> {
+        match self {
+            Encoder::Boolean(e) => e.encode(idx, out),
+            Encoder::Int(e) => e.encode(idx, out),
+            Encoder::Long(e) => e.encode(idx, out),
+            Encoder::Timestamp(e) => e.encode(idx, out),
+            Encoder::Float32(e) => e.encode(idx, out),
+            Encoder::Float64(e) => e.encode(idx, out),
+            Encoder::Binary(e) => e.encode(idx, out),
+        }
+    }
+}
+
+/// An encoder + a null buffer for nullable fields.
+pub struct NullableEncoder<'a> {
+    encoder: Encoder<'a>,
+    nulls: Option<NullBuffer>,
+}
+
+impl<'a> NullableEncoder<'a> {
+    /// Create a new nullable encoder, wrapping a non-null encoder and a null buffer.
+    #[inline]
+    fn new(encoder: Encoder<'a>, nulls: Option<NullBuffer>) -> Self {
+        Self { encoder, nulls }
+    }
+
+    /// Encode the value at `idx`, assuming it's not-null.
+    #[inline]
+    fn encode(&mut self, idx: usize, out: &mut dyn Write) -> Result<(), ArrowError> {
+        self.encoder.encode(idx, out)
+    }
+
+    /// Check if the value at `idx` is null.
+    #[inline]
+    fn is_null(&self, idx: usize) -> bool {
+        self.nulls.as_ref().is_some_and(|nulls| nulls.is_null(idx))
+    }
+}
+
+/// Creates an Avro encoder for the given `array`.
+pub fn make_encoder<'a>(array: &'a dyn Array) -> Result<NullableEncoder<'a>, ArrowError> {
+    let nulls = array.nulls().cloned();
+    let enc = match array.data_type() {
+        DataType::Boolean => {
+            let arr = array.as_boolean();
+            NullableEncoder::new(Encoder::Boolean(BooleanEncoder(arr)), nulls)
+        }
+        DataType::Int32 => {
+            let arr = array.as_primitive::<Int32Type>();
+            NullableEncoder::new(Encoder::Int(IntEncoder(arr)), nulls)
+        }
+        DataType::Int64 => {
+            let arr = array.as_primitive::<Int64Type>();
+            NullableEncoder::new(Encoder::Long(LongEncoder(arr)), nulls)
+        }
+        DataType::Float32 => {
+            let arr = array.as_primitive::<Float32Type>();
+            NullableEncoder::new(Encoder::Float32(F32Encoder(arr)), nulls)
+        }
+        DataType::Float64 => {
+            let arr = array.as_primitive::<Float64Type>();
+            NullableEncoder::new(Encoder::Float64(F64Encoder(arr)), nulls)
+        }
+        DataType::Binary => {
+            let arr = array.as_binary::<i32>();
+            NullableEncoder::new(Encoder::Binary(BinaryEncoder(arr)), nulls)
+        }
+        DataType::Timestamp(TimeUnit::Microsecond, _) => {
+            let arr = array.as_primitive::<TimestampMicrosecondType>();
+            NullableEncoder::new(Encoder::Timestamp(LongEncoder(arr)), nulls)
+        }
+        other => {
+            return Err(ArrowError::NotYetImplemented(format!(
+                "Unsupported data type for Avro encoding in slim build: {other:?}"
+            )))
+        }
+    };
+    Ok(enc)
+}
+
+struct BooleanEncoder<'a>(&'a arrow_array::BooleanArray);
+impl BooleanEncoder<'_> {
+    #[inline]
+    fn encode(&mut self, idx: usize, out: &mut dyn Write) -> Result<(), ArrowError> {

Review Comment:
   Is there any reason this takes a `dyn Write` rather than a generic `W: Write`? If you template it, I think you might end up with more efficient code (the Rust compiler can inline the function calls).
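   For example (sketch only; I'm assuming the body just delegates to `write_bool`, and the same generic parameter would need to flow up through `Encoder::encode` as well):
   ```rust
   impl BooleanEncoder<'_> {
       #[inline]
       fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
           // With a concrete `W` the compiler can monomorphize and inline the
           // write calls instead of dispatching through a vtable.
           write_bool(out, self.0.value(idx))
       }
   }
   ```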
##########  arrow-avro/src/writer/mod.rs: ##########
@@ -0,0 +1,350 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Avro writer implementation for the `arrow-avro` crate.
+//!
+//! # Overview
+//!
+//! * Use **`AvroWriter`** (Object Container File) when you want a
+//!   self‑contained Avro file with header, schema JSON, optional compression,
+//!   blocks, and sync markers.
+//! * Use **`AvroStreamWriter`** (raw binary stream) when you already know the
+//!   schema out‑of‑band (e.g., via a schema registry) and need a stream
+//!   of Avro‑encoded records with minimal framing.
+//!
+
+/// Encodes `RecordBatch` into the Avro binary format.
+pub mod encoder;
+/// Logic for different Avro container file formats.
+pub mod format;
+
+use crate::compression::CompressionCodec;
+use crate::schema::AvroSchema;
+use crate::writer::encoder::{encode_record_batch, write_long};
+use crate::writer::format::{AvroBinaryFormat, AvroFormat, AvroOcfFormat};
+use arrow_array::RecordBatch;
+use arrow_schema::{ArrowError, Schema};
+use std::io::{self, Write};
+use std::sync::Arc;
+
+/// Builder to configure and create a `Writer`.
+#[derive(Debug, Clone)]
+pub struct WriterBuilder {
+    schema: Schema,
+    codec: Option<CompressionCodec>,
+}
+
+impl WriterBuilder {
+    /// Create a new builder with default settings.
+    pub fn new(schema: Schema) -> Self {
+        Self {
+            schema,
+            codec: None,
+        }
+    }
+
+    /// Change the compression codec.
+    pub fn with_compression(mut self, codec: Option<CompressionCodec>) -> Self {
+        self.codec = codec;
+        self
+    }
+
+    /// Create a new `Writer` with specified `AvroFormat` and builder options.
+    pub fn build<W, F>(self, writer: W) -> Writer<W, F>
+    where
+        W: Write,
+        F: AvroFormat,
+    {
+        Writer {
+            writer,
+            schema: Arc::from(self.schema),
+            format: F::default(),
+            compression: self.codec,
+            started: false,
+        }
+    }
+}
+
+/// Generic Avro writer.
+#[derive(Debug)]
+pub struct Writer<W: Write, F: AvroFormat> {
+    writer: W,
+    schema: Arc<Schema>,
+    format: F,
+    compression: Option<CompressionCodec>,
+    started: bool,
+}
+
+/// Alias for an Avro **Object Container File** writer.
+pub type AvroWriter<W> = Writer<W, AvroOcfFormat>;
+/// Alias for a raw Avro **binary stream** writer.
+pub type AvroStreamWriter<W> = Writer<W, AvroBinaryFormat>;
+
+impl<W: Write> Writer<W, AvroOcfFormat> {
+    /// Convenience constructor – same as `WriterBuilder::new(schema).build::<W, AvroOcfFormat>(writer)`.
+    pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
+        Ok(WriterBuilder::new(schema).build::<W, AvroOcfFormat>(writer))
+    }
+
+    /// Change the compression codec after construction.
+    pub fn with_compression(mut self, codec: Option<CompressionCodec>) -> Self {
+        self.compression = codec;
+        self
+    }
+
+    /// Return a reference to the 16‑byte sync marker generated for this file.
+    pub fn sync_marker(&self) -> Option<&[u8; 16]> {
+        self.format.sync_marker()
+    }
+}
+
+impl<W: Write> Writer<W, AvroBinaryFormat> {
+    /// Convenience constructor to create a new [`AvroStreamWriter`].
+    pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
+        Ok(WriterBuilder::new(schema).build::<W, AvroBinaryFormat>(writer))
+    }
+}
+
+impl<W: Write, F: AvroFormat> Writer<W, F> {
+    /// Serialize one [`RecordBatch`] to the output.
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
+        if !self.started {
+            self.format
+                .start_stream(&mut self.writer, &self.schema, self.compression)?;
+            self.started = true;
+        }
+        if batch.schema() != self.schema {
+            return Err(ArrowError::SchemaError(
+                "Schema of RecordBatch differs from Writer schema".to_string(),
+            ));
+        }
+        match self.format.sync_marker() {
+            Some(&sync) => self.write_ocf_block(batch, &sync),
+            None => self.write_stream(batch),
+        }
+    }
+
+    /// A convenience method to write a slice of [`RecordBatch`].
+    ///
+    /// This is equivalent to calling `write` for each batch in the slice.
+    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
+        for b in batches {
+            self.write(b)?;
+        }
+        Ok(())
+    }
+
+    /// Flush remaining buffered data and (for OCF) ensure the header is present.
+    pub fn finish(&mut self) -> Result<(), ArrowError> {
+        if !self.started {
+            self.format
+                .start_stream(&mut self.writer, &self.schema, self.compression)?;
+            self.started = true;
+        }
+        self.writer
+            .flush()
+            .map_err(|e| ArrowError::IoError(format!("Error flushing writer: {e}"), e))
+    }
+
+    /// Consume the writer, returning the underlying output object.
+    pub fn into_inner(self) -> W {
+        self.writer
+    }
+
+    fn write_ocf_block(&mut self, batch: &RecordBatch, sync: &[u8; 16]) -> Result<(), ArrowError> {
+        let mut buf = Vec::<u8>::with_capacity(1024);
+        encode_record_batch(batch, &mut buf)?;
+        let encoded = match self.compression {
+            Some(codec) => codec.compress(&buf)?,
+            None => buf,
+        };
+        write_long(&mut self.writer, batch.num_rows() as i64)?;
+        write_long(&mut self.writer, encoded.len() as i64)?;
+        self.writer
+            .write_all(&encoded)
+            .map_err(|e| ArrowError::IoError(format!("Error writing Avro block: {e}"), e))?;
+        self.writer
+            .write_all(sync)
+            .map_err(|e| ArrowError::IoError(format!("Error writing Avro sync: {e}"), e))?;
+        Ok(())
+    }
+
+    fn write_stream(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
+        encode_record_batch(batch, &mut self.writer)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::reader::ReaderBuilder;
+    use crate::test_util::arrow_test_data;
+    use arrow_array::{ArrayRef, BinaryArray, Int32Array, RecordBatch, StringArray};
+    use arrow_schema::{DataType, Field, Schema};
+    use std::fs::{remove_file, File};
+    use std::io::BufReader;
+    use std::sync::Arc;
+
+    fn make_schema() -> Schema {
+        Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Binary, false),
+        ])
+    }
+
+    fn make_batch() -> RecordBatch {
+        let ids = Int32Array::from(vec![1, 2, 3]);
+        let names = BinaryArray::from_vec(vec![b"a".as_ref(), b"b".as_ref(), b"c".as_ref()]);
+        RecordBatch::try_new(
+            Arc::new(make_schema()),
+            vec![Arc::new(ids) as ArrayRef, Arc::new(names) as ArrayRef],
+        )
+        .expect("failed to build test RecordBatch")
+    }
+
+    fn contains_ascii(haystack: &[u8], needle: &[u8]) -> bool {
+        haystack.windows(needle.len()).any(|w| w == needle)
+    }
+
+    fn unique_temp_path(prefix: &str) -> std::path::PathBuf {
+        let mut p = std::env::temp_dir();
+        let nanos = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap()
+            .as_nanos();
+        p.push(format!("{}_{}_{}.avro", prefix, std::process::id(), nanos));
+        p
+    }
+
+    #[test]
+    fn test_ocf_writer_generates_header_and_sync() -> Result<(), ArrowError> {
+        let batch = make_batch();
+        let buffer: Vec<u8> = Vec::new();
+        let mut writer = AvroWriter::new(buffer, make_schema())?;
+        writer.write(&batch)?;
+        writer.finish()?;
+        let out = writer.into_inner();
+        assert_eq!(&out[..4], b"Obj\x01", "OCF magic bytes missing/incorrect");
+        let sync = AvroWriter::new(Vec::new(), make_schema())?
+            .sync_marker()
+            .cloned();
+        let trailer = &out[out.len() - 16..];
+        assert_eq!(trailer.len(), 16, "expected 16‑byte sync marker");
+        let _ = sync;
+        Ok(())
+    }
+
+    #[test]
+    fn test_schema_mismatch_yields_error() {
+        let batch = make_batch();
+        let alt_schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+        let buffer = Vec::<u8>::new();
+        let mut writer = AvroWriter::new(buffer, alt_schema).unwrap();
+        let err = writer.write(&batch).unwrap_err();
+        assert!(matches!(err, ArrowError::SchemaError(_)));
+    }
+
+    #[test]
+    fn test_write_batches_accumulates_multiple() -> Result<(), ArrowError> {
+        let batch1 = make_batch();
+        let batch2 = make_batch();
+        let buffer = Vec::<u8>::new();
+        let mut writer = AvroWriter::new(buffer, make_schema())?;
+        writer.write_batches(&[&batch1, &batch2])?;
+        writer.finish()?;
+        let out = writer.into_inner();
+        assert!(out.len() > 4, "combined batches produced tiny file");
+        Ok(())
+    }
+
+    #[test]
+    fn test_finish_without_write_adds_header() -> Result<(), ArrowError> {
+        let buffer = Vec::<u8>::new();
+        let mut writer = AvroWriter::new(buffer, make_schema())?;
+        writer.finish()?;
+        let out = writer.into_inner();
+        assert_eq!(&out[..4], b"Obj\x01", "finish() should emit OCF header");
+        Ok(())
+    }
+
+    #[test]
+    fn test_write_long_encodes_zigzag_varint() -> Result<(), ArrowError> {
+        let mut buf = Vec::new();
+        write_long(&mut buf, 0)?;
+        write_long(&mut buf, -1)?;
+        write_long(&mut buf, 1)?;
+        write_long(&mut buf, -2)?;
+        write_long(&mut buf, 2147483647)?;
+        assert!(
+            buf.starts_with(&[0x00, 0x01, 0x02, 0x03]),
+            "zig‑zag varint encodings incorrect: {buf:?}"
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_roundtrip_alltypes_roundtrip_writer() -> Result<(), ArrowError> {
+        let files = [
+            "avro/alltypes_plain.avro",
+            "avro/alltypes_plain.snappy.avro",
+            "avro/alltypes_plain.zstandard.avro",
+            "avro/alltypes_plain.bzip2.avro",
+            "avro/alltypes_plain.xz.avro",
+        ];
+        for rel in files {
+            let path = arrow_test_data(rel);
+            let rdr_file = File::open(&path).expect("open input avro");
+            let mut reader = ReaderBuilder::new()
+                .build(BufReader::new(rdr_file))
+                .expect("build reader");
+            let schema = reader.schema();
+            let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+            let original =
+                arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
+            let out_path = unique_temp_path("arrow_avro_roundtrip");
+            let out_file = File::create(&out_path).expect("create temp avro");
+            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
+            if rel.contains(".snappy.") {
+                writer = writer.with_compression(Some(CompressionCodec::Snappy));
+            } else if rel.contains(".zstandard.") {
+                writer = writer.with_compression(Some(CompressionCodec::ZStandard));
+            } else if rel.contains(".bzip2.") {
+                writer = writer.with_compression(Some(CompressionCodec::Bzip2));
+            } else if rel.contains(".xz.") {
+                writer = writer.with_compression(Some(CompressionCodec::Xz));
+            }
+            writer.write(&original)?;
+            writer.finish()?;
+            drop(writer);
+            let rt_file = File::open(&out_path).expect("open roundtrip avro");
+            let mut rt_reader = ReaderBuilder::new()
+                .build(BufReader::new(rt_file))
+                .expect("build roundtrip reader");
+            let rt_schema = rt_reader.schema();
+            let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+            let roundtrip =
+                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
+            assert_eq!(

Review Comment:
   This is a great test.