jecsand838 opened a new issue, #9212:
URL: https://github.com/apache/arrow-rs/issues/9212
**Is your feature request related to a problem or challenge? Please describe
what you are trying to do.**
`arrow-avro` currently provides synchronous writing APIs (`Writer<W:
std::io::Write>`, with the aliases `AvroWriter` for OCF and `AvroStreamWriter`
for SOE). In async-first applications (Tokio-based services, DataFusion
execution, writing to `object_store`, HTTP streaming responses, etc.), these
sync APIs force one of the following suboptimal approaches:
* Use blocking I/O (`std::fs::File`) inside an async runtime (risking thread
starvation / reduced throughput).
* Use `spawn_blocking` to isolate blocking writes (adds complexity and makes
backpressure/error handling harder; a minimal sketch follows this list).
* Buffer the entire output into memory (e.g., write to `Vec<u8>`) before
uploading to object storage (bad for large outputs).
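For illustration, here is a minimal sketch of the `spawn_blocking` workaround, assuming the existing sync `AvroWriter` OCF API; the extra task hop and the separate `JoinError` mapping show the complexity noted above:

```rust
use arrow_array::RecordBatch;
use arrow_avro::writer::AvroWriter;
use arrow_schema::ArrowError;

// Hedged sketch: isolates the sync `AvroWriter` on a blocking thread.
// Backpressure is lost because the batch is moved wholesale into the task.
async fn write_avro_blocking(batch: RecordBatch) -> Result<(), ArrowError> {
    tokio::task::spawn_blocking(move || {
        let file = std::fs::File::create("out.avro")
            .map_err(|e| ArrowError::IoError(e.to_string(), e))?;
        let mut writer = AvroWriter::new(file, batch.schema().as_ref().clone())?;
        writer.write(&batch)?;
        writer.finish()
    })
    .await
    // The blocking task's JoinError must be mapped separately from I/O errors
    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
}
```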
In parallel, PR #8930 is adding an `arrow-avro` **AsyncReader** for Avro OCF
(with `object_store` integration). To enable end-to-end async pipelines,
`arrow-avro` should also have a corresponding **AsyncWriter**.
Concrete use cases:
* Write Avro OCF results directly to `tokio::fs::File` without blocking.
* Stream an Avro OCF response body over the network (chunked transfer,
back-pressure aware).
* Write Avro OCF to S3/GCS/Azure via `object_store` multipart upload
(symmetry with the new AsyncReader).
**Describe the solution you'd like**
Add a new `arrow-avro` async writer API that mirrors the patterns used by
Parquet’s async writer (`parquet/src/arrow/async_writer`) and interoperates
with the `arrow-avro` AsyncReader being introduced.
Proposed high-level design:
1. **New module and feature gating**
* Add `arrow-avro/src/writer/async_writer.rs` (or
`writer/async_writer/mod.rs`) behind `cfg(feature = "async")` (module wiring
sketched after this item)
* Align feature flags with the AsyncReader PR (`async`, and optionally
`object_store`)
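Something like the following in `arrow-avro/src/writer/mod.rs` (a sketch only; the exact feature wiring should follow whatever #8930 settles on):

```rust
// Hypothetical module layout from this issue, gated on the `async` feature.
#[cfg(feature = "async")]
pub mod async_writer;

#[cfg(feature = "async")]
pub use async_writer::{AsyncAvroStreamWriter, AsyncAvroWriter, AsyncFileWriter, AsyncWriter};

// The object_store adapter (item 6 below) would layer on top of `async`.
#[cfg(feature = "object_store")]
pub use async_writer::AvroObjectWriter;
```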
2. **Async sink abstraction (match Parquet’s pattern)**
* Introduce an `AsyncFileWriter` trait analogous to Parquet’s (sketched after
this item):
* Accepts `bytes::Bytes` payloads
* Returns `futures::future::BoxFuture<'_, Result<(), ArrowError>>`
* Has `write(Bytes)` and `complete()` methods
* Provide:
* `impl AsyncFileWriter for Box<dyn AsyncFileWriter + '_>`
* A blanket impl for Tokio sinks (like Parquet): `impl<T:
tokio::io::AsyncWrite + Unpin + Send> AsyncFileWriter for T`
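A sketch of what that could look like, adapted from Parquet’s `AsyncFileWriter` but returning `ArrowError` (proposed names from this issue, not an existing API; the `Box<dyn AsyncFileWriter + '_>` impl is elided for brevity):

```rust
use arrow_schema::ArrowError;
use bytes::Bytes;
use futures::future::BoxFuture;

/// Proposed trait, mirroring `parquet::arrow::async_writer::AsyncFileWriter`.
pub trait AsyncFileWriter: Send {
    /// Push `bs` into the underlying sink.
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ArrowError>>;
    /// Flush buffered data and finalize the sink (e.g. complete a multipart upload).
    fn complete(&mut self) -> BoxFuture<'_, Result<(), ArrowError>>;
}

/// Blanket impl for Tokio sinks, as Parquet provides.
impl<T: tokio::io::AsyncWrite + Unpin + Send> AsyncFileWriter for T {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ArrowError>> {
        Box::pin(async move {
            use tokio::io::AsyncWriteExt;
            self.write_all(&bs)
                .await
                .map_err(|e| ArrowError::IoError(e.to_string(), e))
        })
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<(), ArrowError>> {
        Box::pin(async move {
            use tokio::io::AsyncWriteExt;
            self.shutdown()
                .await
                .map_err(|e| ArrowError::IoError(e.to_string(), e))
        })
    }
}
```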
3. **AsyncWriter types and API parity**
* Implement a generic `AsyncWriter<W, F>` where `W: AsyncFileWriter` and `F:
AvroFormat`
* Provide aliases consistent with the existing sync types:
* `type AsyncAvroWriter<W> = AsyncWriter<W, AvroOcfFormat>`
* `type AsyncAvroStreamWriter<W> = AsyncWriter<W, AvroSoeFormat>`
* Public API should be intentionally close to the existing sync `Writer` (a
skeleton follows this item):
* `WriterBuilder::build_async::<W, F>(writer)` (preferred: reuses existing
builder options)
* `async fn write(&mut self, batch: &RecordBatch) -> Result<(),
ArrowError>`
* `async fn write_batches(&mut self, batches: &[&RecordBatch]) ->
Result<(), ArrowError>`
* `async fn finish(&mut self) -> Result<(), ArrowError>` (flush +
`complete`)
* (Optional ergonomics) `async fn close(self) -> Result<W, ArrowError>` to
consume and return the underlying writer
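A rough skeleton of the proposed surface, using the names above (the `AvroFormat` bound on `F` is elided here, and `todo!()` marks elided bodies):

```rust
use arrow_array::RecordBatch;
use arrow_avro::writer::AvroOcfFormat; // path as used in the API sketch below
use arrow_schema::ArrowError;

/// Proposed generic writer; the real implementation would bound `F: AvroFormat`.
pub struct AsyncWriter<W: AsyncFileWriter, F> {
    sink: W,
    staging: Vec<u8>, // sync encoder writes here; bytes are then pushed to `sink`
    format: F,
}

/// Alias mirroring the sync `AvroWriter`; `AsyncAvroStreamWriter` would
/// alias `AsyncWriter<W, AvroSoeFormat>` analogously.
pub type AsyncAvroWriter<W> = AsyncWriter<W, AvroOcfFormat>;

impl<W: AsyncFileWriter, F> AsyncWriter<W, F> {
    pub async fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        let _ = batch;
        todo!("encode into `staging` with the existing RecordEncoder, then flush to `sink`")
    }

    pub async fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
        for batch in batches {
            self.write(batch).await?;
        }
        Ok(())
    }

    /// Flush any staged bytes and finalize the sink.
    pub async fn finish(&mut self) -> Result<(), ArrowError> {
        self.sink.complete().await
    }

    /// Optional ergonomics: consume the writer and return the underlying sink.
    pub async fn close(mut self) -> Result<W, ArrowError> {
        self.finish().await?;
        Ok(self.sink)
    }
}
```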
4. **Implementation approach (buffer + flush, like Parquet)**
To avoid rewriting the encoder as “async”, reuse the existing synchronous
encoding logic by staging into an in-memory buffer and then pushing the bytes
into the async sink:
* On construction:
* Encode the OCF header via existing `AvroFormat::start_stream`, but write
it into a `Vec<u8>` buffer
* Flush that buffer to `AsyncFileWriter::write(Bytes)`
* On `write`:
* For **OCF**:
* Encode the batch into a `Vec<u8>` using the existing `RecordEncoder`
* Apply block-level compression exactly as the sync writer does
* Append block framing (`count`, `block_size`, `block_bytes`,
`sync_marker`) into a staging buffer (framing sketched after this item)
* Flush staging buffer to the async sink
* For **SOE stream**:
* Encode into a staging buffer and flush (preserving the existing prefix
behavior)
* Reuse `WriterBuilder` settings (`with_compression`, `with_capacity`,
`with_row_capacity`, `with_fingerprint_strategy`) to size buffers and control
prefixes/compression.
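For reference, the OCF block framing step stays fully synchronous under this approach and amounts to roughly the following; `zigzag_varint` is written out here for illustration, while the real writer would reuse the sync encoder’s existing varint routines:

```rust
/// Avro `long` encoding: zig-zag followed by variable-length (LEB128-style) bytes.
fn zigzag_varint(out: &mut Vec<u8>, value: i64) {
    let mut n = ((value << 1) ^ (value >> 63)) as u64;
    loop {
        let byte = (n & 0x7f) as u8;
        n >>= 7;
        if n == 0 {
            out.push(byte);
            break;
        }
        out.push(byte | 0x80);
    }
}

/// Frame one OCF block into `staging`: count, size, data, 16-byte sync marker.
fn frame_block(staging: &mut Vec<u8>, row_count: i64, block: &[u8], sync_marker: &[u8; 16]) {
    zigzag_varint(staging, row_count);          // number of objects in the block
    zigzag_varint(staging, block.len() as i64); // size of the (compressed) block bytes
    staging.extend_from_slice(block);           // block bytes, post-compression
    staging.extend_from_slice(sync_marker);     // OCF sync marker
}
```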
5. **Interop and test plan (explicit requirement)**
Add tests demonstrating interoperability with the AsyncReader in #8930 (a
sketch of the test shape follows this item):
* Roundtrip test: `AsyncAvroWriter` → bytes/file/object_store →
`AsyncAvroReader` yields the same `RecordBatch` values
* Also validate roundtrip with the existing sync `ReaderBuilder` to ensure
backwards compatibility
* Cover both:
* uncompressed OCF
* compressed OCF (as supported by `CompressionCodec`)
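To make the expectation concrete, the OCF roundtrip could take roughly this shape; `build_async` and the async reader types come from this issue / #8930 and do not exist yet, so treat this as a sketch of intent:

```rust
use arrow_avro::writer::{AvroOcfFormat, WriterBuilder};
use arrow_schema::ArrowError;

// Hedged sketch of the proposed roundtrip test; `sample_batch` is a
// hypothetical fixture returning a RecordBatch.
#[tokio::test]
async fn ocf_roundtrip_async_writer() -> Result<(), ArrowError> {
    let batch = sample_batch();
    let mut buf: Vec<u8> = Vec::new(); // Tokio implements `AsyncWrite` for `Vec<u8>`

    let mut writer = WriterBuilder::new(batch.schema().as_ref().clone())
        .build_async::<_, AvroOcfFormat>(&mut buf)?; // proposed builder method
    writer.write(&batch).await?;
    writer.finish().await?;

    // Then: decode `buf` with the #8930 AsyncReader and assert batch equality,
    // and again with the existing sync `ReaderBuilder` for backwards
    // compatibility; repeat for each supported `CompressionCodec`.
    Ok(())
}
```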
6. **object_store writer adapter**
For symmetry with `parquet::arrow::async_writer::ParquetObjectWriter` and
the AsyncReader’s `object_store` integration add an `AvroObjectWriter` (behind
`feature = "object_store"`) implementing `AsyncFileWriter` and performing
multipart upload.
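A plausible shape for the adapter, assuming it wraps `object_store::buffered::BufWriter` the way `ParquetObjectWriter` does (`AvroObjectWriter` and the `AsyncFileWriter` trait are the proposals above):

```rust
use std::sync::Arc;

use arrow_schema::ArrowError;
use bytes::Bytes;
use futures::future::BoxFuture;
use object_store::{buffered::BufWriter, path::Path, ObjectStore};

/// Proposed adapter: `BufWriter` already performs multipart uploads internally.
pub struct AvroObjectWriter {
    inner: BufWriter,
}

impl AvroObjectWriter {
    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
        Self { inner: BufWriter::new(store, path) }
    }
}

impl AsyncFileWriter for AvroObjectWriter {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ArrowError>> {
        Box::pin(async move {
            self.inner
                .put(bs) // hands the payload to the multipart uploader
                .await
                .map_err(|e| ArrowError::ExternalError(Box::new(e)))
        })
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<(), ArrowError>> {
        Box::pin(async move {
            use tokio::io::AsyncWriteExt;
            self.inner
                .shutdown() // completes the multipart upload
                .await
                .map_err(|e| ArrowError::IoError(e.to_string(), e))
        })
    }
}
```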
Illustrative API sketch:
```rust
use tokio::fs::File;
use arrow_avro::writer::{WriterBuilder /*, AsyncAvroWriter */};

async fn example(
    schema: arrow_schema::Schema,
    batch: arrow_array::RecordBatch,
) -> Result<(), arrow_schema::ArrowError> {
    let file = File::create("out.avro")
        .await
        .map_err(|e| arrow_schema::ArrowError::IoError(e.to_string(), e))?;

    // via builder (preferred)
    let mut w = WriterBuilder::new(schema)
        .with_compression(None)
        .with_capacity(64 * 1024)
        .build_async::<_, arrow_avro::writer::AvroOcfFormat>(file)?;

    // write and finish
    w.write(&batch).await?;
    w.finish().await?;
    Ok(())
}
```
**Describe alternatives you've considered**
* **Use synchronous `AvroWriter` in `spawn_blocking`**
* Avoids blocking the async runtime directly, but complicates control flow
and backpressure, and can still become a bottleneck at scale.
* **Write to `Vec<u8>` with sync `AvroWriter` and then upload**
* Requires buffering entire outputs, increasing memory usage and latency.
**Additional context**
Notes / open questions:
* OCF sync markers are generated randomly today for `AvroOcfFormat`. If we
want any “byte-for-byte equality” tests between sync and async writers, we may
need a way to inject a deterministic sync marker in tests, or rely purely on
decode/roundtrip validation.
* Naming bikeshed: should the terminal method be `finish().await` (matching
the current sync `Writer`) or `close().await` (matching Parquet’s async
writer)? Either is fine, but consistent naming would be nice.