devinjdangelo commented on code in PR #7562:
URL: https://github.com/apache/arrow-datafusion/pull/7562#discussion_r1327963128


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -719,55 +718,329 @@ impl DataSink for ParquetSink {
             }
         }
 
+        Ok(writers)
+    }
+
+    /// Creates an object store writer for each output partition
+    /// This is used when parallelizing individual parquet file writes.
+    async fn create_object_store_writers(
+        &self,
+        num_partitions: usize,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Result<Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>> {
+        let mut writers = Vec::new();
+
+        for _ in 0..num_partitions {
+            let file_path = self.config.table_paths[0].prefix();
+            let object_meta = ObjectMeta {
+                location: file_path.clone(),
+                last_modified: chrono::offset::Utc::now(),
+                size: 0,
+                e_tag: None,
+            };
+            writers.push(
+                create_writer(
+                    FileWriterMode::PutMultipart,
+                    FileCompressionType::UNCOMPRESSED,
+                    object_meta.into(),
+                    object_store.clone(),
+                )
+                .await?,
+            );
+        }
+
+        Ok(writers)
+    }
+}
+
+#[async_trait]
+impl DataSink for ParquetSink {
+    async fn write_all(
+        &self,
+        mut data: Vec<SendableRecordBatchStream>,
+        context: &Arc<TaskContext>,
+    ) -> Result<u64> {
+        let num_partitions = data.len();
+        let parquet_props = self
+            .config
+            .file_type_writer_options
+            .try_into_parquet()?
+            .writer_options();
+
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.config.object_store_url)?;
+
         let mut row_count = 0;
 
+        let allow_single_file_parallelism = context
+            .session_config()
+            .options()
+            .execution
+            .parquet
+            .allow_single_file_parallelism;
+
         match self.config.single_file_output {
             false => {
-                let mut join_set: JoinSet<Result<usize, DataFusionError>> =
-                    JoinSet::new();
-                for (mut data_stream, mut writer) in
-                    data.into_iter().zip(writers.into_iter())
-                {
-                    join_set.spawn(async move {
-                        let mut cnt = 0;
+                let writers = self
+                    .create_all_async_arrow_writers(
+                        num_partitions,
+                        parquet_props,
+                        object_store.clone(),
+                    )
+                    .await?;
+                // TODO parallelize individual parquet serialization when 
already outputting multiple parquet files
+                // e.g. if outputting 2 parquet files on a system with 32 
threads, spawn 16 tasks for each individual
+                // file to be serialized.
+                row_count = output_multiple_parquet_files(writers, 
data).await?;
+            }
+            true => {
+                if !allow_single_file_parallelism || data.len() <= 1 {
+                    let mut writer = self
+                        .create_all_async_arrow_writers(
+                            num_partitions,
+                            parquet_props,
+                            object_store.clone(),
+                        )
+                        .await?
+                        .remove(0);
+                    for data_stream in data.iter_mut() {
                         while let Some(batch) = 
data_stream.next().await.transpose()? {
-                            cnt += batch.num_rows();
+                            row_count += batch.num_rows();
                             writer.write(&batch).await?;
                         }
-                        writer.close().await?;
-                        Ok(cnt)
-                    });
+                    }
+
+                    writer.close().await?;
+                } else {
+                    let object_store_writer = self
+                        .create_object_store_writers(1, object_store)
+                        .await?
+                        .remove(0);
+                    row_count = output_single_parquet_file_parallelized(
+                        object_store_writer,
+                        data,
+                        self.config.output_schema.clone(),
+                        parquet_props,
+                    )
+                    .await?;
                 }
-                while let Some(result) = join_set.join_next().await {
-                    match result {
-                        Ok(res) => {
-                            row_count += res?;
-                        } // propagate DataFusion error
-                        Err(e) => {
-                            if e.is_panic() {
-                                std::panic::resume_unwind(e.into_panic());
-                            } else {
-                                unreachable!();
+            }
+        }
+
+        Ok(row_count as u64)
+    }
+}
+
+/// This is the return type when joining subtasks which are serializing 
parquet files
+/// into memory buffers. The first part of the tuple is the parquet bytes and 
the
+/// second is how many rows were written into the file.
+type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
+
+/// Parallelizes the serialization of a single parquet file, by first 
serializing N
+/// independent RecordBatch streams in parallel to parquet files in memory. 
Another
+/// task then stitches these independent files back together and streams this 
large
+/// single parquet file to an ObjectStore in multiple parts.
+async fn output_single_parquet_file_parallelized(
+    mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + 
Unpin>>,
+    mut data: Vec<SendableRecordBatchStream>,
+    output_schema: Arc<Schema>,
+    parquet_props: &WriterProperties,
+) -> Result<usize> {
+    let mut row_count = 0;
+    let parallelism = data.len();
+    let mut join_handles: Vec<JoinHandle<ParquetFileSerializedResult>> =
+        Vec::with_capacity(parallelism);
+    for _ in 0..parallelism {
+        let buffer: Vec<u8> = Vec::new();
+        let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
+            buffer,
+            output_schema.clone(),
+            Some(parquet_props.clone()),
+        )?;
+        let mut data_stream = data.remove(0);
+        join_handles.push(tokio::spawn(async move {
+            let mut inner_row_count = 0;
+            while let Some(batch) = data_stream.next().await.transpose()? {
+                inner_row_count += batch.num_rows();
+                writer.write(&batch)?;
+            }
+            let out = writer.into_inner()?;
+            Ok((out, inner_row_count))
+        }))
+    }
+
+    let mut writer = None;
+    let endpoints: (UnboundedSender<Vec<u8>>, UnboundedReceiver<Vec<u8>>) =
+        tokio::sync::mpsc::unbounded_channel();
+    let (tx, mut rx) = endpoints;
+    let writer_join_handle: JoinHandle<
+        Result<
+            AbortableWrite<Box<dyn tokio::io::AsyncWrite + std::marker::Send + 
Unpin>>,
+            DataFusionError,
+        >,
+    > = tokio::task::spawn(async move {
+        while let Some(data) = rx.recv().await {
+            object_store_writer.write_all(data.as_slice()).await?;
+        }
+        Ok(object_store_writer)
+    });
+    let merged_buff = SharedBuffer::new(1048576);
+    for handle in join_handles {
+        let join_result = handle.await;
+        match join_result {
+            Ok(result) => {
+                let (out, num_rows) = result?;
+                let reader = bytes::Bytes::from(out);
+                row_count += num_rows;
+                //let reader = File::open(buffer)?;
+                let metadata = parquet::file::footer::parse_metadata(&reader)?;

Review Comment:
   @tustvold @alamb how hard would it be for this call to work on a stream of 
bytes rather than a fully buffered parquet file? That way we could eliminate 
the need for the parallel tasks to fully buffer the sub parquet files.
   
   I'm not sure if an API exists for this in arrow-rs already.



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -719,55 +717,300 @@ impl DataSink for ParquetSink {
             }
         }
 
+        Ok(writers)
+    }
+
+    /// Creates an object store writer for each output partition
+    /// This is used when parallelizing individual parquet file writes.
+    async fn create_object_store_writers(
+        &self,
+        num_partitions: usize,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Result<Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>> {
+        let mut writers = Vec::new();
+
+        for _ in 0..num_partitions {
+            let file_path = self.config.table_paths[0].prefix();
+            let object_meta = ObjectMeta {
+                location: file_path.clone(),
+                last_modified: chrono::offset::Utc::now(),
+                size: 0,
+                e_tag: None,
+            };
+            writers.push(
+                create_writer(
+                    FileWriterMode::PutMultipart,
+                    FileCompressionType::UNCOMPRESSED,
+                    object_meta.into(),
+                    object_store.clone(),
+                )
+                .await?,
+            );
+        }
+
+        Ok(writers)
+    }
+}
+
+#[async_trait]
+impl DataSink for ParquetSink {
+    async fn write_all(
+        &self,
+        mut data: Vec<SendableRecordBatchStream>,
+        context: &Arc<TaskContext>,
+    ) -> Result<u64> {
+        let num_partitions = data.len();
+        let parquet_props = self
+            .config
+            .file_type_writer_options
+            .try_into_parquet()?
+            .writer_options();
+
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.config.object_store_url)?;
+
         let mut row_count = 0;
 
+        let allow_single_file_parallelism = context
+            .session_config()
+            .options()
+            .execution
+            .parquet
+            .allow_single_file_parallelism;
+
         match self.config.single_file_output {
             false => {
-                let mut join_set: JoinSet<Result<usize, DataFusionError>> =
-                    JoinSet::new();
-                for (mut data_stream, mut writer) in
-                    data.into_iter().zip(writers.into_iter())
-                {
-                    join_set.spawn(async move {
-                        let mut cnt = 0;
+                let writers = self
+                    .create_all_async_arrow_writers(
+                        num_partitions,
+                        parquet_props,
+                        object_store.clone(),
+                    )
+                    .await?;
+                // TODO parallelize individual parquet serialization when 
already outputting multiple parquet files
+                // e.g. if outputting 2 parquet files on a system with 32 
threads, spawn 16 tasks for each individual
+                // file to be serialized.
+                row_count = output_multiple_parquet_files(writers, 
data).await?;
+            }
+            true => {
+                if !allow_single_file_parallelism || data.len() <= 1 {

Review Comment:
   I think that is a possibility. In the future, I'd like to enable 
parallelizing the serialization of each parquet file when outputting multiple 
files. In that case, the number of output partitions is taken to be the number 
of output files. To parallelize each output file, the N incoming RecordBatch 
streams would need to be divided up within ParquetSink itself, similarly to how 
it is currently done in CsvSink and JsonSink.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to