devinjdangelo commented on code in PR #9422:
URL: https://github.com/apache/arrow-datafusion/pull/9422#discussion_r1509999834
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -952,31 +932,21 @@ async fn concatenate_parallel_row_groups(
let mut row_count = 0;
while let Some(task) = serialize_rx.recv().await {
- match task.join().await {
- Ok(result) => {
- let mut rg_out = parquet_writer.next_row_group()?;
- let (serialized_columns, cnt) = result?;
- row_count += cnt;
- for chunk in serialized_columns {
- chunk.append_to_row_group(&mut rg_out)?;
- let mut buff_to_flush =
merged_buff.buffer.try_lock().unwrap();
- if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
- object_store_writer
- .write_all(buff_to_flush.as_slice())
- .await?;
- buff_to_flush.clear();
- }
- }
- rg_out.close()?;
- }
- Err(e) => {
- if e.is_panic() {
- std::panic::resume_unwind(e.into_panic());
- } else {
- unreachable!();
- }
+ let result = task.join_unwind().await;
+ let mut rg_out = parquet_writer.next_row_group()?;
+ let (serialized_columns, cnt) = result?;
+ row_count += cnt;
+ for chunk in serialized_columns {
+ chunk.append_to_row_group(&mut rg_out)?;
+ let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
+ if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
+ object_store_writer
+ .write_all(buff_to_flush.as_slice())
+ .await?;
+ buff_to_flush.clear();
Review Comment:
This block in particular is much easier to read and understand now :+1:
##########
datafusion/physical-plan/src/common.rs:
##########
@@ -204,12 +204,24 @@ impl<R: 'static> SpawnedTask<R> {
Self { inner }
}
+ /// Joins the task, returning the result of join (`Result<R, JoinError>`).
pub async fn join(mut self) -> Result<R, JoinError> {
self.inner
.join_next()
.await
.expect("`SpawnedTask` instance always contains exactly 1 task")
}
+
+ /// Joins the task and unwinds the panic if it happens.
+ pub async fn join_unwind(self) -> R {
Review Comment:
I think the existing naming makes sense, i.e., call `join_unwind` if you
don't want to handle the join error manually.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]