devinjdangelo commented on code in PR #7562:
URL: https://github.com/apache/arrow-datafusion/pull/7562#discussion_r1327952617


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -719,55 +717,300 @@ impl DataSink for ParquetSink {
             }
         }
 
+        Ok(writers)
+    }
+
+    /// Creates an object store writer for each output partition
+    /// This is used when parallelizing individual parquet file writes.
+    async fn create_object_store_writers(
+        &self,
+        num_partitions: usize,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Result<Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>> {
+        let mut writers = Vec::new();
+
+        for _ in 0..num_partitions {
+            let file_path = self.config.table_paths[0].prefix();
+            let object_meta = ObjectMeta {
+                location: file_path.clone(),
+                last_modified: chrono::offset::Utc::now(),
+                size: 0,
+                e_tag: None,
+            };
+            writers.push(
+                create_writer(
+                    FileWriterMode::PutMultipart,
+                    FileCompressionType::UNCOMPRESSED,
+                    object_meta.into(),
+                    object_store.clone(),
+                )
+                .await?,
+            );
+        }
+
+        Ok(writers)
+    }
+}
+
+#[async_trait]
+impl DataSink for ParquetSink {
+    async fn write_all(
+        &self,
+        mut data: Vec<SendableRecordBatchStream>,
+        context: &Arc<TaskContext>,
+    ) -> Result<u64> {
+        let num_partitions = data.len();
+        let parquet_props = self
+            .config
+            .file_type_writer_options
+            .try_into_parquet()?
+            .writer_options();
+
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.config.object_store_url)?;
+
         let mut row_count = 0;
 
+        let allow_single_file_parallelism = context
+            .session_config()
+            .options()
+            .execution
+            .parquet
+            .allow_single_file_parallelism;
+
         match self.config.single_file_output {
             false => {
-                let mut join_set: JoinSet<Result<usize, DataFusionError>> =
-                    JoinSet::new();
-                for (mut data_stream, mut writer) in
-                    data.into_iter().zip(writers.into_iter())
-                {
-                    join_set.spawn(async move {
-                        let mut cnt = 0;
+                let writers = self
+                    .create_all_async_arrow_writers(
+                        num_partitions,
+                        parquet_props,
+                        object_store.clone(),
+                    )
+                    .await?;
+                // TODO parallelize individual parquet serialization when already outputting multiple parquet files
+                // e.g. if outputting 2 parquet files on a system with 32 threads, spawn 16 tasks for each individual
+                // file to be serialized.
+                row_count = output_multiple_parquet_files(writers, data).await?;
+            }
+            true => {
+                if !allow_single_file_parallelism || data.len() <= 1 {

Review Comment:
   Yes, that is how it is implemented currently. FileSinkExec currently tells the optimizer that it does not benefit from partitioning, which is no longer true in this specific case.
   
   I think it would be better not to rely on the input partitioning to decide the parallelism (since the number of input partitions is usually taken to mean the number of desired output files), but I'm not quite sure how to accomplish that yet.
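   
   To make the discussion concrete, here is a minimal, self-contained sketch. The trait and struct names are toy stand-ins, not DataFusion's actual ExecutionPlan/FileSinkExec API: the repartitioning pass asks the sink whether it benefits from more input partitions, and today the sink always answers "no" because its input partition count is taken to mean the number of output files.
   
   ```rust
   // Toy sketch (assumed names, not the real DataFusion types) of the
   // "benefits from input partitioning" hint discussed above.
   trait PlanNode {
       /// Hint consulted by a repartitioning optimizer pass.
       fn benefits_from_input_partitioning(&self) -> bool {
           true
       }
   }
   
   struct FileSinkNode {
       single_file_output: bool,
       allow_single_file_parallelism: bool,
   }
   
   impl PlanNode for FileSinkNode {
       fn benefits_from_input_partitioning(&self) -> bool {
           // Today the sink effectively always says `false` (partitions map to
           // output files). One possible refinement is to make the answer
           // depend on the write mode, e.g.:
           self.single_file_output && self.allow_single_file_parallelism
       }
   }
   
   fn maybe_repartition(node: &dyn PlanNode, target_partitions: usize) -> usize {
       // Toy stand-in for the optimizer pass: only add input partitions when
       // the consumer says it can make use of them.
       if node.benefits_from_input_partitioning() {
           target_partitions
       } else {
           1
       }
   }
   
   fn main() {
       let sink = FileSinkNode {
           single_file_output: true,
           allow_single_file_parallelism: true,
       };
       // With a conditional hint, a single-file parallel write could be fed
       // multiple partitions instead of exactly one.
       assert_eq!(maybe_repartition(&sink, 16), 16);
   }
   ```
   
   Something along those lines would let the optimizer add partitions only for the single-file parallel case, though as noted above the right mechanism for choosing the degree of parallelism is still open.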


