This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 2185842be2 Implement `DISTINCT ON` from Postgres (#7981)
2185842be2 is described below
commit 2185842be22b695cf00e615db68b373f86fd162b
Author: Marko Grujic <[email protected]>
AuthorDate: Mon Nov 13 15:45:54 2023 +0100
Implement `DISTINCT ON` from Postgres (#7981)
* Initial DISTINCT ON implementation
* Add a couple more tests
* Add comments in the replace_distinct_aggregate optimizer
* Run cargo fmt to fix CI
* Make DISTINCT ON planning more robust to support arbitrary selection
expressions
* Add DISTINCT ON + join SLT
* Handle no DISTINCT ON expressions and extend the docs for the
replace_distinct_aggregate optimizer
* Remove misleading DISTINCT ON SLT comment
* Add an EXPLAIN SLT for a basic DISTINCT ON query
* Revise comment in CommonSubexprEliminate::try_optimize_aggregate
* Implement qualified expression alias and extend test coverage
* Update datafusion/proto/proto/datafusion.proto
Co-authored-by: Jonah Gao <[email protected]>
* Accompanying generated changes to alias proto tag revision
* Remove obsolete comment
---------
Co-authored-by: Jonah Gao <[email protected]>
---
datafusion/expr/src/expr.rs | 33 +++-
datafusion/expr/src/expr_schema.rs | 7 +
datafusion/expr/src/logical_plan/builder.rs | 29 +++-
datafusion/expr/src/logical_plan/mod.rs | 8 +-
datafusion/expr/src/logical_plan/plan.rs | 163 +++++++++++++++++--
datafusion/expr/src/tree_node/expr.rs | 8 +-
datafusion/expr/src/utils.rs | 8 +-
.../optimizer/src/common_subexpr_eliminate.rs | 23 ++-
datafusion/optimizer/src/eliminate_nested_union.rs | 12 +-
datafusion/optimizer/src/optimizer.rs | 6 +-
datafusion/optimizer/src/push_down_projection.rs | 2 +-
.../optimizer/src/replace_distinct_aggregate.rs | 106 ++++++++++++-
datafusion/optimizer/src/test/mod.rs | 7 +-
datafusion/proto/proto/datafusion.proto | 9 ++
datafusion/proto/src/generated/pbjson.rs | 176 +++++++++++++++++++++
datafusion/proto/src/generated/prost.rs | 18 ++-
datafusion/proto/src/logical_plan/from_proto.rs | 5 +
datafusion/proto/src/logical_plan/mod.rs | 67 +++++++-
datafusion/proto/src/logical_plan/to_proto.rs | 10 +-
.../proto/tests/cases/roundtrip_logical_plan.rs | 26 +++
datafusion/proto/tests/cases/serialize.rs | 6 +
datafusion/sql/src/query.rs | 12 +-
datafusion/sql/src/select.rs | 83 ++++++----
datafusion/sqllogictest/test_files/distinct_on.slt | 146 +++++++++++++++++
datafusion/substrait/src/logical_plan/producer.rs | 8 +-
25 files changed, 879 insertions(+), 99 deletions(-)
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 4267f182bd..97e4fcc327 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -28,7 +28,7 @@ use crate::Operator;
use crate::{aggregate_function, ExprSchemable};
use arrow::datatypes::DataType;
use datafusion_common::tree_node::{Transformed, TreeNode};
-use datafusion_common::{internal_err, DFSchema};
+use datafusion_common::{internal_err, DFSchema, OwnedTableReference};
use datafusion_common::{plan_err, Column, DataFusionError, Result,
ScalarValue};
use std::collections::HashSet;
use std::fmt;
@@ -187,13 +187,20 @@ pub enum Expr {
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Alias {
pub expr: Box<Expr>,
+ pub relation: Option<OwnedTableReference>,
pub name: String,
}
impl Alias {
- pub fn new(expr: Expr, name: impl Into<String>) -> Self {
+ /// Create an alias with an optional schema/field qualifier.
+ pub fn new(
+ expr: Expr,
+ relation: Option<impl Into<OwnedTableReference>>,
+ name: impl Into<String>,
+ ) -> Self {
Self {
expr: Box::new(expr),
+ relation: relation.map(|r| r.into()),
name: name.into(),
}
}
@@ -844,7 +851,27 @@ impl Expr {
asc,
nulls_first,
}) => Expr::Sort(Sort::new(Box::new(expr.alias(name)), asc,
nulls_first)),
- _ => Expr::Alias(Alias::new(self, name.into())),
+ _ => Expr::Alias(Alias::new(self, None::<&str>, name.into())),
+ }
+ }
+
+ /// Return `self AS name` alias expression with a specific qualifier
+ pub fn alias_qualified(
+ self,
+ relation: Option<impl Into<OwnedTableReference>>,
+ name: impl Into<String>,
+ ) -> Expr {
+ match self {
+ Expr::Sort(Sort {
+ expr,
+ asc,
+ nulls_first,
+ }) => Expr::Sort(Sort::new(
+ Box::new(expr.alias_qualified(relation, name)),
+ asc,
+ nulls_first,
+ )),
+ _ => Expr::Alias(Alias::new(self, relation, name.into())),
}
}
diff --git a/datafusion/expr/src/expr_schema.rs
b/datafusion/expr/src/expr_schema.rs
index 2631708fb7..5881feece1 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -305,6 +305,13 @@ impl ExprSchemable for Expr {
self.nullable(input_schema)?,
)
.with_metadata(self.metadata(input_schema)?)),
+ Expr::Alias(Alias { relation, name, .. }) => Ok(DFField::new(
+ relation.clone(),
+ name,
+ self.get_type(input_schema)?,
+ self.nullable(input_schema)?,
+ )
+ .with_metadata(self.metadata(input_schema)?)),
_ => Ok(DFField::new_unqualified(
&self.display_name()?,
self.get_type(input_schema)?,
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index 4a30f4e223..c4ff9fe954 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -32,8 +32,8 @@ use crate::expr_rewriter::{
rewrite_sort_cols_by_aggs,
};
use crate::logical_plan::{
- Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, Filter,
Join,
- JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
Prepare,
+ Aggregate, Analyze, CrossJoin, Distinct, DistinctOn, EmptyRelation,
Explain, Filter,
+ Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning,
PlanType, Prepare,
Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest,
Values,
Window,
};
@@ -551,16 +551,29 @@ impl LogicalPlanBuilder {
let left_plan: LogicalPlan = self.plan;
let right_plan: LogicalPlan = plan;
- Ok(Self::from(LogicalPlan::Distinct(Distinct {
- input: Arc::new(union(left_plan, right_plan)?),
- })))
+ Ok(Self::from(LogicalPlan::Distinct(Distinct::All(Arc::new(
+ union(left_plan, right_plan)?,
+ )))))
}
/// Apply deduplication: Only distinct (different) values are returned
pub fn distinct(self) -> Result<Self> {
- Ok(Self::from(LogicalPlan::Distinct(Distinct {
- input: Arc::new(self.plan),
- })))
+ Ok(Self::from(LogicalPlan::Distinct(Distinct::All(Arc::new(
+ self.plan,
+ )))))
+ }
+
+ /// Project first values of the specified expression list according to the
provided
+ /// sorting expressions grouped by the `DISTINCT ON` clause expressions.
+ pub fn distinct_on(
+ self,
+ on_expr: Vec<Expr>,
+ select_expr: Vec<Expr>,
+ sort_expr: Option<Vec<Expr>>,
+ ) -> Result<Self> {
+ Ok(Self::from(LogicalPlan::Distinct(Distinct::On(
+ DistinctOn::try_new(on_expr, select_expr, sort_expr,
Arc::new(self.plan))?,
+ ))))
}
/// Apply a join to `right` using explicitly specified columns and an
diff --git a/datafusion/expr/src/logical_plan/mod.rs
b/datafusion/expr/src/logical_plan/mod.rs
index 8316417138..51d78cd721 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -33,10 +33,10 @@ pub use ddl::{
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
- Aggregate, Analyze, CrossJoin, DescribeTable, Distinct, EmptyRelation,
Explain,
- Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning,
- PlanType, Prepare, Projection, Repartition, Sort, StringifiedPlan,
Subquery,
- SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
+ Aggregate, Analyze, CrossJoin, DescribeTable, Distinct, DistinctOn,
EmptyRelation,
+ Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit,
LogicalPlan,
+ Partitioning, PlanType, Prepare, Projection, Repartition, Sort,
StringifiedPlan,
+ Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest,
Values, Window,
};
pub use statement::{
SetVariable, Statement, TransactionAccessMode, TransactionConclusion,
TransactionEnd,
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index d62ac89263..b7537dc02e 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -25,8 +25,8 @@ use std::sync::Arc;
use super::dml::CopyTo;
use super::DdlStatement;
use crate::dml::CopyOptions;
-use crate::expr::{Alias, Exists, InSubquery, Placeholder};
-use crate::expr_rewriter::create_col_from_scalar_expr;
+use crate::expr::{Alias, Exists, InSubquery, Placeholder, Sort as SortExpr};
+use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols};
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
use crate::logical_plan::{DmlStatement, Statement};
@@ -163,7 +163,8 @@ impl LogicalPlan {
}) => projected_schema,
LogicalPlan::Projection(Projection { schema, .. }) => schema,
LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
- LogicalPlan::Distinct(Distinct { input }) => input.schema(),
+ LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
+ LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) =>
schema,
LogicalPlan::Window(Window { schema, .. }) => schema,
LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
@@ -367,6 +368,16 @@ impl LogicalPlan {
LogicalPlan::Unnest(Unnest { column, .. }) => {
f(&Expr::Column(column.clone()))
}
+ LogicalPlan::Distinct(Distinct::On(DistinctOn {
+ on_expr,
+ select_expr,
+ sort_expr,
+ ..
+ })) => on_expr
+ .iter()
+ .chain(select_expr.iter())
+ .chain(sort_expr.clone().unwrap_or(vec![]).iter())
+ .try_for_each(f),
// plans without expressions
LogicalPlan::EmptyRelation(_)
| LogicalPlan::Subquery(_)
@@ -377,7 +388,7 @@ impl LogicalPlan {
| LogicalPlan::Analyze(_)
| LogicalPlan::Explain(_)
| LogicalPlan::Union(_)
- | LogicalPlan::Distinct(_)
+ | LogicalPlan::Distinct(Distinct::All(_))
| LogicalPlan::Dml(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::Copy(_)
@@ -405,7 +416,9 @@ impl LogicalPlan {
LogicalPlan::Union(Union { inputs, .. }) => {
inputs.iter().map(|arc| arc.as_ref()).collect()
}
- LogicalPlan::Distinct(Distinct { input }) => vec![input],
+ LogicalPlan::Distinct(
+ Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
+ ) => vec![input],
LogicalPlan::Explain(explain) => vec![&explain.plan],
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
LogicalPlan::Dml(write) => vec![&write.input],
@@ -461,8 +474,11 @@ impl LogicalPlan {
Ok(Some(agg.group_expr.as_slice()[0].clone()))
}
}
+ LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, ..
})) => {
+ Ok(Some(select_expr[0].clone()))
+ }
LogicalPlan::Filter(Filter { input, .. })
- | LogicalPlan::Distinct(Distinct { input, .. })
+ | LogicalPlan::Distinct(Distinct::All(input))
| LogicalPlan::Sort(Sort { input, .. })
| LogicalPlan::Limit(Limit { input, .. })
| LogicalPlan::Repartition(Repartition { input, .. })
@@ -823,10 +839,29 @@ impl LogicalPlan {
inputs: inputs.iter().cloned().map(Arc::new).collect(),
schema: schema.clone(),
})),
- LogicalPlan::Distinct(Distinct { .. }) => {
- Ok(LogicalPlan::Distinct(Distinct {
- input: Arc::new(inputs[0].clone()),
- }))
+ LogicalPlan::Distinct(distinct) => {
+ let distinct = match distinct {
+ Distinct::All(_) =>
Distinct::All(Arc::new(inputs[0].clone())),
+ Distinct::On(DistinctOn {
+ on_expr,
+ select_expr,
+ ..
+ }) => {
+ let sort_expr = expr.split_off(on_expr.len() +
select_expr.len());
+ let select_expr = expr.split_off(on_expr.len());
+ Distinct::On(DistinctOn::try_new(
+ expr,
+ select_expr,
+ if !sort_expr.is_empty() {
+ Some(sort_expr)
+ } else {
+ None
+ },
+ Arc::new(inputs[0].clone()),
+ )?)
+ }
+ };
+ Ok(LogicalPlan::Distinct(distinct))
}
LogicalPlan::Analyze(a) => {
assert!(expr.is_empty());
@@ -1064,7 +1099,9 @@ impl LogicalPlan {
LogicalPlan::Subquery(_) => None,
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) =>
input.max_rows(),
LogicalPlan::Limit(Limit { fetch, .. }) => *fetch,
- LogicalPlan::Distinct(Distinct { input }) => input.max_rows(),
+ LogicalPlan::Distinct(
+ Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
+ ) => input.max_rows(),
LogicalPlan::Values(v) => Some(v.values.len()),
LogicalPlan::Unnest(_) => None,
LogicalPlan::Ddl(_)
@@ -1667,9 +1704,21 @@ impl LogicalPlan {
LogicalPlan::Statement(statement) => {
write!(f, "{}", statement.display())
}
- LogicalPlan::Distinct(Distinct { .. }) => {
- write!(f, "Distinct:")
- }
+ LogicalPlan::Distinct(distinct) => match distinct {
+ Distinct::All(_) => write!(f, "Distinct:"),
+ Distinct::On(DistinctOn {
+ on_expr,
+ select_expr,
+ sort_expr,
+ ..
+ }) => write!(
+ f,
+ "DistinctOn: on_expr=[[{}]], select_expr=[[{}]],
sort_expr=[[{}]]",
+ expr_vec_fmt!(on_expr),
+ expr_vec_fmt!(select_expr),
+ if let Some(sort_expr) = sort_expr {
expr_vec_fmt!(sort_expr) } else { "".to_string() },
+ ),
+ },
LogicalPlan::Explain { .. } => write!(f, "Explain"),
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union(_) => write!(f, "Union"),
@@ -2132,9 +2181,93 @@ pub struct Limit {
/// Removes duplicate rows from the input
#[derive(Clone, PartialEq, Eq, Hash)]
-pub struct Distinct {
+pub enum Distinct {
+ /// Plain `DISTINCT` referencing all selection expressions
+ All(Arc<LogicalPlan>),
+ /// The `Postgres` addition, allowing separate control over DISTINCT'd and
selected columns
+ On(DistinctOn),
+}
+
+/// Removes duplicate rows from the input, keeping only the first row for each
+/// distinct value of the `DISTINCT ON` expressions
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct DistinctOn {
+ /// The `DISTINCT ON` clause expression list
+ pub on_expr: Vec<Expr>,
+ /// The selected projection expression list
+ pub select_expr: Vec<Expr>,
+ /// The `ORDER BY` clause, whose initial expressions must match those of
the `ON` clause when
+ /// present. Note that those matching expressions actually wrap the `ON`
expressions with
+ /// additional info pertaining to the sorting procedure (i.e. ASC/DESC,
and NULLS FIRST/LAST).
+ pub sort_expr: Option<Vec<Expr>>,
/// The logical plan that is being DISTINCT'd
pub input: Arc<LogicalPlan>,
+ /// The schema description of the DISTINCT ON output
+ pub schema: DFSchemaRef,
+}
+
+impl DistinctOn {
+ /// Create a new `DistinctOn` struct.
+ pub fn try_new(
+ on_expr: Vec<Expr>,
+ select_expr: Vec<Expr>,
+ sort_expr: Option<Vec<Expr>>,
+ input: Arc<LogicalPlan>,
+ ) -> Result<Self> {
+ if on_expr.is_empty() {
+ return plan_err!("No `ON` expressions provided");
+ }
+
+ let on_expr = normalize_cols(on_expr, input.as_ref())?;
+
+ let schema = DFSchema::new_with_metadata(
+ exprlist_to_fields(&select_expr, &input)?,
+ input.schema().metadata().clone(),
+ )?;
+
+ let mut distinct_on = DistinctOn {
+ on_expr,
+ select_expr,
+ sort_expr: None,
+ input,
+ schema: Arc::new(schema),
+ };
+
+ if let Some(sort_expr) = sort_expr {
+ distinct_on = distinct_on.with_sort_expr(sort_expr)?;
+ }
+
+ Ok(distinct_on)
+ }
+
+ /// Try to update `self` with new sort expressions.
+ ///
+ /// Validates that the sort expressions are a super-set of the `ON`
expressions.
+ pub fn with_sort_expr(mut self, sort_expr: Vec<Expr>) -> Result<Self> {
+ let sort_expr = normalize_cols(sort_expr, self.input.as_ref())?;
+
+ // Check that the left-most sort expressions are the same as the `ON`
expressions.
+ let mut matched = true;
+ for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
+ match sort {
+ Expr::Sort(SortExpr { expr, .. }) => {
+ if on != &**expr {
+ matched = false;
+ break;
+ }
+ }
+ _ => return plan_err!("Not a sort expression: {sort}"),
+ }
+ }
+
+ if self.on_expr.len() > sort_expr.len() || !matched {
+ return plan_err!(
+ "SELECT DISTINCT ON expressions must match initial ORDER BY
expressions"
+ );
+ }
+
+ self.sort_expr = Some(sort_expr);
+ Ok(self)
+ }
}
/// Aggregates its input based on a set of grouping and aggregate
diff --git a/datafusion/expr/src/tree_node/expr.rs
b/datafusion/expr/src/tree_node/expr.rs
index d6c14b8622..6b86de37ba 100644
--- a/datafusion/expr/src/tree_node/expr.rs
+++ b/datafusion/expr/src/tree_node/expr.rs
@@ -157,9 +157,11 @@ impl TreeNode for Expr {
let mut transform = transform;
Ok(match self {
- Expr::Alias(Alias { expr, name, .. }) => {
- Expr::Alias(Alias::new(transform(*expr)?, name))
- }
+ Expr::Alias(Alias {
+ expr,
+ relation,
+ name,
+ }) => Expr::Alias(Alias::new(transform(*expr)?, relation, name)),
Expr::Column(_) => self,
Expr::OuterReferenceColumn(_, _) => self,
Expr::Exists { .. } => self,
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index a462cdb346..8f13bf5f61 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -800,9 +800,11 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) ->
Expr {
match e {
Expr::Column(_) => e,
Expr::OuterReferenceColumn(_, _) => e,
- Expr::Alias(Alias { expr, name, .. }) => {
- columnize_expr(*expr, input_schema).alias(name)
- }
+ Expr::Alias(Alias {
+ expr,
+ relation,
+ name,
+ }) => columnize_expr(*expr, input_schema).alias_qualified(relation,
name),
Expr::Cast(Cast { expr, data_type }) => Expr::Cast(Cast {
expr: Box::new(columnize_expr(*expr, input_schema)),
data_type,
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 8025402cce..f5ad767c50 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -238,6 +238,14 @@ impl CommonSubexprEliminate {
let rewritten = pop_expr(&mut rewritten)?;
if affected_id.is_empty() {
+ // Alias aggregation expressions if they have changed
+ let new_aggr_expr = new_aggr_expr
+ .iter()
+ .zip(aggr_expr.iter())
+ .map(|(new_expr, old_expr)| {
+ new_expr.clone().alias_if_changed(old_expr.display_name()?)
+ })
+ .collect::<Result<Vec<Expr>>>()?;
// Since group_expr changes, schema changes also. Use try_new
method.
Aggregate::try_new(Arc::new(new_input), new_group_expr,
new_aggr_expr)
.map(LogicalPlan::Aggregate)
@@ -367,7 +375,7 @@ impl OptimizerRule for CommonSubexprEliminate {
Ok(Some(build_recover_project_plan(
&original_schema,
optimized_plan,
- )))
+ )?))
}
plan => Ok(plan),
}
@@ -458,16 +466,19 @@ fn build_common_expr_project_plan(
/// the "intermediate" projection plan built in
[build_common_expr_project_plan].
///
/// This is for those plans who don't keep its own output schema like `Filter`
or `Sort`.
-fn build_recover_project_plan(schema: &DFSchema, input: LogicalPlan) ->
LogicalPlan {
+fn build_recover_project_plan(
+ schema: &DFSchema,
+ input: LogicalPlan,
+) -> Result<LogicalPlan> {
let col_exprs = schema
.fields()
.iter()
.map(|field| Expr::Column(field.qualified_column()))
.collect();
- LogicalPlan::Projection(
- Projection::try_new(col_exprs, Arc::new(input))
- .expect("Cannot build projection plan from an invalid schema"),
- )
+ Ok(LogicalPlan::Projection(Projection::try_new(
+ col_exprs,
+ Arc::new(input),
+ )?))
}
fn extract_expressions(
diff --git a/datafusion/optimizer/src/eliminate_nested_union.rs
b/datafusion/optimizer/src/eliminate_nested_union.rs
index 89bcc90bc0..5771ea2e19 100644
--- a/datafusion/optimizer/src/eliminate_nested_union.rs
+++ b/datafusion/optimizer/src/eliminate_nested_union.rs
@@ -52,7 +52,7 @@ impl OptimizerRule for EliminateNestedUnion {
schema: schema.clone(),
})))
}
- LogicalPlan::Distinct(Distinct { input: plan }) => match
plan.as_ref() {
+ LogicalPlan::Distinct(Distinct::All(plan)) => match plan.as_ref() {
LogicalPlan::Union(Union { inputs, schema }) => {
let inputs = inputs
.iter()
@@ -60,12 +60,12 @@ impl OptimizerRule for EliminateNestedUnion {
.flat_map(extract_plans_from_union)
.collect::<Vec<_>>();
- Ok(Some(LogicalPlan::Distinct(Distinct {
- input: Arc::new(LogicalPlan::Union(Union {
+ Ok(Some(LogicalPlan::Distinct(Distinct::All(Arc::new(
+ LogicalPlan::Union(Union {
inputs,
schema: schema.clone(),
- })),
- })))
+ }),
+ )))))
}
_ => Ok(None),
},
@@ -94,7 +94,7 @@ fn extract_plans_from_union(plan: &Arc<LogicalPlan>) ->
Vec<Arc<LogicalPlan>> {
fn extract_plan_from_distinct(plan: &Arc<LogicalPlan>) -> &Arc<LogicalPlan> {
match plan.as_ref() {
- LogicalPlan::Distinct(Distinct { input: plan }) => plan,
+ LogicalPlan::Distinct(Distinct::All(plan)) => plan,
_ => plan,
}
}
diff --git a/datafusion/optimizer/src/optimizer.rs
b/datafusion/optimizer/src/optimizer.rs
index 5231dc8698..e93565fef0 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -427,7 +427,7 @@ impl Optimizer {
/// Returns an error if plans have different schemas.
///
/// It ignores metadata and nullability.
-fn assert_schema_is_the_same(
+pub(crate) fn assert_schema_is_the_same(
rule_name: &str,
prev_plan: &LogicalPlan,
new_plan: &LogicalPlan,
@@ -438,7 +438,7 @@ fn assert_schema_is_the_same(
if !equivalent {
let e = DataFusionError::Internal(format!(
- "Failed due to generate a different schema, original schema: {:?},
new schema: {:?}",
+ "Failed due to a difference in schemas, original schema: {:?}, new
schema: {:?}",
prev_plan.schema(),
new_plan.schema()
));
@@ -503,7 +503,7 @@ mod tests {
let err = opt.optimize(&plan, &config, &observe).unwrap_err();
assert_eq!(
"Optimizer rule 'get table_scan rule' failed\ncaused by\nget
table_scan rule\ncaused by\n\
- Internal error: Failed due to generate a different schema, \
+ Internal error: Failed due to a difference in schemas, \
original schema: DFSchema { fields: [], metadata: {},
functional_dependencies: FunctionalDependencies { deps: [] } }, \
new schema: DFSchema { fields: [\
DFField { qualifier: Some(Bare { table: \"test\" }), field: Field
{ name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered:
false, metadata: {} } }, \
diff --git a/datafusion/optimizer/src/push_down_projection.rs
b/datafusion/optimizer/src/push_down_projection.rs
index b05d811cb4..2c314bf765 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -228,7 +228,7 @@ impl OptimizerRule for PushDownProjection {
// Gather all columns needed for expressions in this Aggregate
let mut new_aggr_expr = vec![];
for e in agg.aggr_expr.iter() {
- let column = Column::from_name(e.display_name()?);
+ let column = Column::from(e.display_name()?);
if required_columns.contains(&column) {
new_aggr_expr.push(e.clone());
}
diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs
b/datafusion/optimizer/src/replace_distinct_aggregate.rs
index 540617b770..187e510e55 100644
--- a/datafusion/optimizer/src/replace_distinct_aggregate.rs
+++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs
@@ -20,7 +20,11 @@ use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::utils::expand_wildcard;
-use datafusion_expr::{Aggregate, Distinct, LogicalPlan};
+use datafusion_expr::{
+ aggregate_function::AggregateFunction as AggregateFunctionFunc, col,
+ expr::AggregateFunction, LogicalPlanBuilder,
+};
+use datafusion_expr::{Aggregate, Distinct, DistinctOn, Expr, LogicalPlan};
/// Optimizer that replaces logical [[Distinct]] with a logical [[Aggregate]]
///
@@ -32,6 +36,22 @@ use datafusion_expr::{Aggregate, Distinct, LogicalPlan};
/// ```text
/// SELECT a, b FROM tab GROUP BY a, b
/// ```
+///
+/// On the other hand, for a `DISTINCT ON` query the replacement is
+/// a bit more involved and effectively converts
+/// ```text
+/// SELECT DISTINCT ON (a) b FROM tab ORDER BY a DESC, c
+/// ```
+///
+/// into
+/// ```text
+/// SELECT b FROM (
+/// SELECT a, FIRST_VALUE(b ORDER BY a DESC, c) AS b
+/// FROM tab
+/// GROUP BY a
+/// )
+/// ORDER BY a DESC
+/// ```
/// Optimizer that replaces logical [[Distinct]] with a logical [[Aggregate]]
#[derive(Default)]
@@ -51,7 +71,7 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
- LogicalPlan::Distinct(Distinct { input }) => {
+ LogicalPlan::Distinct(Distinct::All(input)) => {
let group_expr = expand_wildcard(input.schema(), input, None)?;
let aggregate = LogicalPlan::Aggregate(Aggregate::try_new(
input.clone(),
@@ -60,6 +80,65 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
)?);
Ok(Some(aggregate))
}
+ LogicalPlan::Distinct(Distinct::On(DistinctOn {
+ select_expr,
+ on_expr,
+ sort_expr,
+ input,
+ schema,
+ })) => {
+ // Construct the aggregation expression to be used to fetch
the selected expressions.
+ let aggr_expr = select_expr
+ .iter()
+ .map(|e| {
+ Expr::AggregateFunction(AggregateFunction::new(
+ AggregateFunctionFunc::FirstValue,
+ vec![e.clone()],
+ false,
+ None,
+ sort_expr.clone(),
+ ))
+ })
+ .collect::<Vec<Expr>>();
+
+ // Build the aggregation plan
+ let plan = LogicalPlanBuilder::from(input.as_ref().clone())
+ .aggregate(on_expr.clone(), aggr_expr.to_vec())?
+ .build()?;
+
+ let plan = if let Some(sort_expr) = sort_expr {
+ // While sort expressions were used in the `FIRST_VALUE`
aggregation itself above,
+ // this on its own isn't enough to guarantee the proper
output order of the grouping
+ // (`ON`) expression, so we need to sort those as well.
+ LogicalPlanBuilder::from(plan)
+ .sort(sort_expr[..on_expr.len()].to_vec())?
+ .build()?
+ } else {
+ plan
+ };
+
+ // Whereas the aggregation plan by default outputs both the
grouping and the aggregation
+ // expressions, for `DISTINCT ON` we only need to emit the
original selection expressions.
+ let project_exprs = plan
+ .schema()
+ .fields()
+ .iter()
+ .skip(on_expr.len())
+ .zip(schema.fields().iter())
+ .map(|(new_field, old_field)| {
+ Ok(col(new_field.qualified_column()).alias_qualified(
+ old_field.qualifier().cloned(),
+ old_field.name(),
+ ))
+ })
+ .collect::<Result<Vec<Expr>>>()?;
+
+ let plan = LogicalPlanBuilder::from(plan)
+ .project(project_exprs)?
+ .build()?;
+
+ Ok(Some(plan))
+ }
_ => Ok(None),
}
}
@@ -98,4 +177,27 @@ mod tests {
expected,
)
}
+
+ #[test]
+ fn replace_distinct_on() -> datafusion_common::Result<()> {
+ let table_scan = test_table_scan().unwrap();
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .distinct_on(
+ vec![col("a")],
+ vec![col("b")],
+ Some(vec![col("a").sort(false, true), col("c").sort(true,
false)]),
+ )?
+ .build()?;
+
+ let expected = "Projection: FIRST_VALUE(test.b) ORDER BY [test.a DESC
NULLS FIRST, test.c ASC NULLS LAST] AS b\
+ \n Sort: test.a DESC NULLS FIRST\
+ \n Aggregate: groupBy=[[test.a]], aggr=[[FIRST_VALUE(test.b) ORDER
BY [test.a DESC NULLS FIRST, test.c ASC NULLS LAST]]]\
+ \n TableScan: test";
+
+ assert_optimized_plan_eq(
+ Arc::new(ReplaceDistinctWithAggregate::new()),
+ &plan,
+ expected,
+ )
+ }
}
diff --git a/datafusion/optimizer/src/test/mod.rs
b/datafusion/optimizer/src/test/mod.rs
index 3eac2317b8..917ddc565c 100644
--- a/datafusion/optimizer/src/test/mod.rs
+++ b/datafusion/optimizer/src/test/mod.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::analyzer::{Analyzer, AnalyzerRule};
-use crate::optimizer::Optimizer;
+use crate::optimizer::{assert_schema_is_the_same, Optimizer};
use crate::{OptimizerContext, OptimizerRule};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
@@ -155,7 +155,7 @@ pub fn assert_optimized_plan_eq(
plan: &LogicalPlan,
expected: &str,
) -> Result<()> {
- let optimizer = Optimizer::with_rules(vec![rule]);
+ let optimizer = Optimizer::with_rules(vec![rule.clone()]);
let optimized_plan = optimizer
.optimize_recursively(
optimizer.rules.get(0).unwrap(),
@@ -163,6 +163,9 @@ pub fn assert_optimized_plan_eq(
&OptimizerContext::new(),
)?
.unwrap_or_else(|| plan.clone());
+
+ // Ensure schemas always match after an optimization
+ assert_schema_is_the_same(rule.name(), plan, &optimized_plan)?;
let formatted_plan = format!("{optimized_plan:?}");
assert_eq!(formatted_plan, expected);
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 9dcd55e731..62b226e333 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -73,6 +73,7 @@ message LogicalPlanNode {
CustomTableScanNode custom_scan = 25;
PrepareNode prepare = 26;
DropViewNode drop_view = 27;
+ DistinctOnNode distinct_on = 28;
}
}
@@ -308,6 +309,13 @@ message DistinctNode {
LogicalPlanNode input = 1;
}
+message DistinctOnNode {
+ repeated LogicalExprNode on_expr = 1;
+ repeated LogicalExprNode select_expr = 2;
+ repeated LogicalExprNode sort_expr = 3;
+ LogicalPlanNode input = 4;
+}
+
message UnionNode {
repeated LogicalPlanNode inputs = 1;
}
@@ -485,6 +493,7 @@ message Not {
message AliasNode {
LogicalExprNode expr = 1;
string alias = 2;
+ repeated OwnedTableReference relation = 3;
}
message BinaryExprNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 948ad0c4ce..7602e1a366 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -967,6 +967,9 @@ impl serde::Serialize for AliasNode {
if !self.alias.is_empty() {
len += 1;
}
+ if !self.relation.is_empty() {
+ len += 1;
+ }
let mut struct_ser =
serializer.serialize_struct("datafusion.AliasNode", len)?;
if let Some(v) = self.expr.as_ref() {
struct_ser.serialize_field("expr", v)?;
@@ -974,6 +977,9 @@ impl serde::Serialize for AliasNode {
if !self.alias.is_empty() {
struct_ser.serialize_field("alias", &self.alias)?;
}
+ if !self.relation.is_empty() {
+ struct_ser.serialize_field("relation", &self.relation)?;
+ }
struct_ser.end()
}
}
@@ -986,12 +992,14 @@ impl<'de> serde::Deserialize<'de> for AliasNode {
const FIELDS: &[&str] = &[
"expr",
"alias",
+ "relation",
];
#[allow(clippy::enum_variant_names)]
enum GeneratedField {
Expr,
Alias,
+ Relation,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -1015,6 +1023,7 @@ impl<'de> serde::Deserialize<'de> for AliasNode {
match value {
"expr" => Ok(GeneratedField::Expr),
"alias" => Ok(GeneratedField::Alias),
+ "relation" => Ok(GeneratedField::Relation),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -1036,6 +1045,7 @@ impl<'de> serde::Deserialize<'de> for AliasNode {
{
let mut expr__ = None;
let mut alias__ = None;
+ let mut relation__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Expr => {
@@ -1050,11 +1060,18 @@ impl<'de> serde::Deserialize<'de> for AliasNode {
}
alias__ = Some(map_.next_value()?);
}
+ GeneratedField::Relation => {
+ if relation__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("relation"));
+ }
+ relation__ = Some(map_.next_value()?);
+ }
}
}
Ok(AliasNode {
expr: expr__,
alias: alias__.unwrap_or_default(),
+ relation: relation__.unwrap_or_default(),
})
}
}
@@ -6070,6 +6087,151 @@ impl<'de> serde::Deserialize<'de> for DistinctNode {
deserializer.deserialize_struct("datafusion.DistinctNode", FIELDS,
GeneratedVisitor)
}
}
+impl serde::Serialize for DistinctOnNode {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if !self.on_expr.is_empty() {
+ len += 1;
+ }
+ if !self.select_expr.is_empty() {
+ len += 1;
+ }
+ if !self.sort_expr.is_empty() {
+ len += 1;
+ }
+ if self.input.is_some() {
+ len += 1;
+ }
+ let mut struct_ser =
serializer.serialize_struct("datafusion.DistinctOnNode", len)?;
+ if !self.on_expr.is_empty() {
+ struct_ser.serialize_field("onExpr", &self.on_expr)?;
+ }
+ if !self.select_expr.is_empty() {
+ struct_ser.serialize_field("selectExpr", &self.select_expr)?;
+ }
+ if !self.sort_expr.is_empty() {
+ struct_ser.serialize_field("sortExpr", &self.sort_expr)?;
+ }
+ if let Some(v) = self.input.as_ref() {
+ struct_ser.serialize_field("input", v)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for DistinctOnNode {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "on_expr",
+ "onExpr",
+ "select_expr",
+ "selectExpr",
+ "sort_expr",
+ "sortExpr",
+ "input",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ OnExpr,
+ SelectExpr,
+ SortExpr,
+ Input,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "onExpr" | "on_expr" => Ok(GeneratedField::OnExpr),
+ "selectExpr" | "select_expr" =>
Ok(GeneratedField::SelectExpr),
+ "sortExpr" | "sort_expr" =>
Ok(GeneratedField::SortExpr),
+ "input" => Ok(GeneratedField::Input),
+ _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = DistinctOnNode;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ formatter.write_str("struct datafusion.DistinctOnNode")
+ }
+
+ fn visit_map<V>(self, mut map_: V) ->
std::result::Result<DistinctOnNode, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut on_expr__ = None;
+ let mut select_expr__ = None;
+ let mut sort_expr__ = None;
+ let mut input__ = None;
+ while let Some(k) = map_.next_key()? {
+ match k {
+ GeneratedField::OnExpr => {
+ if on_expr__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("onExpr"));
+ }
+ on_expr__ = Some(map_.next_value()?);
+ }
+ GeneratedField::SelectExpr => {
+ if select_expr__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("selectExpr"));
+ }
+ select_expr__ = Some(map_.next_value()?);
+ }
+ GeneratedField::SortExpr => {
+ if sort_expr__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("sortExpr"));
+ }
+ sort_expr__ = Some(map_.next_value()?);
+ }
+ GeneratedField::Input => {
+ if input__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("input"));
+ }
+ input__ = map_.next_value()?;
+ }
+ }
+ }
+ Ok(DistinctOnNode {
+ on_expr: on_expr__.unwrap_or_default(),
+ select_expr: select_expr__.unwrap_or_default(),
+ sort_expr: sort_expr__.unwrap_or_default(),
+ input: input__,
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.DistinctOnNode", FIELDS,
GeneratedVisitor)
+ }
+}
impl serde::Serialize for DropViewNode {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
@@ -13146,6 +13308,9 @@ impl serde::Serialize for LogicalPlanNode {
logical_plan_node::LogicalPlanType::DropView(v) => {
struct_ser.serialize_field("dropView", v)?;
}
+ logical_plan_node::LogicalPlanType::DistinctOn(v) => {
+ struct_ser.serialize_field("distinctOn", v)?;
+ }
}
}
struct_ser.end()
@@ -13195,6 +13360,8 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
"prepare",
"drop_view",
"dropView",
+ "distinct_on",
+ "distinctOn",
];
#[allow(clippy::enum_variant_names)]
@@ -13225,6 +13392,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
CustomScan,
Prepare,
DropView,
+ DistinctOn,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -13272,6 +13440,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
"customScan" | "custom_scan" =>
Ok(GeneratedField::CustomScan),
"prepare" => Ok(GeneratedField::Prepare),
"dropView" | "drop_view" =>
Ok(GeneratedField::DropView),
+ "distinctOn" | "distinct_on" =>
Ok(GeneratedField::DistinctOn),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -13474,6 +13643,13 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode {
return
Err(serde::de::Error::duplicate_field("dropView"));
}
logical_plan_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(logical_plan_node::LogicalPlanType::DropView)
+;
+ }
+ GeneratedField::DistinctOn => {
+ if logical_plan_type__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("distinctOn"));
+ }
+ logical_plan_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(logical_plan_node::LogicalPlanType::DistinctOn)
;
}
}
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 93b0a05c31..825481a188 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -38,7 +38,7 @@ pub struct DfSchema {
pub struct LogicalPlanNode {
#[prost(
oneof = "logical_plan_node::LogicalPlanType",
- tags = "1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
19, 20, 21, 22, 23, 24, 25, 26, 27"
+ tags = "1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
19, 20, 21, 22, 23, 24, 25, 26, 27, 28"
)]
pub logical_plan_type:
::core::option::Option<logical_plan_node::LogicalPlanType>,
}
@@ -99,6 +99,8 @@ pub mod logical_plan_node {
Prepare(::prost::alloc::boxed::Box<super::PrepareNode>),
#[prost(message, tag = "27")]
DropView(super::DropViewNode),
+ #[prost(message, tag = "28")]
+ DistinctOn(::prost::alloc::boxed::Box<super::DistinctOnNode>),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
@@ -483,6 +485,18 @@ pub struct DistinctNode {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct DistinctOnNode {
+ #[prost(message, repeated, tag = "1")]
+ pub on_expr: ::prost::alloc::vec::Vec<LogicalExprNode>,
+ #[prost(message, repeated, tag = "2")]
+ pub select_expr: ::prost::alloc::vec::Vec<LogicalExprNode>,
+ #[prost(message, repeated, tag = "3")]
+ pub sort_expr: ::prost::alloc::vec::Vec<LogicalExprNode>,
+ #[prost(message, optional, boxed, tag = "4")]
+ pub input:
::core::option::Option<::prost::alloc::boxed::Box<LogicalPlanNode>>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UnionNode {
#[prost(message, repeated, tag = "1")]
pub inputs: ::prost::alloc::vec::Vec<LogicalPlanNode>,
@@ -754,6 +768,8 @@ pub struct AliasNode {
pub expr:
::core::option::Option<::prost::alloc::boxed::Box<LogicalExprNode>>,
#[prost(string, tag = "2")]
pub alias: ::prost::alloc::string::String,
+ #[prost(message, repeated, tag = "3")]
+ pub relation: ::prost::alloc::vec::Vec<OwnedTableReference>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index b2b66693f7..674492edef 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -1151,6 +1151,11 @@ pub fn parse_expr(
}
ExprType::Alias(alias) => Ok(Expr::Alias(Alias::new(
parse_required_expr(alias.expr.as_deref(), registry, "expr")?,
+ alias
+ .relation
+ .first()
+ .map(|r| OwnedTableReference::try_from(r.clone()))
+ .transpose()?,
alias.alias.clone(),
))),
ExprType::IsNullExpr(is_null) =>
Ok(Expr::IsNull(Box::new(parse_required_expr(
diff --git a/datafusion/proto/src/logical_plan/mod.rs
b/datafusion/proto/src/logical_plan/mod.rs
index e426c59852..851f062bd5 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -55,7 +55,7 @@ use datafusion_expr::{
EmptyRelation, Extension, Join, JoinConstraint, Limit, Prepare,
Projection,
Repartition, Sort, SubqueryAlias, TableScan, Values, Window,
},
- DropView, Expr, LogicalPlan, LogicalPlanBuilder,
+ DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder,
};
use prost::bytes::BufMut;
@@ -734,6 +734,33 @@ impl AsLogicalPlan for LogicalPlanNode {
into_logical_plan!(distinct.input, ctx, extension_codec)?;
LogicalPlanBuilder::from(input).distinct()?.build()
}
+ LogicalPlanType::DistinctOn(distinct_on) => {
+ let input: LogicalPlan =
+ into_logical_plan!(distinct_on.input, ctx,
extension_codec)?;
+ let on_expr = distinct_on
+ .on_expr
+ .iter()
+ .map(|expr| from_proto::parse_expr(expr, ctx))
+ .collect::<Result<Vec<Expr>, _>>()?;
+ let select_expr = distinct_on
+ .select_expr
+ .iter()
+ .map(|expr| from_proto::parse_expr(expr, ctx))
+ .collect::<Result<Vec<Expr>, _>>()?;
+ let sort_expr = match distinct_on.sort_expr.len() {
+ 0 => None,
+ _ => Some(
+ distinct_on
+ .sort_expr
+ .iter()
+ .map(|expr| from_proto::parse_expr(expr, ctx))
+ .collect::<Result<Vec<Expr>, _>>()?,
+ ),
+ };
+ LogicalPlanBuilder::from(input)
+ .distinct_on(on_expr, select_expr, sort_expr)?
+ .build()
+ }
LogicalPlanType::ViewScan(scan) => {
let schema: Schema = convert_required!(scan.schema)?;
@@ -1005,7 +1032,7 @@ impl AsLogicalPlan for LogicalPlanNode {
))),
})
}
- LogicalPlan::Distinct(Distinct { input }) => {
+ LogicalPlan::Distinct(Distinct::All(input)) => {
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
input.as_ref(),
@@ -1019,6 +1046,42 @@ impl AsLogicalPlan for LogicalPlanNode {
))),
})
}
+ LogicalPlan::Distinct(Distinct::On(DistinctOn {
+ on_expr,
+ select_expr,
+ sort_expr,
+ input,
+ ..
+ })) => {
+ let input: protobuf::LogicalPlanNode =
+ protobuf::LogicalPlanNode::try_from_logical_plan(
+ input.as_ref(),
+ extension_codec,
+ )?;
+ let sort_expr = match sort_expr {
+ None => vec![],
+ Some(sort_expr) => sort_expr
+ .iter()
+ .map(|expr| expr.try_into())
+ .collect::<Result<Vec<_>, _>>()?,
+ };
+ Ok(protobuf::LogicalPlanNode {
+ logical_plan_type:
Some(LogicalPlanType::DistinctOn(Box::new(
+ protobuf::DistinctOnNode {
+ on_expr: on_expr
+ .iter()
+ .map(|expr| expr.try_into())
+ .collect::<Result<Vec<_>, _>>()?,
+ select_expr: select_expr
+ .iter()
+ .map(|expr| expr.try_into())
+ .collect::<Result<Vec<_>, _>>()?,
+ sort_expr,
+ input: Some(Box::new(input)),
+ },
+ ))),
+ })
+ }
LogicalPlan::Window(Window {
input, window_expr, ..
}) => {
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index e590731f58..946f2c6964 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -476,9 +476,17 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
Expr::Column(c) => Self {
expr_type: Some(ExprType::Column(c.into())),
},
- Expr::Alias(Alias { expr, name, .. }) => {
+ Expr::Alias(Alias {
+ expr,
+ relation,
+ name,
+ }) => {
let alias = Box::new(protobuf::AliasNode {
expr: Some(Box::new(expr.as_ref().try_into()?)),
+ relation: relation
+ .to_owned()
+ .map(|r| vec![r.into()])
+ .unwrap_or(vec![]),
alias: name.to_owned(),
});
Self {
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 97c553dc04..cc76e8a19e 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -300,6 +300,32 @@ async fn roundtrip_logical_plan_aggregation() ->
Result<()> {
Ok(())
}
+#[tokio::test]
+async fn roundtrip_logical_plan_distinct_on() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int64, true),
+ Field::new("b", DataType::Decimal128(15, 2), true),
+ ]);
+
+ ctx.register_csv(
+ "t1",
+ "tests/testdata/test.csv",
+ CsvReadOptions::default().schema(&schema),
+ )
+ .await?;
+
+ let query = "SELECT DISTINCT ON (a % 2) a, b * 2 FROM t1 ORDER BY a % 2
DESC, b";
+ let plan = ctx.sql(query).await?.into_optimized_plan()?;
+
+ let bytes = logical_plan_to_bytes(&plan)?;
+ let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+ assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
+
+ Ok(())
+}
+
#[tokio::test]
async fn roundtrip_single_count_distinct() -> Result<()> {
let ctx = SessionContext::new();
diff --git a/datafusion/proto/tests/cases/serialize.rs
b/datafusion/proto/tests/cases/serialize.rs
index f32c815279..5b890accd8 100644
--- a/datafusion/proto/tests/cases/serialize.rs
+++ b/datafusion/proto/tests/cases/serialize.rs
@@ -128,6 +128,12 @@ fn exact_roundtrip_linearized_binary_expr() {
}
}
+#[test]
+fn roundtrip_qualified_alias() {
+ let qual_alias = col("c1").alias_qualified(Some("my_table"), "my_column");
+ assert_eq!(qual_alias, roundtrip_expr(&qual_alias));
+}
+
#[test]
fn roundtrip_deeply_nested_binary_expr() {
// We need more stack space so this doesn't overflow in dev builds
diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs
index fc2a3fb9a5..832e2da9c6 100644
--- a/datafusion/sql/src/query.rs
+++ b/datafusion/sql/src/query.rs
@@ -23,7 +23,7 @@ use datafusion_common::{
not_impl_err, plan_err, sql_err, Constraints, DataFusionError, Result,
ScalarValue,
};
use datafusion_expr::{
- CreateMemoryTable, DdlStatement, Expr, LogicalPlan, LogicalPlanBuilder,
+ CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan,
LogicalPlanBuilder,
};
use sqlparser::ast::{
Expr as SQLExpr, Offset as SQLOffset, OrderByExpr, Query, SetExpr, Value,
@@ -161,6 +161,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let order_by_rex =
self.order_by_to_sort_expr(&order_by, plan.schema(),
planner_context)?;
- LogicalPlanBuilder::from(plan).sort(order_by_rex)?.build()
+
+ if let LogicalPlan::Distinct(Distinct::On(ref distinct_on)) = plan {
+ // In case of `DISTINCT ON` we must capture the sort expressions
since during the plan
+ // optimization we're effectively doing a `first_value`
aggregation according to them.
+ let distinct_on =
distinct_on.clone().with_sort_expr(order_by_rex)?;
+ Ok(LogicalPlan::Distinct(Distinct::On(distinct_on)))
+ } else {
+ LogicalPlanBuilder::from(plan).sort(order_by_rex)?.build()
+ }
}
}
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index e9a7941ab0..31333affe0 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -76,7 +76,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
// process `where` clause
- let plan = self.plan_selection(select.selection, plan,
planner_context)?;
+ let base_plan = self.plan_selection(select.selection, plan,
planner_context)?;
// handle named windows before processing the projection expression
check_conflicting_windows(&select.named_window)?;
@@ -84,16 +84,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// process the SELECT expressions, with wildcards expanded.
let select_exprs = self.prepare_select_exprs(
- &plan,
+ &base_plan,
select.projection,
empty_from,
planner_context,
)?;
// having and group by clause may reference aliases defined in select
projection
- let projected_plan = self.project(plan.clone(), select_exprs.clone())?;
+ let projected_plan = self.project(base_plan.clone(),
select_exprs.clone())?;
let mut combined_schema = (**projected_plan.schema()).clone();
- combined_schema.merge(plan.schema());
+ combined_schema.merge(base_plan.schema());
// this alias map is resolved and looked up in both having exprs and
group by exprs
let alias_map = extract_aliases(&select_exprs);
@@ -148,7 +148,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
)?;
// aliases from the projection can conflict with
same-named expressions in the input
let mut alias_map = alias_map.clone();
- for f in plan.schema().fields() {
+ for f in base_plan.schema().fields() {
alias_map.remove(f.name());
}
let group_by_expr =
@@ -158,7 +158,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.unwrap_or(group_by_expr);
let group_by_expr = normalize_col(group_by_expr,
&projected_plan)?;
self.validate_schema_satisfies_exprs(
- plan.schema(),
+ base_plan.schema(),
&[group_by_expr.clone()],
)?;
Ok(group_by_expr)
@@ -171,7 +171,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.iter()
.filter(|select_expr| match select_expr {
Expr::AggregateFunction(_) | Expr::AggregateUDF(_) =>
false,
- Expr::Alias(Alias { expr, name: _ }) => !matches!(
+ Expr::Alias(Alias { expr, name: _, .. }) => !matches!(
**expr,
Expr::AggregateFunction(_) | Expr::AggregateUDF(_)
),
@@ -187,16 +187,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
|| !aggr_exprs.is_empty()
{
self.aggregate(
- plan,
+ &base_plan,
&select_exprs,
having_expr_opt.as_ref(),
- group_by_exprs,
- aggr_exprs,
+ &group_by_exprs,
+ &aggr_exprs,
)?
} else {
match having_expr_opt {
Some(having_expr) => return plan_err!("HAVING clause
references: {having_expr} must appear in the GROUP BY clause or be used in an
aggregate function"),
- None => (plan, select_exprs, having_expr_opt)
+ None => (base_plan.clone(), select_exprs.clone(),
having_expr_opt)
}
};
@@ -229,19 +229,35 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let plan = project(plan, select_exprs_post_aggr)?;
// process distinct clause
- let distinct = select
- .distinct
- .map(|distinct| match distinct {
- Distinct::Distinct => Ok(true),
- Distinct::On(_) => not_impl_err!("DISTINCT ON Exprs not
supported"),
- })
- .transpose()?
- .unwrap_or(false);
+ let plan = match select.distinct {
+ None => Ok(plan),
+ Some(Distinct::Distinct) => {
+ LogicalPlanBuilder::from(plan).distinct()?.build()
+ }
+ Some(Distinct::On(on_expr)) => {
+ if !aggr_exprs.is_empty()
+ || !group_by_exprs.is_empty()
+ || !window_func_exprs.is_empty()
+ {
+ return not_impl_err!("DISTINCT ON expressions with GROUP
BY, aggregation or window functions are not supported ");
+ }
- let plan = if distinct {
- LogicalPlanBuilder::from(plan).distinct()?.build()
- } else {
- Ok(plan)
+ let on_expr = on_expr
+ .into_iter()
+ .map(|e| {
+ self.sql_expr_to_logical_expr(
+ e.clone(),
+ plan.schema(),
+ planner_context,
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ // Build the final plan
+ return LogicalPlanBuilder::from(base_plan)
+ .distinct_on(on_expr, select_exprs, None)?
+ .build();
+ }
}?;
// DISTRIBUTE BY
@@ -471,6 +487,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.clone();
*expr = Expr::Alias(Alias {
expr: Box::new(new_expr),
+ relation: None,
name: name.clone(),
});
}
@@ -511,18 +528,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
/// the aggregate
fn aggregate(
&self,
- input: LogicalPlan,
+ input: &LogicalPlan,
select_exprs: &[Expr],
having_expr_opt: Option<&Expr>,
- group_by_exprs: Vec<Expr>,
- aggr_exprs: Vec<Expr>,
+ group_by_exprs: &[Expr],
+ aggr_exprs: &[Expr],
) -> Result<(LogicalPlan, Vec<Expr>, Option<Expr>)> {
let group_by_exprs =
- get_updated_group_by_exprs(&group_by_exprs, select_exprs,
input.schema())?;
+ get_updated_group_by_exprs(group_by_exprs, select_exprs,
input.schema())?;
// create the aggregate plan
let plan = LogicalPlanBuilder::from(input.clone())
- .aggregate(group_by_exprs.clone(), aggr_exprs.clone())?
+ .aggregate(group_by_exprs.clone(), aggr_exprs.to_vec())?
.build()?;
// in this next section of code we are re-writing the projection to
refer to columns
@@ -549,25 +566,25 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
_ => aggr_projection_exprs.push(expr.clone()),
}
}
- aggr_projection_exprs.extend_from_slice(&aggr_exprs);
+ aggr_projection_exprs.extend_from_slice(aggr_exprs);
// now attempt to resolve columns and replace with fully-qualified
columns
let aggr_projection_exprs = aggr_projection_exprs
.iter()
- .map(|expr| resolve_columns(expr, &input))
+ .map(|expr| resolve_columns(expr, input))
.collect::<Result<Vec<Expr>>>()?;
// next we replace any expressions that are not a column with a column
referencing
// an output column from the aggregate schema
let column_exprs_post_aggr = aggr_projection_exprs
.iter()
- .map(|expr| expr_as_column_expr(expr, &input))
+ .map(|expr| expr_as_column_expr(expr, input))
.collect::<Result<Vec<Expr>>>()?;
// next we re-write the projection
let select_exprs_post_aggr = select_exprs
.iter()
- .map(|expr| rebase_expr(expr, &aggr_projection_exprs, &input))
+ .map(|expr| rebase_expr(expr, &aggr_projection_exprs, input))
.collect::<Result<Vec<Expr>>>()?;
// finally, we have some validation that the re-written projection can
be resolved
@@ -582,7 +599,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// aggregation.
let having_expr_post_aggr = if let Some(having_expr) = having_expr_opt
{
let having_expr_post_aggr =
- rebase_expr(having_expr, &aggr_projection_exprs, &input)?;
+ rebase_expr(having_expr, &aggr_projection_exprs, input)?;
check_columns_satisfy_exprs(
&column_exprs_post_aggr,
diff --git a/datafusion/sqllogictest/test_files/distinct_on.slt
b/datafusion/sqllogictest/test_files/distinct_on.slt
new file mode 100644
index 0000000000..8a36b49b98
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/distinct_on.slt
@@ -0,0 +1,146 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100 (
+ c1 VARCHAR NOT NULL,
+ c2 TINYINT NOT NULL,
+ c3 SMALLINT NOT NULL,
+ c4 SMALLINT,
+ c5 INT,
+ c6 BIGINT NOT NULL,
+ c7 SMALLINT NOT NULL,
+ c8 INT NOT NULL,
+ c9 BIGINT UNSIGNED NOT NULL,
+ c10 VARCHAR NOT NULL,
+ c11 FLOAT NOT NULL,
+ c12 DOUBLE NOT NULL,
+ c13 VARCHAR NOT NULL
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+
+# Basic example: distinct on the first column, project the second one, and
+# order by the third
+query TI
+SELECT DISTINCT ON (c1) c1, c2 FROM aggregate_test_100 ORDER BY c1, c3;
+----
+a 5
+b 4
+c 2
+d 1
+e 3
+
+# Basic example + reverse order of the selected column
+query TI
+SELECT DISTINCT ON (c1) c1, c2 FROM aggregate_test_100 ORDER BY c1, c3 DESC;
+----
+a 1
+b 5
+c 4
+d 1
+e 1
+
+# Basic example + reverse order of the ON column
+query TI
+SELECT DISTINCT ON (c1) c1, c2 FROM aggregate_test_100 ORDER BY c1 DESC, c3;
+----
+e 3
+d 1
+c 2
+b 4
+a 4
+
+# Basic example + reverse order of both columns + limit
+query TI
+SELECT DISTINCT ON (c1) c1, c2 FROM aggregate_test_100 ORDER BY c1 DESC, c3
DESC LIMIT 3;
+----
+e 1
+d 1
+c 4
+
+# Basic example + omit ON column from selection
+query I
+SELECT DISTINCT ON (c1) c2 FROM aggregate_test_100 ORDER BY c1, c3;
+----
+5
+4
+2
+1
+3
+
+# Test explain makes sense
+query TT
+EXPLAIN SELECT DISTINCT ON (c1) c3, c2 FROM aggregate_test_100 ORDER BY c1, c3;
+----
+logical_plan
+Projection: FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1
ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST] AS c3,
FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS
LAST, aggregate_test_100.c3 ASC NULLS LAST] AS c2
+--Sort: aggregate_test_100.c1 ASC NULLS LAST
+----Aggregate: groupBy=[[aggregate_test_100.c1]],
aggr=[[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC
NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST],
FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS
LAST, aggregate_test_100.c3 ASC NULLS LAST]]]
+------TableScan: aggregate_test_100 projection=[c1, c2, c3]
+physical_plan
+ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@1
as c3, FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC
NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@2 as c2]
+--SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
+----SortExec: expr=[c1@0 ASC NULLS LAST]
+------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1],
aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
+------------AggregateExec: mode=Partial, gby=[c1@0 as c1],
aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)],
ordering_mode=Sorted
+--------------SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC NULLS LAST]
+----------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c2, c3], has_header=true
+
+# ON expressions are not a sub-set of the ORDER BY expressions
+query error SELECT DISTINCT ON expressions must match initial ORDER BY
expressions
+SELECT DISTINCT ON (c2 % 2 = 0) c2, c3 - 100 FROM aggregate_test_100 ORDER BY
c2, c3;
+
+# ON expressions are empty
+query error DataFusion error: Error during planning: No `ON` expressions
provided
+SELECT DISTINCT ON () c1, c2 FROM aggregate_test_100 ORDER BY c1, c2;
+
+# Use expressions in the ON and ORDER BY clauses, as well as the selection
+query II
+SELECT DISTINCT ON (c2 % 2 = 0) c2, c3 - 100 FROM aggregate_test_100 ORDER BY
c2 % 2 = 0, c3 DESC;
+----
+1 25
+4 23
+
+# Multiple complex expressions
+query TIB
+SELECT DISTINCT ON (chr(ascii(c1) + 3), c2 % 2) chr(ascii(upper(c1)) + 3), c2
% 2, c3 > 80 AND c2 % 2 = 1
+FROM aggregate_test_100
+WHERE c1 IN ('a', 'b')
+ORDER BY chr(ascii(c1) + 3), c2 % 2, c3 DESC;
+----
+D 0 false
+D 1 true
+E 0 false
+E 1 false
+
+# Joins using CTEs
+query II
+WITH t1 AS (SELECT * FROM aggregate_test_100),
+t2 AS (SELECT * FROM aggregate_test_100)
+SELECT DISTINCT ON (t1.c1, t2.c2) t2.c3, t1.c4
+FROM t1 INNER JOIN t2 ON t1.c13 = t2.c13
+ORDER BY t1.c1, t2.c2, t2.c5
+LIMIT 3;
+----
+-25 15295
+45 15673
+-72 -11122
diff --git a/datafusion/substrait/src/logical_plan/producer.rs
b/datafusion/substrait/src/logical_plan/producer.rs
index 6fe8eca337..9356a77534 100644
--- a/datafusion/substrait/src/logical_plan/producer.rs
+++ b/datafusion/substrait/src/logical_plan/producer.rs
@@ -19,7 +19,7 @@ use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
-use datafusion::logical_expr::{Like, WindowFrameUnits};
+use datafusion::logical_expr::{Distinct, Like, WindowFrameUnits};
use datafusion::{
arrow::datatypes::{DataType, TimeUnit},
error::{DataFusionError, Result},
@@ -244,11 +244,11 @@ pub fn to_substrait_rel(
}))),
}))
}
- LogicalPlan::Distinct(distinct) => {
+ LogicalPlan::Distinct(Distinct::All(plan)) => {
// Use Substrait's AggregateRel with empty measures to represent
`select distinct`
- let input = to_substrait_rel(distinct.input.as_ref(), ctx,
extension_info)?;
+ let input = to_substrait_rel(plan.as_ref(), ctx, extension_info)?;
// Get grouping keys from the input relation's number of output
fields
- let grouping = (0..distinct.input.schema().fields().len())
+ let grouping = (0..plan.schema().fields().len())
.map(substrait_field_ref)
.collect::<Result<Vec<_>>>()?;