This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b85a39739e Make the BatchSerializer behind Arc to avoid unnecessary struct creation (#8666)
b85a39739e is described below
commit b85a39739e754576723ff4b1691c518a86335769
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Fri Dec 29 15:51:02 2023 +0300
Make the BatchSerializer behind Arc to avoid unnecessary struct creation (#8666)
* Make the BatchSerializer behind Arc
* Commenting
* Review
* Incorporate review suggestions
* Use old names
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
datafusion/core/src/datasource/file_format/csv.rs | 69 ++++++++-----------
datafusion/core/src/datasource/file_format/json.rs | 77 +++++++++-------------
.../core/src/datasource/file_format/write/mod.rs | 16 ++---
.../datasource/file_format/write/orchestration.rs | 74 +++++++++------------
.../src/datasource/physical_plan/file_stream.rs | 12 ++--
5 files changed, 98 insertions(+), 150 deletions(-)
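In short, `BatchSerializer` is now an immutable, `Sync + Send` object shared behind an `Arc`, so each batch is serialized by bumping a reference count instead of building a fresh serializer via `duplicate()`. The sketch below reproduces the trait as it appears in the write/mod.rs diff and adds a purely hypothetical implementer (`HeaderAwareSerializer` is not part of this commit) to show how the new `initial` flag and a per-call local buffer are meant to be used:

    use arrow_array::RecordBatch;
    use async_trait::async_trait;
    use bytes::Bytes;
    use datafusion_common::Result;

    // The reworked trait, as introduced by this commit.
    #[async_trait]
    pub trait BatchSerializer: Sync + Send {
        /// `initial` signals whether `batch` is the first batch of the stream,
        /// which matters for header-writing formats such as CSV.
        async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
    }

    // Hypothetical implementer, for illustration only.
    struct HeaderAwareSerializer;

    #[async_trait]
    impl BatchSerializer for HeaderAwareSerializer {
        async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes> {
            // A fresh local buffer per call (instead of a struct field) is what
            // lets the method take `&self` and run concurrently across batches.
            let mut buffer = Vec::with_capacity(4096);
            if initial {
                buffer.extend_from_slice(b"illustrative,header\n");
            }
            buffer.extend_from_slice(format!("{} rows\n", batch.num_rows()).as_bytes());
            Ok(Bytes::from(buffer))
        }
    }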
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 4033bcd3b5..d4e63904bd 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -19,21 +19,9 @@
use std::any::Any;
use std::collections::HashSet;
-use std::fmt;
-use std::fmt::Debug;
+use std::fmt::{self, Debug};
use std::sync::Arc;
-use arrow_array::RecordBatch;
-use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
-use datafusion_execution::TaskContext;
-use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
-
-use bytes::{Buf, Bytes};
-use datafusion_physical_plan::metrics::MetricsSet;
-use futures::stream::BoxStream;
-use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
-use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
-
use super::write::orchestration::stateless_multipart_put;
use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -47,11 +35,20 @@ use crate::physical_plan::insert::{DataSink, FileSinkExec};
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
+use arrow::array::RecordBatch;
use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::{self, datatypes::SchemaRef};
+use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
+use datafusion_physical_plan::metrics::MetricsSet;
use async_trait::async_trait;
+use bytes::{Buf, Bytes};
+use futures::stream::BoxStream;
+use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
+use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
/// Character Separated Value `FileFormat` implementation.
#[derive(Debug)]
@@ -400,8 +397,6 @@ impl Default for CsvSerializer {
pub struct CsvSerializer {
// CSV writer builder
builder: WriterBuilder,
- // Inner buffer for avoiding reallocation
- buffer: Vec<u8>,
// Flag to indicate whether there will be a header
header: bool,
}
@@ -412,7 +407,6 @@ impl CsvSerializer {
Self {
builder: WriterBuilder::new(),
header: true,
- buffer: Vec::with_capacity(4096),
}
}
@@ -431,21 +425,14 @@ impl CsvSerializer {
#[async_trait]
impl BatchSerializer for CsvSerializer {
- async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
+ async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes> {
+ let mut buffer = Vec::with_capacity(4096);
let builder = self.builder.clone();
- let mut writer = builder.with_header(self.header).build(&mut self.buffer);
+ let header = self.header && initial;
+ let mut writer = builder.with_header(header).build(&mut buffer);
writer.write(&batch)?;
drop(writer);
- self.header = false;
- Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
- }
-
- fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
- let new_self = CsvSerializer::new()
- .with_builder(self.builder.clone())
- .with_header(self.header);
- self.header = false;
- Ok(Box::new(new_self))
+ Ok(Bytes::from(buffer))
}
}
@@ -488,13 +475,11 @@ impl CsvSink {
let builder_clone = builder.clone();
let options_clone = writer_options.clone();
let get_serializer = move || {
- let inner_clone = builder_clone.clone();
- let serializer: Box<dyn BatchSerializer> = Box::new(
+ Arc::new(
CsvSerializer::new()
- .with_builder(inner_clone)
+ .with_builder(builder_clone.clone())
.with_header(options_clone.writer_options.header()),
- );
- serializer
+ ) as _
};
stateless_multipart_put(
@@ -541,15 +526,15 @@ mod tests {
use crate::physical_plan::collect;
use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
use crate::test_util::arrow_test_data;
+
use arrow::compute::concat_batches;
- use bytes::Bytes;
- use chrono::DateTime;
use datafusion_common::cast::as_string_array;
- use datafusion_common::internal_err;
use datafusion_common::stats::Precision;
- use datafusion_common::FileType;
- use datafusion_common::GetExt;
+ use datafusion_common::{internal_err, FileType, GetExt};
use datafusion_expr::{col, lit};
+
+ use bytes::Bytes;
+ use chrono::DateTime;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
@@ -836,8 +821,8 @@ mod tests {
.collect()
.await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
- let mut serializer = CsvSerializer::new();
- let bytes = serializer.serialize(batch).await?;
+ let serializer = CsvSerializer::new();
+ let bytes = serializer.serialize(batch, true).await?;
assert_eq!(
"c2,c3\n2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
String::from_utf8(bytes.into()).unwrap()
@@ -860,8 +845,8 @@ mod tests {
.collect()
.await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
- let mut serializer = CsvSerializer::new().with_header(false);
- let bytes = serializer.serialize(batch).await?;
+ let serializer = CsvSerializer::new().with_header(false);
+ let bytes = serializer.serialize(batch, true).await?;
assert_eq!(
"2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
String::from_utf8(bytes.into()).unwrap()
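The updated csv.rs tests above show the new call shape: callers pass the `initial` flag explicitly and no longer need a mutable serializer. A minimal standalone sketch of the same pattern (module paths and the two input batches are assumptions, not taken from this commit):

    use arrow_array::RecordBatch;
    use bytes::Bytes;
    use datafusion::datasource::file_format::csv::CsvSerializer;
    use datafusion::datasource::file_format::write::BatchSerializer;
    use datafusion_common::Result;

    // Hypothetical helper: serialize two batches with one shared serializer.
    async fn csv_bytes(first: RecordBatch, second: RecordBatch) -> Result<(Bytes, Bytes)> {
        let serializer = CsvSerializer::new().with_header(true);
        // The header row is emitted only when `initial` is true...
        let head = serializer.serialize(first, true).await?;
        // ...so later batches come out data-only, with no `&mut self` between calls.
        let tail = serializer.serialize(second, false).await?;
        Ok((head, tail))
    }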
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index fcb1d5f8e5..3d437bc5fe 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -23,40 +23,34 @@ use std::fmt::Debug;
use std::io::BufReader;
use std::sync::Arc;
-use super::{FileFormat, FileScanConfig};
-use arrow::datatypes::Schema;
-use arrow::datatypes::SchemaRef;
-use arrow::json;
-use arrow::json::reader::infer_json_schema_from_iterator;
-use arrow::json::reader::ValueIter;
-use arrow_array::RecordBatch;
-use async_trait::async_trait;
-use bytes::Buf;
-
-use bytes::Bytes;
-use datafusion_physical_expr::PhysicalExpr;
-use datafusion_physical_expr::PhysicalSortRequirement;
-use datafusion_physical_plan::ExecutionPlan;
-use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
-
-use crate::datasource::physical_plan::FileGroupDisplay;
-use crate::physical_plan::insert::DataSink;
-use crate::physical_plan::insert::FileSinkExec;
-use crate::physical_plan::SendableRecordBatchStream;
-use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
-
use super::write::orchestration::stateless_multipart_put;
-
+use super::{FileFormat, FileScanConfig};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::BatchSerializer;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
+use crate::datasource::physical_plan::FileGroupDisplay;
use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec};
use crate::error::Result;
use crate::execution::context::SessionState;
+use crate::physical_plan::insert::{DataSink, FileSinkExec};
+use crate::physical_plan::{
+ DisplayAs, DisplayFormatType, SendableRecordBatchStream, Statistics,
+};
+use arrow::datatypes::Schema;
+use arrow::datatypes::SchemaRef;
+use arrow::json;
+use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
+use arrow_array::RecordBatch;
use datafusion_common::{not_impl_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::metrics::MetricsSet;
+use datafusion_physical_plan::ExecutionPlan;
+
+use async_trait::async_trait;
+use bytes::{Buf, Bytes};
+use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
/// New line delimited JSON `FileFormat` implementation.
#[derive(Debug)]
@@ -201,31 +195,22 @@ impl Default for JsonSerializer {
}
/// Define a struct for serializing Json records to a stream
-pub struct JsonSerializer {
- // Inner buffer for avoiding reallocation
- buffer: Vec<u8>,
-}
+pub struct JsonSerializer {}
impl JsonSerializer {
/// Constructor for the JsonSerializer object
pub fn new() -> Self {
- Self {
- buffer: Vec::with_capacity(4096),
- }
+ Self {}
}
}
#[async_trait]
impl BatchSerializer for JsonSerializer {
- async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
- let mut writer = json::LineDelimitedWriter::new(&mut self.buffer);
+ async fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
+ let mut buffer = Vec::with_capacity(4096);
+ let mut writer = json::LineDelimitedWriter::new(&mut buffer);
writer.write(&batch)?;
- //drop(writer);
- Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
- }
-
- fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
- Ok(Box::new(JsonSerializer::new()))
+ Ok(Bytes::from(buffer))
}
}
@@ -272,10 +257,7 @@ impl JsonSink {
let writer_options =
self.config.file_type_writer_options.try_into_json()?;
let compression = &writer_options.compression;
- let get_serializer = move || {
- let serializer: Box<dyn BatchSerializer> = Box::new(JsonSerializer::new());
- serializer
- };
+ let get_serializer = move || Arc::new(JsonSerializer::new()) as _;
stateless_multipart_put(
data,
@@ -312,16 +294,17 @@ impl DataSink for JsonSink {
#[cfg(test)]
mod tests {
use super::super::test_util::scan_format;
- use datafusion_common::cast::as_int64_array;
- use datafusion_common::stats::Precision;
- use futures::StreamExt;
- use object_store::local::LocalFileSystem;
-
use super::*;
use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::object_store::local_unpartitioned_file;
+ use datafusion_common::cast::as_int64_array;
+ use datafusion_common::stats::Precision;
+
+ use futures::StreamExt;
+ use object_store::local::LocalFileSystem;
+
#[tokio::test]
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
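One detail worth calling out in the JsonSink change above: `Arc::new(JsonSerializer::new()) as _` relies on an unsized coercion to the trait object that the orchestration code expects. Spelled out explicitly (a sketch; the public module paths are assumed):

    use std::sync::Arc;
    use datafusion::datasource::file_format::json::JsonSerializer;
    use datafusion::datasource::file_format::write::BatchSerializer;

    // The closure type expected by `stateless_multipart_put` after this change.
    fn make_get_serializer() -> Box<dyn Fn() -> Arc<dyn BatchSerializer> + Send> {
        // Arc<JsonSerializer> coerces to Arc<dyn BatchSerializer>; `as _` in the
        // diff lets the compiler infer this same target type.
        Box::new(|| Arc::new(JsonSerializer::new()) as Arc<dyn BatchSerializer>)
    }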
diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/core/src/datasource/file_format/write/mod.rs
index 68fe81ce91..c481f2accf 100644
--- a/datafusion/core/src/datasource/file_format/write/mod.rs
+++ b/datafusion/core/src/datasource/file_format/write/mod.rs
@@ -24,20 +24,16 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
-
use crate::error::Result;
use arrow_array::RecordBatch;
-
use datafusion_common::DataFusionError;
use async_trait::async_trait;
use bytes::Bytes;
-
use futures::future::BoxFuture;
use object_store::path::Path;
use object_store::{MultipartId, ObjectStore};
-
use tokio::io::AsyncWrite;
pub(crate) mod demux;
@@ -149,15 +145,11 @@ impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
/// A trait that defines the methods required for a RecordBatch serializer.
#[async_trait]
-pub trait BatchSerializer: Unpin + Send {
+pub trait BatchSerializer: Sync + Send {
/// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
- async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
- /// Duplicates self to support serializing multiple batches in parallel on multiple cores
- fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
- Err(DataFusionError::NotImplemented(
- "Parallel serialization is not implemented for this file
type".into(),
- ))
- }
+ /// Parameter `initial` signals whether the given batch is the first batch.
+ /// This distinction is important for certain serializers (like CSV).
+ async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
}
/// Returns an [`AbortableWrite`] which writes to the given object store location
diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index 120e27ecf6..9b820a15b2 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -21,28 +21,25 @@
use std::sync::Arc;
+use super::demux::start_demuxer_task;
+use super::{create_writer, AbortableWrite, BatchSerializer};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::error::Result;
use crate::physical_plan::SendableRecordBatchStream;
use arrow_array::RecordBatch;
-
-use datafusion_common::DataFusionError;
-
-use bytes::Bytes;
+use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError};
use datafusion_execution::TaskContext;
+use bytes::Bytes;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver};
use tokio::task::{JoinHandle, JoinSet};
use tokio::try_join;
-use super::demux::start_demuxer_task;
-use super::{create_writer, AbortableWrite, BatchSerializer};
-
type WriterType = AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>;
-type SerializerType = Box<dyn BatchSerializer>;
+type SerializerType = Arc<dyn BatchSerializer>;
/// Serializes a single data stream in parallel and writes to an ObjectStore
/// concurrently. Data order is preserved. In the event of an error,
@@ -50,33 +47,28 @@ type SerializerType = Box<dyn BatchSerializer>;
/// so that the caller may handle aborting failed writes.
pub(crate) async fn serialize_rb_stream_to_object_store(
mut data_rx: Receiver<RecordBatch>,
- mut serializer: Box<dyn BatchSerializer>,
+ serializer: Arc<dyn BatchSerializer>,
mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
let (tx, mut rx) =
mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);
-
let serialize_task = tokio::spawn(async move {
+ // Some serializers (like CSV) handle the first batch differently than
+ // subsequent batches, so we track that here.
+ let mut initial = true;
while let Some(batch) = data_rx.recv().await {
- match serializer.duplicate() {
- Ok(mut serializer_clone) => {
- let handle = tokio::spawn(async move {
- let num_rows = batch.num_rows();
- let bytes = serializer_clone.serialize(batch).await?;
- Ok((num_rows, bytes))
- });
- tx.send(handle).await.map_err(|_| {
- DataFusionError::Internal(
- "Unknown error writing to object store".into(),
- )
- })?;
- }
- Err(_) => {
- return Err(DataFusionError::Internal(
- "Unknown error writing to object store".into(),
- ))
- }
+ let serializer_clone = serializer.clone();
+ let handle = tokio::spawn(async move {
+ let num_rows = batch.num_rows();
+ let bytes = serializer_clone.serialize(batch, initial).await?;
+ Ok((num_rows, bytes))
+ });
+ if initial {
+ initial = false;
}
+ tx.send(handle).await.map_err(|_| {
+ internal_datafusion_err!("Unknown error writing to object
store")
+ })?;
}
Ok(())
});
@@ -120,7 +112,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
Err(_) => {
return Err((
writer,
- DataFusionError::Internal("Unknown error writing to object
store".into()),
+ internal_datafusion_err!("Unknown error writing to object
store"),
))
}
};
@@ -171,9 +163,9 @@ pub(crate) async fn stateless_serialize_and_write_files(
// this thread, so we cannot clean it up (hence any_abort_errors is true)
any_errors = true;
any_abort_errors = true;
- triggering_error = Some(DataFusionError::Internal(format!(
+ triggering_error = Some(internal_datafusion_err!(
"Unexpected join error while serializing file {e}"
- )));
+ ));
}
}
}
@@ -190,24 +182,24 @@ pub(crate) async fn stateless_serialize_and_write_files(
false => {
writer.shutdown()
.await
- .map_err(|_| DataFusionError::Internal("Error encountered
while finalizing writes! Partial results may have been written to
ObjectStore!".into()))?;
+ .map_err(|_| internal_datafusion_err!("Error encountered
while finalizing writes! Partial results may have been written to
ObjectStore!"))?;
}
}
}
if any_errors {
match any_abort_errors{
- true => return Err(DataFusionError::Internal("Error encountered
during writing to ObjectStore and failed to abort all writers. Partial result
may have been written.".into())),
+ true => return internal_err!("Error encountered during writing to
ObjectStore and failed to abort all writers. Partial result may have been
written."),
false => match triggering_error {
Some(e) => return Err(e),
- None => return Err(DataFusionError::Internal("Unknown Error
encountered during writing to ObjectStore. All writers succesfully
aborted.".into()))
+ None => return internal_err!("Unknown Error encountered during
writing to ObjectStore. All writers succesfully aborted.")
}
}
}
tx.send(row_count).map_err(|_| {
- DataFusionError::Internal(
- "Error encountered while sending row count back to file
sink!".into(),
+ internal_datafusion_err!(
+ "Error encountered while sending row count back to file sink!"
)
})?;
Ok(())
@@ -220,7 +212,7 @@ pub(crate) async fn stateless_multipart_put(
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
file_extension: String,
- get_serializer: Box<dyn Fn() -> Box<dyn BatchSerializer> + Send>,
+ get_serializer: Box<dyn Fn() -> Arc<dyn BatchSerializer> + Send>,
config: &FileSinkConfig,
compression: FileCompressionType,
) -> Result<u64> {
@@ -264,8 +256,8 @@ pub(crate) async fn stateless_multipart_put(
.send((rb_stream, serializer, writer))
.await
.map_err(|_| {
- DataFusionError::Internal(
- "Writer receive file bundle channel closed
unexpectedly!".into(),
+ internal_datafusion_err!(
+ "Writer receive file bundle channel closed unexpectedly!"
)
})?;
}
@@ -288,9 +280,7 @@ pub(crate) async fn stateless_multipart_put(
}
let total_count = rx_row_cnt.await.map_err(|_| {
- DataFusionError::Internal(
- "Did not receieve row count from write coordinater".into(),
- )
+ internal_datafusion_err!("Did not receieve row count from write
coordinater")
})?;
Ok(total_count)
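The rewritten loop in `serialize_rb_stream_to_object_store` boils down to: clone the `Arc` once per batch, spawn a task, and flip `initial` after the first batch so header-sensitive formats write their header exactly once per file. A condensed, self-contained sketch of that pattern (it returns the `JoinHandle`s in a `Vec` instead of sending them through the bounded channel used in the real code; paths are assumed public):

    use std::sync::Arc;

    use arrow_array::RecordBatch;
    use bytes::Bytes;
    use datafusion::datasource::file_format::write::BatchSerializer;
    use datafusion_common::Result;
    use tokio::sync::mpsc::Receiver;
    use tokio::task::JoinHandle;

    async fn spawn_serialization_tasks(
        mut data_rx: Receiver<RecordBatch>,
        serializer: Arc<dyn BatchSerializer>,
    ) -> Vec<JoinHandle<Result<(usize, Bytes)>>> {
        let mut handles = Vec::new();
        // Only the very first batch is serialized with `initial = true`.
        let mut initial = true;
        while let Some(batch) = data_rx.recv().await {
            // A reference-count bump replaces the old `duplicate()` call.
            let serializer_clone = Arc::clone(&serializer);
            handles.push(tokio::spawn(async move {
                let num_rows = batch.num_rows();
                let bytes = serializer_clone.serialize(batch, initial).await?;
                Ok((num_rows, bytes))
            }));
            initial = false;
        }
        // Awaiting the handles in push order preserves batch order downstream.
        handles
    }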
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index 99fb088b66..bb4c831364 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -518,10 +518,8 @@ impl<F: FileOpener> RecordBatchStream for FileStream<F> {
#[cfg(test)]
mod tests {
- use arrow_schema::Schema;
- use datafusion_common::internal_err;
- use datafusion_common::DataFusionError;
- use datafusion_common::Statistics;
+ use std::sync::atomic::{AtomicUsize, Ordering};
+ use std::sync::Arc;
use super::*;
use crate::datasource::file_format::write::BatchSerializer;
@@ -534,8 +532,8 @@ mod tests {
test::{make_partition, object_store::register_test_store},
};
- use std::sync::atomic::{AtomicUsize, Ordering};
- use std::sync::Arc;
+ use arrow_schema::Schema;
+ use datafusion_common::{internal_err, DataFusionError, Statistics};
use async_trait::async_trait;
use bytes::Bytes;
@@ -993,7 +991,7 @@ mod tests {
#[async_trait]
impl BatchSerializer for TestSerializer {
- async fn serialize(&mut self, _batch: RecordBatch) -> Result<Bytes> {
+ async fn serialize(&self, _batch: RecordBatch, _initial: bool) -> Result<Bytes> {
Ok(self.bytes.clone())
}
}