This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e03f9f6767 Remove CountWildcardRule in Analyzer and move the
functionality in ExprPlanner, add `plan_aggregate` and `plan_window` to planner
(#14689)
e03f9f6767 is described below
commit e03f9f6767a69f16c5a96d5d3863cefc209497e2
Author: Jay Zhan <[email protected]>
AuthorDate: Sat Feb 22 07:45:46 2025 +0800
Remove CountWildcardRule in Analyzer and move the functionality in
ExprPlanner, add `plan_aggregate` and `plan_window` to planner (#14689)
* count planner
* window
* update slt
* remove rule
* rm rule
* doc
* fix name
* fix name
* fix test
* tpch test
* fix avro
* rename
* switch to count(*)
* use count(*)
* rename
* doc
* rename window function
* fmt
* rm print
* upd logic
* count null
---
.../core/src/execution/session_state_defaults.rs | 2 +
.../core/tests/dataframe/dataframe_functions.rs | 34 ++-
datafusion/core/tests/dataframe/mod.rs | 30 +--
datafusion/core/tests/sql/explain_analyze.rs | 2 +-
datafusion/expr/src/expr.rs | 1 -
datafusion/expr/src/planner.rs | 62 ++++-
datafusion/expr/src/udaf.rs | 21 +-
datafusion/functions-aggregate/src/count.rs | 201 ++++++++++++++-
datafusion/functions-aggregate/src/lib.rs | 19 +-
datafusion/functions-aggregate/src/planner.rs | 63 +++++
datafusion/functions-window/src/lib.rs | 3 +
datafusion/functions-window/src/planner.rs | 61 +++++
.../optimizer/src/analyzer/count_wildcard_rule.rs | 277 ---------------------
datafusion/optimizer/src/analyzer/mod.rs | 3 -
.../optimizer/tests/optimizer_integration.rs | 28 ++-
datafusion/sql/src/expr/function.rs | 70 +++++-
datafusion/sql/tests/sql_integration.rs | 12 +-
datafusion/sqllogictest/test_files/aggregate.slt | 20 ++
datafusion/sqllogictest/test_files/avro.slt | 2 +-
datafusion/sqllogictest/test_files/coalesce.slt | 2 +-
datafusion/sqllogictest/test_files/copy.slt | 1 -
.../sqllogictest/test_files/count_star_rule.slt | 32 +--
datafusion/sqllogictest/test_files/ddl.slt | 1 -
datafusion/sqllogictest/test_files/errors.slt | 2 +-
datafusion/sqllogictest/test_files/explain.slt | 3 +-
datafusion/sqllogictest/test_files/insert.slt | 6 +-
.../sqllogictest/test_files/insert_to_external.slt | 4 +-
datafusion/sqllogictest/test_files/joins.slt | 4 +-
datafusion/sqllogictest/test_files/json.slt | 2 +-
datafusion/sqllogictest/test_files/limit.slt | 8 +-
.../test_files/optimizer_group_by_constant.slt | 16 +-
datafusion/sqllogictest/test_files/select.slt | 2 +-
datafusion/sqllogictest/test_files/subquery.slt | 42 ++--
.../sqllogictest/test_files/tpch/plans/q1.slt.part | 2 +-
.../test_files/tpch/plans/q13.slt.part | 2 +-
.../test_files/tpch/plans/q21.slt.part | 2 +-
.../test_files/tpch/plans/q22.slt.part | 2 +-
.../sqllogictest/test_files/tpch/plans/q4.slt.part | 2 +-
datafusion/sqllogictest/test_files/union.slt | 6 +-
datafusion/sqllogictest/test_files/window.slt | 12 +-
.../substrait/tests/cases/consumer_integration.rs | 26 +-
.../tests/cases/roundtrip_logical_plan.rs | 4 +-
42 files changed, 652 insertions(+), 442 deletions(-)
diff --git a/datafusion/core/src/execution/session_state_defaults.rs
b/datafusion/core/src/execution/session_state_defaults.rs
index 92f649781c..33bf01cf35 100644
--- a/datafusion/core/src/execution/session_state_defaults.rs
+++ b/datafusion/core/src/execution/session_state_defaults.rs
@@ -94,6 +94,8 @@ impl SessionStateDefaults {
feature = "unicode_expressions"
))]
Arc::new(functions::planner::UserDefinedFunctionPlanner),
+ Arc::new(functions_aggregate::planner::AggregateFunctionPlanner),
+ Arc::new(functions_window::planner::WindowFunctionPlanner),
];
expr_planners
diff --git a/datafusion/core/tests/dataframe/dataframe_functions.rs
b/datafusion/core/tests/dataframe/dataframe_functions.rs
index 29c24948fb..33f32e8f0f 100644
--- a/datafusion/core/tests/dataframe/dataframe_functions.rs
+++ b/datafusion/core/tests/dataframe/dataframe_functions.rs
@@ -22,6 +22,7 @@ use arrow::{
array::{Int32Array, StringArray},
record_batch::RecordBatch,
};
+use datafusion_functions_aggregate::count::count_all;
use std::sync::Arc;
use datafusion::error::Result;
@@ -31,7 +32,7 @@ use datafusion::prelude::*;
use datafusion::assert_batches_eq;
use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::expr::Alias;
-use datafusion_expr::ExprSchemable;
+use datafusion_expr::{table_scan, ExprSchemable, LogicalPlanBuilder};
use datafusion_functions_aggregate::expr_fn::{approx_median,
approx_percentile_cont};
use datafusion_functions_nested::map::map;
@@ -1123,3 +1124,34 @@ async fn test_fn_map() -> Result<()> {
Ok(())
}
+
+/// Build a `count(*)` aggregate with `count_all()` using the `LogicalPlanBuilder` API
+#[tokio::test]
+async fn test_count_wildcard() -> Result<()> {
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::UInt32, false),
+ Field::new("b", DataType::UInt32, false),
+ Field::new("c", DataType::UInt32, false),
+ ]);
+
+ let table_scan = table_scan(Some("test"), &schema, None)?.build()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(vec![col("b")], vec![count_all()])
+ .unwrap()
+ .project(vec![count_all()])
+ .unwrap()
+ .sort(vec![count_all().sort(true, false)])
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let expected = "Sort: count(*) ASC NULLS LAST [count(*):Int64]\
+ \n Projection: count(*) [count(*):Int64]\
+ \n Aggregate: groupBy=[[test.b]], aggr=[[count(*)]] [b:UInt32,
count(*):Int64]\
+ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+
+ let formatted_plan = plan.display_indent_schema().to_string();
+ assert_eq!(formatted_plan, expected);
+
+ Ok(())
+}
diff --git a/datafusion/core/tests/dataframe/mod.rs
b/datafusion/core/tests/dataframe/mod.rs
index d545157607..b05029e8e3 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -32,7 +32,8 @@ use arrow::datatypes::{
};
use arrow::error::ArrowError;
use arrow::util::pretty::pretty_format_batches;
-use datafusion_functions_aggregate::count::count_udaf;
+use datafusion_expr::utils::COUNT_STAR_EXPANSION;
+use datafusion_functions_aggregate::count::{count_all, count_udaf};
use datafusion_functions_aggregate::expr_fn::{
array_agg, avg, count, count_distinct, max, median, min, sum,
};
@@ -72,7 +73,7 @@ use datafusion_expr::expr::{GroupingSet, Sort,
WindowFunction};
use datafusion_expr::var_provider::{VarProvider, VarType};
use datafusion_expr::{
cast, col, create_udf, exists, in_subquery, lit, out_ref_col, placeholder,
- scalar_subquery, when, wildcard, Expr, ExprFunctionExt, ExprSchemable,
LogicalPlan,
+ scalar_subquery, when, Expr, ExprFunctionExt, ExprSchemable, LogicalPlan,
ScalarFunctionImplementation, WindowFrame, WindowFrameBound,
WindowFrameUnits,
WindowFunctionDefinition,
};
@@ -2463,8 +2464,8 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
let df_results = ctx
.table("t1")
.await?
- .aggregate(vec![col("b")], vec![count(wildcard())])?
- .sort(vec![count(wildcard()).sort(true, false)])?
+ .aggregate(vec![col("b")], vec![count_all()])?
+ .sort(vec![count_all().sort(true, false)])?
.explain(false, false)?
.collect()
.await?;
@@ -2498,8 +2499,8 @@ async fn test_count_wildcard_on_where_in() -> Result<()> {
Arc::new(
ctx.table("t2")
.await?
- .aggregate(vec![], vec![count(wildcard())])?
- .select(vec![count(wildcard())])?
+ .aggregate(vec![], vec![count_all()])?
+ .select(vec![count_all()])?
.into_optimized_plan()?,
),
))?
@@ -2532,8 +2533,8 @@ async fn test_count_wildcard_on_where_exist() ->
Result<()> {
.filter(exists(Arc::new(
ctx.table("t2")
.await?
- .aggregate(vec![], vec![count(wildcard())])?
- .select(vec![count(wildcard())])?
+ .aggregate(vec![], vec![count_all()])?
+ .select(vec![count_all()])?
.into_unoptimized_plan(),
// Usually, into_optimized_plan() should be used here, but due to
// https://github.com/apache/datafusion/issues/5771,
@@ -2568,7 +2569,7 @@ async fn test_count_wildcard_on_window() -> Result<()> {
.await?
.select(vec![Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::AggregateUDF(count_udaf()),
- vec![wildcard()],
+ vec![Expr::Literal(COUNT_STAR_EXPANSION)],
))
.order_by(vec![Sort::new(col("a"), false, true)])
.window_frame(WindowFrame::new_bounds(
@@ -2599,17 +2600,16 @@ async fn test_count_wildcard_on_aggregate() ->
Result<()> {
let sql_results = ctx
.sql("select count(*) from t1")
.await?
- .select(vec![col("count(*)")])?
.explain(false, false)?
.collect()
.await?;
- // add `.select(vec![count(wildcard())])?` to make sure we can analyze all
node instead of just top node.
+ // add `.select(vec![count_all()])?` to make sure we can analyze all
nodes instead of just the top node.
let df_results = ctx
.table("t1")
.await?
- .aggregate(vec![], vec![count(wildcard())])?
- .select(vec![count(wildcard())])?
+ .aggregate(vec![], vec![count_all()])?
+ .select(vec![count_all()])?
.explain(false, false)?
.collect()
.await?;
@@ -2646,8 +2646,8 @@ async fn test_count_wildcard_on_where_scalar_subquery()
-> Result<()> {
ctx.table("t2")
.await?
.filter(out_ref_col(DataType::UInt32,
"t1.a").eq(col("t2.a")))?
- .aggregate(vec![], vec![count(wildcard())])?
- .select(vec![col(count(wildcard()).to_string())])?
+ .aggregate(vec![], vec![count_all()])?
+ .select(vec![col(count_all().to_string())])?
.into_unoptimized_plan(),
))
.gt(lit(ScalarValue::UInt8(Some(0)))),
diff --git a/datafusion/core/tests/sql/explain_analyze.rs
b/datafusion/core/tests/sql/explain_analyze.rs
index d4b5ae8b28..128d1d0aa4 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -780,7 +780,7 @@ async fn explain_logical_plan_only() {
let expected = vec![
vec![
"logical_plan",
- "Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]\
+ "Aggregate: groupBy=[[]], aggr=[[count(*)]]\
\n SubqueryAlias: t\
\n Projection: \
\n Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"),
Int64(2), Int64(150))"
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index df79b3568c..f8baf9c94b 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -2294,7 +2294,6 @@ impl Display for SchemaDisplay<'_> {
| Expr::OuterReferenceColumn(..)
| Expr::Placeholder(_)
| Expr::Wildcard { .. } => write!(f, "{}", self.0),
-
Expr::AggregateFunction(AggregateFunction { func, params }) => {
match func.schema_name(params) {
Ok(name) => {
diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs
index 04cc26c910..a2ed0592ef 100644
--- a/datafusion/expr/src/planner.rs
+++ b/datafusion/expr/src/planner.rs
@@ -25,9 +25,12 @@ use datafusion_common::{
config::ConfigOptions, file_options::file_type::FileType, not_impl_err,
DFSchema,
Result, TableReference,
};
-use sqlparser::ast;
+use sqlparser::ast::{self, NullTreatment};
-use crate::{AggregateUDF, Expr, GetFieldAccess, ScalarUDF, TableSource,
WindowUDF};
+use crate::{
+ AggregateUDF, Expr, GetFieldAccess, ScalarUDF, SortExpr, TableSource,
WindowFrame,
+ WindowFunctionDefinition, WindowUDF,
+};
/// Provides the `SQL` query planner meta-data about tables and
/// functions referenced in SQL statements, without a direct dependency on the
@@ -138,7 +141,7 @@ pub trait ExprPlanner: Debug + Send + Sync {
/// Plan an array literal, such as `[1, 2, 3]`
///
- /// Returns origin expression arguments if not possible
+ /// Returns original expression arguments if not possible
fn plan_array_literal(
&self,
exprs: Vec<Expr>,
@@ -149,14 +152,14 @@ pub trait ExprPlanner: Debug + Send + Sync {
/// Plan a `POSITION` expression, such as `POSITION(<expr> in <expr>)`
///
- /// returns origin expression arguments if not possible
+ /// Returns original expression arguments if not possible
fn plan_position(&self, args: Vec<Expr>) ->
Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}
/// Plan a dictionary literal, such as `{ key: value, ...}`
///
- /// Returns origin expression arguments if not possible
+ /// Returns original expression arguments if not possible
fn plan_dictionary_literal(
&self,
expr: RawDictionaryExpr,
@@ -167,14 +170,14 @@ pub trait ExprPlanner: Debug + Send + Sync {
/// Plan an extract expression, such as`EXTRACT(month FROM foo)`
///
- /// Returns origin expression arguments if not possible
+ /// Returns original expression arguments if not possible
fn plan_extract(&self, args: Vec<Expr>) ->
Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}
/// Plan an substring expression, such as `SUBSTRING(<expr> [FROM <expr>]
[FOR <expr>])`
///
- /// Returns origin expression arguments if not possible
+ /// Returns original expression arguments if not possible
fn plan_substring(&self, args: Vec<Expr>) ->
Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}
@@ -195,14 +198,14 @@ pub trait ExprPlanner: Debug + Send + Sync {
/// Plans an overlay expression, such as `overlay(str PLACING substr FROM
pos [FOR count])`
///
- /// Returns origin expression arguments if not possible
+ /// Returns original expression arguments if not possible
fn plan_overlay(&self, args: Vec<Expr>) ->
Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}
/// Plans a `make_map` expression, such as `make_map(key1, value1, key2,
value2, ...)`
///
- /// Returns origin expression arguments if not possible
+ /// Returns original expression arguments if not possible
fn plan_make_map(&self, args: Vec<Expr>) ->
Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}
@@ -230,6 +233,23 @@ pub trait ExprPlanner: Debug + Send + Sync {
fn plan_any(&self, expr: RawBinaryExpr) ->
Result<PlannerResult<RawBinaryExpr>> {
Ok(PlannerResult::Original(expr))
}
+
+ /// Plans aggregate functions, such as `COUNT(<expr>)`
+ ///
+ /// Returns the original expression if not possible
+ fn plan_aggregate(
+ &self,
+ expr: RawAggregateExpr,
+ ) -> Result<PlannerResult<RawAggregateExpr>> {
+ Ok(PlannerResult::Original(expr))
+ }
+
+ /// Plans window functions, such as `COUNT(<expr>) OVER (...)`
+ ///
+ /// Returns the original expression if not possible
+ fn plan_window(&self, expr: RawWindowExpr) ->
Result<PlannerResult<RawWindowExpr>> {
+ Ok(PlannerResult::Original(expr))
+ }
}
/// An operator with two arguments to plan
@@ -266,6 +286,30 @@ pub struct RawDictionaryExpr {
pub values: Vec<Expr>,
}
+/// A raw aggregate function call (function, arguments, and modifiers) passed to
+/// [`ExprPlanner::plan_aggregate`] so that planners can rewrite it.
+#[derive(Debug, Clone)]
+pub struct RawAggregateExpr {
+ pub func: Arc<AggregateUDF>,
+ pub args: Vec<Expr>,
+ pub distinct: bool,
+ pub filter: Option<Box<Expr>>,
+ pub order_by: Option<Vec<SortExpr>>,
+ pub null_treatment: Option<NullTreatment>,
+}
+
+/// A raw window function call (function, arguments, and window clauses) passed to
+/// [`ExprPlanner::plan_window`] so that planners can rewrite it.
+#[derive(Debug, Clone)]
+pub struct RawWindowExpr {
+ pub func_def: WindowFunctionDefinition,
+ pub args: Vec<Expr>,
+ pub partition_by: Vec<Expr>,
+ pub order_by: Vec<SortExpr>,
+ pub window_frame: WindowFrame,
+ pub null_treatment: Option<NullTreatment>,
+}
+
/// Result of planning a raw expr with [`ExprPlanner`]
#[derive(Debug, Clone)]
pub enum PlannerResult<T> {
diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs
index 2b9e2bddd1..ae7196c9b1 100644
--- a/datafusion/expr/src/udaf.rs
+++ b/datafusion/expr/src/udaf.rs
@@ -515,9 +515,9 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
null_treatment,
} = params;
- let mut schema_name = String::new();
+ let mut display_name = String::new();
- schema_name.write_fmt(format_args!(
+ display_name.write_fmt(format_args!(
"{}({}{})",
self.name(),
if *distinct { "DISTINCT " } else { "" },
@@ -525,17 +525,22 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
))?;
if let Some(nt) = null_treatment {
- schema_name.write_fmt(format_args!(" {}", nt))?;
+ display_name.write_fmt(format_args!(" {}", nt))?;
}
if let Some(fe) = filter {
- schema_name.write_fmt(format_args!(" FILTER (WHERE {fe})"))?;
+ display_name.write_fmt(format_args!(" FILTER (WHERE {fe})"))?;
}
- if let Some(order_by) = order_by {
- schema_name
- .write_fmt(format_args!(" ORDER BY [{}]",
expr_vec_fmt!(order_by)))?;
+ if let Some(ob) = order_by {
+ display_name.write_fmt(format_args!(
+ " ORDER BY [{}]",
+ ob.iter()
+ .map(|o| format!("{o}"))
+ .collect::<Vec<String>>()
+ .join(", ")
+ ))?;
}
- Ok(schema_name)
+ Ok(display_name)
}
/// Returns the user-defined display name of function, given the arguments
diff --git a/datafusion/functions-aggregate/src/count.rs
b/datafusion/functions-aggregate/src/count.rs
index cb59042ef4..c11329d7f5 100644
--- a/datafusion/functions-aggregate/src/count.rs
+++ b/datafusion/functions-aggregate/src/count.rs
@@ -17,11 +17,15 @@
use ahash::RandomState;
use datafusion_common::stats::Precision;
+use datafusion_expr::expr::{
+ schema_name_from_exprs, schema_name_from_sorts, AggregateFunctionParams,
+ WindowFunctionParams,
+};
use
datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator;
use datafusion_macros::user_doc;
use datafusion_physical_expr::expressions;
use std::collections::HashSet;
-use std::fmt::Debug;
+use std::fmt::{Debug, Write};
use std::mem::{size_of, size_of_val};
use std::ops::BitAnd;
use std::sync::Arc;
@@ -47,11 +51,11 @@ use datafusion_common::{
downcast_value, internal_err, not_impl_err, Result, ScalarValue,
};
use datafusion_expr::function::StateFieldsArgs;
+use datafusion_expr::{expr_vec_fmt, Expr, ReversedUDAF, StatisticsArgs,
TypeSignature};
use datafusion_expr::{
function::AccumulatorArgs, utils::format_state_name, Accumulator,
AggregateUDFImpl,
Documentation, EmitTo, GroupsAccumulator, SetMonotonicity, Signature,
Volatility,
};
-use datafusion_expr::{Expr, ReversedUDAF, StatisticsArgs, TypeSignature};
use datafusion_functions_aggregate_common::aggregate::count_distinct::{
BytesDistinctCountAccumulator, FloatDistinctCountAccumulator,
PrimitiveDistinctCountAccumulator,
@@ -79,6 +83,11 @@ pub fn count_distinct(expr: Expr) -> Expr {
))
}
+/// Creates an aggregate expression that counts all rows, equivalent to `COUNT(*)`,
`COUNT()`, or `COUNT(1)`
+pub fn count_all() -> Expr {
+ count(Expr::Literal(COUNT_STAR_EXPANSION))
+}
+
#[user_doc(
doc_section(label = "General Functions"),
description = "Returns the number of non-null values in the specified
column. To include null values in the total count, use `count(*)`.",
@@ -139,6 +148,185 @@ impl AggregateUDFImpl for Count {
"count"
}
+ fn schema_name(&self, params: &AggregateFunctionParams) -> Result<String> {
+ let AggregateFunctionParams {
+ args,
+ distinct,
+ filter,
+ order_by,
+ null_treatment,
+ } = params;
+
+ let mut schema_name = String::new();
+
+ if is_count_wildcard(args) {
+ schema_name.write_str("count(*)")?;
+ } else {
+ schema_name.write_fmt(format_args!(
+ "{}({}{})",
+ self.name(),
+ if *distinct { "DISTINCT " } else { "" },
+ schema_name_from_exprs(args)?
+ ))?;
+ }
+
+ if let Some(null_treatment) = null_treatment {
+ schema_name.write_fmt(format_args!(" {}", null_treatment))?;
+ }
+
+ if let Some(filter) = filter {
+ schema_name.write_fmt(format_args!(" FILTER (WHERE {filter})"))?;
+ };
+
+ if let Some(order_by) = order_by {
+ schema_name.write_fmt(format_args!(
+ " ORDER BY [{}]",
+ schema_name_from_sorts(order_by)?
+ ))?;
+ };
+
+ Ok(schema_name)
+ }
+
+ fn window_function_schema_name(
+ &self,
+ params: &WindowFunctionParams,
+ ) -> Result<String> {
+ let WindowFunctionParams {
+ args,
+ partition_by,
+ order_by,
+ window_frame,
+ null_treatment,
+ } = params;
+
+ let mut schema_name = String::new();
+
+ if is_count_wildcard(args) {
+ schema_name.write_str("count(*)")?;
+ } else {
+ schema_name.write_fmt(format_args!(
+ "{}({})",
+ self.name(),
+ schema_name_from_exprs(args)?
+ ))?;
+ }
+
+ if let Some(null_treatment) = null_treatment {
+ schema_name.write_fmt(format_args!(" {}", null_treatment))?;
+ }
+
+ if !partition_by.is_empty() {
+ schema_name.write_fmt(format_args!(
+ " PARTITION BY [{}]",
+ schema_name_from_exprs(partition_by)?
+ ))?;
+ }
+
+ if !order_by.is_empty() {
+ schema_name.write_fmt(format_args!(
+ " ORDER BY [{}]",
+ schema_name_from_sorts(order_by)?
+ ))?;
+ };
+
+ schema_name.write_fmt(format_args!(" {window_frame}"))?;
+
+ Ok(schema_name)
+ }
+
+ fn display_name(&self, params: &AggregateFunctionParams) -> Result<String>
{
+ let AggregateFunctionParams {
+ args,
+ distinct,
+ filter,
+ order_by,
+ null_treatment,
+ } = params;
+
+ let mut display_name = String::new();
+
+ if is_count_wildcard(args) {
+ display_name.write_str("count(*)")?;
+ } else {
+ display_name.write_fmt(format_args!(
+ "{}({}{})",
+ self.name(),
+ if *distinct { "DISTINCT " } else { "" },
+ args.iter()
+ .map(|arg| format!("{arg}"))
+ .collect::<Vec<String>>()
+ .join(", ")
+ ))?;
+ }
+
+ if let Some(nt) = null_treatment {
+ display_name.write_fmt(format_args!(" {}", nt))?;
+ }
+ if let Some(fe) = filter {
+ display_name.write_fmt(format_args!(" FILTER (WHERE {fe})"))?;
+ }
+ if let Some(ob) = order_by {
+ display_name.write_fmt(format_args!(
+ " ORDER BY [{}]",
+ ob.iter()
+ .map(|o| format!("{o}"))
+ .collect::<Vec<String>>()
+ .join(", ")
+ ))?;
+ }
+
+ Ok(display_name)
+ }
+
+ fn window_function_display_name(
+ &self,
+ params: &WindowFunctionParams,
+ ) -> Result<String> {
+ let WindowFunctionParams {
+ args,
+ partition_by,
+ order_by,
+ window_frame,
+ null_treatment,
+ } = params;
+
+ let mut display_name = String::new();
+
+ if is_count_wildcard(args) {
+ display_name.write_str("count(*)")?;
+ } else {
+ display_name.write_fmt(format_args!(
+ "{}({})",
+ self.name(),
+ expr_vec_fmt!(args)
+ ))?;
+ }
+
+ if let Some(null_treatment) = null_treatment {
+ display_name.write_fmt(format_args!(" {}", null_treatment))?;
+ }
+
+ if !partition_by.is_empty() {
+ display_name.write_fmt(format_args!(
+ " PARTITION BY [{}]",
+ expr_vec_fmt!(partition_by)
+ ))?;
+ }
+
+ if !order_by.is_empty() {
+ display_name
+ .write_fmt(format_args!(" ORDER BY [{}]",
expr_vec_fmt!(order_by)))?;
+ };
+
+ display_name.write_fmt(format_args!(
+ " {} BETWEEN {} AND {}",
+ window_frame.units, window_frame.start_bound,
window_frame.end_bound
+ ))?;
+
+ Ok(display_name)
+ }
+
fn signature(&self) -> &Signature {
&self.signature
}
@@ -359,6 +547,15 @@ impl AggregateUDFImpl for Count {
}
}
+fn is_count_wildcard(args: &[Expr]) -> bool {
+ match args {
+ [] => true, // count()
+ // All constants should be coerced to Int64 or rejected by the signature
+ [Expr::Literal(ScalarValue::Int64(Some(_)))] => true, // count(1)
+ _ => false, // More than one argument or non-matching cases
+ }
+}
+
#[derive(Debug)]
struct CountAccumulator {
count: i64,
diff --git a/datafusion/functions-aggregate/src/lib.rs
b/datafusion/functions-aggregate/src/lib.rs
index f4bdb53efd..a5c84298e9 100644
--- a/datafusion/functions-aggregate/src/lib.rs
+++ b/datafusion/functions-aggregate/src/lib.rs
@@ -64,28 +64,29 @@
pub mod macros;
pub mod approx_distinct;
+pub mod approx_median;
+pub mod approx_percentile_cont;
+pub mod approx_percentile_cont_with_weight;
pub mod array_agg;
+pub mod average;
+pub mod bit_and_or_xor;
+pub mod bool_and_or;
pub mod correlation;
pub mod count;
pub mod covariance;
pub mod first_last;
+pub mod grouping;
pub mod hyperloglog;
pub mod median;
pub mod min_max;
+pub mod nth_value;
pub mod regr;
pub mod stddev;
+pub mod string_agg;
pub mod sum;
pub mod variance;
-pub mod approx_median;
-pub mod approx_percentile_cont;
-pub mod approx_percentile_cont_with_weight;
-pub mod average;
-pub mod bit_and_or_xor;
-pub mod bool_and_or;
-pub mod grouping;
-pub mod nth_value;
-pub mod string_agg;
+pub mod planner;
use crate::approx_percentile_cont::approx_percentile_cont_udaf;
use
crate::approx_percentile_cont_with_weight::approx_percentile_cont_with_weight_udaf;
diff --git a/datafusion/functions-aggregate/src/planner.rs
b/datafusion/functions-aggregate/src/planner.rs
new file mode 100644
index 0000000000..1f0a42c4c7
--- /dev/null
+++ b/datafusion/functions-aggregate/src/planner.rs
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! SQL planning extensions like [`AggregateFunctionPlanner`]
+
+use datafusion_common::Result;
+use datafusion_expr::{
+ expr::AggregateFunction,
+ lit,
+ planner::{ExprPlanner, PlannerResult, RawAggregateExpr},
+ utils::COUNT_STAR_EXPANSION,
+ Expr,
+};
+
+#[derive(Debug)]
+pub struct AggregateFunctionPlanner;
+
+impl ExprPlanner for AggregateFunctionPlanner {
+ fn plan_aggregate(
+ &self,
+ expr: RawAggregateExpr,
+ ) -> Result<PlannerResult<RawAggregateExpr>> {
+ if expr.func.name() == "count"
+ && (expr.args.len() == 1 && matches!(expr.args[0], Expr::Wildcard
{ .. })
+ || expr.args.is_empty())
+ {
+ let RawAggregateExpr {
+ func,
+ args: _,
+ distinct,
+ filter,
+ order_by,
+ null_treatment,
+ } = expr;
+ return Ok(PlannerResult::Planned(Expr::AggregateFunction(
+ AggregateFunction::new_udf(
+ func,
+ vec![lit(COUNT_STAR_EXPANSION)],
+ distinct,
+ filter,
+ order_by,
+ null_treatment,
+ ),
+ )));
+ }
+
+ Ok(PlannerResult::Original(expr))
+ }
+}
diff --git a/datafusion/functions-window/src/lib.rs
b/datafusion/functions-window/src/lib.rs
index 0d932bf847..718b0bf158 100644
--- a/datafusion/functions-window/src/lib.rs
+++ b/datafusion/functions-window/src/lib.rs
@@ -45,6 +45,9 @@ pub mod nth_value;
pub mod ntile;
pub mod rank;
pub mod row_number;
+
+pub mod planner;
+
mod utils;
/// Fluent-style API for creating `Expr`s
diff --git a/datafusion/functions-window/src/planner.rs
b/datafusion/functions-window/src/planner.rs
new file mode 100644
index 0000000000..8f48ca8b18
--- /dev/null
+++ b/datafusion/functions-window/src/planner.rs
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! SQL planning extensions like [`WindowFunctionPlanner`]
+
+use datafusion_common::Result;
+use datafusion_expr::{
+ expr::WindowFunction,
+ lit,
+ planner::{ExprPlanner, PlannerResult, RawWindowExpr},
+ utils::COUNT_STAR_EXPANSION,
+ Expr, ExprFunctionExt,
+};
+
+#[derive(Debug)]
+pub struct WindowFunctionPlanner;
+
+impl ExprPlanner for WindowFunctionPlanner {
+ fn plan_window(&self, expr: RawWindowExpr) ->
Result<PlannerResult<RawWindowExpr>> {
+ if expr.func_def.name() == "count"
+ && (expr.args.len() == 1 && matches!(expr.args[0], Expr::Wildcard
{ .. })
+ || expr.args.is_empty())
+ {
+ let RawWindowExpr {
+ func_def,
+ args: _,
+ partition_by,
+ order_by,
+ window_frame,
+ null_treatment,
+ } = expr;
+ return Ok(PlannerResult::Planned(
+ Expr::WindowFunction(WindowFunction::new(
+ func_def,
+ vec![lit(COUNT_STAR_EXPANSION)],
+ ))
+ .partition_by(partition_by)
+ .order_by(order_by)
+ .window_frame(window_frame)
+ .null_treatment(null_treatment)
+ .build()?,
+ ));
+ }
+
+ Ok(PlannerResult::Original(expr))
+ }
+}
diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
deleted file mode 100644
index f517761b1e..0000000000
--- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
+++ /dev/null
@@ -1,277 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::analyzer::AnalyzerRule;
-
-use crate::utils::NamePreserver;
-use datafusion_common::config::ConfigOptions;
-use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::Result;
-use datafusion_expr::expr::{AggregateFunction, AggregateFunctionParams,
WindowFunction};
-use datafusion_expr::utils::COUNT_STAR_EXPANSION;
-use datafusion_expr::{lit, Expr, LogicalPlan, WindowFunctionDefinition};
-
-/// Rewrite `Count(Expr::Wildcard)` to `Count(Expr::Literal)`.
-///
-/// Resolves issue: <https://github.com/apache/datafusion/issues/5473>
-#[derive(Default, Debug)]
-pub struct CountWildcardRule {}
-
-impl CountWildcardRule {
- pub fn new() -> Self {
- Self {}
- }
-}
-
-impl AnalyzerRule for CountWildcardRule {
- fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) ->
Result<LogicalPlan> {
- plan.transform_down_with_subqueries(analyze_internal).data()
- }
-
- fn name(&self) -> &str {
- "count_wildcard_rule"
- }
-}
-
-fn is_wildcard(expr: &Expr) -> bool {
- matches!(expr, Expr::Wildcard { .. })
-}
-
-fn is_count_star_aggregate(aggregate_function: &AggregateFunction) -> bool {
- matches!(aggregate_function,
- AggregateFunction {
- func,
- params: AggregateFunctionParams { args, .. },
- } if func.name() == "count" && (args.len() == 1 &&
is_wildcard(&args[0]) || args.is_empty()))
-}
-
-fn is_count_star_window_aggregate(window_function: &WindowFunction) -> bool {
- let args = &window_function.params.args;
- matches!(window_function.fun,
- WindowFunctionDefinition::AggregateUDF(ref udaf)
- if udaf.name() == "count" && (args.len() == 1 &&
is_wildcard(&args[0]) || args.is_empty()))
-}
-
-fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
- let name_preserver = NamePreserver::new(&plan);
- plan.map_expressions(|expr| {
- let original_name = name_preserver.save(&expr);
- let transformed_expr = expr.transform_up(|expr| match expr {
- Expr::WindowFunction(mut window_function)
- if is_count_star_window_aggregate(&window_function) =>
- {
- window_function.params.args = vec![lit(COUNT_STAR_EXPANSION)];
- Ok(Transformed::yes(Expr::WindowFunction(window_function)))
- }
- Expr::AggregateFunction(mut aggregate_function)
- if is_count_star_aggregate(&aggregate_function) =>
- {
- aggregate_function.params.args =
vec![lit(COUNT_STAR_EXPANSION)];
- Ok(Transformed::yes(Expr::AggregateFunction(
- aggregate_function,
- )))
- }
- _ => Ok(Transformed::no(expr)),
- })?;
- Ok(transformed_expr.update_data(|data| original_name.restore(data)))
- })
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::test::*;
- use arrow::datatypes::DataType;
- use datafusion_common::ScalarValue;
- use datafusion_expr::expr::Sort;
- use datafusion_expr::ExprFunctionExt;
- use datafusion_expr::{
- col, exists, in_subquery, logical_plan::LogicalPlanBuilder,
out_ref_col,
- scalar_subquery, wildcard, WindowFrame, WindowFrameBound,
WindowFrameUnits,
- };
- use datafusion_functions_aggregate::count::count_udaf;
- use datafusion_functions_aggregate::expr_fn::max;
- use std::sync::Arc;
-
- use datafusion_functions_aggregate::expr_fn::{count, sum};
-
- fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> {
- assert_analyzed_plan_eq_display_indent(
- Arc::new(CountWildcardRule::new()),
- plan,
- expected,
- )
- }
-
- #[test]
- fn test_count_wildcard_on_sort() -> Result<()> {
- let table_scan = test_table_scan()?;
- let plan = LogicalPlanBuilder::from(table_scan)
- .aggregate(vec![col("b")], vec![count(wildcard())])?
- .project(vec![count(wildcard())])?
- .sort(vec![count(wildcard()).sort(true, false)])?
- .build()?;
- let expected = "Sort: count(*) ASC NULLS LAST [count(*):Int64]\
- \n Projection: count(*) [count(*):Int64]\
- \n Aggregate: groupBy=[[test.b]], aggr=[[count(Int64(1)) AS
count(*)]] [b:UInt32, count(*):Int64]\
- \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
- assert_plan_eq(plan, expected)
- }
-
- #[test]
- fn test_count_wildcard_on_where_in() -> Result<()> {
- let table_scan_t1 = test_table_scan_with_name("t1")?;
- let table_scan_t2 = test_table_scan_with_name("t2")?;
-
- let plan = LogicalPlanBuilder::from(table_scan_t1)
- .filter(in_subquery(
- col("a"),
- Arc::new(
- LogicalPlanBuilder::from(table_scan_t2)
- .aggregate(Vec::<Expr>::new(),
vec![count(wildcard())])?
- .project(vec![count(wildcard())])?
- .build()?,
- ),
- ))?
- .build()?;
-
- let expected = "Filter: t1.a IN (<subquery>) [a:UInt32, b:UInt32,
c:UInt32]\
- \n Subquery: [count(*):Int64]\
- \n Projection: count(*) [count(*):Int64]\
- \n Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
[count(*):Int64]\
- \n TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]\
- \n TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]";
- assert_plan_eq(plan, expected)
- }
-
- #[test]
- fn test_count_wildcard_on_where_exists() -> Result<()> {
- let table_scan_t1 = test_table_scan_with_name("t1")?;
- let table_scan_t2 = test_table_scan_with_name("t2")?;
-
- let plan = LogicalPlanBuilder::from(table_scan_t1)
- .filter(exists(Arc::new(
- LogicalPlanBuilder::from(table_scan_t2)
- .aggregate(Vec::<Expr>::new(), vec![count(wildcard())])?
- .project(vec![count(wildcard())])?
- .build()?,
- )))?
- .build()?;
-
- let expected = "Filter: EXISTS (<subquery>) [a:UInt32, b:UInt32,
c:UInt32]\
- \n Subquery: [count(*):Int64]\
- \n Projection: count(*) [count(*):Int64]\
- \n Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
[count(*):Int64]\
- \n TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]\
- \n TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]";
- assert_plan_eq(plan, expected)
- }
-
- #[test]
- fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
- let table_scan_t1 = test_table_scan_with_name("t1")?;
- let table_scan_t2 = test_table_scan_with_name("t2")?;
-
- let plan = LogicalPlanBuilder::from(table_scan_t1)
- .filter(
- scalar_subquery(Arc::new(
- LogicalPlanBuilder::from(table_scan_t2)
- .filter(out_ref_col(DataType::UInt32,
"t1.a").eq(col("t2.a")))?
- .aggregate(
- Vec::<Expr>::new(),
- vec![count(lit(COUNT_STAR_EXPANSION))],
- )?
- .project(vec![count(lit(COUNT_STAR_EXPANSION))])?
- .build()?,
- ))
- .gt(lit(ScalarValue::UInt8(Some(0)))),
- )?
- .project(vec![col("t1.a"), col("t1.b")])?
- .build()?;
-
- let expected = "Projection: t1.a, t1.b [a:UInt32, b:UInt32]\
- \n Filter: (<subquery>) > UInt8(0) [a:UInt32, b:UInt32,
c:UInt32]\
- \n Subquery: [count(Int64(1)):Int64]\
- \n Projection: count(Int64(1)) [count(Int64(1)):Int64]\
- \n Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
[count(Int64(1)):Int64]\
- \n Filter: outer_ref(t1.a) = t2.a [a:UInt32, b:UInt32,
c:UInt32]\
- \n TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]\
- \n TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]";
- assert_plan_eq(plan, expected)
- }
- #[test]
- fn test_count_wildcard_on_window() -> Result<()> {
- let table_scan = test_table_scan()?;
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .window(vec![Expr::WindowFunction(WindowFunction::new(
- WindowFunctionDefinition::AggregateUDF(count_udaf()),
- vec![wildcard()],
- ))
- .order_by(vec![Sort::new(col("a"), false, true)])
- .window_frame(WindowFrame::new_bounds(
- WindowFrameUnits::Range,
- WindowFrameBound::Preceding(ScalarValue::UInt32(Some(6))),
- WindowFrameBound::Following(ScalarValue::UInt32(Some(2))),
- ))
- .build()?])?
- .project(vec![count(wildcard())])?
- .build()?;
-
- let expected = "Projection: count(Int64(1)) AS count(*)
[count(*):Int64]\
- \n WindowAggr: windowExpr=[[count(Int64(1)) ORDER BY [test.a DESC
NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING AS count(*) ORDER BY
[test.a DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING]]
[a:UInt32, b:UInt32, c:UInt32, count(*) ORDER BY [test.a DESC NULLS FIRST]
RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING:Int64]\
- \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
- assert_plan_eq(plan, expected)
- }
-
- #[test]
- fn test_count_wildcard_on_aggregate() -> Result<()> {
- let table_scan = test_table_scan()?;
- let plan = LogicalPlanBuilder::from(table_scan)
- .aggregate(Vec::<Expr>::new(), vec![count(wildcard())])?
- .project(vec![count(wildcard())])?
- .build()?;
-
- let expected = "Projection: count(*) [count(*):Int64]\
- \n Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
[count(*):Int64]\
- \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
- assert_plan_eq(plan, expected)
- }
-
- #[test]
- fn test_count_wildcard_on_non_count_aggregate() -> Result<()> {
- let table_scan = test_table_scan()?;
- let res = LogicalPlanBuilder::from(table_scan)
- .aggregate(Vec::<Expr>::new(), vec![sum(wildcard())]);
- assert!(res.is_err());
- Ok(())
- }
-
- #[test]
- fn test_count_wildcard_on_nesting() -> Result<()> {
- let table_scan = test_table_scan()?;
- let plan = LogicalPlanBuilder::from(table_scan)
- .aggregate(Vec::<Expr>::new(), vec![max(count(wildcard()))])?
- .project(vec![count(wildcard())])?
- .build()?;
-
- let expected = "Projection: count(Int64(1)) AS count(*)
[count(*):Int64]\
- \n Aggregate: groupBy=[[]], aggr=[[max(count(Int64(1))) AS
max(count(*))]] [max(count(*)):Int64;N]\
- \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
- assert_plan_eq(plan, expected)
- }
-}
diff --git a/datafusion/optimizer/src/analyzer/mod.rs
b/datafusion/optimizer/src/analyzer/mod.rs
index 9d0ac6b54c..c506616d14 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -28,7 +28,6 @@ use datafusion_common::Result;
use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::{InvariantLevel, LogicalPlan};
-use crate::analyzer::count_wildcard_rule::CountWildcardRule;
use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule;
use crate::analyzer::inline_table_scan::InlineTableScan;
use crate::analyzer::resolve_grouping_function::ResolveGroupingFunction;
@@ -37,7 +36,6 @@ use crate::utils::log_plan;
use self::function_rewrite::ApplyFunctionRewrites;
-pub mod count_wildcard_rule;
pub mod expand_wildcard_rule;
pub mod function_rewrite;
pub mod inline_table_scan;
@@ -106,7 +104,6 @@ impl Analyzer {
// [Expr::Wildcard] should be expanded before [TypeCoercion]
Arc::new(ResolveGroupingFunction::new()),
Arc::new(TypeCoercion::new()),
- Arc::new(CountWildcardRule::new()),
];
Self::with_rules(rules)
}
diff --git a/datafusion/optimizer/tests/optimizer_integration.rs
b/datafusion/optimizer/tests/optimizer_integration.rs
index a33ecbc3a1..b59acd72a2 100644
--- a/datafusion/optimizer/tests/optimizer_integration.rs
+++ b/datafusion/optimizer/tests/optimizer_integration.rs
@@ -23,11 +23,14 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef,
TimeUnit};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{assert_contains, plan_err, Result, TableReference};
+use datafusion_expr::planner::ExprPlanner;
use datafusion_expr::sqlparser::dialect::PostgreSqlDialect;
use datafusion_expr::test::function_stub::sum_udaf;
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource,
WindowUDF};
use datafusion_functions_aggregate::average::avg_udaf;
use datafusion_functions_aggregate::count::count_udaf;
+use datafusion_functions_aggregate::planner::AggregateFunctionPlanner;
+use datafusion_functions_window::planner::WindowFunctionPlanner;
use datafusion_optimizer::analyzer::type_coercion::TypeCoercionRewriter;
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
@@ -195,7 +198,7 @@ fn between_date32_plus_interval() -> Result<()> {
WHERE col_date32 between '1998-03-18' AND cast('1998-03-18' as date) +
INTERVAL '90 days'";
let plan = test_sql(sql)?;
let expected =
- "Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]\
+ "Aggregate: groupBy=[[]], aggr=[[count(*)]]\
\n Projection: \
\n Filter: test.col_date32 >= Date32(\"1998-03-18\") AND
test.col_date32 <= Date32(\"1998-06-16\")\
\n TableScan: test projection=[col_date32]";
@@ -209,7 +212,7 @@ fn between_date64_plus_interval() -> Result<()> {
WHERE col_date64 between '1998-03-18T00:00:00' AND cast('1998-03-18' as
date) + INTERVAL '90 days'";
let plan = test_sql(sql)?;
let expected =
- "Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]\
+ "Aggregate: groupBy=[[]], aggr=[[count(*)]]\
\n Projection: \
\n Filter: test.col_date64 >= Date64(\"1998-03-18\") AND
test.col_date64 <= Date64(\"1998-06-16\")\
\n TableScan: test projection=[col_date64]";
@@ -266,7 +269,7 @@ fn push_down_filter_groupby_expr_contains_alias() {
let sql = "SELECT * FROM (SELECT (col_int32 + col_uint32) AS c, count(*)
FROM test GROUP BY 1) where c > 3";
let plan = test_sql(sql).unwrap();
let expected = "Projection: test.col_int32 + test.col_uint32 AS c,
count(*)\
- \n Aggregate: groupBy=[[test.col_int32 + CAST(test.col_uint32 AS
Int32)]], aggr=[[count(Int64(1)) AS count(*)]]\
+ \n Aggregate: groupBy=[[test.col_int32 + CAST(test.col_uint32 AS
Int32)]], aggr=[[count(*)]]\
\n Filter: test.col_int32 + CAST(test.col_uint32 AS Int32) > Int32(3)\
\n TableScan: test projection=[col_int32, col_uint32]";
assert_eq!(expected, format!("{plan}"));
@@ -311,7 +314,7 @@ fn eliminate_redundant_null_check_on_count() {
let plan = test_sql(sql).unwrap();
let expected = "\
Projection: test.col_int32, count(*) AS c\
- \n Aggregate: groupBy=[[test.col_int32]], aggr=[[count(Int64(1)) AS
count(*)]]\
+ \n Aggregate: groupBy=[[test.col_int32]], aggr=[[count(*)]]\
\n TableScan: test projection=[col_int32]";
assert_eq!(expected, format!("{plan}"));
}
@@ -422,7 +425,12 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
let context_provider = MyContextProvider::default()
.with_udaf(sum_udaf())
.with_udaf(count_udaf())
- .with_udaf(avg_udaf());
+ .with_udaf(avg_udaf())
+ .with_expr_planners(vec![
+ Arc::new(AggregateFunctionPlanner),
+ Arc::new(WindowFunctionPlanner),
+ ]);
+
let sql_to_rel = SqlToRel::new(&context_provider);
let plan = sql_to_rel.sql_statement_to_plan(statement.clone())?;
@@ -440,6 +448,7 @@ fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule)
{}
struct MyContextProvider {
options: ConfigOptions,
udafs: HashMap<String, Arc<AggregateUDF>>,
+ expr_planners: Vec<Arc<dyn ExprPlanner>>,
}
impl MyContextProvider {
@@ -448,6 +457,11 @@ impl MyContextProvider {
self.udafs.insert(udaf.name().to_lowercase(), udaf);
self
}
+
+    fn with_expr_planners(self, expr_planners: Vec<Arc<dyn ExprPlanner>>) -> Self {
+        // Replace the planner list; struct-update syntax carries the other fields over.
+        Self { expr_planners, ..self }
+    }
}
impl ContextProvider for MyContextProvider {
@@ -516,6 +530,10 @@ impl ContextProvider for MyContextProvider {
fn udwf_names(&self) -> Vec<String> {
Vec::new()
}
+
+    fn get_expr_planners(&self) -> &[Arc<dyn ExprPlanner>] {
+        self.expr_planners.as_slice()
+    }
}
struct MyTableSource {
diff --git a/datafusion/sql/src/expr/function.rs
b/datafusion/sql/src/expr/function.rs
index 1cf3dcb289..035749a789 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -23,7 +23,7 @@ use datafusion_common::{
DFSchema, Dependency, Result,
};
use datafusion_expr::expr::{ScalarFunction, Unnest};
-use datafusion_expr::planner::PlannerResult;
+use datafusion_expr::planner::{PlannerResult, RawAggregateExpr, RawWindowExpr};
use datafusion_expr::{
expr, qualified_wildcard, wildcard, Expr, ExprFunctionExt, ExprSchemable,
WindowFrame, WindowFunctionDefinition,
@@ -315,15 +315,38 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
};
if let Ok(fun) = self.find_window_func(&name) {
- return Expr::WindowFunction(expr::WindowFunction::new(
- fun,
- self.function_args_to_expr(args, schema, planner_context)?,
- ))
- .partition_by(partition_by)
- .order_by(order_by)
- .window_frame(window_frame)
- .null_treatment(null_treatment)
- .build();
+ let args = self.function_args_to_expr(args, schema,
planner_context)?;
+ let mut window_expr = RawWindowExpr {
+ func_def: fun,
+ args,
+ partition_by,
+ order_by,
+ window_frame,
+ null_treatment,
+ };
+
+ for planner in
self.context_provider.get_expr_planners().iter() {
+ match planner.plan_window(window_expr)? {
+ PlannerResult::Planned(expr) => return Ok(expr),
+ PlannerResult::Original(expr) => window_expr = expr,
+ }
+ }
+
+ let RawWindowExpr {
+ func_def,
+ args,
+ partition_by,
+ order_by,
+ window_frame,
+ null_treatment,
+ } = window_expr;
+
+ return
Expr::WindowFunction(expr::WindowFunction::new(func_def, args))
+ .partition_by(partition_by)
+ .order_by(order_by)
+ .window_frame(window_frame)
+ .null_treatment(null_treatment)
+ .build();
}
} else {
// User defined aggregate functions (UDAF) have precedence in case
it has the same name as a scalar built-in function
@@ -341,8 +364,33 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
.map(|e| self.sql_expr_to_logical_expr(*e, schema,
planner_context))
.transpose()?
.map(Box::new);
+
+ let mut aggregate_expr = RawAggregateExpr {
+ func: fm,
+ args,
+ distinct,
+ filter,
+ order_by,
+ null_treatment,
+ };
+ for planner in
self.context_provider.get_expr_planners().iter() {
+ match planner.plan_aggregate(aggregate_expr)? {
+ PlannerResult::Planned(expr) => return Ok(expr),
+ PlannerResult::Original(expr) => aggregate_expr = expr,
+ }
+ }
+
+ let RawAggregateExpr {
+ func,
+ args,
+ distinct,
+ filter,
+ order_by,
+ null_treatment,
+ } = aggregate_expr;
+
return
Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf(
- fm,
+ func,
args,
distinct,
filter,
diff --git a/datafusion/sql/tests/sql_integration.rs
b/datafusion/sql/tests/sql_integration.rs
index 1df1830268..9c0d6316ad 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -1461,14 +1461,14 @@ fn
select_simple_aggregate_with_groupby_and_column_is_in_aggregate_and_groupby()
fn select_simple_aggregate_with_groupby_can_use_positions() {
quick_test(
"SELECT state, age AS b, count(1) FROM person GROUP BY 1, 2",
- "Projection: person.state, person.age AS b, count(Int64(1))\
- \n Aggregate: groupBy=[[person.state, person.age]],
aggr=[[count(Int64(1))]]\
+ "Projection: person.state, person.age AS b, count(*)\
+ \n Aggregate: groupBy=[[person.state, person.age]],
aggr=[[count(*)]]\
\n TableScan: person",
);
quick_test(
"SELECT state, age AS b, count(1) FROM person GROUP BY 2, 1",
- "Projection: person.state, person.age AS b, count(Int64(1))\
- \n Aggregate: groupBy=[[person.age, person.state]],
aggr=[[count(Int64(1))]]\
+ "Projection: person.state, person.age AS b, count(*)\
+ \n Aggregate: groupBy=[[person.age, person.state]],
aggr=[[count(*)]]\
\n TableScan: person",
);
}
@@ -1630,8 +1630,8 @@ fn test_wildcard() {
#[test]
fn select_count_one() {
let sql = "SELECT count(1) FROM person";
- let expected = "Projection: count(Int64(1))\
- \n Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]\
+ let expected = "Projection: count(*)\
+ \n Aggregate: groupBy=[[]], aggr=[[count(*)]]\
\n TableScan: person";
quick_test(sql, expected);
}
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt
b/datafusion/sqllogictest/test_files/aggregate.slt
index 7caa81d64e..f175973f92 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -6276,6 +6276,26 @@ physical_plan
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5],
file_type=csv, has_header=true
+# test count(null) case (null with type)
+
+statement count 0
+create table t(a int, b int) as values (1, 3), (2, 4), (3, 5);
+
+query I
+select count(null::bigint) from t;
+----
+0
+
+query TT
+explain select count(null::bigint) from t;
+----
+logical_plan
+01)Aggregate: groupBy=[[]], aggr=[[count(Int64(NULL)) AS count(NULL)]]
+02)--TableScan: t projection=[]
+physical_plan
+01)AggregateExec: mode=Single, gby=[], aggr=[count(NULL)]
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+
#######
# Group median test
#######
diff --git a/datafusion/sqllogictest/test_files/avro.slt
b/datafusion/sqllogictest/test_files/avro.slt
index 80bf0bc2dd..20179e0c5b 100644
--- a/datafusion/sqllogictest/test_files/avro.slt
+++ b/datafusion/sqllogictest/test_files/avro.slt
@@ -243,7 +243,7 @@ query TT
EXPLAIN SELECT count(*) from alltypes_plain
----
logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+01)Aggregate: groupBy=[[]], aggr=[[count(*)]]
02)--TableScan: alltypes_plain projection=[]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[count(*)]
diff --git a/datafusion/sqllogictest/test_files/coalesce.slt
b/datafusion/sqllogictest/test_files/coalesce.slt
index 5f2d2f0d1d..e7cf31dc69 100644
--- a/datafusion/sqllogictest/test_files/coalesce.slt
+++ b/datafusion/sqllogictest/test_files/coalesce.slt
@@ -442,4 +442,4 @@ drop table test
query T
select coalesce(arrow_cast('', 'Utf8View'), arrow_cast('', 'Dictionary(UInt32,
Utf8)'));
----
-(empty)
\ No newline at end of file
+(empty)
diff --git a/datafusion/sqllogictest/test_files/copy.slt
b/datafusion/sqllogictest/test_files/copy.slt
index 7dd85b3ae2..f39ff56ce4 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -631,4 +631,3 @@ COPY source_table to '/tmp/table.parquet' (row_group_size
55 + 102);
# Copy using execution.keep_partition_by_columns with an invalid value
query error DataFusion error: Invalid or Unsupported Configuration: provided
value for 'execution.keep_partition_by_columns' was not recognized:
"invalid_value"
COPY source_table to '/tmp/table.parquet' OPTIONS
(execution.keep_partition_by_columns invalid_value);
-
diff --git a/datafusion/sqllogictest/test_files/count_star_rule.slt
b/datafusion/sqllogictest/test_files/count_star_rule.slt
index d660257b60..0efd9e9988 100644
--- a/datafusion/sqllogictest/test_files/count_star_rule.slt
+++ b/datafusion/sqllogictest/test_files/count_star_rule.slt
@@ -31,44 +31,44 @@ query TT
EXPLAIN SELECT COUNT() FROM (SELECT 1 AS a, 2 AS b) AS t;
----
logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count()]]
+01)Aggregate: groupBy=[[]], aggr=[[count(*)]]
02)--SubqueryAlias: t
03)----EmptyRelation
physical_plan
-01)ProjectionExec: expr=[1 as count()]
+01)ProjectionExec: expr=[1 as count(*)]
02)--PlaceholderRowExec
query TT
EXPLAIN SELECT t1.a, COUNT() FROM t1 GROUP BY t1.a;
----
logical_plan
-01)Aggregate: groupBy=[[t1.a]], aggr=[[count(Int64(1)) AS count()]]
+01)Aggregate: groupBy=[[t1.a]], aggr=[[count(*)]]
02)--TableScan: t1 projection=[a]
physical_plan
-01)AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count()]
+01)AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)]
02)--CoalesceBatchesExec: target_batch_size=8192
03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count()]
+05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)]
06)----------DataSourceExec: partitions=1, partition_sizes=[1]
query TT
EXPLAIN SELECT t1.a, COUNT() AS cnt FROM t1 GROUP BY t1.a HAVING COUNT() > 0;
----
logical_plan
-01)Projection: t1.a, count() AS cnt
-02)--Filter: count() > Int64(0)
-03)----Aggregate: groupBy=[[t1.a]], aggr=[[count(Int64(1)) AS count()]]
+01)Projection: t1.a, count(*) AS cnt
+02)--Filter: count(*) > Int64(0)
+03)----Aggregate: groupBy=[[t1.a]], aggr=[[count(*)]]
04)------TableScan: t1 projection=[a]
physical_plan
-01)ProjectionExec: expr=[a@0 as a, count()@1 as cnt]
+01)ProjectionExec: expr=[a@0 as a, count(*)@1 as cnt]
02)--CoalesceBatchesExec: target_batch_size=8192
-03)----FilterExec: count()@1 > 0
-04)------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count()]
+03)----FilterExec: count(*)@1 > 0
+04)------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
07)------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-08)--------------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count()]
+08)--------------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)]
09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
query II
@@ -80,12 +80,12 @@ query TT
EXPLAIN SELECT a, COUNT() OVER (PARTITION BY a) AS count_a FROM t1;
----
logical_plan
-01)Projection: t1.a, count() PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING AS count_a
-02)--WindowAggr: windowExpr=[[count(Int64(1)) PARTITION BY [t1.a] ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS count() PARTITION BY [t1.a] ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+01)Projection: t1.a, count(*) PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING AS count_a
+02)--WindowAggr: windowExpr=[[count(*) PARTITION BY [t1.a] ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
03)----TableScan: t1 projection=[a]
physical_plan
-01)ProjectionExec: expr=[a@0 as a, count() PARTITION BY [t1.a] ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as count_a]
-02)--WindowAggExec: wdw=[count() PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "count() PARTITION BY
[t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type:
Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }),
frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)),
end_bound: Following(UInt64(NULL)), is_causal: false }]
+01)ProjectionExec: expr=[a@0 as a, count(*) PARTITION BY [t1.a] ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as count_a]
+02)--WindowAggExec: wdw=[count(*) PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "count(*) PARTITION BY
[t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type:
Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }),
frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)),
end_bound: Following(UInt64(NULL)), is_causal: false }]
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]
04)------DataSourceExec: partitions=1, partition_sizes=[1]
diff --git a/datafusion/sqllogictest/test_files/ddl.slt
b/datafusion/sqllogictest/test_files/ddl.slt
index aefc2672b5..6f75a7d7f8 100644
--- a/datafusion/sqllogictest/test_files/ddl.slt
+++ b/datafusion/sqllogictest/test_files/ddl.slt
@@ -827,4 +827,3 @@ drop table table_with_pk;
statement ok
set datafusion.catalog.information_schema = false;
-
diff --git a/datafusion/sqllogictest/test_files/errors.slt
b/datafusion/sqllogictest/test_files/errors.slt
index a35a4d6f28..dc7a53adf8 100644
--- a/datafusion/sqllogictest/test_files/errors.slt
+++ b/datafusion/sqllogictest/test_files/errors.slt
@@ -184,4 +184,4 @@ query error DataFusion error: Schema error: No field named
ammp\. Did you mean '
select ammp from a;
statement ok
-drop table a;
\ No newline at end of file
+drop table a;
diff --git a/datafusion/sqllogictest/test_files/explain.slt
b/datafusion/sqllogictest/test_files/explain.slt
index 037565ce05..0d5eab6cf5 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -178,7 +178,6 @@ logical_plan after inline_table_scan SAME TEXT AS ABOVE
logical_plan after expand_wildcard_rule SAME TEXT AS ABOVE
logical_plan after resolve_grouping_function SAME TEXT AS ABOVE
logical_plan after type_coercion SAME TEXT AS ABOVE
-logical_plan after count_wildcard_rule SAME TEXT AS ABOVE
analyzed_logical_plan SAME TEXT AS ABOVE
logical_plan after eliminate_nested_union SAME TEXT AS ABOVE
logical_plan after simplify_expressions SAME TEXT AS ABOVE
@@ -427,7 +426,7 @@ logical_plan
02)--TableScan: t1 projection=[a]
03)--SubqueryAlias: __correlated_sq_1
04)----Projection:
-05)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+05)------Aggregate: groupBy=[[]], aggr=[[count(*)]]
06)--------TableScan: t2 projection=[]
physical_plan
01)NestedLoopJoinExec: join_type=LeftSemi
diff --git a/datafusion/sqllogictest/test_files/insert.slt
b/datafusion/sqllogictest/test_files/insert.slt
index ee76ee1c55..32428fdef7 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -61,7 +61,7 @@ logical_plan
02)--Projection: sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
03)----Sort: aggregate_test_100.c1 ASC NULLS LAST
04)------Projection: sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1
-05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64))
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(Int64(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64))
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
06)----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
01)DataSinkExec: sink=MemoryTable (partitions=1)
@@ -122,7 +122,7 @@ FROM aggregate_test_100
logical_plan
01)Dml: op=[Insert Into] table=[table_without_values]
02)--Projection: sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
-03)----WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64))
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(Int64(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+03)----WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64))
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
04)------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
01)DataSinkExec: sink=MemoryTable (partitions=1)
@@ -172,7 +172,7 @@ logical_plan
02)--Projection: a1 AS a1, a2 AS a2
03)----Sort: aggregate_test_100.c1 ASC NULLS LAST
04)------Projection: sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a1, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a2, aggregate_test_100.c1
-05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64))
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(Int64(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64))
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
06)----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
01)DataSinkExec: sink=MemoryTable (partitions=8)
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt
b/datafusion/sqllogictest/test_files/insert_to_external.slt
index ee1d67c5e2..752e8ce0e4 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -352,7 +352,7 @@ logical_plan
02)--Projection: sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
03)----Sort: aggregate_test_100.c1 ASC NULLS LAST
04)------Projection: sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1
-05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64))
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(Int64(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64))
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
06)----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
01)DataSinkExec: sink=ParquetSink(file_groups=[])
@@ -414,7 +414,7 @@ FROM aggregate_test_100
logical_plan
01)Dml: op=[Insert Into] table=[table_without_values]
02)--Projection: sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
-03)----WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64))
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(Int64(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+03)----WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64))
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
04)------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
01)DataSinkExec: sink=ParquetSink(file_groups=[])
diff --git a/datafusion/sqllogictest/test_files/joins.slt
b/datafusion/sqllogictest/test_files/joins.slt
index 5d311bc432..5b5368f6b0 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -1396,7 +1396,7 @@ group by t1_id
----
logical_plan
01)Projection: count(*)
-02)--Aggregate: groupBy=[[join_t1.t1_id]], aggr=[[count(Int64(1)) AS count(*)]]
+02)--Aggregate: groupBy=[[join_t1.t1_id]], aggr=[[count(*)]]
03)----Projection: join_t1.t1_id
04)------Inner Join: join_t1.t1_id = join_t2.t2_id
05)--------TableScan: join_t1 projection=[t1_id]
@@ -4442,7 +4442,7 @@ FROM my_catalog.my_schema.table_with_many_types AS l
JOIN my_catalog.my_schema.table_with_many_types AS r ON l.binary_col =
r.binary_col
----
logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+01)Aggregate: groupBy=[[]], aggr=[[count(*)]]
02)--Projection:
03)----Inner Join: l.binary_col = r.binary_col
04)------SubqueryAlias: l
diff --git a/datafusion/sqllogictest/test_files/json.slt
b/datafusion/sqllogictest/test_files/json.slt
index dd310f7f2b..466bba5566 100644
--- a/datafusion/sqllogictest/test_files/json.slt
+++ b/datafusion/sqllogictest/test_files/json.slt
@@ -54,7 +54,7 @@ query TT
EXPLAIN SELECT count(*) from json_test
----
logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+01)Aggregate: groupBy=[[]], aggr=[[count(*)]]
02)--TableScan: json_test projection=[]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[count(*)]
diff --git a/datafusion/sqllogictest/test_files/limit.slt
b/datafusion/sqllogictest/test_files/limit.slt
index 4e74b27b87..b4487be850 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -307,7 +307,7 @@ query TT
EXPLAIN SELECT COUNT(*) FROM (SELECT a FROM t1 LIMIT 3 OFFSET 11);
----
logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+01)Aggregate: groupBy=[[]], aggr=[[count(*)]]
02)--Limit: skip=11, fetch=3
03)----TableScan: t1 projection=[], fetch=14
physical_plan
@@ -325,7 +325,7 @@ query TT
EXPLAIN SELECT COUNT(*) FROM (SELECT a FROM t1 LIMIT 3 OFFSET 8);
----
logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+01)Aggregate: groupBy=[[]], aggr=[[count(*)]]
02)--Limit: skip=8, fetch=3
03)----TableScan: t1 projection=[], fetch=11
physical_plan
@@ -343,7 +343,7 @@ query TT
EXPLAIN SELECT COUNT(*) FROM (SELECT a FROM t1 OFFSET 8);
----
logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+01)Aggregate: groupBy=[[]], aggr=[[count(*)]]
02)--Limit: skip=8, fetch=None
03)----TableScan: t1 projection=[]
physical_plan
@@ -360,7 +360,7 @@ query TT
EXPLAIN SELECT COUNT(*) FROM (SELECT a FROM t1 WHERE a > 3 LIMIT 3 OFFSET 6);
----
logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+01)Aggregate: groupBy=[[]], aggr=[[count(*)]]
02)--Projection:
03)----Limit: skip=6, fetch=3
04)------Filter: t1.a > Int32(3)
diff --git a/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt
b/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt
index de6a153f58..8c87af75ed 100644
--- a/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt
+++ b/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt
@@ -48,8 +48,8 @@ FROM test_table t
GROUP BY 1, 2, 3, 4
----
logical_plan
-01)Projection: t.c1, Int64(99999), t.c5 + t.c8, Utf8("test"), count(Int64(1))
-02)--Aggregate: groupBy=[[t.c1, t.c5 + t.c8]], aggr=[[count(Int64(1))]]
+01)Projection: t.c1, Int64(99999), t.c5 + t.c8, Utf8("test"), count(*)
+02)--Aggregate: groupBy=[[t.c1, t.c5 + t.c8]], aggr=[[count(*)]]
03)----SubqueryAlias: t
04)------TableScan: test_table projection=[c1, c5, c8]
@@ -60,8 +60,8 @@ FROM test_table t
group by 1, 2, 3
----
logical_plan
-01)Projection: Int64(123), Int64(456), Int64(789), count(Int64(1)), avg(t.c12)
-02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1)), avg(t.c12)]]
+01)Projection: Int64(123), Int64(456), Int64(789), count(*), avg(t.c12)
+02)--Aggregate: groupBy=[[]], aggr=[[count(*), avg(t.c12)]]
03)----SubqueryAlias: t
04)------TableScan: test_table projection=[c12]
@@ -72,8 +72,8 @@ FROM test_table t
GROUP BY 1, 2
----
logical_plan
-01)Projection: Date32("2023-05-04") AS dt, Boolean(true) AS today_filter,
count(Int64(1))
-02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
+01)Projection: Date32("2023-05-04") AS dt, Boolean(true) AS today_filter,
count(*)
+02)--Aggregate: groupBy=[[]], aggr=[[count(*)]]
03)----SubqueryAlias: t
04)------TableScan: test_table projection=[]
@@ -90,8 +90,8 @@ FROM test_table t
GROUP BY 1
----
logical_plan
-01)Projection: Boolean(true) AS NOT date_part(Utf8("MONTH"),now()) BETWEEN
Int64(50) AND Int64(60), count(Int64(1))
-02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
+01)Projection: Boolean(true) AS NOT date_part(Utf8("MONTH"),now()) BETWEEN
Int64(50) AND Int64(60), count(*)
+02)--Aggregate: groupBy=[[]], aggr=[[count(*)]]
03)----SubqueryAlias: t
04)------TableScan: test_table projection=[]
diff --git a/datafusion/sqllogictest/test_files/select.slt
b/datafusion/sqllogictest/test_files/select.slt
index e12bdca37e..dcd373546d 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1541,7 +1541,7 @@ LIMIT 4)
GROUP BY c2;
----
logical_plan
-01)Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[count(Int64(1)) AS
count(*)]]
+01)Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[count(*)]]
02)--Projection: aggregate_test_100.c2
03)----Sort: aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC
NULLS LAST, fetch=4
04)------Projection: aggregate_test_100.c2, aggregate_test_100.c1
diff --git a/datafusion/sqllogictest/test_files/subquery.slt
b/datafusion/sqllogictest/test_files/subquery.slt
index 264392fc10..c847f433f7 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -555,7 +555,7 @@ logical_plan
03)----Subquery:
04)------Projection: count(*)
05)--------Filter: sum(outer_ref(t1.t1_int) + t2.t2_id) > Int64(0)
-06)----------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*),
sum(CAST(outer_ref(t1.t1_int) + t2.t2_id AS Int64))]]
+06)----------Aggregate: groupBy=[[]], aggr=[[count(*),
sum(CAST(outer_ref(t1.t1_int) + t2.t2_id AS Int64))]]
07)------------Filter: outer_ref(t1.t1_name) = t2.t2_name
08)--------------TableScan: t2
09)----TableScan: t1 projection=[t1_id, t1_name, t1_int]
@@ -738,7 +738,7 @@ explain select (select count(*) from t1) as b
logical_plan
01)Projection: __scalar_sq_1.count(*) AS b
02)--SubqueryAlias: __scalar_sq_1
-03)----Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+03)----Aggregate: groupBy=[[]], aggr=[[count(*)]]
04)------TableScan: t1 projection=[]
#simple_uncorrelated_scalar_subquery2
@@ -746,13 +746,13 @@ query TT
explain select (select count(*) from t1) as b, (select count(1) from t2)
----
logical_plan
-01)Projection: __scalar_sq_1.count(*) AS b, __scalar_sq_2.count(Int64(1)) AS
count(Int64(1))
+01)Projection: __scalar_sq_1.count(*) AS b, __scalar_sq_2.count(*) AS count(*)
02)--Left Join:
03)----SubqueryAlias: __scalar_sq_1
-04)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+04)------Aggregate: groupBy=[[]], aggr=[[count(*)]]
05)--------TableScan: t1 projection=[]
06)----SubqueryAlias: __scalar_sq_2
-07)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
+07)------Aggregate: groupBy=[[]], aggr=[[count(*)]]
08)--------TableScan: t2 projection=[]
statement ok
@@ -762,20 +762,20 @@ query TT
explain select (select count(*) from t1) as b, (select count(1) from t2)
----
logical_plan
-01)Projection: __scalar_sq_1.count(*) AS b, __scalar_sq_2.count(Int64(1)) AS
count(Int64(1))
+01)Projection: __scalar_sq_1.count(*) AS b, __scalar_sq_2.count(*) AS count(*)
02)--Left Join:
03)----SubqueryAlias: __scalar_sq_1
-04)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+04)------Aggregate: groupBy=[[]], aggr=[[count(*)]]
05)--------TableScan: t1 projection=[]
06)----SubqueryAlias: __scalar_sq_2
-07)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
+07)------Aggregate: groupBy=[[]], aggr=[[count(*)]]
08)--------TableScan: t2 projection=[]
physical_plan
-01)ProjectionExec: expr=[count(*)@0 as b, count(Int64(1))@1 as count(Int64(1))]
+01)ProjectionExec: expr=[count(*)@0 as b, count(*)@1 as count(*)]
02)--NestedLoopJoinExec: join_type=Left
03)----ProjectionExec: expr=[4 as count(*)]
04)------PlaceholderRowExec
-05)----ProjectionExec: expr=[4 as count(Int64(1))]
+05)----ProjectionExec: expr=[4 as count(*)]
06)------PlaceholderRowExec
statement ok
@@ -796,7 +796,7 @@ logical_plan
03)----TableScan: t1 projection=[t1_id, t1_int]
04)----SubqueryAlias: __scalar_sq_1
05)------Projection: count(*), t2.t2_int, Boolean(true) AS __always_true
-06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS
count(*)]]
+06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
07)----------TableScan: t2 projection=[t2_int]
query II rowsort
@@ -818,7 +818,7 @@ logical_plan
03)----TableScan: t1 projection=[t1_id, t1_int]
04)----SubqueryAlias: __scalar_sq_1
05)------Projection: count(*), t2.t2_int, Boolean(true) AS __always_true
-06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS
count(*)]]
+06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
07)----------TableScan: t2 projection=[t2_int]
query II rowsort
@@ -839,7 +839,7 @@ logical_plan
03)----TableScan: t1 projection=[t1_id, t1_int]
04)----SubqueryAlias: __scalar_sq_1
05)------Projection: count(*) AS _cnt, t2.t2_int, Boolean(true) AS
__always_true
-06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS
count(*)]]
+06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
07)----------TableScan: t2 projection=[t2_int]
query II rowsort
@@ -860,7 +860,7 @@ logical_plan
03)----TableScan: t1 projection=[t1_id, t1_int]
04)----SubqueryAlias: __scalar_sq_1
05)------Projection: count(*) + Int64(2) AS _cnt, t2.t2_int, Boolean(true) AS
__always_true
-06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS
count(*)]]
+06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
07)----------TableScan: t2 projection=[t2_int]
query II rowsort
@@ -883,7 +883,7 @@ logical_plan
05)--------TableScan: t1 projection=[t1_id, t1_int]
06)--------SubqueryAlias: __scalar_sq_1
07)----------Projection: count(*), t2.t2_id, Boolean(true) AS __always_true
-08)------------Aggregate: groupBy=[[t2.t2_id]], aggr=[[count(Int64(1)) AS
count(*)]]
+08)------------Aggregate: groupBy=[[t2.t2_id]], aggr=[[count(*)]]
09)--------------TableScan: t2 projection=[t2_id]
query I rowsort
@@ -905,7 +905,7 @@ logical_plan
04)----SubqueryAlias: __scalar_sq_1
05)------Projection: count(*) + Int64(2) AS cnt_plus_2, t2.t2_int
06)--------Filter: count(*) > Int64(1)
-07)----------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS
count(*)]]
+07)----------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
08)------------TableScan: t2 projection=[t2_int]
query II rowsort
@@ -927,7 +927,7 @@ logical_plan
03)----TableScan: t1 projection=[t1_id, t1_int]
04)----SubqueryAlias: __scalar_sq_1
05)------Projection: count(*) + Int64(2) AS cnt_plus_2, t2.t2_int, count(*),
Boolean(true) AS __always_true
-06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS
count(*)]]
+06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
07)----------TableScan: t2 projection=[t2_int]
query II rowsort
@@ -951,7 +951,7 @@ logical_plan
06)----------TableScan: t1 projection=[t1_int]
07)--------SubqueryAlias: __scalar_sq_1
08)----------Projection: count(*), t2.t2_int, Boolean(true) AS __always_true
-09)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS
count(*)]]
+09)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
10)--------------TableScan: t2 projection=[t2_int]
query I rowsort
@@ -972,7 +972,7 @@ logical_plan
05)--------TableScan: t1 projection=[t1_int]
06)--------SubqueryAlias: __scalar_sq_1
07)----------Projection: count(*) AS cnt, t2.t2_int, Boolean(true) AS
__always_true
-08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS
count(*)]]
+08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
09)--------------TableScan: t2 projection=[t2_int]
@@ -1002,7 +1002,7 @@ logical_plan
05)--------TableScan: t1 projection=[t1_int]
06)--------SubqueryAlias: __scalar_sq_1
07)----------Projection: count(*) + Int64(1) + Int64(1) AS cnt_plus_two,
t2.t2_int, count(*), Boolean(true) AS __always_true
-08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS
count(*)]]
+08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
09)--------------TableScan: t2 projection=[t2_int]
query I rowsort
@@ -1031,7 +1031,7 @@ logical_plan
05)--------TableScan: t1 projection=[t1_int]
06)--------SubqueryAlias: __scalar_sq_1
07)----------Projection: CASE WHEN count(*) = Int64(1) THEN Int64(NULL) ELSE
count(*) END AS cnt, t2.t2_int, Boolean(true) AS __always_true
-08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS
count(*)]]
+08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
09)--------------TableScan: t2 projection=[t2_int]
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part
b/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part
index 2616b7b75b..6a41ecb51b 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part
@@ -42,7 +42,7 @@ explain select
logical_plan
01)Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS
LAST
02)--Projection: lineitem.l_returnflag, lineitem.l_linestatus,
sum(lineitem.l_quantity) AS sum_qty, sum(lineitem.l_extendedprice) AS
sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
AS sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge,
avg(lineitem.l_quantity) AS avg_qty, avg(lineitem.l_extendedprice) AS
avg_price, avg(lineitem.l_discount) AS avg_disc, count(*) AS count_order
-03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]],
aggr=[[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice),
sum(__common_expr_1) AS sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount), sum(__common_expr_1 * (Decimal128(Some(1),20,0) +
lineitem.l_tax)) AS sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity),
avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1)) AS
[...]
+03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]],
aggr=[[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice),
sum(__common_expr_1) AS sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount), sum(__common_expr_1 * (Decimal128(Some(1),20,0) +
lineitem.l_tax)) AS sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity),
avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]]
04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) -
lineitem.l_discount) AS __common_expr_1, lineitem.l_quantity,
lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax,
lineitem.l_returnflag, lineitem.l_linestatus
05)--------Filter: lineitem.l_shipdate <= Date32("1998-09-02")
06)----------TableScan: lineitem projection=[l_quantity, l_extendedprice,
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate],
partial_filters=[lineitem.l_shipdate <= Date32("1998-09-02")]
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part
b/datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part
index eb41445c3c..68532733c6 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part
@@ -42,7 +42,7 @@ limit 10;
logical_plan
01)Sort: custdist DESC NULLS FIRST, c_orders.c_count DESC NULLS FIRST, fetch=10
02)--Projection: c_orders.c_count, count(*) AS custdist
-03)----Aggregate: groupBy=[[c_orders.c_count]], aggr=[[count(Int64(1)) AS
count(*)]]
+03)----Aggregate: groupBy=[[c_orders.c_count]], aggr=[[count(*)]]
04)------SubqueryAlias: c_orders
05)--------Projection: count(orders.o_orderkey) AS c_count
06)----------Aggregate: groupBy=[[customer.c_custkey]],
aggr=[[count(orders.o_orderkey)]]
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q21.slt.part
b/datafusion/sqllogictest/test_files/tpch/plans/q21.slt.part
index 9e39732689..eb10f4c8d1 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q21.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q21.slt.part
@@ -60,7 +60,7 @@ order by
logical_plan
01)Sort: numwait DESC NULLS FIRST, supplier.s_name ASC NULLS LAST
02)--Projection: supplier.s_name, count(*) AS numwait
-03)----Aggregate: groupBy=[[supplier.s_name]], aggr=[[count(Int64(1)) AS
count(*)]]
+03)----Aggregate: groupBy=[[supplier.s_name]], aggr=[[count(*)]]
04)------Projection: supplier.s_name
05)--------LeftAnti Join: l1.l_orderkey = __correlated_sq_2.l_orderkey Filter:
__correlated_sq_2.l_suppkey != l1.l_suppkey
06)----------LeftSemi Join: l1.l_orderkey = __correlated_sq_1.l_orderkey
Filter: __correlated_sq_1.l_suppkey != l1.l_suppkey
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part
b/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part
index 9ad9936125..af8b7948c1 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part
@@ -58,7 +58,7 @@ order by
logical_plan
01)Sort: custsale.cntrycode ASC NULLS LAST
02)--Projection: custsale.cntrycode, count(*) AS numcust,
sum(custsale.c_acctbal) AS totacctbal
-03)----Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[count(Int64(1)) AS
count(*), sum(custsale.c_acctbal)]]
+03)----Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[count(*),
sum(custsale.c_acctbal)]]
04)------SubqueryAlias: custsale
05)--------Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS
cntrycode, customer.c_acctbal
06)----------Inner Join: Filter: CAST(customer.c_acctbal AS Decimal128(19,
6)) > __scalar_sq_2.avg(customer.c_acctbal)
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q4.slt.part
b/datafusion/sqllogictest/test_files/tpch/plans/q4.slt.part
index fb93850ab0..766b21c22f 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q4.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q4.slt.part
@@ -42,7 +42,7 @@ order by
logical_plan
01)Sort: orders.o_orderpriority ASC NULLS LAST
02)--Projection: orders.o_orderpriority, count(*) AS order_count
-03)----Aggregate: groupBy=[[orders.o_orderpriority]], aggr=[[count(Int64(1))
AS count(*)]]
+03)----Aggregate: groupBy=[[orders.o_orderpriority]], aggr=[[count(*)]]
04)------Projection: orders.o_orderpriority
05)--------LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey
06)----------Projection: orders.o_orderkey, orders.o_orderpriority
diff --git a/datafusion/sqllogictest/test_files/union.slt
b/datafusion/sqllogictest/test_files/union.slt
index dfac9c0310..57207f00f7 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -449,7 +449,7 @@ SELECT count(*) FROM (
----
logical_plan
01)Projection: count(*)
-02)--Aggregate: groupBy=[[t1.name]], aggr=[[count(Int64(1)) AS count(*)]]
+02)--Aggregate: groupBy=[[t1.name]], aggr=[[count(*)]]
03)----Union
04)------Aggregate: groupBy=[[t1.name]], aggr=[[]]
05)--------TableScan: t1 projection=[name]
@@ -493,7 +493,7 @@ logical_plan
02)--Union
03)----Projection: count(*) AS cnt
04)------Limit: skip=0, fetch=3
-05)--------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+05)--------Aggregate: groupBy=[[]], aggr=[[count(*)]]
06)----------SubqueryAlias: a
07)------------Projection:
08)--------------Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[]]
@@ -651,7 +651,7 @@ select x, y from (select 1 as x , max(10) as y) b
logical_plan
01)Union
02)--Projection: count(*) AS count, a.n
-03)----Aggregate: groupBy=[[a.n]], aggr=[[count(Int64(1)) AS count(*)]]
+03)----Aggregate: groupBy=[[a.n]], aggr=[[count(*)]]
04)------SubqueryAlias: a
05)--------Projection: Int64(5) AS n
06)----------EmptyRelation
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index ca4713e7d5..6c00af879e 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1305,7 +1305,7 @@ EXPLAIN SELECT
----
logical_plan
01)Projection: sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1,
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING
-02)--WindowAggr: windowExpr=[[count(Int64(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+02)--WindowAggr: windowExpr=[[count(*) PARTITION BY [aggregate_test_100.c1]
ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING]]
03)----Projection: aggregate_test_100.c1, aggregate_test_100.c2,
sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1,
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING
04)------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64))
PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY
[aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING]]
05)--------TableScan: aggregate_test_100 projection=[c1, c2, c4]
@@ -1765,7 +1765,7 @@ EXPLAIN SELECT count(*) as global_count FROM
----
logical_plan
01)Projection: count(*) AS global_count
-02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+02)--Aggregate: groupBy=[[]], aggr=[[count(*)]]
03)----SubqueryAlias: a
04)------Projection:
05)--------Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[]]
@@ -2571,10 +2571,10 @@ logical_plan
01)Projection: sum1, sum2, sum3, min1, min2, min3, max1, max2, max3, cnt1,
cnt2, sumr1, sumr2, sumr3, minr1, minr2, minr3, maxr1, maxr2, maxr3, cntr1,
cntr2, sum4, cnt3
02)--Sort: annotated_data_finite.inc_col DESC NULLS FIRST, fetch=5
03)----Projection: sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING AS sum1, sum(annotated_data_finite.desc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1
FOLLOWING AS sum2, sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10
FOLLOWING AS sum3, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_
[...]
-04)------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING,
count(Int64(1)) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS count(*) ROWS
BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
+04)------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING,
count(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
05)--------Projection: __common_expr_1, annotated_data_finite.inc_col,
sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING,
sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING,
sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING,
min(annotated_data_finite.i [...]
-06)----------WindowAggr: windowExpr=[[sum(__common_expr_2 AS
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS
LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, sum(__common_expr_1 AS
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS
LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, sum(__common_expr_2 AS
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, min(anno [...]
-07)------------WindowAggr: windowExpr=[[sum(__common_expr_2 AS
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS
FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, sum(__common_expr_1 AS
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS
FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, sum(__common_expr_1 AS
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, m [...]
+06)----------WindowAggr: windowExpr=[[sum(__common_expr_2 AS
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS
LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, sum(__common_expr_1 AS
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS
LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, sum(__common_expr_2 AS
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, min(anno [...]
+07)------------WindowAggr: windowExpr=[[sum(__common_expr_2 AS
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS
FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, sum(__common_expr_1 AS
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS
FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, sum(__common_expr_1 AS
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, m [...]
08)--------------Projection: CAST(annotated_data_finite.desc_col AS Int64) AS
__common_expr_1, CAST(annotated_data_finite.inc_col AS Int64) AS
__common_expr_2, annotated_data_finite.ts, annotated_data_finite.inc_col,
annotated_data_finite.desc_col
09)----------------TableScan: annotated_data_finite projection=[ts, inc_col,
desc_col]
physical_plan
@@ -4112,7 +4112,7 @@ EXPLAIN select count(*) over (partition by a order by a)
from (select * from a w
----
logical_plan
01)Projection: count(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-02)--WindowAggr: windowExpr=[[count(Int64(1)) PARTITION BY [a.a] ORDER BY [a.a
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS count(*)
PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW]]
+02)--WindowAggr: windowExpr=[[count(*) PARTITION BY [a.a] ORDER BY [a.a ASC
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
03)----Filter: a.a = Int64(1)
04)------TableScan: a projection=[a]
physical_plan
diff --git a/datafusion/substrait/tests/cases/consumer_integration.rs
b/datafusion/substrait/tests/cases/consumer_integration.rs
index 219f656bb4..086c085811 100644
--- a/datafusion/substrait/tests/cases/consumer_integration.rs
+++ b/datafusion/substrait/tests/cases/consumer_integration.rs
@@ -50,9 +50,9 @@ mod tests {
let plan_str = tpch_plan_to_string(1).await?;
assert_eq!(
plan_str,
- "Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS,
sum(LINEITEM.L_QUANTITY) AS SUM_QTY, sum(LINEITEM.L_EXTENDEDPRICE) AS
SUM_BASE_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT)
AS SUM_DISC_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX) AS SUM_CHARGE,
avg(LINEITEM.L_QUANTITY) AS AVG_QTY, avg(LINEITEM.L_EXTENDEDPRICE) AS
AVG_PRICE, avg(LINEITEM.L_DISCOUNT) AS AVG_DISC, count(Int64(1)) AS COUNT_ORDER\
+ "Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS,
sum(LINEITEM.L_QUANTITY) AS SUM_QTY, sum(LINEITEM.L_EXTENDEDPRICE) AS
SUM_BASE_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT)
AS SUM_DISC_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX) AS SUM_CHARGE,
avg(LINEITEM.L_QUANTITY) AS AVG_QTY, avg(LINEITEM.L_EXTENDEDPRICE) AS
AVG_PRICE, avg(LINEITEM.L_DISCOUNT) AS AVG_DISC, count(*) AS COUNT_ORDER\
\n Sort: LINEITEM.L_RETURNFLAG ASC NULLS LAST,
LINEITEM.L_LINESTATUS ASC NULLS LAST\
- \n Aggregate: groupBy=[[LINEITEM.L_RETURNFLAG,
LINEITEM.L_LINESTATUS]], aggr=[[sum(LINEITEM.L_QUANTITY),
sum(LINEITEM.L_EXTENDEDPRICE), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX), avg(LINEITEM.L_QUANTITY),
avg(LINEITEM.L_EXTENDEDPRICE), avg(LINEITEM.L_DISCOUNT), count(Int64(1))]]\
+ \n Aggregate: groupBy=[[LINEITEM.L_RETURNFLAG,
LINEITEM.L_LINESTATUS]], aggr=[[sum(LINEITEM.L_QUANTITY),
sum(LINEITEM.L_EXTENDEDPRICE), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX), avg(LINEITEM.L_QUANTITY),
avg(LINEITEM.L_EXTENDEDPRICE), avg(LINEITEM.L_DISCOUNT), count(*)]]\
\n Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS,
LINEITEM.L_QUANTITY, LINEITEM.L_EXTENDEDPRICE, LINEITEM.L_EXTENDEDPRICE *
(CAST(Int32(1) AS Decimal128(15, 2)) - LINEITEM.L_DISCOUNT),
LINEITEM.L_EXTENDEDPRICE * (CAST(Int32(1) AS Decimal128(15, 2)) -
LINEITEM.L_DISCOUNT) * (CAST(Int32(1) AS Decimal128(15, 2)) + LINEITEM.L_TAX),
LINEITEM.L_DISCOUNT\
\n Filter: LINEITEM.L_SHIPDATE <= Date32(\"1998-12-01\") -
IntervalDayTime(\"IntervalDayTime { days: 0, milliseconds: 10368000 }\")\
\n TableScan: LINEITEM"
@@ -119,9 +119,9 @@ mod tests {
let plan_str = tpch_plan_to_string(4).await?;
assert_eq!(
plan_str,
- "Projection: ORDERS.O_ORDERPRIORITY, count(Int64(1)) AS
ORDER_COUNT\
+ "Projection: ORDERS.O_ORDERPRIORITY, count(*) AS ORDER_COUNT\
\n Sort: ORDERS.O_ORDERPRIORITY ASC NULLS LAST\
- \n Aggregate: groupBy=[[ORDERS.O_ORDERPRIORITY]],
aggr=[[count(Int64(1))]]\
+ \n Aggregate: groupBy=[[ORDERS.O_ORDERPRIORITY]],
aggr=[[count(*)]]\
\n Projection: ORDERS.O_ORDERPRIORITY\
\n Filter: ORDERS.O_ORDERDATE >= CAST(Utf8(\"1993-07-01\")
AS Date32) AND ORDERS.O_ORDERDATE < CAST(Utf8(\"1993-10-01\") AS Date32) AND
EXISTS (<subquery>)\
\n Subquery:\
@@ -269,10 +269,10 @@ mod tests {
let plan_str = tpch_plan_to_string(13).await?;
assert_eq!(
plan_str,
- "Projection: count(ORDERS.O_ORDERKEY) AS C_COUNT, count(Int64(1))
AS CUSTDIST\
- \n Sort: count(Int64(1)) DESC NULLS FIRST,
count(ORDERS.O_ORDERKEY) DESC NULLS FIRST\
- \n Projection: count(ORDERS.O_ORDERKEY), count(Int64(1))\
- \n Aggregate: groupBy=[[count(ORDERS.O_ORDERKEY)]],
aggr=[[count(Int64(1))]]\
+ "Projection: count(ORDERS.O_ORDERKEY) AS C_COUNT, count(*) AS
CUSTDIST\
+ \n Sort: count(*) DESC NULLS FIRST, count(ORDERS.O_ORDERKEY) DESC
NULLS FIRST\
+ \n Projection: count(ORDERS.O_ORDERKEY), count(*)\
+ \n Aggregate: groupBy=[[count(ORDERS.O_ORDERKEY)]],
aggr=[[count(*)]]\
\n Projection: count(ORDERS.O_ORDERKEY)\
\n Aggregate: groupBy=[[CUSTOMER.C_CUSTKEY]],
aggr=[[count(ORDERS.O_ORDERKEY)]]\
\n Projection: CUSTOMER.C_CUSTKEY, ORDERS.O_ORDERKEY\
@@ -410,10 +410,10 @@ mod tests {
let plan_str = tpch_plan_to_string(21).await?;
assert_eq!(
plan_str,
- "Projection: SUPPLIER.S_NAME, count(Int64(1)) AS NUMWAIT\
+ "Projection: SUPPLIER.S_NAME, count(*) AS NUMWAIT\
\n Limit: skip=0, fetch=100\
- \n Sort: count(Int64(1)) DESC NULLS FIRST, SUPPLIER.S_NAME ASC
NULLS LAST\
- \n Aggregate: groupBy=[[SUPPLIER.S_NAME]],
aggr=[[count(Int64(1))]]\
+ \n Sort: count(*) DESC NULLS FIRST, SUPPLIER.S_NAME ASC NULLS
LAST\
+ \n Aggregate: groupBy=[[SUPPLIER.S_NAME]], aggr=[[count(*)]]\
\n Projection: SUPPLIER.S_NAME\
\n Filter: SUPPLIER.S_SUPPKEY = LINEITEM.L_SUPPKEY AND
ORDERS.O_ORDERKEY = LINEITEM.L_ORDERKEY AND ORDERS.O_ORDERSTATUS = Utf8(\"F\")
AND LINEITEM.L_RECEIPTDATE > LINEITEM.L_COMMITDATE AND EXISTS (<subquery>) AND
NOT EXISTS (<subquery>) AND SUPPLIER.S_NATIONKEY = NATION.N_NATIONKEY AND
NATION.N_NAME = Utf8(\"SAUDI ARABIA\")\
\n Subquery:\
@@ -438,9 +438,9 @@ mod tests {
let plan_str = tpch_plan_to_string(22).await?;
assert_eq!(
plan_str,
- "Projection: substr(CUSTOMER.C_PHONE,Int32(1),Int32(2)) AS
CNTRYCODE, count(Int64(1)) AS NUMCUST, sum(CUSTOMER.C_ACCTBAL) AS TOTACCTBAL\
+ "Projection: substr(CUSTOMER.C_PHONE,Int32(1),Int32(2)) AS
CNTRYCODE, count(*) AS NUMCUST, sum(CUSTOMER.C_ACCTBAL) AS TOTACCTBAL\
\n Sort: substr(CUSTOMER.C_PHONE,Int32(1),Int32(2)) ASC NULLS
LAST\
- \n Aggregate:
groupBy=[[substr(CUSTOMER.C_PHONE,Int32(1),Int32(2))]], aggr=[[count(Int64(1)),
sum(CUSTOMER.C_ACCTBAL)]]\
+ \n Aggregate:
groupBy=[[substr(CUSTOMER.C_PHONE,Int32(1),Int32(2))]], aggr=[[count(*),
sum(CUSTOMER.C_ACCTBAL)]]\
\n Projection: substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)),
CUSTOMER.C_ACCTBAL\
\n Filter: (substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) =
CAST(Utf8(\"13\") AS Utf8) OR substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) =
CAST(Utf8(\"31\") AS Utf8) OR substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) =
CAST(Utf8(\"23\") AS Utf8) OR substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) =
CAST(Utf8(\"29\") AS Utf8) OR substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) =
CAST(Utf8(\"30\") AS Utf8) OR substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) =
CAST(Utf8(\"18\") AS Utf8) OR [...]
\n Subquery:\
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index 5fb357dfcd..68856117a3 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -687,7 +687,7 @@ async fn simple_intersect() -> Result<()> {
// Substrait treats both count(*) and count(1) the same
assert_expected_plan(
"SELECT count(*) FROM (SELECT data.a FROM data INTERSECT SELECT
data2.a FROM data2);",
- "Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]\
+ "Aggregate: groupBy=[[]], aggr=[[count(*)]]\
\n Projection: \
\n LeftSemi Join: data.a = data2.a\
\n Aggregate: groupBy=[[data.a]], aggr=[[]]\
@@ -822,7 +822,7 @@ async fn simple_intersect_table_reuse() -> Result<()> {
// Schema check works because we set aliases to what the Substrait
consumer will generate.
assert_expected_plan(
"SELECT count(1) FROM (SELECT left.a FROM data AS left INTERSECT
SELECT right.a FROM data AS right);",
- "Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]\
+ "Aggregate: groupBy=[[]], aggr=[[count(*)]]\
\n Projection: \
\n LeftSemi Join: left.a = right.a\
\n SubqueryAlias: left\
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]