alamb commented on code in PR #8666:
URL: https://github.com/apache/arrow-datafusion/pull/8666#discussion_r1437854445
##########
datafusion/core/src/datasource/file_format/write/mod.rs:
##########
@@ -149,15 +145,14 @@ impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
/// A trait that defines the methods required for a RecordBatch serializer.
#[async_trait]
-pub trait BatchSerializer: Unpin + Send {
+pub trait SerializationSchema: Sync + Send {
/// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
- async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
- /// Duplicates self to support serializing multiple batches in parallel on multiple cores
- fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
- Err(DataFusionError::NotImplemented(
- "Parallel serialization is not implemented for this file type".into(),
- ))
- }
+ async fn serialize(&self, batch: RecordBatch) -> Result<Bytes>;
+
+ /// Duplicates itself (sans header configuration) to support serializing
+ /// multiple batches in parallel on multiple cores. Unless we are serializing
+ /// a CSV file, this method is a no-op.
+ fn duplicate_headerless(&self) -> Arc<dyn SerializationSchema>;
Review Comment:
```suggestion
/// Duplicates self to support serializing multiple batches in parallel on multiple cores.
/// For formats such as CSV, the returned serializer should not print a header (column names).
fn duplicate_headerless(&self) -> Arc<dyn SerializationSchema>;
```
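For concreteness, here is a minimal sketch (not the PR's actual code) of how a CSV implementor could satisfy the suggested contract. The `CsvSerializer` type, its `header` flag, and the use of `arrow_csv::WriterBuilder` are illustrative assumptions, with error handling simplified to `ArrowError`:

```rust
// Hypothetical sketch: a CSV serializer satisfying the suggested contract.
// `CsvSerializer` and its `header` flag are illustrative, not the PR's code.
use std::sync::Arc;

use arrow_array::RecordBatch;
use async_trait::async_trait;
use bytes::Bytes;

type Result<T> = std::result::Result<T, arrow_schema::ArrowError>;

#[async_trait]
pub trait SerializationSchema: Sync + Send {
    async fn serialize(&self, batch: RecordBatch) -> Result<Bytes>;
    /// Duplicates self to support serializing multiple batches in parallel
    /// on multiple cores. For formats such as CSV, the returned serializer
    /// should not print a header (column names).
    fn duplicate_headerless(&self) -> Arc<dyn SerializationSchema>;
}

struct CsvSerializer {
    /// Whether `serialize` should emit the column-name header row.
    header: bool,
}

#[async_trait]
impl SerializationSchema for CsvSerializer {
    async fn serialize(&self, batch: RecordBatch) -> Result<Bytes> {
        let mut buf = Vec::new();
        let mut writer = arrow_csv::WriterBuilder::new()
            .with_header(self.header)
            .build(&mut buf);
        writer.write(&batch)?;
        drop(writer); // release the mutable borrow of `buf`
        Ok(Bytes::from(buf))
    }

    fn duplicate_headerless(&self) -> Arc<dyn SerializationSchema> {
        // Same format settings, but the duplicate never writes a header,
        // so parallel workers can serialize non-initial batches safely.
        Arc::new(CsvSerializer { header: false })
    }
}
```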
##########
datafusion/core/src/datasource/file_format/write/orchestration.rs:
##########
@@ -21,62 +21,55 @@
use std::sync::Arc;
+use super::demux::start_demuxer_task;
+use super::{create_writer, AbortableWrite, SerializationSchema};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::error::Result;
use crate::physical_plan::SendableRecordBatchStream;
use arrow_array::RecordBatch;
-
-use datafusion_common::DataFusionError;
-
-use bytes::Bytes;
+use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError};
use datafusion_execution::TaskContext;
+use bytes::Bytes;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver};
use tokio::task::{JoinHandle, JoinSet};
use tokio::try_join;
-use super::demux::start_demuxer_task;
-use super::{create_writer, AbortableWrite, BatchSerializer};
-
type WriterType = AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>;
-type SerializerType = Box<dyn BatchSerializer>;
+type SerializerType = Arc<dyn SerializationSchema>;
/// Serializes a single data stream in parallel and writes to an ObjectStore
/// concurrently. Data order is preserved. In the event of an error,
/// the ObjectStore writer is returned to the caller in addition to an error,
/// so that the caller may handle aborting failed writes.
pub(crate) async fn serialize_rb_stream_to_object_store(
mut data_rx: Receiver<RecordBatch>,
- mut serializer: Box<dyn BatchSerializer>,
+ mut serializer: Arc<dyn SerializationSchema>,
mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
let (tx, mut rx) =
mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);
-
+ // Initially, has_header can be true for CSV use cases. Then, we must turn
+ // it off to maintain the integrity of the writing process.
Review Comment:
```suggestion
// Some serializers (like CSV) handle the first batch differently than subsequent batches,
// so we track that here
```
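As a hedged illustration of the suggested comment, the sketch below shows the first-batch flag interacting with `duplicate_headerless`. It reuses the `SerializationSchema` trait and `Result` alias from the sketch above, and collapses the PR's task-spawning parallelism into a sequential loop for brevity; the function name is hypothetical:

```rust
// Simplified sketch of the orchestration pattern: the original serializer
// handles only the first batch; afterwards we swap in the headerless
// duplicate. The real PR code additionally spawns tasks for parallelism.
use std::sync::Arc;

use arrow_array::RecordBatch;
use bytes::Bytes;
use tokio::sync::mpsc::Receiver;

async fn serialize_stream(
    mut data_rx: Receiver<RecordBatch>,
    mut serializer: Arc<dyn SerializationSchema>, // trait from the sketch above
) -> Result<Vec<Bytes>> {
    let mut serialized = Vec::new();
    // Some serializers (like CSV) handle the first batch differently than
    // subsequent batches, so we track that here.
    let mut initial = true;
    while let Some(batch) = data_rx.recv().await {
        serialized.push(serializer.serialize(batch).await?);
        if initial {
            // After the first batch, never emit the header again.
            serializer = serializer.duplicate_headerless();
            initial = false;
        }
    }
    Ok(serialized)
}
```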
##########
datafusion/core/src/datasource/file_format/write/mod.rs:
##########
@@ -149,15 +145,14 @@ impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
/// A trait that defines the methods required for a RecordBatch serializer.
#[async_trait]
-pub trait BatchSerializer: Unpin + Send {
+pub trait SerializationSchema: Sync + Send {
/// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
- async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
- /// Duplicates self to support serializing multiple batches in parallel on multiple cores
- fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
- Err(DataFusionError::NotImplemented(
- "Parallel serialization is not implemented for this file type".into(),
- ))
- }
+ async fn serialize(&self, batch: RecordBatch) -> Result<Bytes>;
Review Comment:
Instead of `duplicate_headerless`, another pattern could be to pass in another parameter here, which matches how the calculation is done in the serialization code:
```suggestion
// is_initial is true if this is the first batch to be serialized by this serializer
async fn serialize(&self, batch: RecordBatch, is_initial: bool) -> Result<Bytes>;
```
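Under that alternative, a serializer could stay stateless and branch on the flag per call. Here is a sketch under the same assumptions as the earlier `CsvSerializer` example (hypothetical type, simplified errors):

```rust
// Hypothetical sketch of the alternative: one stateless serializer that
// receives `is_initial` per call, so no headerless duplicate is needed.
use arrow_array::RecordBatch;
use async_trait::async_trait;
use bytes::Bytes;

type Result<T> = std::result::Result<T, arrow_schema::ArrowError>;

#[async_trait]
pub trait SerializationSchema: Sync + Send {
    // is_initial is true if this is the first batch to be serialized by this serializer
    async fn serialize(&self, batch: RecordBatch, is_initial: bool) -> Result<Bytes>;
}

struct CsvSerializer {
    header: bool, // user-requested header setting
}

#[async_trait]
impl SerializationSchema for CsvSerializer {
    async fn serialize(&self, batch: RecordBatch, is_initial: bool) -> Result<Bytes> {
        let mut buf = Vec::new();
        // Emit the header row only on the very first batch, and only if
        // headers were requested at all.
        let mut writer = arrow_csv::WriterBuilder::new()
            .with_header(self.header && is_initial)
            .build(&mut buf);
        writer.write(&batch)?;
        drop(writer); // release the mutable borrow of `buf`
        Ok(Bytes::from(buf))
    }
}
```

The orchestration loop would then pass `is_initial = true` for the first batch only, instead of swapping serializers.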
##########
datafusion/core/src/datasource/file_format/write/orchestration.rs:
##########
@@ -171,9 +164,9 @@ pub(crate) async fn stateless_serialize_and_write_files(
// this thread, so we cannot clean it up (hence any_abort_errors is true)
any_errors = true;
any_abort_errors = true;
- triggering_error = Some(DataFusionError::Internal(format!(
+ triggering_error = Some(internal_datafusion_err!(
Review Comment:
👍 nice cleanup
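For readers unfamiliar with the macro, the before/after below is an illustrative comparison (the message text is not the PR's exact string): `internal_datafusion_err!` is roughly shorthand for constructing `DataFusionError::Internal` from `format!`-style arguments, possibly with backtrace context appended depending on the `datafusion_common` version.

```rust
// Illustrative equivalence of the cleanup; both construct the
// DataFusionError::Internal variant.
use datafusion_common::{internal_datafusion_err, DataFusionError};

fn before(e: &str) -> DataFusionError {
    DataFusionError::Internal(format!("Unexpected join error: {e}"))
}

fn after(e: &str) -> DataFusionError {
    internal_datafusion_err!("Unexpected join error: {e}")
}
```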
##########
datafusion/core/src/datasource/file_format/write/mod.rs:
##########
@@ -149,15 +145,14 @@ impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
/// A trait that defines the methods required for a RecordBatch serializer.
#[async_trait]
-pub trait BatchSerializer: Unpin + Send {
+pub trait SerializationSchema: Sync + Send {
Review Comment:
What is the rationale for renaming this trait? It doesn't seem directly related to a `Schema` 🤔 I think the original name `BatchSerializer` better matches what the trait does.