jecsand838 opened a new issue, #9212:
URL: https://github.com/apache/arrow-rs/issues/9212

   **Is your feature request related to a problem or challenge? Please describe 
what you are trying to do.**
   
   `arrow-avro` currently provides synchronous writing APIs (`Writer<W: 
std::io::Write>` and the aliases `AvroWriter` for OCF and `AvroStreamWriter` 
for SOE). In async-first applications (Tokio-based services, DataFusion 
execution, writing to `object_store`, HTTP streaming responses, etc.), these 
sync APIs force one of the following suboptimal approaches:
   * Use blocking I/O (`std::fs::File`) inside an async runtime (risking thread 
starvation / reduced throughput).
   * Use `spawn_blocking` to isolate blocking writes (adds complexity and makes 
backpressure/error handling harder).
   * Buffer the entire output into memory (e.g., write to `Vec<u8>`) before 
uploading to object storage (bad for large outputs).
   
   In parallel, PR #8930 is adding an `arrow-avro` **AsyncReader** for Avro OCF 
(with `object_store` integration). To enable end-to-end async pipelines, 
`arrow-avro` should also have a corresponding **AsyncWriter**.
   
   Concrete use cases:
   
   * Write Avro OCF results directly to `tokio::fs::File` without blocking.
   * Stream an Avro OCF response body over the network (chunked transfer, 
back-pressure aware).
   * Write Avro OCF to S3/GCS/Azure via `object_store` multipart upload 
(symmetry with the new AsyncReader).
   
   **Describe the solution you'd like**
   
   Add a new `arrow-avro` async writer API that mirrors the patterns used by 
Parquet’s async writer (`parquet/src/arrow/async_writer`) and interoperates 
with the `arrow-avro` AsyncReader being introduced.
   
   Proposed high-level design:
   
   1. **New module and feature gating**
   
   * Add `arrow-avro/src/writer/async_writer.rs` (or `writer/async_writer/mod.rs`) behind `cfg(feature = "async")` (module wiring sketched below)
   * Align feature flags with the AsyncReader PR (`async`, and optionally 
`object_store`)
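   
   For illustration, the module wiring could look like the following sketch (the re-exported names are proposals, and the feature names are assumptions to be aligned with #8930):
   
   ```rust
   // arrow-avro/src/writer/mod.rs (illustrative sketch): expose the new module
   // only when the `async` feature is enabled, mirroring how Parquet gates its
   // async writer. None of these items exist in arrow-avro yet.
   #[cfg(feature = "async")]
   pub mod async_writer;
   
   #[cfg(feature = "async")]
   pub use async_writer::{AsyncAvroStreamWriter, AsyncAvroWriter, AsyncFileWriter, AsyncWriter};
   ```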
   
   2. **Async sink abstraction (match Parquet’s pattern)**
   
   * Introduce an `AsyncFileWriter` trait analogous to Parquet’s (sketched after this list):
     * Accepts `bytes::Bytes` payloads
     * Returns `futures::future::BoxFuture<'_, Result<(), ArrowError>>`
     * Has `write(Bytes)` and `complete()` methods
   * Provide:
   
     * `impl AsyncFileWriter for Box<dyn AsyncFileWriter + '_>`
     * A blanket impl for Tokio sinks (like Parquet): `impl<T: 
tokio::io::AsyncWrite + Unpin + Send> AsyncFileWriter for T`
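   
   A minimal sketch of the proposed trait and the Tokio blanket impl, assuming the same shape as `parquet::arrow::async_writer::AsyncFileWriter` (nothing here exists in `arrow-avro` yet):
   
   ```rust
   use arrow_schema::ArrowError;
   use bytes::Bytes;
   use futures::future::BoxFuture;
   use futures::FutureExt;
   use tokio::io::{AsyncWrite, AsyncWriteExt};
   
   /// Proposed sink abstraction, mirroring Parquet's `AsyncFileWriter`.
   pub trait AsyncFileWriter: Send {
       /// Write `bs` to the underlying sink.
       fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ArrowError>>;
   
       /// Flush buffered data and finalize the sink (e.g. complete a multipart upload).
       fn complete(&mut self) -> BoxFuture<'_, Result<(), ArrowError>>;
   }
   
   /// Blanket impl so any Tokio `AsyncWrite` (files, sockets, `Vec<u8>`) works directly.
   impl<T: AsyncWrite + Unpin + Send> AsyncFileWriter for T {
       fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ArrowError>> {
           async move {
               self.write_all(&bs)
                   .await
                   .map_err(|e| ArrowError::IoError(e.to_string(), e))
           }
           .boxed()
       }
   
       fn complete(&mut self) -> BoxFuture<'_, Result<(), ArrowError>> {
           async move {
               self.flush()
                   .await
                   .map_err(|e| ArrowError::IoError(e.to_string(), e))?;
               self.shutdown()
                   .await
                   .map_err(|e| ArrowError::IoError(e.to_string(), e))
           }
           .boxed()
       }
   }
   ```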
   
   3. **AsyncWriter types and API parity**
   
   * Implement a generic `AsyncWriter<W, F>` where `W: AsyncFileWriter` and `F: AvroFormat` (see the skeleton after this list)
   * Provide aliases consistent with the existing sync types:
     * `type AsyncAvroWriter<W> = AsyncWriter<W, AvroOcfFormat>`
     * `type AsyncAvroStreamWriter<W> = AsyncWriter<W, AvroSoeFormat>`
   * Public API should be intentionally close to the existing sync `Writer`:
     * `WriterBuilder::build_async::<W, F>(writer)` (preferred: reuses existing 
builder options)
     * `async fn write(&mut self, batch: &RecordBatch) -> Result<(), 
ArrowError>`
     * `async fn write_batches(&mut self, batches: &[&RecordBatch]) -> 
Result<(), ArrowError>`
     * `async fn finish(&mut self) -> Result<(), ArrowError>` (flush + 
`complete`)
     * (Optional ergonomics) `async fn close(self) -> Result<W, ArrowError>` to 
consume and return the underlying writer
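   
   A compact skeleton of the proposed surface (bodies elided with `todo!()`; the field layout is illustrative only):
   
   ```rust
   use arrow_array::RecordBatch;
   use arrow_schema::ArrowError;
   
   // `AsyncFileWriter` is the trait sketched above; `AvroFormat`, `AvroOcfFormat`,
   // and `AvroSoeFormat` are the format types the sync `Writer` already uses
   // (exact paths inside arrow-avro to be confirmed).
   
   /// Proposed generic async writer.
   pub struct AsyncWriter<W: AsyncFileWriter, F: AvroFormat> {
       sink: W,
       format: F,
       staging: Vec<u8>, // reused buffer for the synchronous encoding path
   }
   
   /// Aliases mirroring the sync `AvroWriter` / `AvroStreamWriter` names.
   pub type AsyncAvroWriter<W> = AsyncWriter<W, AvroOcfFormat>;
   pub type AsyncAvroStreamWriter<W> = AsyncWriter<W, AvroSoeFormat>;
   
   impl<W: AsyncFileWriter, F: AvroFormat> AsyncWriter<W, F> {
       /// Encode `batch` synchronously into the staging buffer, then flush to the sink.
       pub async fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
           todo!()
       }
   
       /// Write several batches back to back.
       pub async fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
           todo!()
       }
   
       /// Flush any remaining bytes and call `AsyncFileWriter::complete`.
       pub async fn finish(&mut self) -> Result<(), ArrowError> {
           todo!()
       }
   
       /// Optional ergonomics: consume the writer and return the underlying sink.
       pub async fn close(self) -> Result<W, ArrowError> {
           todo!()
       }
   }
   ```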
   
   4. **Implementation approach (buffer + flush, like Parquet)**
   
      To avoid rewriting the encoder as “async”, reuse the existing synchronous encoding logic by staging into an in-memory buffer and then pushing the bytes into the async sink (a minimal staging helper is sketched after this list):
   * On construction:
     * Encode the OCF header via existing `AvroFormat::start_stream`, but write 
it into a `Vec<u8>` buffer
     * Flush that buffer to `AsyncFileWriter::write(Bytes)`
   * On `write`:
     * For **OCF**:
       * Encode the batch into a `Vec<u8>` using the existing `RecordEncoder`
       * Apply block-level compression exactly as the sync writer does
       * Append block framing (`count`, `block_size`, `block_bytes`, 
`sync_marker`) into a staging buffer
       * Flush staging buffer to the async sink
     * For **SOE stream**:
       * Encode into a staging buffer and flush (preserving the existing prefix 
behavior)
   * Reuse `WriterBuilder` settings (`with_compression`, `with_capacity`, 
`with_row_capacity`, `with_fingerprint_strategy`) to size buffers and control 
prefixes/compression.
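   
   A minimal sketch of the staging pattern; the helper name is hypothetical, and the closure stands in for the existing sync header/block/SOE encoding logic:
   
   ```rust
   use arrow_schema::ArrowError;
   use bytes::Bytes;
   
   /// Run an existing *synchronous* encoding step into a reusable staging
   /// buffer, then hand the bytes to the async sink. The OCF header, OCF
   /// blocks (encode + compress + frame), and SOE frames can all funnel
   /// through this one pattern.
   async fn stage_and_flush<W, E>(
       sink: &mut W,
       staging: &mut Vec<u8>,
       encode: E,
   ) -> Result<(), ArrowError>
   where
       W: AsyncFileWriter, // the trait sketched earlier
       E: FnOnce(&mut Vec<u8>) -> Result<(), ArrowError>,
   {
       staging.clear();
       // The closure wraps existing sync logic, e.g. `RecordEncoder` output
       // plus block framing for OCF, or prefix + body for SOE.
       encode(staging)?;
       if !staging.is_empty() {
           // Copy out of the staging buffer so it can be reused for the next batch.
           sink.write(Bytes::copy_from_slice(staging)).await?;
       }
       Ok(())
   }
   ```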
   
   5. **Interop and test plan (explicit requirement)**
   
      Add tests demonstrating interoperability with the AsyncReader in #8930 (an illustrative roundtrip test follows this list):
   * Roundtrip test: `AsyncAvroWriter` → bytes/file/object_store → 
`AsyncAvroReader` yields the same `RecordBatch` values
   * Also validate roundtrip with the existing sync `ReaderBuilder` to ensure 
backwards compatibility
   * Cover both:
     * uncompressed OCF
     * compressed OCF (as supported by `CompressionCodec`)
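   
   An illustrative roundtrip test against the existing sync reader; `build_async` and the async writer methods are the proposed API, not existing code:
   
   ```rust
   use std::io::Cursor;
   use std::sync::Arc;
   
   use arrow_array::{ArrayRef, Int32Array, RecordBatch};
   use arrow_avro::writer::{AvroOcfFormat, WriterBuilder};
   use arrow_schema::{ArrowError, DataType, Field, Schema};
   
   #[tokio::test]
   async fn async_writer_sync_reader_roundtrip() -> Result<(), ArrowError> {
       let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
       let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
       let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![ids])?;
   
       // Proposed API (does not exist yet): Vec<u8> works as a sink via the
       // blanket Tokio AsyncWrite impl on AsyncFileWriter.
       let mut sink = Vec::new();
       let mut writer = WriterBuilder::new(schema).build_async::<_, AvroOcfFormat>(&mut sink)?;
       writer.write(&batch).await?;
       writer.finish().await?;
       drop(writer); // release the mutable borrow on `sink`
   
       // Validate the bytes with the existing synchronous reader.
       let reader = arrow_avro::reader::ReaderBuilder::new().build(Cursor::new(sink))?;
       let decoded: Vec<RecordBatch> = reader.collect::<Result<_, _>>()?;
       assert_eq!(decoded, vec![batch]);
       Ok(())
   }
   ```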
   
   6. **object_store writer adapter**
   
      For symmetry with `parquet::arrow::async_writer::ParquetObjectWriter` and the AsyncReader’s `object_store` integration, add an `AvroObjectWriter` (behind `feature = "object_store"`) that implements `AsyncFileWriter` and performs multipart uploads; a sketch follows.
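   
   A hedged sketch modeled on `ParquetObjectWriter`: wrap `object_store::buffered::BufWriter`, which switches to a multipart upload automatically for large payloads, and adapt its errors to `ArrowError`:
   
   ```rust
   use arrow_schema::ArrowError;
   use bytes::Bytes;
   use futures::future::BoxFuture;
   use futures::{FutureExt, TryFutureExt};
   use object_store::buffered::BufWriter;
   use tokio::io::AsyncWriteExt;
   
   /// Proposed adapter (does not exist yet), analogous to `ParquetObjectWriter`.
   /// Construction is expected to go through `BufWriter::new(store, path)`.
   pub struct AvroObjectWriter {
       inner: BufWriter,
   }
   
   impl AvroObjectWriter {
       pub fn new(inner: BufWriter) -> Self {
           Self { inner }
       }
   }
   
   impl AsyncFileWriter for AvroObjectWriter {
       fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ArrowError>> {
           // `BufWriter::put` accepts `Bytes`, avoiding an extra copy.
           self.inner
               .put(bs)
               .map_err(|e| ArrowError::ExternalError(Box::new(e)))
               .boxed()
       }
   
       fn complete(&mut self) -> BoxFuture<'_, Result<(), ArrowError>> {
           async move {
               // `shutdown` flushes and finalizes the (possibly multipart) upload.
               self.inner
                   .shutdown()
                   .await
                   .map_err(|e| ArrowError::IoError(e.to_string(), e))
           }
           .boxed()
       }
   }
   ```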
   
   Illustrative API sketch:
   
   ```rust
   use tokio::fs::File;
   use arrow_avro::writer::{WriterBuilder /*, AsyncAvroWriter */};
   
   # async fn example(schema: arrow_schema::Schema, batch: arrow_array::RecordBatch) -> Result<(), arrow_schema::ArrowError> {
   let file = File::create("out.avro").await.map_err(|e| arrow_schema::ArrowError::IoError(e.to_string(), e))?;
   
   // via builder (preferred)
   let mut w = WriterBuilder::new(schema)
       .with_compression(None)
       .with_capacity(64 * 1024)
       .build_async::<_, arrow_avro::writer::AvroOcfFormat>(file)?;
   
   // write and finish
   w.write(&batch).await?;
   w.finish().await?;
   # Ok(())
   # }
   ```
   
   **Describe alternatives you've considered**
   
   * **Use synchronous `AvroWriter` in `spawn_blocking`**
   
     * Avoids blocking the async runtime directly, but complicates control flow 
and backpressure, and can still become a bottleneck at scale.
   * **Write to `Vec<u8>` with sync `AvroWriter` and then upload**
     * Requires buffering entire outputs, increasing memory usage and latency.
   
   **Additional context**
   
   Notes / open questions:
   * OCF sync markers are generated randomly today for `AvroOcfFormat`. If we 
want any “byte-for-byte equality” tests between sync and async writers, we may 
need a way to inject a deterministic sync marker in tests, or rely purely on 
decode/roundtrip validation.
   * Naming bikeshed: should the terminal method be `finish().await` (matching 
the current sync `Writer`) or `close().await` (matching Parquet’s async 
writer)? Either is fine, but consistent naming would be nice.
   

