This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 46cdb8c2dc Add `exec_err!` error macro (#7361)
46cdb8c2dc is described below
commit 46cdb8c2dc495e8063a0adc5c3f9ac82b136e72e
Author: comphead <[email protected]>
AuthorDate: Wed Aug 23 10:28:57 2023 -0700
Add `exec_err!` error macro (#7361)
* Add `exec_err!` error macro
* fmt
* fmt
* clippy
* fmt
---
benchmarks/src/clickbench.rs | 5 +-
benchmarks/src/tpch/run.rs | 5 +-
datafusion-cli/src/command.rs | 22 +++-----
datafusion/common/src/error.rs | 4 ++
datafusion/common/src/scalar.rs | 16 ++----
datafusion/core/src/catalog/mod.rs | 6 +-
datafusion/core/src/catalog/schema.rs | 13 ++---
datafusion/core/src/datasource/file_format/csv.rs | 8 +--
.../core/src/datasource/file_format/parquet.rs | 11 ++--
.../core/src/datasource/file_format/write.rs | 7 +--
.../datasource/physical_plan/file_scan_config.rs | 9 ++-
datafusion/core/src/execution/context.rs | 47 +++++----------
datafusion/core/src/physical_plan/insert.rs | 10 ++--
.../core/src/physical_plan/joins/hash_join.rs | 12 ++--
.../src/physical_plan/joins/nested_loop_join.rs | 8 +--
datafusion/core/src/physical_plan/joins/utils.rs | 6 +-
.../core/src/physical_plan/repartition/mod.rs | 3 +-
datafusion/core/src/physical_plan/sorts/sort.rs | 6 +-
datafusion/core/src/physical_plan/stream.rs | 12 ++--
datafusion/core/src/physical_plan/union.rs | 10 +---
datafusion/core/src/physical_plan/unnest.rs | 16 ++----
.../windows/bounded_window_agg_exec.rs | 8 +--
datafusion/core/src/physical_planner.rs | 20 +++----
datafusion/core/tests/fifo.rs | 6 +-
.../tests/user_defined/user_defined_aggregates.rs | 8 +--
datafusion/execution/src/object_store.rs | 6 +-
datafusion/expr/src/partition_evaluator.rs | 6 +-
datafusion/optimizer/src/analyzer/type_coercion.rs | 9 +--
.../src/simplify_expressions/expr_simplifier.rs | 6 +-
.../src/aggregate/approx_percentile_cont.rs | 8 +--
.../src/aggregate/array_agg_ordered.rs | 19 +++----
datafusion/physical-expr/src/aggregate/utils.rs | 14 ++---
datafusion/physical-expr/src/array_expressions.rs | 8 +--
.../physical-expr/src/datetime_expressions.rs | 66 ++++++++--------------
datafusion/physical-expr/src/expressions/case.rs | 5 +-
.../src/expressions/get_indexed_field.rs | 41 +++++++-------
.../physical-expr/src/expressions/in_list.rs | 7 ++-
datafusion/physical-expr/src/functions.rs | 18 ++----
.../src/intervals/interval_aritmetic.rs | 16 ++----
datafusion/physical-expr/src/planner.rs | 7 +--
datafusion/physical-expr/src/string_expressions.rs | 16 ++----
datafusion/physical-expr/src/struct_expressions.rs | 6 +-
.../physical-expr/src/unicode_expressions.rs | 26 ++++-----
datafusion/physical-expr/src/window/nth_value.rs | 6 +-
datafusion/physical-expr/src/window/rank.rs | 8 +--
datafusion/sql/src/utils.rs | 10 +---
datafusion/sqllogictest/bin/sqllogictests.rs | 7 +--
datafusion/substrait/src/logical_plan/producer.rs | 6 +-
48 files changed, 237 insertions(+), 362 deletions(-)
diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs
index e5cdd356fe..98ef6dd805 100644
--- a/benchmarks/src/clickbench.rs
+++ b/benchmarks/src/clickbench.rs
@@ -18,6 +18,7 @@
use std::{path::PathBuf, time::Instant};
use datafusion::{
+ common::exec_err,
error::{DataFusionError, Result},
prelude::SessionContext,
};
@@ -123,9 +124,9 @@ impl RunOpt {
/// Returns the text of query `query_id`
fn get_query(&self, query_id: usize) -> Result<String> {
if query_id > CLICKBENCH_QUERY_END_ID {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"Invalid query id {query_id}. Must be between
{CLICKBENCH_QUERY_START_ID} and {CLICKBENCH_QUERY_END_ID}"
- )));
+ );
}
let path = self.queries_path.as_path();
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index a57a2eb7aa..cf5c7b9f67 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -299,6 +299,7 @@ struct QueryResult {
#[cfg(feature = "ci")]
mod tests {
use super::*;
+ use datafusion::common::exec_err;
use datafusion::error::{DataFusionError, Result};
use std::path::Path;
@@ -311,10 +312,10 @@ mod tests {
let path =
std::env::var("TPCH_DATA").unwrap_or_else(|_|
"benchmarks/data".to_string());
if !Path::new(&path).exists() {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"Benchmark data not found (set TPCH_DATA env var to override):
{}",
path
- )));
+ );
}
Ok(path)
}
diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs
index 5563c31bcc..f7f36b6f9d 100644
--- a/datafusion-cli/src/command.rs
+++ b/datafusion-cli/src/command.rs
@@ -25,6 +25,7 @@ use clap::ArgEnum;
use datafusion::arrow::array::{ArrayRef, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::common::exec_err;
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::SessionContext;
use std::fs::File;
@@ -81,9 +82,7 @@ impl Command {
exec_from_lines(ctx, &mut BufReader::new(file),
print_options).await;
Ok(())
} else {
- Err(DataFusionError::Execution(
- "Required filename argument is missing".into(),
- ))
+ exec_err!("Required filename argument is missing")
}
}
Self::QuietMode(quiet) => {
@@ -101,9 +100,7 @@ impl Command {
}
Ok(())
}
- Self::Quit => Err(DataFusionError::Execution(
- "Unexpected quit, this should be handled outside".into(),
- )),
+ Self::Quit => exec_err!("Unexpected quit, this should be handled
outside"),
Self::ListFunctions => display_all_functions(),
Self::SearchFunctions(function) => {
if let Ok(func) = function.parse::<Function>() {
@@ -111,13 +108,12 @@ impl Command {
println!("{}", details);
Ok(())
} else {
- let msg = format!("{} is not a supported function",
function);
- Err(DataFusionError::Execution(msg))
+ exec_err!("{function} is not a supported function")
}
}
- Self::OutputFormat(_) => Err(DataFusionError::Execution(
- "Unexpected change output format, this should be handled
outside".into(),
- )),
+ Self::OutputFormat(_) => exec_err!(
+ "Unexpected change output format, this should be handled
outside"
+ ),
}
}
@@ -230,11 +226,11 @@ impl OutputFormat {
println!("Output format is {:?}.", print_options.format);
Ok(())
} else {
- Err(DataFusionError::Execution(format!(
+ exec_err!(
"{:?} is not a valid format type [possible values:
{:?}]",
format,
PrintFormat::value_variants()
- )))
+ )
}
}
}
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index f186abc1d2..3e16b045f4 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -454,8 +454,12 @@ make_error!(internal_err, Internal);
// Exposes a macro to create `DataFusionError::NotImplemented`
make_error!(not_impl_err, NotImplemented);
+// Exposes a macro to create `DataFusionError::Execution`
+make_error!(exec_err, Execution);
+
// To avoid compiler error when using macro in the same crate:
// macros from the current crate cannot be referred to by absolute paths
+pub use exec_err as _exec_err;
pub use internal_err as _internal_err;
pub use not_impl_err as _not_impl_err;
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 73b71722f9..4cf45afa4f 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -30,7 +30,7 @@ use crate::cast::{
as_fixed_size_binary_array, as_fixed_size_list_array, as_list_array,
as_struct_array,
};
use crate::delta::shift_months;
-use crate::error::{DataFusionError, Result, _internal_err, _not_impl_err};
+use crate::error::{DataFusionError, Result, _exec_err, _internal_err,
_not_impl_err};
use arrow::buffer::NullBuffer;
use arrow::compute::nullif;
use arrow::datatypes::{i256, FieldRef, Fields, SchemaBuilder};
@@ -645,9 +645,7 @@ macro_rules! primitive_checked_op {
if let Some(value) = (*a).$FUNCTION(*b) {
Ok(ScalarValue::$SCALAR(Some(value)))
} else {
- Err(DataFusionError::Execution(
- "Overflow while calculating ScalarValue.".to_string(),
- ))
+ _exec_err!("Overflow while calculating ScalarValue.")
}
}
}
@@ -659,9 +657,7 @@ macro_rules! primitive_checked_right {
if let Some(value) = $TERM.checked_neg() {
Ok(ScalarValue::$SCALAR(Some(value)))
} else {
- Err(DataFusionError::Execution(
- "Overflow while calculating ScalarValue.".to_string(),
- ))
+ _exec_err!("Overflow while calculating ScalarValue.")
}
};
($TERM:expr, $OPERATION:tt, $SCALAR:ident) => {
@@ -1383,9 +1379,9 @@ where
prior.add(Duration::microseconds(*v))
}
ScalarValue::DurationNanosecond(Some(v)) =>
prior.add(Duration::nanoseconds(*v)),
- other => Err(DataFusionError::Execution(format!(
- "DateIntervalExpr does not support non-interval type {other:?}"
- )))?,
+ other => {
+ _exec_err!("DateIntervalExpr does not support non-interval type
{other:?}")?
+ }
})
}
diff --git a/datafusion/core/src/catalog/mod.rs b/datafusion/core/src/catalog/mod.rs
index d3f4ba8a39..5f3ca81dd7 100644
--- a/datafusion/core/src/catalog/mod.rs
+++ b/datafusion/core/src/catalog/mod.rs
@@ -25,7 +25,7 @@ pub use datafusion_sql::{ResolvedTableReference, TableReference};
use crate::catalog::schema::SchemaProvider;
use dashmap::DashMap;
-use datafusion_common::{not_impl_err, DataFusionError, Result};
+use datafusion_common::{exec_err, not_impl_err, DataFusionError, Result};
use std::any::Any;
use std::sync::Arc;
@@ -194,11 +194,11 @@ impl CatalogProvider for MemoryCatalogProvider {
let (_, removed) = self.schemas.remove(name).unwrap();
Ok(Some(removed))
}
- (false, false) => Err(DataFusionError::Execution(format!(
+ (false, false) => exec_err!(
"Cannot drop schema {} because other tables depend on it:
{}",
name,
itertools::join(table_names.iter(), ", ")
- ))),
+ ),
}
} else {
Ok(None)
diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index 552dccc41a..cd6e29b5f3 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -20,6 +20,7 @@
use async_trait::async_trait;
use dashmap::DashMap;
+use datafusion_common::exec_err;
use std::any::Any;
use std::sync::Arc;
@@ -47,18 +48,14 @@ pub trait SchemaProvider: Sync + Send {
name: String,
table: Arc<dyn TableProvider>,
) -> Result<Option<Arc<dyn TableProvider>>> {
- Err(DataFusionError::Execution(
- "schema provider does not support registering tables".to_owned(),
- ))
+ exec_err!("schema provider does not support registering tables")
}
/// If supported by the implementation, removes an existing table from
this schema and returns it.
/// If no table of that name exists, returns Ok(None).
#[allow(unused_variables)]
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn
TableProvider>>> {
- Err(DataFusionError::Execution(
- "schema provider does not support deregistering tables".to_owned(),
- ))
+ exec_err!("schema provider does not support deregistering tables")
}
/// If supported by the implementation, checks the table exist in the
schema provider or not.
@@ -110,9 +107,7 @@ impl SchemaProvider for MemorySchemaProvider {
table: Arc<dyn TableProvider>,
) -> Result<Option<Arc<dyn TableProvider>>> {
if self.table_exist(name.as_str()) {
- return Err(DataFusionError::Execution(format!(
- "The table {name} already exists"
- )));
+ return exec_err!("The table {name} already exists");
}
Ok(self.tables.insert(name, table))
}
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index c7b6852df7..488caced47 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -27,7 +27,7 @@ use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::{self, datatypes::SchemaRef};
use arrow_array::RecordBatch;
-use datafusion_common::{not_impl_err, DataFusionError};
+use datafusion_common::{exec_err, not_impl_err, DataFusionError};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
@@ -327,14 +327,12 @@ impl CsvFormat {
first_chunk = false;
} else {
if fields.len() != column_type_possibilities.len() {
- return Err(DataFusionError::Execution(
- format!(
+ return exec_err!(
"Encountered unequal lengths between records on
CSV file whilst inferring schema. \
Expected {} records, found {} records",
column_type_possibilities.len(),
fields.len()
- )
- ));
+ );
}
column_type_possibilities.iter_mut().zip(&fields).for_each(
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 73928ce56c..e5bbe7c82a 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -28,7 +28,7 @@ use arrow::datatypes::SchemaRef;
use arrow::datatypes::{Fields, Schema};
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
-use datafusion_common::{not_impl_err, plan_err, DataFusionError};
+use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
use futures::{StreamExt, TryStreamExt};
@@ -414,10 +414,7 @@ pub async fn fetch_parquet_metadata(
size_hint: Option<usize>,
) -> Result<ParquetMetaData> {
if meta.size < 8 {
- return Err(DataFusionError::Execution(format!(
- "file size of {} is less than footer",
- meta.size
- )));
+ return exec_err!("file size of {} is less than footer", meta.size);
}
// If a size hint is provided, read more than the minimum size
@@ -440,11 +437,11 @@ pub async fn fetch_parquet_metadata(
let length = decode_footer(&footer)?;
if meta.size < length + 8 {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"file size of {} is less than footer + metadata {}",
meta.size,
length + 8
- )));
+ );
}
// Did not fetch the entire file metadata in the initial read, need to
make a second request
diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs
index 4006014812..901ccd4043 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -29,8 +29,7 @@ use crate::error::Result;
use crate::physical_plan::SendableRecordBatchStream;
use arrow_array::RecordBatch;
-use datafusion_common::internal_err;
-use datafusion_common::{DataFusionError, FileCompressionType};
+use datafusion_common::{exec_err, internal_err, DataFusionError,
FileCompressionType};
use async_trait::async_trait;
use bytes::Bytes;
@@ -179,9 +178,7 @@ impl<W: AsyncWrite + Unpin + Send> AbortableWrite<W> {
pub(crate) fn abort_writer(&self) -> Result<BoxFuture<'static,
Result<()>>> {
match &self.mode {
AbortMode::Put => Ok(async { Ok(()) }.boxed()),
- AbortMode::Append => Err(DataFusionError::Execution(
- "Cannot abort in append mode".to_string(),
- )),
+ AbortMode::Append => exec_err!("Cannot abort in append mode"),
AbortMode::MultiPart(MultiPart {
store,
multipart_id,
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index ffb4f0902a..c35283e4fe 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -33,7 +33,10 @@ use arrow::buffer::Buffer;
use arrow::datatypes::{ArrowNativeType, UInt16Type};
use arrow_array::{ArrayRef, DictionaryArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
-use datafusion_common::tree_node::{TreeNode, VisitRecursion};
+use datafusion_common::{
+ exec_err,
+ tree_node::{TreeNode, VisitRecursion},
+};
use datafusion_common::{ColumnStatistics, Statistics};
use datafusion_physical_expr::LexOrdering;
@@ -331,11 +334,11 @@ impl PartitionColumnProjector {
self.projected_schema.fields().len() -
self.projected_partition_indexes.len();
if file_batch.columns().len() != expected_cols {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"Unexpected batch schema from file, expected {} cols but got
{}",
expected_cols,
file_batch.columns().len()
- )));
+ );
}
let mut cols = file_batch.columns().to_vec();
for &(pidx, sidx) in &self.projected_partition_indexes {
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 586fff30ac..8fcc3583fb 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -30,7 +30,7 @@ use crate::{
};
use datafusion_common::{
alias::AliasGenerator,
- not_impl_err, plan_err,
+ exec_err, not_impl_err, plan_err,
tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion},
};
use datafusion_execution::registry::SerializerRegistry;
@@ -543,10 +543,7 @@ impl SessionContext {
match cmd.if_not_exists {
true => return self.return_empty_dataframe(),
false => {
- return Err(DataFusionError::Execution(format!(
- "Table '{}' already exists",
- cmd.name
- )));
+ return exec_err!("Table '{}' already exists", cmd.name);
}
}
}
@@ -582,9 +579,9 @@ impl SessionContext {
self.register_table(&name, table)?;
self.return_empty_dataframe()
}
- (true, true, Ok(_)) => Err(DataFusionError::Execution(
- "'IF NOT EXISTS' cannot coexist with 'REPLACE'".to_string(),
- )),
+ (true, true, Ok(_)) => {
+ exec_err!("'IF NOT EXISTS' cannot coexist with 'REPLACE'")
+ }
(_, _, Err(_)) => {
let df_schema = input.schema();
let schema = Arc::new(df_schema.as_ref().into());
@@ -599,9 +596,7 @@ impl SessionContext {
self.register_table(&name, table)?;
self.return_empty_dataframe()
}
- (false, false, Ok(_)) => Err(DataFusionError::Execution(format!(
- "Table '{name}' already exists"
- ))),
+ (false, false, Ok(_)) => exec_err!("Table '{name}' already
exists"),
}
}
@@ -629,9 +624,7 @@ impl SessionContext {
self.register_table(&name, table)?;
self.return_empty_dataframe()
}
- (false, Ok(_)) => Err(DataFusionError::Execution(format!(
- "Table '{name}' already exists"
- ))),
+ (false, Ok(_)) => exec_err!("Table '{name}' already exists"),
}
}
@@ -663,11 +656,7 @@ impl SessionContext {
})?;
(catalog, tokens[1])
}
- _ => {
- return Err(DataFusionError::Execution(format!(
- "Unable to parse catalog from {schema_name}"
- )))
- }
+ _ => return exec_err!("Unable to parse catalog from
{schema_name}"),
};
let schema = catalog.schema(schema_name);
@@ -678,9 +667,7 @@ impl SessionContext {
catalog.register_schema(schema_name, schema)?;
self.return_empty_dataframe()
}
- (false, Some(_)) => Err(DataFusionError::Execution(format!(
- "Schema '{schema_name}' already exists"
- ))),
+ (false, Some(_)) => exec_err!("Schema '{schema_name}' already
exists"),
}
}
@@ -702,9 +689,7 @@ impl SessionContext {
.register_catalog(catalog_name, new_catalog);
self.return_empty_dataframe()
}
- (false, Some(_)) => Err(DataFusionError::Execution(format!(
- "Catalog '{catalog_name}' already exists"
- ))),
+ (false, Some(_)) => exec_err!("Catalog '{catalog_name}' already
exists"),
}
}
@@ -716,9 +701,7 @@ impl SessionContext {
match (result, if_exists) {
(Ok(true), _) => self.return_empty_dataframe(),
(_, true) => self.return_empty_dataframe(),
- (_, _) => Err(DataFusionError::Execution(format!(
- "Table '{name}' doesn't exist."
- ))),
+ (_, _) => exec_err!("Table '{name}' doesn't exist."),
}
}
@@ -730,9 +713,7 @@ impl SessionContext {
match (result, if_exists) {
(Ok(true), _) => self.return_empty_dataframe(),
(_, true) => self.return_empty_dataframe(),
- (_, _) => Err(DataFusionError::Execution(format!(
- "View '{name}' doesn't exist."
- ))),
+ (_, _) => exec_err!("View '{name}' doesn't exist."),
}
}
@@ -771,9 +752,7 @@ impl SessionContext {
&self,
schemaref: SchemaReference<'_>,
) -> Result<DataFrame> {
- Err(DataFusionError::Execution(format!(
- "Schema '{schemaref}' doesn't exist."
- )))
+ exec_err!("Schema '{schemaref}' doesn't exist.")
}
async fn set_variable(&self, stmt: SetVariable) -> Result<DataFrame> {
diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs
index b8d652d1cd..8c03fb543f 100644
--- a/datafusion/core/src/physical_plan/insert.rs
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -36,7 +36,7 @@ use std::fmt::Debug;
use std::sync::Arc;
use crate::physical_plan::stream::RecordBatchStreamAdapter;
-use datafusion_common::{internal_err, DataFusionError};
+use datafusion_common::{exec_err, internal_err, DataFusionError};
use datafusion_execution::TaskContext;
/// `DataSink` implements writing streams of [`RecordBatch`]es to
@@ -285,18 +285,18 @@ fn check_not_null_contraits(
) -> Result<RecordBatch> {
for &index in column_indices {
if batch.num_columns() <= index {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"Invalid batch column count {} expected > {}",
batch.num_columns(),
index
- )));
+ );
}
if batch.column(index).null_count() > 0 {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"Invalid batch column at '{}' has null but schema specifies
non-nullable",
index
- )));
+ );
}
}
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index ff341ae211..0928998ec8 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -76,7 +76,9 @@ use arrow::{
use arrow_array::cast::downcast_array;
use arrow_schema::ArrowError;
use datafusion_common::cast::{as_dictionary_array, as_string_array};
-use datafusion_common::{internal_err, plan_err, DataFusionError, JoinType,
Result};
+use datafusion_common::{
+ exec_err, internal_err, plan_err, DataFusionError, JoinType, Result,
+};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::OrderingEquivalenceProperties;
@@ -1328,9 +1330,9 @@ impl HashJoinStream {
self.join_metrics.output_rows.add(batch.num_rows());
Some(result)
}
- Err(err) =>
Some(Err(DataFusionError::Execution(format!(
- "Fail to build join indices in HashJoinExec,
error:{err}",
- )))),
+ Err(err) => Some(exec_err!(
+ "Fail to build join indices in HashJoinExec,
error:{err}"
+ )),
};
timer.done();
result
@@ -3061,7 +3063,7 @@ mod tests {
// right input stream returns one good batch and then one error.
// The error should be returned.
- let err = Err(DataFusionError::Execution("bad data
error".to_string()));
+ let err = exec_err!("bad data error");
let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2",
&vec![]));
let on = vec![(
diff --git a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
index 60fdf452cf..74a26fbff6 100644
--- a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
+++ b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
@@ -38,7 +38,7 @@ use arrow::array::{
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
-use datafusion_common::{DataFusionError, Statistics};
+use datafusion_common::{exec_err, DataFusionError, Statistics};
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_expr::JoinType;
use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortExpr};
@@ -586,9 +586,9 @@ fn join_left_and_right_batch(
let mut left_indices_builder = UInt64Builder::new();
let mut right_indices_builder = UInt32Builder::new();
let left_right_indices = match indices_result {
- Err(err) => Err(DataFusionError::Execution(format!(
- "Fail to build join indices in NestedLoopJoinExec, error:{err}"
- ))),
+ Err(err) => {
+ exec_err!("Fail to build join indices in NestedLoopJoinExec,
error:{err}")
+ }
Ok(indices) => {
for (left_side, right_side) in indices {
left_indices_builder
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs
index 0b4a30da30..4f7b0023f4 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -41,7 +41,7 @@ use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{
- plan_err, DataFusionError, JoinType, Result, ScalarValue, SharedResult,
+ exec_err, plan_err, DataFusionError, JoinType, Result, ScalarValue,
SharedResult,
};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::{
@@ -197,9 +197,7 @@ pub fn calculate_join_output_ordering(
};
let output_ordering = match (left_maintains, right_maintains) {
(true, true) => {
- return Err(DataFusionError::Execution(
- "Cannot maintain ordering of both sides".to_string(),
- ))
+ return exec_err!("Cannot maintain ordering of both sides");
}
(true, false) => {
// Special case, we can prefix ordering of right side with the
ordering of left side.
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index d0f109791e..b23b495674 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -905,6 +905,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_string_array;
+ use datafusion_common::exec_err;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use futures::FutureExt;
use std::collections::HashSet;
@@ -1114,7 +1115,7 @@ mod tests {
// input stream returns one good batch and then one error. The
// error should be returned.
- let err = Err(DataFusionError::Execution("bad data
error".to_string()));
+ let err = exec_err!("bad data error");
let schema = batch.schema();
let input = MockExec::new(vec![Ok(batch), err], schema);
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index e085dec90b..5d23e72fe7 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -35,7 +35,7 @@ use arrow::compute::{concat_batches, lexsort_to_indices, take};
use arrow::datatypes::SchemaRef;
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
-use datafusion_common::{plan_err, DataFusionError, Result};
+use datafusion_common::{exec_err, plan_err, DataFusionError, Result};
use datafusion_execution::memory_pool::{
human_readable_size, MemoryConsumer, MemoryReservation,
};
@@ -598,9 +598,7 @@ async fn spill_sorted_batches(
let handle = task::spawn_blocking(move || write_sorted(batches, path,
schema));
match handle.await {
Ok(r) => r,
- Err(e) => Err(DataFusionError::Execution(format!(
- "Error occurred while spilling {e}"
- ))),
+ Err(e) => exec_err!("Error occurred while spilling {e}"),
}
}
diff --git a/datafusion/core/src/physical_plan/stream.rs b/datafusion/core/src/physical_plan/stream.rs
index 695ca35714..1147f28864 100644
--- a/datafusion/core/src/physical_plan/stream.rs
+++ b/datafusion/core/src/physical_plan/stream.rs
@@ -368,6 +368,7 @@ impl futures::Stream for ObservedStream {
mod test {
use super::*;
use arrow_schema::{DataType, Field, Schema};
+ use datafusion_common::exec_err;
use crate::test::exec::{
assert_strong_count_converges_to_zero, BlockingExec, MockExec,
PanicExec,
@@ -438,14 +439,9 @@ mod test {
let schema = schema();
// make an input that will error twice
- let error_stream = MockExec::new(
- vec![
- Err(DataFusionError::Execution("Test1".to_string())),
- Err(DataFusionError::Execution("Test2".to_string())),
- ],
- schema.clone(),
- )
- .with_use_task(false);
+ let error_stream =
+ MockExec::new(vec![exec_err!("Test1"), exec_err!("Test2")],
schema.clone())
+ .with_use_task(false);
let mut builder = RecordBatchReceiverStream::builder(schema, 2);
builder.run_input(Arc::new(error_stream), 0, task_ctx.clone());
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index d57d3ddeb6..491d24c289 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -29,7 +29,7 @@ use arrow::{
datatypes::{Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
-use datafusion_common::{internal_err, DFSchemaRef, DataFusionError};
+use datafusion_common::{exec_err, internal_err, DFSchemaRef, DataFusionError};
use futures::Stream;
use itertools::Itertools;
use log::{debug, trace, warn};
@@ -257,9 +257,7 @@ impl ExecutionPlan for UnionExec {
warn!("Error in Union: Partition {} not found", partition);
- Err(DataFusionError::Execution(format!(
- "Partition {partition} not found in Union"
- )))
+ exec_err!("Partition {partition} not found in Union")
}
fn metrics(&self) -> Option<MetricsSet> {
@@ -433,9 +431,7 @@ impl ExecutionPlan for InterleaveExec {
warn!("Error in InterleaveExec: Partition {} not found", partition);
- Err(DataFusionError::Execution(format!(
- "Partition {partition} not found in InterleaveExec"
- )))
+ exec_err!("Partition {partition} not found in InterleaveExec")
}
fn metrics(&self) -> Option<MetricsSet> {
diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs
index c97cd75558..69ac857429 100644
--- a/datafusion/core/src/physical_plan/unnest.rs
+++ b/datafusion/core/src/physical_plan/unnest.rs
@@ -29,7 +29,7 @@ use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion_common::UnnestOptions;
use datafusion_common::{
- cast::as_primitive_array, not_impl_err, DataFusionError, Result,
+ cast::as_primitive_array, exec_err, not_impl_err, DataFusionError, Result,
};
use datafusion_execution::TaskContext;
use futures::Stream;
@@ -271,9 +271,7 @@ fn build_batch(
.unwrap();
unnest_batch(batch, schema, column, list_array)
}
- _ => Err(DataFusionError::Execution(format!(
- "Invalid unnest column {column}"
- ))),
+ _ => exec_err!("Invalid unnest column {column}"),
}
}
@@ -322,9 +320,7 @@ where
let indices = create_take_indices(list_lengths,
unnested_array.len());
batch_from_indices(batch, schema, column.index(), &unnested_array,
&indices)
}
- dt => Err(DataFusionError::Execution(format!(
- "Unnest: unsupported indices type {dt}"
- ))),
+ dt => exec_err!("Unnest: unsupported indices type {dt}"),
}
}
@@ -452,11 +448,7 @@ where
DataType::List(f) | DataType::FixedSizeList(f, _) |
DataType::LargeList(f) => {
f.data_type()
}
- dt => {
- return Err(DataFusionError::Execution(format!(
- "Cannot unnest array of type {dt}"
- )))
- }
+ dt => return exec_err!("Cannot unnest array of type {dt}"),
};
if list_array.is_empty() {
diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index 4889f667f3..2b4df9e9e4 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -31,7 +31,7 @@ use crate::physical_plan::{
ColumnStatistics, DisplayAs, DisplayFormatType, Distribution,
ExecutionPlan,
Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics,
WindowExpr,
};
-use datafusion_common::{plan_err, Result};
+use datafusion_common::{exec_err, plan_err, Result};
use datafusion_execution::TaskContext;
use ahash::RandomState;
@@ -187,7 +187,7 @@ impl BoundedWindowAggExec {
if self.window_expr()[0].partition_by().len()
!= ordered_partition_by_indices.len()
{
- return Err(DataFusionError::Execution("All partition by
columns should have an ordering in Sorted mode.".to_string()));
+ return exec_err!("All partition by columns should have an
ordering in Sorted mode.");
}
Box::new(SortedSearch {
partition_by_sort_keys,
@@ -1128,9 +1128,9 @@ fn get_aggregate_result_out_column(
}
}
if running_length != len_to_show {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"Generated row number should be {len_to_show}, it is
{running_length}"
- )));
+ );
}
result
.ok_or_else(|| DataFusionError::Execution("Should contain
something".to_string()))
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 599817a738..d0759d3b8d 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -73,7 +73,9 @@ use crate::{
use arrow::compute::SortOptions;
use arrow::datatypes::{Schema, SchemaRef};
use async_trait::async_trait;
-use datafusion_common::{internal_err, not_impl_err, plan_err, DFSchema,
ScalarValue};
+use datafusion_common::{
+ exec_err, internal_err, not_impl_err, plan_err, DFSchema, ScalarValue,
+};
use datafusion_expr::expr::{
self, AggregateFunction, AggregateUDF, Alias, Between, BinaryExpr, Cast,
GetFieldAccess, GetIndexedField, GroupingSet, InList, Like, ScalarUDF,
TryCast,
@@ -233,14 +235,10 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) ->
Result<String> {
}) => {
// TODO: Add support for filter and order by in AggregateUDF
if filter.is_some() {
- return Err(DataFusionError::Execution(
- "aggregate expression with filter is not
supported".to_string(),
- ));
+ return exec_err!("aggregate expression with filter is not
supported");
}
if order_by.is_some() {
- return Err(DataFusionError::Execution(
- "aggregate expression with order_by is not
supported".to_string(),
- ));
+ return exec_err!("aggregate expression with order_by is not
supported");
}
let mut names = Vec::with_capacity(args.len());
for e in args {
@@ -606,9 +604,9 @@ impl DefaultPhysicalPlanner {
let input_exec = self.create_initial_plan(input,
session_state).await?;
provider.insert_into(session_state, input_exec,
false).await
} else {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"Table '{table_name}' does not exist"
- )));
+ );
}
}
LogicalPlan::Dml(DmlStatement {
@@ -623,9 +621,9 @@ impl DefaultPhysicalPlanner {
let input_exec = self.create_initial_plan(input,
session_state).await?;
provider.insert_into(session_state, input_exec,
true).await
} else {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"Table '{table_name}' does not exist"
- )));
+ );
}
}
LogicalPlan::Values(Values {
diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs
index 76bc487eab..754389a614 100644
--- a/datafusion/core/tests/fifo.rs
+++ b/datafusion/core/tests/fifo.rs
@@ -28,7 +28,7 @@ mod unix_test {
prelude::{CsvReadOptions, SessionConfig, SessionContext},
test_util::{aggr_test_schema, arrow_test_data},
};
- use datafusion_common::{DataFusionError, Result};
+ use datafusion_common::{exec_err, DataFusionError, Result};
use futures::StreamExt;
use itertools::enumerate;
use nix::sys::stat;
@@ -58,7 +58,7 @@ mod unix_test {
let file_path = tmp_dir.path().join(file_name);
// Simulate an infinite environment via a FIFO file
if let Err(e) = unistd::mkfifo(&file_path, stat::Mode::S_IRWXU) {
- Err(DataFusionError::Execution(e.to_string()))
+ exec_err!("{}", e)
} else {
Ok(file_path)
}
@@ -81,7 +81,7 @@ mod unix_test {
continue;
}
}
- return Err(DataFusionError::Execution(e.to_string()));
+ return exec_err!("{}", e);
}
Ok(())
}
diff --git a/datafusion/core/tests/user_defined/user_defined_aggregates.rs
b/datafusion/core/tests/user_defined/user_defined_aggregates.rs
index 1e8299bfbd..041d4db4c5 100644
--- a/datafusion/core/tests/user_defined/user_defined_aggregates.rs
+++ b/datafusion/core/tests/user_defined/user_defined_aggregates.rs
@@ -40,7 +40,9 @@ use datafusion::{
prelude::SessionContext,
scalar::ScalarValue,
};
-use datafusion_common::{assert_contains, cast::as_primitive_array,
DataFusionError};
+use datafusion_common::{
+ assert_contains, cast::as_primitive_array, exec_err, DataFusionError,
+};
/// Test to show the contents of the setup
#[tokio::test]
@@ -344,9 +346,7 @@ impl Accumulator for TimeSum {
fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
if self.test_state.error_on_retract_batch() {
- return Err(DataFusionError::Execution(
- "Error in Retract Batch".to_string(),
- ));
+ return exec_err!("Error in Retract Batch");
}
self.test_state.set_retract_batch();
diff --git a/datafusion/execution/src/object_store.rs
b/datafusion/execution/src/object_store.rs
index 803f703452..7bedf0c865 100644
--- a/datafusion/execution/src/object_store.rs
+++ b/datafusion/execution/src/object_store.rs
@@ -20,7 +20,7 @@
//! and query data inside these systems.
use dashmap::DashMap;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{exec_err, DataFusionError, Result};
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use std::sync::Arc;
@@ -40,9 +40,9 @@ impl ObjectStoreUrl {
let remaining = &parsed[url::Position::BeforePath..];
if !remaining.is_empty() && remaining != "/" {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"ObjectStoreUrl must only contain scheme and authority, got:
{remaining}"
- )));
+ );
}
// Always set path for consistency
diff --git a/datafusion/expr/src/partition_evaluator.rs
b/datafusion/expr/src/partition_evaluator.rs
index 1af9f24c41..c9576dd604 100644
--- a/datafusion/expr/src/partition_evaluator.rs
+++ b/datafusion/expr/src/partition_evaluator.rs
@@ -18,7 +18,7 @@
//! Partition evaluation module
use arrow::array::ArrayRef;
-use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
+use datafusion_common::{exec_err, not_impl_err, DataFusionError, Result,
ScalarValue};
use std::fmt::Debug;
use std::ops::Range;
@@ -109,9 +109,7 @@ pub trait PartitionEvaluator: Debug + Send {
/// etc)
fn get_range(&self, idx: usize, _n_rows: usize) -> Result<Range<usize>> {
if self.uses_window_frame() {
- Err(DataFusionError::Execution(
- "Range should be calculated from window frame".to_string(),
- ))
+ exec_err!("Range should be calculated from window frame")
} else {
Ok(Range {
start: idx,
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 1ebe234840..ba4b5d7b17 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -24,7 +24,8 @@ use arrow::datatypes::{DataType, IntervalUnit};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{RewriteRecursion, TreeNodeRewriter};
use datafusion_common::{
- internal_err, plan_err, DFSchema, DFSchemaRef, DataFusionError, Result,
ScalarValue,
+ exec_err, internal_err, plan_err, DFSchema, DFSchemaRef, DataFusionError,
Result,
+ ScalarValue,
};
use datafusion_expr::expr::{
self, Between, BinaryExpr, Case, Exists, InList, InSubquery, Like,
ScalarFunction,
@@ -444,11 +445,7 @@ fn coerce_scalar_range_aware(
// If type coercion fails, check if the largest type in family works:
if let Some(largest_type) = get_widest_type_in_family(target_type) {
coerce_scalar(largest_type, value).map_or_else(
- |_| {
- Err(DataFusionError::Execution(format!(
- "Cannot cast {value:?} to {target_type:?}"
- )))
- },
+ |_| exec_err!("Cannot cast {value:?} to {target_type:?}"),
|_| ScalarValue::try_from(target_type),
)
} else {
diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
index 895432026b..29fd15cc3b 100644
--- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
+++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
@@ -32,7 +32,7 @@ use arrow::{
};
use datafusion_common::tree_node::{RewriteRecursion, TreeNode,
TreeNodeRewriter};
use datafusion_common::{
- internal_err, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
+ exec_err, internal_err, DFSchema, DFSchemaRef, DataFusionError, Result,
ScalarValue,
};
use datafusion_expr::expr::{InList, InSubquery, ScalarFunction};
use datafusion_expr::{
@@ -317,10 +317,10 @@ impl<'a> ConstEvaluator<'a> {
match col_val {
ColumnarValue::Array(a) => {
if a.len() != 1 {
- Err(DataFusionError::Execution(format!(
+ exec_err!(
"Could not evaluate the expression, found a result of
length {}",
a.len()
- )))
+ )
} else {
Ok(ScalarValue::try_from_array(&a, 0)?)
}
diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
index 1e2458365f..184cada1dc 100644
--- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
@@ -28,8 +28,8 @@ use arrow::{
datatypes::{DataType, Field},
};
use datafusion_common::{
- downcast_value, internal_err, not_impl_err, plan_err, DataFusionError,
Result,
- ScalarValue,
+ downcast_value, exec_err, internal_err, not_impl_err, plan_err,
DataFusionError,
+ Result, ScalarValue,
};
use datafusion_expr::Accumulator;
use std::{any::Any, iter, sync::Arc};
@@ -394,9 +394,7 @@ impl Accumulator for ApproxPercentileAccumulator {
fn evaluate(&self) -> Result<ScalarValue> {
if self.digest.count() == 0.0 {
- return Err(DataFusionError::Execution(
- "aggregate function needs at least one non-null
element".to_string(),
- ));
+ return exec_err!("aggregate function needs at least one non-null
element");
}
let q = self.digest.estimate_quantile(self.percentile);
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
index 77868683e2..e65a2116d3 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
@@ -33,7 +33,7 @@ use arrow::datatypes::{DataType, Field};
use arrow_array::{Array, ListArray};
use arrow_schema::{Fields, SortOptions};
use datafusion_common::utils::{compare_rows, get_row_at_idx};
-use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
+use datafusion_common::{exec_err, internal_err, DataFusionError, Result,
ScalarValue};
use datafusion_expr::Accumulator;
use itertools::izip;
@@ -236,9 +236,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
self.values = new_values;
self.ordering_values = new_orderings;
} else {
- return Err(DataFusionError::Execution(
- "Expects to receive a list array".to_string(),
- ));
+ return exec_err!("Expects to receive a list array");
}
Ok(())
}
@@ -291,17 +289,17 @@ impl OrderSensitiveArrayAggAccumulator {
if let ScalarValue::Struct(Some(orderings), _fields) =
struct_vals {
Ok(orderings)
} else {
- Err(DataFusionError::Execution(format!(
+ exec_err!(
"Expects to receive ScalarValue::Struct(Some(..), _)
but got:{:?}",
struct_vals.get_datatype()
- )))
+ )
}
}).collect::<Result<Vec<_>>>()
} else {
- Err(DataFusionError::Execution(format!(
+ exec_err!(
"Expects to receive ScalarValue::List(Some(..), _) but
got:{:?}",
in_data.get_datatype()
- )))
+ )
}
}
@@ -427,10 +425,9 @@ fn merge_ordered_arrays(
.zip(ordering_values.iter())
.all(|(vals, ordering_vals)| vals.len() == ordering_vals.len()))
{
- return Err(DataFusionError::Execution(
+ return exec_err!(
"Expects values arguments and/or ordering_values arguments to have
same size"
- .to_string(),
- ));
+ );
}
let n_branch = values.len();
// For each branch we keep track of indices of next will be merged entry
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs
b/datafusion/physical-expr/src/aggregate/utils.rs
index 463d8fec18..3ed7905e29 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -23,7 +23,7 @@ use arrow::datatypes::{MAX_DECIMAL_FOR_EACH_PRECISION,
MIN_DECIMAL_FOR_EACH_PREC
use arrow_array::cast::AsArray;
use arrow_array::types::Decimal128Type;
use arrow_schema::{DataType, Field};
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::Accumulator;
use std::any::Any;
use std::sync::Arc;
@@ -85,9 +85,7 @@ impl Decimal128Averager {
})
} else {
// can't convert the lit decimal to the returned data type
- Err(DataFusionError::Execution(
- "Arithmetic Overflow in AvgAccumulator".to_string(),
- ))
+ exec_err!("Arithmetic Overflow in AvgAccumulator")
}
}
@@ -104,15 +102,11 @@ impl Decimal128Averager {
if new_value >= self.target_min && new_value <= self.target_max {
Ok(new_value)
} else {
- Err(DataFusionError::Execution(
- "Arithmetic Overflow in AvgAccumulator".to_string(),
- ))
+ exec_err!("Arithmetic Overflow in AvgAccumulator")
}
} else {
// can't convert the lit decimal to the returned data type
- Err(DataFusionError::Execution(
- "Arithmetic Overflow in AvgAccumulator".to_string(),
- ))
+ exec_err!("Arithmetic Overflow in AvgAccumulator")
}
}
}
diff --git a/datafusion/physical-expr/src/array_expressions.rs
b/datafusion/physical-expr/src/array_expressions.rs
index 98b14fdbc3..48c9210872 100644
--- a/datafusion/physical-expr/src/array_expressions.rs
+++ b/datafusion/physical-expr/src/array_expressions.rs
@@ -24,7 +24,7 @@ use arrow::datatypes::{DataType, Field, UInt64Type};
use arrow_buffer::NullBuffer;
use core::any::type_name;
use datafusion_common::cast::{as_generic_string_array, as_int64_array,
as_list_array};
-use datafusion_common::{internal_err, not_impl_err, plan_err, ScalarValue};
+use datafusion_common::{exec_err, internal_err, not_impl_err, plan_err,
ScalarValue};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::ColumnarValue;
use itertools::Itertools;
@@ -1056,11 +1056,7 @@ macro_rules! position {
i - 1
}
}
- None => {
- return Err(DataFusionError::Execution(
- "initial position must not be null".to_string(),
- ))
- }
+ None => return exec_err!("initial position must not be
null"),
};
match arr {
diff --git a/datafusion/physical-expr/src/datetime_expressions.rs
b/datafusion/physical-expr/src/datetime_expressions.rs
index 98fc013c8e..26d9036efd 100644
--- a/datafusion/physical-expr/src/datetime_expressions.rs
+++ b/datafusion/physical-expr/src/datetime_expressions.rs
@@ -43,7 +43,8 @@ use datafusion_common::cast::{
as_timestamp_nanosecond_array, as_timestamp_second_array,
};
use datafusion_common::{
- internal_err, not_impl_err, DataFusionError, Result, ScalarType,
ScalarValue,
+ exec_err, internal_err, not_impl_err, DataFusionError, Result, ScalarType,
+ ScalarValue,
};
use datafusion_expr::ColumnarValue;
use std::sync::Arc;
@@ -267,9 +268,7 @@ fn date_trunc_coarse(granularity: &str, value: i64) ->
Result<i64> {
.and_then(|d| d.with_day0(0))
.and_then(|d| d.with_month0(0)),
unsupported => {
- return Err(DataFusionError::Execution(format!(
- "Unsupported date_trunc granularity: {unsupported}"
- )));
+ return exec_err!("Unsupported date_trunc granularity:
{unsupported}");
}
};
// `with_x(0)` are infallible because `0` are always a valid
@@ -331,9 +330,7 @@ pub fn date_trunc(args: &[ColumnarValue]) ->
Result<ColumnarValue> {
if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = granularity
{
v.to_lowercase()
} else {
- return Err(DataFusionError::Execution(
- "Granularity of `date_trunc` must be non-null scalar
Utf8".to_string(),
- ));
+ return exec_err!("Granularity of `date_trunc` must be non-null
scalar Utf8");
};
Ok(match array {
@@ -402,9 +399,9 @@ pub fn date_trunc(args: &[ColumnarValue]) ->
Result<ColumnarValue> {
}
}
_ => {
- return Err(DataFusionError::Execution(
- "second argument of `date_trunc` must be nanosecond timestamp
scalar or array".to_string(),
- ));
+ return exec_err!(
+ "second argument of `date_trunc` must be nanosecond timestamp
scalar or array"
+ );
}
})
}
@@ -484,9 +481,7 @@ pub fn date_bin(args: &[ColumnarValue]) ->
Result<ColumnarValue> {
} else if args.len() == 3 {
date_bin_impl(&args[0], &args[1], &args[2])
} else {
- Err(DataFusionError::Execution(
- "DATE_BIN expected two or three arguments".to_string(),
- ))
+ exec_err!("DATE_BIN expected two or three arguments")
}
}
@@ -531,11 +526,7 @@ fn date_bin_impl(
match nanos {
Some(v) => Interval::Nanoseconds(v),
- _ => {
- return Err(DataFusionError::Execution(
- "DATE_BIN stride argument is too large".to_string(),
- ))
- }
+ _ => return exec_err!("DATE_BIN stride argument is too large"),
}
}
ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
@@ -556,19 +547,15 @@ fn date_bin_impl(
.num_nanoseconds();
match nanos {
Some(v) => Interval::Nanoseconds(v),
- _ => {
- return Err(DataFusionError::Execution(
- "DATE_BIN stride argument is too
large".to_string(),
- ))
- }
+ _ => return exec_err!("DATE_BIN stride argument is too
large"),
}
}
}
ColumnarValue::Scalar(v) => {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"DATE_BIN expects stride argument to be an INTERVAL but got
{}",
v.get_datatype()
- )))
+ )
}
ColumnarValue::Array(_) => {
return not_impl_err!(
@@ -580,10 +567,10 @@ fn date_bin_impl(
let origin = match origin {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) =>
*v,
ColumnarValue::Scalar(v) => {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"DATE_BIN expects origin argument to be a TIMESTAMP with
nanosececond precision but got {}",
v.get_datatype()
- )))
+ )
}
ColumnarValue::Array(_) => return not_impl_err!(
"DATE_BIN only supports literal values for the origin argument,
not arrays"
@@ -594,9 +581,7 @@ fn date_bin_impl(
// Return error if stride is 0
if stride == 0 {
- return Err(DataFusionError::Execution(
- "DATE_BIN stride must be non-zero".to_string(),
- ));
+ return exec_err!("DATE_BIN stride must be non-zero");
}
let f_nanos = |x: Option<i64>| x.map(|x| stride_fn(stride, x, origin));
@@ -672,17 +657,16 @@ fn date_bin_impl(
ColumnarValue::Array(Arc::new(array))
}
_ => {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"DATE_BIN expects source argument to be a TIMESTAMP but
got {}",
array.data_type()
- )))
+ )
}
},
_ => {
- return Err(DataFusionError::Execution(
+ return exec_err!(
"DATE_BIN expects source argument to be a TIMESTAMP scalar or
array"
- .to_string(),
- ));
+ );
}
})
}
@@ -730,18 +714,14 @@ macro_rules! extract_date_part {
/// DATE_PART SQL function
pub fn date_part(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 2 {
- return Err(DataFusionError::Execution(
- "Expected two arguments in DATE_PART".to_string(),
- ));
+ return exec_err!("Expected two arguments in DATE_PART");
}
let (date_part, array) = (&args[0], &args[1]);
let date_part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) =
date_part {
v
} else {
- return Err(DataFusionError::Execution(
- "First argument of `DATE_PART` must be non-null scalar
Utf8".to_string(),
- ));
+ return exec_err!("First argument of `DATE_PART` must be non-null
scalar Utf8");
};
let is_scalar = matches!(array, ColumnarValue::Scalar(_));
@@ -766,9 +746,7 @@ pub fn date_part(args: &[ColumnarValue]) ->
Result<ColumnarValue> {
"microsecond" => extract_date_part!(&array, micros),
"nanosecond" => extract_date_part!(&array, nanos),
"epoch" => extract_date_part!(&array, epoch),
- _ => Err(DataFusionError::Execution(format!(
- "Date part '{date_part}' not supported"
- ))),
+ _ => exec_err!("Date part '{date_part}' not supported"),
}?;
Ok(if is_scalar {
diff --git a/datafusion/physical-expr/src/expressions/case.rs
b/datafusion/physical-expr/src/expressions/case.rs
index 8005cc7274..4e37dc4960 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -28,6 +28,7 @@ use arrow::compute::kernels::zip::zip;
use arrow::compute::{and, eq_dyn, is_null, not, or, prep_null_mask_filter};
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
+use datafusion_common::exec_err;
use datafusion_common::{cast::as_boolean_array, internal_err, DataFusionError,
Result};
use datafusion_expr::ColumnarValue;
@@ -86,9 +87,7 @@ impl CaseExpr {
else_expr: Option<Arc<dyn PhysicalExpr>>,
) -> Result<Self> {
if when_then_expr.is_empty() {
- Err(DataFusionError::Execution(
- "There must be at least one WHEN clause".to_string(),
- ))
+ exec_err!("There must be at least one WHEN clause")
} else {
Ok(Self {
expr,
diff --git a/datafusion/physical-expr/src/expressions/get_indexed_field.rs
b/datafusion/physical-expr/src/expressions/get_indexed_field.rs
index 596b8f414f..3a7ce568b3 100644
--- a/datafusion/physical-expr/src/expressions/get_indexed_field.rs
+++ b/datafusion/physical-expr/src/expressions/get_indexed_field.rs
@@ -19,6 +19,7 @@
use crate::PhysicalExpr;
use arrow::array::Array;
+use datafusion_common::exec_err;
use crate::array_expressions::{array_element, array_slice};
use crate::physical_expr::down_cast_any_ref;
@@ -166,17 +167,17 @@ impl PhysicalExpr for GetIndexedFieldExpr {
(DataType::Struct(_), ScalarValue::Utf8(Some(k))) => {
let as_struct_array = as_struct_array(&array)?;
match as_struct_array.column_by_name(k) {
- None => Err(DataFusionError::Execution(
- format!("get indexed field {k} not found in
struct"))),
+ None => exec_err!(
+ "get indexed field {k} not found in struct"),
Some(col) => Ok(ColumnarValue::Array(col.clone()))
}
}
- (DataType::Struct(_), name) => Err(DataFusionError::Execution(
- format!("get indexed field is only possible on struct with
utf8 indexes. \
- Tried with {name:?} index"))),
- (dt, name) => Err(DataFusionError::Execution(
- format!("get indexed field is only possible on
lists with int64 indexes or struct \
- with utf8 indexes. Tried {dt:?} with
{name:?} index"))),
+ (DataType::Struct(_), name) => exec_err!(
+ "get indexed field is only possible on struct with utf8
indexes. \
+ Tried with {name:?} index"),
+ (dt, name) => exec_err!(
+ "get indexed field is only possible on lists
with int64 indexes or struct \
+ with utf8 indexes. Tried {dt:?} with
{name:?} index"),
},
GetFieldAccessExpr::ListIndex{key} => {
let key = key.evaluate(batch)?.into_array(batch.num_rows());
@@ -184,12 +185,12 @@ impl PhysicalExpr for GetIndexedFieldExpr {
(DataType::List(_), DataType::Int64) =>
Ok(ColumnarValue::Array(array_element(&[
array, key
])?)),
- (DataType::List(_), key) => Err(DataFusionError::Execution(
- format!("get indexed field is only possible on
lists with int64 indexes. \
- Tried with {key:?} index"))),
- (dt, key) => Err(DataFusionError::Execution(
- format!("get indexed field is only
possible on lists with int64 indexes or struct \
- with utf8 indexes. Tried
{dt:?} with {key:?} index"))),
+ (DataType::List(_), key) => exec_err!(
+ "get indexed field is only possible on lists
with int64 indexes. \
+ Tried with {key:?} index"),
+ (dt, key) => exec_err!(
+ "get indexed field is only possible on
lists with int64 indexes or struct \
+ with utf8 indexes. Tried
{dt:?} with {key:?} index"),
}
},
GetFieldAccessExpr::ListRange{start, stop} => {
@@ -199,12 +200,12 @@ impl PhysicalExpr for GetIndexedFieldExpr {
(DataType::List(_), DataType::Int64, DataType::Int64) =>
Ok(ColumnarValue::Array(array_slice(&[
array, start, stop
])?)),
- (DataType::List(_), start, stop) =>
Err(DataFusionError::Execution(
- format!("get indexed field is only possible on lists
with int64 indexes. \
- Tried with {start:?} and {stop:?} indices"))),
- (dt, start, stop) => Err(DataFusionError::Execution(
- format!("get indexed field is only possible on lists
with int64 indexes or struct \
- with utf8 indexes. Tried {dt:?} with
{start:?} and {stop:?} indices"))),
+ (DataType::List(_), start, stop) => exec_err!(
+ "get indexed field is only possible on lists with
int64 indexes. \
+ Tried with {start:?} and {stop:?} indices"),
+ (dt, start, stop) => exec_err!(
+ "get indexed field is only possible on lists with
int64 indexes or struct \
+ with utf8 indexes. Tried {dt:?} with
{start:?} and {stop:?} indices"),
}
},
}
diff --git a/datafusion/physical-expr/src/expressions/in_list.rs
b/datafusion/physical-expr/src/expressions/in_list.rs
index 6afc7c6d51..612060aeb6 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -18,6 +18,7 @@
//! Implementation of `InList` expressions: [`InListExpr`]
use ahash::RandomState;
+use datafusion_common::exec_err;
use std::any::Any;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
@@ -207,9 +208,9 @@ fn evaluate_list(
.iter()
.map(|expr| {
expr.evaluate(batch).and_then(|r| match r {
- ColumnarValue::Array(_) => Err(DataFusionError::Execution(
- "InList expression must evaluate to a scalar".to_string(),
- )),
+ ColumnarValue::Array(_) => {
+ exec_err!("InList expression must evaluate to a scalar")
+ }
// Flatten dictionary values
ColumnarValue::Scalar(ScalarValue::Dictionary(_, v)) => Ok(*v),
ColumnarValue::Scalar(s) => Ok(s),
diff --git a/datafusion/physical-expr/src/functions.rs
b/datafusion/physical-expr/src/functions.rs
index 420e40e9c4..e481687ae5 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -893,7 +893,7 @@ mod tests {
record_batch::RecordBatch,
};
use datafusion_common::cast::as_uint64_array;
- use datafusion_common::plan_err;
+ use datafusion_common::{exec_err, plan_err};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::type_coercion::functions::data_types;
use datafusion_expr::Signature;
@@ -1147,9 +1147,7 @@ mod tests {
test_function!(
Chr,
&[lit(ScalarValue::Int64(Some(0)))],
- Err(DataFusionError::Execution(
- "null character not permitted.".to_string(),
- )),
+ exec_err!("null character not permitted."),
&str,
Utf8,
StringArray
@@ -1157,9 +1155,7 @@ mod tests {
test_function!(
Chr,
&[lit(ScalarValue::Int64(Some(i64::MAX)))],
- Err(DataFusionError::Execution(
- "requested character too large for encoding.".to_string(),
- )),
+ exec_err!("requested character too large for encoding."),
&str,
Utf8,
StringArray
@@ -2364,9 +2360,7 @@ mod tests {
lit("~@~"),
lit(ScalarValue::Int64(Some(-1))),
],
- Err(DataFusionError::Execution(
- "field position must be greater than zero".to_string(),
- )),
+ exec_err!("field position must be greater than zero"),
&str,
Utf8,
StringArray
@@ -2667,9 +2661,7 @@ mod tests {
lit(ScalarValue::Int64(Some(1))),
lit(ScalarValue::Int64(Some(-1))),
],
- Err(DataFusionError::Execution(
- "negative substring length not allowed: substr(<str>, 1,
-1)".to_string(),
- )),
+ exec_err!("negative substring length not allowed: substr(<str>, 1,
-1)"),
&str,
Utf8,
StringArray
diff --git a/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
b/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
index a985f34d22..f14c2d552d 100644
--- a/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
+++ b/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
@@ -28,7 +28,7 @@ use crate::intervals::rounding::alter_fp_rounding_mode;
use arrow::compute::{cast_with_options, CastOptions};
use arrow::datatypes::DataType;
use arrow_array::ArrowNativeTypeOp;
-use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
+use datafusion_common::{exec_err, internal_err, DataFusionError, Result,
ScalarValue};
use datafusion_expr::type_coercion::binary::get_result_type;
use datafusion_expr::Operator;
@@ -464,10 +464,7 @@ impl Interval {
diff as u64,
))
} else {
- Err(DataFusionError::Execution(format!(
- "Cardinality cannot be calculated for {:?}",
- self
- )))
+ exec_err!("Cardinality cannot be calculated for {:?}",
self)
}
}
// Ordering floating-point numbers according to their binary
representations
@@ -502,17 +499,14 @@ impl Interval {
self.upper.open,
upper.to_bits().sub_checked(lower.to_bits())?,
)),
- _ => Err(DataFusionError::Execution(format!(
+ _ => exec_err!(
"Cardinality cannot be calculated for the datatype
{:?}",
data_type
- ))),
+ ),
}
}
// If the cardinality cannot be calculated anyway, give an error.
- _ => Err(DataFusionError::Execution(format!(
- "Cardinality cannot be calculated for {:?}",
- self
- ))),
+ _ => exec_err!("Cardinality cannot be calculated for {:?}", self),
}
}
diff --git a/datafusion/physical-expr/src/planner.rs
b/datafusion/physical-expr/src/planner.rs
index 65d4c022ba..9a74c2ca64 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -26,7 +26,8 @@ use crate::{
};
use arrow::datatypes::Schema;
use datafusion_common::{
- internal_err, not_impl_err, plan_err, DFSchema, DataFusionError, Result,
ScalarValue,
+ exec_err, internal_err, not_impl_err, plan_err, DFSchema, DataFusionError,
Result,
+ ScalarValue,
};
use datafusion_expr::expr::{Alias, Cast, InList, ScalarFunction, ScalarUDF};
use datafusion_expr::{
@@ -198,9 +199,7 @@ pub fn create_physical_expr(
case_insensitive,
}) => {
if escape_char.is_some() {
- return Err(DataFusionError::Execution(
- "LIKE does not support escape_char".to_string(),
- ));
+ return exec_err!("LIKE does not support escape_char");
}
let physical_expr = create_physical_expr(
expr,
diff --git a/datafusion/physical-expr/src/string_expressions.rs
b/datafusion/physical-expr/src/string_expressions.rs
index 39a21a0e44..e6a3d5c331 100644
--- a/datafusion/physical-expr/src/string_expressions.rs
+++ b/datafusion/physical-expr/src/string_expressions.rs
@@ -32,7 +32,7 @@ use datafusion_common::{
cast::{
as_generic_string_array, as_int64_array, as_primitive_array,
as_string_array,
},
- ScalarValue,
+ exec_err, ScalarValue,
};
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::ColumnarValue;
@@ -191,15 +191,13 @@ pub fn chr(args: &[ArrayRef]) -> Result<ArrayRef> {
integer
.map(|integer| {
if integer == 0 {
- Err(DataFusionError::Execution(
- "null character not permitted.".to_string(),
- ))
+ exec_err!("null character not permitted.")
} else {
match core::char::from_u32(integer as u32) {
Some(integer) => Ok(integer.to_string()),
- None => Err(DataFusionError::Execution(
- "requested character too large for
encoding.".to_string(),
- )),
+ None => {
+ exec_err!("requested character too large for
encoding.")
+ }
}
}
})
@@ -474,9 +472,7 @@ pub fn split_part<T: OffsetSizeTrait>(args: &[ArrayRef]) ->
Result<ArrayRef> {
.map(|((string, delimiter), n)| match (string, delimiter, n) {
(Some(string), Some(delimiter), Some(n)) => {
if n <= 0 {
- Err(DataFusionError::Execution(
- "field position must be greater than zero".to_string(),
- ))
+ exec_err!("field position must be greater than zero")
} else {
let split_string: Vec<&str> =
string.split(delimiter).collect();
match split_string.get(n as usize - 1) {
diff --git a/datafusion/physical-expr/src/struct_expressions.rs
b/datafusion/physical-expr/src/struct_expressions.rs
index ea4f8b862f..baa29d668e 100644
--- a/datafusion/physical-expr/src/struct_expressions.rs
+++ b/datafusion/physical-expr/src/struct_expressions.rs
@@ -19,16 +19,14 @@
use arrow::array::*;
use arrow::datatypes::{DataType, Field};
-use datafusion_common::{not_impl_err, DataFusionError, Result};
+use datafusion_common::{exec_err, not_impl_err, DataFusionError, Result};
use datafusion_expr::ColumnarValue;
use std::sync::Arc;
fn array_struct(args: &[ArrayRef]) -> Result<ArrayRef> {
// do not accept 0 arguments.
if args.is_empty() {
- return Err(DataFusionError::Execution(
- "struct requires at least one argument".to_string(),
- ));
+ return exec_err!("struct requires at least one argument");
}
let vec: Vec<_> = args
diff --git a/datafusion/physical-expr/src/unicode_expressions.rs
b/datafusion/physical-expr/src/unicode_expressions.rs
index 92084cf433..e28700a25c 100644
--- a/datafusion/physical-expr/src/unicode_expressions.rs
+++ b/datafusion/physical-expr/src/unicode_expressions.rs
@@ -27,7 +27,7 @@ use arrow::{
};
use datafusion_common::{
cast::{as_generic_string_array, as_int64_array},
- internal_err, DataFusionError, Result,
+ exec_err, internal_err, DataFusionError, Result,
};
use hashbrown::HashMap;
use std::cmp::{max, Ordering};
@@ -102,9 +102,9 @@ pub fn lpad<T: OffsetSizeTrait>(args: &[ArrayRef]) ->
Result<ArrayRef> {
.map(|(string, length)| match (string, length) {
(Some(string), Some(length)) => {
if length > i32::MAX as i64 {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"lpad requested length {length} too large"
- )));
+ );
}
let length = if length < 0 { 0 } else { length as
usize };
@@ -139,9 +139,9 @@ pub fn lpad<T: OffsetSizeTrait>(args: &[ArrayRef]) ->
Result<ArrayRef> {
.map(|((string, length), fill)| match (string, length, fill) {
(Some(string), Some(length), Some(fill)) => {
if length > i32::MAX as i64 {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"lpad requested length {length} too large"
- )));
+ );
}
let length = if length < 0 { 0 } else { length as
usize };
@@ -178,9 +178,9 @@ pub fn lpad<T: OffsetSizeTrait>(args: &[ArrayRef]) ->
Result<ArrayRef> {
Ok(Arc::new(result) as ArrayRef)
}
- other => Err(DataFusionError::Execution(format!(
+ other => exec_err!(
"lpad was called with {other} arguments. It requires at least 2
and at most 3."
- ))),
+ ),
}
}
@@ -245,9 +245,9 @@ pub fn rpad<T: OffsetSizeTrait>(args: &[ArrayRef]) ->
Result<ArrayRef> {
.map(|(string, length)| match (string, length) {
(Some(string), Some(length)) => {
if length > i32::MAX as i64 {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"rpad requested length {length} too large"
- )));
+ );
}
let length = if length < 0 { 0 } else { length as
usize };
@@ -281,9 +281,9 @@ pub fn rpad<T: OffsetSizeTrait>(args: &[ArrayRef]) ->
Result<ArrayRef> {
.map(|((string, length), fill)| match (string, length, fill) {
(Some(string), Some(length), Some(fill)) => {
if length > i32::MAX as i64 {
- return Err(DataFusionError::Execution(format!(
+ return exec_err!(
"rpad requested length {length} too large"
- )));
+ );
}
let length = if length < 0 { 0 } else { length as
usize };
@@ -391,9 +391,9 @@ pub fn substr<T: OffsetSizeTrait>(args: &[ArrayRef]) ->
Result<ArrayRef> {
.map(|((string, start), count)| match (string, start, count) {
(Some(string), Some(start), Some(count)) => {
if count < 0 {
- Err(DataFusionError::Execution(format!(
+ exec_err!(
"negative substring length not allowed:
substr(<str>, {start}, {count})"
- )))
+ )
} else {
let skip = max(0, start - 1);
let count = max(0, count + (if start < 1 {start -
1} else {0}));
diff --git a/datafusion/physical-expr/src/window/nth_value.rs
b/datafusion/physical-expr/src/window/nth_value.rs
index 0da04274e2..262a50969b 100644
--- a/datafusion/physical-expr/src/window/nth_value.rs
+++ b/datafusion/physical-expr/src/window/nth_value.rs
@@ -23,7 +23,7 @@ use crate::window::BuiltInWindowFunctionExpr;
use crate::PhysicalExpr;
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::{DataType, Field};
-use datafusion_common::ScalarValue;
+use datafusion_common::{exec_err, ScalarValue};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::window_state::WindowAggState;
use datafusion_expr::PartitionEvaluator;
@@ -77,9 +77,7 @@ impl NthValue {
n: u32,
) -> Result<Self> {
match n {
- 0 => Err(DataFusionError::Execution(
- "nth_value expect n to be > 0".to_owned(),
- )),
+ 0 => exec_err!("nth_value expect n to be > 0"),
_ => Ok(Self {
name: name.into(),
expr,
diff --git a/datafusion/physical-expr/src/window/rank.rs
b/datafusion/physical-expr/src/window/rank.rs
index 9eb442f8f9..e24bfee3c0 100644
--- a/datafusion/physical-expr/src/window/rank.rs
+++ b/datafusion/physical-expr/src/window/rank.rs
@@ -28,7 +28,7 @@ use arrow::array::{Float64Array, UInt64Array};
use arrow::datatypes::{DataType, Field};
use arrow_schema::{SchemaRef, SortOptions};
use datafusion_common::utils::get_row_at_idx;
-use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::PartitionEvaluator;
use std::any::Any;
@@ -156,9 +156,9 @@ impl PartitionEvaluator for RankEvaluator {
self.state.last_rank_boundary as u64 + 1,
))),
RankType::Dense => Ok(ScalarValue::UInt64(Some(self.state.n_rank
as u64))),
- RankType::Percent => Err(DataFusionError::Execution(
- "Can not execute PERCENT_RANK in a streaming
fashion".to_string(),
- )),
+ RankType::Percent => {
+ exec_err!("Can not execute PERCENT_RANK in a streaming
fashion")
+ }
}
}
diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs
index 27fd050394..28eaf241fa 100644
--- a/datafusion/sql/src/utils.rs
+++ b/datafusion/sql/src/utils.rs
@@ -21,7 +21,7 @@ use arrow_schema::{DataType, DECIMAL128_MAX_PRECISION,
DECIMAL_DEFAULT_SCALE};
use datafusion_common::tree_node::{Transformed, TreeNode};
use sqlparser::ast::Ident;
-use datafusion_common::{internal_err, plan_err};
+use datafusion_common::{exec_err, internal_err, plan_err};
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::expr::{Alias, GroupingSet, WindowFunction};
use datafusion_expr::expr_vec_fmt;
@@ -191,13 +191,9 @@ pub fn window_expr_common_partition_keys(window_exprs:
&[Expr]) -> Result<&[Expr
Expr::WindowFunction(WindowFunction { partition_by, .. }) => {
Ok(partition_by)
}
- expr => Err(DataFusionError::Execution(format!(
- "Impossibly got non-window expr {expr:?}"
- ))),
+ expr => exec_err!("Impossibly got non-window expr {expr:?}"),
},
- expr => Err(DataFusionError::Execution(format!(
- "Impossibly got non-window expr {expr:?}"
- ))),
+ expr => exec_err!("Impossibly got non-window expr {expr:?}"),
})
.collect::<Result<Vec<_>>>()?;
let result = all_partition_keys
diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs
b/datafusion/sqllogictest/bin/sqllogictests.rs
index d097d97fb7..fe38a9d8f8 100644
--- a/datafusion/sqllogictest/bin/sqllogictests.rs
+++ b/datafusion/sqllogictest/bin/sqllogictests.rs
@@ -26,7 +26,7 @@ use futures::stream::StreamExt;
use log::info;
use sqllogictest::strict_column_validator;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{exec_err, DataFusionError, Result};
const TEST_DIRECTORY: &str = "test_files/";
const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_";
@@ -119,10 +119,7 @@ async fn run_tests() -> Result<()> {
for e in &errors {
println!("{e}");
}
- Err(DataFusionError::Execution(format!(
- "{} failures",
- errors.len()
- )))
+ exec_err!("{} failures", errors.len())
} else {
Ok(())
}
diff --git a/datafusion/substrait/src/logical_plan/producer.rs
b/datafusion/substrait/src/logical_plan/producer.rs
index 65fa7c7f9f..cf7ab0cc84 100644
--- a/datafusion/substrait/src/logical_plan/producer.rs
+++ b/datafusion/substrait/src/logical_plan/producer.rs
@@ -29,7 +29,7 @@ use datafusion::{
};
use datafusion::common::DFSchemaRef;
-use datafusion::common::{internal_err, not_impl_err};
+use datafusion::common::{exec_err, internal_err, not_impl_err};
#[allow(unused_imports)]
use datafusion::logical_expr::aggregate_function;
use datafusion::logical_expr::expr::{
@@ -571,9 +571,7 @@ fn to_substrait_sort_field(
sort_kind: Some(SortKind::Direction(sort_kind.into())),
})
}
- _ => Err(DataFusionError::Execution(
- "expects to receive sort expression".to_string(),
- )),
+ _ => exec_err!("expects to receive sort expression"),
}
}