adamreeve commented on code in PR #16738:
URL: https://github.com/apache/datafusion/pull/16738#discussion_r2268446276


##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -1749,16 +1732,16 @@ fn spawn_parquet_parallel_serialization_task(
                     // Do not surface error from closed channel (means 
something
                     // else hit an error, and the plan is shutting down).
                     if serialize_tx.send(finalize_rg_task).await.is_err() {
-                        return Ok(());
+                        return Ok(arrow_writer);
                     }
 
                     current_rg_rows = 0;
                     rb = rb.slice(rows_left, rb.num_rows() - rows_left);
 
+                    let col_writers = 
arrow_writer.get_column_writers().unwrap();

Review Comment:
   I don't think this explains the deadlock, but this won't work correctly. 
Above in line 1734 the `finalize_rg_task` will send the row group finalization 
task to a channel but at this line there's a race condition because the writer 
may or may not have written the row group, and we rely on 
`self.writer.flushed_row_groups().len()` being correct within 
`get_column_writers` to determine the row group ordinal.
   
   I think we might have to update the arrow-rs API to allow DataFusion to 
specify the row group index.
   
   We should really have a test similar to `write_parquet_with_small_rg_size` 
that uses encryption and verifies we can read and decrypt the written files 
with multiple row groups.



##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -1857,28 +1832,38 @@ async fn output_single_parquet_file_parallelized(
     let (serialize_tx, serialize_rx) =
         mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);
 
+    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
+    let writer = ArrowWriter::try_new(
+        merged_buff.clone(),
+        Arc::clone(&output_schema),
+        Some(parquet_props.clone()),
+    )?;
+
     let arc_props = Arc::new(parquet_props.clone());
     let launch_serialization_task = spawn_parquet_parallel_serialization_task(
+        writer,
         data,
         serialize_tx,
         Arc::clone(&output_schema),
         Arc::clone(&arc_props),
         parallel_options,
         Arc::clone(&pool),
     );
+
+    let writer = launch_serialization_task
+        .join_unwind()
+        .await
+        .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

Review Comment:
   The deadlock is caused by this change here. The `(serialize_tx, 
serialize_rx)` channel for processing row groups has a maximum size which is 1 
by default. The `launch_serialization_task` never finishes because it tries to 
send multiple row groups, but the task that receives the row groups and writes 
them to the file hasn't started yet so sending the second row group will block.



##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -1778,44 +1761,36 @@ fn spawn_parquet_parallel_serialization_task(
             // Do not surface error from closed channel (means something
             // else hit an error, and the plan is shutting down).
             if serialize_tx.send(finalize_rg_task).await.is_err() {
-                return Ok(());
+                return Ok(arrow_writer);
             }
         }
 
-        Ok(())
+        Ok(arrow_writer)
     })
 }
 
 /// Consume RowGroups serialized by other parallel tasks and concatenate them 
in
 /// to the final parquet file, while flushing finalized bytes to an 
[ObjectStore]
 async fn concatenate_parallel_row_groups(
+    mut arrow_writer: ArrowWriter<SharedBuffer>,
+    merged_buff: SharedBuffer,
     mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
-    schema: Arc<Schema>,
-    writer_props: Arc<WriterProperties>,
     mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
     pool: Arc<dyn MemoryPool>,
 ) -> Result<FileMetaData> {
-    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
-
     let mut file_reservation =
         
MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);
 
-    let schema_desc = ArrowSchemaConverter::new().convert(schema.as_ref())?;
-    let mut parquet_writer = SerializedFileWriter::new(
-        merged_buff.clone(),
-        schema_desc.root_schema_ptr(),
-        writer_props,
-    )?;
-
     while let Some(task) = serialize_rx.recv().await {
         let result = task.join_unwind().await;
-        let mut rg_out = parquet_writer.next_row_group()?;
         let (serialized_columns, mut rg_reservation, _cnt) =
             result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
-        for chunk in serialized_columns {
-            chunk.append_to_row_group(&mut rg_out)?;
-            rg_reservation.free();
 
+        let mut finalized_rg = Vec::with_capacity(serialized_columns.len());
+        for task in serialized_columns {
+            finalized_rg.push(task);
+
+            rg_reservation.free();

Review Comment:
   This section of the code doesn't look correct as we try to flush the buffer 
to the file but at this point we haven't actually written anything to the 
buffer as we've changed from writing to the row group to just appending the 
serialization result to a vector. The logic for flushing to the object store 
should move down to below the actual write.
   
   Also, is this a problem that we now won't flush to the object store until 
after we've written a full row group rather than just a single column? That 
could have quite a big effect on IO behaviour.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to