This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c50268670f Add `internal_err` error macros. Part 2 (#7321)
c50268670f is described below
commit c50268670f8bee2022a9892a0c67073fc360352a
Author: comphead <[email protected]>
AuthorDate: Fri Aug 18 04:24:46 2023 -0700
Add `internal_err` error macros. Part 2 (#7321)
* Add `internal_err` error macros. Part 2
* fmt
---
datafusion/common/src/config.rs | 6 +-
datafusion/common/src/scalar.rs | 21 +-
.../core/src/datasource/default_table_source.rs | 6 +-
datafusion/core/src/datasource/file_format/csv.rs | 5 +-
.../core/src/datasource/file_format/file_type.rs | 10 +-
.../core/src/datasource/file_format/write.rs | 7 +-
datafusion/core/src/datasource/listing/table.rs | 6 +-
.../src/datasource/physical_plan/file_stream.rs | 6 +-
.../core/src/physical_optimizer/join_selection.rs | 6 +-
datafusion/core/src/physical_plan/analyze.rs | 4 +-
.../core/src/physical_plan/coalesce_partitions.rs | 6 +-
datafusion/core/src/physical_plan/insert.rs | 6 +-
.../core/src/physical_plan/joins/hash_join.rs | 30 +-
.../src/physical_plan/joins/sort_merge_join.rs | 4 +-
datafusion/core/src/physical_plan/limit.rs | 8 +-
datafusion/core/src/physical_plan/mod.rs | 6 +-
.../physical_plan/sorts/sort_preserving_merge.rs | 5 +-
datafusion/core/src/physical_plan/union.rs | 8 +-
.../src/physical_plan/windows/window_agg_exec.rs | 6 +-
datafusion/core/src/physical_planner.rs | 64 ++---
datafusion/expr/src/accumulator.rs | 8 +-
datafusion/expr/src/built_in_function.rs | 27 +-
datafusion/expr/src/expr.rs | 13 +-
datafusion/expr/src/expr_schema.rs | 21 +-
datafusion/expr/src/type_coercion/binary.rs | 6 +-
datafusion/expr/src/window_state.rs | 9 +-
datafusion/optimizer/src/analyzer/type_coercion.rs | 4 +-
.../optimizer/src/common_subexpr_eliminate.rs | 18 +-
datafusion/optimizer/src/push_down_filter.rs | 20 +-
.../src/simplify_expressions/expr_simplifier.rs | 8 +-
.../physical-expr/src/aggregate/approx_distinct.rs | 7 +-
.../physical-expr/src/aggregate/array_agg.rs | 6 +-
.../src/aggregate/array_agg_ordered.rs | 6 +-
datafusion/physical-expr/src/aggregate/average.rs | 4 +-
.../physical-expr/src/aggregate/bit_and_or_xor.rs | 4 +-
.../physical-expr/src/aggregate/count_distinct.rs | 6 +-
datafusion/physical-expr/src/aggregate/min_max.rs | 4 +-
datafusion/physical-expr/src/aggregate/stddev.rs | 6 +-
.../physical-expr/src/aggregate/sum_distinct.rs | 6 +-
datafusion/physical-expr/src/crypto_expressions.rs | 12 +-
datafusion/physical-expr/src/expressions/case.rs | 6 +-
.../physical-expr/src/expressions/negative.rs | 8 +-
datafusion/physical-expr/src/functions.rs | 94 ++++---
.../src/intervals/interval_aritmetic.rs | 4 +-
datafusion/physical-expr/src/math_expressions.rs | 44 +--
datafusion/physical-expr/src/physical_expr.rs | 6 +-
datafusion/physical-expr/src/string_expressions.rs | 6 +-
datafusion/physical-expr/src/window/lead_lag.rs | 6 +-
datafusion/proto/src/logical_plan/from_proto.rs | 8 +-
datafusion/proto/src/logical_plan/mod.rs | 22 +-
datafusion/proto/src/physical_plan/mod.rs | 13 +-
datafusion/sql/src/expr/mod.rs | 305 +++++++++++++++------
datafusion/sql/src/planner.rs | 3 +-
datafusion/sql/src/utils.rs | 6 +-
datafusion/substrait/src/logical_plan/producer.rs | 4 +-
55 files changed, 482 insertions(+), 468 deletions(-)
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 38c530f2bd..5a32349c65 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -126,9 +126,9 @@ macro_rules! config_namespace {
$(
stringify!($field_name) => self.$field_name.set(rem,
value),
)*
- _ => Err(DataFusionError::Internal(
- format!(concat!("Config value \"{}\" not found on ",
stringify!($struct_name)), key)
- ))
+ _ => _internal_err!(
+ "Config value \"{}\" not found on {}", key,
stringify!($struct_name)
+ )
}
}
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 26185b3799..1dc27872ee 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -700,7 +700,7 @@ macro_rules! primitive_right {
};
($TERM:expr, /, $SCALAR:ident) => {
internal_err!(
- "Can not divide an uninitialized value to a non-floating point
value",
+ "Can not divide an uninitialized value to a non-floating point
value"
)
};
($TERM:expr, &, $SCALAR:ident) => {
@@ -722,11 +722,10 @@ macro_rules! primitive_right {
macro_rules! unsigned_subtraction_error {
($SCALAR:expr) => {{
- let msg = format!(
+ _internal_err!(
"Can not subtract a {} value from an uninitialized value",
$SCALAR
- );
- Err(DataFusionError::Internal(msg))
+ )
}};
}
@@ -1404,9 +1403,7 @@ where
DT_MODE => add_day_time(prior, interval as i64, sign),
MDN_MODE => add_m_d_nano(prior, interval, sign),
_ => {
- return Err(DataFusionError::Internal(
- "Undefined interval mode for interval
calculations".to_string(),
- ));
+ return _internal_err!("Undefined interval mode for interval
calculations");
}
})
}
@@ -2241,9 +2238,9 @@ impl ScalarValue {
// figure out the type based on the first element
let data_type = match scalars.peek() {
None => {
- return Err(DataFusionError::Internal(
- "Empty iterator passed to
ScalarValue::iter_to_array".to_string(),
- ));
+ return _internal_err!(
+ "Empty iterator passed to ScalarValue::iter_to_array"
+ );
}
Some(sv) => sv.get_datatype(),
};
@@ -3062,9 +3059,7 @@ impl ScalarValue {
Ok(ScalarValue::Decimal256(Some(value), precision, scale))
}
}
- _ => Err(DataFusionError::Internal(
- "Unsupported decimal type".to_string(),
- )),
+ _ => _internal_err!("Unsupported decimal type"),
}
}
diff --git a/datafusion/core/src/datasource/default_table_source.rs
b/datafusion/core/src/datasource/default_table_source.rs
index bd3e832804..58d0997bb6 100644
--- a/datafusion/core/src/datasource/default_table_source.rs
+++ b/datafusion/core/src/datasource/default_table_source.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
use crate::datasource::TableProvider;
use arrow::datatypes::SchemaRef;
-use datafusion_common::{Constraints, DataFusionError};
+use datafusion_common::{internal_err, Constraints, DataFusionError};
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableSource};
/// DataFusion default table source, wrapping TableProvider
@@ -91,8 +91,6 @@ pub fn source_as_provider(
.downcast_ref::<DefaultTableSource>()
{
Some(source) => Ok(source.table_provider.clone()),
- _ => Err(DataFusionError::Internal(
- "TableSource was not DefaultTableSource".to_string(),
- )),
+ _ => internal_err!("TableSource was not DefaultTableSource"),
}
}
diff --git a/datafusion/core/src/datasource/file_format/csv.rs
b/datafusion/core/src/datasource/file_format/csv.rs
index 59c4fedeff..7edc9ace22 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -632,6 +632,7 @@ mod tests {
use bytes::Bytes;
use chrono::DateTime;
use datafusion_common::cast::as_string_array;
+ use datafusion_common::internal_err;
use datafusion_expr::{col, lit};
use futures::StreamExt;
use object_store::local::LocalFileSystem;
@@ -972,9 +973,7 @@ mod tests {
}
}
- Err(DataFusionError::Internal(
- "query contains no CsvExec".to_string(),
- ))
+ internal_err!("query contains no CsvExec")
}
#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
diff --git a/datafusion/core/src/datasource/file_format/file_type.rs
b/datafusion/core/src/datasource/file_format/file_type.rs
index 68967221ee..1d9fd06185 100644
--- a/datafusion/core/src/datasource/file_format/file_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_type.rs
@@ -17,13 +17,13 @@
//! File type abstraction
-use crate::error::{DataFusionError, Result};
-
+use crate::common::internal_err;
use crate::datasource::file_format::arrow::DEFAULT_ARROW_EXTENSION;
use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
+use crate::error::{DataFusionError, Result};
#[cfg(feature = "compression")]
use async_compression::tokio::bufread::{
BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder,
@@ -291,9 +291,9 @@ impl FileType {
FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext,
c.get_ext())),
FileType::PARQUET | FileType::AVRO | FileType::ARROW => match
c.variant {
UNCOMPRESSED => Ok(ext),
- _ => Err(DataFusionError::Internal(
- "FileCompressionType can be specified for CSV/JSON
FileType.".into(),
- )),
+ _ => internal_err!(
+ "FileCompressionType can be specified for CSV/JSON
FileType."
+ ),
},
}
}
diff --git a/datafusion/core/src/datasource/file_format/write.rs
b/datafusion/core/src/datasource/file_format/write.rs
index 3c005894f6..13546a7279 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -29,6 +29,7 @@ use crate::error::Result;
use crate::physical_plan::SendableRecordBatchStream;
use arrow_array::RecordBatch;
+use datafusion_common::internal_err;
use datafusion_common::DataFusionError;
use async_trait::async_trait;
@@ -331,13 +332,11 @@ pub(crate) async fn stateless_serialize_and_write_files(
per_thread_output: bool,
) -> Result<u64> {
if !per_thread_output && (serializers.len() != 1 || writers.len() != 1) {
- return Err(DataFusionError::Internal(
- "per_thread_output is false, but got more than 1 writer!".into(),
- ));
+ return internal_err!("per_thread_output is false, but got more than 1
writer!");
}
let num_partitions = data.len();
if per_thread_output && (num_partitions != writers.len()) {
- return Err(DataFusionError::Internal("per_thread_output is true, but
did not get 1 writer for each output partition!".into()));
+ return internal_err!("per_thread_output is true, but did not get 1
writer for each output partition!");
}
let mut row_count = 0;
// Map errors to DatafusionError.
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 5cc31c8397..994823da19 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -25,7 +25,7 @@ use arrow::datatypes::{DataType, Field, SchemaBuilder,
SchemaRef};
use arrow_schema::Schema;
use async_trait::async_trait;
use dashmap::DashMap;
-use datafusion_common::{plan_err, project_schema, SchemaExt, ToDFSchema};
+use datafusion_common::{internal_err, plan_err, project_schema, SchemaExt,
ToDFSchema};
use datafusion_expr::expr::Sort;
use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{create_physical_expr, LexOrdering,
PhysicalSortExpr};
@@ -195,9 +195,7 @@ impl ListingTableConfig {
options: Some(options),
})
}
- None => Err(DataFusionError::Internal(
- "No `ListingOptions` set for inferring schema".into(),
- )),
+ None => internal_err!("No `ListingOptions` set for inferring
schema"),
}
}
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs
b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index 6ac073c34f..71b322de44 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -519,6 +519,7 @@ impl<F: FileOpener> RecordBatchStream for FileStream<F> {
#[cfg(test)]
mod tests {
use arrow_schema::Schema;
+ use datafusion_common::internal_err;
use datafusion_common::DataFusionError;
use super::*;
@@ -557,10 +558,7 @@ mod tests {
let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);
if self.error_opening_idx.contains(&idx) {
- Ok(futures::future::ready(Err(DataFusionError::Internal(
- "error opening".to_owned(),
- )))
- .boxed())
+ Ok(futures::future::ready(internal_err!("error
opening")).boxed())
} else if self.error_scanning_idx.contains(&idx) {
let error = futures::future::ready(Err(ArrowError::IoError(
"error scanning".to_owned(),
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs
b/datafusion/core/src/physical_optimizer/join_selection.rs
index be75bf5cea..342e33f16c 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -545,9 +545,9 @@ fn swap_join_according_to_unboundedness(
(PartitionMode::CollectLeft, _) => {
swap_hash_join(hash_join, PartitionMode::CollectLeft)
}
- (PartitionMode::Auto, _) => Err(DataFusionError::Internal(
- "Auto is not acceptable for unbounded input here.".to_string(),
- )),
+ (PartitionMode::Auto, _) => {
+ internal_err!("Auto is not acceptable for unbounded input here.")
+ }
}
}
diff --git a/datafusion/core/src/physical_plan/analyze.rs
b/datafusion/core/src/physical_plan/analyze.rs
index 132cd358ff..0e6edc6182 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -93,9 +93,7 @@ impl ExecutionPlan for AnalyzeExec {
/// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
- Err(DataFusionError::Internal(
- "Optimization not supported for ANALYZE".to_string(),
- ))
+ internal_err!("Optimization not supported for ANALYZE")
}
/// Get the output partitioning of this plan
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs
b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index da84a5898a..78cb7b201f 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -125,9 +125,9 @@ impl ExecutionPlan for CoalescePartitionsExec {
let input_partitions =
self.input.output_partitioning().partition_count();
match input_partitions {
- 0 => Err(DataFusionError::Internal(
- "CoalescePartitionsExec requires at least one input
partition".to_owned(),
- )),
+ 0 => internal_err!(
+ "CoalescePartitionsExec requires at least one input partition"
+ ),
1 => {
// bypass any threading / metrics if there is a single
partition
self.input.execute(0, context)
diff --git a/datafusion/core/src/physical_plan/insert.rs
b/datafusion/core/src/physical_plan/insert.rs
index acbca834f2..b8d652d1cd 100644
--- a/datafusion/core/src/physical_plan/insert.rs
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -36,7 +36,7 @@ use std::fmt::Debug;
use std::sync::Arc;
use crate::physical_plan::stream::RecordBatchStreamAdapter;
-use datafusion_common::DataFusionError;
+use datafusion_common::{internal_err, DataFusionError};
use datafusion_execution::TaskContext;
/// `DataSink` implements writing streams of [`RecordBatch`]es to
@@ -232,9 +232,7 @@ impl ExecutionPlan for FileSinkExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
if partition != 0 {
- return Err(DataFusionError::Internal(
- "FileSinkExec can only be called on partition 0!".into(),
- ));
+ return internal_err!("FileSinkExec can only be called on partition
0!");
}
let data = self.execute_all_input_streams(context.clone())?;
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs
b/datafusion/core/src/physical_plan/joins/hash_join.rs
index da0161e163..ff341ae211 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -966,9 +966,9 @@ pub fn equal_rows(
equal_rows_elem!(Time32MillisecondArray, l, r, left,
right, null_equals_null)
}
_ => {
- err = Some(Err(DataFusionError::Internal(
- "Unsupported data type in hasher".to_string(),
- )));
+ err = Some(internal_err!(
+ "Unsupported data type in hasher"
+ ));
false
}
}
@@ -980,9 +980,9 @@ pub fn equal_rows(
equal_rows_elem!(Time64NanosecondArray, l, r, left, right,
null_equals_null)
}
_ => {
- err = Some(Err(DataFusionError::Internal(
- "Unsupported data type in hasher".to_string(),
- )));
+ err = Some(internal_err!(
+ "Unsupported data type in hasher"
+ ));
false
}
}
@@ -1049,16 +1049,16 @@ pub fn equal_rows(
null_equals_null
)
} else {
- err = Some(Err(DataFusionError::Internal(
- "Inconsistent Decimal data type in hasher, the
scale should be same".to_string(),
- )));
+ err = Some(internal_err!(
+ "Inconsistent Decimal data type in hasher, the
scale should be same"
+ ));
false
}
}
_ => {
- err = Some(Err(DataFusionError::Internal(
- "Unsupported data type in hasher".to_string(),
- )));
+ err = Some(internal_err!(
+ "Unsupported data type in hasher"
+ ));
false
}
},
@@ -1148,9 +1148,9 @@ pub fn equal_rows(
}
_ => {
// should not happen
- err = Some(Err(DataFusionError::Internal(
- "Unsupported data type in hasher".to_string(),
- )));
+ err = Some(internal_err!(
+ "Unsupported data type in hasher"
+ ));
false
}
}
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 71d34359f3..2f60043eec 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -311,9 +311,7 @@ impl ExecutionPlan for SortMergeJoinExec {
self.sort_options.clone(),
self.null_equals_null,
)?)),
- _ => Err(DataFusionError::Internal(
- "SortMergeJoin wrong number of children".to_string(),
- )),
+ _ => internal_err!("SortMergeJoin wrong number of children"),
}
}
diff --git a/datafusion/core/src/physical_plan/limit.rs
b/datafusion/core/src/physical_plan/limit.rs
index 4d1766cd6e..87a07f8d46 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -170,9 +170,7 @@ impl ExecutionPlan for GlobalLimitExec {
// GlobalLimitExec requires a single input partition
if 1 != self.input.output_partitioning().partition_count() {
- return Err(DataFusionError::Internal(
- "GlobalLimitExec requires a single input partition".to_owned(),
- ));
+ return internal_err!("GlobalLimitExec requires a single input
partition");
}
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
@@ -331,9 +329,7 @@ impl ExecutionPlan for LocalLimitExec {
children[0].clone(),
self.fetch,
))),
- _ => Err(DataFusionError::Internal(
- "LocalLimitExec wrong number of children".to_string(),
- )),
+ _ => internal_err!("LocalLimitExec wrong number of children"),
}
}
diff --git a/datafusion/core/src/physical_plan/mod.rs
b/datafusion/core/src/physical_plan/mod.rs
index 4c7c9c809b..1aa6baaf9f 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -26,7 +26,7 @@ use self::{
use crate::datasource::physical_plan::FileScanConfig;
use crate::physical_plan::expressions::PhysicalSortExpr;
use datafusion_common::Result;
-pub use datafusion_common::{ColumnStatistics, Statistics};
+pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
pub use visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};
use arrow::datatypes::SchemaRef;
@@ -232,9 +232,7 @@ pub fn with_new_children_if_necessary(
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
let old_children = plan.children();
if children.len() != old_children.len() {
- Err(DataFusionError::Internal(
- "Wrong number of children".to_string(),
- ))
+ internal_err!("Wrong number of children")
} else if children.is_empty()
|| children
.iter()
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index e90b67101e..006133eb48 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -219,10 +219,9 @@ impl ExecutionPlan for SortPreservingMergeExec {
.register(&context.runtime_env().memory_pool);
match input_partitions {
- 0 => Err(DataFusionError::Internal(
+ 0 => internal_err!(
"SortPreservingMergeExec requires at least one input partition"
- .to_owned(),
- )),
+ ),
1 => {
// bypass if there is only one partition to merge (no metrics
in this case either)
let result = self.input.execute(0, context);
diff --git a/datafusion/core/src/physical_plan/union.rs
b/datafusion/core/src/physical_plan/union.rs
index f95a11abd2..d57d3ddeb6 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -29,7 +29,7 @@ use arrow::{
datatypes::{Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
-use datafusion_common::{DFSchemaRef, DataFusionError};
+use datafusion_common::{internal_err, DFSchemaRef, DataFusionError};
use futures::Stream;
use itertools::Itertools;
use log::{debug, trace, warn};
@@ -327,9 +327,9 @@ impl InterleaveExec {
let schema = union_schema(&inputs);
if !can_interleave(&inputs) {
- return Err(DataFusionError::Internal(String::from(
- "Not all InterleaveExec children have a consistent hash
partitioning",
- )));
+ return internal_err!(
+ "Not all InterleaveExec children have a consistent hash
partitioning"
+ );
}
Ok(InterleaveExec {
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 83979af2f4..68d1f33fcd 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -40,7 +40,7 @@ use arrow::{
};
use datafusion_common::utils::{evaluate_partition_ranges, get_at_indices};
use datafusion_common::Result;
-use datafusion_common::{plan_err, DataFusionError};
+use datafusion_common::{internal_err, plan_err, DataFusionError};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{OrderingEquivalenceProperties,
PhysicalSortRequirement};
use futures::stream::Stream;
@@ -326,9 +326,7 @@ impl WindowAggStream {
) -> Result<Self> {
// In WindowAggExec all partition by columns should be ordered.
if window_expr[0].partition_by().len() !=
ordered_partition_by_indices.len() {
- return Err(DataFusionError::Internal(
- "All partition by columns should have an ordering".to_string(),
- ));
+ return internal_err!("All partition by columns should have an
ordering");
}
Ok(Self {
schema,
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 1b1b232e2d..bbddedcf55 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -356,21 +356,19 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) ->
Result<String> {
Ok(format!("{expr} SIMILAR TO {pattern}{escape}"))
}
}
- Expr::Sort { .. } => Err(DataFusionError::Internal(
- "Create physical name does not support sort
expression".to_string(),
- )),
- Expr::Wildcard => Err(DataFusionError::Internal(
- "Create physical name does not support wildcard".to_string(),
- )),
- Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal(
- "Create physical name does not support qualified
wildcard".to_string(),
- )),
- Expr::Placeholder(_) => Err(DataFusionError::Internal(
- "Create physical name does not support placeholder".to_string(),
- )),
- Expr::OuterReferenceColumn(_, _) => Err(DataFusionError::Internal(
- "Create physical name does not support
OuterReferenceColumn".to_string(),
- )),
+ Expr::Sort { .. } => {
+ internal_err!("Create physical name does not support sort
expression")
+ }
+ Expr::Wildcard => internal_err!("Create physical name does not support
wildcard"),
+ Expr::QualifiedWildcard { .. } => {
+ internal_err!("Create physical name does not support qualified
wildcard")
+ }
+ Expr::Placeholder(_) => {
+ internal_err!("Create physical name does not support placeholder")
+ }
+ Expr::OuterReferenceColumn(_, _) => {
+ internal_err!("Create physical name does not support
OuterReferenceColumn")
+ }
}
}
@@ -657,9 +655,9 @@ impl DefaultPhysicalPlanner {
input, window_expr, ..
}) => {
if window_expr.is_empty() {
- return Err(DataFusionError::Internal(
- "Impossibly got empty window
expression".to_owned(),
- ));
+ return internal_err!(
+ "Impossibly got empty window expression"
+ );
}
let input_exec = self.create_initial_plan(input,
session_state).await?;
@@ -1254,21 +1252,21 @@ impl DefaultPhysicalPlanner {
))
}
LogicalPlan::DescribeTable(_) => {
- Err(DataFusionError::Internal(
- "Unsupported logical plan: DescribeTable must be root
of the plan".to_string(),
- ))
+ internal_err!(
+ "Unsupported logical plan: DescribeTable must be root
of the plan"
+ )
}
- LogicalPlan::Explain(_) => Err(DataFusionError::Internal(
- "Unsupported logical plan: Explain must be root of the
plan".to_string(),
- )),
+ LogicalPlan::Explain(_) => internal_err!(
+ "Unsupported logical plan: Explain must be root of the
plan"
+ ),
LogicalPlan::Distinct(_) => {
- Err(DataFusionError::Internal(
- "Unsupported logical plan: Distinct should be replaced
to Aggregate".to_string(),
- ))
+ internal_err!(
+ "Unsupported logical plan: Distinct should be replaced
to Aggregate"
+ )
}
- LogicalPlan::Analyze(_) => Err(DataFusionError::Internal(
- "Unsupported logical plan: Analyze must be root of the
plan".to_string(),
- )),
+ LogicalPlan::Analyze(_) => internal_err!(
+ "Unsupported logical plan: Analyze must be root of the
plan"
+ ),
LogicalPlan::Extension(e) => {
let physical_inputs =
self.create_initial_plan_multi(e.node.inputs(), session_state).await?;
@@ -1849,9 +1847,7 @@ pub fn create_physical_sort_expr(
},
})
} else {
- Err(DataFusionError::Internal(
- "Expects a sort expression".to_string(),
- ))
+ internal_err!("Expects a sort expression")
}
}
@@ -2511,7 +2507,7 @@ mod tests {
_physical_inputs: &[Arc<dyn ExecutionPlan>],
_session_state: &SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
- Err(DataFusionError::Internal("BOOM".to_string()))
+ internal_err!("BOOM")
}
}
/// An example extension node that doesn't do anything
diff --git a/datafusion/expr/src/accumulator.rs
b/datafusion/expr/src/accumulator.rs
index 9ac3dac5e5..32de88b3d9 100644
--- a/datafusion/expr/src/accumulator.rs
+++ b/datafusion/expr/src/accumulator.rs
@@ -18,7 +18,7 @@
//! Accumulator module contains the trait definition for aggregation
function's accumulators.
use arrow::array::ArrayRef;
-use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
use std::fmt::Debug;
/// Describes an aggregate functions's state.
@@ -178,9 +178,9 @@ pub trait Accumulator: Send + Sync + Debug {
/// entering the window, `[F, G, H]`.
fn retract_batch(&mut self, _values: &[ArrayRef]) -> Result<()> {
// TODO add retract for all accumulators
- Err(DataFusionError::Internal(
- "Retract should be implemented for aggregate functions when used
with custom window frame queries".to_string()
- ))
+ internal_err!(
+ "Retract should be implemented for aggregate functions when used
with custom window frame queries"
+ )
}
/// Does the accumulator support incrementally updating its value
diff --git a/datafusion/expr/src/built_in_function.rs
b/datafusion/expr/src/built_in_function.rs
index 35c117a12c..54ffc312a3 100644
--- a/datafusion/expr/src/built_in_function.rs
+++ b/datafusion/expr/src/built_in_function.rs
@@ -502,9 +502,7 @@ impl BuiltinScalarFunction {
DataType::List(_) =>
get_base_type(field.data_type()),
_ => Ok(data_type.to_owned()),
},
- _ => Err(DataFusionError::Internal(
- "Not reachable, data_type should be
List".to_string(),
- )),
+ _ => internal_err!("Not reachable, data_type should be
List"),
}
}
@@ -680,9 +678,9 @@ impl BuiltinScalarFunction {
Null => Null,
_ => {
// this error is internal as `data_types` should have
captured this.
- return Err(DataFusionError::Internal(
- "The encode function can only accept utf8 or
binary.".to_string(),
- ));
+ return internal_err!(
+ "The encode function can only accept utf8 or binary."
+ );
}
}),
BuiltinScalarFunction::Decode => Ok(match input_expr_types[0] {
@@ -693,9 +691,9 @@ impl BuiltinScalarFunction {
Null => Null,
_ => {
// this error is internal as `data_types` should have
captured this.
- return Err(DataFusionError::Internal(
- "The decode function can only accept utf8 or
binary.".to_string(),
- ));
+ return internal_err!(
+ "The decode function can only accept utf8 or binary."
+ );
}
}),
BuiltinScalarFunction::SplitPart => {
@@ -712,9 +710,9 @@ impl BuiltinScalarFunction {
Int8 | Int16 | Int32 | Int64 => Utf8,
_ => {
// this error is internal as `data_types` should have
captured this.
- return Err(DataFusionError::Internal(
- "The to_hex function can only accept
integers.".to_string(),
- ));
+ return internal_err!(
+ "The to_hex function can only accept integers."
+ );
}
}),
BuiltinScalarFunction::ToTimestamp => Ok(Timestamp(Nanosecond,
None)),
@@ -740,10 +738,9 @@ impl BuiltinScalarFunction {
Null => Null,
_ => {
// this error is internal as `data_types` should have
captured this.
- return Err(DataFusionError::Internal(
+ return internal_err!(
"The regexp_extract function can only accept strings."
- .to_string(),
- ));
+ );
}
}),
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index a0cfb6e1b0..94ef69eb79 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -27,6 +27,7 @@ use crate::window_frame;
use crate::window_function;
use crate::Operator;
use arrow::datatypes::DataType;
+use datafusion_common::internal_err;
use datafusion_common::{plan_err, Column, DataFusionError, Result,
ScalarValue};
use std::collections::HashSet;
use std::fmt;
@@ -1553,13 +1554,13 @@ fn create_name(e: &Expr) -> Result<String> {
Ok(format!("{expr} BETWEEN {low} AND {high}"))
}
}
- Expr::Sort { .. } => Err(DataFusionError::Internal(
- "Create name does not support sort expression".to_string(),
- )),
+ Expr::Sort { .. } => {
+ internal_err!("Create name does not support sort expression")
+ }
Expr::Wildcard => Ok("*".to_string()),
- Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal(
- "Create name does not support qualified wildcard".to_string(),
- )),
+ Expr::QualifiedWildcard { .. } => {
+ internal_err!("Create name does not support qualified wildcard")
+ }
Expr::Placeholder(Placeholder { id, .. }) => Ok((*id).to_string()),
}
}
diff --git a/datafusion/expr/src/expr_schema.rs
b/datafusion/expr/src/expr_schema.rs
index d7bc86158b..82f1bf8ed7 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -27,7 +27,8 @@ use crate::{LogicalPlan, Projection, Subquery};
use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field};
use datafusion_common::{
- plan_err, Column, DFField, DFSchema, DataFusionError, ExprSchema, Result,
+ internal_err, plan_err, Column, DFField, DFSchema, DataFusionError,
ExprSchema,
+ Result,
};
use std::collections::HashMap;
use std::sync::Arc;
@@ -149,10 +150,9 @@ impl ExprSchemable for Expr {
// Wildcard do not really have a type and do not appear in
projections
Ok(DataType::Null)
}
- Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal(
+ Expr::QualifiedWildcard { .. } => internal_err!(
"QualifiedWildcard expressions are not valid in a logical
query plan"
- .to_owned(),
- )),
+ ),
Expr::GroupingSet(_) => {
// grouping sets do not really have a type and do not appear
in projections
Ok(DataType::Null)
@@ -259,13 +259,12 @@ impl ExprSchemable for Expr {
| Expr::SimilarTo(Like { expr, pattern, .. }) => {
Ok(expr.nullable(input_schema)? ||
pattern.nullable(input_schema)?)
}
- Expr::Wildcard => Err(DataFusionError::Internal(
- "Wildcard expressions are not valid in a logical query
plan".to_owned(),
- )),
- Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal(
+ Expr::Wildcard => internal_err!(
+ "Wildcard expressions are not valid in a logical query plan"
+ ),
+ Expr::QualifiedWildcard { .. } => internal_err!(
"QualifiedWildcard expressions are not valid in a logical
query plan"
- .to_owned(),
- )),
+ ),
Expr::GetIndexedField(GetIndexedField { expr, field }) => {
field_for_index(expr, field, input_schema).map(|x|
x.is_nullable())
}
@@ -573,7 +572,7 @@ mod tests {
impl ExprSchema for MockExprSchema {
fn nullable(&self, _col: &Column) -> Result<bool> {
if self.error_on_nullable {
- Err(DataFusionError::Internal("nullable error".into()))
+ internal_err!("nullable error")
} else {
Ok(self.nullable)
}
diff --git a/datafusion/expr/src/type_coercion/binary.rs
b/datafusion/expr/src/type_coercion/binary.rs
index 6df4dc5e4b..fd456402dc 100644
--- a/datafusion/expr/src/type_coercion/binary.rs
+++ b/datafusion/expr/src/type_coercion/binary.rs
@@ -780,7 +780,7 @@ mod tests {
use arrow::datatypes::DataType;
use datafusion_common::Result;
- use datafusion_common::{assert_contains, DataFusionError};
+ use datafusion_common::{assert_contains, internal_err, DataFusionError};
use crate::Operator;
@@ -798,9 +798,7 @@ mod tests {
);
Ok(())
} else {
- Err(DataFusionError::Internal(
- "Coercion should have returned an
DataFusionError::Internal".to_string(),
- ))
+ internal_err!("Coercion should have returned an
DataFusionError::Internal")
}
}
diff --git a/datafusion/expr/src/window_state.rs
b/datafusion/expr/src/window_state.rs
index ade4e7e3b9..4ea9ecea5f 100644
--- a/datafusion/expr/src/window_state.rs
+++ b/datafusion/expr/src/window_state.rs
@@ -211,7 +211,7 @@ impl WindowFrameContext {
}
// ERRONEOUS FRAMES
WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) =>
{
- return Err(DataFusionError::Internal("Rows should be
Uint".to_string()))
+ return internal_err!("Rows should be Uint")
}
};
let end = match window_frame.end_bound {
@@ -236,7 +236,7 @@ impl WindowFrameContext {
}
// ERRONEOUS FRAMES
WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) =>
{
- return Err(DataFusionError::Internal("Rows should be
Uint".to_string()))
+ return internal_err!("Rows should be Uint")
}
};
Ok(Range { start, end })
@@ -522,10 +522,9 @@ impl WindowFrameStateGroups {
if let ScalarValue::UInt64(Some(value)) = delta {
*value as usize
} else {
- return Err(DataFusionError::Internal(
+ return internal_err!(
"Unexpectedly got a non-UInt64 value in a GROUPS mode
window frame"
- .to_string(),
- ));
+ );
}
} else {
0
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index b5d25eb3ed..e9d155d5d2 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -497,9 +497,7 @@ fn coerce_window_frame(
);
}
} else {
- return Err(DataFusionError::Internal(
- "ORDER BY column cannot be empty".to_string(),
- ));
+ return internal_err!("ORDER BY column cannot be empty");
}
}
WindowFrameUnits::Rows | WindowFrameUnits::Groups => &DataType::UInt64,
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 08b28567fb..dda1dfd8b2 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -25,7 +25,7 @@ use datafusion_common::tree_node::{
RewriteRecursion, TreeNode, TreeNodeRewriter, TreeNodeVisitor,
VisitRecursion,
};
use datafusion_common::{
- Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result,
+ internal_err, Column, DFField, DFSchema, DFSchemaRef, DataFusionError,
Result,
};
use datafusion_expr::expr::Alias;
use datafusion_expr::{
@@ -162,9 +162,7 @@ impl CommonSubexprEliminate {
Arc::new(new_input),
)?))
} else {
- Err(DataFusionError::Internal(
- "Failed to pop predicate expr".to_string(),
- ))
+ internal_err!("Failed to pop predicate expr")
}
}
@@ -265,9 +263,7 @@ impl CommonSubexprEliminate {
agg_exprs.push(expr.clone().alias(&id));
}
_ => {
- return Err(DataFusionError::Internal(
- "expr_set invalid state".to_string(),
- ));
+ return internal_err!("expr_set invalid state");
}
}
}
@@ -453,9 +449,7 @@ fn build_common_expr_project_plan(
project_exprs.push(expr.clone().alias(&id));
}
_ => {
- return Err(DataFusionError::Internal(
- "expr_set invalid state".to_string(),
- ));
+ return internal_err!("expr_set invalid state");
}
}
}
@@ -721,9 +715,7 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
Ok(RewriteRecursion::Skip)
}
}
- _ => Err(DataFusionError::Internal(
- "expr_set invalid state".to_string(),
- )),
+ _ => internal_err!("expr_set invalid state"),
}
}
diff --git a/datafusion/optimizer/src/push_down_filter.rs
b/datafusion/optimizer/src/push_down_filter.rs
index e62ad7a996..d94c5b5bdb 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -18,7 +18,7 @@ use crate::optimizer::ApplyOrder;
use crate::utils::{conjunction, split_conjunction};
use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
-use datafusion_common::{Column, DFSchema, DataFusionError, Result};
+use datafusion_common::{internal_err, Column, DFSchema, DataFusionError,
Result};
use datafusion_expr::expr::Alias;
use datafusion_expr::{
and,
@@ -94,9 +94,7 @@ fn lr_is_preserved(plan: &LogicalPlan) -> Result<(bool,
bool)> {
JoinType::RightSemi | JoinType::RightAnti => Ok((false, true)),
},
LogicalPlan::CrossJoin(_) => Ok((true, true)),
- _ => Err(DataFusionError::Internal(
- "lr_is_preserved only valid for JOIN nodes".to_string(),
- )),
+ _ => internal_err!("lr_is_preserved only valid for JOIN nodes"),
}
}
@@ -114,12 +112,10 @@ fn on_lr_is_preserved(plan: &LogicalPlan) ->
Result<(bool, bool)> {
JoinType::LeftAnti => Ok((false, true)),
JoinType::RightAnti => Ok((true, false)),
},
- LogicalPlan::CrossJoin(_) => Err(DataFusionError::Internal(
- "on_lr_is_preserved cannot be applied to CROSSJOIN
nodes".to_string(),
- )),
- _ => Err(DataFusionError::Internal(
- "on_lr_is_preserved only valid for JOIN nodes".to_string(),
- )),
+ LogicalPlan::CrossJoin(_) => {
+ internal_err!("on_lr_is_preserved cannot be applied to CROSSJOIN
nodes")
+ }
+ _ => internal_err!("on_lr_is_preserved only valid for JOIN nodes"),
}
}
@@ -192,9 +188,7 @@ fn can_evaluate_as_join_condition(predicate: &Expr) ->
Result<bool> {
| Expr::AggregateUDF { .. }
| Expr::Wildcard
| Expr::QualifiedWildcard { .. }
- | Expr::GroupingSet(_) => Err(DataFusionError::Internal(
- "Unsupported predicate type".to_string(),
- )),
+ | Expr::GroupingSet(_) => internal_err!("Unsupported predicate type"),
})?;
Ok(is_evaluate)
}
diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
index b7e8612d53..895432026b 100644
--- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
+++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
@@ -31,7 +31,9 @@ use arrow::{
record_batch::RecordBatch,
};
use datafusion_common::tree_node::{RewriteRecursion, TreeNode,
TreeNodeRewriter};
-use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result,
ScalarValue};
+use datafusion_common::{
+ internal_err, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
+};
use datafusion_expr::expr::{InList, InSubquery, ScalarFunction};
use datafusion_expr::{
and, expr, lit, or, BinaryExpr, BuiltinScalarFunction, Case,
ColumnarValue, Expr,
@@ -208,9 +210,7 @@ impl<'a> TreeNodeRewriter for ConstEvaluator<'a> {
match self.can_evaluate.pop() {
Some(true) => Ok(Expr::Literal(self.evaluate_to_scalar(expr)?)),
Some(false) => Ok(expr),
- _ => Err(DataFusionError::Internal(
- "Failed to pop can_evaluate".to_string(),
- )),
+ _ => internal_err!("Failed to pop can_evaluate"),
}
}
}
diff --git a/datafusion/physical-expr/src/aggregate/approx_distinct.rs
b/datafusion/physical-expr/src/aggregate/approx_distinct.rs
index b8922e1992..e64ae169f9 100644
--- a/datafusion/physical-expr/src/aggregate/approx_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_distinct.rs
@@ -30,7 +30,7 @@ use arrow::datatypes::{
UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use datafusion_common::{downcast_value, ScalarValue};
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::Accumulator;
use std::any::Any;
use std::convert::TryFrom;
@@ -219,10 +219,9 @@ impl<T: Hash> TryFrom<&ScalarValue> for HyperLogLog<T> {
if let ScalarValue::Binary(Some(slice)) = v {
slice.as_slice().try_into()
} else {
- Err(DataFusionError::Internal(
+ internal_err!(
"Impossibly got invalid scalar value while converting to
HyperLogLog"
- .into(),
- ))
+ )
}
}
}
diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs
b/datafusion/physical-expr/src/aggregate/array_agg.rs
index ebf24750cb..0cf39888f1 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg.rs
@@ -23,7 +23,7 @@ use crate::{AggregateExpr, PhysicalExpr};
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field};
use datafusion_common::ScalarValue;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::Accumulator;
use std::any::Any;
use std::sync::Arc;
@@ -142,9 +142,7 @@ impl Accumulator for ArrayAggAccumulator {
self.values.extend(values);
Ok(())
} else {
- Err(DataFusionError::Internal(
- "array_agg state must be list!".into(),
- ))
+ internal_err!("array_agg state must be list!")
}
})
}
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
index d91f06fc76..77868683e2 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
@@ -33,7 +33,7 @@ use arrow::datatypes::{DataType, Field};
use arrow_array::{Array, ListArray};
use arrow_schema::{Fields, SortOptions};
use datafusion_common::utils::{compare_rows, get_row_at_idx};
-use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;
use itertools::izip;
@@ -220,9 +220,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
partition_values.push(other_values);
partition_ordering_values.push(other_ordering_values);
} else {
- return Err(DataFusionError::Internal(
- "ARRAY_AGG state must be list!".into(),
- ));
+ return internal_err!("ARRAY_AGG state must be list!");
}
}
let sort_options = self
diff --git a/datafusion/physical-expr/src/aggregate/average.rs
b/datafusion/physical-expr/src/aggregate/average.rs
index 85651c1f7c..c9fea856de 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -289,9 +289,7 @@ impl Accumulator for AvgAccumulator {
}
}
}
- _ => Err(DataFusionError::Internal(
- "Sum should be f64 or decimal128 on average".to_string(),
- )),
+ _ => internal_err!("Sum should be f64 or decimal128 on average"),
}
}
fn supports_retract_batch(&self) -> bool {
diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
index 3dda6d6bcf..78c0c8896e 100644
--- a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
+++ b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
@@ -741,9 +741,7 @@ impl Accumulator for DistinctBitXorAccumulator {
}
});
} else {
- return Err(DataFusionError::Internal(
- "Unexpected accumulator state".into(),
- ));
+ return internal_err!("Unexpected accumulator state");
}
Ok(())
})
diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs
b/datafusion/physical-expr/src/aggregate/count_distinct.rs
index 3f91b2a0db..05be8cbccb 100644
--- a/datafusion/physical-expr/src/aggregate/count_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs
@@ -28,7 +28,7 @@ use crate::aggregate::utils::down_cast_any_ref;
use crate::expressions::format_state_name;
use crate::{AggregateExpr, PhysicalExpr};
use datafusion_common::ScalarValue;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::Accumulator;
type DistinctScalarValues = ScalarValue;
@@ -182,9 +182,7 @@ impl Accumulator for DistinctCountAccumulator {
}
});
} else {
- return Err(DataFusionError::Internal(
- "Unexpected accumulator state".into(),
- ));
+ return internal_err!("Unexpected accumulator state");
}
Ok(())
})
diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs
b/datafusion/physical-expr/src/aggregate/min_max.rs
index bcd16c0a1f..4de34c6fc0 100644
--- a/datafusion/physical-expr/src/aggregate/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -467,9 +467,7 @@ macro_rules! interval_min_max {
Some(interval_choose_min_max!($OP)) => $RHS.clone(),
Some(_) => $LHS.clone(),
None => {
- return Err(DataFusionError::Internal(
- "Comparison error while computing interval
min/max".to_string(),
- ))
+ return internal_err!("Comparison error while computing
interval min/max")
}
}
}};
diff --git a/datafusion/physical-expr/src/aggregate/stddev.rs
b/datafusion/physical-expr/src/aggregate/stddev.rs
index e1b9b9ae23..330507d6ff 100644
--- a/datafusion/physical-expr/src/aggregate/stddev.rs
+++ b/datafusion/physical-expr/src/aggregate/stddev.rs
@@ -27,7 +27,7 @@ use crate::expressions::format_state_name;
use crate::{AggregateExpr, PhysicalExpr};
use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field};
use datafusion_common::ScalarValue;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::Accumulator;
/// STDDEV and STDDEV_SAMP (standard deviation) aggregate expression
@@ -230,9 +230,7 @@ impl Accumulator for StddevAccumulator {
Ok(ScalarValue::Float64(e.map(|f| f.sqrt())))
}
}
- _ => Err(DataFusionError::Internal(
- "Variance should be f64".to_string(),
- )),
+ _ => internal_err!("Variance should be f64"),
}
}
diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs
b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
index 73f74df967..366b875c23 100644
--- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
@@ -28,7 +28,7 @@ use std::collections::HashSet;
use crate::aggregate::utils::down_cast_any_ref;
use crate::{AggregateExpr, PhysicalExpr};
use datafusion_common::ScalarValue;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::Accumulator;
/// Expression for a SUM(DISTINCT) aggregation.
@@ -168,9 +168,7 @@ impl Accumulator for DistinctSumAccumulator {
}
});
} else {
- return Err(DataFusionError::Internal(
- "Unexpected accumulator state".into(),
- ));
+ return internal_err!("Unexpected accumulator state");
}
Ok(())
})
diff --git a/datafusion/physical-expr/src/crypto_expressions.rs
b/datafusion/physical-expr/src/crypto_expressions.rs
index e143cc0a20..580b0ed01b 100644
--- a/datafusion/physical-expr/src/crypto_expressions.rs
+++ b/datafusion/physical-expr/src/crypto_expressions.rs
@@ -284,11 +284,7 @@ pub fn md5(args: &[ColumnarValue]) ->
Result<ColumnarValue> {
ColumnarValue::Scalar(ScalarValue::Binary(opt)) => {
ColumnarValue::Scalar(ScalarValue::Utf8(opt.map(hex_encode::<_>)))
}
- _ => {
- return Err(DataFusionError::Internal(
- "Impossibly got invalid results from digest".into(),
- ))
- }
+ _ => return internal_err!("Impossibly got invalid results from
digest"),
})
}
@@ -345,9 +341,9 @@ pub fn digest(args: &[ColumnarValue]) ->
Result<ColumnarValue> {
}
other => internal_err!("Unsupported data type {other:?} for
function digest"),
},
- ColumnarValue::Array(_) => Err(DataFusionError::Internal(
- "Digest using dynamically decided method is not yet
supported".into(),
- )),
+ ColumnarValue::Array(_) => {
+ internal_err!("Digest using dynamically decided method is not yet
supported")
+ }
}?;
digest_process(&args[0], digest_algorithm)
}
diff --git a/datafusion/physical-expr/src/expressions/case.rs
b/datafusion/physical-expr/src/expressions/case.rs
index 506c01b6f3..8005cc7274 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -28,7 +28,7 @@ use arrow::compute::kernels::zip::zip;
use arrow::compute::{and, eq_dyn, is_null, not, or, prep_null_mask_filter};
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
-use datafusion_common::{cast::as_boolean_array, DataFusionError, Result};
+use datafusion_common::{cast::as_boolean_array, internal_err, DataFusionError,
Result};
use datafusion_expr::ColumnarValue;
use itertools::Itertools;
@@ -319,9 +319,7 @@ impl PhysicalExpr for CaseExpr {
children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
if children.len() != self.children().len() {
- Err(DataFusionError::Internal(
- "CaseExpr: Wrong number of children".to_string(),
- ))
+ internal_err!("CaseExpr: Wrong number of children")
} else {
assert_eq!(children.len() % 2, 0);
let expr = match
children[0].clone().as_any().downcast_ref::<NoOp>() {
diff --git a/datafusion/physical-expr/src/expressions/negative.rs
b/datafusion/physical-expr/src/expressions/negative.rs
index dc45d6dbdd..897f3b0d52 100644
--- a/datafusion/physical-expr/src/expressions/negative.rs
+++ b/datafusion/physical-expr/src/expressions/negative.rs
@@ -29,7 +29,7 @@ use arrow::{
use crate::physical_expr::down_cast_any_ref;
use crate::PhysicalExpr;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::{
type_coercion::{is_interval, is_null, is_signed_numeric},
ColumnarValue,
@@ -126,9 +126,9 @@ pub fn negative(
if is_null(&data_type) {
Ok(arg)
} else if !is_signed_numeric(&data_type) && !is_interval(&data_type) {
- Err(DataFusionError::Internal(
- format!("Can't create negative physical expr for (- '{arg:?}'),
the type of child expr is {data_type}, not signed numeric"),
- ))
+ internal_err!(
+ "Can't create negative physical expr for (- '{arg:?}'), the type
of child expr is {data_type}, not signed numeric"
+ )
} else {
Ok(Arc::new(NegativeExpr::new(arg)))
}
diff --git a/datafusion/physical-expr/src/functions.rs
b/datafusion/physical-expr/src/functions.rs
index ab7601f7d7..82226ecfa1 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -1092,9 +1092,9 @@ mod tests {
test_function!(
CharacterLength,
&[lit("josé")],
- Err(DataFusionError::Internal(
- "function character_length requires compilation with feature
flag: unicode_expressions.".to_string()
- )),
+ internal_err!(
+ "function character_length requires compilation with feature
flag: unicode_expressions."
+ ),
i32,
Int32,
Int32Array
@@ -1369,9 +1369,9 @@ mod tests {
lit("abcde"),
lit(ScalarValue::Int8(Some(2))),
],
- Err(DataFusionError::Internal(
- "function left requires compilation with feature flag:
unicode_expressions.".to_string()
- )),
+ internal_err!(
+ "function left requires compilation with feature flag:
unicode_expressions."
+ ),
&str,
Utf8,
StringArray
@@ -1520,9 +1520,9 @@ mod tests {
lit("josé"),
lit(ScalarValue::Int64(Some(5))),
],
- Err(DataFusionError::Internal(
- "function lpad requires compilation with feature flag:
unicode_expressions.".to_string()
- )),
+ internal_err!(
+ "function lpad requires compilation with feature flag:
unicode_expressions."
+ ),
&str,
Utf8,
StringArray
@@ -1606,9 +1606,9 @@ mod tests {
test_function!(
MD5,
&[lit("tom")],
- Err(DataFusionError::Internal(
- "function md5 requires compilation with feature flag:
crypto_expressions.".to_string()
- )),
+ internal_err!(
+ "function md5 requires compilation with feature flag:
crypto_expressions."
+ ),
&str,
Utf8,
StringArray
@@ -1756,9 +1756,9 @@ mod tests {
lit("b.."),
lit("X"),
],
- Err(DataFusionError::Internal(
- "function regexp_replace requires compilation with feature
flag: regex_expressions.".to_string()
- )),
+ internal_err!(
+ "function regexp_replace requires compilation with feature
flag: regex_expressions."
+ ),
&str,
Utf8,
StringArray
@@ -1830,9 +1830,9 @@ mod tests {
test_function!(
Reverse,
&[lit("abcde")],
- Err(DataFusionError::Internal(
- "function reverse requires compilation with feature flag:
unicode_expressions.".to_string()
- )),
+ internal_err!(
+ "function reverse requires compilation with feature flag:
unicode_expressions."
+ ),
&str,
Utf8,
StringArray
@@ -1928,9 +1928,9 @@ mod tests {
lit("abcde"),
lit(ScalarValue::Int8(Some(2))),
],
- Err(DataFusionError::Internal(
- "function right requires compilation with feature flag:
unicode_expressions.".to_string()
- )),
+ internal_err!(
+ "function right requires compilation with feature flag:
unicode_expressions."
+ ),
&str,
Utf8,
StringArray
@@ -2079,9 +2079,9 @@ mod tests {
lit("josé"),
lit(ScalarValue::Int64(Some(5))),
],
- Err(DataFusionError::Internal(
- "function rpad requires compilation with feature flag:
unicode_expressions.".to_string()
- )),
+ internal_err!(
+ "function rpad requires compilation with feature flag:
unicode_expressions."
+ ),
&str,
Utf8,
StringArray
@@ -2173,9 +2173,9 @@ mod tests {
test_function!(
SHA224,
&[lit("tom")],
- Err(DataFusionError::Internal(
- "function sha224 requires compilation with feature flag:
crypto_expressions.".to_string()
- )),
+ internal_err!(
+ "function sha224 requires compilation with feature flag:
crypto_expressions."
+ ),
&[u8],
Binary,
BinaryArray
@@ -2219,9 +2219,9 @@ mod tests {
test_function!(
SHA256,
&[lit("tom")],
- Err(DataFusionError::Internal(
- "function sha256 requires compilation with feature flag:
crypto_expressions.".to_string()
- )),
+ internal_err!(
+ "function sha256 requires compilation with feature flag:
crypto_expressions."
+ ),
&[u8],
Binary,
BinaryArray
@@ -2269,9 +2269,9 @@ mod tests {
test_function!(
SHA384,
&[lit("tom")],
- Err(DataFusionError::Internal(
- "function sha384 requires compilation with feature flag:
crypto_expressions.".to_string()
- )),
+ internal_err!(
+ "function sha384 requires compilation with feature flag:
crypto_expressions."
+ ),
&[u8],
Binary,
BinaryArray
@@ -2321,9 +2321,9 @@ mod tests {
test_function!(
SHA512,
&[lit("tom")],
- Err(DataFusionError::Internal(
- "function sha512 requires compilation with feature flag:
crypto_expressions.".to_string()
- )),
+ internal_err!(
+ "function sha512 requires compilation with feature flag:
crypto_expressions."
+ ),
&[u8],
Binary,
BinaryArray
@@ -2459,9 +2459,9 @@ mod tests {
lit("joséésoj"),
lit(ScalarValue::Utf8(None)),
],
- Err(DataFusionError::Internal(
- "function strpos requires compilation with feature flag:
unicode_expressions.".to_string()
- )),
+ internal_err!(
+ "function strpos requires compilation with feature flag:
unicode_expressions."
+ ),
i32,
Int32,
Int32Array
@@ -2689,9 +2689,9 @@ mod tests {
lit("alphabet"),
lit(ScalarValue::Int64(Some(0))),
],
- Err(DataFusionError::Internal(
- "function substr requires compilation with feature flag:
unicode_expressions.".to_string()
- )),
+ internal_err!(
+ "function substr requires compilation with feature flag:
unicode_expressions."
+ ),
&str,
Utf8,
StringArray
@@ -2749,9 +2749,9 @@ mod tests {
lit("143"),
lit("ax"),
],
- Err(DataFusionError::Internal(
- "function translate requires compilation with feature flag:
unicode_expressions.".to_string()
- )),
+ internal_err!(
+ "function translate requires compilation with feature flag:
unicode_expressions."
+ ),
&str,
Utf8,
StringArray
@@ -3004,9 +3004,7 @@ mod tests {
if let ColumnarValue::Array(array) = col? {
Ok(as_uint64_array(&array)?.values().to_vec())
} else {
- Err(DataFusionError::Internal(
- "Unexpected scalar created by a test function".to_string(),
- ))
+ internal_err!("Unexpected scalar created by a test function")
}
}
diff --git a/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
b/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
index 3396e3ee5f..a985f34d22 100644
--- a/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
+++ b/datafusion/physical-expr/src/intervals/interval_aritmetic.rs
@@ -379,9 +379,7 @@ impl Interval {
upper:
IntervalBound::new(ScalarValue::Boolean(Some(upper)), false),
})
}
- _ => Err(DataFusionError::Internal(
- "Incompatible types for logical conjunction".to_string(),
- )),
+ _ => internal_err!("Incompatible types for logical conjunction"),
}
}
diff --git a/datafusion/physical-expr/src/math_expressions.rs
b/datafusion/physical-expr/src/math_expressions.rs
index 4a5976387a..d1bfe4970c 100644
--- a/datafusion/physical-expr/src/math_expressions.rs
+++ b/datafusion/physical-expr/src/math_expressions.rs
@@ -329,9 +329,7 @@ pub fn isnan(args: &[ArrayRef]) -> Result<ArrayRef> {
{ f32::is_nan }
)) as ArrayRef),
- other => Err(DataFusionError::Internal(format!(
- "Unsupported data type {other:?} for function isnan"
- ))),
+ other => internal_err!("Unsupported data type {other:?} for function
isnan"),
}
}
@@ -354,18 +352,14 @@ pub fn iszero(args: &[ArrayRef]) -> Result<ArrayRef> {
{ |x: f32| { x == 0_f32 } }
)) as ArrayRef),
- other => Err(DataFusionError::Internal(format!(
- "Unsupported data type {other:?} for function iszero"
- ))),
+ other => internal_err!("Unsupported data type {other:?} for function
iszero"),
}
}
/// Pi SQL function
pub fn pi(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if !matches!(&args[0], ColumnarValue::Array(_)) {
- return Err(DataFusionError::Internal(
- "Expect pi function to take no param".to_string(),
- ));
+ return internal_err!("Expect pi function to take no param");
}
let array = Float64Array::from_value(std::f64::consts::PI, 1);
Ok(ColumnarValue::Array(Arc::new(array)))
@@ -375,11 +369,7 @@ pub fn pi(args: &[ColumnarValue]) -> Result<ColumnarValue>
{
pub fn random(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let len: usize = match &args[0] {
ColumnarValue::Array(array) => array.len(),
- _ => {
- return Err(DataFusionError::Internal(
- "Expect random function to take no param".to_string(),
- ))
- }
+ _ => return internal_err!("Expect random function to take no param"),
};
let mut rng = thread_rng();
let values = iter::repeat_with(|| rng.gen_range(0.0..1.0)).take(len);
@@ -434,10 +424,9 @@ pub fn round(args: &[ArrayRef]) -> Result<ArrayRef> {
}
}
)) as ArrayRef),
- _ => Err(DataFusionError::Internal(
+ _ => internal_err!(
"round function requires a scalar or array for decimal_places"
- .to_string(),
- )),
+ ),
},
DataType::Float32 => match decimal_places {
@@ -471,10 +460,9 @@ pub fn round(args: &[ArrayRef]) -> Result<ArrayRef> {
}
}
)) as ArrayRef),
- _ => Err(DataFusionError::Internal(
+ _ => internal_err!(
"round function requires a scalar or array for decimal_places"
- .to_string(),
- )),
+ ),
},
other => internal_err!("Unsupported data type {other:?} for function
round"),
@@ -560,9 +548,7 @@ pub fn log(args: &[ArrayRef]) -> Result<ArrayRef> {
Float64Array,
{ f64::log }
)) as ArrayRef),
- _ => Err(DataFusionError::Internal(
- "log function requires a scalar or array for base".to_string(),
- )),
+ _ => internal_err!("log function requires a scalar or array for
base"),
},
DataType::Float32 => match base {
@@ -580,9 +566,7 @@ pub fn log(args: &[ArrayRef]) -> Result<ArrayRef> {
Float32Array,
{ f32::log }
)) as ArrayRef),
- _ => Err(DataFusionError::Internal(
- "log function requires a scalar or array for base".to_string(),
- )),
+ _ => internal_err!("log function requires a scalar or array for
base"),
},
other => internal_err!("Unsupported data type {other:?} for function
log"),
@@ -652,9 +636,7 @@ pub fn trunc(args: &[ArrayRef]) -> Result<ArrayRef> {
Int64Array,
{ compute_truncate64 }
)) as ArrayRef),
- _ => Err(DataFusionError::Internal(
- "trunc function requires a scalar or array for
precision".to_string(),
- )),
+ _ => internal_err!("trunc function requires a scalar or array for
precision"),
},
DataType::Float32 => match precision {
ColumnarValue::Scalar(Int64(Some(0))) => Ok(Arc::new(
@@ -669,9 +651,7 @@ pub fn trunc(args: &[ArrayRef]) -> Result<ArrayRef> {
Int64Array,
{ compute_truncate32 }
)) as ArrayRef),
- _ => Err(DataFusionError::Internal(
- "trunc function requires a scalar or array for
precision".to_string(),
- )),
+ _ => internal_err!("trunc function requires a scalar or array for
precision"),
},
other => internal_err!("Unsupported data type {other:?} for function
trunc"),
}
diff --git a/datafusion/physical-expr/src/physical_expr.rs
b/datafusion/physical-expr/src/physical_expr.rs
index e9fb66b634..0d2808730c 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -23,7 +23,7 @@ use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::utils::DataPtr;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::ColumnarValue;
use std::any::Any;
@@ -145,9 +145,7 @@ pub fn with_new_children_if_necessary(
) -> Result<Arc<dyn PhysicalExpr>> {
let old_children = expr.children();
if children.len() != old_children.len() {
- Err(DataFusionError::Internal(
- "PhysicalExpr: Wrong number of children".to_string(),
- ))
+ internal_err!("PhysicalExpr: Wrong number of children")
} else if children.is_empty()
|| children
.iter()
diff --git a/datafusion/physical-expr/src/string_expressions.rs
b/datafusion/physical-expr/src/string_expressions.rs
index 00f0fe4abb..39a21a0e44 100644
--- a/datafusion/physical-expr/src/string_expressions.rs
+++ b/datafusion/physical-expr/src/string_expressions.rs
@@ -549,11 +549,7 @@ pub fn upper(args: &[ColumnarValue]) ->
Result<ColumnarValue> {
pub fn uuid(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let len: usize = match &args[0] {
ColumnarValue::Array(array) => array.len(),
- _ => {
- return Err(DataFusionError::Internal(
- "Expect uuid function to take no param".to_string(),
- ))
- }
+ _ => return internal_err!("Expect uuid function to take no param"),
};
let values = iter::repeat_with(|| Uuid::new_v4().to_string()).take(len);
diff --git a/datafusion/physical-expr/src/window/lead_lag.rs
b/datafusion/physical-expr/src/window/lead_lag.rs
index 862648993a..887dc4b66d 100644
--- a/datafusion/physical-expr/src/window/lead_lag.rs
+++ b/datafusion/physical-expr/src/window/lead_lag.rs
@@ -24,7 +24,7 @@ use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field};
use datafusion_common::ScalarValue;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::PartitionEvaluator;
use std::any::Any;
use std::cmp::min;
@@ -233,9 +233,7 @@ fn get_default_value(
if let ScalarValue::Int64(Some(val)) = value {
ScalarValue::try_from_string(val.to_string(), dtype)
} else {
- Err(DataFusionError::Internal(
- "Expects default value to have Int64 type".to_string(),
- ))
+ internal_err!("Expects default value to have Int64 type")
}
} else {
Ok(ScalarValue::try_from(dtype)?)
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index 43f1d44b7d..d3329c6967 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -31,8 +31,8 @@ use arrow::datatypes::{
};
use datafusion::execution::registry::FunctionRegistry;
use datafusion_common::{
- Column, DFField, DFSchema, DFSchemaRef, DataFusionError,
OwnedTableReference, Result,
- ScalarValue,
+ internal_err, Column, DFField, DFSchema, DFSchemaRef, DataFusionError,
+ OwnedTableReference, Result, ScalarValue,
};
use datafusion_expr::expr::{Alias, Placeholder};
use datafusion_expr::{
@@ -1649,9 +1649,7 @@ fn parse_escape_char(s: &str) -> Result<Option<char>> {
match s.len() {
0 => Ok(None),
1 => Ok(s.chars().next()),
- _ => Err(DataFusionError::Internal(
- "Invalid length for escape char".to_string(),
- )),
+ _ => internal_err!("Invalid length for escape char"),
}
}
diff --git a/datafusion/proto/src/logical_plan/mod.rs
b/datafusion/proto/src/logical_plan/mod.rs
index d218cdf3e6..ec9d660678 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -1464,7 +1464,7 @@ mod roundtrip_tests {
};
use datafusion::test_util::{TestTableFactory, TestTableProvider};
use datafusion_common::{
- plan_err, DFSchemaRef, DataFusionError, Result, ScalarValue,
+ internal_err, plan_err, DFSchemaRef, DataFusionError, Result,
ScalarValue,
};
use datafusion_expr::expr::{
self, Between, BinaryExpr, Case, Cast, GroupingSet, InList, Like,
ScalarFunction,
@@ -1814,14 +1814,10 @@ mod roundtrip_tests {
node: Arc::new(node),
})
} else {
- Err(DataFusionError::Internal(
- "invalid plan, no expr".to_string(),
- ))
+ internal_err!("invalid plan, no expr")
}
} else {
- Err(DataFusionError::Internal(
- "invalid plan, no input".to_string(),
- ))
+ internal_err!("invalid plan, no input")
}
}
@@ -1840,9 +1836,7 @@ mod roundtrip_tests {
Ok(())
} else {
- Err(DataFusionError::Internal(
- "unsupported plan type".to_string(),
- ))
+ internal_err!("unsupported plan type")
}
}
@@ -1852,9 +1846,7 @@ mod roundtrip_tests {
_schema: SchemaRef,
_ctx: &SessionContext,
) -> Result<Arc<dyn TableProvider>> {
- Err(DataFusionError::Internal(
- "unsupported plan type".to_string(),
- ))
+ internal_err!("unsupported plan type")
}
fn try_encode_table_provider(
@@ -1862,9 +1854,7 @@ mod roundtrip_tests {
_node: Arc<dyn TableProvider>,
_buf: &mut Vec<u8>,
) -> Result<()> {
- Err(DataFusionError::Internal(
- "unsupported plan type".to_string(),
- ))
+ internal_err!("unsupported plan type")
}
}
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index 8d4f5138f9..593c558eff 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -249,9 +249,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
),
)?))
}
- _ => Err(DataFusionError::Internal(
- "Invalid partitioning scheme".to_owned(),
- )),
+ _ => internal_err!("Invalid partitioning scheme"),
}
}
PhysicalPlanType::GlobalLimit(limit) => {
@@ -331,9 +329,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
&physical_schema,
)?)
}
- _ => Err(DataFusionError::Internal(
- "Invalid expression for
WindowAggrExec".to_string(),
- )),
+ _ => internal_err!("Invalid expression for
WindowAggrExec"),
}
})
.collect::<Result<Vec<_>, _>>()?;
@@ -485,10 +481,9 @@ impl AsExecutionPlan for PhysicalPlanNode {
proto_error("Invalid AggregateExpr,
missing aggregate_function")
})
}
- _ => Err(DataFusionError::Internal(
+ _ => internal_err!(
"Invalid aggregate expression for
AggregateExec"
- .to_string(),
- )),
+ ),
}
})
.collect::<Result<Vec<_>, _>>()?;
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index aad9f770ff..a872355ae0 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -29,8 +29,8 @@ mod value;
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use arrow_schema::DataType;
-use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::{internal_err, plan_err};
use datafusion_common::{Column, DFSchema, DataFusionError, Result,
ScalarValue};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::expr::{InList, Placeholder};
@@ -164,26 +164,32 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
SQLExpr::Value(value) => {
self.parse_value(value,
planner_context.prepare_param_data_types())
}
- SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction
(ScalarFunction::new(
- BuiltinScalarFunction::DatePart,
- vec![
- Expr::Literal(ScalarValue::Utf8(Some(format!("{field}")))),
- self.sql_expr_to_logical_expr(*expr, schema,
planner_context)?,
- ],
- ))),
+ SQLExpr::Extract { field, expr } => {
+ Ok(Expr::ScalarFunction(ScalarFunction::new(
+ BuiltinScalarFunction::DatePart,
+ vec![
+
Expr::Literal(ScalarValue::Utf8(Some(format!("{field}")))),
+ self.sql_expr_to_logical_expr(*expr, schema,
planner_context)?,
+ ],
+ )))
+ }
SQLExpr::Array(arr) => self.sql_array_literal(arr.elem, schema),
- SQLExpr::Interval(interval)=> self.sql_interval_to_expr(
- false,
- interval,
- schema,
- planner_context,
- ),
- SQLExpr::Identifier(id) => self.sql_identifier_to_expr(id, schema,
planner_context),
+ SQLExpr::Interval(interval) => {
+ self.sql_interval_to_expr(false, interval, schema,
planner_context)
+ }
+ SQLExpr::Identifier(id) => {
+ self.sql_identifier_to_expr(id, schema, planner_context)
+ }
SQLExpr::MapAccess { column, keys } => {
if let SQLExpr::Identifier(id) = *column {
- self.plan_indexed(col(self.normalizer.normalize(id)),
keys, schema, planner_context)
+ self.plan_indexed(
+ col(self.normalizer.normalize(id)),
+ keys,
+ schema,
+ planner_context,
+ )
} else {
Err(DataFusionError::NotImplemented(format!(
"map access requires an identifier, found column
{column} instead"
@@ -192,39 +198,48 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
SQLExpr::ArrayIndex { obj, indexes } => {
- let expr = self.sql_expr_to_logical_expr(*obj, schema,
planner_context)?;
+ let expr =
+ self.sql_expr_to_logical_expr(*obj, schema,
planner_context)?;
self.plan_indexed(expr, indexes, schema, planner_context)
}
- SQLExpr::CompoundIdentifier(ids) =>
self.sql_compound_identifier_to_expr(ids, schema, planner_context),
+ SQLExpr::CompoundIdentifier(ids) => {
+ self.sql_compound_identifier_to_expr(ids, schema,
planner_context)
+ }
SQLExpr::Case {
operand,
conditions,
results,
else_result,
- } => self.sql_case_identifier_to_expr(operand, conditions,
results, else_result, schema, planner_context),
+ } => self.sql_case_identifier_to_expr(
+ operand,
+ conditions,
+ results,
+ else_result,
+ schema,
+ planner_context,
+ ),
- SQLExpr::Cast {
- expr,
- data_type,
- } => Ok(Expr::Cast(Cast::new(
- Box::new(self.sql_expr_to_logical_expr(*expr, schema,
planner_context)?),
+ SQLExpr::Cast { expr, data_type } => Ok(Expr::Cast(Cast::new(
+ Box::new(self.sql_expr_to_logical_expr(
+ *expr,
+ schema,
+ planner_context,
+ )?),
self.convert_data_type(&data_type)?,
))),
- SQLExpr::TryCast {
- expr,
- data_type,
- } => Ok(Expr::TryCast(TryCast::new(
- Box::new(self.sql_expr_to_logical_expr(*expr, schema,
planner_context)?),
+ SQLExpr::TryCast { expr, data_type } =>
Ok(Expr::TryCast(TryCast::new(
+ Box::new(self.sql_expr_to_logical_expr(
+ *expr,
+ schema,
+ planner_context,
+ )?),
self.convert_data_type(&data_type)?,
))),
- SQLExpr::TypedString {
- data_type,
- value,
- } => Ok(Expr::Cast(Cast::new(
+ SQLExpr::TypedString { data_type, value } =>
Ok(Expr::Cast(Cast::new(
Box::new(lit(value)),
self.convert_data_type(&data_type)?,
))),
@@ -237,31 +252,65 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
))),
- SQLExpr::IsDistinctFrom(left, right) =>
Ok(Expr::BinaryExpr(BinaryExpr::new(
- Box::new(self.sql_expr_to_logical_expr(*left, schema,
planner_context)?),
- Operator::IsDistinctFrom,
- Box::new(self.sql_expr_to_logical_expr(*right, schema,
planner_context)?),
- ))),
+ SQLExpr::IsDistinctFrom(left, right) => {
+ Ok(Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(self.sql_expr_to_logical_expr(
+ *left,
+ schema,
+ planner_context,
+ )?),
+ Operator::IsDistinctFrom,
+ Box::new(self.sql_expr_to_logical_expr(
+ *right,
+ schema,
+ planner_context,
+ )?),
+ )))
+ }
- SQLExpr::IsNotDistinctFrom(left, right) =>
Ok(Expr::BinaryExpr(BinaryExpr::new(
- Box::new(self.sql_expr_to_logical_expr(*left, schema,
planner_context)?),
- Operator::IsNotDistinctFrom,
- Box::new(self.sql_expr_to_logical_expr(*right, schema,
planner_context)?),
- ))),
+ SQLExpr::IsNotDistinctFrom(left, right) => {
+ Ok(Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(self.sql_expr_to_logical_expr(
+ *left,
+ schema,
+ planner_context,
+ )?),
+ Operator::IsNotDistinctFrom,
+ Box::new(self.sql_expr_to_logical_expr(
+ *right,
+ schema,
+ planner_context,
+ )?),
+ )))
+ }
- SQLExpr::IsTrue(expr) =>
Ok(Expr::IsTrue(Box::new(self.sql_expr_to_logical_expr(*expr, schema,
planner_context)?))),
+ SQLExpr::IsTrue(expr) => Ok(Expr::IsTrue(Box::new(
+ self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+ ))),
- SQLExpr::IsFalse(expr) =>
Ok(Expr::IsFalse(Box::new(self.sql_expr_to_logical_expr(*expr, schema,
planner_context)?))),
+ SQLExpr::IsFalse(expr) => Ok(Expr::IsFalse(Box::new(
+ self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+ ))),
- SQLExpr::IsNotTrue(expr) =>
Ok(Expr::IsNotTrue(Box::new(self.sql_expr_to_logical_expr(*expr, schema,
planner_context)?))),
+ SQLExpr::IsNotTrue(expr) => Ok(Expr::IsNotTrue(Box::new(
+ self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+ ))),
- SQLExpr::IsNotFalse(expr) =>
Ok(Expr::IsNotFalse(Box::new(self.sql_expr_to_logical_expr(*expr, schema,
planner_context)?))),
+ SQLExpr::IsNotFalse(expr) => Ok(Expr::IsNotFalse(Box::new(
+ self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+ ))),
- SQLExpr::IsUnknown(expr) =>
Ok(Expr::IsUnknown(Box::new(self.sql_expr_to_logical_expr(*expr, schema,
planner_context)?))),
+ SQLExpr::IsUnknown(expr) => Ok(Expr::IsUnknown(Box::new(
+ self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+ ))),
- SQLExpr::IsNotUnknown(expr) =>
Ok(Expr::IsNotUnknown(Box::new(self.sql_expr_to_logical_expr(*expr, schema,
planner_context)?))),
+ SQLExpr::IsNotUnknown(expr) => Ok(Expr::IsNotUnknown(Box::new(
+ self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+ ))),
- SQLExpr::UnaryOp { op, expr } => self.parse_sql_unary_op(op,
*expr, schema, planner_context),
+ SQLExpr::UnaryOp { op, expr } => {
+ self.parse_sql_unary_op(op, *expr, schema, planner_context)
+ }
SQLExpr::Between {
expr,
@@ -269,10 +318,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
low,
high,
} => Ok(Expr::Between(Between::new(
- Box::new(self.sql_expr_to_logical_expr(*expr, schema,
planner_context)?),
+ Box::new(self.sql_expr_to_logical_expr(
+ *expr,
+ schema,
+ planner_context,
+ )?),
negated,
Box::new(self.sql_expr_to_logical_expr(*low, schema,
planner_context)?),
- Box::new(self.sql_expr_to_logical_expr(*high, schema,
planner_context)?),
+ Box::new(self.sql_expr_to_logical_expr(
+ *high,
+ schema,
+ planner_context,
+ )?),
))),
SQLExpr::InList {
@@ -281,18 +338,52 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
negated,
} => self.sql_in_list_to_expr(*expr, list, negated, schema,
planner_context),
- SQLExpr::Like { negated, expr, pattern, escape_char } =>
self.sql_like_to_expr(negated, *expr, *pattern, escape_char, schema,
planner_context,false),
+ SQLExpr::Like {
+ negated,
+ expr,
+ pattern,
+ escape_char,
+ } => self.sql_like_to_expr(
+ negated,
+ *expr,
+ *pattern,
+ escape_char,
+ schema,
+ planner_context,
+ false,
+ ),
- SQLExpr::ILike { negated, expr, pattern, escape_char } =>
self.sql_like_to_expr(negated, *expr, *pattern, escape_char, schema,
planner_context,true),
+ SQLExpr::ILike {
+ negated,
+ expr,
+ pattern,
+ escape_char,
+ } => self.sql_like_to_expr(
+ negated,
+ *expr,
+ *pattern,
+ escape_char,
+ schema,
+ planner_context,
+ true,
+ ),
- SQLExpr::SimilarTo { negated, expr, pattern, escape_char } =>
self.sql_similarto_to_expr(negated, *expr, *pattern, escape_char, schema,
planner_context),
+ SQLExpr::SimilarTo {
+ negated,
+ expr,
+ pattern,
+ escape_char,
+ } => self.sql_similarto_to_expr(
+ negated,
+ *expr,
+ *pattern,
+ escape_char,
+ schema,
+ planner_context,
+ ),
- SQLExpr::BinaryOp {
- ..
- } => {
- Err(DataFusionError::Internal(
- "binary_op should be handled by
sql_expr_to_logical_expr.".to_string()
- ))
+ SQLExpr::BinaryOp { .. } => {
+ internal_err!("binary_op should be handled by
sql_expr_to_logical_expr.")
}
#[cfg(feature = "unicode_expressions")]
@@ -300,37 +391,89 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
expr,
substring_from,
substring_for,
- } => self.sql_substring_to_expr(expr, substring_from,
substring_for, schema, planner_context),
+ } => self.sql_substring_to_expr(
+ expr,
+ substring_from,
+ substring_for,
+ schema,
+ planner_context,
+ ),
#[cfg(not(feature = "unicode_expressions"))]
- SQLExpr::Substring {
- ..
- } => {
- Err(DataFusionError::Internal(
- "statement substring requires compilation with feature
flag: unicode_expressions.".to_string()
- ))
+ SQLExpr::Substring { .. } => {
+ internal_err!(
+ "statement substring requires compilation with feature
flag: unicode_expressions."
+ )
}
- SQLExpr::Trim { expr, trim_where, trim_what } =>
self.sql_trim_to_expr(*expr, trim_where, trim_what, schema, planner_context),
+ SQLExpr::Trim {
+ expr,
+ trim_where,
+ trim_what,
+ } => self.sql_trim_to_expr(
+ *expr,
+ trim_where,
+ trim_what,
+ schema,
+ planner_context,
+ ),
- SQLExpr::AggregateExpressionWithFilter { expr, filter } =>
self.sql_agg_with_filter_to_expr(*expr, *filter, schema, planner_context),
+ SQLExpr::AggregateExpressionWithFilter { expr, filter } => {
+ self.sql_agg_with_filter_to_expr(*expr, *filter, schema,
planner_context)
+ }
- SQLExpr::Function(function) => self.sql_function_to_expr(function,
schema, planner_context),
+ SQLExpr::Function(function) => {
+ self.sql_function_to_expr(function, schema, planner_context)
+ }
- SQLExpr::Rollup(exprs) => self.sql_rollup_to_expr(exprs, schema,
planner_context),
- SQLExpr::Cube(exprs) => self.sql_cube_to_expr(exprs,schema,
planner_context),
- SQLExpr::GroupingSets(exprs) =>
self.sql_grouping_sets_to_expr(exprs, schema, planner_context),
+ SQLExpr::Rollup(exprs) => {
+ self.sql_rollup_to_expr(exprs, schema, planner_context)
+ }
+ SQLExpr::Cube(exprs) => self.sql_cube_to_expr(exprs, schema,
planner_context),
+ SQLExpr::GroupingSets(exprs) => {
+ self.sql_grouping_sets_to_expr(exprs, schema, planner_context)
+ }
- SQLExpr::Floor { expr, field: _field } =>
self.sql_named_function_to_expr(*expr, BuiltinScalarFunction::Floor, schema,
planner_context),
- SQLExpr::Ceil { expr, field: _field } =>
self.sql_named_function_to_expr(*expr, BuiltinScalarFunction::Ceil, schema,
planner_context),
+ SQLExpr::Floor {
+ expr,
+ field: _field,
+ } => self.sql_named_function_to_expr(
+ *expr,
+ BuiltinScalarFunction::Floor,
+ schema,
+ planner_context,
+ ),
+ SQLExpr::Ceil {
+ expr,
+ field: _field,
+ } => self.sql_named_function_to_expr(
+ *expr,
+ BuiltinScalarFunction::Ceil,
+ schema,
+ planner_context,
+ ),
- SQLExpr::Nested(e) => self.sql_expr_to_logical_expr(*e, schema,
planner_context),
+ SQLExpr::Nested(e) => {
+ self.sql_expr_to_logical_expr(*e, schema, planner_context)
+ }
- SQLExpr::Exists { subquery, negated } =>
self.parse_exists_subquery(*subquery, negated, schema, planner_context),
- SQLExpr::InSubquery { expr, subquery, negated } =>
self.parse_in_subquery(*expr, *subquery, negated, schema, planner_context),
- SQLExpr::Subquery(subquery) =>
self.parse_scalar_subquery(*subquery, schema, planner_context),
+ SQLExpr::Exists { subquery, negated } => {
+ self.parse_exists_subquery(*subquery, negated, schema,
planner_context)
+ }
+ SQLExpr::InSubquery {
+ expr,
+ subquery,
+ negated,
+ } => {
+ self.parse_in_subquery(*expr, *subquery, negated, schema,
planner_context)
+ }
+ SQLExpr::Subquery(subquery) => {
+ self.parse_scalar_subquery(*subquery, schema, planner_context)
+ }
- SQLExpr::ArrayAgg(array_agg) => self.parse_array_agg(array_agg,
schema, planner_context),
+ SQLExpr::ArrayAgg(array_agg) => {
+ self.parse_array_agg(array_agg, schema, planner_context)
+ }
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported ast node in sqltorel: {sql:?}"
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 0f54638d74..1bf1ffbaef 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -22,6 +22,7 @@ use std::vec;
use arrow_schema::*;
use datafusion_common::field_not_found;
+use datafusion_common::internal_err;
use datafusion_expr::WindowUDF;
use sqlparser::ast::ExactNumberInfo;
use sqlparser::ast::TimezoneInfo;
@@ -284,7 +285,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.map_err(|_: DataFusionError| {
field_not_found(col.relation.clone(), col.name.as_str(),
schema)
}),
- _ => Err(DataFusionError::Internal("Not a
column".to_string())),
+ _ => internal_err!("Not a column"),
})
}
diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs
index 521b55f544..27fd050394 100644
--- a/datafusion/sql/src/utils.rs
+++ b/datafusion/sql/src/utils.rs
@@ -21,7 +21,7 @@ use arrow_schema::{DataType, DECIMAL128_MAX_PRECISION,
DECIMAL_DEFAULT_SCALE};
use datafusion_common::tree_node::{Transformed, TreeNode};
use sqlparser::ast::Ident;
-use datafusion_common::plan_err;
+use datafusion_common::{internal_err, plan_err};
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::expr::{Alias, GroupingSet, WindowFunction};
use datafusion_expr::expr_vec_fmt;
@@ -82,9 +82,7 @@ pub(crate) fn check_columns_satisfy_exprs(
) -> Result<()> {
columns.iter().try_for_each(|c| match c {
Expr::Column(_) => Ok(()),
- _ => Err(DataFusionError::Internal(
- "Expr::Column are required".to_string(),
- )),
+ _ => internal_err!("Expr::Column are required"),
})?;
let column_exprs = find_column_exprs(exprs);
for e in &column_exprs {
diff --git a/datafusion/substrait/src/logical_plan/producer.rs
b/datafusion/substrait/src/logical_plan/producer.rs
index 495ccf1e35..79cd8995c6 100644
--- a/datafusion/substrait/src/logical_plan/producer.rs
+++ b/datafusion/substrait/src/logical_plan/producer.rs
@@ -961,9 +961,7 @@ pub fn to_substrait_rex(
fn to_substrait_type(dt: &DataType) -> Result<substrait::proto::Type> {
let default_nullability = r#type::Nullability::Required as i32;
match dt {
- DataType::Null => Err(DataFusionError::Internal(
- "Null cast is not valid".to_string(),
- )),
+ DataType::Null => internal_err!("Null cast is not valid"),
DataType::Boolean => Ok(substrait::proto::Type {
kind: Some(r#type::Kind::Bool(r#type::Boolean {
type_variation_reference: DEFAULT_TYPE_REF,