This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1dfac86a89 fix(11397): surface proper errors in ParquetSink (#11399)
1dfac86a89 is described below
commit 1dfac86a89750193491cf3e04917e37b92c64ffa
Author: wiedld <[email protected]>
AuthorDate: Fri Jul 12 04:04:42 2024 -0700
fix(11397): surface proper errors in ParquetSink (#11399)
* fix(11397): do not surface errors for closed channels, and instead let
the task join errors be surfaced
* fix(11397): terminate early on channel send failure
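The change described above can be illustrated with a minimal, self-contained sketch using tokio mpsc channels (hypothetical names; this is not the ParquetSink code itself): the producer exits quietly when its channel closes, and the error callers actually see is the one returned from the consumer task when its join handle is awaited.

// A minimal sketch of the pattern, assuming tokio with the "full" feature
// (hypothetical producer/consumer names; not the DataFusion code).
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

async fn producer(tx: mpsc::Sender<u32>) -> Result<(), String> {
    for i in 0..10 {
        // Do not surface an error from a closed channel: the receiver
        // failed and the pipeline is shutting down, so exit quietly.
        if tx.send(i).await.is_err() {
            return Ok(());
        }
    }
    Ok(())
}

async fn consumer(mut rx: mpsc::Receiver<u32>) -> Result<(), String> {
    while let Some(v) = rx.recv().await {
        if v == 3 {
            // The root cause (e.g. a failed allocation) is returned here;
            // dropping `rx` is what closes the channel for the producer.
            return Err("Failed to allocate additional ...".to_string());
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(4);
    let consumer_task: JoinHandle<Result<(), String>> = tokio::spawn(consumer(rx));
    let producer_task: JoinHandle<Result<(), String>> = tokio::spawn(producer(tx));

    // The producer reports Ok; the meaningful error comes from the
    // consumer's join result, so only the root cause is surfaced.
    assert!(producer_task.await.unwrap().is_ok());
    assert!(consumer_task.await.unwrap().is_err());
}

Because the producer returns Ok(()) on a closed channel, the only error reported is the consumer's root cause, which is why the memory-limit test in the diff below now matches the allocator message instead of "Unable to send array to writer!".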
---
.../core/src/datasource/file_format/parquet.rs | 32 +++++++++++-----------
datafusion/core/tests/memory_limit/mod.rs | 4 +--
2 files changed, 18 insertions(+), 18 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 694c949285..6271d8af37 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -893,12 +893,12 @@ async fn send_arrays_to_col_writers(
let mut next_channel = 0;
for (array, field) in rb.columns().iter().zip(schema.fields()) {
for c in compute_leaves(field, array)? {
- col_array_channels[next_channel]
- .send(c)
- .await
- .map_err(|_| {
- DataFusionError::Internal("Unable to send array to writer!".into())
- })?;
+ // Do not surface error from closed channel (means something
+ // else hit an error, and the plan is shutting down).
+ if col_array_channels[next_channel].send(c).await.is_err() {
+ return Ok(());
+ }
+
next_channel += 1;
}
}
@@ -984,11 +984,11 @@ fn spawn_parquet_parallel_serialization_task(
&pool,
);
- serialize_tx.send(finalize_rg_task).await.map_err(|_| {
- DataFusionError::Internal(
- "Unable to send closed RG to concat task!".into(),
- )
- })?;
+ // Do not surface error from closed channel (means something
+ // else hit an error, and the plan is shutting down).
+ if serialize_tx.send(finalize_rg_task).await.is_err() {
+ return Ok(());
+ }
current_rg_rows = 0;
rb = rb.slice(rows_left, rb.num_rows() - rows_left);
@@ -1013,11 +1013,11 @@ fn spawn_parquet_parallel_serialization_task(
&pool,
);
- serialize_tx.send(finalize_rg_task).await.map_err(|_| {
- DataFusionError::Internal(
- "Unable to send closed RG to concat task!".into(),
- )
- })?;
+ // Do not surface error from closed channel (means something
+ // else hit an error, and the plan is shutting down).
+ if serialize_tx.send(finalize_rg_task).await.is_err() {
+ return Ok(());
+ }
}
Ok(())
diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index f7402357d1..7ef24609e2 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -340,8 +340,8 @@ async fn oom_parquet_sink() {
path.to_string_lossy()
))
.with_expected_errors(vec![
- // TODO: update error handling in ParquetSink
- "Unable to send array to writer!",
+ "Failed to allocate additional",
+ "for ParquetSink(ArrowColumnWriter)",
])
.with_memory_limit(200_000)
.run()
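For reference, the substring checks the updated expectations rely on can be sketched as follows (check_error_message is a hypothetical helper, not the test harness API):

// Hypothetical helper illustrating what the updated expectations verify:
// the surfaced error should mention the failed allocation and the
// ParquetSink consumer, not the old generic channel message.
fn check_error_message(err: &str) {
    for expected in [
        "Failed to allocate additional",
        "for ParquetSink(ArrowColumnWriter)",
    ] {
        assert!(
            err.contains(expected),
            "error `{err}` did not contain `{expected}`"
        );
    }
}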
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]