Copilot commented on code in PR #9241:
URL: https://github.com/apache/arrow-rs/pull/9241#discussion_r2713080895


##########
arrow-avro/src/writer/async_writer.rs:
##########
@@ -0,0 +1,486 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! `async` API for writing [`RecordBatch`]es to Avro files
+//!
+//! This module provides async versions of the synchronous Avro writer.
+//! See [`crate::writer`] for API details on the synchronous version.
+//!
+//! # Example
+//!
+//! ```no_run
+//! use std::sync::Arc;
+//! use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::writer::AsyncAvroWriter;
+//! use bytes::Bytes;
+//!
+//! # #[tokio::main]
+//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
+//! let batch = RecordBatch::try_new(
+//!     Arc::new(schema.clone()),
+//!     vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef],
+//! )?;
+//!
+//! let mut buffer = Vec::new();
+//! let mut writer = AsyncAvroWriter::new(&mut buffer, schema)?;

Review Comment:
   The example code is missing `.await` on the constructor call.
`AsyncAvroWriter::new()` is async, so it returns a future that must be awaited
before `?` can be applied to its result. The line should be:
`let mut writer = AsyncAvroWriter::new(&mut buffer, schema).await?;`
   ```suggestion
   //! let mut writer = AsyncAvroWriter::new(&mut buffer, schema).await?;
   ```
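
   To illustrate the point of this comment outside of arrow-avro, here is a minimal, std-only sketch. It assumes the constructor really is an `async fn`, as the comment states. `DemoWriter`, `block_on`, and `build` are hypothetical names for this illustration and are not part of the PR or the crate; the tiny executor only replaces the `#[tokio::main]` runtime used in the doc example.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a writer with an async constructor.
struct DemoWriter {
    buf: Vec<u8>,
}

impl DemoWriter {
    /// Calling this returns a future, not a `Result`.
    async fn new(header: &[u8]) -> Result<Self, String> {
        if header.is_empty() {
            return Err("empty header".to_string());
        }
        Ok(Self { buf: header.to_vec() })
    }
}

/// Waker that does nothing; enough for futures that never actually suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor used only for this sketch (a real program would use a runtime).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

async fn build() -> Result<usize, String> {
    // let w = DemoWriter::new(b"Obj\x01")?;    // does not compile: `?` applied to a future
    let w = DemoWriter::new(b"Obj\x01").await?; // await first, then propagate the error
    Ok(w.buf.len())
}
```

   With the commented-out line enabled, the compiler rejects the `?` because the un-awaited value is a future rather than a `Result`, which is the failure the suggestion above avoids.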



##########
arrow-avro/src/writer/async_writer.rs:
##########
@@ -0,0 +1,486 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! `async` API for writing [`RecordBatch`]es to Avro files
+//!
+//! This module provides async versions of the synchronous Avro writer.
+//! See [`crate::writer`] for API details on the synchronous version.
+//!
+//! # Example
+//!
+//! ```no_run
+//! use std::sync::Arc;
+//! use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::writer::AsyncAvroWriter;
+//! use bytes::Bytes;
+//!
+//! # #[tokio::main]
+//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
+//! let batch = RecordBatch::try_new(
+//!     Arc::new(schema.clone()),
+//!     vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef],
+//! )?;
+//!
+//! let mut buffer = Vec::new();
+//! let mut writer = AsyncAvroWriter::new(&mut buffer, schema)?;
+//! writer.write(&batch).await?;
+//! writer.finish().await?;
+//!
+//! let bytes = buffer.clone();
+//! assert!(!bytes.is_empty());
+//! # Ok(()) }
+//! ```
+//!
+//! # Features
+//!
+//! - **OCF format**: Write Avro Object Container Files with schema, sync markers, and optional compression
+//! - **SOE format**: Write Avro Single Object Encoding streams for registry-based workflows
+//! - **Flexible sinks**: Works with any `AsyncWrite + Send` type or custom `AsyncFileWriter` implementations
+//! - **Compression**: Supports all compression codecs (Deflate, Snappy, ZStandard, etc.)
+//! - **Feature-gated**: Requires `async` feature to use
+
+use crate::compression::CompressionCodec;
+use crate::schema::{AvroSchema, FingerprintAlgorithm, FingerprintStrategy, SCHEMA_METADATA_KEY};
+use crate::writer::encoder::{RecordEncoder, RecordEncoderBuilder, write_long};
+use crate::writer::format::{AvroFormat, AvroOcfFormat, AvroSoeFormat};
+use arrow_array::RecordBatch;
+use arrow_schema::{ArrowError, Schema};
+use bytes::Bytes;
+use futures::future::{BoxFuture, FutureExt};
+use std::sync::Arc;
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+
+/// The asynchronous interface used by [`AsyncWriter`] to write Avro files.
+pub trait AsyncFileWriter: Send {
+    /// Write the provided bytes to the underlying writer
+    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ArrowError>>;
+
+    /// Flush any buffered data and finish the writing process.
+    ///
+    /// After `complete` returns `Ok(())`, the caller SHOULD not call write again.
+    fn complete(&mut self) -> BoxFuture<'_, Result<(), ArrowError>>;
+}
+
+impl AsyncFileWriter for Box<dyn AsyncFileWriter + '_> {
+    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ArrowError>> {
+        self.as_mut().write(bs)
+    }
+
+    fn complete(&mut self) -> BoxFuture<'_, Result<(), ArrowError>> {
+        self.as_mut().complete()
+    }
+}
+
+impl<T: AsyncWrite + Unpin + Send> AsyncFileWriter for T {
+    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ArrowError>> {
+        async move {
+            self.write_all(&bs)
+                .await
+                .map_err(|e| ArrowError::IoError(format!("Error writing bytes: {e}"), e))
+        }
+        .boxed()
+    }
+
+    fn complete(&mut self) -> BoxFuture<'_, Result<(), ArrowError>> {
+        async move {
+            self.flush()
+                .await
+                .map_err(|e| ArrowError::IoError(format!("Error flushing: {e}"), e))?;
+            self.shutdown()
+                .await
+                .map_err(|e| ArrowError::IoError(format!("Error closing: {e}"), e))
+        }
+        .boxed()
+    }
+}
+
+/// Builder to configure and create an async `AsyncWriter`.
+#[derive(Debug, Clone)]
+pub struct AsyncWriterBuilder {
+    schema: Schema,
+    codec: Option<CompressionCodec>,
+    capacity: usize,
+    fingerprint_strategy: Option<FingerprintStrategy>,
+}
+
+impl AsyncWriterBuilder {
+    /// Create a new builder with default settings.

Review Comment:
   The documentation for `AsyncWriterBuilder::new` should match the detail 
level of the sync `WriterBuilder::new` (lines 103-106 in mod.rs). Consider 
adding documentation that explains how the Avro schema is determined:
   1) If the Arrow schema metadata contains `avro::schema`, that JSON is used 
verbatim.
   2) Otherwise, the Arrow schema is converted to an Avro record schema.
   
   This would improve consistency and help users understand the schema 
resolution behavior.
   ```suggestion
       /// Create a new builder with default settings.
       ///
       /// The provided Arrow [`Schema`] is used to determine the Avro schema as follows:
       ///
       /// 1. If the Arrow schema metadata contains the key [`SCHEMA_METADATA_KEY`] (typically
       ///    `avro::schema`), its value is interpreted as JSON and used verbatim as the Avro
       ///    schema.
       /// 2. Otherwise, the Arrow schema is automatically converted into an Avro record schema.
       ///
       /// All other writer settings (compression codec, buffer capacity, fingerprint strategy)
       /// are initialized to their defaults and can be customized using the corresponding
       /// builder methods.
   ```
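
   The two-step resolution described above can be sketched roughly as follows. This is only an illustration of the rule, not the crate's implementation: `DemoSchema`, `resolve_avro_schema`, and the record name `DemoRecord` are hypothetical, the key value `avro::schema` is taken from this comment, and the real Arrow-to-Avro conversion handles far more than flat primitive fields.

```rust
use std::collections::HashMap;

/// Key named in the review comment; its exact value in the crate is assumed here.
const SCHEMA_METADATA_KEY: &str = "avro::schema";

/// Hypothetical, simplified stand-in for an Arrow schema.
struct DemoSchema {
    /// (field name, Avro primitive type name) pairs.
    fields: Vec<(String, String)>,
    /// Schema-level key/value metadata.
    metadata: HashMap<String, String>,
}

/// Returns the Avro schema JSON a writer would use for `schema`.
fn resolve_avro_schema(schema: &DemoSchema) -> String {
    // Rule 1: an embedded Avro schema in the metadata is used verbatim.
    if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
        return json.clone();
    }
    // Rule 2: otherwise derive a record schema from the fields.
    let fields: Vec<String> = schema
        .fields
        .iter()
        .map(|(name, ty)| format!(r#"{{"name":"{name}","type":"{ty}"}}"#))
        .collect();
    format!(
        r#"{{"type":"record","name":"DemoRecord","fields":[{}]}}"#,
        fields.join(",")
    )
}
```

   Documenting this order on `AsyncWriterBuilder::new` matters because a caller who leaves a stale `avro::schema` entry in the metadata would otherwise be surprised that the Arrow fields are not what gets written to the header.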



##########
arrow-avro/Cargo.toml:
##########
@@ -44,6 +44,7 @@ md5 = ["dep:md5"]
 sha256 = ["dep:sha2"]
 small_decimals = []
 avro_custom_types = ["dep:arrow-select"]
+async = ["tokio", "futures", "bytes"]

Review Comment:
   For consistency with other optional dependency features in this file (e.g., 
`md5 = ["dep:md5"]` on line 43), consider using explicit dependency prefixes: 
`async = ["dep:tokio", "dep:futures", "dep:bytes"]`. This follows the modern 
Cargo convention and makes it clearer that these are dependency activations 
rather than feature activations.
   ```suggestion
   async = ["dep:tokio", "dep:futures", "dep:bytes"]
   ```


