This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new cb6f7fe3a6 Extend backtrace coverage for `DatafusionError::Plan` errors (#7803)
cb6f7fe3a6 is described below
commit cb6f7fe3a627529d3614cf4ea126a646f1c76557
Author: comphead <[email protected]>
AuthorDate: Mon Oct 16 07:08:20 2023 -0700
Extend backtrace coverage for `DatafusionError::Plan` errors (#7803)
* improve backtrace coverage for plan errors
* fix cli
* cli fmt
* fix tests
* docs
* doc fmt
---
datafusion-cli/src/exec.rs | 5 +-
datafusion/common/src/dfschema.rs | 13 ++---
datafusion/common/src/error.rs | 26 +++++++---
datafusion/common/src/functional_dependencies.rs | 25 +++++-----
datafusion/core/src/datasource/listing/table.rs | 14 +++---
datafusion/core/src/execution/context.rs | 29 ++++-------
datafusion/core/src/physical_optimizer/pruning.rs | 4 +-
.../core/src/physical_optimizer/sort_pushdown.rs | 6 +--
datafusion/execution/src/task.rs | 10 ++--
datafusion/expr/src/aggregate_function.rs | 15 +++---
datafusion/expr/src/built_in_function.rs | 17 ++++---
datafusion/expr/src/expr_schema.rs | 8 ++-
datafusion/expr/src/field_util.rs | 6 ++-
datafusion/expr/src/logical_plan/builder.rs | 10 ++--
datafusion/expr/src/type_coercion/binary.rs | 30 +++++------
datafusion/expr/src/utils.rs | 8 +--
datafusion/expr/src/window_function.rs | 15 +++---
datafusion/optimizer/src/analyzer/type_coercion.rs | 30 +++++------
datafusion/optimizer/src/push_down_filter.rs | 6 ++-
datafusion/physical-plan/src/joins/utils.rs | 12 ++---
datafusion/proto/src/bytes/mod.rs | 58 ++++++++++------------
datafusion/proto/src/logical_plan/from_proto.rs | 8 ++-
datafusion/proto/src/logical_plan/mod.rs | 5 +-
datafusion/sql/src/expr/arrow_cast.rs | 8 +--
datafusion/sql/src/expr/function.rs | 6 ++-
datafusion/sql/src/expr/identifier.rs | 7 ++-
datafusion/sql/src/expr/order_by.rs | 16 +++---
datafusion/sql/src/statement.rs | 22 ++++----
datafusion/sql/tests/sql_integration.rs | 35 +++++++------
29 files changed, 236 insertions(+), 218 deletions(-)
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index 100d7bce44..b62ad12dbf 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -26,6 +26,7 @@ use crate::{
},
print_options::{MaxRows, PrintOptions},
};
+use datafusion::common::plan_datafusion_err;
use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};
use datafusion::{
datasource::listing::ListingTableUrl,
@@ -202,11 +203,11 @@ async fn exec_and_print(
let task_ctx = ctx.task_ctx();
let dialect = &task_ctx.session_config().options().sql_parser.dialect;
let dialect = dialect_from_str(dialect).ok_or_else(|| {
- DataFusionError::Plan(format!(
+ plan_datafusion_err!(
"Unsupported SQL dialect: {dialect}. Available dialects: \
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
MsSQL, ClickHouse, BigQuery, Ansi."
- ))
+ )
})?;
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs
index e015ef5c40..b1aee41978 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -24,7 +24,9 @@ use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::sync::Arc;
-use crate::error::{unqualified_field_not_found, DataFusionError, Result, SchemaError};
+use crate::error::{
+ unqualified_field_not_found, DataFusionError, Result, SchemaError, _plan_err,
+};
use crate::{
field_not_found, Column, FunctionalDependencies, OwnedTableReference,
TableReference,
};
@@ -187,10 +189,10 @@ impl DFSchema {
match &self.fields[i].qualifier {
Some(qualifier) => {
if (qualifier.to_string() + "." + self.fields[i].name()) == name {
- return Err(DataFusionError::Plan(format!(
+ return _plan_err!(
"Fully qualified field name '{name}' was
supplied to `index_of` \
which is deprecated. Please use
`index_of_column_by_name` instead"
- )));
+ );
}
}
None => (),
@@ -378,12 +380,11 @@ impl DFSchema {
.zip(arrow_schema.fields().iter())
.try_for_each(|(l_field, r_field)| {
if !can_cast_types(r_field.data_type(), l_field.data_type()) {
- Err(DataFusionError::Plan(
- format!("Column {} (type: {}) is not compatible with column {} (type: {})",
+ _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
r_field.name(),
r_field.data_type(),
l_field.name(),
- l_field.data_type())))
+ l_field.data_type())
} else {
Ok(())
}
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index d7a0e1b59d..adf58e282e 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -477,12 +477,25 @@ macro_rules! with_dollar_sign {
/// plan_err!("Error {:?}", val)
/// plan_err!("Error {val}")
/// plan_err!("Error {val:?}")
+///
+/// `NAME_ERR` - macro name for wrapping Err(DataFusionError::*)
+/// `NAME_DF_ERR` - macro name for wrapping DataFusionError::*. Needed to keep the backtrace
+/// in constructions where DataFusionError::* is used directly, like `map_err`, `ok_or_else`, etc.
macro_rules! make_error {
- ($NAME:ident, $ERR:ident) => {
+ ($NAME_ERR:ident, $NAME_DF_ERR: ident, $ERR:ident) => {
with_dollar_sign! {
($d:tt) => {
+ /// Macro wraps `$ERR` to add backtrace feature
+ #[macro_export]
+ macro_rules! $NAME_DF_ERR {
+ ($d($d args:expr),*) => {
DataFusionError::$ERR(format!("{}{}", format!($d($d args),*), DataFusionError::get_back_trace()).into())
+ }
+ }
+
+ /// Macro wraps Err(`$ERR`) to add backtrace feature
#[macro_export]
- macro_rules! $NAME {
+ macro_rules! $NAME_ERR {
($d($d args:expr),*) => {
Err(DataFusionError::$ERR(format!("{}{}", format!($d($d args),*), DataFusionError::get_back_trace()).into()))
}
@@ -493,16 +506,16 @@ macro_rules! make_error {
}
// Exposes a macro to create `DataFusionError::Plan`
-make_error!(plan_err, Plan);
+make_error!(plan_err, plan_datafusion_err, Plan);
// Exposes a macro to create `DataFusionError::Internal`
-make_error!(internal_err, Internal);
+make_error!(internal_err, internal_datafusion_err, Internal);
// Exposes a macro to create `DataFusionError::NotImplemented`
-make_error!(not_impl_err, NotImplemented);
+make_error!(not_impl_err, not_impl_datafusion_err, NotImplemented);
// Exposes a macro to create `DataFusionError::Execution`
-make_error!(exec_err, Execution);
+make_error!(exec_err, exec_datafusion_err, Execution);
// Exposes a macro to create `DataFusionError::SQL`
#[macro_export]
@@ -517,6 +530,7 @@ macro_rules! sql_err {
pub use exec_err as _exec_err;
pub use internal_err as _internal_err;
pub use not_impl_err as _not_impl_err;
+pub use plan_err as _plan_err;
#[cfg(test)]
mod test {
diff --git a/datafusion/common/src/functional_dependencies.rs b/datafusion/common/src/functional_dependencies.rs
index 324374f557..869709bc8d 100644
--- a/datafusion/common/src/functional_dependencies.rs
+++ b/datafusion/common/src/functional_dependencies.rs
@@ -23,6 +23,7 @@ use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::vec::IntoIter;
+use crate::error::_plan_err;
use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result};
use sqlparser::ast::TableConstraint;
@@ -95,18 +96,18 @@ impl Constraints {
Constraint::Unique(indices)
})
}
- TableConstraint::ForeignKey { .. } => Err(DataFusionError::Plan(
- "Foreign key constraints are not currently supported".to_string(),
- )),
- TableConstraint::Check { .. } => Err(DataFusionError::Plan(
- "Check constraints are not currently supported".to_string(),
- )),
- TableConstraint::Index { .. } => Err(DataFusionError::Plan(
- "Indexes are not currently supported".to_string(),
- )),
- TableConstraint::FulltextOrSpatial { .. } => Err(DataFusionError::Plan(
- "Indexes are not currently supported".to_string(),
- )),
+ TableConstraint::ForeignKey { .. } => {
+ _plan_err!("Foreign key constraints are not currently supported")
+ }
+ TableConstraint::Check { .. } => {
+ _plan_err!("Check constraints are not currently supported")
+ }
+ TableConstraint::Index { .. } => {
+ _plan_err!("Indexes are not currently supported")
+ }
+ TableConstraint::FulltextOrSpatial { .. } => {
+ _plan_err!("Indexes are not currently supported")
+ }
})
.collect::<Result<Vec<_>>>()?;
Ok(Constraints::new_unverified(constraints))
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index b6870ad2a8..0646243669 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -233,10 +233,10 @@ impl FromStr for ListingTableInsertMode {
"append_to_file" => Ok(ListingTableInsertMode::AppendToFile),
"append_new_files" => Ok(ListingTableInsertMode::AppendNewFiles),
"error" => Ok(ListingTableInsertMode::Error),
- _ => Err(DataFusionError::Plan(format!(
+ _ => plan_err!(
"Unknown or unsupported insert mode {s}. Supported options are
\
append_to_file, append_new_files, and error."
- ))),
+ ),
}
}
}
@@ -865,10 +865,10 @@ impl TableProvider for ListingTable {
let writer_mode = match self.options.insert_mode {
ListingTableInsertMode::AppendToFile => {
if input_partitions > file_groups.len() {
- return Err(DataFusionError::Plan(format!(
+ return plan_err!(
"Cannot append {input_partitions} partitions to {}
files!",
file_groups.len()
- )));
+ );
}
crate::datasource::file_format::write::FileWriterMode::Append
@@ -919,9 +919,9 @@ impl TableProvider for ListingTable {
self.options().insert_mode,
ListingTableInsertMode::AppendToFile
) {
- return Err(DataFusionError::Plan(
- "Cannot insert into a sorted ListingTable with mode append!".into(),
- ));
+ return plan_err!(
+ "Cannot insert into a sorted ListingTable with mode append!"
+ );
}
// Multiple sort orders in outer vec are equivalent, so we pass only the first one
let ordering = self
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ca6da6cfa0..b393d363ca 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -30,7 +30,7 @@ use crate::{
};
use datafusion_common::{
alias::AliasGenerator,
- exec_err, not_impl_err, plan_err,
+ exec_err, not_impl_err, plan_datafusion_err, plan_err,
tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion},
};
use datafusion_execution::registry::SerializerRegistry;
@@ -1577,17 +1577,14 @@ impl SessionState {
self.catalog_list
.catalog(&resolved_ref.catalog)
.ok_or_else(|| {
- DataFusionError::Plan(format!(
+ plan_datafusion_err!(
"failed to resolve catalog: {}",
resolved_ref.catalog
- ))
+ )
})?
.schema(&resolved_ref.schema)
.ok_or_else(|| {
- DataFusionError::Plan(format!(
- "failed to resolve schema: {}",
- resolved_ref.schema
- ))
+ plan_datafusion_err!("failed to resolve schema: {}",
resolved_ref.schema)
})
}
@@ -1689,11 +1686,11 @@ impl SessionState {
dialect: &str,
) -> Result<datafusion_sql::parser::Statement> {
let dialect = dialect_from_str(dialect).ok_or_else(|| {
- DataFusionError::Plan(format!(
+ plan_datafusion_err!(
"Unsupported SQL dialect: {dialect}. Available dialects: \
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
MsSQL, ClickHouse, BigQuery, Ansi."
- ))
+ )
})?;
let mut statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;
if statements.len() > 1 {
@@ -2022,7 +2019,7 @@ impl<'a> ContextProvider for SessionContextProvider<'a> {
self.tables
.get(&name)
.cloned()
- .ok_or_else(|| DataFusionError::Plan(format!("table '{name}' not found")))
+ .ok_or_else(|| plan_datafusion_err!("table '{name}' not found"))
}
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
@@ -2069,9 +2066,7 @@ impl FunctionRegistry for SessionState {
let result = self.scalar_functions.get(name);
result.cloned().ok_or_else(|| {
- DataFusionError::Plan(format!(
- "There is no UDF named \"{name}\" in the registry"
- ))
+ plan_datafusion_err!("There is no UDF named \"{name}\" in the
registry")
})
}
@@ -2079,9 +2074,7 @@ impl FunctionRegistry for SessionState {
let result = self.aggregate_functions.get(name);
result.cloned().ok_or_else(|| {
- DataFusionError::Plan(format!(
- "There is no UDAF named \"{name}\" in the registry"
- ))
+ plan_datafusion_err!("There is no UDAF named \"{name}\" in the
registry")
})
}
@@ -2089,9 +2082,7 @@ impl FunctionRegistry for SessionState {
let result = self.window_functions.get(name);
result.cloned().ok_or_else(|| {
- DataFusionError::Plan(format!(
- "There is no UDWF named \"{name}\" in the registry"
- ))
+ plan_datafusion_err!("There is no UDWF named \"{name}\" in the
registry")
})
}
}
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index 43a7dd8afe..de508327fa 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -35,7 +35,7 @@ use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
-use datafusion_common::{downcast_value, ScalarValue};
+use datafusion_common::{downcast_value, plan_datafusion_err, ScalarValue};
use datafusion_common::{
internal_err, plan_err,
tree_node::{Transformed, TreeNode},
@@ -451,7 +451,7 @@ fn build_statistics_record_batch<S: PruningStatistics>(
);
RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
- DataFusionError::Plan(format!("Can not create statistics record batch:
{err}"))
+ plan_datafusion_err!("Can not create statistics record batch: {err}")
})
}
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index 629011cb0f..bb991115ec 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -29,7 +29,7 @@ use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
-use datafusion_common::{plan_err, DataFusionError, Result};
+use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result};
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::{
@@ -127,7 +127,7 @@ pub(crate) fn pushdown_sorts(
let plan = &requirements.plan;
let parent_required = requirements.required_ordering.as_deref();
const ERR_MSG: &str = "Expects parent requirement to contain something";
- let err = || DataFusionError::Plan(ERR_MSG.to_string());
+ let err = || plan_datafusion_err!("{}", ERR_MSG);
if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
let mut new_plan = plan.clone();
if !ordering_satisfy_requirement(
@@ -199,7 +199,7 @@ fn pushdown_requirement_to_children(
parent_required: Option<&[PhysicalSortRequirement]>,
) -> Result<Option<Vec<Option<Vec<PhysicalSortRequirement>>>>> {
const ERR_MSG: &str = "Expects parent requirement to contain something";
- let err = || DataFusionError::Plan(ERR_MSG.to_string());
+ let err = || plan_datafusion_err!("{}", ERR_MSG);
let maintains_input_order = plan.maintains_input_order();
if is_window(plan) {
let required_input_ordering = plan.required_input_ordering();
diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs
index 72d804d7bb..52c183b161 100644
--- a/datafusion/execution/src/task.rs
+++ b/datafusion/execution/src/task.rs
@@ -22,7 +22,7 @@ use std::{
use datafusion_common::{
config::{ConfigOptions, Extensions},
- DataFusionError, Result,
+ plan_datafusion_err, DataFusionError, Result,
};
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
@@ -182,9 +182,7 @@ impl FunctionRegistry for TaskContext {
let result = self.scalar_functions.get(name);
result.cloned().ok_or_else(|| {
- DataFusionError::Plan(format!(
- "There is no UDF named \"{name}\" in the TaskContext"
- ))
+ plan_datafusion_err!("There is no UDF named \"{name}\" in the
TaskContext")
})
}
@@ -192,9 +190,7 @@ impl FunctionRegistry for TaskContext {
let result = self.aggregate_functions.get(name);
result.cloned().ok_or_else(|| {
- DataFusionError::Plan(format!(
- "There is no UDAF named \"{name}\" in the TaskContext"
- ))
+ plan_datafusion_err!("There is no UDAF named \"{name}\" in the
TaskContext")
})
}
diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs
index 1c8f34ec1d..eaf4ff5ad8 100644
--- a/datafusion/expr/src/aggregate_function.rs
+++ b/datafusion/expr/src/aggregate_function.rs
@@ -20,7 +20,7 @@
use crate::utils;
use crate::{type_coercion::aggregates::*, Signature, TypeSignature, Volatility};
use arrow::datatypes::{DataType, Field};
-use datafusion_common::{plan_err, DataFusionError, Result};
+use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result};
use std::sync::Arc;
use std::{fmt, str::FromStr};
use strum_macros::EnumIter;
@@ -232,11 +232,14 @@ impl AggregateFunction {
// original errors are all related to wrong function signature
// aggregate them for better error message
.map_err(|_| {
- DataFusionError::Plan(utils::generate_signature_error_msg(
- &format!("{self}"),
- self.signature(),
- input_expr_types,
- ))
+ plan_datafusion_err!(
+ "{}",
+ utils::generate_signature_error_msg(
+ &format!("{self}"),
+ self.signature(),
+ input_expr_types,
+ )
+ )
})?;
match self {
diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs
index 991ad03220..d7cfd9f420 100644
--- a/datafusion/expr/src/built_in_function.rs
+++ b/datafusion/expr/src/built_in_function.rs
@@ -25,7 +25,9 @@ use crate::{
TypeSignature, Volatility,
};
use arrow::datatypes::{DataType, Field, Fields, IntervalUnit, TimeUnit};
-use datafusion_common::{internal_err, plan_err, DataFusionError, Result};
+use datafusion_common::{
+ internal_err, plan_datafusion_err, plan_err, DataFusionError, Result,
+};
use std::collections::HashMap;
use std::fmt;
use std::str::FromStr;
@@ -501,11 +503,14 @@ impl BuiltinScalarFunction {
// verify that this is a valid set of data types for this function
data_types(input_expr_types, &self.signature()).map_err(|_| {
- DataFusionError::Plan(utils::generate_signature_error_msg(
- &format!("{self}"),
- self.signature(),
- input_expr_types,
- ))
+ plan_datafusion_err!(
+ "{}",
+ utils::generate_signature_error_msg(
+ &format!("{self}"),
+ self.signature(),
+ input_expr_types,
+ )
+ )
})?;
// the return type of the built in function.
diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs
index 9651b377c5..025b74eb50 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -27,8 +27,8 @@ use crate::{LogicalPlan, Projection, Subquery};
use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field};
use datafusion_common::{
- internal_err, plan_err, Column, DFField, DFSchema, DataFusionError, ExprSchema,
- Result,
+ internal_err, plan_datafusion_err, plan_err, Column, DFField, DFSchema,
+ DataFusionError, ExprSchema, Result,
};
use std::collections::HashMap;
use std::sync::Arc;
@@ -141,9 +141,7 @@ impl ExprSchemable for Expr {
Expr::Like { .. } | Expr::SimilarTo { .. } => Ok(DataType::Boolean),
Expr::Placeholder(Placeholder { data_type, .. }) => {
data_type.clone().ok_or_else(|| {
- DataFusionError::Plan(
- "Placeholder type could not be resolved".to_owned(),
- )
+ plan_datafusion_err!("Placeholder type could not be
resolved")
})
}
Expr::Wildcard => {
diff --git a/datafusion/expr/src/field_util.rs b/datafusion/expr/src/field_util.rs
index 23260ea9c2..96fa901c2c 100644
--- a/datafusion/expr/src/field_util.rs
+++ b/datafusion/expr/src/field_util.rs
@@ -18,7 +18,9 @@
//! Utility functions for complex field access
use arrow::datatypes::{DataType, Field};
-use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
+use datafusion_common::{
+ plan_datafusion_err, plan_err, DataFusionError, Result, ScalarValue,
+};
/// Types of the field access expression of a nested type, such as `Field` or `List`
pub enum GetFieldAccessSchema {
@@ -52,7 +54,7 @@ impl GetFieldAccessSchema {
)
} else {
let field = fields.iter().find(|f| f.name() == s);
- field.ok_or(DataFusionError::Plan(format!("Field
{s} not found in struct"))).map(|f| f.as_ref().clone())
+ field.ok_or(plan_datafusion_err!("Field {s} not
found in struct")).map(|f| f.as_ref().clone())
}
}
(DataType::Struct(_), _) => plan_err!(
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index c3f576195e..770f39be47 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -43,13 +43,13 @@ use crate::{
Expr, ExprSchemable, TableSource,
};
use arrow::datatypes::{DataType, Schema, SchemaRef};
-use datafusion_common::plan_err;
use datafusion_common::UnnestOptions;
use datafusion_common::{
display::ToStringifiedPlan, Column, DFField, DFSchema, DFSchemaRef, DataFusionError,
FileType, FunctionalDependencies, OwnedTableReference, Result, ScalarValue,
TableReference, ToDFSchema,
};
+use datafusion_common::{plan_datafusion_err, plan_err};
use std::any::Any;
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
@@ -1075,9 +1075,9 @@ impl LogicalPlanBuilder {
self.plan.schema().clone(),
right.schema().clone(),
)?.ok_or_else(||
- DataFusionError::Plan(format!(
+ plan_datafusion_err!(
"can't create join plan, join key should belong to
one input, error key: ({normalized_left_key},{normalized_right_key})"
- )))
+ ))
})
.collect::<Result<Vec<_>>>()?;
@@ -1255,13 +1255,13 @@ pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalP
let data_type =
comparison_coercion(left_field.data_type(), right_field.data_type())
.ok_or_else(|| {
- DataFusionError::Plan(format!(
+ plan_datafusion_err!(
"UNION Column {} (type: {}) is not compatible with column {}
(type: {})",
right_field.name(),
right_field.data_type(),
left_field.name(),
left_field.data_type()
- ))
+ )
})?;
Ok(DFField::new(
diff --git a/datafusion/expr/src/type_coercion/binary.rs b/datafusion/expr/src/type_coercion/binary.rs
index 19bb16651d..ba3c21a15d 100644
--- a/datafusion/expr/src/type_coercion/binary.rs
+++ b/datafusion/expr/src/type_coercion/binary.rs
@@ -24,7 +24,7 @@ use arrow::datatypes::{
DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE,
};
-use datafusion_common::Result;
+use datafusion_common::{plan_datafusion_err, Result};
use datafusion_common::{plan_err, DataFusionError};
use crate::Operator;
@@ -71,9 +71,9 @@ fn signature(lhs: &DataType, op: &Operator, rhs: &DataType) -> Result<Signature>
Operator::IsDistinctFrom |
Operator::IsNotDistinctFrom => {
comparison_coercion(lhs, rhs).map(Signature::comparison).ok_or_else(|| {
- DataFusionError::Plan(format!(
+ plan_datafusion_err!(
"Cannot infer common argument type for comparison
operation {lhs} {op} {rhs}"
- ))
+ )
})
}
Operator::And | Operator::Or => match (lhs, rhs) {
@@ -91,9 +91,9 @@ fn signature(lhs: &DataType, op: &Operator, rhs: &DataType) -> Result<Signature>
Operator::RegexNotMatch |
Operator::RegexNotIMatch => {
regex_coercion(lhs, rhs).map(Signature::comparison).ok_or_else(|| {
- DataFusionError::Plan(format!(
+ plan_datafusion_err!(
"Cannot infer common argument type for regex operation
{lhs} {op} {rhs}"
- ))
+ )
})
}
Operator::BitwiseAnd
@@ -102,24 +102,24 @@ fn signature(lhs: &DataType, op: &Operator, rhs: &DataType) -> Result<Signature>
| Operator::BitwiseShiftRight
| Operator::BitwiseShiftLeft => {
bitwise_coercion(lhs, rhs).map(Signature::uniform).ok_or_else(|| {
- DataFusionError::Plan(format!(
+ plan_datafusion_err!(
"Cannot infer common type for bitwise operation {lhs} {op}
{rhs}"
- ))
+ )
})
}
Operator::StringConcat => {
string_concat_coercion(lhs, rhs).map(Signature::uniform).ok_or_else(|| {
- DataFusionError::Plan(format!(
+ plan_datafusion_err!(
"Cannot infer common string type for string concat
operation {lhs} {op} {rhs}"
- ))
+ )
})
}
Operator::AtArrow
| Operator::ArrowAt => {
array_coercion(lhs, rhs).map(Signature::uniform).ok_or_else(|| {
- DataFusionError::Plan(format!(
+ plan_datafusion_err!(
"Cannot infer common array type for arrow operation {lhs}
{op} {rhs}"
- ))
+ )
})
}
Operator::Plus |
@@ -154,9 +154,9 @@ fn signature(lhs: &DataType, op: &Operator, rhs: &DataType) -> Result<Signature>
// Temporal arithmetic by first coercing to a common time representation
// e.g. Date32 - Timestamp
let ret = get_result(&coerced, &coerced).map_err(|e| {
- DataFusionError::Plan(format!(
+ plan_datafusion_err!(
"Cannot get result type for temporal operation
{coerced} {op} {coerced}: {e}"
- ))
+ )
})?;
Ok(Signature{
lhs: coerced.clone(),
@@ -166,9 +166,9 @@ fn signature(lhs: &DataType, op: &Operator, rhs: &DataType) -> Result<Signature>
} else if let Some((lhs, rhs)) = math_decimal_coercion(lhs, rhs) {
// Decimal arithmetic, e.g. Decimal(10, 2) + Decimal(10, 0)
let ret = get_result(&lhs, &rhs).map_err(|e| {
- DataFusionError::Plan(format!(
+ plan_datafusion_err!(
"Cannot get result type for decimal operation {lhs}
{op} {rhs}: {e}"
- ))
+ )
})?;
Ok(Signature{
lhs,
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 54a1ce348b..5fc5b5b3f9 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -24,8 +24,8 @@ use crate::{Cast, Expr, ExprSchemable, GroupingSet, LogicalPlan, TryCast};
use arrow::datatypes::{DataType, TimeUnit};
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{
- internal_err, plan_err, Column, DFField, DFSchema, DFSchemaRef, DataFusionError,
- Result, ScalarValue, TableReference,
+ internal_err, plan_datafusion_err, plan_err, Column, DFField, DFSchema, DFSchemaRef,
+ DataFusionError, Result, ScalarValue, TableReference,
};
use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem, WildcardAdditionalOptions};
use std::cmp::Ordering;
@@ -198,8 +198,8 @@ pub fn enumerate_grouping_sets(group_expr: Vec<Expr>) -> Result<Vec<Expr>> {
grouping_sets.iter().map(|e| e.iter().collect()).collect()
}
Expr::GroupingSet(GroupingSet::Cube(group_exprs)) => {
- let grouping_sets =
- powerset(group_exprs).map_err(DataFusionError::Plan)?;
+ let grouping_sets = powerset(group_exprs)
+ .map_err(|e| plan_datafusion_err!("{}", e))?;
check_grouping_sets_size_limit(grouping_sets.len())?;
grouping_sets
}
diff --git a/datafusion/expr/src/window_function.rs b/datafusion/expr/src/window_function.rs
index 1f36ebdd6b..e5b00c8f29 100644
--- a/datafusion/expr/src/window_function.rs
+++ b/datafusion/expr/src/window_function.rs
@@ -25,7 +25,7 @@ use crate::type_coercion::functions::data_types;
use crate::utils;
use crate::{AggregateUDF, Signature, TypeSignature, Volatility, WindowUDF};
use arrow::datatypes::DataType;
-use datafusion_common::{plan_err, DataFusionError, Result};
+use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result};
use std::sync::Arc;
use std::{fmt, str::FromStr};
use strum_macros::EnumIter;
@@ -192,11 +192,14 @@ impl BuiltInWindowFunction {
// original errors are all related to wrong function signature
// aggregate them for better error message
.map_err(|_| {
- DataFusionError::Plan(utils::generate_signature_error_msg(
- &format!("{self}"),
- self.signature(),
- input_expr_types,
- ))
+ plan_datafusion_err!(
+ "{}",
+ utils::generate_signature_error_msg(
+ &format!("{self}"),
+ self.signature(),
+ input_expr_types,
+ )
+ )
})?;
match self {
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 3b86617385..bfdbec3901 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -24,8 +24,8 @@ use arrow::datatypes::{DataType, IntervalUnit};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{RewriteRecursion, TreeNodeRewriter};
use datafusion_common::{
- exec_err, internal_err, plan_err, DFSchema, DFSchemaRef, DataFusionError, Result,
- ScalarValue,
+ exec_err, internal_err, plan_datafusion_err, plan_err, DFSchema, DFSchemaRef,
+ DataFusionError, Result, ScalarValue,
};
use datafusion_expr::expr::{
self, Between, BinaryExpr, Case, Exists, InList, InSubquery, Like, ScalarFunction,
@@ -162,11 +162,10 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
let new_plan = analyze_internal(&self.schema, &subquery.subquery)?;
let expr_type = expr.get_type(&self.schema)?;
let subquery_type = new_plan.schema().field(0).data_type();
- let common_type = comparison_coercion(&expr_type, subquery_type).ok_or(DataFusionError::Plan(
- format!(
+ let common_type = comparison_coercion(&expr_type, subquery_type).ok_or(plan_datafusion_err!(
"expr type {expr_type:?} can't cast to {subquery_type:?} in InSubquery"
),
- ))?;
+ )?;
let new_subquery = Subquery {
subquery: Arc::new(new_plan),
outer_ref_columns: subquery.outer_ref_columns,
@@ -218,9 +217,9 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
} else {
"LIKE"
};
- DataFusionError::Plan(format!(
+ plan_datafusion_err!(
"There isn't a common type to coerce {left_type} and
{right_type} in {op_name} expression"
- ))
+ )
})?;
let expr = Box::new(expr.cast_to(&coerced_type, &self.schema)?);
let pattern = Box::new(pattern.cast_to(&coerced_type, &self.schema)?);
@@ -709,20 +708,20 @@ fn coerce_case_expression(case: Case, schema: &DFSchemaRef) -> Result<Case> {
let coerced_type =
get_coerce_type_for_case_expression(&when_types, Some(case_type));
coerced_type.ok_or_else(|| {
- DataFusionError::Plan(format!(
+ plan_datafusion_err!(
"Failed to coerce case ({case_type:?}) and when
({when_types:?}) \
to common types in CASE WHEN expression"
- ))
+ )
})
})
.transpose()?;
let then_else_coerce_type =
get_coerce_type_for_case_expression(&then_types, else_type.as_ref()).ok_or_else(
|| {
- DataFusionError::Plan(format!(
+ plan_datafusion_err!(
"Failed to coerce then ({then_types:?}) and else
({else_type:?}) \
to common types in CASE WHEN expression"
- ))
+ )
},
)?;
@@ -991,10 +990,13 @@ mod test {
None,
None,
));
- let err = Projection::try_new(vec![agg_expr], empty).err().unwrap();
+ let err = Projection::try_new(vec![agg_expr], empty)
+ .err()
+ .unwrap()
+ .strip_backtrace();
assert_eq!(
- "Plan(\"No function matches the given name and argument types
'AVG(Utf8)'. You might need to add explicit type casts.\\n\\tCandidate
functions:\\n\\tAVG(Int8/Int16/Int32/Int64/UInt8/UInt16/UInt32/UInt64/Float32/Float64)\")",
- &format!("{err:?}")
+ "Error during planning: No function matches the given name and
argument types 'AVG(Utf8)'. You might need to add explicit type
casts.\n\tCandidate
functions:\n\tAVG(Int8/Int16/Int32/Int64/UInt8/UInt16/UInt32/UInt64/Float32/Float64)",
+ err
);
Ok(())
}
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index ee7b37979d..4c5cd3ab28 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -18,7 +18,9 @@ use crate::optimizer::ApplyOrder;
use crate::utils::{conjunction, split_conjunction};
use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
-use datafusion_common::{internal_err, Column, DFSchema, DataFusionError, Result};
+use datafusion_common::{
+ internal_err, plan_datafusion_err, Column, DFSchema, DataFusionError, Result,
+};
use datafusion_expr::expr::Alias;
use datafusion_expr::{
and,
@@ -609,7 +611,7 @@ impl OptimizerRule for PushDownFilter {
.map(|e| (*e).clone())
.collect::<Vec<_>>();
let new_predicate = conjunction(new_predicates).ok_or_else(|| {
- DataFusionError::Plan("at least one expression
exists".to_string())
+ plan_datafusion_err!("at least one expression exists")
})?;
let new_filter = LogicalPlan::Filter(Filter::try_new(
new_predicate,
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index daaa16e055..1dbcf76173 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -41,7 +41,8 @@ use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{
- exec_err, plan_err, DataFusionError, JoinType, Result, ScalarValue, SharedResult,
+ exec_err, plan_datafusion_err, plan_err, DataFusionError, JoinType, Result,
+ ScalarValue, SharedResult,
};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{
@@ -363,11 +364,7 @@ pub fn combine_join_ordering_equivalence_properties(
let left_maintains = maintains_input_order[0];
let right_maintains = maintains_input_order[1];
match (left_maintains, right_maintains) {
- (true, true) => {
- return Err(DataFusionError::Plan(
- "Cannot maintain ordering of both sides".to_string(),
- ))
- }
+ (true, true) => return plan_err!("Cannot maintain ordering of both sides"),
(true, false) => {
new_properties.extend(left_oeq_properties.oeq_class().cloned());
// In this special case, right side ordering can be prefixed with left side ordering.
@@ -1345,8 +1342,7 @@ pub fn prepare_sorted_exprs(
right_sort_exprs: &[PhysicalSortExpr],
) -> Result<(SortedFilterExpr, SortedFilterExpr, ExprIntervalGraph)> {
// Build the filter order for the left side
- let err =
- || DataFusionError::Plan("Filter does not include the child order".to_owned());
+ let err = || plan_datafusion_err!("Filter does not include the child order");
let left_temp_sorted_filter_expr = build_filter_input_order(
JoinSide::Left,
diff --git a/datafusion/proto/src/bytes/mod.rs b/datafusion/proto/src/bytes/mod.rs
index d3ac33bab5..9377501499 100644
--- a/datafusion/proto/src/bytes/mod.rs
+++ b/datafusion/proto/src/bytes/mod.rs
@@ -24,7 +24,7 @@ use crate::physical_plan::{
};
use crate::protobuf;
use datafusion::physical_plan::functions::make_scalar_function;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{plan_datafusion_err, DataFusionError, Result};
use datafusion_expr::{
create_udaf, create_udf, create_udwf, AggregateUDF, Expr, LogicalPlan,
Volatility,
WindowUDF,
@@ -88,13 +88,13 @@ pub trait Serializeable: Sized {
impl Serializeable for Expr {
fn to_bytes(&self) -> Result<Bytes> {
let mut buffer = BytesMut::new();
- let protobuf: protobuf::LogicalExprNode = self.try_into().map_err(|e| {
- DataFusionError::Plan(format!("Error encoding expr as protobuf: {e}"))
- })?;
+ let protobuf: protobuf::LogicalExprNode = self
+ .try_into()
+ .map_err(|e| plan_datafusion_err!("Error encoding expr as protobuf: {e}"))?;
- protobuf.encode(&mut buffer).map_err(|e| {
- DataFusionError::Plan(format!("Error encoding protobuf as bytes: {e}"))
- })?;
+ protobuf
+ .encode(&mut buffer)
+ .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
let bytes: Bytes = buffer.into();
@@ -151,13 +151,11 @@ impl Serializeable for Expr {
bytes: &[u8],
registry: &dyn FunctionRegistry,
) -> Result<Self> {
- let protobuf = protobuf::LogicalExprNode::decode(bytes).map_err(|e| {
- DataFusionError::Plan(format!("Error decoding expr as protobuf: {e}"))
- })?;
+ let protobuf = protobuf::LogicalExprNode::decode(bytes)
+ .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
- logical_plan::from_proto::parse_expr(&protobuf, registry).map_err(|e| {
- DataFusionError::Plan(format!("Error parsing protobuf into Expr: {e}"))
- })
+ logical_plan::from_proto::parse_expr(&protobuf, registry)
+ .map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))
}
}
@@ -173,9 +171,9 @@ pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result<String> {
let extension_codec = DefaultLogicalExtensionCodec {};
let protobuf =
protobuf::LogicalPlanNode::try_from_logical_plan(plan, &extension_codec)
- .map_err(|e| DataFusionError::Plan(format!("Error serializing plan: {e}")))?;
+ .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
serde_json::to_string(&protobuf)
- .map_err(|e| DataFusionError::Plan(format!("Error serializing plan:
{e}")))
+ .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
}
/// Serialize a LogicalPlan as bytes, using the provided extension codec
@@ -186,9 +184,9 @@ pub fn logical_plan_to_bytes_with_extension_codec(
let protobuf =
protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)?;
let mut buffer = BytesMut::new();
- protobuf.encode(&mut buffer).map_err(|e| {
- DataFusionError::Plan(format!("Error encoding protobuf as bytes: {e}"))
- })?;
+ protobuf
+ .encode(&mut buffer)
+ .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes:
{e}"))?;
Ok(buffer.into())
}
@@ -196,7 +194,7 @@ pub fn logical_plan_to_bytes_with_extension_codec(
#[cfg(feature = "json")]
pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
- .map_err(|e| DataFusionError::Plan(format!("Error serializing plan:
{e}")))?;
+ .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
let extension_codec = DefaultLogicalExtensionCodec {};
back.try_into_logical_plan(ctx, &extension_codec)
}
@@ -216,9 +214,8 @@ pub fn logical_plan_from_bytes_with_extension_codec(
ctx: &SessionContext,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
- let protobuf = protobuf::LogicalPlanNode::decode(bytes).map_err(|e| {
- DataFusionError::Plan(format!("Error decoding expr as protobuf: {e}"))
- })?;
+ let protobuf = protobuf::LogicalPlanNode::decode(bytes)
+ .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf:
{e}"))?;
protobuf.try_into_logical_plan(ctx, extension_codec)
}
@@ -234,9 +231,9 @@ pub fn physical_plan_to_json(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
let extension_codec = DefaultPhysicalExtensionCodec {};
let protobuf =
protobuf::PhysicalPlanNode::try_from_physical_plan(plan, &extension_codec)
- .map_err(|e| DataFusionError::Plan(format!("Error serializing plan: {e}")))?;
+ .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
serde_json::to_string(&protobuf)
- .map_err(|e| DataFusionError::Plan(format!("Error serializing plan:
{e}")))
+ .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
}
/// Serialize a PhysicalPlan as bytes, using the provided extension codec
@@ -247,9 +244,9 @@ pub fn physical_plan_to_bytes_with_extension_codec(
let protobuf =
protobuf::PhysicalPlanNode::try_from_physical_plan(plan, extension_codec)?;
let mut buffer = BytesMut::new();
- protobuf.encode(&mut buffer).map_err(|e| {
- DataFusionError::Plan(format!("Error encoding protobuf as bytes: {e}"))
- })?;
+ protobuf
+ .encode(&mut buffer)
+ .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes:
{e}"))?;
Ok(buffer.into())
}
@@ -260,7 +257,7 @@ pub fn physical_plan_from_json(
ctx: &SessionContext,
) -> Result<Arc<dyn ExecutionPlan>> {
let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
- .map_err(|e| DataFusionError::Plan(format!("Error serializing plan:
{e}")))?;
+ .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
let extension_codec = DefaultPhysicalExtensionCodec {};
back.try_into_physical_plan(ctx, &ctx.runtime_env(), &extension_codec)
}
@@ -280,8 +277,7 @@ pub fn physical_plan_from_bytes_with_extension_codec(
ctx: &SessionContext,
extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn ExecutionPlan>> {
- let protobuf = protobuf::PhysicalPlanNode::decode(bytes).map_err(|e| {
- DataFusionError::Plan(format!("Error decoding expr as protobuf: {e}"))
- })?;
+ let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
+ .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf:
{e}"))?;
protobuf.try_into_physical_plan(ctx, &ctx.runtime_env(), extension_codec)
}
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index aa56c7b19d..2203016e08 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -31,8 +31,8 @@ use arrow::datatypes::{
};
use datafusion::execution::registry::FunctionRegistry;
use datafusion_common::{
- internal_err, Column, Constraint, Constraints, DFField, DFSchema, DFSchemaRef,
- DataFusionError, OwnedTableReference, Result, ScalarValue,
+ internal_err, plan_datafusion_err, Column, Constraint, Constraints, DFField,
+ DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result, ScalarValue,
};
use datafusion_expr::{
abs, acos, acosh, array, array_append, array_concat, array_dims, array_element,
@@ -1740,9 +1740,7 @@ fn parse_vec_expr(
) -> Result<Option<Vec<Expr>>, Error> {
let res = p
.iter()
- .map(|elem| {
- parse_expr(elem, registry).map_err(|e| DataFusionError::Plan(e.to_string()))
- })
+ .map(|elem| parse_expr(elem, registry).map_err(|e| plan_datafusion_err!("{}", e)))
.collect::<Result<Vec<_>>>()?;
// Convert empty vector to None.
Ok((!res.is_empty()).then_some(res))
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 76d44186b5..f0999871f5 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -43,6 +43,7 @@ use datafusion::{
datasource::{provider_as_source, source_as_provider},
prelude::SessionContext,
};
+use datafusion_common::plan_datafusion_err;
use datafusion_common::{
context, internal_err, not_impl_err, parsers::CompressionTypeVariant,
DataFusionError, OwnedTableReference, Result,
@@ -65,13 +66,13 @@ pub mod to_proto;
impl From<from_proto::Error> for DataFusionError {
fn from(e: from_proto::Error) -> Self {
- DataFusionError::Plan(e.to_string())
+ plan_datafusion_err!("{}", e)
}
}
impl From<to_proto::Error> for DataFusionError {
fn from(e: to_proto::Error) -> Self {
- DataFusionError::Plan(e.to_string())
+ plan_datafusion_err!("{}", e)
}
}
diff --git a/datafusion/sql/src/expr/arrow_cast.rs b/datafusion/sql/src/expr/arrow_cast.rs
index 549d46c5e2..8c0184b6d1 100644
--- a/datafusion/sql/src/expr/arrow_cast.rs
+++ b/datafusion/sql/src/expr/arrow_cast.rs
@@ -21,7 +21,9 @@
use std::{fmt::Display, iter::Peekable, str::Chars, sync::Arc};
use arrow_schema::{DataType, Field, IntervalUnit, TimeUnit};
-use datafusion_common::{DFSchema, DataFusionError, Result, ScalarValue};
+use datafusion_common::{
+ plan_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
+};
use datafusion_common::plan_err;
use datafusion_expr::{Expr, ExprSchemable};
@@ -98,9 +100,7 @@ pub fn parse_data_type(val: &str) -> Result<DataType> {
}
fn make_error(val: &str, msg: &str) -> DataFusionError {
- DataFusionError::Plan(
- format!("Unsupported type '{val}'. Must be a supported arrow type name such as 'Int32' or 'Timestamp(Nanosecond, None)'. Error {msg}" )
- )
+ plan_datafusion_err!("Unsupported type '{val}'. Must be a supported arrow type name such as 'Int32' or 'Timestamp(Nanosecond, None)'. Error {msg}" )
}
fn make_error_expected(val: &str, expected: &Token, actual: &Token) -> DataFusionError {
diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs
index 3861b4848d..ab70853949 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -16,7 +16,9 @@
// under the License.
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
-use datafusion_common::{not_impl_err, plan_err, DFSchema, DataFusionError, Result};
+use datafusion_common::{
+ not_impl_err, plan_datafusion_err, plan_err, DFSchema, DataFusionError, Result,
+};
use datafusion_expr::expr::{ScalarFunction, ScalarUDF};
use datafusion_expr::function::suggest_valid_function;
use datafusion_expr::window_frame::regularize;
@@ -189,7 +191,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.map(WindowFunction::WindowUDF)
})
.ok_or_else(|| {
- DataFusionError::Plan(format!("There is no window function
named {name}"))
+ plan_datafusion_err!("There is no window function named
{name}")
})
}
diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs
index 5e03f14e53..4033c28148 100644
--- a/datafusion/sql/src/expr/identifier.rs
+++ b/datafusion/sql/src/expr/identifier.rs
@@ -17,7 +17,8 @@
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::{
- internal_err, Column, DFField, DFSchema, DataFusionError, Result, TableReference,
+ internal_err, plan_datafusion_err, Column, DFField, DFSchema, DataFusionError,
+ Result, TableReference,
};
use datafusion_expr::{Case, Expr};
use sqlparser::ast::{Expr as SQLExpr, Ident};
@@ -36,9 +37,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.schema_provider
.get_variable_type(&var_names)
.ok_or_else(|| {
- DataFusionError::Plan(format!(
- "variable {var_names:?} has no type information"
- ))
+ plan_datafusion_err!("variable {var_names:?} has no type
information")
})?;
Ok(Expr::ScalarVariable(ty, var_names))
} else {
diff --git a/datafusion/sql/src/expr/order_by.rs b/datafusion/sql/src/expr/order_by.rs
index b32388f1bc..1dccc2376f 100644
--- a/datafusion/sql/src/expr/order_by.rs
+++ b/datafusion/sql/src/expr/order_by.rs
@@ -16,7 +16,9 @@
// under the License.
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
-use datafusion_common::{DFSchema, DataFusionError, Result};
+use datafusion_common::{
+ plan_datafusion_err, plan_err, DFSchema, DataFusionError, Result,
+};
use datafusion_expr::expr::Sort;
use datafusion_expr::Expr;
use sqlparser::ast::{Expr as SQLExpr, OrderByExpr, Value};
@@ -41,18 +43,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
SQLExpr::Value(Value::Number(v, _)) => {
let field_index = v
.parse::<usize>()
- .map_err(|err| DataFusionError::Plan(err.to_string()))?;
+ .map_err(|err| plan_datafusion_err!("{}", err))?;
if field_index == 0 {
- return Err(DataFusionError::Plan(
- "Order by index starts at 1 for column
indexes".to_string(),
- ));
+ return plan_err!(
+ "Order by index starts at 1 for column indexes"
+ );
} else if schema.fields().len() < field_index {
- return Err(DataFusionError::Plan(format!(
+ return plan_err!(
"Order by column out of bounds, specified: {},
max: {}",
field_index,
schema.fields().len()
- )));
+ );
}
let field = schema.field(field_index - 1);
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 059a32be44..5ae1e2001d 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -31,9 +31,9 @@ use arrow_schema::DataType;
use datafusion_common::file_options::StatementOptions;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
- not_impl_err, plan_err, unqualified_field_not_found, Column, Constraints, DFField,
- DFSchema, DFSchemaRef, DataFusionError, ExprSchema, OwnedTableReference, Result,
- SchemaReference, TableReference, ToDFSchema,
+ not_impl_err, plan_datafusion_err, plan_err, unqualified_field_not_found, Column,
+ Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError, ExprSchema,
+ OwnedTableReference, Result, SchemaReference, TableReference, ToDFSchema,
};
use datafusion_expr::dml::{CopyOptions, CopyTo};
use datafusion_expr::expr::Placeholder;
@@ -980,9 +980,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut assign_map = assignments
.iter()
.map(|assign| {
- let col_name: &Ident = assign.id.iter().last().ok_or_else(|| {
- DataFusionError::Plan("Empty column id".to_string())
- })?;
+ let col_name: &Ident = assign
+ .id
+ .iter()
+ .last()
+ .ok_or_else(|| plan_datafusion_err!("Empty column id"))?;
// Validate that the assignment target column exists
table_schema.field_with_unqualified_name(&col_name.value)?;
Ok((col_name.value.clone(), assign.value.clone()))
@@ -1113,15 +1115,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if let ast::Expr::Value(Value::Placeholder(name)) = val {
let name =
name.replace('$', "").parse::<usize>().map_err(|_| {
- DataFusionError::Plan(format!(
- "Can't parse placeholder: {name}"
- ))
+ plan_datafusion_err!("Can't parse placeholder:
{name}")
})? - 1;
let field = fields.get(idx).ok_or_else(|| {
- DataFusionError::Plan(format!(
+ plan_datafusion_err!(
"Placeholder ${} refers to a non existent
column",
idx + 1
- ))
+ )
})?;
let dt = field.field().data_type().clone();
let _ = prepare_param_data_types.insert(name, dt);
diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs
index 661890e125..d95598cc3d 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -1687,20 +1687,24 @@ fn select_order_by_multiple_index() {
#[test]
fn select_order_by_index_of_0() {
let sql = "SELECT id FROM person ORDER BY 0";
- let err = logical_plan(sql).expect_err("query should have failed");
+ let err = logical_plan(sql)
+ .expect_err("query should have failed")
+ .strip_backtrace();
assert_eq!(
- "Plan(\"Order by index starts at 1 for column indexes\")",
- format!("{err:?}")
+ "Error during planning: Order by index starts at 1 for column indexes",
+ err
);
}
#[test]
fn select_order_by_index_oob() {
let sql = "SELECT id FROM person ORDER BY 2";
- let err = logical_plan(sql).expect_err("query should have failed");
+ let err = logical_plan(sql)
+ .expect_err("query should have failed")
+ .strip_backtrace();
assert_eq!(
- "Plan(\"Order by column out of bounds, specified: 2, max: 1\")",
- format!("{err:?}")
+ "Error during planning: Order by column out of bounds, specified: 2,
max: 1",
+ err
);
}
@@ -2086,13 +2090,12 @@ fn union_values_with_no_alias() {
#[test]
fn union_with_incompatible_data_type() {
let sql = "SELECT interval '1 year 1 day' UNION ALL SELECT 1";
- let err = logical_plan(sql).expect_err("query should have failed");
+ let err = logical_plan(sql)
+ .expect_err("query should have failed")
+ .strip_backtrace();
assert_eq!(
- "Plan(\"UNION Column Int64(1) (type: Int64) is \
- not compatible with column IntervalMonthDayNano\
- (\\\"950737950189618795196236955648\\\") \
- (type: Interval(MonthDayNano))\")",
- format!("{err:?}")
+ "Error during planning: UNION Column Int64(1) (type: Int64) is not
compatible with column IntervalMonthDayNano(\"950737950189618795196236955648\")
(type: Interval(MonthDayNano))",
+ err
);
}
@@ -2195,10 +2198,12 @@ fn union_with_aliases() {
#[test]
fn union_with_incompatible_data_types() {
let sql = "SELECT 'a' a UNION ALL SELECT true a";
- let err = logical_plan(sql).expect_err("query should have failed");
+ let err = logical_plan(sql)
+ .expect_err("query should have failed")
+ .strip_backtrace();
assert_eq!(
- "Plan(\"UNION Column a (type: Boolean) is not compatible with column a
(type: Utf8)\")",
- format!("{err:?}")
+ "Error during planning: UNION Column a (type: Boolean) is not
compatible with column a (type: Utf8)",
+ err
);
}