GaneshPatil7517 opened a new pull request, #9241:
URL: https://github.com/apache/arrow-rs/pull/9241
## Summary
This PR implements a fully functional async Avro writer for `arrow-avro`,
providing a symmetric and idiomatic async API that mirrors the existing
synchronous `Writer` while following Arrow's established async patterns
(consistent with Parquet's async writer).
**Fixes: apache/arrow-rs#9212**
## Design Overview
### New Types
- **`AsyncFileWriter` trait**: Minimal abstraction for async I/O sinks
  - `write(Bytes) -> BoxFuture<Result<()>>`
  - `complete() -> BoxFuture<Result<()>>`
  - Blanket impl for `tokio::io::AsyncWrite + Unpin + Send`
  - Matches Parquet async_writer pattern
- **`AsyncWriter<W, F>` struct**: Generic async writer
  - `W`: Any `AsyncFileWriter` (tokio types, custom implementations, etc.)
  - `F`: Any `AvroFormat` (OCF or SOE)
  - Full API parity with sync `Writer`
- **Type aliases**:
  - `AsyncAvroWriter<W>` - OCF (Object Container File) format
  - `AsyncAvroStreamWriter<W>` - SOE (Single Object Encoding) format
- **`AsyncWriterBuilder`**: Configuration builder
  - `with_compression()` - All codecs (Deflate, Snappy, ZStandard, etc.)
  - `with_fingerprint_strategy()` - SOE fingerprinting
  - `with_capacity()` - Buffer sizing
  - Async `build()` method
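
For reviewers who want a feel for the sink abstraction, here is a minimal, std-only sketch of a trait with the two methods listed above. It is not the code in this PR: `Vec<u8>` stands in for `bytes::Bytes`, the local `BoxFuture` alias stands in for `futures::future::BoxFuture`, and `MemorySink`, `block_on`, and `demo_sink` are hypothetical helpers that exist only to exercise the trait without a runtime.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Boxed, sendable future; a std-only stand-in for `futures::future::BoxFuture`.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Assumed shape of the sink trait (illustrative, not the PR's definition).
/// `Vec<u8>` replaces `bytes::Bytes` so the sketch needs no external crates.
trait AsyncFileWriter: Send {
    /// Append one fully encoded chunk to the sink.
    fn write(&mut self, chunk: Vec<u8>) -> BoxFuture<'_, io::Result<()>>;
    /// Flush and close the sink; no writes are expected afterwards.
    fn complete(&mut self) -> BoxFuture<'_, io::Result<()>>;
}

/// Hypothetical in-memory sink used only to exercise the trait.
#[derive(Default)]
struct MemorySink {
    data: Vec<u8>,
    completed: bool,
}

impl AsyncFileWriter for MemorySink {
    fn write(&mut self, chunk: Vec<u8>) -> BoxFuture<'_, io::Result<()>> {
        Box::pin(async move {
            self.data.extend_from_slice(&chunk);
            Ok(())
        })
    }

    fn complete(&mut self) -> BoxFuture<'_, io::Result<()>> {
        Box::pin(async move {
            self.completed = true;
            Ok(())
        })
    }
}

/// Waker that does nothing; enough for futures that never actually suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor: polls the future in a loop until it resolves.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Writes the four-byte Avro OCF magic plus a small payload, then completes.
fn demo_sink() -> (Vec<u8>, bool) {
    let mut sink = MemorySink::default();
    block_on(async {
        sink.write(b"Obj\x01".to_vec()).await.unwrap();
        sink.write(vec![1, 2, 3]).await.unwrap();
        sink.complete().await.unwrap();
    });
    (sink.data, sink.completed)
}
```

Returning a boxed future keeps the trait object-safe, which is presumably why the Parquet writer uses the same shape; the cost is one allocation per call.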
### Key Implementation Details
1. **Sync Encoding Reuse**: Leverages existing `RecordEncoder` - no
re-implementation of Avro encoding
2. **Buffer Staging**: Encodes to `Vec<u8>`, converts to `Bytes`, flushes
asynchronously
3. **Header on Construction**: OCF headers written and flushed immediately
in builder
4. **Compression Preserved**: Identical logic to sync writer for compression
application
5. **Feature Gated**: Requires `async` feature with `tokio`, `futures`,
`bytes` dependencies
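
The buffer-staging step (item 2) can be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation: `StagingWriter`, `Sink`, `MemorySink`, `block_on`, and `demo_staging` are hypothetical names, the encoder only handles Avro `long` values (zig-zag varints, per the Avro specification), and a plain `Vec<u8>` is handed to the sink where the PR converts to `Bytes`.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

/// Hypothetical async sink; stands in for the PR's `AsyncFileWriter`.
trait Sink {
    fn write(&mut self, chunk: Vec<u8>) -> BoxFuture<'_, io::Result<()>>;
}

/// In-memory sink that records every chunk it receives.
struct MemorySink {
    chunks: Vec<Vec<u8>>,
}

impl Sink for MemorySink {
    fn write(&mut self, chunk: Vec<u8>) -> BoxFuture<'_, io::Result<()>> {
        Box::pin(async move {
            self.chunks.push(chunk);
            Ok(())
        })
    }
}

/// Synchronous encoder: Avro `long` as a zig-zag variable-length integer.
fn encode_long(value: i64, out: &mut Vec<u8>) {
    let mut z = ((value << 1) ^ (value >> 63)) as u64;
    while z >= 0x80 {
        out.push((z as u8 & 0x7f) | 0x80);
        z >>= 7;
    }
    out.push(z as u8);
}

/// Encodes synchronously into a staging buffer, then flushes it asynchronously.
struct StagingWriter<W: Sink> {
    sink: W,
    buf: Vec<u8>,
    capacity: usize,
}

impl<W: Sink> StagingWriter<W> {
    fn new(sink: W, capacity: usize) -> Self {
        Self { sink, buf: Vec::with_capacity(capacity), capacity }
    }

    async fn write(&mut self, values: &[i64]) -> io::Result<()> {
        // Step 1: purely synchronous encoding; nothing is awaited here.
        for &v in values {
            encode_long(v, &mut self.buf);
        }
        // Step 2: hand the filled buffer to the sink and leave a fresh one behind.
        let chunk = std::mem::replace(&mut self.buf, Vec::with_capacity(self.capacity));
        self.sink.write(chunk).await
    }

    fn into_inner(self) -> W {
        self.sink
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor so the sketch runs without tokio.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Two writes produce two separately flushed chunks.
fn demo_staging() -> Vec<Vec<u8>> {
    let mut writer = StagingWriter::new(MemorySink { chunks: Vec::new() }, 64);
    block_on(async {
        writer.write(&[1, -1]).await.unwrap();
        writer.write(&[64]).await.unwrap();
    });
    writer.into_inner().chunks
}
```

Because encoding never awaits, the existing sync encoder can be reused unchanged; only the final hand-off to the sink is asynchronous.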
## API Parity
The async writer exposes the same methods as the sync writer, with `.await` at each call:
```rust
// Create writers
let mut writer = AsyncAvroWriter::new(sink, schema).await?;
let mut writer = AsyncAvroStreamWriter::new(sink, schema).await?;
// Write batches
writer.write(&batch).await?;
writer.write_batches(&[&batch1, &batch2]).await?;
// Finish and retrieve sink
writer.finish().await?;
let sink = writer.into_inner();
```
## Test Coverage
7 comprehensive tests covering:
- ✅ OCF round-trip with sync reader
- ✅ SOE stream writing
- ✅ Multiple batch accumulation
- ✅ Builder configuration
- ✅ Writer consumption with `into_inner()`
- ✅ Schema mismatch error handling
- ✅ Deflate compression
All tests verify data integrity through round-trip with sync
`ReaderBuilder`, not byte-for-byte equality (OCF sync markers are random).
## Feature Gating
```toml
[features]
async = ["tokio", "futures", "bytes"]
[dependencies]
tokio = { version = "1", features = ["io-util"], optional = true }
futures = { version = "0.3.31", optional = true }
bytes = { version = "1.10.1", optional = true }
```
All async code is guarded with `#[cfg(feature = "async")]`.
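
As a rough illustration of that guard, the pattern looks like the sketch below. The module body and both function names are hypothetical; only the `#[cfg(feature = "async")]` attribute and the `async_writer` module name come from this PR.

```rust
// Hypothetical excerpt in the style of `writer/mod.rs`: the module is only
// compiled when the crate is built with `--features async`.
#[cfg(feature = "async")]
pub mod async_writer {
    /// Placeholder item; the real module holds the async writer types.
    pub fn is_available() -> bool {
        true
    }
}

/// Reports whether this build was compiled with the `async` feature.
pub fn async_support_enabled() -> bool {
    cfg!(feature = "async")
}
```

Builds without the feature never compile the module, so users who do not opt in do not pull in `tokio`, `futures`, or `bytes`.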
## Commits
1. `[arrow-avro] add async writer module with feature gating` - Core
implementation
2. `[arrow-avro] add comprehensive async writer tests` - Test coverage
3. `[arrow-avro] format code with rustfmt` - Formatting
4. `[arrow-avro] improve async writer documentation` - Docs
## Testing
```bash
# Run async writer tests
cargo test -p arrow-avro --lib --features async async_writer
# Build with all features
cargo build -p arrow-avro --all-features
# Check documentation
cargo doc -p arrow-avro --features async --no-deps
```
**All tests pass: ✅**
**Clippy: No warnings ✅**
**Rustfmt: Clean ✅**
## Files Modified
- `arrow-avro/Cargo.toml` - Added async feature and dependencies
- `arrow-avro/src/writer/mod.rs` - Module exports
- `arrow-avro/src/writer/async_writer.rs` - New module (486 lines, 7 tests)
## Future Work
- Optional `object_store` feature for cloud storage integration (S3, GCS,
  Azure)
  - Would follow same pattern as Parquet's `ParquetObjectWriter`
  - Can be added in follow-up PR
## Example Usage
```rust
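use arrow_avro::writer::AsyncAvroWriter;
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // `schema`, `batch1`, and `batch2` are assumed to be defined elsewhere.
    let file = File::create("output.avro").await?;
    let mut writer = AsyncAvroWriter::new(file, schema).await?;

    // Each batch is encoded synchronously and flushed to the file asynchronously.
    writer.write(&batch1).await?;
    writer.write(&batch2).await?;

    // Flush any remaining data and complete the underlying sink.
    writer.finish().await?;
    Ok(())
}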
```
## References
- Issue: apache/arrow-rs#9212
- Similar async pattern: Parquet `AsyncArrowWriter`
(parquet/src/arrow/async_writer/)
- Sync API: arrow-avro/src/writer/mod.rs
- Related: AsyncAvroReader PR #8930