This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c491520a9 Serialize function signature simplifications (#8802)
1c491520a9 is described below

commit 1c491520a9d3b0e78a23874b1c2fdb1c000a0bc6
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Sat Jan 13 01:21:40 2024 +0300

    Serialize function signature simplifications (#8802)
    
    * Remove async from batch serializer
    
    * Clippy
    
    * Update orchestration.rs
---
 datafusion/core/src/datasource/file_format/csv.rs                 | 7 +++----
 datafusion/core/src/datasource/file_format/json.rs                | 3 +--
 datafusion/core/src/datasource/file_format/write/mod.rs           | 4 +---
 datafusion/core/src/datasource/file_format/write/orchestration.rs | 2 +-
 datafusion/core/src/datasource/physical_plan/file_stream.rs       | 4 +---
 5 files changed, 7 insertions(+), 13 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 7a0af3ff08..9cae6675e8 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -423,9 +423,8 @@ impl CsvSerializer {
     }
 }
 
-#[async_trait]
 impl BatchSerializer for CsvSerializer {
-    async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes> {
+    fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes> {
         let mut buffer = Vec::with_capacity(4096);
         let builder = self.builder.clone();
         let header = self.header && initial;
@@ -829,7 +828,7 @@ mod tests {
             .await?;
         let batch = concat_batches(&batches[0].schema(), &batches)?;
         let serializer = CsvSerializer::new();
-        let bytes = serializer.serialize(batch, true).await?;
+        let bytes = serializer.serialize(batch, true)?;
         assert_eq!(
             "c2,c3\n2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
             String::from_utf8(bytes.into()).unwrap()
@@ -853,7 +852,7 @@ mod tests {
             .await?;
         let batch = concat_batches(&batches[0].schema(), &batches)?;
         let serializer = CsvSerializer::new().with_header(false);
-        let bytes = serializer.serialize(batch, true).await?;
+        let bytes = serializer.serialize(batch, true)?;
         assert_eq!(
             "2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
             String::from_utf8(bytes.into()).unwrap()
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 8c02955ad3..0f6d3648d1 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -204,9 +204,8 @@ impl JsonSerializer {
     }
 }
 
-#[async_trait]
 impl BatchSerializer for JsonSerializer {
-    async fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
+    fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
         let mut buffer = Vec::with_capacity(4096);
         let mut writer = json::LineDelimitedWriter::new(&mut buffer);
         writer.write(&batch)?;
diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/core/src/datasource/file_format/write/mod.rs
index c481f2accf..410a32a19c 100644
--- a/datafusion/core/src/datasource/file_format/write/mod.rs
+++ b/datafusion/core/src/datasource/file_format/write/mod.rs
@@ -29,7 +29,6 @@ use crate::error::Result;
 use arrow_array::RecordBatch;
 use datafusion_common::DataFusionError;
 
-use async_trait::async_trait;
 use bytes::Bytes;
 use futures::future::BoxFuture;
 use object_store::path::Path;
@@ -144,12 +143,11 @@ impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
 }
 
 /// A trait that defines the methods required for a RecordBatch serializer.
-#[async_trait]
 pub trait BatchSerializer: Sync + Send {
     /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
     /// Parameter `initial` signals whether the given batch is the first batch.
     /// This distinction is important for certain serializers (like CSV).
-    async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
+    fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
 }
 
 /// Returns an [`AbortableWrite`] which writes to the given object store location
diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index 9b820a15b2..106b4e0d50 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -60,7 +60,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
             let serializer_clone = serializer.clone();
             let handle = tokio::spawn(async move {
                 let num_rows = batch.num_rows();
-                let bytes = serializer_clone.serialize(batch, initial).await?;
+                let bytes = serializer_clone.serialize(batch, initial)?;
                 Ok((num_rows, bytes))
             });
             if initial {
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index bb4c831364..3536623976 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -535,7 +535,6 @@ mod tests {
     use arrow_schema::Schema;
     use datafusion_common::{internal_err, DataFusionError, Statistics};
 
-    use async_trait::async_trait;
     use bytes::Bytes;
     use futures::StreamExt;
 
@@ -989,9 +988,8 @@ mod tests {
         bytes: Bytes,
     }
 
-    #[async_trait]
     impl BatchSerializer for TestSerializer {
-        async fn serialize(&self, _batch: RecordBatch, _initial: bool) -> Result<Bytes> {
+        fn serialize(&self, _batch: RecordBatch, _initial: bool) -> Result<Bytes> {
             Ok(self.bytes.clone())
         }
     }

Reply via email to