This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1c491520a9 Serialize function signature simplifications (#8802)
1c491520a9 is described below
commit 1c491520a9d3b0e78a23874b1c2fdb1c000a0bc6
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Sat Jan 13 01:21:40 2024 +0300
Serialize function signature simplifications (#8802)
* Remove async from batch serializer
* Clippy
* Update orchestration.rs
---
datafusion/core/src/datasource/file_format/csv.rs | 7 +++----
datafusion/core/src/datasource/file_format/json.rs | 3 +--
datafusion/core/src/datasource/file_format/write/mod.rs | 4 +---
datafusion/core/src/datasource/file_format/write/orchestration.rs | 2 +-
datafusion/core/src/datasource/physical_plan/file_stream.rs | 4 +---
5 files changed, 7 insertions(+), 13 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 7a0af3ff08..9cae6675e8 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -423,9 +423,8 @@ impl CsvSerializer {
}
}
-#[async_trait]
impl BatchSerializer for CsvSerializer {
- async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes> {
+ fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes> {
let mut buffer = Vec::with_capacity(4096);
let builder = self.builder.clone();
let header = self.header && initial;
@@ -829,7 +828,7 @@ mod tests {
.await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
let serializer = CsvSerializer::new();
- let bytes = serializer.serialize(batch, true).await?;
+ let bytes = serializer.serialize(batch, true)?;
assert_eq!(
"c2,c3\n2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
String::from_utf8(bytes.into()).unwrap()
@@ -853,7 +852,7 @@ mod tests {
.await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
let serializer = CsvSerializer::new().with_header(false);
- let bytes = serializer.serialize(batch, true).await?;
+ let bytes = serializer.serialize(batch, true)?;
assert_eq!(
"2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
String::from_utf8(bytes.into()).unwrap()
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 8c02955ad3..0f6d3648d1 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -204,9 +204,8 @@ impl JsonSerializer {
}
}
-#[async_trait]
impl BatchSerializer for JsonSerializer {
- async fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
+ fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
let mut buffer = Vec::with_capacity(4096);
let mut writer = json::LineDelimitedWriter::new(&mut buffer);
writer.write(&batch)?;
diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/core/src/datasource/file_format/write/mod.rs
index c481f2accf..410a32a19c 100644
--- a/datafusion/core/src/datasource/file_format/write/mod.rs
+++ b/datafusion/core/src/datasource/file_format/write/mod.rs
@@ -29,7 +29,6 @@ use crate::error::Result;
use arrow_array::RecordBatch;
use datafusion_common::DataFusionError;
-use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use object_store::path::Path;
@@ -144,12 +143,11 @@ impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
}
/// A trait that defines the methods required for a RecordBatch serializer.
-#[async_trait]
pub trait BatchSerializer: Sync + Send {
/// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
/// Parameter `initial` signals whether the given batch is the first batch.
/// This distinction is important for certain serializers (like CSV).
- async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
+ fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
}
/// Returns an [`AbortableWrite`] which writes to the given object store location
diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index 9b820a15b2..106b4e0d50 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -60,7 +60,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
let serializer_clone = serializer.clone();
let handle = tokio::spawn(async move {
let num_rows = batch.num_rows();
- let bytes = serializer_clone.serialize(batch, initial).await?;
+ let bytes = serializer_clone.serialize(batch, initial)?;
Ok((num_rows, bytes))
});
if initial {
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index bb4c831364..3536623976 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -535,7 +535,6 @@ mod tests {
use arrow_schema::Schema;
use datafusion_common::{internal_err, DataFusionError, Statistics};
- use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
@@ -989,9 +988,8 @@ mod tests {
bytes: Bytes,
}
- #[async_trait]
impl BatchSerializer for TestSerializer {
- async fn serialize(&self, _batch: RecordBatch, _initial: bool) -> Result<Bytes> {
+ fn serialize(&self, _batch: RecordBatch, _initial: bool) -> Result<Bytes> {
Ok(self.bytes.clone())
}
}