This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b85a39739e Make the BatchSerializer behind Arc to avoid unnecessary struct creation (#8666)
b85a39739e is described below

commit b85a39739e754576723ff4b1691c518a86335769
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Fri Dec 29 15:51:02 2023 +0300

    Make the BatchSerializer behind Arc to avoid unnecessary struct creation (#8666)
    
    * Make the BatchSerializer behind Arc
    
    * Commenting
    
    * Review
    
    * Incorporate review suggestions
    
    * Use old names
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 datafusion/core/src/datasource/file_format/csv.rs  | 69 ++++++++-----------
 datafusion/core/src/datasource/file_format/json.rs | 77 +++++++++-------------
 .../core/src/datasource/file_format/write/mod.rs   | 16 ++---
 .../datasource/file_format/write/orchestration.rs  | 74 +++++++++------------
 .../src/datasource/physical_plan/file_stream.rs    | 12 ++--
 5 files changed, 98 insertions(+), 150 deletions(-)
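
In short, this change puts the serializer behind an Arc so that parallel
serialization tasks share one immutable instance instead of creating a new
struct (via `duplicate()`) for every batch. A minimal sketch of the trait
change, condensed from the diff below (the "before" shape is shown in
comments; imports assumed from the crates used in the diff):

    use arrow_array::RecordBatch;
    use async_trait::async_trait;
    use bytes::Bytes;
    use datafusion_common::Result;

    // Before (simplified): serialize() took &mut self, so every parallel task
    // needed its own serializer, created through a fallible duplicate():
    //
    //     pub trait BatchSerializer: Unpin + Send {
    //         async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
    //         fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>>;
    //     }

    // After: serialize() takes &self plus an `initial` flag marking the first
    // batch, so one Arc<dyn BatchSerializer> can be cloned cheaply into every
    // task. CSV uses `initial` to gate the header row.
    #[async_trait]
    pub trait BatchSerializer: Sync + Send {
        async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
    }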

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 4033bcd3b5..d4e63904bd 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -19,21 +19,9 @@
 
 use std::any::Any;
 use std::collections::HashSet;
-use std::fmt;
-use std::fmt::Debug;
+use std::fmt::{self, Debug};
 use std::sync::Arc;
 
-use arrow_array::RecordBatch;
-use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
-use datafusion_execution::TaskContext;
-use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
-
-use bytes::{Buf, Bytes};
-use datafusion_physical_plan::metrics::MetricsSet;
-use futures::stream::BoxStream;
-use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
-use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
-
 use super::write::orchestration::stateless_multipart_put;
 use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -47,11 +35,20 @@ use crate::physical_plan::insert::{DataSink, FileSinkExec};
 use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
 use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
 
+use arrow::array::RecordBatch;
 use arrow::csv::WriterBuilder;
 use arrow::datatypes::{DataType, Field, Fields, Schema};
 use arrow::{self, datatypes::SchemaRef};
+use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
+use datafusion_physical_plan::metrics::MetricsSet;
 
 use async_trait::async_trait;
+use bytes::{Buf, Bytes};
+use futures::stream::BoxStream;
+use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
+use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
 
 /// Character Separated Value `FileFormat` implementation.
 #[derive(Debug)]
@@ -400,8 +397,6 @@ impl Default for CsvSerializer {
 pub struct CsvSerializer {
     // CSV writer builder
     builder: WriterBuilder,
-    // Inner buffer for avoiding reallocation
-    buffer: Vec<u8>,
     // Flag to indicate whether there will be a header
     header: bool,
 }
@@ -412,7 +407,6 @@ impl CsvSerializer {
         Self {
             builder: WriterBuilder::new(),
             header: true,
-            buffer: Vec::with_capacity(4096),
         }
     }
 
@@ -431,21 +425,14 @@ impl CsvSerializer {
 
 #[async_trait]
 impl BatchSerializer for CsvSerializer {
-    async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
+    async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes> {
+        let mut buffer = Vec::with_capacity(4096);
         let builder = self.builder.clone();
-        let mut writer = builder.with_header(self.header).build(&mut self.buffer);
+        let header = self.header && initial;
+        let mut writer = builder.with_header(header).build(&mut buffer);
         writer.write(&batch)?;
         drop(writer);
-        self.header = false;
-        Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
-    }
-
-    fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
-        let new_self = CsvSerializer::new()
-            .with_builder(self.builder.clone())
-            .with_header(self.header);
-        self.header = false;
-        Ok(Box::new(new_self))
+        Ok(Bytes::from(buffer))
     }
 }
 
@@ -488,13 +475,11 @@ impl CsvSink {
         let builder_clone = builder.clone();
         let options_clone = writer_options.clone();
         let get_serializer = move || {
-            let inner_clone = builder_clone.clone();
-            let serializer: Box<dyn BatchSerializer> = Box::new(
+            Arc::new(
                 CsvSerializer::new()
-                    .with_builder(inner_clone)
+                    .with_builder(builder_clone.clone())
                     .with_header(options_clone.writer_options.header()),
-            );
-            serializer
+            ) as _
         };
 
         stateless_multipart_put(
@@ -541,15 +526,15 @@ mod tests {
     use crate::physical_plan::collect;
     use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
     use crate::test_util::arrow_test_data;
+
     use arrow::compute::concat_batches;
-    use bytes::Bytes;
-    use chrono::DateTime;
     use datafusion_common::cast::as_string_array;
-    use datafusion_common::internal_err;
     use datafusion_common::stats::Precision;
-    use datafusion_common::FileType;
-    use datafusion_common::GetExt;
+    use datafusion_common::{internal_err, FileType, GetExt};
     use datafusion_expr::{col, lit};
+
+    use bytes::Bytes;
+    use chrono::DateTime;
     use futures::StreamExt;
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
@@ -836,8 +821,8 @@ mod tests {
             .collect()
             .await?;
         let batch = concat_batches(&batches[0].schema(), &batches)?;
-        let mut serializer = CsvSerializer::new();
-        let bytes = serializer.serialize(batch).await?;
+        let serializer = CsvSerializer::new();
+        let bytes = serializer.serialize(batch, true).await?;
         assert_eq!(
             "c2,c3\n2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
             String::from_utf8(bytes.into()).unwrap()
@@ -860,8 +845,8 @@ mod tests {
             .collect()
             .await?;
         let batch = concat_batches(&batches[0].schema(), &batches)?;
-        let mut serializer = CsvSerializer::new().with_header(false);
-        let bytes = serializer.serialize(batch).await?;
+        let serializer = CsvSerializer::new().with_header(false);
+        let bytes = serializer.serialize(batch, true).await?;
         assert_eq!(
             "2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
             String::from_utf8(bytes.into()).unwrap()
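
The `initial` flag is what preserves the old header semantics: the buffer and
header state are gone from `CsvSerializer`, and only the call for a file's
first batch passes `initial = true`, so only that call emits the header row.
A hypothetical caller-side sketch (module paths assumed from the
crate-internal imports above; the two batches are assumed to share a schema):

    use std::sync::Arc;

    use arrow_array::RecordBatch;
    use bytes::Bytes;
    use datafusion::datasource::file_format::csv::CsvSerializer;
    use datafusion::datasource::file_format::write::BatchSerializer;
    use datafusion::error::Result;

    // Serialize two batches destined for the same CSV file: the first call
    // yields header + rows, the second yields rows only.
    async fn csv_chunks(first: RecordBatch, rest: RecordBatch) -> Result<(Bytes, Bytes)> {
        let serializer = Arc::new(CsvSerializer::new().with_header(true));
        let head = serializer.serialize(first, true).await?;
        let tail = serializer.serialize(rest, false).await?;
        Ok((head, tail))
    }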
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index fcb1d5f8e5..3d437bc5fe 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -23,40 +23,34 @@ use std::fmt::Debug;
 use std::io::BufReader;
 use std::sync::Arc;
 
-use super::{FileFormat, FileScanConfig};
-use arrow::datatypes::Schema;
-use arrow::datatypes::SchemaRef;
-use arrow::json;
-use arrow::json::reader::infer_json_schema_from_iterator;
-use arrow::json::reader::ValueIter;
-use arrow_array::RecordBatch;
-use async_trait::async_trait;
-use bytes::Buf;
-
-use bytes::Bytes;
-use datafusion_physical_expr::PhysicalExpr;
-use datafusion_physical_expr::PhysicalSortRequirement;
-use datafusion_physical_plan::ExecutionPlan;
-use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
-
-use crate::datasource::physical_plan::FileGroupDisplay;
-use crate::physical_plan::insert::DataSink;
-use crate::physical_plan::insert::FileSinkExec;
-use crate::physical_plan::SendableRecordBatchStream;
-use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
-
 use super::write::orchestration::stateless_multipart_put;
-
+use super::{FileFormat, FileScanConfig};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::file_format::write::BatchSerializer;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
+use crate::datasource::physical_plan::FileGroupDisplay;
 use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec};
 use crate::error::Result;
 use crate::execution::context::SessionState;
+use crate::physical_plan::insert::{DataSink, FileSinkExec};
+use crate::physical_plan::{
+    DisplayAs, DisplayFormatType, SendableRecordBatchStream, Statistics,
+};
 
+use arrow::datatypes::Schema;
+use arrow::datatypes::SchemaRef;
+use arrow::json;
+use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
+use arrow_array::RecordBatch;
 use datafusion_common::{not_impl_err, DataFusionError, FileType};
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 use datafusion_physical_plan::metrics::MetricsSet;
+use datafusion_physical_plan::ExecutionPlan;
+
+use async_trait::async_trait;
+use bytes::{Buf, Bytes};
+use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
 
 /// New line delimited JSON `FileFormat` implementation.
 #[derive(Debug)]
@@ -201,31 +195,22 @@ impl Default for JsonSerializer {
 }
 
 /// Define a struct for serializing Json records to a stream
-pub struct JsonSerializer {
-    // Inner buffer for avoiding reallocation
-    buffer: Vec<u8>,
-}
+pub struct JsonSerializer {}
 
 impl JsonSerializer {
     /// Constructor for the JsonSerializer object
     pub fn new() -> Self {
-        Self {
-            buffer: Vec::with_capacity(4096),
-        }
+        Self {}
     }
 }
 
 #[async_trait]
 impl BatchSerializer for JsonSerializer {
-    async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
-        let mut writer = json::LineDelimitedWriter::new(&mut self.buffer);
+    async fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
+        let mut buffer = Vec::with_capacity(4096);
+        let mut writer = json::LineDelimitedWriter::new(&mut buffer);
         writer.write(&batch)?;
-        //drop(writer);
-        Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
-    }
-
-    fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
-        Ok(Box::new(JsonSerializer::new()))
+        Ok(Bytes::from(buffer))
     }
 }
 
@@ -272,10 +257,7 @@ impl JsonSink {
         let writer_options = self.config.file_type_writer_options.try_into_json()?;
         let compression = &writer_options.compression;
 
-        let get_serializer = move || {
-            let serializer: Box<dyn BatchSerializer> = Box::new(JsonSerializer::new());
-            serializer
-        };
+        let get_serializer = move || Arc::new(JsonSerializer::new()) as _;
 
         stateless_multipart_put(
             data,
@@ -312,16 +294,17 @@ impl DataSink for JsonSink {
 #[cfg(test)]
 mod tests {
     use super::super::test_util::scan_format;
-    use datafusion_common::cast::as_int64_array;
-    use datafusion_common::stats::Precision;
-    use futures::StreamExt;
-    use object_store::local::LocalFileSystem;
-
     use super::*;
     use crate::physical_plan::collect;
     use crate::prelude::{SessionConfig, SessionContext};
     use crate::test::object_store::local_unpartitioned_file;
 
+    use datafusion_common::cast::as_int64_array;
+    use datafusion_common::stats::Precision;
+
+    use futures::StreamExt;
+    use object_store::local::LocalFileSystem;
+
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
         let config = SessionConfig::new().with_batch_size(2);
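
One idiom worth noting in the `JsonSink` change above:
`Arc::new(JsonSerializer::new()) as _` leans on the compiler to infer the
unsized coercion to `Arc<dyn BatchSerializer>` from the closure's expected
return type. Spelled out, the closure is equivalent to this sketch (module
paths assumed, as in the CSV example earlier):

    use std::sync::Arc;

    use datafusion::datasource::file_format::json::JsonSerializer;
    use datafusion::datasource::file_format::write::BatchSerializer;

    fn main() {
        // The annotated return type makes the Arc<JsonSerializer> ->
        // Arc<dyn BatchSerializer> coercion explicit; `as _` infers it.
        let get_serializer = || -> Arc<dyn BatchSerializer> {
            Arc::new(JsonSerializer::new())
        };
        let _serializer = get_serializer();
    }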
diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/core/src/datasource/file_format/write/mod.rs
index 68fe81ce91..c481f2accf 100644
--- a/datafusion/core/src/datasource/file_format/write/mod.rs
+++ b/datafusion/core/src/datasource/file_format/write/mod.rs
@@ -24,20 +24,16 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
-
 use crate::error::Result;
 
 use arrow_array::RecordBatch;
-
 use datafusion_common::DataFusionError;
 
 use async_trait::async_trait;
 use bytes::Bytes;
-
 use futures::future::BoxFuture;
 use object_store::path::Path;
 use object_store::{MultipartId, ObjectStore};
-
 use tokio::io::AsyncWrite;
 
 pub(crate) mod demux;
@@ -149,15 +145,11 @@ impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
 
 /// A trait that defines the methods required for a RecordBatch serializer.
 #[async_trait]
-pub trait BatchSerializer: Unpin + Send {
+pub trait BatchSerializer: Sync + Send {
     /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
-    async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
-    /// Duplicates self to support serializing multiple batches in parallel on multiple cores
-    fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
-        Err(DataFusionError::NotImplemented(
-            "Parallel serialization is not implemented for this file 
type".into(),
-        ))
-    }
+    /// Parameter `initial` signals whether the given batch is the first batch.
+    /// This distinction is important for certain serializers (like CSV).
+    async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
 }
 
 /// Returns an [`AbortableWrite`] which writes to the given object store location
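
With `duplicate()` gone, the contract above is all a new format needs: a
stateless `Sync + Send` serializer whose `serialize` allocates a fresh buffer
per call. A hypothetical implementor for illustration (the type and its
output format are invented, not part of this commit; the trait is assumed in
scope):

    use arrow_array::RecordBatch;
    use async_trait::async_trait;
    use bytes::Bytes;
    use datafusion_common::Result;

    /// Hypothetical serializer that emits one summary line per batch.
    struct SummarySerializer;

    #[async_trait]
    impl BatchSerializer for SummarySerializer {
        async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes> {
            // No &mut self and no shared buffer: each call builds its own
            // output, so many calls can run concurrently on one instance.
            let mut out = String::new();
            if initial {
                // Emit a header only for a file's first batch, like CSV does.
                out.push_str("rows\tcols\n");
            }
            out.push_str(&format!("{}\t{}\n", batch.num_rows(), batch.num_columns()));
            Ok(Bytes::from(out.into_bytes()))
        }
    }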
diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index 120e27ecf6..9b820a15b2 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -21,28 +21,25 @@
 
 use std::sync::Arc;
 
+use super::demux::start_demuxer_task;
+use super::{create_writer, AbortableWrite, BatchSerializer};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::physical_plan::FileSinkConfig;
 use crate::error::Result;
 use crate::physical_plan::SendableRecordBatchStream;
 
 use arrow_array::RecordBatch;
-
-use datafusion_common::DataFusionError;
-
-use bytes::Bytes;
+use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError};
 use datafusion_execution::TaskContext;
 
+use bytes::Bytes;
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver};
 use tokio::task::{JoinHandle, JoinSet};
 use tokio::try_join;
 
-use super::demux::start_demuxer_task;
-use super::{create_writer, AbortableWrite, BatchSerializer};
-
 type WriterType = AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>;
-type SerializerType = Box<dyn BatchSerializer>;
+type SerializerType = Arc<dyn BatchSerializer>;
 
 /// Serializes a single data stream in parallel and writes to an ObjectStore
 /// concurrently. Data order is preserved. In the event of an error,
@@ -50,33 +47,28 @@ type SerializerType = Box<dyn BatchSerializer>;
 /// so that the caller may handle aborting failed writes.
 pub(crate) async fn serialize_rb_stream_to_object_store(
     mut data_rx: Receiver<RecordBatch>,
-    mut serializer: Box<dyn BatchSerializer>,
+    serializer: Arc<dyn BatchSerializer>,
     mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
 ) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
     let (tx, mut rx) =
         mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);
-
     let serialize_task = tokio::spawn(async move {
+        // Some serializers (like CSV) handle the first batch differently than
+        // subsequent batches, so we track that here.
+        let mut initial = true;
         while let Some(batch) = data_rx.recv().await {
-            match serializer.duplicate() {
-                Ok(mut serializer_clone) => {
-                    let handle = tokio::spawn(async move {
-                        let num_rows = batch.num_rows();
-                        let bytes = serializer_clone.serialize(batch).await?;
-                        Ok((num_rows, bytes))
-                    });
-                    tx.send(handle).await.map_err(|_| {
-                        DataFusionError::Internal(
-                            "Unknown error writing to object store".into(),
-                        )
-                    })?;
-                }
-                Err(_) => {
-                    return Err(DataFusionError::Internal(
-                        "Unknown error writing to object store".into(),
-                    ))
-                }
+            let serializer_clone = serializer.clone();
+            let handle = tokio::spawn(async move {
+                let num_rows = batch.num_rows();
+                let bytes = serializer_clone.serialize(batch, initial).await?;
+                Ok((num_rows, bytes))
+            });
+            if initial {
+                initial = false;
             }
+            tx.send(handle).await.map_err(|_| {
+                internal_datafusion_err!("Unknown error writing to object 
store")
+            })?;
         }
         Ok(())
     });
@@ -120,7 +112,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
         Err(_) => {
             return Err((
                 writer,
-                DataFusionError::Internal("Unknown error writing to object 
store".into()),
+                internal_datafusion_err!("Unknown error writing to object 
store"),
             ))
         }
     };
@@ -171,9 +163,9 @@ pub(crate) async fn stateless_serialize_and_write_files(
                 // this thread, so we cannot clean it up (hence any_abort_errors is true)
                 any_errors = true;
                 any_abort_errors = true;
-                triggering_error = Some(DataFusionError::Internal(format!(
+                triggering_error = Some(internal_datafusion_err!(
                     "Unexpected join error while serializing file {e}"
-                )));
+                ));
             }
         }
     }
@@ -190,24 +182,24 @@ pub(crate) async fn stateless_serialize_and_write_files(
             false => {
                 writer.shutdown()
                     .await
-                    .map_err(|_| DataFusionError::Internal("Error encountered 
while finalizing writes! Partial results may have been written to 
ObjectStore!".into()))?;
+                    .map_err(|_| internal_datafusion_err!("Error encountered 
while finalizing writes! Partial results may have been written to 
ObjectStore!"))?;
             }
         }
     }
 
     if any_errors {
         match any_abort_errors{
-            true => return Err(DataFusionError::Internal("Error encountered 
during writing to ObjectStore and failed to abort all writers. Partial result 
may have been written.".into())),
+            true => return internal_err!("Error encountered during writing to 
ObjectStore and failed to abort all writers. Partial result may have been 
written."),
             false => match triggering_error {
                 Some(e) => return Err(e),
-                None => return Err(DataFusionError::Internal("Unknown Error 
encountered during writing to ObjectStore. All writers succesfully 
aborted.".into()))
+                None => return internal_err!("Unknown Error encountered during 
writing to ObjectStore. All writers succesfully aborted.")
             }
         }
     }
 
     tx.send(row_count).map_err(|_| {
-        DataFusionError::Internal(
-            "Error encountered while sending row count back to file 
sink!".into(),
+        internal_datafusion_err!(
+            "Error encountered while sending row count back to file sink!"
         )
     })?;
     Ok(())
@@ -220,7 +212,7 @@ pub(crate) async fn stateless_multipart_put(
     data: SendableRecordBatchStream,
     context: &Arc<TaskContext>,
     file_extension: String,
-    get_serializer: Box<dyn Fn() -> Box<dyn BatchSerializer> + Send>,
+    get_serializer: Box<dyn Fn() -> Arc<dyn BatchSerializer> + Send>,
     config: &FileSinkConfig,
     compression: FileCompressionType,
 ) -> Result<u64> {
@@ -264,8 +256,8 @@ pub(crate) async fn stateless_multipart_put(
             .send((rb_stream, serializer, writer))
             .await
             .map_err(|_| {
-                DataFusionError::Internal(
-                    "Writer receive file bundle channel closed 
unexpectedly!".into(),
+                internal_datafusion_err!(
+                    "Writer receive file bundle channel closed unexpectedly!"
                 )
             })?;
     }
@@ -288,9 +280,7 @@ pub(crate) async fn stateless_multipart_put(
     }
 
     let total_count = rx_row_cnt.await.map_err(|_| {
-        DataFusionError::Internal(
-            "Did not receieve row count from write coordinater".into(),
-        )
+        internal_datafusion_err!("Did not receieve row count from write 
coordinater")
     })?;
 
     Ok(total_count)
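
A design note on the serialize loop above: parallelism comes from
`tokio::spawn`, while output order is preserved because the channel carries
JoinHandles rather than results. The receiver awaits the handles in send
order, so bytes reach the writer in batch order even when tasks finish out of
order. A reduced, self-contained sketch of the pattern (DataFusion types
replaced with strings; assumes the tokio crate with its full feature set):

    use tokio::sync::mpsc;
    use tokio::task::JoinHandle;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<JoinHandle<String>>(100);

        // Producer: one task per "batch"; handles are sent in batch order.
        tokio::spawn(async move {
            for i in 0..5 {
                // Stands in for serializer.serialize(batch, initial).
                let handle = tokio::spawn(async move { format!("serialized batch {i}") });
                tx.send(handle).await.unwrap();
            }
        });

        // Consumer: awaiting handles in send order preserves batch order,
        // even though the serialize tasks themselves run concurrently.
        while let Some(handle) = rx.recv().await {
            println!("{}", handle.await.unwrap());
        }
    }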
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index 99fb088b66..bb4c831364 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -518,10 +518,8 @@ impl<F: FileOpener> RecordBatchStream for FileStream<F> {
 
 #[cfg(test)]
 mod tests {
-    use arrow_schema::Schema;
-    use datafusion_common::internal_err;
-    use datafusion_common::DataFusionError;
-    use datafusion_common::Statistics;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use std::sync::Arc;
 
     use super::*;
     use crate::datasource::file_format::write::BatchSerializer;
@@ -534,8 +532,8 @@ mod tests {
         test::{make_partition, object_store::register_test_store},
     };
 
-    use std::sync::atomic::{AtomicUsize, Ordering};
-    use std::sync::Arc;
+    use arrow_schema::Schema;
+    use datafusion_common::{internal_err, DataFusionError, Statistics};
 
     use async_trait::async_trait;
     use bytes::Bytes;
@@ -993,7 +991,7 @@ mod tests {
 
     #[async_trait]
     impl BatchSerializer for TestSerializer {
-        async fn serialize(&mut self, _batch: RecordBatch) -> Result<Bytes> {
+        async fn serialize(&self, _batch: RecordBatch, _initial: bool) -> Result<Bytes> {
             Ok(self.bytes.clone())
         }
     }
