adamreeve commented on code in PR #16738:
URL: https://github.com/apache/datafusion/pull/16738#discussion_r2268446276
##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -1749,16 +1732,16 @@ fn spawn_parquet_parallel_serialization_task(
                     // Do not surface error from closed channel (means something
                     // else hit an error, and the plan is shutting down).
                     if serialize_tx.send(finalize_rg_task).await.is_err() {
-                        return Ok(());
+                        return Ok(arrow_writer);
                     }
 
                     current_rg_rows = 0;
                     rb = rb.slice(rows_left, rb.num_rows() - rows_left);
+                    let col_writers = arrow_writer.get_column_writers().unwrap();

Review Comment:
I don't think this explains the deadlock, but this won't work correctly. Above, on line 1734, `finalize_rg_task` sends the row group finalization task to a channel, but at this line there is a race condition: the writer may or may not have written that row group yet, and `get_column_writers` relies on `self.writer.flushed_row_groups().len()` being correct to determine the row group ordinal. I think we might have to update the arrow-rs API to allow DataFusion to specify the row group index.

We should really have a test similar to `write_parquet_with_small_rg_size` that uses encryption and verifies we can read and decrypt the written files with multiple row groups (a rough sketch of that round trip is at the bottom of this message).

##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -1857,28 +1832,38 @@ async fn output_single_parquet_file_parallelized(
     let (serialize_tx, serialize_rx) =
         mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);
 
+    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
+    let writer = ArrowWriter::try_new(
+        merged_buff.clone(),
+        Arc::clone(&output_schema),
+        Some(parquet_props.clone()),
+    )?;
+
     let arc_props = Arc::new(parquet_props.clone());
     let launch_serialization_task = spawn_parquet_parallel_serialization_task(
+        writer,
         data,
         serialize_tx,
         Arc::clone(&output_schema),
         Arc::clone(&arc_props),
         parallel_options,
         Arc::clone(&pool),
     );
+
+    let writer = launch_serialization_task
+        .join_unwind()
+        .await
+        .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

Review Comment:
The deadlock is caused by this change here. The `(serialize_tx, serialize_rx)` channel for processing row groups has a maximum size, which is 1 by default. `launch_serialization_task` never finishes because it tries to send multiple row groups, but the task that receives the row groups and writes them to the file hasn't started yet, so sending the second row group blocks forever.
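For reference, here is a minimal, self-contained illustration of that failure mode (not the PR code itself, just the same bounded-channel pattern with plain tokio tasks standing in for the DataFusion ones): with capacity 1, joining the producer before the consumer is running hangs as soon as a second item is sent.

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel with capacity 1, like the default max_rowgroups here.
    let (tx, mut rx) = mpsc::channel::<usize>(1);

    let producer = tokio::spawn(async move {
        for rg in 0..3 {
            // The first send succeeds; the second waits for a receive,
            // but the consumer below only starts after this task is joined.
            tx.send(rg).await.unwrap();
        }
    });

    // Deadlock: this await never completes, so the consumer loop below
    // is never reached and the program hangs.
    producer.await.unwrap();

    while let Some(rg) = rx.recv().await {
        println!("wrote row group {rg}");
    }
}
```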
##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -1778,44 +1761,36 @@ fn spawn_parquet_parallel_serialization_task(
             // Do not surface error from closed channel (means something
             // else hit an error, and the plan is shutting down).
             if serialize_tx.send(finalize_rg_task).await.is_err() {
-                return Ok(());
+                return Ok(arrow_writer);
             }
         }
 
-        Ok(())
+        Ok(arrow_writer)
     })
 }
 
 /// Consume RowGroups serialized by other parallel tasks and concatenate them in
 /// to the final parquet file, while flushing finalized bytes to an [ObjectStore]
 async fn concatenate_parallel_row_groups(
+    mut arrow_writer: ArrowWriter<SharedBuffer>,
+    merged_buff: SharedBuffer,
     mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
-    schema: Arc<Schema>,
-    writer_props: Arc<WriterProperties>,
     mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
     pool: Arc<dyn MemoryPool>,
 ) -> Result<FileMetaData> {
-    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
-
     let mut file_reservation =
         MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);
 
-    let schema_desc = ArrowSchemaConverter::new().convert(schema.as_ref())?;
-    let mut parquet_writer = SerializedFileWriter::new(
-        merged_buff.clone(),
-        schema_desc.root_schema_ptr(),
-        writer_props,
-    )?;
-
     while let Some(task) = serialize_rx.recv().await {
         let result = task.join_unwind().await;
-        let mut rg_out = parquet_writer.next_row_group()?;
         let (serialized_columns, mut rg_reservation, _cnt) =
             result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
 
-        for chunk in serialized_columns {
-            chunk.append_to_row_group(&mut rg_out)?;
-            rg_reservation.free();
+        let mut finalized_rg = Vec::with_capacity(serialized_columns.len());
+        for task in serialized_columns {
+            finalized_rg.push(task);
+
+            rg_reservation.free();

Review Comment:
This section of the code doesn't look correct: we try to flush the buffer to the file, but at this point nothing has actually been written to the buffer, since the code now appends the serialization result to a vector instead of writing it to the row group. The logic for flushing to the object store should move down below the actual write.

Also, is it a problem that we now won't flush to the object store until a full row group has been written, rather than after each column? That could have quite a big effect on IO behaviour.
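On the test suggested in the first comment: below is a rough sketch of the round trip it would need, written against the parquet crate directly rather than through the DataFusion `ParquetSink` (the existing `write_parquet_with_small_rg_size` test is the better template for that side). It assumes the parquet crate's `encryption` feature is enabled; the encryption builder names used here (`FileEncryptionProperties`, `FileDecryptionProperties`, `with_file_encryption_properties`, `with_file_decryption_properties`) are written from memory and should be checked against the current arrow-rs API.

```rust
// Sketch only: write with encryption and a small row group size, then read
// back with decryption and verify every row group decodes. Requires the
// parquet crate's `encryption` feature; API names need verifying in arrow-rs.
use std::{fs::File, sync::Arc};

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ArrowWriter;
use parquet::encryption::decrypt::FileDecryptionProperties;
use parquet::encryption::encrypt::FileEncryptionProperties;
use parquet::file::properties::WriterProperties;

fn encrypted_multi_row_group_roundtrip() -> Result<(), Box<dyn std::error::Error>> {
    let key = b"0123456789012345".to_vec(); // 16-byte AES key
    let batch = RecordBatch::try_from_iter([(
        "a",
        Arc::new(Int32Array::from_iter_values(0..100)) as ArrayRef,
    )])?;

    // Small row group size so the file ends up with multiple row groups,
    // mirroring write_parquet_with_small_rg_size.
    let props = WriterProperties::builder()
        .set_max_row_group_size(10)
        .with_file_encryption_properties(FileEncryptionProperties::builder(key.clone()).build()?)
        .build();

    let path = std::env::temp_dir().join("encrypted_small_rg.parquet");
    let mut writer = ArrowWriter::try_new(File::create(&path)?, batch.schema(), Some(props))?;
    writer.write(&batch)?;
    writer.close()?;

    // Read back with decryption and check all rows come through.
    let decryption = FileDecryptionProperties::builder(key).build()?;
    let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption);
    let reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(File::open(&path)?, options)?
            .build()?;
    let rows: usize = reader
        .map(|batch| Ok::<_, arrow::error::ArrowError>(batch?.num_rows()))
        .sum::<Result<usize, _>>()?;
    assert_eq!(rows, 100);
    Ok(())
}
```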