This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 839a61896 Add LogicalPlan::Distinct (#2792)
839a61896 is described below
commit 839a61896f0fc3f617a79da1dd4018b2aa6af283
Author: Mike Roberts <[email protected]>
AuthorDate: Tue Jun 28 22:38:42 2022 +0100
Add LogicalPlan::Distinct (#2792)
---
datafusion/core/src/logical_plan/plan.rs | 6 +-
datafusion/core/src/physical_plan/planner.rs | 19 +++++-
datafusion/expr/src/logical_plan/builder.rs | 71 +++++++++++++++++++---
datafusion/expr/src/logical_plan/mod.rs | 9 +--
datafusion/expr/src/logical_plan/plan.rs | 19 +++++-
datafusion/expr/src/utils.rs | 9 ++-
.../optimizer/src/common_subexpr_eliminate.rs | 1 +
datafusion/optimizer/src/projection_push_down.rs | 1 +
datafusion/proto/proto/datafusion.proto | 5 ++
datafusion/proto/src/logical_plan.rs | 24 +++++++-
datafusion/sql/src/planner.rs | 29 +++++----
11 files changed, 154 insertions(+), 39 deletions(-)
diff --git a/datafusion/core/src/logical_plan/plan.rs
b/datafusion/core/src/logical_plan/plan.rs
index 5c9e3d16e..c4cfcc3dc 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -22,9 +22,9 @@ pub use datafusion_expr::{
logical_plan::{
display::{GraphvizVisitor, IndentVisitor},
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema,
CreateExternalTable,
- CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation,
Explain,
- Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit,
LogicalPlan,
- Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
+ CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable,
EmptyRelation,
+ Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType,
Limit,
+ LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection,
Repartition, Sort,
StringifiedPlan, Subquery, SubqueryAlias, TableScan,
ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values, Window,
},
diff --git a/datafusion/core/src/physical_plan/planner.rs
b/datafusion/core/src/physical_plan/planner.rs
index 4a8ad5d77..0cb037620 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -26,8 +26,8 @@ use crate::datasource::source_as_provider;
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_expr::utils::generate_sort_key;
use crate::logical_plan::plan::{
- Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias,
TableScan,
- Window,
+ Aggregate, Distinct, EmptyRelation, Filter, Join, Projection, Sort,
SubqueryAlias,
+ TableScan, Window,
};
use crate::logical_plan::{
unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan,
@@ -59,7 +59,8 @@ use arrow::datatypes::DataType;
use arrow::datatypes::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_common::ScalarValue;
-use datafusion_expr::{expr::GroupingSet, utils::expr_to_columns};
+use datafusion_expr::expr::GroupingSet;
+use datafusion_expr::utils::{expand_wildcard, expr_to_columns};
use datafusion_physical_expr::expressions::Literal;
use datafusion_sql::utils::window_expr_common_partition_keys;
use futures::future::BoxFuture;
@@ -616,6 +617,18 @@ impl DefaultPhysicalPlanner {
physical_input_schema.clone(),
)?) )
}
+ LogicalPlan::Distinct(Distinct {input}) => {
+ // Convert distinct to groupby with no aggregations
+ let group_expr = expand_wildcard(input.schema(), input)?;
+ let aggregate = LogicalPlan::Aggregate(Aggregate {
+ input: input.clone(),
+ group_expr,
+ aggr_expr: vec![],
+ schema: input.schema().clone()
+ }
+ );
+ Ok(self.create_initial_plan(&aggregate,
session_state).await?)
+ }
LogicalPlan::Projection(Projection { input, expr, .. }) => {
let input_exec = self.create_initial_plan(input,
session_state).await?;
let input_schema = input.as_ref().schema();
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index 6692b18d1..fd1886e20 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -24,7 +24,7 @@ use crate::utils::{
use crate::{and, binary_expr, Operator};
use crate::{
logical_plan::{
- Aggregate, Analyze, CrossJoin, EmptyRelation, Explain, Filter, Join,
+ Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain,
Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
Projection,
Repartition, Sort, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
Values,
Window,
@@ -42,7 +42,6 @@ use datafusion_common::{
};
use std::any::Any;
use std::convert::TryFrom;
-use std::iter;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
@@ -437,16 +436,27 @@ impl LogicalPlanBuilder {
/// Apply a union, removing duplicate rows
pub fn union_distinct(&self, plan: LogicalPlan) -> Result<Self> {
- self.union(plan)?.distinct()
+ // unwrap top-level Distincts, to avoid duplication
+ let left_plan = self.plan.clone();
+ let left_plan: LogicalPlan = match left_plan {
+ LogicalPlan::Distinct(Distinct { input }) => (*input).clone(),
+ _ => left_plan,
+ };
+ let right_plan: LogicalPlan = match plan {
+ LogicalPlan::Distinct(Distinct { input }) => (*input).clone(),
+ _ => plan,
+ };
+
+ Ok(Self::from(LogicalPlan::Distinct(Distinct {
+ input: Arc::new(union_with_alias(left_plan, right_plan, None)?),
+ })))
}
/// Apply deduplication: Only distinct (different) values are returned)
pub fn distinct(&self) -> Result<Self> {
- let projection_expr = expand_wildcard(self.plan.schema(), &self.plan)?;
- let plan = LogicalPlanBuilder::from(self.plan.clone())
- .aggregate(projection_expr, iter::empty::<Expr>())?
- .build()?;
- Self::from(plan).project(vec![Expr::Wildcard])
+ Ok(Self::from(LogicalPlan::Distinct(Distinct {
+ input: Arc::new(self.plan.clone()),
+ })))
}
/// Apply a join with on constraint.
@@ -1141,6 +1151,51 @@ mod tests {
Ok(())
}
+ #[test]
+ fn plan_builder_union_distinct_combined_single_union() -> Result<()> {
+ let plan =
+ table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3,
4]))?;
+
+ let plan = plan
+ .union_distinct(plan.build()?)?
+ .union_distinct(plan.build()?)?
+ .union_distinct(plan.build()?)?
+ .build()?;
+
+ // output has only one union
+ let expected = "\
+ Distinct:\
+ \n Union\
+ \n TableScan: employee_csv projection=[state, salary]\
+ \n TableScan: employee_csv projection=[state, salary]\
+ \n TableScan: employee_csv projection=[state, salary]\
+ \n TableScan: employee_csv projection=[state, salary]";
+
+ assert_eq!(expected, format!("{:?}", plan));
+
+ Ok(())
+ }
+
+ #[test]
+ fn plan_builder_simple_distinct() -> Result<()> {
+ let plan =
+ table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0,
3]))?
+ .filter(col("state").eq(lit("CO")))?
+ .project(vec![col("id")])?
+ .distinct()?
+ .build()?;
+
+ let expected = "\
+ Distinct:\
+ \n Projection: #employee_csv.id\
+ \n Filter: #employee_csv.state = Utf8(\"CO\")\
+ \n TableScan: employee_csv projection=[id, state]";
+
+ assert_eq!(expected, format!("{:?}", plan));
+
+ Ok(())
+ }
+
#[test]
fn exists_subquery() -> Result<()> {
let foo = test_table_scan_with_name("foo")?;
diff --git a/datafusion/expr/src/logical_plan/mod.rs
b/datafusion/expr/src/logical_plan/mod.rs
index 25700ced1..ff0f59254 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -23,10 +23,11 @@ mod plan;
pub use builder::{table_scan, LogicalPlanBuilder};
pub use plan::{
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema,
CreateExternalTable,
- CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation,
Explain,
- Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit,
LogicalPlan,
- Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
StringifiedPlan,
- Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Values,
Window,
+ CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable,
EmptyRelation,
+ Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType,
Limit,
+ LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition,
Sort,
+ StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan,
Union,
+ Values, Window,
};
pub use display::display_schema;
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 002461972..5e7e0cfc1 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -97,6 +97,8 @@ pub enum LogicalPlan {
Analyze(Analyze),
/// Extension operator defined outside of DataFusion
Extension(Extension),
+ /// Remove duplicate rows from the input
+ Distinct(Distinct),
}
impl LogicalPlan {
@@ -110,6 +112,7 @@ impl LogicalPlan {
}) => projected_schema,
LogicalPlan::Projection(Projection { schema, .. }) => schema,
LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
+ LogicalPlan::Distinct(Distinct { input }) => input.schema(),
LogicalPlan::Window(Window { schema, .. }) => schema,
LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
@@ -188,6 +191,7 @@ impl LogicalPlan {
| LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::CreateView(CreateView { input, .. })
| LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
+ LogicalPlan::Distinct(Distinct { input, .. }) =>
input.all_schemas(),
LogicalPlan::DropTable(_) => vec![],
}
}
@@ -250,7 +254,8 @@ impl LogicalPlan {
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Analyze { .. }
| LogicalPlan::Explain { .. }
- | LogicalPlan::Union(_) => {
+ | LogicalPlan::Union(_)
+ | LogicalPlan::Distinct(_) => {
vec![]
}
}
@@ -273,6 +278,7 @@ impl LogicalPlan {
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) =>
vec![input],
LogicalPlan::Extension(extension) => extension.node.inputs(),
LogicalPlan::Union(Union { inputs, .. }) =>
inputs.iter().collect(),
+ LogicalPlan::Distinct(Distinct { input }) => vec![input],
LogicalPlan::Explain(explain) => vec![&explain.plan],
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
@@ -408,6 +414,7 @@ impl LogicalPlan {
}
true
}
+ LogicalPlan::Distinct(Distinct { input }) =>
input.accept(visitor)?,
LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
LogicalPlan::Subquery(Subquery { subquery, .. }) => {
subquery.accept(visitor)?
@@ -853,6 +860,9 @@ impl LogicalPlan {
}) => {
write!(f, "DropTable: {:?} if not exist:={}", name,
if_exists)
}
+ LogicalPlan::Distinct(Distinct { .. }) => {
+ write!(f, "Distinct:")
+ }
LogicalPlan::Explain { .. } => write!(f, "Explain"),
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union(_) => write!(f, "Union"),
@@ -1171,6 +1181,13 @@ pub struct Limit {
pub input: Arc<LogicalPlan>,
}
+/// Removes duplicate rows from the input
+#[derive(Clone)]
+pub struct Distinct {
+ /// The logical plan that is being DISTINCT'd
+ pub input: Arc<LogicalPlan>,
+}
+
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
#[derive(Clone)]
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 75180189a..707316fa4 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -20,9 +20,9 @@
use crate::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
use crate::logical_plan::builder::build_join_schema;
use crate::logical_plan::{
- Aggregate, Analyze, CreateMemoryTable, CreateView, Extension, Filter,
Join, Limit,
- Partitioning, Projection, Repartition, Sort, Subquery, SubqueryAlias,
Union, Values,
- Window,
+ Aggregate, Analyze, CreateMemoryTable, CreateView, Distinct, Extension,
Filter, Join,
+ Limit, Partitioning, Projection, Repartition, Sort, Subquery,
SubqueryAlias, Union,
+ Values, Window,
};
use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder};
use arrow::datatypes::{DataType, TimeUnit};
@@ -477,6 +477,9 @@ pub fn from_plan(
alias: alias.clone(),
}))
}
+ LogicalPlan::Distinct(Distinct { .. }) =>
Ok(LogicalPlan::Distinct(Distinct {
+ input: Arc::new(inputs[0].clone()),
+ })),
LogicalPlan::Analyze(a) => {
assert!(expr.is_empty());
assert_eq!(inputs.len(), 1);
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index c15efb2a3..da7e22640 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -231,6 +231,7 @@ fn optimize(
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
+ | LogicalPlan::Distinct(_)
| LogicalPlan::Extension { .. } => {
// apply the optimization to all inputs of the plan
let expr = plan.expressions();
diff --git a/datafusion/optimizer/src/projection_push_down.rs
b/datafusion/optimizer/src/projection_push_down.rs
index 83dee9717..a08bf698e 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -503,6 +503,7 @@ fn optimize_plan(
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::CrossJoin(_)
+ | LogicalPlan::Distinct(_)
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
// collect all required columns by this plan
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index e12760d9a..b3d31400c 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -68,6 +68,7 @@ message LogicalPlanNode {
CreateCatalogNode create_catalog = 20;
SubqueryAliasNode subquery_alias = 21;
CreateViewNode create_view = 22;
+ DistinctNode distinct = 23;
}
}
@@ -232,6 +233,10 @@ message JoinNode {
LogicalExprNode filter = 8;
}
+message DistinctNode {
+ LogicalPlanNode input = 1;
+}
+
message UnionNode {
repeated LogicalPlanNode inputs = 1;
}
diff --git a/datafusion/proto/src/logical_plan.rs
b/datafusion/proto/src/logical_plan.rs
index 1850d57fd..3ea3d3f02 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -38,8 +38,9 @@ use datafusion_common::{Column, DataFusionError};
use datafusion_expr::{
logical_plan::{
Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateView,
- CrossJoin, EmptyRelation, Extension, Filter, Join, JoinConstraint,
JoinType,
- Limit, Projection, Repartition, Sort, SubqueryAlias, TableScan,
Values, Window,
+ CrossJoin, Distinct, EmptyRelation, Extension, Filter, Join,
JoinConstraint,
+ JoinType, Limit, Projection, Repartition, Sort, SubqueryAlias,
TableScan, Values,
+ Window,
},
Expr, LogicalPlan, LogicalPlanBuilder,
};
@@ -668,6 +669,11 @@ impl AsLogicalPlan for LogicalPlanNode {
extension_codec.try_decode(node, &input_plans, ctx)?;
Ok(LogicalPlan::Extension(extension_node))
}
+ LogicalPlanType::Distinct(distinct) => {
+ let input: LogicalPlan =
+ into_logical_plan!(distinct.input, ctx, extension_codec)?;
+ LogicalPlanBuilder::from(input).distinct()?.build()
+ }
}
}
@@ -823,6 +829,20 @@ impl AsLogicalPlan for LogicalPlanNode {
))),
})
}
+ LogicalPlan::Distinct(Distinct { input }) => {
+ let input: protobuf::LogicalPlanNode =
+ protobuf::LogicalPlanNode::try_from_logical_plan(
+ input.as_ref(),
+ extension_codec,
+ )?;
+ Ok(protobuf::LogicalPlanNode {
+ logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
+ protobuf::DistinctNode {
+ input: Some(Box::new(input)),
+ },
+ ))),
+ })
+ }
LogicalPlan::Window(Window {
input, window_expr, ..
}) => {
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index a20e3a013..9452d2168 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -1072,17 +1072,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
LogicalPlanBuilder::window_plan(plan, window_func_exprs)?
};
+ // final projection
+ let plan = project_with_alias(plan, select_exprs_post_aggr, alias)?;
+
// process distinct clause
- let plan = if select.distinct {
- return LogicalPlanBuilder::from(plan)
- .aggregate(select_exprs_post_aggr, iter::empty::<Expr>())?
- .build();
+ if select.distinct {
+ LogicalPlanBuilder::from(plan).distinct()?.build()
} else {
- plan
- };
-
- // generate the final projection plan
- project_with_alias(plan, select_exprs_post_aggr, alias)
+ Ok(plan)
+ }
}
/// Returns the `Expr`'s corresponding to a SQL query's SELECT expressions.
@@ -3963,12 +3961,13 @@ mod tests {
#[test]
fn union() {
let sql = "SELECT order_id from orders UNION SELECT order_id FROM
orders";
- let expected = "Projection: #order_id\
- \n Aggregate: groupBy=[[#order_id]], aggr=[[]]\
- \n Union\n Projection: #orders.order_id\
- \n TableScan: orders\
- \n Projection: #orders.order_id\
- \n TableScan: orders";
+ let expected = "\
+ Distinct:\
+ \n Union\
+ \n Projection: #orders.order_id\
+ \n TableScan: orders\
+ \n Projection: #orders.order_id\
+ \n TableScan: orders";
quick_test(sql, expected);
}