This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c234833b22 Fix: Avoid recursive external error wrapping (#14371)
c234833b22 is described below

commit c234833b222827390512261775a87fccc110429c
Author: Namgung Chan <[email protected]>
AuthorDate: Sat Feb 8 01:07:14 2025 +0900

    Fix: Avoid recursive external error wrapping (#14371)
    
    * Fix: Avoid recursive external error wrapping in type conversion.
    
    * cargo fmt
    
    * test TableProvider
    
    * Revert "test TableProvider"
    
    This reverts commit 127705e2e05523b66ba8416eeac3226217e39f05.
    
    * create WrappedError
    
    * Replace `ExternalError` with `WrappedError` in `RepartitionExec`
    
    * Replace `ExternalError` with `WrappedError` in Join Exec
    
    * sqllogictest
    
    * rename SharedError
    
    * Update comments and rename to DataFusionError::Shared
    
    * Improve API
    
    * fix clippy
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/common/src/error.rs                     | 70 +++++++++++++++++++++-
 datafusion/physical-plan/src/joins/cross_join.rs   |  2 +-
 datafusion/physical-plan/src/joins/hash_join.rs    |  4 +-
 .../physical-plan/src/joins/nested_loop_join.rs    |  2 +-
 datafusion/physical-plan/src/joins/utils.rs        |  9 ++-
 datafusion/physical-plan/src/repartition/mod.rs    |  3 +-
 datafusion/sqllogictest/test_files/aggregate.slt   |  2 +-
 datafusion/sqllogictest/test_files/errors.slt      |  9 +++
 8 files changed, 88 insertions(+), 13 deletions(-)

diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 0b4aab1dc7..af3a774e06 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -136,6 +136,12 @@ pub enum DataFusionError {
     /// human-readable messages, and locations in the source query that relate
     /// to the error in some way.
     Diagnostic(Box<Diagnostic>, Box<DataFusionError>),
+    /// A [`DataFusionError`] which shares an underlying [`DataFusionError`].
+    ///
+    /// This is useful when the same underlying [`DataFusionError`] is passed
+    /// to multiple receivers. For example, when the source of a repartition
+    /// errors and the error is propagated to multiple consumers.
+    Shared(Arc<DataFusionError>),
 }
 
 #[macro_export]
@@ -262,6 +268,17 @@ impl From<DataFusionError> for ArrowError {
     }
 }
 
+impl From<&Arc<DataFusionError>> for DataFusionError {
+    fn from(e: &Arc<DataFusionError>) -> Self {
+        if let DataFusionError::Shared(e_inner) = e.as_ref() {
+            // don't re-wrap
+            DataFusionError::Shared(Arc::clone(e_inner))
+        } else {
+            DataFusionError::Shared(Arc::clone(e))
+        }
+    }
+}
+
 #[cfg(feature = "parquet")]
 impl From<ParquetError> for DataFusionError {
     fn from(e: ParquetError) -> Self {
@@ -298,7 +315,16 @@ impl From<ParserError> for DataFusionError {
 
 impl From<GenericError> for DataFusionError {
     fn from(err: GenericError) -> Self {
-        DataFusionError::External(err)
+        // If the error is already a DataFusionError, not wrapping it.
+        if err.is::<DataFusionError>() {
+            if let Ok(e) = err.downcast::<DataFusionError>() {
+                *e
+            } else {
+                unreachable!()
+            }
+        } else {
+            DataFusionError::External(err)
+        }
     }
 }
 
@@ -334,6 +360,7 @@ impl Error for DataFusionError {
             DataFusionError::Context(_, e) => Some(e.as_ref()),
             DataFusionError::Substrait(_) => None,
             DataFusionError::Diagnostic(_, e) => Some(e.as_ref()),
+            DataFusionError::Shared(e) => Some(e.as_ref()),
         }
     }
 }
@@ -448,6 +475,7 @@ impl DataFusionError {
             DataFusionError::Context(_, _) => "",
             DataFusionError::Substrait(_) => "Substrait error: ",
             DataFusionError::Diagnostic(_, _) => "",
+            DataFusionError::Shared(_) => "",
         }
     }
 
@@ -489,6 +517,7 @@ impl DataFusionError {
             }
             DataFusionError::Substrait(ref desc) => Cow::Owned(desc.to_string()),
             DataFusionError::Diagnostic(_, ref err) => Cow::Owned(err.to_string()),
+            DataFusionError::Shared(ref desc) => Cow::Owned(desc.to_string()),
         }
     }
 
@@ -713,7 +742,7 @@ pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> DataFusionE
 mod test {
     use std::sync::Arc;
 
-    use crate::error::DataFusionError;
+    use crate::error::{DataFusionError, GenericError};
     use arrow::error::ArrowError;
 
     #[test]
@@ -867,6 +896,43 @@ mod test {
         );
     }
 
+    #[test]
+    fn external_error() {
+        // assert not wrapping DataFusionError
+        let generic_error: GenericError =
+            Box::new(DataFusionError::Plan("test".to_string()));
+        let datafusion_error: DataFusionError = generic_error.into();
+        println!("{}", datafusion_error.strip_backtrace());
+        assert_eq!(
+            datafusion_error.strip_backtrace(),
+            "Error during planning: test"
+        );
+
+        // assert wrapping other Error
+        let generic_error: GenericError =
+            Box::new(std::io::Error::new(std::io::ErrorKind::Other, "io error"));
+        let datafusion_error: DataFusionError = generic_error.into();
+        println!("{}", datafusion_error.strip_backtrace());
+        assert_eq!(
+            datafusion_error.strip_backtrace(),
+            "External error: io error"
+        );
+    }
+
+    #[test]
+    fn external_error_no_recursive() {
+        let generic_error_1: GenericError =
+            Box::new(std::io::Error::new(std::io::ErrorKind::Other, "io error"));
+        let external_error_1: DataFusionError = generic_error_1.into();
+        let generic_error_2: GenericError = Box::new(external_error_1);
+        let external_error_2: DataFusionError = generic_error_2.into();
+
+        println!("{}", external_error_2);
+        assert!(external_error_2
+            .to_string()
+            .starts_with("External error: io error"));
+    }
+
     /// Model what happens when implementing SendableRecordBatchStream:
     /// DataFusion code needs to return an ArrowError
     fn return_arrow_error() -> arrow::error::Result<()> {
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index ab94c132a2..cec717a25c 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -867,7 +867,7 @@ mod tests {
 
         assert_contains!(
             err.to_string(),
-            "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec"
+            "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec"
         );
 
         Ok(())
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 76e535d93b..b4e03b57e8 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -4014,7 +4014,7 @@ mod tests {
             // Asserting that operator-level reservation attempting to overallocate
             assert_contains!(
                 err.to_string(),
-                "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput"
+                "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput"
             );
 
             assert_contains!(
@@ -4095,7 +4095,7 @@ mod tests {
             // Asserting that stream-level reservation attempting to overallocate
             assert_contains!(
                 err.to_string(),
-                "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[1]"
+                "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[1]"
 
             );
 
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 50c411ccd7..5bd2658dc7 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -1514,7 +1514,7 @@ pub(crate) mod tests {
 
             assert_contains!(
                 err.to_string(),
-                "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]"
+                "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]"
             );
         }
 
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index dbe90077bc..61b7d2a06c 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -1077,7 +1077,7 @@ impl<T: 'static> OnceFut<T> {
             OnceFutState::Ready(r) => Poll::Ready(
                 r.as_ref()
                     .map(|r| r.as_ref())
-                    .map_err(|e| DataFusionError::External(Box::new(Arc::clone(e)))),
+                    .map_err(DataFusionError::from),
             ),
         }
     }
@@ -1091,10 +1091,9 @@ impl<T: 'static> OnceFut<T> {
 
         match &self.state {
             OnceFutState::Pending(_) => unreachable!(),
-            OnceFutState::Ready(r) => Poll::Ready(
-                r.clone()
-                    .map_err(|e| DataFusionError::External(Box::new(e))),
-            ),
+            OnceFutState::Ready(r) => {
+                Poll::Ready(r.clone().map_err(DataFusionError::Shared))
+            }
         }
     }
 }
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 63658340f4..c01c0a9564 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -911,11 +911,12 @@ impl RepartitionExec {
             }
             // Error from running input task
             Ok(Err(e)) => {
+                // send the same Arc'd error to all output partitions
                 let e = Arc::new(e);
 
                 for (_, tx) in txs {
                     // wrap it because need to send error to all output partitions
-                    let err = Err(DataFusionError::External(Box::new(Arc::clone(&e))));
+                    let err = Err(DataFusionError::from(&e));
                     tx.send(Some(err)).await.ok();
                 }
             }
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index d6a8c428ac..7caa81d64e 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -142,7 +142,7 @@ statement error DataFusion error: Error during planning: Failed to coerce argume
 SELECT approx_percentile_cont_with_weight(c3, c2, c1) FROM aggregate_test_100
 
 # csv_query_approx_percentile_cont_with_histogram_bins
-statement error DataFusion error: External error: This feature is not implemented: Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal \(got data type Int64\)\.
+statement error DataFusion error: This feature is not implemented: Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal \(got data type Int64\)\.
 SELECT c1, approx_percentile_cont(c3, 0.95, -1000) AS c3_p95 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
 
 statement error DataFusion error: Error during planning: Failed to coerce arguments to satisfy a call to 'approx_percentile_cont' function: coercion from \[Int16, Float64, Utf8\] to the signature OneOf(.*) failed(.|\n)*
diff --git a/datafusion/sqllogictest/test_files/errors.slt b/datafusion/sqllogictest/test_files/errors.slt
index c54ba16e97..7e2af5b9cb 100644
--- a/datafusion/sqllogictest/test_files/errors.slt
+++ b/datafusion/sqllogictest/test_files/errors.slt
@@ -161,3 +161,12 @@ create table records (timestamp timestamp, value float) as values (
     '2021-01-01 00:00:00', 1.0,
     '2021-01-01 00:00:00', 2.0
 );
+
+statement ok
+CREATE TABLE tab0(col0 INTEGER, col1 INTEGER, col2 INTEGER);
+
+statement ok
+INSERT INTO tab0 VALUES(83,0,38);
+
+query error DataFusion error: Arrow error: Divide by zero error
+SELECT DISTINCT - 84 FROM tab0 AS cor0 WHERE NOT + 96 / + col1 <= NULL GROUP BY col1, col0;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to