This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c234833b22 Fix: Avoid recursive external error wrapping (#14371)
c234833b22 is described below
commit c234833b222827390512261775a87fccc110429c
Author: Namgung Chan <[email protected]>
AuthorDate: Sat Feb 8 01:07:14 2025 +0900
Fix: Avoid recursive external error wrapping (#14371)
* Fix: Avoid recursive external error wrapping in type conversion.
* cargo fmt
* test TableProvider
* Revert "test TableProvider"
This reverts commit 127705e2e05523b66ba8416eeac3226217e39f05.
* create WrappedError
* Replace `ExternalError` with `WrappedError` in `RepartitionExec`
* Replace `ExternalError` with `WrappedError` in Join Exec
* sqllogictest
* rename SharedError
* Update comments and rename to DataFusionError::Shared
* Improve API
* fix clippy
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/common/src/error.rs | 70 +++++++++++++++++++++-
datafusion/physical-plan/src/joins/cross_join.rs | 2 +-
datafusion/physical-plan/src/joins/hash_join.rs | 4 +-
.../physical-plan/src/joins/nested_loop_join.rs | 2 +-
datafusion/physical-plan/src/joins/utils.rs | 9 ++-
datafusion/physical-plan/src/repartition/mod.rs | 3 +-
datafusion/sqllogictest/test_files/aggregate.slt | 2 +-
datafusion/sqllogictest/test_files/errors.slt | 9 +++
8 files changed, 88 insertions(+), 13 deletions(-)
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 0b4aab1dc7..af3a774e06 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -136,6 +136,12 @@ pub enum DataFusionError {
/// human-readable messages, and locations in the source query that relate
/// to the error in some way.
Diagnostic(Box<Diagnostic>, Box<DataFusionError>),
+ /// A [`DataFusionError`] which shares an underlying [`DataFusionError`].
+ ///
+ /// This is useful when the same underlying [`DataFusionError`] is passed
+ /// to multiple receivers. For example, when the source of a repartition
+ /// errors and the error is propagated to multiple consumers.
+ Shared(Arc<DataFusionError>),
}
#[macro_export]
@@ -262,6 +268,17 @@ impl From<DataFusionError> for ArrowError {
}
}
+impl From<&Arc<DataFusionError>> for DataFusionError {
+ fn from(e: &Arc<DataFusionError>) -> Self {
+ if let DataFusionError::Shared(e_inner) = e.as_ref() {
+ // don't re-wrap
+ DataFusionError::Shared(Arc::clone(e_inner))
+ } else {
+ DataFusionError::Shared(Arc::clone(e))
+ }
+ }
+}
+
#[cfg(feature = "parquet")]
impl From<ParquetError> for DataFusionError {
fn from(e: ParquetError) -> Self {
@@ -298,7 +315,16 @@ impl From<ParserError> for DataFusionError {
impl From<GenericError> for DataFusionError {
fn from(err: GenericError) -> Self {
- DataFusionError::External(err)
+ // If the error is already a DataFusionError, not wrapping it.
+ if err.is::<DataFusionError>() {
+ if let Ok(e) = err.downcast::<DataFusionError>() {
+ *e
+ } else {
+ unreachable!()
+ }
+ } else {
+ DataFusionError::External(err)
+ }
}
}
@@ -334,6 +360,7 @@ impl Error for DataFusionError {
DataFusionError::Context(_, e) => Some(e.as_ref()),
DataFusionError::Substrait(_) => None,
DataFusionError::Diagnostic(_, e) => Some(e.as_ref()),
+ DataFusionError::Shared(e) => Some(e.as_ref()),
}
}
}
@@ -448,6 +475,7 @@ impl DataFusionError {
DataFusionError::Context(_, _) => "",
DataFusionError::Substrait(_) => "Substrait error: ",
DataFusionError::Diagnostic(_, _) => "",
+ DataFusionError::Shared(_) => "",
}
}
@@ -489,6 +517,7 @@ impl DataFusionError {
}
DataFusionError::Substrait(ref desc) => Cow::Owned(desc.to_string()),
DataFusionError::Diagnostic(_, ref err) => Cow::Owned(err.to_string()),
+ DataFusionError::Shared(ref desc) => Cow::Owned(desc.to_string()),
}
}
@@ -713,7 +742,7 @@ pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> DataFusionE
mod test {
use std::sync::Arc;
- use crate::error::DataFusionError;
+ use crate::error::{DataFusionError, GenericError};
use arrow::error::ArrowError;
#[test]
@@ -867,6 +896,43 @@ mod test {
);
}
+ #[test]
+ fn external_error() {
+ // assert not wrapping DataFusionError
+ let generic_error: GenericError =
+ Box::new(DataFusionError::Plan("test".to_string()));
+ let datafusion_error: DataFusionError = generic_error.into();
+ println!("{}", datafusion_error.strip_backtrace());
+ assert_eq!(
+ datafusion_error.strip_backtrace(),
+ "Error during planning: test"
+ );
+
+ // assert wrapping other Error
+ let generic_error: GenericError =
+ Box::new(std::io::Error::new(std::io::ErrorKind::Other, "io error"));
+ let datafusion_error: DataFusionError = generic_error.into();
+ println!("{}", datafusion_error.strip_backtrace());
+ assert_eq!(
+ datafusion_error.strip_backtrace(),
+ "External error: io error"
+ );
+ }
+
+ #[test]
+ fn external_error_no_recursive() {
+ let generic_error_1: GenericError =
+ Box::new(std::io::Error::new(std::io::ErrorKind::Other, "io error"));
+ let external_error_1: DataFusionError = generic_error_1.into();
+ let generic_error_2: GenericError = Box::new(external_error_1);
+ let external_error_2: DataFusionError = generic_error_2.into();
+
+ println!("{}", external_error_2);
+ assert!(external_error_2
+ .to_string()
+ .starts_with("External error: io error"));
+ }
+
/// Model what happens when implementing SendableRecordBatchStream:
/// DataFusion code needs to return an ArrowError
fn return_arrow_error() -> arrow::error::Result<()> {
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index ab94c132a2..cec717a25c 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -867,7 +867,7 @@ mod tests {
assert_contains!(
err.to_string(),
- "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec"
+ "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec"
);
Ok(())
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 76e535d93b..b4e03b57e8 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -4014,7 +4014,7 @@ mod tests {
// Asserting that operator-level reservation attempting to overallocate
assert_contains!(
err.to_string(),
- "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput"
+ "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput"
);
assert_contains!(
@@ -4095,7 +4095,7 @@ mod tests {
// Asserting that stream-level reservation attempting to overallocate
assert_contains!(
err.to_string(),
- "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[1]"
+ "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[1]"
);
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 50c411ccd7..5bd2658dc7 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -1514,7 +1514,7 @@ pub(crate) mod tests {
assert_contains!(
err.to_string(),
- "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]"
+ "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]"
);
}
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index dbe90077bc..61b7d2a06c 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -1077,7 +1077,7 @@ impl<T: 'static> OnceFut<T> {
OnceFutState::Ready(r) => Poll::Ready(
r.as_ref()
.map(|r| r.as_ref())
- .map_err(|e| DataFusionError::External(Box::new(Arc::clone(e)))),
+ .map_err(DataFusionError::from),
),
}
}
@@ -1091,10 +1091,9 @@ impl<T: 'static> OnceFut<T> {
match &self.state {
OnceFutState::Pending(_) => unreachable!(),
- OnceFutState::Ready(r) => Poll::Ready(
- r.clone()
- .map_err(|e| DataFusionError::External(Box::new(e))),
- ),
+ OnceFutState::Ready(r) => {
+ Poll::Ready(r.clone().map_err(DataFusionError::Shared))
+ }
}
}
}
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 63658340f4..c01c0a9564 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -911,11 +911,12 @@ impl RepartitionExec {
}
// Error from running input task
Ok(Err(e)) => {
+ // send the same Arc'd error to all output partitions
let e = Arc::new(e);
for (_, tx) in txs {
// wrap it because need to send error to all output partitions
- let err = Err(DataFusionError::External(Box::new(Arc::clone(&e))));
+ let err = Err(DataFusionError::from(&e));
tx.send(Some(err)).await.ok();
}
}
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index d6a8c428ac..7caa81d64e 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -142,7 +142,7 @@ statement error DataFusion error: Error during planning: Failed to coerce argume
SELECT approx_percentile_cont_with_weight(c3, c2, c1) FROM aggregate_test_100
# csv_query_approx_percentile_cont_with_histogram_bins
-statement error DataFusion error: External error: This feature is not implemented: Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal \(got data type Int64\)\.
+statement error DataFusion error: This feature is not implemented: Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal \(got data type Int64\)\.
SELECT c1, approx_percentile_cont(c3, 0.95, -1000) AS c3_p95 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
statement error DataFusion error: Error during planning: Failed to coerce arguments to satisfy a call to 'approx_percentile_cont' function: coercion from \[Int16, Float64, Utf8\] to the signature OneOf(.*) failed(.|\n)*
diff --git a/datafusion/sqllogictest/test_files/errors.slt b/datafusion/sqllogictest/test_files/errors.slt
index c54ba16e97..7e2af5b9cb 100644
--- a/datafusion/sqllogictest/test_files/errors.slt
+++ b/datafusion/sqllogictest/test_files/errors.slt
@@ -161,3 +161,12 @@ create table records (timestamp timestamp, value float) as values (
'2021-01-01 00:00:00', 1.0,
'2021-01-01 00:00:00', 2.0
);
+
+statement ok
+CREATE TABLE tab0(col0 INTEGER, col1 INTEGER, col2 INTEGER);
+
+statement ok
+INSERT INTO tab0 VALUES(83,0,38);
+
+query error DataFusion error: Arrow error: Divide by zero error
+SELECT DISTINCT - 84 FROM tab0 AS cor0 WHERE NOT + 96 / + col1 <= NULL GROUP BY col1, col0;
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]