This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1dfac86a89 fix(11397): surface proper errors in ParquetSink (#11399)
1dfac86a89 is described below

commit 1dfac86a89750193491cf3e04917e37b92c64ffa
Author: wiedld <[email protected]>
AuthorDate: Fri Jul 12 04:04:42 2024 -0700

    fix(11397): surface proper errors in ParquetSink (#11399)
    
    * fix(11397): do not surface errors for closed channels, and instead let the task join errors be surfaced
    
    * fix(11397): terminate early on channel send failure
---
 .../core/src/datasource/file_format/parquet.rs     | 32 +++++++++++-----------
 datafusion/core/tests/memory_limit/mod.rs          |  4 +--
 2 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 694c949285..6271d8af37 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -893,12 +893,12 @@ async fn send_arrays_to_col_writers(
     let mut next_channel = 0;
     for (array, field) in rb.columns().iter().zip(schema.fields()) {
         for c in compute_leaves(field, array)? {
-            col_array_channels[next_channel]
-                .send(c)
-                .await
-                .map_err(|_| {
-                    DataFusionError::Internal("Unable to send array to writer!".into())
-                })?;
+            // Do not surface error from closed channel (means something
+            // else hit an error, and the plan is shutting down).
+            if col_array_channels[next_channel].send(c).await.is_err() {
+                return Ok(());
+            }
+
             next_channel += 1;
         }
     }
@@ -984,11 +984,11 @@ fn spawn_parquet_parallel_serialization_task(
                         &pool,
                     );
 
-                    serialize_tx.send(finalize_rg_task).await.map_err(|_| {
-                        DataFusionError::Internal(
-                            "Unable to send closed RG to concat task!".into(),
-                        )
-                    })?;
+                    // Do not surface error from closed channel (means something
+                    // else hit an error, and the plan is shutting down).
+                    if serialize_tx.send(finalize_rg_task).await.is_err() {
+                        return Ok(());
+                    }
 
                     current_rg_rows = 0;
                     rb = rb.slice(rows_left, rb.num_rows() - rows_left);
@@ -1013,11 +1013,11 @@ fn spawn_parquet_parallel_serialization_task(
                 &pool,
             );
 
-            serialize_tx.send(finalize_rg_task).await.map_err(|_| {
-                DataFusionError::Internal(
-                    "Unable to send closed RG to concat task!".into(),
-                )
-            })?;
+            // Do not surface error from closed channel (means something
+            // else hit an error, and the plan is shutting down).
+            if serialize_tx.send(finalize_rg_task).await.is_err() {
+                return Ok(());
+            }
         }
 
         Ok(())
diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index f7402357d1..7ef24609e2 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -340,8 +340,8 @@ async fn oom_parquet_sink() {
             path.to_string_lossy()
         ))
         .with_expected_errors(vec![
-            // TODO: update error handling in ParquetSink
-            "Unable to send array to writer!",
+            "Failed to allocate additional",
+            "for ParquetSink(ArrowColumnWriter)",
         ])
         .with_memory_limit(200_000)
         .run()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to