This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new fab8e75eff Add BinaryFormatSupport and Row Encoder to `arrow-avro` Writer (#9171)
fab8e75eff is described below

commit fab8e75eff6d1dd71708b71e1a2a85275394fa80
Author: Connor Sanders <[email protected]>
AuthorDate: Mon Jan 26 19:20:24 2026 -0600

    Add BinaryFormatSupport and Row Encoder to `arrow-avro` Writer (#9171)
    
    # Which issue does this PR close?
    
    - Closes #8701.
    
    # Rationale for this change
    
    `arrow-avro` already supports writing Avro Object Container Files (OCF)
    and framed streaming encodings (e.g. Single-Object Encoding / registry
    wire formats). However, many systems exchange **raw Avro binary datum
    payloads** (i.e. *only* the Avro record body bytes) while supplying the
    schema out-of-band (configuration, RPC contract, topic metadata, etc.).
    
    Without first-class support for unframed datum output, users must
    either:
    - accept framing overhead that downstream systems don’t expect, or
    - re-implement datum encoding themselves.
    
    This PR adds the missing unframed write path and exposes a row-by-row
    encoding API to make it easy to embed Avro datums into other transport
    protocols.
    
    # What changes are included in this PR?
    
    - Added `AvroBinaryFormat` (unframed) as an `AvroFormat` implementation
    to emit **raw Avro record body bytes** (no SOE prefix and no OCF header)
    and to explicitly reject container-level compression for this format.
    - Added `RecordEncoder::encode_rows` to encode a `RecordBatch` into a
    single contiguous buffer while tracking per-row boundaries via appended
    offsets.
    - Introduced a higher-level `Encoder` + `EncodedRows` API for row-by-row
    streaming use cases, providing zero-copy access to individual row slices
    (via `Bytes`).
    - Updated the writer API to provide `build_encoder` for stream formats
    (e.g. SOE) and added row-capacity configuration to better support
    incremental/streaming workflows.
    - Added the `bytes` crate as a dependency to support efficient buffering
    and slicing in the row encoder, and adjusted dev-dependencies to support
    the new tests/docs.
    
    # Are these changes tested?
    
    Yes.
    
    This PR adds unit tests that cover:
    - single- and multi-column row encoding
    - nullable columns
    - prefix-based vs. unprefixed row encoding behavior
    - empty batch encoding
    - appending to existing output buffers and validating offset invariants
    
    # Are there any user-facing changes?
    
    Yes, these changes are additive (no breaking public API changes
    expected).
    
    - New writer format support for **unframed Avro binary datum** output
    (`AvroBinaryFormat`).
    - New row-by-row encoding APIs (`RecordEncoder::encode_rows`, `Encoder`,
    `EncodedRows`) to support zero-copy access to per-row encoded bytes.
    - New `WriterBuilder` functionality (`build_encoder` + row-capacity
    configuration) to enable encoder construction without committing to a
    specific `Write` sink.
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 arrow-avro/Cargo.toml            |    8 +-
 arrow-avro/benches/encoder.rs    |   87 ++++
 arrow-avro/src/writer/encoder.rs |  396 ++++++++++++++-
 arrow-avro/src/writer/format.rs  |   78 +++
 arrow-avro/src/writer/mod.rs     | 1042 ++++++++++++++++++++++++++++++++++----
 5 files changed, 1485 insertions(+), 126 deletions(-)

diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index 48cea8467e..a4699a2efa 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -66,6 +66,7 @@ indexmap = "2.10"
 rand = "0.9"
 md5 = { version = "0.8", optional = true }
 sha2 = { version = "0.10", optional = true }
+bytes = "1.11"
 
 [dev-dependencies]
 arrow-data = { workspace = true }
@@ -76,9 +77,8 @@ rand = { version = "0.9.1", default-features = false, features = [
 ] }
 criterion = { workspace = true, default-features = false }
 tempfile = "3.3"
-arrow = { workspace = true }
+arrow = { workspace = true, features = ["prettyprint"] }
 futures = "0.3.31"
-bytes = "1.10.1"
 async-stream = "0.3.6"
 apache-avro = "0.21.0"
 num-bigint = "0.4"
@@ -95,3 +95,7 @@ harness = false
 [[bench]]
 name = "avro_writer"
 harness = false
+
+[[bench]]
+name = "encoder"
+harness = false
diff --git a/arrow-avro/benches/encoder.rs b/arrow-avro/benches/encoder.rs
new file mode 100644
index 0000000000..2e0a7d1a3e
--- /dev/null
+++ b/arrow-avro/benches/encoder.rs
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks for the `arrow-avro` Encoder
+
+use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+use arrow_avro::writer::format::AvroSoeFormat;
+use arrow_avro::writer::{EncodedRows, WriterBuilder};
+use arrow_schema::{DataType, Field, Schema};
+use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, 
criterion_main};
+use once_cell::sync::Lazy;
+use std::hint::black_box;
+use std::sync::Arc;
+use std::time::Duration;
+
+const SIZES: [usize; 4] = [1_000, 10_000, 100_000, 1_000_000];
+
+/// Pre-generate EncodedRows for each size to avoid setup overhead in 
benchmarks.
+static ENCODED_DATA: Lazy<Vec<EncodedRows>> =
+    Lazy::new(|| SIZES.iter().map(|&n| make_encoded_rows(n)).collect());
+
+/// Create an EncodedRows with `n` rows of Int32 data.
+fn make_encoded_rows(n: usize) -> EncodedRows {
+    let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+    let values: Vec<i32> = (0..n as i32).collect();
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(Int32Array::from(values)) as ArrayRef],
+    )
+    .unwrap();
+    let mut encoder = WriterBuilder::new(schema)
+        .build_encoder::<AvroSoeFormat>()
+        .unwrap();
+    encoder.encode(&batch).unwrap();
+    encoder.flush()
+}
+
+fn bench_row_access(c: &mut Criterion) {
+    let mut group = c.benchmark_group("row_access");
+    for (idx, &size) in SIZES.iter().enumerate() {
+        let encoded = &ENCODED_DATA[idx];
+        let num_rows = encoded.len();
+        // Configure sampling based on data size
+        match size {
+            100_000 | 1_000_000 => {
+                group
+                    .sample_size(20)
+                    .measurement_time(Duration::from_secs(10))
+                    .warm_up_time(Duration::from_secs(3));
+            }
+            _ => {
+                group.sample_size(100);
+            }
+        }
+        group.throughput(Throughput::Elements(num_rows as u64));
+        group.bench_function(BenchmarkId::from_parameter(size), |b| {
+            b.iter(|| {
+                for i in 0..num_rows {
+                    black_box(encoded.row(i).unwrap());
+                }
+            })
+        });
+    }
+    group.finish();
+}
+
+criterion_group! {
+    name = encoder;
+    config = Criterion::default().configure_from_args();
+    targets = bench_row_access
+}
+
+criterion_main!(encoder);
diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs
index 651d998fea..94f6d88447 100644
--- a/arrow-avro/src/writer/encoder.rs
+++ b/arrow-avro/src/writer/encoder.rs
@@ -42,10 +42,30 @@ use arrow_array::{
 use arrow_array::{Decimal32Array, Decimal64Array};
 use arrow_buffer::{ArrowNativeType, NullBuffer};
 use arrow_schema::{DataType, Field, IntervalUnit, Schema as ArrowSchema, 
TimeUnit, UnionMode};
+use bytes::{BufMut, BytesMut};
 use std::io::Write;
 use std::sync::Arc;
 use uuid::Uuid;
 
+macro_rules! for_rows_with_prefix {
+    ($n:expr, $prefix:expr, $out:ident, |$row:ident| $body:block) => {{
+        match $prefix {
+            Some(prefix) => {
+                for $row in 0..$n {
+                    $out.write_all(prefix)
+                        .map_err(|e| AvroError::IoError(format!("write prefix: {e}"), e))?;
+                    $body
+                }
+            }
+            None => {
+                for $row in 0..$n {
+                    $body
+                }
+            }
+        }
+    }};
+}
+
 /// Encode a single Avro-`long` using ZigZag + variable length, buffered.
 ///
 /// Spec: <https://avro.apache.org/docs/1.11.1/specification/#binary-encoding>
@@ -608,17 +628,13 @@ impl<'a> FieldEncoder<'a> {
         // Compute the effective null state from writer-declared nullability 
and data nulls.
         let null_state = match nullability {
             None => NullState::NonNullable,
-            Some(null_order) => {
-                match array.nulls() {
-                    Some(nulls) if array.null_count() > 0 => {
-                        NullState::Nullable { nulls, null_order }
-                    }
-                    _ => NullState::NullableNoNulls {
-                        // Nullable site with no null buffer for this view
-                        union_value_byte: union_value_branch_byte(null_order, 
false),
-                    },
-                }
-            }
+            Some(null_order) => match array.nulls() {
+                Some(nulls) if array.null_count() > 0 => NullState::Nullable { 
nulls, null_order },
+                _ => NullState::NullableNoNulls {
+                    // Nullable site with no null buffer for this view
+                    union_value_byte: union_value_branch_byte(null_order, 
false),
+                },
+            },
         };
         Ok(Self {
             encoder,
@@ -797,24 +813,86 @@ impl RecordEncoder {
     ) -> Result<(), AvroError> {
         let mut column_encoders = self.prepare_for_batch(batch)?;
         let n = batch.num_rows();
-        match self.prefix {
-            Some(prefix) => {
-                for row in 0..n {
-                    out.write_all(prefix.as_slice())?;
-                    for enc in column_encoders.iter_mut() {
-                        enc.encode(out, row)?;
-                    }
-                }
+        let prefix = self.prefix.as_ref().map(|p| p.as_slice());
+        for_rows_with_prefix!(n, prefix, out, |row| {
+            for enc in column_encoders.iter_mut() {
+                enc.encode(out, row)?;
             }
-            None => {
-                for row in 0..n {
+        });
+        Ok(())
+    }
+
+    /// Encode rows into a single contiguous `BytesMut` and append row-end 
offsets.
+    ///
+    /// # Invariants
+    ///
+    /// * `offsets` must be non-empty and seeded with `0` at index 0.
+    /// * `offsets.last()` must equal `out.len()` on entry.
+    /// * On success, exactly `batch.num_rows()` additional offsets are 
pushed, and
+    ///   `offsets.last()` equals the new `out.len()`.
+    pub(crate) fn encode_rows(
+        &self,
+        batch: &RecordBatch,
+        row_capacity: usize,
+        out: &mut BytesMut,
+        offsets: &mut Vec<usize>,
+    ) -> Result<(), AvroError> {
+        let out_len = out.len();
+        if offsets.first() != Some(&0) || offsets.last() != Some(&out_len) {
+            return Err(AvroError::General(
+                "encode_rows requires offsets to start with 0 and end at out.len()".to_string(),
+            ));
+        }
+        let n = batch.num_rows();
+        if n == 0 {
+            return Ok(());
+        }
+        if offsets.len().checked_add(n).is_none() {
+            return Err(AvroError::General(
+                "encode_rows cannot append offsets: too many rows".to_string(),
+            ));
+        }
+        let mut column_encoders = self.prepare_for_batch(batch)?;
+        offsets.reserve(n);
+        let prefix_bytes = self.prefix.as_ref().map(|p| p.as_slice());
+        let prefix_len = prefix_bytes.map_or(0, |p| p.len());
+        let per_row_hint = row_capacity.max(prefix_len);
+        if let Some(additional) = n
+            .checked_mul(per_row_hint)
+            .filter(|&a| out_len.checked_add(a).is_some())
+        {
+            out.reserve(additional);
+        }
+        let start_out_len = out.len();
+        let start_offsets_len = offsets.len();
+        let res = (|| -> Result<(), AvroError> {
+            let mut w = out.writer();
+            if let [enc0] = column_encoders.as_mut_slice() {
+                for_rows_with_prefix!(n, prefix_bytes, w, |row| {
+                    enc0.encode(&mut w, row)?;
+                    offsets.push(w.get_ref().len());
+                });
+            } else {
+                for_rows_with_prefix!(n, prefix_bytes, w, |row| {
                     for enc in column_encoders.iter_mut() {
-                        enc.encode(out, row)?;
+                        enc.encode(&mut w, row)?;
                     }
-                }
+                    offsets.push(w.get_ref().len());
+                });
             }
+            Ok(())
+        })();
+        if res.is_err() {
+            out.truncate(start_out_len);
+            offsets.truncate(start_offsets_len);
+        } else {
+            debug_assert_eq!(
+                *offsets.last().unwrap(),
+                out.len(),
+                "encode_rows: offsets/out length mismatch after successful encode"
+            );
         }
-        Ok(())
+        res
     }
 }
 
@@ -1958,6 +2036,12 @@ mod tests {
         }
     }
 
+    fn row_slice<'a>(buf: &'a [u8], offsets: &[usize], row: usize) -> &'a [u8] 
{
+        let start = offsets[row];
+        let end = offsets[row + 1];
+        &buf[start..end]
+    }
+
     #[test]
     fn binary_encoder() {
         let values: Vec<&[u8]> = vec![b"", b"ab", b"\x00\xFF"];
@@ -3026,4 +3110,268 @@ mod tests {
             other => panic!("expected NullableNoNulls, got {other:?}"),
         }
     }
+
+    #[test]
+    fn encode_rows_single_column_int32() {
+        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, 
false)]);
+        let arr = Int32Array::from(vec![1, 2, 3]);
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(arr)]).unwrap();
+        let encoder = RecordEncoder {
+            columns: vec![FieldBinding {
+                arrow_index: 0,
+                nullability: None,
+                plan: FieldPlan::Scalar,
+            }],
+            prefix: None,
+        };
+        let mut out = BytesMut::new();
+        let mut offsets: Vec<usize> = vec![0];
+        encoder
+            .encode_rows(&batch, 16, &mut out, &mut offsets)
+            .unwrap();
+        assert_eq!(offsets.len(), 4);
+        assert_eq!(*offsets.last().unwrap(), out.len());
+        assert_bytes_eq(row_slice(&out, &offsets, 0), &avro_long_bytes(1));
+        assert_bytes_eq(row_slice(&out, &offsets, 1), &avro_long_bytes(2));
+        assert_bytes_eq(row_slice(&out, &offsets, 2), &avro_long_bytes(3));
+    }
+
+    #[test]
+    fn encode_rows_multiple_columns() {
+        let schema = ArrowSchema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8, false),
+        ]);
+        let int_arr = Int32Array::from(vec![10, 20]);
+        let str_arr = StringArray::from(vec!["hello", "world"]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(int_arr), Arc::new(str_arr)],
+        )
+        .unwrap();
+        let encoder = RecordEncoder {
+            columns: vec![
+                FieldBinding {
+                    arrow_index: 0,
+                    nullability: None,
+                    plan: FieldPlan::Scalar,
+                },
+                FieldBinding {
+                    arrow_index: 1,
+                    nullability: None,
+                    plan: FieldPlan::Scalar,
+                },
+            ],
+            prefix: None,
+        };
+        let mut out = BytesMut::new();
+        let mut offsets: Vec<usize> = vec![0];
+        encoder
+            .encode_rows(&batch, 32, &mut out, &mut offsets)
+            .unwrap();
+        assert_eq!(offsets.len(), 3);
+        assert_eq!(*offsets.last().unwrap(), out.len());
+        let mut expected_row0 = Vec::new();
+        expected_row0.extend(avro_long_bytes(10));
+        expected_row0.extend(avro_len_prefixed_bytes(b"hello"));
+        assert_bytes_eq(row_slice(&out, &offsets, 0), &expected_row0);
+        let mut expected_row1 = Vec::new();
+        expected_row1.extend(avro_long_bytes(20));
+        expected_row1.extend(avro_len_prefixed_bytes(b"world"));
+        assert_bytes_eq(row_slice(&out, &offsets, 1), &expected_row1);
+    }
+
+    #[test]
+    fn encode_rows_with_prefix() {
+        use crate::codec::AvroFieldBuilder;
+        use crate::schema::AvroSchema;
+        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, 
false)]);
+        let arr = Int32Array::from(vec![42]);
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(arr)]).unwrap();
+        let avro_schema = AvroSchema::try_from(&schema).unwrap();
+        let fingerprint = avro_schema
+            .fingerprint(crate::schema::FingerprintAlgorithm::Rabin)
+            .unwrap();
+        let avro_root = AvroFieldBuilder::new(&avro_schema.schema().unwrap())
+            .build()
+            .unwrap();
+        let encoder = RecordEncoderBuilder::new(&avro_root, &schema)
+            .with_fingerprint(Some(fingerprint))
+            .build()
+            .unwrap();
+        let mut out = BytesMut::new();
+        let mut offsets: Vec<usize> = vec![0];
+        encoder
+            .encode_rows(&batch, 32, &mut out, &mut offsets)
+            .unwrap();
+        assert_eq!(offsets.len(), 2);
+        let row0 = row_slice(&out, &offsets, 0);
+        assert!(row0.len() > 10, "Row should contain prefix + encoded value");
+        assert_eq!(row0[0], 0xC3);
+        assert_eq!(row0[1], 0x01);
+    }
+
+    #[test]
+    fn encode_rows_empty_batch() {
+        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, 
false)]);
+        let arr = Int32Array::from(Vec::<i32>::new());
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(arr)]).unwrap();
+        let encoder = RecordEncoder {
+            columns: vec![FieldBinding {
+                arrow_index: 0,
+                nullability: None,
+                plan: FieldPlan::Scalar,
+            }],
+            prefix: None,
+        };
+        let mut out = BytesMut::new();
+        let mut offsets: Vec<usize> = vec![0];
+        encoder
+            .encode_rows(&batch, 16, &mut out, &mut offsets)
+            .unwrap();
+        assert_eq!(offsets, vec![0]);
+        assert!(out.is_empty());
+    }
+
+    #[test]
+    fn encode_rows_matches_encode_output() {
+        let schema = ArrowSchema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Float64, false),
+        ]);
+        let int_arr = Int64Array::from(vec![100i64, 200, 300]);
+        let float_arr = Float64Array::from(vec![1.5, 2.5, 3.5]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(int_arr), Arc::new(float_arr)],
+        )
+        .unwrap();
+        let encoder = RecordEncoder {
+            columns: vec![
+                FieldBinding {
+                    arrow_index: 0,
+                    nullability: None,
+                    plan: FieldPlan::Scalar,
+                },
+                FieldBinding {
+                    arrow_index: 1,
+                    nullability: None,
+                    plan: FieldPlan::Scalar,
+                },
+            ],
+            prefix: None,
+        };
+        let mut stream_buf = Vec::new();
+        encoder.encode(&mut stream_buf, &batch).unwrap();
+        let mut out = BytesMut::new();
+        let mut offsets: Vec<usize> = vec![0];
+        encoder
+            .encode_rows(&batch, 32, &mut out, &mut offsets)
+            .unwrap();
+        assert_eq!(offsets.len(), 1 + batch.num_rows());
+        assert_bytes_eq(&out[..], &stream_buf);
+    }
+
+    #[test]
+    fn encode_rows_appends_to_existing_buffer() {
+        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, 
false)]);
+        let arr = Int32Array::from(vec![5, 6]);
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(arr)]).unwrap();
+        let encoder = RecordEncoder {
+            columns: vec![FieldBinding {
+                arrow_index: 0,
+                nullability: None,
+                plan: FieldPlan::Scalar,
+            }],
+            prefix: None,
+        };
+        let mut out = BytesMut::new();
+        out.extend_from_slice(&[0xAA, 0xBB]);
+        let mut offsets: Vec<usize> = vec![0, out.len()];
+        encoder
+            .encode_rows(&batch, 16, &mut out, &mut offsets)
+            .unwrap();
+        assert_eq!(offsets.len(), 4);
+        assert_eq!(*offsets.last().unwrap(), out.len());
+        assert_bytes_eq(row_slice(&out, &offsets, 0), &[0xAA, 0xBB]);
+        assert_bytes_eq(row_slice(&out, &offsets, 1), &avro_long_bytes(5));
+        assert_bytes_eq(row_slice(&out, &offsets, 2), &avro_long_bytes(6));
+    }
+
+    #[test]
+    fn encode_rows_nullable_column() {
+        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, 
true)]);
+        let arr = Int32Array::from(vec![Some(1), None, Some(3)]);
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(arr)]).unwrap();
+        let encoder = RecordEncoder {
+            columns: vec![FieldBinding {
+                arrow_index: 0,
+                nullability: Some(Nullability::NullFirst),
+                plan: FieldPlan::Scalar,
+            }],
+            prefix: None,
+        };
+        let mut out = BytesMut::new();
+        let mut offsets: Vec<usize> = vec![0];
+        encoder
+            .encode_rows(&batch, 16, &mut out, &mut offsets)
+            .unwrap();
+        assert_eq!(offsets.len(), 4);
+        let mut expected_row0 = Vec::new();
+        expected_row0.extend(avro_long_bytes(1)); // union branch for value
+        expected_row0.extend(avro_long_bytes(1)); // value
+        assert_bytes_eq(row_slice(&out, &offsets, 0), &expected_row0);
+        let expected_row1 = avro_long_bytes(0); // union branch for null
+        assert_bytes_eq(row_slice(&out, &offsets, 1), &expected_row1);
+        let mut expected_row2 = Vec::new();
+        expected_row2.extend(avro_long_bytes(1)); // union branch for value
+        expected_row2.extend(avro_long_bytes(3)); // value
+        assert_bytes_eq(row_slice(&out, &offsets, 2), &expected_row2);
+    }
+
+    #[test]
+    fn encode_prefix_write_error() {
+        use crate::codec::AvroFieldBuilder;
+        use crate::schema::{AvroSchema, FingerprintAlgorithm};
+        use std::io;
+
+        struct FailWriter {
+            failed: bool,
+        }
+
+        impl io::Write for FailWriter {
+            fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
+                if !self.failed {
+                    self.failed = true;
+                    Err(io::Error::other("fail write"))
+                } else {
+                    Ok(0)
+                }
+            }
+
+            fn flush(&mut self) -> io::Result<()> {
+                Ok(())
+            }
+        }
+
+        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, 
false)]);
+        let arr = Int32Array::from(vec![42]);
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(arr)]).unwrap();
+        let avro_schema = AvroSchema::try_from(&schema).unwrap();
+        let fingerprint = avro_schema
+            .fingerprint(FingerprintAlgorithm::Rabin)
+            .unwrap();
+        let avro_root = AvroFieldBuilder::new(&avro_schema.schema().unwrap())
+            .build()
+            .unwrap();
+        let encoder = RecordEncoderBuilder::new(&avro_root, &schema)
+            .with_fingerprint(Some(fingerprint))
+            .build()
+            .unwrap();
+
+        let mut writer = FailWriter { failed: false };
+        let err = encoder.encode(&mut writer, &batch).unwrap_err();
+        let msg = format!("{err}");
+        assert!(msg.contains("write prefix"), "unexpected error: {msg}");
+    }
 }
diff --git a/arrow-avro/src/writer/format.rs b/arrow-avro/src/writer/format.rs
index e8e93a62c7..c969651f16 100644
--- a/arrow-avro/src/writer/format.rs
+++ b/arrow-avro/src/writer/format.rs
@@ -133,6 +133,40 @@ impl AvroFormat for AvroSoeFormat {
     }
 }
 
+/// Unframed Avro binary streaming format ("raw Avro record body bytes (no 
prefix, no OCF header)").
+///
+/// Each record written by the stream writer contains only the raw Avro
+/// record body bytes (i.e., the Avro binary encoding of the datum) with **no**
+/// per-record prefix and **no** Object Container File (OCF) header.
+///
+/// This format is useful when another transport provides framing (for example,
+/// length-delimited buffers) or when embedding Avro record payloads inside a
+/// larger envelope.
+#[derive(Debug, Default)]
+pub struct AvroBinaryFormat;
+
+impl AvroFormat for AvroBinaryFormat {
+    const NEEDS_PREFIX: bool = false;
+
+    fn start_stream<W: Write>(
+        &mut self,
+        _writer: &mut W,
+        _schema: &Schema,
+        compression: Option<CompressionCodec>,
+    ) -> Result<(), AvroError> {
+        if compression.is_some() {
+            return Err(AvroError::InvalidArgument(
+                "Compression not supported for Avro binary streaming".to_string(),
+            ));
+        }
+        Ok(())
+    }
+
+    fn sync_marker(&self) -> Option<&[u8; 16]> {
+        None
+    }
+}
+
 #[inline]
 fn write_string<W: Write>(writer: &mut W, s: &str) -> Result<(), AvroError> {
     write_bytes(writer, s.as_bytes())
@@ -144,3 +178,47 @@ fn write_bytes<W: Write>(writer: &mut W, bytes: &[u8]) -> Result<(), AvroError>
     writer.write_all(bytes)?;
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow_schema::{DataType, Field, Schema};
+
+    fn test_schema() -> Schema {
+        Schema::new(vec![Field::new("x", DataType::Int32, false)])
+    }
+
+    #[test]
+    fn avro_binary_format_rejects_compression() {
+        let mut format = AvroBinaryFormat;
+        let schema = test_schema();
+        let err = format
+            .start_stream(
+                &mut Vec::<u8>::new(),
+                &schema,
+                Some(CompressionCodec::Snappy),
+            )
+            .unwrap_err();
+        assert!(
+            err.to_string()
+                .contains("Compression not supported for Avro binary streaming")
+        );
+    }
+
+    #[test]
+    fn avro_soe_format_rejects_compression() {
+        let mut format = AvroSoeFormat::default();
+        let schema = test_schema();
+        let err = format
+            .start_stream(
+                &mut Vec::<u8>::new(),
+                &schema,
+                Some(CompressionCodec::Snappy),
+            )
+            .unwrap_err();
+        assert!(
+            err.to_string()
+                .contains("Compression not supported for Avro SOE streaming")
+        );
+    }
+}
diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index f4a2e60ed5..2a97f0545d 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -19,58 +19,141 @@
 //!
 //! # Overview
 //!
-//! Use this module to serialize Arrow `RecordBatch` values into Avro. Two 
output
-//! formats are supported:
+//! Use this module to serialize Arrow [`arrow_array::RecordBatch`] values 
into Avro. Three output
+//! modes are supported:
 //!
-//! * **[`AvroWriter`](crate::writer::AvroWriter)** — writes an **Object 
Container File (OCF)**: a self‑describing
-//!   file with header (schema JSON + metadata), optional compression, data 
blocks, and
-//!   sync markers. See Avro 1.11.1 “Object Container Files.”
+//! * **[`crate::writer::AvroWriter`]** — writes an **Object Container File 
(OCF)**: a self‑describing
+//!   file with header (schema JSON and metadata), optional compression, data 
blocks, and
+//!   sync markers. See Avro 1.11.1 "Object Container Files."
 //!   
<https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
-//! * **[`AvroStreamWriter`](crate::writer::AvroStreamWriter)** — writes a 
**Single Object Encoding (SOE) Stream** (“datum” bytes) without
+//!
+//! * **[`crate::writer::AvroStreamWriter`]** — writes a **Single Object 
Encoding (SOE) Stream** without
 //!   any container framing. This is useful when the schema is known 
out‑of‑band (i.e.,
 //!   via a registry) and you want minimal overhead.
 //!
-//! ## Which format should you use?
+//! * **[`crate::writer::Encoder`]** — a row-by-row encoder that buffers 
encoded records into a single
+//!   contiguous byte buffer and returns per-row [`bytes::Bytes`] slices.
+//!   Ideal for publishing individual messages to Kafka, Pulsar, or other 
message queues
+//!   where each message must be a self-contained Avro payload.
+//!
+//! ## Which writer should you use?
+//!
+//! | Use Case | Recommended Type |
+//! |----------|------------------|
+//! | Write an OCF file to disk | [`crate::writer::AvroWriter`] |
+//! | Stream records continuously to a file/socket | 
[`crate::writer::AvroStreamWriter`] |
+//! | Publish individual records to Kafka/Pulsar | [`crate::writer::Encoder`] |
+//! | Need per-row byte slices for custom framing | [`crate::writer::Encoder`] 
|
+//!
+//! ## Per-Record Prefix Formats
+//!
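+//! For [`crate::writer::AvroStreamWriter`] and [`crate::writer::Encoder`] used with a
+//! prefixed format such as `AvroSoeFormat`, each record is automatically prefixed
+//! based on the fingerprint strategy (see the table below). Unframed formats such as
+//! `AvroBinaryFormat` emit only the Avro record body, with no prefix.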
 //!
-//! * Use **OCF** when you need a portable, self‑contained file. The schema 
travels with
-//!   the data, making it easy to read elsewhere.
-//! * Use the **SOE stream** when your surrounding protocol supplies schema 
information
-//!   (i.e., a schema registry). The writer automatically adds the per‑record 
prefix:
-//!   - **SOE**: Each record is prefixed with the 2-byte header (`0xC3 0x01`) 
followed by
-//!     an 8‑byte little‑endian CRC‑64‑AVRO fingerprint, then the Avro body.
-//!     See Avro 1.11.1 "Single object encoding".
-//!     
<https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
-//!   - **Confluent wire format**: Each record is prefixed with magic byte 
`0x00` followed by
-//!     a **big‑endian** 4‑byte schema ID, then the Avro body. Use 
`FingerprintStrategy::Id(schema_id)`.
-//!     
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
-//!   - **Apicurio wire format**: Each record is prefixed with magic byte 
`0x00` followed by
-//!     a **big‑endian** 8‑byte schema ID, then the Avro body. Use 
`FingerprintStrategy::Id64(schema_id)`.
-//!     
<https://www.apicur.io/registry/docs/apicurio-registry/1.3.3.Final/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-types-avro-registry>
+//! | Strategy | Prefix | Use Case |
+//! |----------|--------|----------|
+//! | `FingerprintStrategy::Rabin` (default) | `0xC3 0x01` + 8-byte LE Rabin 
fingerprint | Standard Avro SOE |
+//! | `FingerprintStrategy::Id(id)` | `0x00` + 4-byte BE schema ID | 
[Confluent Schema Registry] |
+//! | `FingerprintStrategy::Id64(id)` | `0x00` + 8-byte BE schema ID | 
[Apicurio Registry] |
 //!
-//! ## Choosing the Avro schema
+//! [Confluent Schema Registry]: 
https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
+//! [Apicurio Registry]: 
https://www.apicur.io/registry/docs/apicurio-registry/1.3.3.Final/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-types-avro-registry
+//!
+//! ## Choosing the Avro Schema
 //!
 //! By default, the writer converts your Arrow schema to Avro (including a 
top‑level record
 //! name). If you already have an Avro schema JSON you want to use verbatim, 
put it into the
-//! Arrow schema metadata under the `avro.schema` key before constructing the 
writer. The
-//! builder will use that schema instead of generating a new one (unless 
`strip_metadata` is
-//! set to true in the options).
+//! Arrow schema metadata under the 
[`SCHEMA_METADATA_KEY`](crate::schema::SCHEMA_METADATA_KEY)
+//! key before constructing the writer. The builder will use that schema 
instead of generating
+//! a new one.
 //!
 //! ## Compression
 //!
-//! For OCF, you may enable a compression codec via 
`WriterBuilder::with_compression`. The
-//! chosen codec is written into the file header and used for subsequent 
blocks. SOE stream
-//! writing doesn’t apply container‑level compression.
+//! For OCF ([`crate::writer::AvroWriter`]), you may enable a compression 
codec via
+//! [`crate::writer::WriterBuilder::with_compression`]. The chosen codec is 
written into the file header
+//! and used for subsequent blocks. SOE stream writing 
([`crate::writer::AvroStreamWriter`], [`crate::writer::Encoder`])
+//! does not apply container‑level compression.
+//!
+//! # Examples
+//!
+//! ## Writing an OCF File
+//!
+//! ```
+//! use std::sync::Arc;
+//! use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::writer::AvroWriter;
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let schema = Schema::new(vec![
+//!     Field::new("id", DataType::Int64, false),
+//!     Field::new("name", DataType::Utf8, false),
+//! ]);
+//!
+//! let batch = RecordBatch::try_new(
+//!     Arc::new(schema.clone()),
+//!     vec![
+//!         Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
+//!         Arc::new(StringArray::from(vec!["alice", "bob"])) as ArrayRef,
+//!     ],
+//! )?;
+//!
+//! let mut writer = AvroWriter::new(Vec::<u8>::new(), schema)?;
+//! writer.write(&batch)?;
+//! writer.finish()?;
+//! let bytes = writer.into_inner();
+//! assert!(!bytes.is_empty());
+//! # Ok(())
+//! # }
+//! ```
+//!
+//! ## Using the Row-by-Row Encoder for Message Queues
+//!
+//! ```
+//! use std::sync::Arc;
+//! use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
+//! use arrow_avro::schema::FingerprintStrategy;
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+//! let batch = RecordBatch::try_new(
+//!     Arc::new(schema.clone()),
+//!     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
+//! )?;
+//!
+//! // Build an Encoder with Confluent wire format (schema ID = 42)
+//! let mut encoder = WriterBuilder::new(schema)
+//!     .with_fingerprint_strategy(FingerprintStrategy::Id(42))
+//!     .build_encoder::<AvroSoeFormat>()?;
+//!
+//! encoder.encode(&batch)?;
+//!
+//! // Get the buffered rows (zero-copy views into a single backing buffer)
+//! let rows = encoder.flush();
+//! assert_eq!(rows.len(), 3);
+//!
+//! // Each row has Confluent wire format: magic byte + 4-byte schema ID + body
+//! for row in rows.iter() {
+//!     assert_eq!(row[0], 0x00); // Confluent magic byte
+//! }
+//! # Ok(())
+//! # }
+//! ```
 //!
 //! ---
 use crate::codec::AvroFieldBuilder;
 use crate::compression::CompressionCodec;
+use crate::errors::AvroError;
 use crate::schema::{
     AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, 
SCHEMA_METADATA_KEY,
 };
 use crate::writer::encoder::{RecordEncoder, RecordEncoderBuilder, write_long};
 use crate::writer::format::{AvroFormat, AvroOcfFormat, AvroSoeFormat};
 use arrow_array::RecordBatch;
-use arrow_schema::{ArrowError, Schema};
+use arrow_schema::{Schema, SchemaRef};
+use bytes::{Bytes, BytesMut};
 use std::io::Write;
 use std::sync::Arc;
 
@@ -79,11 +162,163 @@ mod encoder;
 /// Logic for different Avro container file formats.
 pub mod format;
 
+/// A contiguous set of Avro encoded rows.
+///
+/// `EncodedRows` stores:
+/// - a single backing byte buffer (`bytes::Bytes`)
+/// - a `Vec<usize>` of row boundary offsets (length = `rows + 1`)
+///
+/// This lets callers get per-row payloads as zero-copy `Bytes` slices.
+///
+/// For compatibility with APIs that require owned `Vec<u8>`, use:
+/// `let vecs: Vec<Vec<u8>> = rows.iter().map(|b| b.to_vec()).collect();`
+#[derive(Debug, Clone)]
+pub struct EncodedRows {
+    /// Backing buffer holding the payloads of all rows, concatenated.
+    data: Bytes,
+    /// Row boundary offsets into `data`; row `n` spans `offsets[n]..offsets[n + 1]`.
+    offsets: Vec<usize>,
+}
+
+impl EncodedRows {
+    /// Create a new `EncodedRows` from a backing buffer and row boundary offsets.
+    ///
+    /// `offsets` must have length `rows + 1`, and be monotonically non-decreasing.
+    /// The last offset should equal `data.len()`.
+    ///
+    /// These invariants are not validated here. [`Self::row`] reports invalid
+    /// offsets as an error, whereas [`Self::iter`] panics on them.
+    pub fn new(data: Bytes, offsets: Vec<usize>) -> Self {
+        Self { data, offsets }
+    }
+
+    /// Returns the number of encoded rows stored in this container.
+    #[inline]
+    pub fn len(&self) -> usize {
+        // One more boundary than rows; an empty offsets vector counts as zero rows.
+        self.offsets.len().saturating_sub(1)
+    }
+
+    /// Returns `true` if this container holds no encoded rows.
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Returns a reference to the single contiguous backing buffer.
+    ///
+    /// This buffer contains the payloads of all rows concatenated together.
+    ///
+    /// # Note
+    ///
+    /// To access individual row payloads, prefer using [`Self::row`] or [`Self::iter`]
+    /// rather than slicing this buffer manually.
+    #[inline]
+    pub fn bytes(&self) -> &Bytes {
+        &self.data
+    }
+
+    /// Returns the row boundary offsets.
+    ///
+    /// For a well-formed container the returned slice has length `self.len() + 1`,
+    /// and the `n`th row payload corresponds to `bytes[offsets[n]..offsets[n + 1]]`.
+    /// A container built via [`Self::new`] with an empty offsets vector returns an
+    /// empty slice.
+    #[inline]
+    pub fn offsets(&self) -> &[usize] {
+        &self.offsets
+    }
+
+    /// Return the `n`th row as a zero-copy `Bytes` slice.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if `n` is out of bounds or if the internal offsets are invalid
+    /// (e.g., offsets are not within the backing buffer).
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use std::sync::Arc;
+    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    /// use arrow_schema::{DataType, Field, Schema};
+    /// use arrow_avro::writer::WriterBuilder;
+    /// use arrow_avro::writer::format::AvroSoeFormat;
+    ///
+    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+    /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+    /// let batch = RecordBatch::try_new(
+    ///     Arc::new(schema.clone()),
+    ///     vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
+    /// )?;
+    ///
+    /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+    /// encoder.encode(&batch)?;
+    /// let rows = encoder.flush();
+    ///
+    /// assert_eq!(rows.len(), 2);
+    /// assert!(!rows.row(0)?.is_empty());
+    /// // Indexing past the last row is reported as an error, not a panic.
+    /// assert!(rows.row(2).is_err());
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn row(&self, n: usize) -> Result<Bytes, AvroError> {
+        if n >= self.len() {
+            return Err(AvroError::General(format!(
+                "Row index {n} out of bounds for len {}",
+                self.len()
+            )));
+        }
+        // `n < self.len()` implies `n + 1 < self.offsets.len()`, so both index
+        // operations below are in bounds and cannot panic.
+        let (start, end) = (self.offsets[n], self.offsets[n + 1]);
+        // Offsets supplied through `new` are unvalidated, so check them before slicing.
+        if start > end || end > self.data.len() {
+            return Err(AvroError::General(format!(
+                "Invalid row offsets for row {n}: start={start}, end={end}, data_len={}",
+                self.data.len()
+            )));
+        }
+        Ok(self.data.slice(start..end))
+    }
+
+    /// Iterate over rows as zero-copy `Bytes` slices.
+    ///
+    /// This iterator is infallible and is intended for the common case where
+    /// `EncodedRows` is produced by [`Encoder::flush`], which guarantees valid offsets.
+    ///
+    /// # Panics
+    ///
+    /// Advancing the iterator panics if a pair of adjacent offsets is decreasing or
+    /// extends past the backing buffer, which can only happen for containers built
+    /// with invalid offsets via [`Self::new`]. Use [`Self::row`] for checked access.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use std::sync::Arc;
+    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    /// use arrow_schema::{DataType, Field, Schema};
+    /// use arrow_avro::writer::WriterBuilder;
+    /// use arrow_avro::writer::format::AvroSoeFormat;
+    ///
+    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+    /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+    /// let batch = RecordBatch::try_new(
+    ///     Arc::new(schema.clone()),
+    ///     vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
+    /// )?;
+    ///
+    /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+    /// encoder.encode(&batch)?;
+    /// let rows = encoder.flush();
+    ///
+    /// assert_eq!(rows.iter().count(), 2);
+    /// # Ok(())
+    /// # }
+    /// ```
+    #[inline]
+    pub fn iter(&self) -> impl ExactSizeIterator<Item = Bytes> + '_ {
+        // Each adjacent pair of offsets delimits exactly one row.
+        self.offsets.windows(2).map(|w| self.data.slice(w[0]..w[1]))
+    }
+}
+
 /// Builder to configure and create a `Writer`.
 #[derive(Debug, Clone)]
 pub struct WriterBuilder {
+    /// Arrow schema the writer or encoder is built for.
     schema: Schema,
+    /// Optional compression codec, handed to the format when a stream is started.
     codec: Option<CompressionCodec>,
+    /// Optional per-row size hint; forwarded to `Encoder` by `build_encoder`.
+    row_capacity: Option<usize>,
+    /// Capacity hint (in bytes) for internal buffers; `new` sets it to 1024.
     capacity: usize,
+    /// Fingerprint strategy; consulted only for formats whose `NEEDS_PREFIX` is true.
     fingerprint_strategy: Option<FingerprintStrategy>,
 }
@@ -99,6 +334,7 @@ impl WriterBuilder {
         Self {
             schema,
             codec: None,
+            row_capacity: None,
             capacity: 1024,
             fingerprint_strategy: None,
         }
@@ -117,30 +353,34 @@ impl WriterBuilder {
         self
     }
 
-    /// Sets the capacity for the given object and returns the modified 
instance.
+    /// Sets the expected capacity (in bytes) for internal buffers.
+    ///
+    /// This is used as a hint to pre-allocate staging buffers for writing.
+    /// `build_encoder` uses it as the initial capacity of the encoder's backing buffer.
     pub fn with_capacity(mut self, capacity: usize) -> Self {
         self.capacity = capacity;
         self
     }
 
-    /// Create a new `Writer` with specified `AvroFormat` and builder options.
-    /// Performs one‑time startup (header/stream init, encoder plan).
-    pub fn build<W, F>(self, mut writer: W) -> Result<Writer<W, F>, ArrowError>
-    where
-        W: Write,
-        F: AvroFormat,
-    {
-        let mut format = F::default();
+    /// Provides a hint for the encoded size, in bytes, of a single row.
+    ///
+    /// The hint is only carried over to an [`Encoder`] built with
+    /// [`build_encoder`](Self::build_encoder), where it can help avoid reallocations
+    /// when the typical encoded row size is known up front.
+    pub fn with_row_capacity(self, capacity: usize) -> Self {
+        Self {
+            row_capacity: Some(capacity),
+            ..self
+        }
+    }
+
+
+    fn prepare_encoder<F: AvroFormat>(&self) -> Result<(Arc<Schema>, 
RecordEncoder), AvroError> {
         let avro_schema = match self.schema.metadata.get(SCHEMA_METADATA_KEY) {
             Some(json) => AvroSchema::new(json.clone()),
             None => AvroSchema::try_from(&self.schema)?,
         };
         let maybe_fingerprint = if F::NEEDS_PREFIX {
-            match self.fingerprint_strategy {
-                Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(id)),
-                Some(FingerprintStrategy::Id64(id)) => 
Some(Fingerprint::Id64(id)),
+            match &self.fingerprint_strategy {
+                Some(FingerprintStrategy::Id(id)) => 
Some(Fingerprint::Id(*id)),
+                Some(FingerprintStrategy::Id64(id)) => 
Some(Fingerprint::Id64(*id)),
                 Some(strategy) => {
-                    
Some(avro_schema.fingerprint(FingerprintAlgorithm::from(strategy))?)
+                    
Some(avro_schema.fingerprint(FingerprintAlgorithm::from(*strategy))?)
                 }
                 None => Some(
                     avro_schema
@@ -156,11 +396,48 @@ impl WriterBuilder {
             avro_schema.clone().json_string,
         );
         let schema = 
Arc::new(Schema::new_with_metadata(self.schema.fields().clone(), md));
-        format.start_stream(&mut writer, &schema, self.codec)?;
         let avro_root = AvroFieldBuilder::new(&avro_schema.schema()?).build()?;
         let encoder = RecordEncoderBuilder::new(&avro_root, schema.as_ref())
             .with_fingerprint(maybe_fingerprint)
             .build()?;
+        Ok((schema, encoder))
+    }
+
+    /// Build a new [`Encoder`] for the given [`AvroFormat`].
+    ///
+    /// `Encoder` only supports stream formats (no OCF sync markers). Attempting to build an
+    /// encoder with an OCF format (e.g. [`AvroOcfFormat`]) will return an error.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`AvroError::InvalidArgument`] if `F` reports a sync marker (an OCF format)
+    /// or if a compression codec was configured on the builder. Any error raised while
+    /// preparing the record encoder is propagated.
+    pub fn build_encoder<F: AvroFormat>(self) -> Result<Encoder, AvroError> {
+        if F::default().sync_marker().is_some() {
+            return Err(AvroError::InvalidArgument(
+                "Encoder only supports stream formats (no OCF header/sync marker)".to_string(),
+            ));
+        }
+        // `start_stream` is never invoked on this path, so a configured codec would
+        // otherwise be dropped silently. Reject it explicitly, as the stream formats
+        // do when a writer is started with compression.
+        if self.codec.is_some() {
+            return Err(AvroError::InvalidArgument(
+                "Compression not supported for Avro Encoder".to_string(),
+            ));
+        }
+        let (schema, encoder) = self.prepare_encoder::<F>()?;
+        Ok(Encoder {
+            schema,
+            encoder,
+            row_capacity: self.row_capacity,
+            buffer: BytesMut::with_capacity(self.capacity),
+            // A single leading boundary: zero rows buffered so far.
+            offsets: vec![0],
+        })
+    }
+
+    /// Build a new [`Writer`] with the specified [`AvroFormat`] and builder 
options.
+    pub fn build<W, F>(self, mut writer: W) -> Result<Writer<W, F>, AvroError>
+    where
+        W: Write,
+        F: AvroFormat,
+    {
+        let mut format = F::default();
+        if format.sync_marker().is_none() && !F::NEEDS_PREFIX {
+            return Err(AvroError::InvalidArgument(
+                "AvroBinaryFormat is only supported with Encoder, use 
build_encoder instead"
+                    .to_string(),
+            ));
+        }
+        let (schema, encoder) = self.prepare_encoder::<F>()?;
+        format.start_stream(&mut writer, &schema, self.codec)?;
         Ok(Writer {
             writer,
             schema,
@@ -172,6 +449,110 @@ impl WriterBuilder {
     }
 }
 
+/// A row-by-row encoder for Avro *stream/message* formats (SOE / registry 
wire formats / raw binary).
+///
+/// Unlike [`Writer`], which emits a single continuous byte stream to a 
[`std::io::Write`] sink,
+/// `Encoder` tracks row boundaries during encoding and returns an 
[`EncodedRows`] containing:
+/// - one backing buffer (`Bytes`)
+/// - row boundary offsets
+///
+/// This enables zero-copy per-row payloads (for instance, one Kafka message 
per Arrow row) without
+/// re-encoding or decoding the byte stream to recover record boundaries.
+///
+/// ### Example
+///
+/// ```
+/// use std::sync::Arc;
+/// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+/// use arrow_schema::{DataType, Field, Schema};
+/// use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
+/// use arrow_avro::schema::FingerprintStrategy;
+///
+/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+/// let schema = Schema::new(vec![Field::new("value", DataType::Int32, 
false)]);
+/// let batch = RecordBatch::try_new(
+///     Arc::new(schema.clone()),
+///     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
+/// )?;
+///
+/// // Configure the encoder (here: Confluent Wire Format with schema ID 100)
+/// let mut encoder = WriterBuilder::new(schema)
+///     .with_fingerprint_strategy(FingerprintStrategy::Id(100))
+///     .build_encoder::<AvroSoeFormat>()?;
+///
+/// // Encode the batch
+/// encoder.encode(&batch)?;
+///
+/// // Get the encoded rows
+/// let rows = encoder.flush();
+///
+/// // Convert to owned Vec<u8> payloads (e.g., for a Kafka producer)
+/// let payloads: Vec<Vec<u8>> = rows.iter().map(|row| row.to_vec()).collect();
+///
+/// assert_eq!(payloads.len(), 3);
+/// assert_eq!(payloads[0][0], 0x00); // Magic byte
+/// # Ok(())
+/// # }
+/// ```
+#[derive(Debug)]
+pub struct Encoder {
+    /// Arrow schema that incoming batches are checked against in `encode`.
+    schema: SchemaRef,
+    /// Record encoder that serializes the rows of each batch.
+    encoder: RecordEncoder,
+    /// Optional per-row size hint passed to the record encoder (0 when unset).
+    row_capacity: Option<usize>,
+    /// Accumulates the encoded bytes of all rows since the last `flush`.
+    buffer: BytesMut,
+    /// Row boundary offsets into `buffer`; reset to `[0]` by `flush`.
+    offsets: Vec<usize>,
+}
+
+impl Encoder {
+    /// Encode every row of `batch`, appending the bytes to the internal buffer.
+    ///
+    /// Fails with a schema error when the fields of `batch` differ from the
+    /// fields of this encoder's schema.
+    pub fn encode(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
+        let fields_match = batch.schema().fields() == self.schema.fields();
+        if !fields_match {
+            return Err(AvroError::SchemaError(
+                "Schema of RecordBatch differs from Writer schema".to_string(),
+            ));
+        }
+        // An unset row capacity is passed on as 0.
+        let row_capacity = self.row_capacity.unwrap_or(0);
+        self.encoder
+            .encode_rows(batch, row_capacity, &mut self.buffer, &mut self.offsets)?;
+        Ok(())
+    }
+
+    /// Encode each batch of `batches` in order, stopping at the first error.
+    pub fn encode_batches(&mut self, batches: &[RecordBatch]) -> Result<(), AvroError> {
+        batches.iter().try_for_each(|batch| self.encode(batch))
+    }
+
+    /// Take all rows buffered so far, leaving the encoder empty and reusable.
+    ///
+    /// The returned [`EncodedRows`] exposes each row payload as a `Bytes` slice.
+    pub fn flush(&mut self) -> EncodedRows {
+        // Move the boundaries out while keeping the internal vector's allocation,
+        // then restore the single leading `0` boundary for the next round.
+        let offsets: Vec<usize> = self.offsets.drain(..).collect();
+        self.offsets.push(0);
+        let data = self.buffer.split().freeze();
+        EncodedRows::new(data, offsets)
+    }
+
+    /// Returns a handle to the Arrow schema used by this encoder.
+    ///
+    /// The schema's metadata carries the Avro schema JSON under the
+    /// `avro.schema` key.
+    pub fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    /// Number of encoded rows waiting to be flushed.
+    pub fn buffered_len(&self) -> usize {
+        match self.offsets.len() {
+            0 => 0,
+            boundaries => boundaries - 1,
+        }
+    }
+}
+
 /// Generic Avro writer.
 ///
 /// This type is generic over the output Write sink (`W`) and the Avro format 
(`F`).
@@ -182,7 +563,7 @@ impl WriterBuilder {
 #[derive(Debug)]
 pub struct Writer<W: Write, F: AvroFormat> {
     writer: W,
-    schema: Arc<Schema>,
+    schema: SchemaRef,
     format: F,
     compression: Option<CompressionCodec>,
     capacity: usize,
@@ -290,7 +671,7 @@ impl<W: Write> Writer<W, AvroOcfFormat> {
     /// assert!(!bytes.is_empty());
     /// # Ok(()) }
     /// ```
-    pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
+    pub fn new(writer: W, schema: Schema) -> Result<Self, AvroError> {
         WriterBuilder::new(schema).build::<W, AvroOcfFormat>(writer)
     }
 
@@ -328,16 +709,16 @@ impl<W: Write> Writer<W, AvroSoeFormat> {
     /// assert!(!bytes.is_empty());
     /// # Ok(()) }
     /// ```
-    pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
+    pub fn new(writer: W, schema: Schema) -> Result<Self, AvroError> {
         WriterBuilder::new(schema).build::<W, AvroSoeFormat>(writer)
     }
 }
 
 impl<W: Write, F: AvroFormat> Writer<W, F> {
     /// Serialize one [`RecordBatch`] to the output.
-    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
         if batch.schema().fields() != self.schema.fields() {
-            return Err(ArrowError::SchemaError(
+            return Err(AvroError::SchemaError(
                 "Schema of RecordBatch differs from Writer schema".to_string(),
             ));
         }
@@ -350,7 +731,7 @@ impl<W: Write, F: AvroFormat> Writer<W, F> {
     /// A convenience method to write a slice of [`RecordBatch`].
     ///
     /// This is equivalent to calling `write` for each batch in the slice.
-    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), 
ArrowError> {
+    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), 
AvroError> {
         for b in batches {
             self.write(b)?;
         }
@@ -358,10 +739,10 @@ impl<W: Write, F: AvroFormat> Writer<W, F> {
     }
 
     /// Flush remaining buffered data and (for OCF) ensure the header is 
present.
-    pub fn finish(&mut self) -> Result<(), ArrowError> {
+    pub fn finish(&mut self) -> Result<(), AvroError> {
         self.writer
             .flush()
-            .map_err(|e| ArrowError::IoError(format!("Error flushing writer: 
{e}"), e))
+            .map_err(|e| AvroError::IoError(format!("Error flushing writer: 
{e}"), e))
     }
 
     /// Consume the writer, returning the underlying output object.
@@ -369,7 +750,7 @@ impl<W: Write, F: AvroFormat> Writer<W, F> {
         self.writer
     }
 
-    fn write_ocf_block(&mut self, batch: &RecordBatch, sync: &[u8; 16]) -> 
Result<(), ArrowError> {
+    fn write_ocf_block(&mut self, batch: &RecordBatch, sync: &[u8; 16]) -> 
Result<(), AvroError> {
         let mut buf = Vec::<u8>::with_capacity(self.capacity);
         self.encoder.encode(&mut buf, batch)?;
         let encoded = match self.compression {
@@ -380,14 +761,14 @@ impl<W: Write, F: AvroFormat> Writer<W, F> {
         write_long(&mut self.writer, encoded.len() as i64)?;
         self.writer
             .write_all(&encoded)
-            .map_err(|e| ArrowError::IoError(format!("Error writing Avro 
block: {e}"), e))?;
+            .map_err(|e| AvroError::IoError(format!("Error writing Avro block: 
{e}"), e))?;
         self.writer
             .write_all(sync)
-            .map_err(|e| ArrowError::IoError(format!("Error writing Avro sync: 
{e}"), e))?;
+            .map_err(|e| AvroError::IoError(format!("Error writing Avro sync: 
{e}"), e))?;
         Ok(())
     }
 
-    fn write_stream(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
+    fn write_stream(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
         self.encoder.encode(&mut self.writer, batch)?;
         Ok(())
     }
@@ -401,6 +782,8 @@ mod tests {
     use crate::schema::{AvroSchema, SchemaStore};
     use crate::test_util::arrow_test_data;
     use arrow::datatypes::TimeUnit;
+    use arrow::util::pretty::pretty_format_batches;
+    use arrow_array::builder::{Int32Builder, ListBuilder};
     #[cfg(feature = "avro_custom_types")]
     use arrow_array::types::{Int16Type, Int32Type, Int64Type};
     use arrow_array::types::{
@@ -408,16 +791,17 @@ mod tests {
         TimestampMillisecondType, TimestampNanosecondType,
     };
     use arrow_array::{
-        Array, ArrayRef, BinaryArray, Date32Array, Int32Array, PrimitiveArray, 
RecordBatch,
-        StringArray, StructArray, UnionArray,
+        Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Int32Array, 
Int64Array,
+        PrimitiveArray, RecordBatch, StringArray, StructArray, UnionArray,
     };
     #[cfg(feature = "avro_custom_types")]
-    use arrow_array::{Int16Array, Int64Array, RunArray};
+    use arrow_array::{Int16Array, RunArray};
     use arrow_schema::UnionMode;
     #[cfg(not(feature = "avro_custom_types"))]
     use arrow_schema::{DataType, Field, Schema};
     #[cfg(feature = "avro_custom_types")]
     use arrow_schema::{DataType, Field, Schema};
+    use bytes::BytesMut;
     use std::collections::HashMap;
     use std::collections::HashSet;
     use std::fs::File;
@@ -461,7 +845,7 @@ mod tests {
     }
 
     #[test]
-    fn test_stream_writer_writes_prefix_per_row_rt() -> Result<(), ArrowError> 
{
+    fn test_stream_writer_writes_prefix_per_row_rt() -> Result<(), AvroError> {
         let schema = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
         let batch = RecordBatch::try_new(
             Arc::new(schema.clone()),
@@ -679,7 +1063,7 @@ mod tests {
     }
 
     #[test]
-    fn test_union_nonzero_type_ids() -> Result<(), ArrowError> {
+    fn test_union_nonzero_type_ids() -> Result<(), AvroError> {
         use arrow_array::UnionArray;
         use arrow_buffer::Buffer;
         use arrow_schema::UnionFields;
@@ -724,7 +1108,7 @@ mod tests {
     }
 
     #[test]
-    fn test_stream_writer_with_id_fingerprint_rt() -> Result<(), ArrowError> {
+    fn test_stream_writer_with_id_fingerprint_rt() -> Result<(), AvroError> {
         let schema = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
         let batch = RecordBatch::try_new(
             Arc::new(schema.clone()),
@@ -758,7 +1142,7 @@ mod tests {
     }
 
     #[test]
-    fn test_stream_writer_with_id64_fingerprint_rt() -> Result<(), ArrowError> 
{
+    fn test_stream_writer_with_id64_fingerprint_rt() -> Result<(), AvroError> {
         let schema = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
         let batch = RecordBatch::try_new(
             Arc::new(schema.clone()),
@@ -792,7 +1176,7 @@ mod tests {
     }
 
     #[test]
-    fn test_ocf_writer_generates_header_and_sync() -> Result<(), ArrowError> {
+    fn test_ocf_writer_generates_header_and_sync() -> Result<(), AvroError> {
         let batch = make_batch();
         let buffer: Vec<u8> = Vec::new();
         let mut writer = AvroWriter::new(buffer, make_schema())?;
@@ -812,11 +1196,11 @@ mod tests {
         let buffer = Vec::<u8>::new();
         let mut writer = AvroWriter::new(buffer, alt_schema).unwrap();
         let err = writer.write(&batch).unwrap_err();
-        assert!(matches!(err, ArrowError::SchemaError(_)));
+        assert!(matches!(err, AvroError::SchemaError(_)));
     }
 
     #[test]
-    fn test_write_batches_accumulates_multiple() -> Result<(), ArrowError> {
+    fn test_write_batches_accumulates_multiple() -> Result<(), AvroError> {
         let batch1 = make_batch();
         let batch2 = make_batch();
         let buffer = Vec::<u8>::new();
@@ -829,7 +1213,7 @@ mod tests {
     }
 
     #[test]
-    fn test_finish_without_write_adds_header() -> Result<(), ArrowError> {
+    fn test_finish_without_write_adds_header() -> Result<(), AvroError> {
         let buffer = Vec::<u8>::new();
         let mut writer = AvroWriter::new(buffer, make_schema())?;
         writer.finish()?;
@@ -839,7 +1223,7 @@ mod tests {
     }
 
     #[test]
-    fn test_write_long_encodes_zigzag_varint() -> Result<(), ArrowError> {
+    fn test_write_long_encodes_zigzag_varint() -> Result<(), AvroError> {
         let mut buf = Vec::new();
         write_long(&mut buf, 0)?;
         write_long(&mut buf, -1)?;
@@ -854,7 +1238,7 @@ mod tests {
     }
 
     #[test]
-    fn test_roundtrip_alltypes_roundtrip_writer() -> Result<(), ArrowError> {
+    fn test_roundtrip_alltypes_roundtrip_writer() -> Result<(), AvroError> {
         for rel in files() {
             let path = arrow_test_data(rel);
             let rdr_file = File::open(&path).expect("open input avro");
@@ -903,7 +1287,7 @@ mod tests {
     }
 
     #[test]
-    fn test_roundtrip_nested_records_writer() -> Result<(), ArrowError> {
+    fn test_roundtrip_nested_records_writer() -> Result<(), AvroError> {
         let path = arrow_test_data("avro/nested_records.avro");
         let rdr_file = File::open(&path).expect("open nested_records.avro");
         let reader = ReaderBuilder::new()
@@ -937,7 +1321,7 @@ mod tests {
 
     #[test]
     #[cfg(feature = "snappy")]
-    fn test_roundtrip_nested_lists_writer() -> Result<(), ArrowError> {
+    fn test_roundtrip_nested_lists_writer() -> Result<(), AvroError> {
         let path = arrow_test_data("avro/nested_lists.snappy.avro");
         let rdr_file = File::open(&path).expect("open 
nested_lists.snappy.avro");
         let reader = ReaderBuilder::new()
@@ -972,7 +1356,7 @@ mod tests {
     }
 
     #[test]
-    fn test_round_trip_simple_fixed_ocf() -> Result<(), ArrowError> {
+    fn test_round_trip_simple_fixed_ocf() -> Result<(), AvroError> {
         let path = arrow_test_data("avro/simple_fixed.avro");
         let rdr_file = File::open(&path).expect("open avro/simple_fixed.avro");
         let reader = ReaderBuilder::new()
@@ -1003,7 +1387,7 @@ mod tests {
     // Strict equality (schema + values) only when canonical extension types are enabled
     #[test]
     #[cfg(feature = "canonical_extension_types")]
-    fn test_round_trip_duration_and_uuid_ocf() -> Result<(), ArrowError> {
+    fn test_round_trip_duration_and_uuid_ocf() -> Result<(), AvroError> {
         use arrow_schema::{DataType, IntervalUnit};
         let in_file =
             File::open("test/data/duration_uuid.avro").expect("open 
test/data/duration_uuid.avro");
@@ -1051,8 +1435,7 @@ mod tests {
     // Feature OFF: only values are asserted equal; schema may legitimately differ (uuid as fixed(16))
     #[test]
     #[cfg(not(feature = "canonical_extension_types"))]
-    fn test_duration_and_uuid_ocf_without_extensions_round_trips_values() -> 
Result<(), ArrowError>
-    {
+    fn test_duration_and_uuid_ocf_without_extensions_round_trips_values() -> 
Result<(), AvroError> {
         use arrow::datatypes::{DataType, IntervalUnit};
         use std::io::BufReader;
 
@@ -1133,7 +1516,7 @@ mod tests {
     #[test]
     // TODO: avoid requiring snappy for this file
     #[cfg(feature = "snappy")]
-    fn test_nonnullable_impala_roundtrip_writer() -> Result<(), ArrowError> {
+    fn test_nonnullable_impala_roundtrip_writer() -> Result<(), AvroError> {
         // Load source Avro with Map fields
         let path = arrow_test_data("avro/nonnullable.impala.avro");
         let rdr_file = File::open(&path).expect("open 
avro/nonnullable.impala.avro");
@@ -1180,7 +1563,7 @@ mod tests {
     #[test]
     // TODO: avoid requiring snappy for these files
     #[cfg(feature = "snappy")]
-    fn test_roundtrip_decimals_via_writer() -> Result<(), ArrowError> {
+    fn test_roundtrip_decimals_via_writer() -> Result<(), AvroError> {
         // (file, resolve via ARROW_TEST_DATA?)
         let files: [(&str, bool); 8] = [
             ("avro/fixed_length_decimal.avro", true), // fixed-backed -> 
Decimal128(25,2)
@@ -1229,7 +1612,7 @@ mod tests {
     }
 
     #[test]
-    fn test_named_types_complex_roundtrip() -> Result<(), ArrowError> {
+    fn test_named_types_complex_roundtrip() -> Result<(), AvroError> {
         // 1. Read the new, more complex named references file.
         let path =
             
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/named_types_complex.avro");
@@ -1526,7 +1909,7 @@ mod tests {
     }
 
     #[test]
-    fn test_union_roundtrip() -> Result<(), ArrowError> {
+    fn test_union_roundtrip() -> Result<(), AvroError> {
         let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
             .join("test/data/union_fields.avro")
             .to_string_lossy()
@@ -1560,7 +1943,7 @@ mod tests {
     }
 
     #[test]
-    fn test_enum_roundtrip_uses_reader_fixture() -> Result<(), ArrowError> {
+    fn test_enum_roundtrip_uses_reader_fixture() -> Result<(), AvroError> {
         // Read the known-good enum file (same as reader::test_simple)
         let path = arrow_test_data("avro/simple_enum.avro");
         let rdr_file = File::open(&path).expect("open avro/simple_enum.avro");
@@ -1603,7 +1986,7 @@ mod tests {
     }
 
     #[test]
-    fn test_builder_propagates_capacity_to_writer() -> Result<(), ArrowError> {
+    fn test_builder_propagates_capacity_to_writer() -> Result<(), AvroError> {
         let cap = 64 * 1024;
         let buffer = Vec::<u8>::new();
         let mut writer = WriterBuilder::new(make_schema())
@@ -1619,7 +2002,7 @@ mod tests {
     }
 
     #[test]
-    fn test_stream_writer_stores_capacity_direct_writes() -> Result<(), 
ArrowError> {
+    fn test_stream_writer_stores_capacity_direct_writes() -> Result<(), 
AvroError> {
         use arrow_array::{ArrayRef, Int32Array};
         use arrow_schema::{DataType, Field, Schema};
         let schema = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
@@ -1639,7 +2022,7 @@ mod tests {
 
     #[cfg(feature = "avro_custom_types")]
     #[test]
-    fn test_roundtrip_duration_logical_types_ocf() -> Result<(), ArrowError> {
+    fn test_roundtrip_duration_logical_types_ocf() -> Result<(), AvroError> {
         let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
             .join("test/data/duration_logical_types.avro")
             .to_string_lossy()
@@ -1703,7 +2086,7 @@ mod tests {
 
     #[cfg(feature = "avro_custom_types")]
     #[test]
-    fn test_run_end_encoded_roundtrip_writer() -> Result<(), ArrowError> {
+    fn test_run_end_encoded_roundtrip_writer() -> Result<(), AvroError> {
         let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
         let run_values = Int32Array::from(vec![Some(1), Some(2), None, 
Some(3)]);
         let ree = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
@@ -1747,7 +2130,7 @@ mod tests {
 
     #[cfg(feature = "avro_custom_types")]
     #[test]
-    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer() -> 
Result<(), ArrowError>
+    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer() -> 
Result<(), AvroError>
     {
         let run_ends = Int16Array::from(vec![2, 5, 7]); // end indices
         let run_values = StringArray::from(vec![Some("a"), None, Some("c")]);
@@ -1790,8 +2173,8 @@ mod tests {
 
     #[cfg(feature = "avro_custom_types")]
     #[test]
-    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer()
-    -> Result<(), ArrowError> {
+    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer() 
-> Result<(), AvroError>
+    {
         let run_ends = Int64Array::from(vec![4_i64, 8_i64]);
         let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
         let ree = RunArray::<Int64Type>::try_new(&run_ends, &run_values)?;
@@ -1830,7 +2213,7 @@ mod tests {
 
     #[cfg(feature = "avro_custom_types")]
     #[test]
-    fn test_run_end_encoded_sliced_roundtrip_writer() -> Result<(), 
ArrowError> {
+    fn test_run_end_encoded_sliced_roundtrip_writer() -> Result<(), AvroError> 
{
         let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
         let run_values = Int32Array::from(vec![Some(1), Some(2), None, 
Some(3)]);
         let base = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
@@ -1938,7 +2321,7 @@ mod tests {
 
     #[cfg(not(feature = "avro_custom_types"))]
     #[test]
-    fn test_run_end_encoded_roundtrip_writer_feature_off() -> Result<(), 
ArrowError> {
+    fn test_run_end_encoded_roundtrip_writer_feature_off() -> Result<(), 
AvroError> {
         use arrow_schema::{DataType, Field, Schema};
         let run_ends = arrow_array::Int32Array::from(vec![3, 5, 7, 8]);
         let run_values = arrow_array::Int32Array::from(vec![Some(1), Some(2), 
None, Some(3)]);
@@ -1983,7 +2366,7 @@ mod tests {
     #[cfg(not(feature = "avro_custom_types"))]
     #[test]
     fn 
test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer_feature_off()
-    -> Result<(), ArrowError> {
+    -> Result<(), AvroError> {
         use arrow_schema::{DataType, Field, Schema};
         let run_ends = arrow_array::Int16Array::from(vec![2, 5, 7]);
         let run_values = arrow_array::StringArray::from(vec![Some("a"), None, 
Some("c")]);
@@ -2027,7 +2410,7 @@ mod tests {
     #[cfg(not(feature = "avro_custom_types"))]
     #[test]
     fn 
test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer_feature_off()
-    -> Result<(), ArrowError> {
+    -> Result<(), AvroError> {
         use arrow_schema::{DataType, Field, Schema};
         let run_ends = arrow_array::Int64Array::from(vec![4_i64, 8_i64]);
         let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
@@ -2071,7 +2454,7 @@ mod tests {
 
     #[cfg(not(feature = "avro_custom_types"))]
     #[test]
-    fn test_run_end_encoded_sliced_roundtrip_writer_feature_off() -> 
Result<(), ArrowError> {
+    fn test_run_end_encoded_sliced_roundtrip_writer_feature_off() -> 
Result<(), AvroError> {
         use arrow_schema::{DataType, Field, Schema};
         let run_ends = Int32Array::from(vec![2, 4, 6]);
         let run_values = Int32Array::from(vec![Some(1), Some(2), None]);
@@ -2107,7 +2490,7 @@ mod tests {
     #[test]
     // TODO: avoid requiring snappy for this file
     #[cfg(feature = "snappy")]
-    fn test_nullable_impala_roundtrip() -> Result<(), ArrowError> {
+    fn test_nullable_impala_roundtrip() -> Result<(), AvroError> {
         let path = arrow_test_data("avro/nullable.impala.avro");
         let rdr_file = File::open(&path).expect("open 
avro/nullable.impala.avro");
         let reader = ReaderBuilder::new()
@@ -2142,7 +2525,7 @@ mod tests {
 
     #[test]
     #[cfg(feature = "snappy")]
-    fn test_datapage_v2_roundtrip() -> Result<(), ArrowError> {
+    fn test_datapage_v2_roundtrip() -> Result<(), AvroError> {
         let path = arrow_test_data("avro/datapage_v2.snappy.avro");
         let rdr_file = File::open(&path).expect("open 
avro/datapage_v2.snappy.avro");
         let reader = ReaderBuilder::new()
@@ -2172,7 +2555,7 @@ mod tests {
 
     #[test]
     #[cfg(feature = "snappy")]
-    fn test_single_nan_roundtrip() -> Result<(), ArrowError> {
+    fn test_single_nan_roundtrip() -> Result<(), AvroError> {
         let path = arrow_test_data("avro/single_nan.avro");
         let in_file = File::open(&path).expect("open avro/single_nan.avro");
         let reader = ReaderBuilder::new()
@@ -2202,7 +2585,7 @@ mod tests {
     #[test]
     // TODO: avoid requiring snappy for this file
     #[cfg(feature = "snappy")]
-    fn test_dict_pages_offset_zero_roundtrip() -> Result<(), ArrowError> {
+    fn test_dict_pages_offset_zero_roundtrip() -> Result<(), AvroError> {
         let path = arrow_test_data("avro/dict-page-offset-zero.avro");
         let rdr_file = File::open(&path).expect("open 
avro/dict-page-offset-zero.avro");
         let reader = ReaderBuilder::new()
@@ -2233,7 +2616,7 @@ mod tests {
 
     #[test]
     #[cfg(feature = "snappy")]
-    fn test_repeated_no_annotation_roundtrip() -> Result<(), ArrowError> {
+    fn test_repeated_no_annotation_roundtrip() -> Result<(), AvroError> {
         let path = arrow_test_data("avro/repeated_no_annotation.avro");
         let in_file = File::open(&path).expect("open 
avro/repeated_no_annotation.avro");
         let reader = ReaderBuilder::new()
@@ -2262,7 +2645,7 @@ mod tests {
     }
 
     #[test]
-    fn test_nested_record_type_reuse_roundtrip() -> Result<(), ArrowError> {
+    fn test_nested_record_type_reuse_roundtrip() -> Result<(), AvroError> {
         let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
             .join("test/data/nested_record_reuse.avro")
             .to_string_lossy()
@@ -2294,7 +2677,7 @@ mod tests {
     }
 
     #[test]
-    fn test_enum_type_reuse_roundtrip() -> Result<(), ArrowError> {
+    fn test_enum_type_reuse_roundtrip() -> Result<(), AvroError> {
         let path =
             
std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/enum_reuse.avro");
         let rdr_file = std::fs::File::open(&path).expect("open 
test/data/enum_reuse.avro");
@@ -2324,7 +2707,7 @@ mod tests {
     }
 
     #[test]
-    fn comprehensive_e2e_test_roundtrip() -> Result<(), ArrowError> {
+    fn comprehensive_e2e_test_roundtrip() -> Result<(), AvroError> {
         let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
             .join("test/data/comprehensive_e2e.avro");
         let rdr_file = File::open(&path).expect("open 
test/data/comprehensive_e2e.avro");
@@ -2355,7 +2738,7 @@ mod tests {
     }
 
     #[test]
-    fn test_roundtrip_new_time_encoders_writer() -> Result<(), ArrowError> {
+    fn test_roundtrip_new_time_encoders_writer() -> Result<(), AvroError> {
         let schema = Schema::new(vec![
             Field::new("d32", DataType::Date32, false),
             Field::new("t32_ms", DataType::Time32(TimeUnit::Millisecond), 
false),
@@ -2410,4 +2793,463 @@ mod tests {
         assert_eq!(roundtrip, batch);
         Ok(())
     }
+
+    /// Builds the two-column schema used by the row-encoder tests: fields `a`
+    /// and `b`, both non-nullable `Int32`.
+    fn make_encoder_schema() -> Schema {
+        // Both columns share the same type and nullability; only the name varies.
+        let int32_column = |name: &str| Field::new(name, DataType::Int32, false);
+        Schema::new(vec![int32_column("a"), int32_column("b")])
+    }
+
+    /// Builds a three-row batch against `schema`: the first column holds
+    /// `[1, 2, 3]` and the second holds `[10, 20, 30]`.
+    ///
+    /// Panics if `RecordBatch::try_new` rejects the schema/column combination.
+    fn make_encoder_batch(schema: &Schema) -> RecordBatch {
+        let columns = vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+            Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef,
+        ];
+        let schema_ref = Arc::new(schema.clone());
+        RecordBatch::try_new(schema_ref, columns).expect("failed to build test RecordBatch")
+    }
+
+    /// Builds the fixture shared by the decoder round-trip tests.
+    ///
+    /// Returns, in order: an Arrow `Schema` whose metadata stores the Avro JSON
+    /// under `SCHEMA_METADATA_KEY`, a three-row `RecordBatch` built against that
+    /// schema, and the `AvroSchema` wrapping the same JSON.
+    ///
+    /// The only fallible step is `RecordBatch::try_new`.
+    fn make_real_avro_schema_and_batch() -> Result<(Schema, RecordBatch, AvroSchema), AvroError> {
+        let avro_json = r#"
+        {
+          "type": "record",
+          "name": "User",
+          "fields": [
+            { "name": "id",     "type": "long" },
+            { "name": "name",   "type": "string" },
+            { "name": "active", "type": "boolean" },
+            { "name": "tags",   "type": { "type": "array", "items": "int" } },
+            { "name": "opt",    "type": ["null", "string"], "default": null }
+          ]
+        }"#;
+        let avro_schema = AvroSchema::new(avro_json.to_string());
+        // Attach the Avro JSON to the Arrow schema metadata.
+        let mut md = HashMap::new();
+        md.insert(
+            SCHEMA_METADATA_KEY.to_string(),
+            avro_schema.json_string.clone(),
+        );
+        // Non-nullable Int32 list items, shared by the schema and the builder below.
+        let item_field = Arc::new(Field::new(
+            Field::LIST_FIELD_DEFAULT_NAME,
+            DataType::Int32,
+            false,
+        ));
+        let schema = Schema::new_with_metadata(
+            vec![
+                Field::new("id", DataType::Int64, false),
+                Field::new("name", DataType::Utf8, false),
+                Field::new("active", DataType::Boolean, false),
+                Field::new("tags", DataType::List(item_field.clone()), false),
+                Field::new("opt", DataType::Utf8, true),
+            ],
+            md,
+        );
+        let id = Int64Array::from(vec![1, 2, 3]);
+        let name = StringArray::from(vec!["alice", "bob", "carol"]);
+        let active = BooleanArray::from(vec![true, false, true]);
+        // Three list rows: [1, 2], an empty list, and [3].
+        let mut tags_builder = ListBuilder::new(Int32Builder::new()).with_field(item_field);
+        tags_builder.values().append_value(1);
+        tags_builder.values().append_value(2);
+        tags_builder.append(true);
+        tags_builder.append(true);
+        tags_builder.values().append_value(3);
+        tags_builder.append(true);
+        let tags = tags_builder.finish();
+        // The middle row leaves the nullable `opt` column null.
+        let opt = StringArray::from(vec![Some("x"), None, Some("z")]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![
+                Arc::new(id) as ArrayRef,
+                Arc::new(name) as ArrayRef,
+                Arc::new(active) as ArrayRef,
+                Arc::new(tags) as ArrayRef,
+                Arc::new(opt) as ArrayRef,
+            ],
+        )?;
+        Ok((schema, batch, avro_schema))
+    }
+
+    // The row encoder built with `AvroSoeFormat` must produce exactly the bytes
+    // that `AvroStreamWriter` writes for the same schema and batch.
+    #[test]
+    fn test_row_writer_matches_stream_writer_soe() -> Result<(), AvroError> {
+        let schema = make_encoder_schema();
+        let batch = make_encoder_batch(&schema);
+        // Reference output: the stream writer writing into an in-memory Vec.
+        let mut stream = AvroStreamWriter::new(Vec::<u8>::new(), schema.clone())?;
+        stream.write(&batch)?;
+        stream.finish()?;
+        let stream_bytes = stream.into_inner();
+        // Output under test: the row encoder's contiguous byte buffer.
+        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+        row_writer.encode(&batch)?;
+        let rows = row_writer.flush();
+        let row_bytes: Vec<u8> = rows.bytes().to_vec();
+        assert_eq!(stream_bytes, row_bytes);
+        Ok(())
+    }
+
+    // `flush` must hand back every buffered row and leave the encoder empty, so
+    // an immediate second `flush` yields zero rows.
+    #[test]
+    fn test_row_writer_flush_clears_buffer() -> Result<(), AvroError> {
+        let schema = make_encoder_schema();
+        let batch = make_encoder_batch(&schema);
+        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+        row_writer.encode(&batch)?;
+        assert_eq!(row_writer.buffered_len(), batch.num_rows());
+        let out1 = row_writer.flush();
+        assert_eq!(out1.len(), batch.num_rows());
+        assert_eq!(row_writer.buffered_len(), 0);
+        // Nothing was encoded since the first flush.
+        let out2 = row_writer.flush();
+        assert_eq!(out2.len(), 0);
+        Ok(())
+    }
+
+    // Round trip: encode the fixture batch row by row with `AvroSoeFormat`, feed
+    // each row to a decoder whose schema store holds the fixture's Avro schema,
+    // and compare the pretty-printed result with the input batch.
+    #[test]
+    fn test_row_writer_roundtrip_decoder_soe_real_avro_data() -> Result<(), AvroError> {
+        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
+        let mut store = SchemaStore::new();
+        store.register(avro_schema.clone())?;
+        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+        row_writer.encode(&batch)?;
+        let rows = row_writer.flush();
+        let mut decoder = ReaderBuilder::new()
+            .with_writer_schema_store(store)
+            .with_batch_size(1024)
+            .build_decoder()?;
+        // Each encoded row is handed over on its own and must be consumed whole.
+        for row in rows.iter() {
+            let consumed = decoder.decode(row.as_ref())?;
+            assert_eq!(
+                consumed,
+                row.len(),
+                "decoder should consume the full row frame"
+            );
+        }
+        let out = decoder.flush()?.expect("decoded batch");
+        let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
+        let actual = pretty_format_batches(&[out])?.to_string();
+        assert_eq!(expected, actual);
+        Ok(())
+    }
+
+    // Round trip where the encoded rows are delivered to the decoder in
+    // row-aligned chunks of varying size (1, 2, 3, 1, 4, 2 rows, cycling and
+    // clamped to the rows that remain). Because every chunk ends on a row
+    // boundary, the staging buffer must be fully drained after each chunk.
+    #[test]
+    fn test_row_writer_roundtrip_decoder_soe_streaming_chunks() -> Result<(), AvroError> {
+        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
+        let mut store = SchemaStore::new();
+        store.register(avro_schema.clone())?;
+        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+        row_writer.encode(&batch)?;
+        let rows = row_writer.flush();
+        // Build a contiguous stream and frame boundaries (prefix sums) from EncodedRows.
+        let mut stream: Vec<u8> = Vec::new();
+        let mut boundaries: Vec<usize> = Vec::with_capacity(rows.len() + 1);
+        boundaries.push(0usize);
+        for row in rows.iter() {
+            stream.extend_from_slice(row.as_ref());
+            boundaries.push(stream.len());
+        }
+        let mut decoder = ReaderBuilder::new()
+            .with_writer_schema_store(store)
+            .with_batch_size(1024)
+            .build_decoder()?;
+        let mut buffered = BytesMut::new();
+        let chunk_rows = [1usize, 2, 3, 1, 4, 2];
+        let mut row_idx = 0usize;
+        let mut i = 0usize;
+        let n_rows = rows.len();
+        while row_idx < n_rows {
+            let take = chunk_rows[i % chunk_rows.len()];
+            i += 1;
+            // Translate the row range of this chunk into a byte range of `stream`.
+            let end_row = (row_idx + take).min(n_rows);
+            let byte_start = boundaries[row_idx];
+            let byte_end = boundaries[end_row];
+            buffered.extend_from_slice(&stream[byte_start..byte_end]);
+            // Keep decoding until the decoder reports that it consumed nothing.
+            loop {
+                let consumed = decoder.decode(&buffered)?;
+                if consumed == 0 {
+                    break;
+                }
+                let _ = buffered.split_to(consumed);
+            }
+            assert!(
+                buffered.is_empty(),
+                "expected decoder to consume the entire frame-aligned chunk"
+            );
+            row_idx = end_row;
+        }
+        let out = decoder.flush()?.expect("decoded batch");
+        let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
+        let actual = pretty_format_batches(&[out])?.to_string();
+        assert_eq!(expected, actual);
+        Ok(())
+    }
+
+    // Round trip using an ID-based fingerprint: the encoder is configured with
+    // `FingerprintStrategy::Id(42)` and the decoder's schema store maps
+    // `Fingerprint::Id(42)` to the fixture's Avro schema. (The test name refers
+    // to this as the Confluent wire format.)
+    #[test]
+    fn test_row_writer_roundtrip_decoder_confluent_wire_format_id() -> Result<(), AvroError> {
+        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
+        let schema_id: u32 = 42;
+        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
+        store.set(Fingerprint::Id(schema_id), avro_schema.clone())?;
+        let mut row_writer = WriterBuilder::new(schema)
+            .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id))
+            .build_encoder::<AvroSoeFormat>()?;
+        row_writer.encode(&batch)?;
+        let rows = row_writer.flush();
+        let mut decoder = ReaderBuilder::new()
+            .with_writer_schema_store(store)
+            .with_batch_size(1024)
+            .build_decoder()?;
+        // Every encoded row must be consumed in full by a single decode call.
+        for row in rows.iter() {
+            let consumed = decoder.decode(row.as_ref())?;
+            assert_eq!(consumed, row.len());
+        }
+        let out = decoder.flush()?.expect("decoded batch");
+        let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
+        let actual = pretty_format_batches(&[out])?.to_string();
+        assert_eq!(expected, actual);
+        Ok(())
+    }
+    // Exercises the `Encoder` and `EncodedRows` methods (called in fully
+    // qualified form) with `AvroBinaryFormat`: flushing an empty encoder,
+    // encoding two batches at once, and checking the resulting offsets, bytes,
+    // per-row access, iteration, and reconstruction via `EncodedRows::new`.
+    #[test]
+    fn test_encoder_encode_batches_flush_and_encoded_rows_methods_with_avro_binary_format()
+    -> Result<(), AvroError> {
+        use crate::writer::format::AvroBinaryFormat;
+        use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+        use arrow_schema::{DataType, Field, Schema};
+        use std::sync::Arc;
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]);
+        let schema_ref = Arc::new(schema.clone());
+        let batch1 = RecordBatch::try_new(
+            schema_ref.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+                Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef,
+            ],
+        )?;
+        let batch2 = RecordBatch::try_new(
+            schema_ref,
+            vec![
+                Arc::new(Int32Array::from(vec![4, 5])) as ArrayRef,
+                Arc::new(Int32Array::from(vec![40, 50])) as ArrayRef,
+            ],
+        )?;
+        let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroBinaryFormat>()?;
+        // Flushing before anything is encoded yields an empty result whose
+        // offsets still contain the initial 0.
+        let empty = Encoder::flush(&mut encoder);
+        assert_eq!(EncodedRows::len(&empty), 0);
+        assert!(EncodedRows::is_empty(&empty));
+        assert_eq!(EncodedRows::bytes(&empty).as_ref(), &[] as &[u8]);
+        assert_eq!(EncodedRows::offsets(&empty), &[0usize]);
+        assert_eq!(EncodedRows::iter(&empty).count(), 0);
+        let empty_vecs: Vec<Vec<u8>> = empty.iter().map(|b| b.to_vec()).collect();
+        assert!(empty_vecs.is_empty());
+        // 3 rows + 2 rows are buffered together across both batches.
+        let batches = vec![batch1, batch2];
+        Encoder::encode_batches(&mut encoder, &batches)?;
+        assert_eq!(encoder.buffered_len(), 5);
+        let rows = Encoder::flush(&mut encoder);
+        assert_eq!(
+            encoder.buffered_len(),
+            0,
+            "Encoder::flush should reset the internal offsets"
+        );
+        assert_eq!(EncodedRows::len(&rows), 5);
+        assert!(!EncodedRows::is_empty(&rows));
+        // Each row is two bytes here, one per Int32 column.
+        let expected_offsets: &[usize] = &[0, 2, 4, 6, 8, 10];
+        assert_eq!(EncodedRows::offsets(&rows), expected_offsets);
+        // Every value in this fixture encodes as the single byte 2 * value
+        // (Avro zigzag varint for small non-negative ints).
+        let expected_rows: Vec<Vec<u8>> = vec![
+            vec![2, 20],
+            vec![4, 40],
+            vec![6, 60],
+            vec![8, 80],
+            vec![10, 100],
+        ];
+        let expected_stream: Vec<u8> = expected_rows.concat();
+        assert_eq!(
+            EncodedRows::bytes(&rows).as_ref(),
+            expected_stream.as_slice()
+        );
+        for (i, expected) in expected_rows.iter().enumerate() {
+            assert_eq!(EncodedRows::row(&rows, i)?.as_ref(), expected.as_slice());
+        }
+        let iter_rows: Vec<Vec<u8>> = EncodedRows::iter(&rows).map(|b| b.to_vec()).collect();
+        assert_eq!(iter_rows, expected_rows);
+        // Rebuilding from the same bytes and offsets must give an equivalent value.
+        let recreated = EncodedRows::new(
+            EncodedRows::bytes(&rows).clone(),
+            EncodedRows::offsets(&rows).to_vec(),
+        );
+        assert_eq!(EncodedRows::len(&recreated), EncodedRows::len(&rows));
+        assert_eq!(EncodedRows::bytes(&recreated), EncodedRows::bytes(&rows));
+        assert_eq!(
+            EncodedRows::offsets(&recreated),
+            EncodedRows::offsets(&rows)
+        );
+        let rec_vecs: Vec<Vec<u8>> = recreated.iter().map(|b| b.to_vec()).collect();
+        assert_eq!(rec_vecs, iter_rows);
+        // The earlier flush drained the encoder, so this one is empty again.
+        let empty_again = Encoder::flush(&mut encoder);
+        assert!(EncodedRows::is_empty(&empty_again));
+        Ok(())
+    }
+
+    // `WriterBuilder::build` with `AvroBinaryFormat` must fail with
+    // `AvroError::InvalidArgument` carrying the exact message below.
+    #[test]
+    fn test_writer_builder_build_rejects_avro_binary_format() {
+        use crate::writer::format::AvroBinaryFormat;
+        use arrow_schema::{DataType, Field, Schema};
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let err = WriterBuilder::new(schema)
+            .build::<_, AvroBinaryFormat>(Vec::<u8>::new())
+            .unwrap_err();
+        match err {
+            // The expected message is a single-line literal: a line break inside
+            // it would become part of the string and the comparison would fail.
+            AvroError::InvalidArgument(msg) => assert_eq!(
+                msg,
+                "AvroBinaryFormat is only supported with Encoder, use build_encoder instead"
+            ),
+            other => panic!("expected InvalidArgumentError, got {:?}", other),
+        }
+    }
+    // Checks that `AvroBinaryFormat` rows are exactly the SOE rows minus their
+    // 10-byte prefix (2 magic bytes + 8 little-endian fingerprint bytes), and
+    // that re-adding that prefix by hand yields messages the decoder accepts.
+    #[test]
+    fn test_row_encoder_avro_binary_format_roundtrip_decoder_with_soe_framing()
+    -> Result<(), AvroError> {
+        use crate::writer::format::AvroBinaryFormat;
+        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
+        // Second batch is a two-row slice of the first, starting at row 1.
+        let batches: Vec<RecordBatch> = vec![batch.clone(), batch.slice(1, 2)];
+        let expected = arrow::compute::concat_batches(&batch.schema(), &batches)?;
+        let mut binary_encoder =
+            WriterBuilder::new(schema.clone()).build_encoder::<AvroBinaryFormat>()?;
+        binary_encoder.encode_batches(&batches)?;
+        let binary_rows = binary_encoder.flush();
+        assert_eq!(
+            binary_rows.len(),
+            expected.num_rows(),
+            "binary encoder row count mismatch"
+        );
+        let mut soe_encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+        soe_encoder.encode_batches(&batches)?;
+        let soe_rows = soe_encoder.flush();
+        assert_eq!(
+            soe_rows.len(),
+            binary_rows.len(),
+            "SOE vs binary row count mismatch"
+        );
+        let mut store = SchemaStore::new(); // Rabin by default
+        let fp = store.register(avro_schema)?;
+        let fp_le_bytes = match fp {
+            Fingerprint::Rabin(v) => v.to_le_bytes(),
+            other => panic!("expected Rabin fingerprint from SchemaStore::new(), got {other:?}"),
+        };
+        const SOE_MAGIC: [u8; 2] = [0xC3, 0x01];
+        const SOE_PREFIX_LEN: usize = 2 + 8;
+        // Row by row: SOE output == magic + fingerprint + binary-format body.
+        for i in 0..binary_rows.len() {
+            let body = binary_rows.row(i)?;
+            let soe = soe_rows.row(i)?;
+            assert!(
+                soe.len() >= SOE_PREFIX_LEN,
+                "expected SOE row to include prefix"
+            );
+            assert_eq!(&soe.as_ref()[..2], &SOE_MAGIC);
+            assert_eq!(&soe.as_ref()[2..SOE_PREFIX_LEN], &fp_le_bytes);
+            assert_eq!(
+                &soe.as_ref()[SOE_PREFIX_LEN..],
+                body.as_ref(),
+                "SOE body bytes differ from AvroBinaryFormat body bytes (row {i})"
+            );
+        }
+        let mut decoder = ReaderBuilder::new()
+            .with_writer_schema_store(store)
+            .with_batch_size(1024)
+            .build_decoder()?;
+        // Frame each raw body by hand and confirm the decoder consumes it whole.
+        for body in binary_rows.iter() {
+            let mut framed = Vec::with_capacity(SOE_PREFIX_LEN + body.len());
+            framed.extend_from_slice(&SOE_MAGIC);
+            framed.extend_from_slice(&fp_le_bytes);
+            framed.extend_from_slice(body.as_ref());
+            let consumed = decoder.decode(&framed)?;
+            assert_eq!(
+                consumed,
+                framed.len(),
+                "decoder should consume the full SOE-framed message"
+            );
+        }
+        let out = decoder.flush()?.expect("expected a decoded RecordBatch");
+        let expected_str = pretty_format_batches(&[expected])?.to_string();
+        let actual_str = pretty_format_batches(&[out])?.to_string();
+        assert_eq!(expected_str, actual_str);
+        Ok(())
+    }
+
+    // Simulates carrying `AvroBinaryFormat` rows over a byte stream: each row is
+    // wrapped as [4-byte little-endian length][SOE magic][fingerprint][body],
+    // the stream is delivered in arbitrarily sized chunks, and a small framer
+    // reassembles complete messages before handing them to the decoder.
+    #[test]
+    fn test_row_encoder_avro_binary_format_roundtrip_decoder_streaming_chunks()
+    -> Result<(), AvroError> {
+        use crate::writer::format::AvroBinaryFormat;
+        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
+        let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroBinaryFormat>()?;
+        encoder.encode(&batch)?;
+        let rows = encoder.flush();
+        let mut store = SchemaStore::new();
+        let fp = store.register(avro_schema)?;
+        let fp_le_bytes = match fp {
+            Fingerprint::Rabin(v) => v.to_le_bytes(),
+            other => panic!("expected Rabin fingerprint from SchemaStore::new(), got {other:?}"),
+        };
+        const SOE_MAGIC: [u8; 2] = [0xC3, 0x01];
+        const SOE_PREFIX_LEN: usize = 2 + 8;
+        // Build the transport stream; the length prefix covers SOE prefix + body.
+        let mut stream: Vec<u8> = Vec::new();
+        for body in rows.iter() {
+            let msg_len: u32 = (SOE_PREFIX_LEN + body.len())
+                .try_into()
+                .expect("message length must fit in u32");
+            stream.extend_from_slice(&msg_len.to_le_bytes());
+            stream.extend_from_slice(&SOE_MAGIC);
+            stream.extend_from_slice(&fp_le_bytes);
+            stream.extend_from_slice(body.as_ref());
+        }
+        let mut decoder = ReaderBuilder::new()
+            .with_writer_schema_store(store)
+            .with_batch_size(1024)
+            .build_decoder()?;
+        // Chunk sizes are in bytes and deliberately ignore message boundaries.
+        let chunk_sizes = [1usize, 2, 3, 5, 8, 13, 21, 34];
+        let mut pos = 0usize;
+        let mut i = 0usize;
+        let mut buffered = BytesMut::new();
+        let mut decoded_frames = 0usize;
+        while pos < stream.len() {
+            let take = chunk_sizes[i % chunk_sizes.len()];
+            i += 1;
+            let end = (pos + take).min(stream.len());
+            buffered.extend_from_slice(&stream[pos..end]);
+            pos = end;
+            // Drain every complete message currently sitting in the buffer.
+            loop {
+                // Need the full 4-byte length prefix before anything else.
+                if buffered.len() < 4 {
+                    break;
+                }
+                let msg_len =
+                    u32::from_le_bytes([buffered[0], buffered[1], buffered[2], buffered[3]])
+                        as usize;
+                // Wait for more bytes until the whole message has arrived.
+                if buffered.len() < 4 + msg_len {
+                    break;
+                }
+                let frame = buffered.split_to(4 + msg_len);
+                // Strip the transport length prefix; the rest is the SOE message.
+                let payload = &frame[4..];
+                let consumed = decoder.decode(payload)?;
+                assert_eq!(
+                    consumed,
+                    payload.len(),
+                    "decoder should consume the full SOE-framed message"
+                );
+
+                decoded_frames += 1;
+            }
+        }
+        assert!(
+            buffered.is_empty(),
+            "expected transport framer to consume all bytes; leftover = {}",
+            buffered.len()
+        );
+        assert_eq!(
+            decoded_frames,
+            rows.len(),
+            "expected to decode exactly one frame per encoded row"
+        );
+        let out = decoder.flush()?.expect("expected decoded RecordBatch");
+        let expected_str = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
+        let actual_str = pretty_format_batches(&[out])?.to_string();
+        assert_eq!(expected_str, actual_str);
+        Ok(())
+    }
 }


Reply via email to