This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new fab8e75eff Add BinaryFormat Support and Row Encoder to `arrow-avro` Writer (#9171)
fab8e75eff is described below
commit fab8e75eff6d1dd71708b71e1a2a85275394fa80
Author: Connor Sanders <[email protected]>
AuthorDate: Mon Jan 26 19:20:24 2026 -0600
Add BinaryFormat Support and Row Encoder to `arrow-avro` Writer (#9171)
# Which issue does this PR close?
- Closes #8701.
# Rationale for this change
`arrow-avro` already supports writing Avro Object Container Files (OCF)
and framed streaming encodings (e.g. Single-Object Encoding / registry
wire formats). However, many systems exchange **raw Avro binary datum
payloads** (i.e. *only* the Avro record body bytes) while supplying the
schema out-of-band (configuration, RPC contract, topic metadata, etc.).
Without first-class support for unframed datum output, users must
either:
- accept framing overhead that downstream systems don’t expect, or
- re-implement datum encoding themselves.
This PR adds the missing unframed write path and exposes a row-by-row
encoding API to make it easy to embed Avro datums into other transport
protocols.
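For a rough byte-level illustration of that framing overhead (a hypothetical
single-field record whose one `long` field holds the value 1, so the record
body is the single ZigZag byte `0x02`; `FP0..FP7` stands in for the schema's
8-byte little-endian Rabin fingerprint):

```text
SOE-framed record: C3 01 FP0 FP1 FP2 FP3 FP4 FP5 FP6 FP7 02
Raw binary datum:  02
```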
# What changes are included in this PR?
- Added `AvroBinaryFormat` (unframed) as an `AvroFormat` implementation
to emit **raw Avro record body bytes** (no SOE prefix and no OCF header)
and to explicitly reject container-level compression for this format.
- Added `RecordEncoder::encode_rows` to encode a `RecordBatch` into a
single contiguous buffer while tracking per-row boundaries via appended
offsets.
- Introduced a higher-level `Encoder` + `EncodedRows` API for row-by-row
streaming use cases, providing zero-copy access to individual row slices
(via `Bytes`); see the usage sketch after this list.
- Updated the writer API to provide `build_encoder` for stream formats
(e.g. SOE) and added row-capacity configuration to better support
incremental/streaming workflows.
- Added the `bytes` crate as a dependency to support efficient buffering
and slicing in the row encoder, and adjusted dev-dependencies to support
the new tests/docs.
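As a rough usage sketch of the row-by-row API (mirroring the doctests this PR
adds to `arrow-avro/src/writer/mod.rs`; SOE with the default Rabin
fingerprint):

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_avro::writer::{format::AvroSoeFormat, WriterBuilder};
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;
    // Encode a batch, then drain the buffered rows as zero-copy `Bytes` slices.
    let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
    encoder.encode(&batch)?;
    let rows = encoder.flush();
    assert_eq!(rows.len(), 3);
    Ok(())
}
```

Internally, `encode_rows` appends one offset per row to a vector seeded with
`0`, so, for example, three rows of encoded sizes 2, 3, and 1 bytes yield
`offsets == [0, 2, 5, 6]` over a 6-byte buffer.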
# Are these changes tested?
Yes.
This PR adds unit tests that cover:
- single- and multi-column row encoding
- nullable columns
- prefix-based vs. unprefixed row encoding behavior
- empty batch encoding
- appending to existing output buffers and validating offset invariants
# Are there any user-facing changes?
Yes, these changes are additive (no breaking public API changes
expected).
- New writer format support for **unframed Avro binary datum** output
(`AvroBinaryFormat`); see the sketch after this list.
- New row-by-row encoding APIs (`RecordEncoder::encode_rows`, `Encoder`,
`EncodedRows`) to support zero-copy access to per-row encoded bytes.
- New `WriterBuilder` functionality (`build_encoder` + row-capacity
configuration) to enable encoder construction without committing to a
specific `Write` sink.
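A minimal sketch of the unframed path (same shape as the SOE example above;
per the new `build` error message, `AvroBinaryFormat` is used via
`build_encoder`; the exact row bytes below are an assumption based on Avro's
ZigZag encoding):

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_avro::writer::{format::AvroBinaryFormat, WriterBuilder};
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(Int32Array::from(vec![1])) as ArrayRef],
    )?;
    let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroBinaryFormat>()?;
    encoder.encode(&batch)?;
    let rows = encoder.flush();
    // No SOE prefix and no OCF header: the row should be only the record body,
    // i.e. the ZigZag encoding of 1, which is the single byte 0x02.
    assert_eq!(rows.row(0)?.as_ref(), &[0x02u8][..]);
    Ok(())
}
```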
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
arrow-avro/Cargo.toml | 8 +-
arrow-avro/benches/encoder.rs | 87 ++++
arrow-avro/src/writer/encoder.rs | 396 ++++++++++++++-
arrow-avro/src/writer/format.rs | 78 +++
arrow-avro/src/writer/mod.rs | 1042 ++++++++++++++++++++++++++++++++++----
5 files changed, 1485 insertions(+), 126 deletions(-)
diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index 48cea8467e..a4699a2efa 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -66,6 +66,7 @@ indexmap = "2.10"
rand = "0.9"
md5 = { version = "0.8", optional = true }
sha2 = { version = "0.10", optional = true }
+bytes = "1.11"
[dev-dependencies]
arrow-data = { workspace = true }
@@ -76,9 +77,8 @@ rand = { version = "0.9.1", default-features = false, features = [
] }
criterion = { workspace = true, default-features = false }
tempfile = "3.3"
-arrow = { workspace = true }
+arrow = { workspace = true, features = ["prettyprint"] }
futures = "0.3.31"
-bytes = "1.10.1"
async-stream = "0.3.6"
apache-avro = "0.21.0"
num-bigint = "0.4"
@@ -95,3 +95,7 @@ harness = false
[[bench]]
name = "avro_writer"
harness = false
+
+[[bench]]
+name = "encoder"
+harness = false
diff --git a/arrow-avro/benches/encoder.rs b/arrow-avro/benches/encoder.rs
new file mode 100644
index 0000000000..2e0a7d1a3e
--- /dev/null
+++ b/arrow-avro/benches/encoder.rs
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks for the `arrow-avro` Encoder
+
+use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+use arrow_avro::writer::format::AvroSoeFormat;
+use arrow_avro::writer::{EncodedRows, WriterBuilder};
+use arrow_schema::{DataType, Field, Schema};
+use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
+use once_cell::sync::Lazy;
+use std::hint::black_box;
+use std::sync::Arc;
+use std::time::Duration;
+
+const SIZES: [usize; 4] = [1_000, 10_000, 100_000, 1_000_000];
+
+/// Pre-generate EncodedRows for each size to avoid setup overhead in benchmarks.
+static ENCODED_DATA: Lazy<Vec<EncodedRows>> =
+ Lazy::new(|| SIZES.iter().map(|&n| make_encoded_rows(n)).collect());
+
+/// Create an EncodedRows with `n` rows of Int32 data.
+fn make_encoded_rows(n: usize) -> EncodedRows {
+ let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+ let values: Vec<i32> = (0..n as i32).collect();
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(Int32Array::from(values)) as ArrayRef],
+ )
+ .unwrap();
+ let mut encoder = WriterBuilder::new(schema)
+ .build_encoder::<AvroSoeFormat>()
+ .unwrap();
+ encoder.encode(&batch).unwrap();
+ encoder.flush()
+}
+
+fn bench_row_access(c: &mut Criterion) {
+ let mut group = c.benchmark_group("row_access");
+ for (idx, &size) in SIZES.iter().enumerate() {
+ let encoded = &ENCODED_DATA[idx];
+ let num_rows = encoded.len();
+ // Configure sampling based on data size
+ match size {
+ 100_000 | 1_000_000 => {
+ group
+ .sample_size(20)
+ .measurement_time(Duration::from_secs(10))
+ .warm_up_time(Duration::from_secs(3));
+ }
+ _ => {
+ group.sample_size(100);
+ }
+ }
+ group.throughput(Throughput::Elements(num_rows as u64));
+ group.bench_function(BenchmarkId::from_parameter(size), |b| {
+ b.iter(|| {
+ for i in 0..num_rows {
+ black_box(encoded.row(i).unwrap());
+ }
+ })
+ });
+ }
+ group.finish();
+}
+
+criterion_group! {
+ name = encoder;
+ config = Criterion::default().configure_from_args();
+ targets = bench_row_access
+}
+
+criterion_main!(encoder);
diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs
index 651d998fea..94f6d88447 100644
--- a/arrow-avro/src/writer/encoder.rs
+++ b/arrow-avro/src/writer/encoder.rs
@@ -42,10 +42,30 @@ use arrow_array::{
use arrow_array::{Decimal32Array, Decimal64Array};
use arrow_buffer::{ArrowNativeType, NullBuffer};
use arrow_schema::{DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode};
+use bytes::{BufMut, BytesMut};
use std::io::Write;
use std::sync::Arc;
use uuid::Uuid;
+macro_rules! for_rows_with_prefix {
+ ($n:expr, $prefix:expr, $out:ident, |$row:ident| $body:block) => {{
+ match $prefix {
+ Some(prefix) => {
+ for $row in 0..$n {
+ $out.write_all(prefix)
+ .map_err(|e| AvroError::IoError(format!("write prefix: {e}"), e))?;
+ $body
+ }
+ }
+ None => {
+ for $row in 0..$n {
+ $body
+ }
+ }
+ }
+ }};
+}
+
/// Encode a single Avro-`long` using ZigZag + variable length, buffered.
///
/// Spec: <https://avro.apache.org/docs/1.11.1/specification/#binary-encoding>
@@ -608,17 +628,13 @@ impl<'a> FieldEncoder<'a> {
// Compute the effective null state from writer-declared nullability and data nulls.
let null_state = match nullability {
None => NullState::NonNullable,
- Some(null_order) => {
- match array.nulls() {
- Some(nulls) if array.null_count() > 0 => {
- NullState::Nullable { nulls, null_order }
- }
- _ => NullState::NullableNoNulls {
- // Nullable site with no null buffer for this view
- union_value_byte: union_value_branch_byte(null_order, false),
- },
- }
- }
+ Some(null_order) => match array.nulls() {
+ Some(nulls) if array.null_count() > 0 => NullState::Nullable { nulls, null_order },
+ _ => NullState::NullableNoNulls {
+ // Nullable site with no null buffer for this view
+ union_value_byte: union_value_branch_byte(null_order, false),
+ },
+ },
};
Ok(Self {
encoder,
@@ -797,24 +813,86 @@ impl RecordEncoder {
) -> Result<(), AvroError> {
let mut column_encoders = self.prepare_for_batch(batch)?;
let n = batch.num_rows();
- match self.prefix {
- Some(prefix) => {
- for row in 0..n {
- out.write_all(prefix.as_slice())?;
- for enc in column_encoders.iter_mut() {
- enc.encode(out, row)?;
- }
- }
+ let prefix = self.prefix.as_ref().map(|p| p.as_slice());
+ for_rows_with_prefix!(n, prefix, out, |row| {
+ for enc in column_encoders.iter_mut() {
+ enc.encode(out, row)?;
}
- None => {
- for row in 0..n {
+ });
+ Ok(())
+ }
+
+ /// Encode rows into a single contiguous `BytesMut` and append row-end offsets.
+ ///
+ /// # Invariants
+ ///
+ /// * `offsets` must be non-empty and seeded with `0` at index 0.
+ /// * `offsets.last()` must equal `out.len()` on entry.
+ /// * On success, exactly `batch.num_rows()` additional offsets are pushed, and
+ /// `offsets.last()` equals the new `out.len()`.
+ pub(crate) fn encode_rows(
+ &self,
+ batch: &RecordBatch,
+ row_capacity: usize,
+ out: &mut BytesMut,
+ offsets: &mut Vec<usize>,
+ ) -> Result<(), AvroError> {
+ let out_len = out.len();
+ if offsets.first() != Some(&0) || offsets.last() != Some(&out_len) {
+ return Err(AvroError::General(
+ "encode_rows requires offsets to start with 0 and end at
out.len()".to_string(),
+ ));
+ }
+ let n = batch.num_rows();
+ if n == 0 {
+ return Ok(());
+ }
+ if offsets.len().checked_add(n).is_none() {
+ return Err(AvroError::General(
+ "encode_rows cannot append offsets: too many rows".to_string(),
+ ));
+ }
+ let mut column_encoders = self.prepare_for_batch(batch)?;
+ offsets.reserve(n);
+ let prefix_bytes = self.prefix.as_ref().map(|p| p.as_slice());
+ let prefix_len = prefix_bytes.map_or(0, |p| p.len());
+ let per_row_hint = row_capacity.max(prefix_len);
+ if let Some(additional) = n
+ .checked_mul(per_row_hint)
+ .filter(|&a| out_len.checked_add(a).is_some())
+ {
+ out.reserve(additional);
+ }
+ let start_out_len = out.len();
+ let start_offsets_len = offsets.len();
+ let res = (|| -> Result<(), AvroError> {
+ let mut w = out.writer();
+ if let [enc0] = column_encoders.as_mut_slice() {
+ for_rows_with_prefix!(n, prefix_bytes, w, |row| {
+ enc0.encode(&mut w, row)?;
+ offsets.push(w.get_ref().len());
+ });
+ } else {
+ for_rows_with_prefix!(n, prefix_bytes, w, |row| {
for enc in column_encoders.iter_mut() {
- enc.encode(out, row)?;
+ enc.encode(&mut w, row)?;
}
- }
+ offsets.push(w.get_ref().len());
+ });
}
+ Ok(())
+ })();
+ if res.is_err() {
+ out.truncate(start_out_len);
+ offsets.truncate(start_offsets_len);
+ } else {
+ debug_assert_eq!(
+ *offsets.last().unwrap(),
+ out.len(),
+ "encode_rows: offsets/out length mismatch after successful
encode"
+ );
}
- Ok(())
+ res
}
}
@@ -1958,6 +2036,12 @@ mod tests {
}
}
+ fn row_slice<'a>(buf: &'a [u8], offsets: &[usize], row: usize) -> &'a [u8] {
+ let start = offsets[row];
+ let end = offsets[row + 1];
+ &buf[start..end]
+ }
+
#[test]
fn binary_encoder() {
let values: Vec<&[u8]> = vec![b"", b"ab", b"\x00\xFF"];
@@ -3026,4 +3110,268 @@ mod tests {
other => panic!("expected NullableNoNulls, got {other:?}"),
}
}
+
+ #[test]
+ fn encode_rows_single_column_int32() {
+ let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
+ let arr = Int32Array::from(vec![1, 2, 3]);
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
+ let encoder = RecordEncoder {
+ columns: vec![FieldBinding {
+ arrow_index: 0,
+ nullability: None,
+ plan: FieldPlan::Scalar,
+ }],
+ prefix: None,
+ };
+ let mut out = BytesMut::new();
+ let mut offsets: Vec<usize> = vec![0];
+ encoder
+ .encode_rows(&batch, 16, &mut out, &mut offsets)
+ .unwrap();
+ assert_eq!(offsets.len(), 4);
+ assert_eq!(*offsets.last().unwrap(), out.len());
+ assert_bytes_eq(row_slice(&out, &offsets, 0), &avro_long_bytes(1));
+ assert_bytes_eq(row_slice(&out, &offsets, 1), &avro_long_bytes(2));
+ assert_bytes_eq(row_slice(&out, &offsets, 2), &avro_long_bytes(3));
+ }
+
+ #[test]
+ fn encode_rows_multiple_columns() {
+ let schema = ArrowSchema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Utf8, false),
+ ]);
+ let int_arr = Int32Array::from(vec![10, 20]);
+ let str_arr = StringArray::from(vec!["hello", "world"]);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(int_arr), Arc::new(str_arr)],
+ )
+ .unwrap();
+ let encoder = RecordEncoder {
+ columns: vec![
+ FieldBinding {
+ arrow_index: 0,
+ nullability: None,
+ plan: FieldPlan::Scalar,
+ },
+ FieldBinding {
+ arrow_index: 1,
+ nullability: None,
+ plan: FieldPlan::Scalar,
+ },
+ ],
+ prefix: None,
+ };
+ let mut out = BytesMut::new();
+ let mut offsets: Vec<usize> = vec![0];
+ encoder
+ .encode_rows(&batch, 32, &mut out, &mut offsets)
+ .unwrap();
+ assert_eq!(offsets.len(), 3);
+ assert_eq!(*offsets.last().unwrap(), out.len());
+ let mut expected_row0 = Vec::new();
+ expected_row0.extend(avro_long_bytes(10));
+ expected_row0.extend(avro_len_prefixed_bytes(b"hello"));
+ assert_bytes_eq(row_slice(&out, &offsets, 0), &expected_row0);
+ let mut expected_row1 = Vec::new();
+ expected_row1.extend(avro_long_bytes(20));
+ expected_row1.extend(avro_len_prefixed_bytes(b"world"));
+ assert_bytes_eq(row_slice(&out, &offsets, 1), &expected_row1);
+ }
+
+ #[test]
+ fn encode_rows_with_prefix() {
+ use crate::codec::AvroFieldBuilder;
+ use crate::schema::AvroSchema;
+ let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
+ let arr = Int32Array::from(vec![42]);
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
+ let avro_schema = AvroSchema::try_from(&schema).unwrap();
+ let fingerprint = avro_schema
+ .fingerprint(crate::schema::FingerprintAlgorithm::Rabin)
+ .unwrap();
+ let avro_root = AvroFieldBuilder::new(&avro_schema.schema().unwrap())
+ .build()
+ .unwrap();
+ let encoder = RecordEncoderBuilder::new(&avro_root, &schema)
+ .with_fingerprint(Some(fingerprint))
+ .build()
+ .unwrap();
+ let mut out = BytesMut::new();
+ let mut offsets: Vec<usize> = vec![0];
+ encoder
+ .encode_rows(&batch, 32, &mut out, &mut offsets)
+ .unwrap();
+ assert_eq!(offsets.len(), 2);
+ let row0 = row_slice(&out, &offsets, 0);
+ assert!(row0.len() > 10, "Row should contain prefix + encoded value");
+ assert_eq!(row0[0], 0xC3);
+ assert_eq!(row0[1], 0x01);
+ }
+
+ #[test]
+ fn encode_rows_empty_batch() {
+ let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
+ let arr = Int32Array::from(Vec::<i32>::new());
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
+ let encoder = RecordEncoder {
+ columns: vec![FieldBinding {
+ arrow_index: 0,
+ nullability: None,
+ plan: FieldPlan::Scalar,
+ }],
+ prefix: None,
+ };
+ let mut out = BytesMut::new();
+ let mut offsets: Vec<usize> = vec![0];
+ encoder
+ .encode_rows(&batch, 16, &mut out, &mut offsets)
+ .unwrap();
+ assert_eq!(offsets, vec![0]);
+ assert!(out.is_empty());
+ }
+
+ #[test]
+ fn encode_rows_matches_encode_output() {
+ let schema = ArrowSchema::new(vec![
+ Field::new("a", DataType::Int64, false),
+ Field::new("b", DataType::Float64, false),
+ ]);
+ let int_arr = Int64Array::from(vec![100i64, 200, 300]);
+ let float_arr = Float64Array::from(vec![1.5, 2.5, 3.5]);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(int_arr), Arc::new(float_arr)],
+ )
+ .unwrap();
+ let encoder = RecordEncoder {
+ columns: vec![
+ FieldBinding {
+ arrow_index: 0,
+ nullability: None,
+ plan: FieldPlan::Scalar,
+ },
+ FieldBinding {
+ arrow_index: 1,
+ nullability: None,
+ plan: FieldPlan::Scalar,
+ },
+ ],
+ prefix: None,
+ };
+ let mut stream_buf = Vec::new();
+ encoder.encode(&mut stream_buf, &batch).unwrap();
+ let mut out = BytesMut::new();
+ let mut offsets: Vec<usize> = vec![0];
+ encoder
+ .encode_rows(&batch, 32, &mut out, &mut offsets)
+ .unwrap();
+ assert_eq!(offsets.len(), 1 + batch.num_rows());
+ assert_bytes_eq(&out[..], &stream_buf);
+ }
+
+ #[test]
+ fn encode_rows_appends_to_existing_buffer() {
+ let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
+ let arr = Int32Array::from(vec![5, 6]);
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
+ let encoder = RecordEncoder {
+ columns: vec![FieldBinding {
+ arrow_index: 0,
+ nullability: None,
+ plan: FieldPlan::Scalar,
+ }],
+ prefix: None,
+ };
+ let mut out = BytesMut::new();
+ out.extend_from_slice(&[0xAA, 0xBB]);
+ let mut offsets: Vec<usize> = vec![0, out.len()];
+ encoder
+ .encode_rows(&batch, 16, &mut out, &mut offsets)
+ .unwrap();
+ assert_eq!(offsets.len(), 4);
+ assert_eq!(*offsets.last().unwrap(), out.len());
+ assert_bytes_eq(row_slice(&out, &offsets, 0), &[0xAA, 0xBB]);
+ assert_bytes_eq(row_slice(&out, &offsets, 1), &avro_long_bytes(5));
+ assert_bytes_eq(row_slice(&out, &offsets, 2), &avro_long_bytes(6));
+ }
+
+ #[test]
+ fn encode_rows_nullable_column() {
+ let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, true)]);
+ let arr = Int32Array::from(vec![Some(1), None, Some(3)]);
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
+ let encoder = RecordEncoder {
+ columns: vec![FieldBinding {
+ arrow_index: 0,
+ nullability: Some(Nullability::NullFirst),
+ plan: FieldPlan::Scalar,
+ }],
+ prefix: None,
+ };
+ let mut out = BytesMut::new();
+ let mut offsets: Vec<usize> = vec![0];
+ encoder
+ .encode_rows(&batch, 16, &mut out, &mut offsets)
+ .unwrap();
+ assert_eq!(offsets.len(), 4);
+ let mut expected_row0 = Vec::new();
+ expected_row0.extend(avro_long_bytes(1)); // union branch for value
+ expected_row0.extend(avro_long_bytes(1)); // value
+ assert_bytes_eq(row_slice(&out, &offsets, 0), &expected_row0);
+ let expected_row1 = avro_long_bytes(0); // union branch for null
+ assert_bytes_eq(row_slice(&out, &offsets, 1), &expected_row1);
+ let mut expected_row2 = Vec::new();
+ expected_row2.extend(avro_long_bytes(1)); // union branch for value
+ expected_row2.extend(avro_long_bytes(3)); // value
+ assert_bytes_eq(row_slice(&out, &offsets, 2), &expected_row2);
+ }
+
+ #[test]
+ fn encode_prefix_write_error() {
+ use crate::codec::AvroFieldBuilder;
+ use crate::schema::{AvroSchema, FingerprintAlgorithm};
+ use std::io;
+
+ struct FailWriter {
+ failed: bool,
+ }
+
+ impl io::Write for FailWriter {
+ fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
+ if !self.failed {
+ self.failed = true;
+ Err(io::Error::other("fail write"))
+ } else {
+ Ok(0)
+ }
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+ }
+
+ let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
+ let arr = Int32Array::from(vec![42]);
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
+ let avro_schema = AvroSchema::try_from(&schema).unwrap();
+ let fingerprint = avro_schema
+ .fingerprint(FingerprintAlgorithm::Rabin)
+ .unwrap();
+ let avro_root = AvroFieldBuilder::new(&avro_schema.schema().unwrap())
+ .build()
+ .unwrap();
+ let encoder = RecordEncoderBuilder::new(&avro_root, &schema)
+ .with_fingerprint(Some(fingerprint))
+ .build()
+ .unwrap();
+
+ let mut writer = FailWriter { failed: false };
+ let err = encoder.encode(&mut writer, &batch).unwrap_err();
+ let msg = format!("{err}");
+ assert!(msg.contains("write prefix"), "unexpected error: {msg}");
+ }
}
diff --git a/arrow-avro/src/writer/format.rs b/arrow-avro/src/writer/format.rs
index e8e93a62c7..c969651f16 100644
--- a/arrow-avro/src/writer/format.rs
+++ b/arrow-avro/src/writer/format.rs
@@ -133,6 +133,40 @@ impl AvroFormat for AvroSoeFormat {
}
}
+/// Unframed Avro binary streaming format: raw Avro record body bytes (no prefix, no OCF header).
+///
+/// Each record written by the stream writer contains only the raw Avro
+/// record body bytes (i.e., the Avro binary encoding of the datum) with **no**
+/// per-record prefix and **no** Object Container File (OCF) header.
+///
+/// This format is useful when another transport provides framing (for example,
+/// length-delimited buffers) or when embedding Avro record payloads inside a
+/// larger envelope.
+#[derive(Debug, Default)]
+pub struct AvroBinaryFormat;
+
+impl AvroFormat for AvroBinaryFormat {
+ const NEEDS_PREFIX: bool = false;
+
+ fn start_stream<W: Write>(
+ &mut self,
+ _writer: &mut W,
+ _schema: &Schema,
+ compression: Option<CompressionCodec>,
+ ) -> Result<(), AvroError> {
+ if compression.is_some() {
+ return Err(AvroError::InvalidArgument(
+ "Compression not supported for Avro binary
streaming".to_string(),
+ ));
+ }
+ Ok(())
+ }
+
+ fn sync_marker(&self) -> Option<&[u8; 16]> {
+ None
+ }
+}
+
#[inline]
fn write_string<W: Write>(writer: &mut W, s: &str) -> Result<(), AvroError> {
write_bytes(writer, s.as_bytes())
@@ -144,3 +178,47 @@ fn write_bytes<W: Write>(writer: &mut W, bytes: &[u8]) -> Result<(), AvroError>
writer.write_all(bytes)?;
Ok(())
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow_schema::{DataType, Field, Schema};
+
+ fn test_schema() -> Schema {
+ Schema::new(vec![Field::new("x", DataType::Int32, false)])
+ }
+
+ #[test]
+ fn avro_binary_format_rejects_compression() {
+ let mut format = AvroBinaryFormat;
+ let schema = test_schema();
+ let err = format
+ .start_stream(
+ &mut Vec::<u8>::new(),
+ &schema,
+ Some(CompressionCodec::Snappy),
+ )
+ .unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("Compression not supported for Avro binary
streaming")
+ );
+ }
+
+ #[test]
+ fn avro_soe_format_rejects_compression() {
+ let mut format = AvroSoeFormat::default();
+ let schema = test_schema();
+ let err = format
+ .start_stream(
+ &mut Vec::<u8>::new(),
+ &schema,
+ Some(CompressionCodec::Snappy),
+ )
+ .unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("Compression not supported for Avro SOE streaming")
+ );
+ }
+}
diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index f4a2e60ed5..2a97f0545d 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -19,58 +19,141 @@
//!
//! # Overview
//!
-//! Use this module to serialize Arrow `RecordBatch` values into Avro. Two output
-//! formats are supported:
+//! Use this module to serialize Arrow [`arrow_array::RecordBatch`] values into Avro. Three output
+//! modes are supported:
//!
-//! * **[`AvroWriter`](crate::writer::AvroWriter)** — writes an **Object Container File (OCF)**: a self‑describing
-//! file with header (schema JSON + metadata), optional compression, data blocks, and
-//! sync markers. See Avro 1.11.1 “Object Container Files.”
+//! * **[`crate::writer::AvroWriter`]** — writes an **Object Container File (OCF)**: a self‑describing
+//! file with header (schema JSON and metadata), optional compression, data blocks, and
+//! sync markers. See Avro 1.11.1 "Object Container Files."
//! <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
-//! * **[`AvroStreamWriter`](crate::writer::AvroStreamWriter)** — writes a **Single Object Encoding (SOE) Stream** (“datum” bytes) without
+//!
+//! * **[`crate::writer::AvroStreamWriter`]** — writes a **Single Object Encoding (SOE) Stream** without
//! any container framing. This is useful when the schema is known out‑of‑band (i.e.,
//! via a registry) and you want minimal overhead.
//!
-//! ## Which format should you use?
+//! * **[`crate::writer::Encoder`]** — a row-by-row encoder that buffers encoded records into a single
+//! contiguous byte buffer and returns per-row [`bytes::Bytes`] slices.
+//! Ideal for publishing individual messages to Kafka, Pulsar, or other message queues
+//! where each message must be a self-contained Avro payload.
+//!
+//! ## Which writer should you use?
+//!
+//! | Use Case | Recommended Type |
+//! |----------|------------------|
+//! | Write an OCF file to disk | [`crate::writer::AvroWriter`] |
+//! | Stream records continuously to a file/socket | [`crate::writer::AvroStreamWriter`] |
+//! | Publish individual records to Kafka/Pulsar | [`crate::writer::Encoder`] |
+//! | Need per-row byte slices for custom framing | [`crate::writer::Encoder`] |
+//!
+//! ## Per-Record Prefix Formats
+//!
+//! For [`crate::writer::AvroStreamWriter`] and [`crate::writer::Encoder`], each record is automatically prefixed
+//! based on the fingerprint strategy:
//!
-//! * Use **OCF** when you need a portable, self‑contained file. The schema travels with
-//! the data, making it easy to read elsewhere.
-//! * Use the **SOE stream** when your surrounding protocol supplies schema information
-//! (i.e., a schema registry). The writer automatically adds the per‑record prefix:
-//! - **SOE**: Each record is prefixed with the 2-byte header (`0xC3 0x01`) followed by
-//! an 8‑byte little‑endian CRC‑64‑AVRO fingerprint, then the Avro body.
-//! See Avro 1.11.1 "Single object encoding".
-//! <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
-//! - **Confluent wire format**: Each record is prefixed with magic byte `0x00` followed by
-//! a **big‑endian** 4‑byte schema ID, then the Avro body. Use `FingerprintStrategy::Id(schema_id)`.
-//! <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
-//! - **Apicurio wire format**: Each record is prefixed with magic byte `0x00` followed by
-//! a **big‑endian** 8‑byte schema ID, then the Avro body. Use `FingerprintStrategy::Id64(schema_id)`.
-//! <https://www.apicur.io/registry/docs/apicurio-registry/1.3.3.Final/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-types-avro-registry>
+//! | Strategy | Prefix | Use Case |
+//! |----------|--------|----------|
+//! | `FingerprintStrategy::Rabin` (default) | `0xC3 0x01` + 8-byte LE Rabin fingerprint | Standard Avro SOE |
+//! | `FingerprintStrategy::Id(id)` | `0x00` + 4-byte BE schema ID | [Confluent Schema Registry] |
+//! | `FingerprintStrategy::Id64(id)` | `0x00` + 8-byte BE schema ID | [Apicurio Registry] |
//!
-//! ## Choosing the Avro schema
+//! [Confluent Schema Registry]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
+//! [Apicurio Registry]: https://www.apicur.io/registry/docs/apicurio-registry/1.3.3.Final/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-types-avro-registry
+//!
+//! ## Choosing the Avro Schema
//!
//! By default, the writer converts your Arrow schema to Avro (including a top‑level record
//! name). If you already have an Avro schema JSON you want to use verbatim, put it into the
-//! Arrow schema metadata under the `avro.schema` key before constructing the writer. The
-//! builder will use that schema instead of generating a new one (unless `strip_metadata` is
-//! set to true in the options).
+//! Arrow schema metadata under the [`SCHEMA_METADATA_KEY`](crate::schema::SCHEMA_METADATA_KEY)
+//! key before constructing the writer. The builder will use that schema instead of generating
+//! a new one.
//!
//! ## Compression
//!
-//! For OCF, you may enable a compression codec via `WriterBuilder::with_compression`. The
-//! chosen codec is written into the file header and used for subsequent blocks. SOE stream
-//! writing doesn’t apply container‑level compression.
+//! For OCF ([`crate::writer::AvroWriter`]), you may enable a compression codec via
+//! [`crate::writer::WriterBuilder::with_compression`]. The chosen codec is written into the file header
+//! and used for subsequent blocks. SOE stream writing ([`crate::writer::AvroStreamWriter`], [`crate::writer::Encoder`])
+//! does not apply container‑level compression.
+//!
+//! # Examples
+//!
+//! ## Writing an OCF File
+//!
+//! ```
+//! use std::sync::Arc;
+//! use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::writer::AvroWriter;
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let schema = Schema::new(vec![
+//! Field::new("id", DataType::Int64, false),
+//! Field::new("name", DataType::Utf8, false),
+//! ]);
+//!
+//! let batch = RecordBatch::try_new(
+//! Arc::new(schema.clone()),
+//! vec![
+//! Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
+//! Arc::new(StringArray::from(vec!["alice", "bob"])) as ArrayRef,
+//! ],
+//! )?;
+//!
+//! let mut writer = AvroWriter::new(Vec::<u8>::new(), schema)?;
+//! writer.write(&batch)?;
+//! writer.finish()?;
+//! let bytes = writer.into_inner();
+//! assert!(!bytes.is_empty());
+//! # Ok(())
+//! # }
+//! ```
+//!
+//! ## Using the Row-by-Row Encoder for Message Queues
+//!
+//! ```
+//! use std::sync::Arc;
+//! use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
+//! use arrow_avro::schema::FingerprintStrategy;
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+//! let batch = RecordBatch::try_new(
+//! Arc::new(schema.clone()),
+//! vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
+//! )?;
+//!
+//! // Build an Encoder with Confluent wire format (schema ID = 42)
+//! let mut encoder = WriterBuilder::new(schema)
+//! .with_fingerprint_strategy(FingerprintStrategy::Id(42))
+//! .build_encoder::<AvroSoeFormat>()?;
+//!
+//! encoder.encode(&batch)?;
+//!
+//! // Get the buffered rows (zero-copy views into a single backing buffer)
+//! let rows = encoder.flush();
+//! assert_eq!(rows.len(), 3);
+//!
+//! // Each row has Confluent wire format: magic byte + 4-byte schema ID + body
+//! for row in rows.iter() {
+//! assert_eq!(row[0], 0x00); // Confluent magic byte
+//! }
+//! # Ok(())
+//! # }
+//! ```
//!
//! ---
use crate::codec::AvroFieldBuilder;
use crate::compression::CompressionCodec;
+use crate::errors::AvroError;
use crate::schema::{
AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, SCHEMA_METADATA_KEY,
};
use crate::writer::encoder::{RecordEncoder, RecordEncoderBuilder, write_long};
use crate::writer::format::{AvroFormat, AvroOcfFormat, AvroSoeFormat};
use arrow_array::RecordBatch;
-use arrow_schema::{ArrowError, Schema};
+use arrow_schema::{Schema, SchemaRef};
+use bytes::{Bytes, BytesMut};
use std::io::Write;
use std::sync::Arc;
@@ -79,11 +162,163 @@ mod encoder;
/// Logic for different Avro container file formats.
pub mod format;
+/// A contiguous set of Avro encoded rows.
+///
+/// `EncodedRows` stores:
+/// - a single backing byte buffer (`bytes::Bytes`)
+/// - a `Vec<usize>` of row boundary offsets (length = `rows + 1`)
+///
+/// This lets callers get per-row payloads as zero-copy `Bytes` slices.
+///
+/// For compatibility with APIs that require owned `Vec<u8>`, use:
+/// `let vecs: Vec<Vec<u8>> = rows.iter().map(|b| b.to_vec()).collect();`
+#[derive(Debug, Clone)]
+pub struct EncodedRows {
+ data: Bytes,
+ offsets: Vec<usize>,
+}
+
+impl EncodedRows {
+ /// Create a new `EncodedRows` from a backing buffer and row boundary offsets.
+ ///
+ /// `offsets` must have length `rows + 1`, and be monotonically non-decreasing.
+ /// The last offset should equal `data.len()`.
+ pub fn new(data: Bytes, offsets: Vec<usize>) -> Self {
+ Self { data, offsets }
+ }
+
+ /// Returns the number of encoded rows stored in this container.
+ #[inline]
+ pub fn len(&self) -> usize {
+ self.offsets.len().saturating_sub(1)
+ }
+
+ /// Returns `true` if this container holds no encoded rows.
+ #[inline]
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ /// Returns a reference to the single contiguous backing buffer.
+ ///
+ /// This buffer contains the payloads of all rows concatenated together.
+ ///
+ /// # Note
+ ///
+ /// To access individual row payloads, prefer using [`Self::row`] or [`Self::iter`]
+ /// rather than slicing this buffer manually.
+ #[inline]
+ pub fn bytes(&self) -> &Bytes {
+ &self.data
+ }
+
+ /// Returns the row boundary offsets.
+ ///
+ /// The returned slice always has the length `self.len() + 1`. The `n`th row payload
+ /// corresponds to `bytes[offsets[n] ... offsets[n+1]]`.
+ #[inline]
+ pub fn offsets(&self) -> &[usize] {
+ &self.offsets
+ }
+
+ /// Return the `n`th row as a zero-copy `Bytes` slice.
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if `n` is out of bounds or if the internal offsets are invalid
+ /// (e.g., offsets are not within the backing buffer).
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+ /// use arrow_schema::{DataType, Field, Schema};
+ /// use arrow_avro::writer::WriterBuilder;
+ /// use arrow_avro::writer::format::AvroSoeFormat;
+ ///
+ /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+ /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+ /// let batch = RecordBatch::try_new(
+ /// Arc::new(schema.clone()),
+ /// vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
+ /// )?;
+ ///
+ /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+ /// encoder.encode(&batch)?;
+ /// let rows = encoder.flush();
+ ///
+ /// assert_eq!(rows.iter().count(), 2);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn row(&self, n: usize) -> Result<Bytes, AvroError> {
+ if n >= self.len() {
+ return Err(AvroError::General(format!(
+ "Row index {n} out of bounds for len {}",
+ self.len()
+ )));
+ }
+ // SAFETY:
+ // self.len() is defined as self.offsets.len().saturating_sub(1).
+ // The check `n >= self.len()` above ensures that `n < self.offsets.len() - 1`.
+ // Therefore, both `n` and `n + 1` are strictly within the bounds of `self.offsets`.
+ let (start, end) = unsafe {
+ (
+ *self.offsets.get_unchecked(n),
+ *self.offsets.get_unchecked(n + 1),
+ )
+ };
+ if start > end || end > self.data.len() {
+ return Err(AvroError::General(format!(
+ "Invalid row offsets for row {n}: start={start}, end={end},
data_len={}",
+ self.data.len()
+ )));
+ }
+ Ok(self.data.slice(start..end))
+ }
+
+ /// Iterate over rows as zero-copy `Bytes` slices.
+ ///
+ /// This iterator is infallible and is intended for the common case where
+ /// `EncodedRows` is produced by [`Encoder::flush`], which guarantees valid offsets.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+ /// use arrow_schema::{DataType, Field, Schema};
+ /// use arrow_avro::writer::WriterBuilder;
+ /// use arrow_avro::writer::format::AvroSoeFormat;
+ ///
+ /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+ /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+ /// let batch = RecordBatch::try_new(
+ /// Arc::new(schema.clone()),
+ /// vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
+ /// )?;
+ ///
+ /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+ /// encoder.encode(&batch)?;
+ /// let rows = encoder.flush();
+ ///
+ /// assert_eq!(rows.iter().count(), 2);
+ /// # Ok(())
+ /// # }
+ /// ```
+ #[inline]
+ pub fn iter(&self) -> impl ExactSizeIterator<Item = Bytes> + '_ {
+ self.offsets.windows(2).map(|w| self.data.slice(w[0]..w[1]))
+ }
+}
+
/// Builder to configure and create a `Writer`.
#[derive(Debug, Clone)]
pub struct WriterBuilder {
schema: Schema,
codec: Option<CompressionCodec>,
+ row_capacity: Option<usize>,
capacity: usize,
fingerprint_strategy: Option<FingerprintStrategy>,
}
@@ -99,6 +334,7 @@ impl WriterBuilder {
Self {
schema,
codec: None,
+ row_capacity: None,
capacity: 1024,
fingerprint_strategy: None,
}
@@ -117,30 +353,34 @@ impl WriterBuilder {
self
}
- /// Sets the capacity for the given object and returns the modified instance.
+ /// Sets the expected capacity (in bytes) for internal buffers.
+ ///
+ /// This is used as a hint to pre-allocate staging buffers for writing.
pub fn with_capacity(mut self, capacity: usize) -> Self {
self.capacity = capacity;
self
}
- /// Create a new `Writer` with specified `AvroFormat` and builder options.
- /// Performs one‑time startup (header/stream init, encoder plan).
- pub fn build<W, F>(self, mut writer: W) -> Result<Writer<W, F>, ArrowError>
- where
- W: Write,
- F: AvroFormat,
- {
- let mut format = F::default();
+ /// Sets the expected byte size for each encoded row.
+ ///
+ /// This setting affects [`Encoder`] created via [`build_encoder`](Self::build_encoder).
+ /// It is used as a hint to reduce reallocations when the typical encoded row size is known.
+ pub fn with_row_capacity(mut self, capacity: usize) -> Self {
+ self.row_capacity = Some(capacity);
+ self
+ }
+
+ fn prepare_encoder<F: AvroFormat>(&self) -> Result<(Arc<Schema>, RecordEncoder), AvroError> {
let avro_schema = match self.schema.metadata.get(SCHEMA_METADATA_KEY) {
Some(json) => AvroSchema::new(json.clone()),
None => AvroSchema::try_from(&self.schema)?,
};
let maybe_fingerprint = if F::NEEDS_PREFIX {
- match self.fingerprint_strategy {
- Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(id)),
- Some(FingerprintStrategy::Id64(id)) => Some(Fingerprint::Id64(id)),
+ match &self.fingerprint_strategy {
+ Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(*id)),
+ Some(FingerprintStrategy::Id64(id)) => Some(Fingerprint::Id64(*id)),
Some(strategy) => {
- Some(avro_schema.fingerprint(FingerprintAlgorithm::from(strategy))?)
+ Some(avro_schema.fingerprint(FingerprintAlgorithm::from(*strategy))?)
}
None => Some(
avro_schema
@@ -156,11 +396,48 @@ impl WriterBuilder {
avro_schema.clone().json_string,
);
let schema = Arc::new(Schema::new_with_metadata(self.schema.fields().clone(), md));
- format.start_stream(&mut writer, &schema, self.codec)?;
let avro_root = AvroFieldBuilder::new(&avro_schema.schema()?).build()?;
let encoder = RecordEncoderBuilder::new(&avro_root, schema.as_ref())
.with_fingerprint(maybe_fingerprint)
.build()?;
+ Ok((schema, encoder))
+ }
+
+ /// Build a new [`Encoder`] for the given [`AvroFormat`].
+ ///
+ /// `Encoder` only supports stream formats (no OCF sync markers). Attempting to build an
+ /// encoder with an OCF format (e.g. [`AvroOcfFormat`]) will return an error.
+ pub fn build_encoder<F: AvroFormat>(self) -> Result<Encoder, AvroError> {
+ if F::default().sync_marker().is_some() {
+ return Err(AvroError::InvalidArgument(
+ "Encoder only supports stream formats (no OCF header/sync
marker)".to_string(),
+ ));
+ }
+ let (schema, encoder) = self.prepare_encoder::<F>()?;
+ Ok(Encoder {
+ schema,
+ encoder,
+ row_capacity: self.row_capacity,
+ buffer: BytesMut::with_capacity(self.capacity),
+ offsets: vec![0],
+ })
+ }
+
+ /// Build a new [`Writer`] with the specified [`AvroFormat`] and builder options.
+ pub fn build<W, F>(self, mut writer: W) -> Result<Writer<W, F>, AvroError>
+ where
+ W: Write,
+ F: AvroFormat,
+ {
+ let mut format = F::default();
+ if format.sync_marker().is_none() && !F::NEEDS_PREFIX {
+ return Err(AvroError::InvalidArgument(
+ "AvroBinaryFormat is only supported with Encoder, use
build_encoder instead"
+ .to_string(),
+ ));
+ }
+ let (schema, encoder) = self.prepare_encoder::<F>()?;
+ format.start_stream(&mut writer, &schema, self.codec)?;
Ok(Writer {
writer,
schema,
@@ -172,6 +449,110 @@ impl WriterBuilder {
}
}
+/// A row-by-row encoder for Avro *stream/message* formats (SOE / registry wire formats / raw binary).
+///
+/// Unlike [`Writer`], which emits a single continuous byte stream to a [`std::io::Write`] sink,
+/// `Encoder` tracks row boundaries during encoding and returns an [`EncodedRows`] containing:
+/// - one backing buffer (`Bytes`)
+/// - row boundary offsets
+///
+/// This enables zero-copy per-row payloads (for instance, one Kafka message per Arrow row) without
+/// re-encoding or decoding the byte stream to recover record boundaries.
+///
+/// ### Example
+///
+/// ```
+/// use std::sync::Arc;
+/// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+/// use arrow_schema::{DataType, Field, Schema};
+/// use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
+/// use arrow_avro::schema::FingerprintStrategy;
+///
+/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+/// let schema = Schema::new(vec![Field::new("value", DataType::Int32, false)]);
+/// let batch = RecordBatch::try_new(
+/// Arc::new(schema.clone()),
+/// vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
+/// )?;
+///
+/// // Configure the encoder (here: Confluent Wire Format with schema ID 100)
+/// let mut encoder = WriterBuilder::new(schema)
+/// .with_fingerprint_strategy(FingerprintStrategy::Id(100))
+/// .build_encoder::<AvroSoeFormat>()?;
+///
+/// // Encode the batch
+/// encoder.encode(&batch)?;
+///
+/// // Get the encoded rows
+/// let rows = encoder.flush();
+///
+/// // Convert to owned Vec<u8> payloads (e.g., for a Kafka producer)
+/// let payloads: Vec<Vec<u8>> = rows.iter().map(|row| row.to_vec()).collect();
+///
+/// assert_eq!(payloads.len(), 3);
+/// assert_eq!(payloads[0][0], 0x00); // Magic byte
+/// # Ok(())
+/// # }
+/// ```
+#[derive(Debug)]
+pub struct Encoder {
+ schema: SchemaRef,
+ encoder: RecordEncoder,
+ row_capacity: Option<usize>,
+ buffer: BytesMut,
+ offsets: Vec<usize>,
+}
+
+impl Encoder {
+ /// Serialize one [`RecordBatch`] into the internal buffer.
+ pub fn encode(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
+ if batch.schema().fields() != self.schema.fields() {
+ return Err(AvroError::SchemaError(
+ "Schema of RecordBatch differs from Writer schema".to_string(),
+ ));
+ }
+ self.encoder.encode_rows(
+ batch,
+ self.row_capacity.unwrap_or(0),
+ &mut self.buffer,
+ &mut self.offsets,
+ )?;
+ Ok(())
+ }
+
+ /// A convenience method to write a slice of [`RecordBatch`] values.
+ pub fn encode_batches(&mut self, batches: &[RecordBatch]) -> Result<(), AvroError> {
+ for b in batches {
+ self.encode(b)?;
+ }
+ Ok(())
+ }
+
+ /// Drain and return all currently buffered encoded rows.
+ ///
+ /// The returned [`EncodedRows`] provides per-row payloads as `Bytes` slices.
+ pub fn flush(&mut self) -> EncodedRows {
+ let data = self.buffer.split().freeze();
+ let mut offsets = Vec::with_capacity(self.offsets.len());
+ offsets.append(&mut self.offsets);
+ self.offsets.push(0);
+ EncodedRows::new(data, offsets)
+ }
+
+ /// Returns the Arrow schema used by this encoder.
+ ///
+ /// The returned schema includes metadata with the Avro schema JSON under
+ /// the `avro.schema` key.
+ pub fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ /// Returns the number of encoded rows currently buffered.
+ pub fn buffered_len(&self) -> usize {
+ self.offsets.len().saturating_sub(1)
+ }
+}
+
/// Generic Avro writer.
///
/// This type is generic over the output Write sink (`W`) and the Avro format (`F`).
@@ -182,7 +563,7 @@ impl WriterBuilder {
#[derive(Debug)]
pub struct Writer<W: Write, F: AvroFormat> {
writer: W,
- schema: Arc<Schema>,
+ schema: SchemaRef,
format: F,
compression: Option<CompressionCodec>,
capacity: usize,
@@ -290,7 +671,7 @@ impl<W: Write> Writer<W, AvroOcfFormat> {
/// assert!(!bytes.is_empty());
/// # Ok(()) }
/// ```
- pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
+ pub fn new(writer: W, schema: Schema) -> Result<Self, AvroError> {
WriterBuilder::new(schema).build::<W, AvroOcfFormat>(writer)
}
@@ -328,16 +709,16 @@ impl<W: Write> Writer<W, AvroSoeFormat> {
/// assert!(!bytes.is_empty());
/// # Ok(()) }
/// ```
- pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
+ pub fn new(writer: W, schema: Schema) -> Result<Self, AvroError> {
WriterBuilder::new(schema).build::<W, AvroSoeFormat>(writer)
}
}
impl<W: Write, F: AvroFormat> Writer<W, F> {
/// Serialize one [`RecordBatch`] to the output.
- pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
+ pub fn write(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
if batch.schema().fields() != self.schema.fields() {
- return Err(ArrowError::SchemaError(
+ return Err(AvroError::SchemaError(
"Schema of RecordBatch differs from Writer schema".to_string(),
));
}
@@ -350,7 +731,7 @@ impl<W: Write, F: AvroFormat> Writer<W, F> {
/// A convenience method to write a slice of [`RecordBatch`].
///
/// This is equivalent to calling `write` for each batch in the slice.
- pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
+ pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), AvroError> {
for b in batches {
self.write(b)?;
}
@@ -358,10 +739,10 @@ impl<W: Write, F: AvroFormat> Writer<W, F> {
}
/// Flush remaining buffered data and (for OCF) ensure the header is present.
- pub fn finish(&mut self) -> Result<(), ArrowError> {
+ pub fn finish(&mut self) -> Result<(), AvroError> {
self.writer
.flush()
- .map_err(|e| ArrowError::IoError(format!("Error flushing writer: {e}"), e))
+ .map_err(|e| AvroError::IoError(format!("Error flushing writer: {e}"), e))
}
/// Consume the writer, returning the underlying output object.
@@ -369,7 +750,7 @@ impl<W: Write, F: AvroFormat> Writer<W, F> {
self.writer
}
- fn write_ocf_block(&mut self, batch: &RecordBatch, sync: &[u8; 16]) -> Result<(), ArrowError> {
+ fn write_ocf_block(&mut self, batch: &RecordBatch, sync: &[u8; 16]) -> Result<(), AvroError> {
let mut buf = Vec::<u8>::with_capacity(self.capacity);
self.encoder.encode(&mut buf, batch)?;
let encoded = match self.compression {
@@ -380,14 +761,14 @@ impl<W: Write, F: AvroFormat> Writer<W, F> {
write_long(&mut self.writer, encoded.len() as i64)?;
self.writer
.write_all(&encoded)
- .map_err(|e| ArrowError::IoError(format!("Error writing Avro block: {e}"), e))?;
+ .map_err(|e| AvroError::IoError(format!("Error writing Avro block: {e}"), e))?;
self.writer
.write_all(sync)
- .map_err(|e| ArrowError::IoError(format!("Error writing Avro sync: {e}"), e))?;
+ .map_err(|e| AvroError::IoError(format!("Error writing Avro sync: {e}"), e))?;
Ok(())
}
- fn write_stream(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
+ fn write_stream(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
self.encoder.encode(&mut self.writer, batch)?;
Ok(())
}
@@ -401,6 +782,8 @@ mod tests {
use crate::schema::{AvroSchema, SchemaStore};
use crate::test_util::arrow_test_data;
use arrow::datatypes::TimeUnit;
+ use arrow::util::pretty::pretty_format_batches;
+ use arrow_array::builder::{Int32Builder, ListBuilder};
#[cfg(feature = "avro_custom_types")]
use arrow_array::types::{Int16Type, Int32Type, Int64Type};
use arrow_array::types::{
@@ -408,16 +791,17 @@ mod tests {
TimestampMillisecondType, TimestampNanosecondType,
};
use arrow_array::{
- Array, ArrayRef, BinaryArray, Date32Array, Int32Array, PrimitiveArray, RecordBatch,
- StringArray, StructArray, UnionArray,
+ Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Int32Array, Int64Array,
+ PrimitiveArray, RecordBatch, StringArray, StructArray, UnionArray,
};
#[cfg(feature = "avro_custom_types")]
- use arrow_array::{Int16Array, Int64Array, RunArray};
+ use arrow_array::{Int16Array, RunArray};
use arrow_schema::UnionMode;
#[cfg(not(feature = "avro_custom_types"))]
use arrow_schema::{DataType, Field, Schema};
#[cfg(feature = "avro_custom_types")]
use arrow_schema::{DataType, Field, Schema};
+ use bytes::BytesMut;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fs::File;
@@ -461,7 +845,7 @@ mod tests {
}
#[test]
- fn test_stream_writer_writes_prefix_per_row_rt() -> Result<(), ArrowError> {
+ fn test_stream_writer_writes_prefix_per_row_rt() -> Result<(), AvroError> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
@@ -679,7 +1063,7 @@ mod tests {
}
#[test]
- fn test_union_nonzero_type_ids() -> Result<(), ArrowError> {
+ fn test_union_nonzero_type_ids() -> Result<(), AvroError> {
use arrow_array::UnionArray;
use arrow_buffer::Buffer;
use arrow_schema::UnionFields;
@@ -724,7 +1108,7 @@ mod tests {
}
#[test]
- fn test_stream_writer_with_id_fingerprint_rt() -> Result<(), ArrowError> {
+ fn test_stream_writer_with_id_fingerprint_rt() -> Result<(), AvroError> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
@@ -758,7 +1142,7 @@ mod tests {
}
#[test]
- fn test_stream_writer_with_id64_fingerprint_rt() -> Result<(), ArrowError> {
+ fn test_stream_writer_with_id64_fingerprint_rt() -> Result<(), AvroError> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
@@ -792,7 +1176,7 @@ mod tests {
}
#[test]
- fn test_ocf_writer_generates_header_and_sync() -> Result<(), ArrowError> {
+ fn test_ocf_writer_generates_header_and_sync() -> Result<(), AvroError> {
let batch = make_batch();
let buffer: Vec<u8> = Vec::new();
let mut writer = AvroWriter::new(buffer, make_schema())?;
@@ -812,11 +1196,11 @@ mod tests {
let buffer = Vec::<u8>::new();
let mut writer = AvroWriter::new(buffer, alt_schema).unwrap();
let err = writer.write(&batch).unwrap_err();
- assert!(matches!(err, ArrowError::SchemaError(_)));
+ assert!(matches!(err, AvroError::SchemaError(_)));
}
#[test]
- fn test_write_batches_accumulates_multiple() -> Result<(), ArrowError> {
+ fn test_write_batches_accumulates_multiple() -> Result<(), AvroError> {
let batch1 = make_batch();
let batch2 = make_batch();
let buffer = Vec::<u8>::new();
@@ -829,7 +1213,7 @@ mod tests {
}
#[test]
- fn test_finish_without_write_adds_header() -> Result<(), ArrowError> {
+ fn test_finish_without_write_adds_header() -> Result<(), AvroError> {
let buffer = Vec::<u8>::new();
let mut writer = AvroWriter::new(buffer, make_schema())?;
writer.finish()?;
@@ -839,7 +1223,7 @@ mod tests {
}
#[test]
- fn test_write_long_encodes_zigzag_varint() -> Result<(), ArrowError> {
+ fn test_write_long_encodes_zigzag_varint() -> Result<(), AvroError> {
let mut buf = Vec::new();
write_long(&mut buf, 0)?;
write_long(&mut buf, -1)?;
@@ -854,7 +1238,7 @@ mod tests {
}
#[test]
- fn test_roundtrip_alltypes_roundtrip_writer() -> Result<(), ArrowError> {
+ fn test_roundtrip_alltypes_roundtrip_writer() -> Result<(), AvroError> {
for rel in files() {
let path = arrow_test_data(rel);
let rdr_file = File::open(&path).expect("open input avro");
@@ -903,7 +1287,7 @@ mod tests {
}
#[test]
- fn test_roundtrip_nested_records_writer() -> Result<(), ArrowError> {
+ fn test_roundtrip_nested_records_writer() -> Result<(), AvroError> {
let path = arrow_test_data("avro/nested_records.avro");
let rdr_file = File::open(&path).expect("open nested_records.avro");
let reader = ReaderBuilder::new()
@@ -937,7 +1321,7 @@ mod tests {
#[test]
#[cfg(feature = "snappy")]
- fn test_roundtrip_nested_lists_writer() -> Result<(), ArrowError> {
+ fn test_roundtrip_nested_lists_writer() -> Result<(), AvroError> {
let path = arrow_test_data("avro/nested_lists.snappy.avro");
let rdr_file = File::open(&path).expect("open nested_lists.snappy.avro");
let reader = ReaderBuilder::new()
@@ -972,7 +1356,7 @@ mod tests {
}
#[test]
- fn test_round_trip_simple_fixed_ocf() -> Result<(), ArrowError> {
+ fn test_round_trip_simple_fixed_ocf() -> Result<(), AvroError> {
let path = arrow_test_data("avro/simple_fixed.avro");
let rdr_file = File::open(&path).expect("open avro/simple_fixed.avro");
let reader = ReaderBuilder::new()
@@ -1003,7 +1387,7 @@ mod tests {
// Strict equality (schema + values) only when canonical extension types are enabled
#[test]
#[cfg(feature = "canonical_extension_types")]
- fn test_round_trip_duration_and_uuid_ocf() -> Result<(), ArrowError> {
+ fn test_round_trip_duration_and_uuid_ocf() -> Result<(), AvroError> {
use arrow_schema::{DataType, IntervalUnit};
let in_file =
File::open("test/data/duration_uuid.avro").expect("open
test/data/duration_uuid.avro");
@@ -1051,8 +1435,7 @@ mod tests {
// Feature OFF: only values are asserted equal; schema may legitimately differ (uuid as fixed(16))
#[test]
#[cfg(not(feature = "canonical_extension_types"))]
- fn test_duration_and_uuid_ocf_without_extensions_round_trips_values() -> Result<(), ArrowError>
- {
+ fn test_duration_and_uuid_ocf_without_extensions_round_trips_values() -> Result<(), AvroError> {
use arrow::datatypes::{DataType, IntervalUnit};
use std::io::BufReader;
@@ -1133,7 +1516,7 @@ mod tests {
#[test]
// TODO: avoid requiring snappy for this file
#[cfg(feature = "snappy")]
- fn test_nonnullable_impala_roundtrip_writer() -> Result<(), ArrowError> {
+ fn test_nonnullable_impala_roundtrip_writer() -> Result<(), AvroError> {
// Load source Avro with Map fields
let path = arrow_test_data("avro/nonnullable.impala.avro");
let rdr_file = File::open(&path).expect("open avro/nonnullable.impala.avro");
@@ -1180,7 +1563,7 @@ mod tests {
#[test]
// TODO: avoid requiring snappy for these files
#[cfg(feature = "snappy")]
- fn test_roundtrip_decimals_via_writer() -> Result<(), ArrowError> {
+ fn test_roundtrip_decimals_via_writer() -> Result<(), AvroError> {
// (file, resolve via ARROW_TEST_DATA?)
let files: [(&str, bool); 8] = [
("avro/fixed_length_decimal.avro", true), // fixed-backed ->
Decimal128(25,2)
@@ -1229,7 +1612,7 @@ mod tests {
}
#[test]
- fn test_named_types_complex_roundtrip() -> Result<(), ArrowError> {
+ fn test_named_types_complex_roundtrip() -> Result<(), AvroError> {
// 1. Read the new, more complex named references file.
let path =
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/named_types_complex.avro");
@@ -1526,7 +1909,7 @@ mod tests {
}
#[test]
- fn test_union_roundtrip() -> Result<(), ArrowError> {
+ fn test_union_roundtrip() -> Result<(), AvroError> {
let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("test/data/union_fields.avro")
.to_string_lossy()
@@ -1560,7 +1943,7 @@ mod tests {
}
#[test]
- fn test_enum_roundtrip_uses_reader_fixture() -> Result<(), ArrowError> {
+ fn test_enum_roundtrip_uses_reader_fixture() -> Result<(), AvroError> {
// Read the known-good enum file (same as reader::test_simple)
let path = arrow_test_data("avro/simple_enum.avro");
let rdr_file = File::open(&path).expect("open avro/simple_enum.avro");
@@ -1603,7 +1986,7 @@ mod tests {
}
#[test]
- fn test_builder_propagates_capacity_to_writer() -> Result<(), ArrowError> {
+ fn test_builder_propagates_capacity_to_writer() -> Result<(), AvroError> {
let cap = 64 * 1024;
let buffer = Vec::<u8>::new();
let mut writer = WriterBuilder::new(make_schema())
@@ -1619,7 +2002,7 @@ mod tests {
}
#[test]
- fn test_stream_writer_stores_capacity_direct_writes() -> Result<(), ArrowError> {
+ fn test_stream_writer_stores_capacity_direct_writes() -> Result<(), AvroError> {
use arrow_array::{ArrayRef, Int32Array};
use arrow_schema::{DataType, Field, Schema};
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
@@ -1639,7 +2022,7 @@ mod tests {
#[cfg(feature = "avro_custom_types")]
#[test]
- fn test_roundtrip_duration_logical_types_ocf() -> Result<(), ArrowError> {
+ fn test_roundtrip_duration_logical_types_ocf() -> Result<(), AvroError> {
let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("test/data/duration_logical_types.avro")
.to_string_lossy()
@@ -1703,7 +2086,7 @@ mod tests {
#[cfg(feature = "avro_custom_types")]
#[test]
- fn test_run_end_encoded_roundtrip_writer() -> Result<(), ArrowError> {
+ fn test_run_end_encoded_roundtrip_writer() -> Result<(), AvroError> {
let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
let ree = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
@@ -1747,7 +2130,7 @@ mod tests {
#[cfg(feature = "avro_custom_types")]
#[test]
- fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer() -> Result<(), ArrowError>
+ fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer() -> Result<(), AvroError>
{
let run_ends = Int16Array::from(vec![2, 5, 7]); // end indices
let run_values = StringArray::from(vec![Some("a"), None, Some("c")]);
@@ -1790,8 +2173,8 @@ mod tests {
#[cfg(feature = "avro_custom_types")]
#[test]
- fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer()
- -> Result<(), ArrowError> {
+ fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer() -> Result<(), AvroError>
+ {
let run_ends = Int64Array::from(vec![4_i64, 8_i64]);
let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
let ree = RunArray::<Int64Type>::try_new(&run_ends, &run_values)?;
@@ -1830,7 +2213,7 @@ mod tests {
#[cfg(feature = "avro_custom_types")]
#[test]
- fn test_run_end_encoded_sliced_roundtrip_writer() -> Result<(), ArrowError> {
+ fn test_run_end_encoded_sliced_roundtrip_writer() -> Result<(), AvroError> {
let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
let base = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
@@ -1938,7 +2321,7 @@ mod tests {
#[cfg(not(feature = "avro_custom_types"))]
#[test]
- fn test_run_end_encoded_roundtrip_writer_feature_off() -> Result<(), ArrowError> {
+ fn test_run_end_encoded_roundtrip_writer_feature_off() -> Result<(), AvroError> {
use arrow_schema::{DataType, Field, Schema};
let run_ends = arrow_array::Int32Array::from(vec![3, 5, 7, 8]);
let run_values = arrow_array::Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
@@ -1983,7 +2366,7 @@ mod tests {
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer_feature_off()
- -> Result<(), ArrowError> {
+ -> Result<(), AvroError> {
use arrow_schema::{DataType, Field, Schema};
let run_ends = arrow_array::Int16Array::from(vec![2, 5, 7]);
let run_values = arrow_array::StringArray::from(vec![Some("a"), None, Some("c")]);
@@ -2027,7 +2410,7 @@ mod tests {
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer_feature_off()
- -> Result<(), ArrowError> {
+ -> Result<(), AvroError> {
use arrow_schema::{DataType, Field, Schema};
let run_ends = arrow_array::Int64Array::from(vec![4_i64, 8_i64]);
let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
@@ -2071,7 +2454,7 @@ mod tests {
#[cfg(not(feature = "avro_custom_types"))]
#[test]
- fn test_run_end_encoded_sliced_roundtrip_writer_feature_off() -> Result<(), ArrowError> {
+ fn test_run_end_encoded_sliced_roundtrip_writer_feature_off() -> Result<(), AvroError> {
use arrow_schema::{DataType, Field, Schema};
let run_ends = Int32Array::from(vec![2, 4, 6]);
let run_values = Int32Array::from(vec![Some(1), Some(2), None]);
@@ -2107,7 +2490,7 @@ mod tests {
#[test]
// TODO: avoid requiring snappy for this file
#[cfg(feature = "snappy")]
- fn test_nullable_impala_roundtrip() -> Result<(), ArrowError> {
+ fn test_nullable_impala_roundtrip() -> Result<(), AvroError> {
let path = arrow_test_data("avro/nullable.impala.avro");
let rdr_file = File::open(&path).expect("open avro/nullable.impala.avro");
let reader = ReaderBuilder::new()
@@ -2142,7 +2525,7 @@ mod tests {
#[test]
#[cfg(feature = "snappy")]
- fn test_datapage_v2_roundtrip() -> Result<(), ArrowError> {
+ fn test_datapage_v2_roundtrip() -> Result<(), AvroError> {
let path = arrow_test_data("avro/datapage_v2.snappy.avro");
let rdr_file = File::open(&path).expect("open avro/datapage_v2.snappy.avro");
let reader = ReaderBuilder::new()
@@ -2172,7 +2555,7 @@ mod tests {
#[test]
#[cfg(feature = "snappy")]
- fn test_single_nan_roundtrip() -> Result<(), ArrowError> {
+ fn test_single_nan_roundtrip() -> Result<(), AvroError> {
let path = arrow_test_data("avro/single_nan.avro");
let in_file = File::open(&path).expect("open avro/single_nan.avro");
let reader = ReaderBuilder::new()
@@ -2202,7 +2585,7 @@ mod tests {
#[test]
// TODO: avoid requiring snappy for this file
#[cfg(feature = "snappy")]
- fn test_dict_pages_offset_zero_roundtrip() -> Result<(), ArrowError> {
+ fn test_dict_pages_offset_zero_roundtrip() -> Result<(), AvroError> {
let path = arrow_test_data("avro/dict-page-offset-zero.avro");
let rdr_file = File::open(&path).expect("open avro/dict-page-offset-zero.avro");
let reader = ReaderBuilder::new()
@@ -2233,7 +2616,7 @@ mod tests {
#[test]
#[cfg(feature = "snappy")]
- fn test_repeated_no_annotation_roundtrip() -> Result<(), ArrowError> {
+ fn test_repeated_no_annotation_roundtrip() -> Result<(), AvroError> {
let path = arrow_test_data("avro/repeated_no_annotation.avro");
let in_file = File::open(&path).expect("open avro/repeated_no_annotation.avro");
let reader = ReaderBuilder::new()
@@ -2262,7 +2645,7 @@ mod tests {
}
#[test]
- fn test_nested_record_type_reuse_roundtrip() -> Result<(), ArrowError> {
+ fn test_nested_record_type_reuse_roundtrip() -> Result<(), AvroError> {
let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("test/data/nested_record_reuse.avro")
.to_string_lossy()
@@ -2294,7 +2677,7 @@ mod tests {
}
#[test]
- fn test_enum_type_reuse_roundtrip() -> Result<(), ArrowError> {
+ fn test_enum_type_reuse_roundtrip() -> Result<(), AvroError> {
let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/enum_reuse.avro");
let rdr_file = std::fs::File::open(&path).expect("open test/data/enum_reuse.avro");
@@ -2324,7 +2707,7 @@ mod tests {
}
#[test]
- fn comprehensive_e2e_test_roundtrip() -> Result<(), ArrowError> {
+ fn comprehensive_e2e_test_roundtrip() -> Result<(), AvroError> {
let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("test/data/comprehensive_e2e.avro");
let rdr_file = File::open(&path).expect("open test/data/comprehensive_e2e.avro");
@@ -2355,7 +2738,7 @@ mod tests {
}
#[test]
- fn test_roundtrip_new_time_encoders_writer() -> Result<(), ArrowError> {
+ fn test_roundtrip_new_time_encoders_writer() -> Result<(), AvroError> {
let schema = Schema::new(vec![
Field::new("d32", DataType::Date32, false),
Field::new("t32_ms", DataType::Time32(TimeUnit::Millisecond),
false),
@@ -2410,4 +2793,463 @@ mod tests {
assert_eq!(roundtrip, batch);
Ok(())
}
+
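+ // Helpers for the row-encoder tests below: a minimal two-column Int32 batch, plus a
+ // richer schema covering strings, booleans, lists, and a nullable column.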
+ fn make_encoder_schema() -> Schema {
+ Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ ])
+ }
+
+ fn make_encoder_batch(schema: &Schema) -> RecordBatch {
+ let a = Int32Array::from(vec![1, 2, 3]);
+ let b = Int32Array::from(vec![10, 20, 30]);
+ RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef],
+ )
+ .expect("failed to build test RecordBatch")
+ }
+
+ fn make_real_avro_schema_and_batch() -> Result<(Schema, RecordBatch, AvroSchema), AvroError> {
+ let avro_json = r#"
+ {
+ "type": "record",
+ "name": "User",
+ "fields": [
+ { "name": "id", "type": "long" },
+ { "name": "name", "type": "string" },
+ { "name": "active", "type": "boolean" },
+ { "name": "tags", "type": { "type": "array", "items": "int" } },
+ { "name": "opt", "type": ["null", "string"], "default": null }
+ ]
+ }"#;
+ let avro_schema = AvroSchema::new(avro_json.to_string());
+ let mut md = HashMap::new();
+ md.insert(
+ SCHEMA_METADATA_KEY.to_string(),
+ avro_schema.json_string.clone(),
+ );
+ let item_field = Arc::new(Field::new(
+ Field::LIST_FIELD_DEFAULT_NAME,
+ DataType::Int32,
+ false,
+ ));
+ let schema = Schema::new_with_metadata(
+ vec![
+ Field::new("id", DataType::Int64, false),
+ Field::new("name", DataType::Utf8, false),
+ Field::new("active", DataType::Boolean, false),
+ Field::new("tags", DataType::List(item_field.clone()), false),
+ Field::new("opt", DataType::Utf8, true),
+ ],
+ md,
+ );
+ let id = Int64Array::from(vec![1, 2, 3]);
+ let name = StringArray::from(vec!["alice", "bob", "carol"]);
+ let active = BooleanArray::from(vec![true, false, true]);
+ let mut tags_builder = ListBuilder::new(Int32Builder::new()).with_field(item_field);
+ tags_builder.values().append_value(1);
+ tags_builder.values().append_value(2);
+ tags_builder.append(true);
+ tags_builder.append(true);
+ tags_builder.values().append_value(3);
+ tags_builder.append(true);
+ let tags = tags_builder.finish();
+ let opt = StringArray::from(vec![Some("x"), None, Some("z")]);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![
+ Arc::new(id) as ArrayRef,
+ Arc::new(name) as ArrayRef,
+ Arc::new(active) as ArrayRef,
+ Arc::new(tags) as ArrayRef,
+ Arc::new(opt) as ArrayRef,
+ ],
+ )?;
+ Ok((schema, batch, avro_schema))
+ }
+
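+ // The SOE row encoder should produce byte-for-byte the same output as AvroStreamWriter
+ // for the same batch.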
+ #[test]
+ fn test_row_writer_matches_stream_writer_soe() -> Result<(), AvroError> {
+ let schema = make_encoder_schema();
+ let batch = make_encoder_batch(&schema);
+ let mut stream = AvroStreamWriter::new(Vec::<u8>::new(), schema.clone())?;
+ stream.write(&batch)?;
+ stream.finish()?;
+ let stream_bytes = stream.into_inner();
+ let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+ row_writer.encode(&batch)?;
+ let rows = row_writer.flush();
+ let row_bytes: Vec<u8> = rows.bytes().to_vec();
+ assert_eq!(stream_bytes, row_bytes);
+ Ok(())
+ }
+
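+ // flush() should hand back all buffered rows and leave the encoder empty.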
+ #[test]
+ fn test_row_writer_flush_clears_buffer() -> Result<(), AvroError> {
+ let schema = make_encoder_schema();
+ let batch = make_encoder_batch(&schema);
+ let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+ row_writer.encode(&batch)?;
+ assert_eq!(row_writer.buffered_len(), batch.num_rows());
+ let out1 = row_writer.flush();
+ assert_eq!(out1.len(), batch.num_rows());
+ assert_eq!(row_writer.buffered_len(), 0);
+ let out2 = row_writer.flush();
+ assert_eq!(out2.len(), 0);
+ Ok(())
+ }
+
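+ // Round-trip: each SOE row from the encoder should decode back into the original batch.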
+ #[test]
+ fn test_row_writer_roundtrip_decoder_soe_real_avro_data() -> Result<(), AvroError> {
+ let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
+ let mut store = SchemaStore::new();
+ store.register(avro_schema.clone())?;
+ let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+ row_writer.encode(&batch)?;
+ let rows = row_writer.flush();
+ let mut decoder = ReaderBuilder::new()
+ .with_writer_schema_store(store)
+ .with_batch_size(1024)
+ .build_decoder()?;
+ for row in rows.iter() {
+ let consumed = decoder.decode(row.as_ref())?;
+ assert_eq!(
+ consumed,
+ row.len(),
+ "decoder should consume the full row frame"
+ );
+ }
+ let out = decoder.flush()?.expect("decoded batch");
+ let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
+ let actual = pretty_format_batches(&[out])?.to_string();
+ assert_eq!(expected, actual);
+ Ok(())
+ }
+
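+ // Same round-trip, but feeding the decoder frame-aligned chunks of varying row counts
+ // to exercise incremental decoding.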
+ #[test]
+ fn test_row_writer_roundtrip_decoder_soe_streaming_chunks() -> Result<(), AvroError> {
+ let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
+ let mut store = SchemaStore::new();
+ store.register(avro_schema.clone())?;
+ let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+ row_writer.encode(&batch)?;
+ let rows = row_writer.flush();
+ // Build a contiguous stream and frame boundaries (prefix sums) from EncodedRows.
+ let mut stream: Vec<u8> = Vec::new();
+ let mut boundaries: Vec<usize> = Vec::with_capacity(rows.len() + 1);
+ boundaries.push(0usize);
+ for row in rows.iter() {
+ stream.extend_from_slice(row.as_ref());
+ boundaries.push(stream.len());
+ }
+ let mut decoder = ReaderBuilder::new()
+ .with_writer_schema_store(store)
+ .with_batch_size(1024)
+ .build_decoder()?;
+ let mut buffered = BytesMut::new();
+ let chunk_rows = [1usize, 2, 3, 1, 4, 2];
+ let mut row_idx = 0usize;
+ let mut i = 0usize;
+ let n_rows = rows.len();
+ while row_idx < n_rows {
+ let take = chunk_rows[i % chunk_rows.len()];
+ i += 1;
+ let end_row = (row_idx + take).min(n_rows);
+ let byte_start = boundaries[row_idx];
+ let byte_end = boundaries[end_row];
+ buffered.extend_from_slice(&stream[byte_start..byte_end]);
+ loop {
+ let consumed = decoder.decode(&buffered)?;
+ if consumed == 0 {
+ break;
+ }
+ let _ = buffered.split_to(consumed);
+ }
+ assert!(
+ buffered.is_empty(),
+ "expected decoder to consume the entire frame-aligned chunk"
+ );
+ row_idx = end_row;
+ }
+ let out = decoder.flush()?.expect("decoded batch");
+ let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
+ let actual = pretty_format_batches(&[out])?.to_string();
+ assert_eq!(expected, actual);
+ Ok(())
+ }
+
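+ // Confluent-style wire format: rows are prefixed with a schema id (42) that the decoder
+ // resolves through the store.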
+ #[test]
+ fn test_row_writer_roundtrip_decoder_confluent_wire_format_id() -> Result<(), AvroError> {
+ let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
+ let schema_id: u32 = 42;
+ let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
+ store.set(Fingerprint::Id(schema_id), avro_schema.clone())?;
+ let mut row_writer = WriterBuilder::new(schema)
+ .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id))
+ .build_encoder::<AvroSoeFormat>()?;
+ row_writer.encode(&batch)?;
+ let rows = row_writer.flush();
+ let mut decoder = ReaderBuilder::new()
+ .with_writer_schema_store(store)
+ .with_batch_size(1024)
+ .build_decoder()?;
+ for row in rows.iter() {
+ let consumed = decoder.decode(row.as_ref())?;
+ assert_eq!(consumed, row.len());
+ }
+ let out = decoder.flush()?.expect("decoded batch");
+ let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
+ let actual = pretty_format_batches(&[out])?.to_string();
+ assert_eq!(expected, actual);
+ Ok(())
+ }
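+ // Exercises the full Encoder / EncodedRows API surface (encode_batches, flush, bytes,
+ // offsets, row, iter) with unframed binary output.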
+ #[test]
+ fn test_encoder_encode_batches_flush_and_encoded_rows_methods_with_avro_binary_format()
+ -> Result<(), AvroError> {
+ use crate::writer::format::AvroBinaryFormat;
+ use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+ use arrow_schema::{DataType, Field, Schema};
+ use std::sync::Arc;
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ ]);
+ let schema_ref = Arc::new(schema.clone());
+ let batch1 = RecordBatch::try_new(
+ schema_ref.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+ Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef,
+ ],
+ )?;
+ let batch2 = RecordBatch::try_new(
+ schema_ref,
+ vec![
+ Arc::new(Int32Array::from(vec![4, 5])) as ArrayRef,
+ Arc::new(Int32Array::from(vec![40, 50])) as ArrayRef,
+ ],
+ )?;
+ let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroBinaryFormat>()?;
+ let empty = Encoder::flush(&mut encoder);
+ assert_eq!(EncodedRows::len(&empty), 0);
+ assert!(EncodedRows::is_empty(&empty));
+ assert_eq!(EncodedRows::bytes(&empty).as_ref(), &[] as &[u8]);
+ assert_eq!(EncodedRows::offsets(&empty), &[0usize]);
+ assert_eq!(EncodedRows::iter(&empty).count(), 0);
+ let empty_vecs: Vec<Vec<u8>> = empty.iter().map(|b| b.to_vec()).collect();
+ assert!(empty_vecs.is_empty());
+ let batches = vec![batch1, batch2];
+ Encoder::encode_batches(&mut encoder, &batches)?;
+ assert_eq!(encoder.buffered_len(), 5);
+ let rows = Encoder::flush(&mut encoder);
+ assert_eq!(
+ encoder.buffered_len(),
+ 0,
+ "Encoder::flush should reset the internal offsets"
+ );
+ assert_eq!(EncodedRows::len(&rows), 5);
+ assert!(!EncodedRows::is_empty(&rows));
+ let expected_offsets: &[usize] = &[0, 2, 4, 6, 8, 10];
+ assert_eq!(EncodedRows::offsets(&rows), expected_offsets);
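+ // Avro ints are zigzag varints (1 -> 0x02, 10 -> 0x14, ...), so each two-column row
+ // encodes to exactly two bytes.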
+ let expected_rows: Vec<Vec<u8>> = vec![
+ vec![2, 20],
+ vec![4, 40],
+ vec![6, 60],
+ vec![8, 80],
+ vec![10, 100],
+ ];
+ let expected_stream: Vec<u8> = expected_rows.concat();
+ assert_eq!(
+ EncodedRows::bytes(&rows).as_ref(),
+ expected_stream.as_slice()
+ );
+ for (i, expected) in expected_rows.iter().enumerate() {
+ assert_eq!(EncodedRows::row(&rows, i)?.as_ref(), expected.as_slice());
+ }
+ let iter_rows: Vec<Vec<u8>> = EncodedRows::iter(&rows).map(|b| b.to_vec()).collect();
+ assert_eq!(iter_rows, expected_rows);
+ let recreated = EncodedRows::new(
+ EncodedRows::bytes(&rows).clone(),
+ EncodedRows::offsets(&rows).to_vec(),
+ );
+ assert_eq!(EncodedRows::len(&recreated), EncodedRows::len(&rows));
+ assert_eq!(EncodedRows::bytes(&recreated), EncodedRows::bytes(&rows));
+ assert_eq!(
+ EncodedRows::offsets(&recreated),
+ EncodedRows::offsets(&rows)
+ );
+ let rec_vecs: Vec<Vec<u8>> = recreated.iter().map(|b| b.to_vec()).collect();
+ assert_eq!(rec_vecs, iter_rows);
+ let empty_again = Encoder::flush(&mut encoder);
+ assert!(EncodedRows::is_empty(&empty_again));
+ Ok(())
+ }
+
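+ // AvroBinaryFormat emits no header or framing, so the sink-based build() rejects it in
+ // favor of build_encoder().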
+ #[test]
+ fn test_writer_builder_build_rejects_avro_binary_format() {
+ use crate::writer::format::AvroBinaryFormat;
+ use arrow_schema::{DataType, Field, Schema};
+ let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+ let err = WriterBuilder::new(schema)
+ .build::<_, AvroBinaryFormat>(Vec::<u8>::new())
+ .unwrap_err();
+ match err {
+ AvroError::InvalidArgument(msg) => assert_eq!(
+ msg,
+ "AvroBinaryFormat is only supported with Encoder, use
build_encoder instead"
+ ),
+ other => panic!("expected InvalidArgumentError, got {:?}", other),
+ }
+ }
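+ // Each SOE row should be exactly magic + fingerprint + the matching AvroBinaryFormat
+ // body, and re-framed bodies should decode back to the expected rows.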
+ #[test]
+ fn test_row_encoder_avro_binary_format_roundtrip_decoder_with_soe_framing()
+ -> Result<(), AvroError> {
+ use crate::writer::format::AvroBinaryFormat;
+ let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
+ let batches: Vec<RecordBatch> = vec![batch.clone(), batch.slice(1, 2)];
+ let expected = arrow::compute::concat_batches(&batch.schema(), &batches)?;
+ let mut binary_encoder =
+ WriterBuilder::new(schema.clone()).build_encoder::<AvroBinaryFormat>()?;
+ binary_encoder.encode_batches(&batches)?;
+ let binary_rows = binary_encoder.flush();
+ assert_eq!(
+ binary_rows.len(),
+ expected.num_rows(),
+ "binary encoder row count mismatch"
+ );
+ let mut soe_encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+ soe_encoder.encode_batches(&batches)?;
+ let soe_rows = soe_encoder.flush();
+ assert_eq!(
+ soe_rows.len(),
+ binary_rows.len(),
+ "SOE vs binary row count mismatch"
+ );
+ let mut store = SchemaStore::new(); // Rabin by default
+ let fp = store.register(avro_schema)?;
+ let fp_le_bytes = match fp {
+ Fingerprint::Rabin(v) => v.to_le_bytes(),
+ other => panic!("expected Rabin fingerprint from
SchemaStore::new(), got {other:?}"),
+ };
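+ // Single-Object Encoding prefix: 2-byte magic 0xC3 0x01 followed by the 8-byte
+ // little-endian Rabin fingerprint of the writer schema.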
+ const SOE_MAGIC: [u8; 2] = [0xC3, 0x01];
+ const SOE_PREFIX_LEN: usize = 2 + 8;
+ for i in 0..binary_rows.len() {
+ let body = binary_rows.row(i)?;
+ let soe = soe_rows.row(i)?;
+ assert!(
+ soe.len() >= SOE_PREFIX_LEN,
+ "expected SOE row to include prefix"
+ );
+ assert_eq!(&soe.as_ref()[..2], &SOE_MAGIC);
+ assert_eq!(&soe.as_ref()[2..SOE_PREFIX_LEN], &fp_le_bytes);
+ assert_eq!(
+ &soe.as_ref()[SOE_PREFIX_LEN..],
+ body.as_ref(),
+ "SOE body bytes differ from AvroBinaryFormat body bytes (row
{i})"
+ );
+ }
+ let mut decoder = ReaderBuilder::new()
+ .with_writer_schema_store(store)
+ .with_batch_size(1024)
+ .build_decoder()?;
+ for body in binary_rows.iter() {
+ let mut framed = Vec::with_capacity(SOE_PREFIX_LEN + body.len());
+ framed.extend_from_slice(&SOE_MAGIC);
+ framed.extend_from_slice(&fp_le_bytes);
+ framed.extend_from_slice(body.as_ref());
+ let consumed = decoder.decode(&framed)?;
+ assert_eq!(
+ consumed,
+ framed.len(),
+ "decoder should consume the full SOE-framed message"
+ );
+ }
+ let out = decoder.flush()?.expect("expected a decoded RecordBatch");
+ let expected_str = pretty_format_batches(&[expected])?.to_string();
+ let actual_str = pretty_format_batches(&[out])?.to_string();
+ assert_eq!(expected_str, actual_str);
+ Ok(())
+ }
+
+ #[test]
+ fn test_row_encoder_avro_binary_format_roundtrip_decoder_streaming_chunks()
+ -> Result<(), AvroError> {
+ use crate::writer::format::AvroBinaryFormat;
+ let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
+ let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroBinaryFormat>()?;
+ encoder.encode(&batch)?;
+ let rows = encoder.flush();
+ let mut store = SchemaStore::new();
+ let fp = store.register(avro_schema)?;
+ let fp_le_bytes = match fp {
+ Fingerprint::Rabin(v) => v.to_le_bytes(),
+ other => panic!("expected Rabin fingerprint from
SchemaStore::new(), got {other:?}"),
+ };
+ const SOE_MAGIC: [u8; 2] = [0xC3, 0x01];
+ const SOE_PREFIX_LEN: usize = 2 + 8;
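+ // Simulate a length-prefixed transport: each frame is a u32 little-endian length
+ // followed by one SOE message.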
+ let mut stream: Vec<u8> = Vec::new();
+ for body in rows.iter() {
+ let msg_len: u32 = (SOE_PREFIX_LEN + body.len())
+ .try_into()
+ .expect("message length must fit in u32");
+ stream.extend_from_slice(&msg_len.to_le_bytes());
+ stream.extend_from_slice(&SOE_MAGIC);
+ stream.extend_from_slice(&fp_le_bytes);
+ stream.extend_from_slice(body.as_ref());
+ }
+ let mut decoder = ReaderBuilder::new()
+ .with_writer_schema_store(store)
+ .with_batch_size(1024)
+ .build_decoder()?;
+ let chunk_sizes = [1usize, 2, 3, 5, 8, 13, 21, 34];
+ let mut pos = 0usize;
+ let mut i = 0usize;
+ let mut buffered = BytesMut::new();
+ let mut decoded_frames = 0usize;
+ while pos < stream.len() {
+ let take = chunk_sizes[i % chunk_sizes.len()];
+ i += 1;
+ let end = (pos + take).min(stream.len());
+ buffered.extend_from_slice(&stream[pos..end]);
+ pos = end;
+ loop {
+ if buffered.len() < 4 {
+ break;
+ }
+ let msg_len =
+ u32::from_le_bytes([buffered[0], buffered[1], buffered[2], buffered[3]])
+ as usize;
+ if buffered.len() < 4 + msg_len {
+ break;
+ }
+ let frame = buffered.split_to(4 + msg_len);
+ let payload = &frame[4..];
+ let consumed = decoder.decode(payload)?;
+ assert_eq!(
+ consumed,
+ payload.len(),
+ "decoder should consume the full SOE-framed message"
+ );
+
+ decoded_frames += 1;
+ }
+ }
+ assert!(
+ buffered.is_empty(),
+ "expected transport framer to consume all bytes; leftover = {}",
+ buffered.len()
+ );
+ assert_eq!(
+ decoded_frames,
+ rows.len(),
+ "expected to decode exactly one frame per encoded row"
+ );
+ let out = decoder.flush()?.expect("expected decoded RecordBatch");
+ let expected_str = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
+ let actual_str = pretty_format_batches(&[out])?.to_string();
+ assert_eq!(expected_str, actual_str);
+ Ok(())
+ }
}