This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 839a61896 Add LogicalPlan::Distinct (#2792)
839a61896 is described below

commit 839a61896f0fc3f617a79da1dd4018b2aa6af283
Author: Mike Roberts <[email protected]>
AuthorDate: Tue Jun 28 22:38:42 2022 +0100

    Add LogicalPlan::Distinct (#2792)
---
 datafusion/core/src/logical_plan/plan.rs           |  6 +-
 datafusion/core/src/physical_plan/planner.rs       | 19 +++++-
 datafusion/expr/src/logical_plan/builder.rs        | 71 +++++++++++++++++++---
 datafusion/expr/src/logical_plan/mod.rs            |  9 +--
 datafusion/expr/src/logical_plan/plan.rs           | 19 +++++-
 datafusion/expr/src/utils.rs                       |  9 ++-
 .../optimizer/src/common_subexpr_eliminate.rs      |  1 +
 datafusion/optimizer/src/projection_push_down.rs   |  1 +
 datafusion/proto/proto/datafusion.proto            |  5 ++
 datafusion/proto/src/logical_plan.rs               | 24 +++++++-
 datafusion/sql/src/planner.rs                      | 29 +++++----
 11 files changed, 154 insertions(+), 39 deletions(-)

diff --git a/datafusion/core/src/logical_plan/plan.rs 
b/datafusion/core/src/logical_plan/plan.rs
index 5c9e3d16e..c4cfcc3dc 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -22,9 +22,9 @@ pub use datafusion_expr::{
     logical_plan::{
         display::{GraphvizVisitor, IndentVisitor},
         Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, 
CreateExternalTable,
-        CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation, 
Explain,
-        Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit, 
LogicalPlan,
-        Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
+        CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, 
EmptyRelation,
+        Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType, 
Limit,
+        LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, 
Repartition, Sort,
         StringifiedPlan, Subquery, SubqueryAlias, TableScan, 
ToStringifiedPlan, Union,
         UserDefinedLogicalNode, Values, Window,
     },
diff --git a/datafusion/core/src/physical_plan/planner.rs 
b/datafusion/core/src/physical_plan/planner.rs
index 4a8ad5d77..0cb037620 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -26,8 +26,8 @@ use crate::datasource::source_as_provider;
 use crate::execution::context::{ExecutionProps, SessionState};
 use crate::logical_expr::utils::generate_sort_key;
 use crate::logical_plan::plan::{
-    Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, 
TableScan,
-    Window,
+    Aggregate, Distinct, EmptyRelation, Filter, Join, Projection, Sort, 
SubqueryAlias,
+    TableScan, Window,
 };
 use crate::logical_plan::{
     unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan,
@@ -59,7 +59,8 @@ use arrow::datatypes::DataType;
 use arrow::datatypes::{Schema, SchemaRef};
 use async_trait::async_trait;
 use datafusion_common::ScalarValue;
-use datafusion_expr::{expr::GroupingSet, utils::expr_to_columns};
+use datafusion_expr::expr::GroupingSet;
+use datafusion_expr::utils::{expand_wildcard, expr_to_columns};
 use datafusion_physical_expr::expressions::Literal;
 use datafusion_sql::utils::window_expr_common_partition_keys;
 use futures::future::BoxFuture;
@@ -616,6 +617,18 @@ impl DefaultPhysicalPlanner {
                         physical_input_schema.clone(),
                     )?) )
                 }
+                LogicalPlan::Distinct(Distinct {input}) => {
+                    // Convert distinct to groupby with no aggregations
+                    let group_expr = expand_wildcard(input.schema(), input)?;
+                    let aggregate =  LogicalPlan::Aggregate(Aggregate {
+                            input: input.clone(),
+                            group_expr,
+                            aggr_expr: vec![],
+                            schema: input.schema().clone()
+                        }
+                    );
+                    Ok(self.create_initial_plan(&aggregate, 
session_state).await?)
+                }
                 LogicalPlan::Projection(Projection { input, expr, .. }) => {
                     let input_exec = self.create_initial_plan(input, 
session_state).await?;
                     let input_schema = input.as_ref().schema();
diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index 6692b18d1..fd1886e20 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -24,7 +24,7 @@ use crate::utils::{
 use crate::{and, binary_expr, Operator};
 use crate::{
     logical_plan::{
-        Aggregate, Analyze, CrossJoin, EmptyRelation, Explain, Filter, Join,
+        Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, 
Filter, Join,
         JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, 
Projection,
         Repartition, Sort, SubqueryAlias, TableScan, ToStringifiedPlan, Union, 
Values,
         Window,
@@ -42,7 +42,6 @@ use datafusion_common::{
 };
 use std::any::Any;
 use std::convert::TryFrom;
-use std::iter;
 use std::{
     collections::{HashMap, HashSet},
     sync::Arc,
@@ -437,16 +436,27 @@ impl LogicalPlanBuilder {
 
     /// Apply a union, removing duplicate rows
     pub fn union_distinct(&self, plan: LogicalPlan) -> Result<Self> {
-        self.union(plan)?.distinct()
+        // unwrap top-level Distincts, to avoid duplication
+        let left_plan = self.plan.clone();
+        let left_plan: LogicalPlan = match left_plan {
+            LogicalPlan::Distinct(Distinct { input }) => (*input).clone(),
+            _ => left_plan,
+        };
+        let right_plan: LogicalPlan = match plan {
+            LogicalPlan::Distinct(Distinct { input }) => (*input).clone(),
+            _ => plan,
+        };
+
+        Ok(Self::from(LogicalPlan::Distinct(Distinct {
+            input: Arc::new(union_with_alias(left_plan, right_plan, None)?),
+        })))
     }
 
     /// Apply deduplication: Only distinct (different) values are returned)
     pub fn distinct(&self) -> Result<Self> {
-        let projection_expr = expand_wildcard(self.plan.schema(), &self.plan)?;
-        let plan = LogicalPlanBuilder::from(self.plan.clone())
-            .aggregate(projection_expr, iter::empty::<Expr>())?
-            .build()?;
-        Self::from(plan).project(vec![Expr::Wildcard])
+        Ok(Self::from(LogicalPlan::Distinct(Distinct {
+            input: Arc::new(self.plan.clone()),
+        })))
     }
 
     /// Apply a join with on constraint.
@@ -1141,6 +1151,51 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn plan_builder_union_distinct_combined_single_union() -> Result<()> {
+        let plan =
+            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 
4]))?;
+
+        let plan = plan
+            .union_distinct(plan.build()?)?
+            .union_distinct(plan.build()?)?
+            .union_distinct(plan.build()?)?
+            .build()?;
+
+        // output has only one union
+        let expected = "\
+        Distinct:\
+        \n  Union\
+        \n    TableScan: employee_csv projection=[state, salary]\
+        \n    TableScan: employee_csv projection=[state, salary]\
+        \n    TableScan: employee_csv projection=[state, salary]\
+        \n    TableScan: employee_csv projection=[state, salary]";
+
+        assert_eq!(expected, format!("{:?}", plan));
+
+        Ok(())
+    }
+
+    #[test]
+    fn plan_builder_simple_distinct() -> Result<()> {
+        let plan =
+            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 
3]))?
+                .filter(col("state").eq(lit("CO")))?
+                .project(vec![col("id")])?
+                .distinct()?
+                .build()?;
+
+        let expected = "\
+        Distinct:\
+        \n  Projection: #employee_csv.id\
+        \n    Filter: #employee_csv.state = Utf8(\"CO\")\
+        \n      TableScan: employee_csv projection=[id, state]";
+
+        assert_eq!(expected, format!("{:?}", plan));
+
+        Ok(())
+    }
+
     #[test]
     fn exists_subquery() -> Result<()> {
         let foo = test_table_scan_with_name("foo")?;
diff --git a/datafusion/expr/src/logical_plan/mod.rs 
b/datafusion/expr/src/logical_plan/mod.rs
index 25700ced1..ff0f59254 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -23,10 +23,11 @@ mod plan;
 pub use builder::{table_scan, LogicalPlanBuilder};
 pub use plan::{
     Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, 
CreateExternalTable,
-    CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation, 
Explain,
-    Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit, 
LogicalPlan,
-    Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort, 
StringifiedPlan,
-    Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Values, 
Window,
+    CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, 
EmptyRelation,
+    Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType, 
Limit,
+    LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition, 
Sort,
+    StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, 
Union,
+    Values, Window,
 };
 
 pub use display::display_schema;
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 002461972..5e7e0cfc1 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -97,6 +97,8 @@ pub enum LogicalPlan {
     Analyze(Analyze),
     /// Extension operator defined outside of DataFusion
     Extension(Extension),
+    /// Remove duplicate rows from the input
+    Distinct(Distinct),
 }
 
 impl LogicalPlan {
@@ -110,6 +112,7 @@ impl LogicalPlan {
             }) => projected_schema,
             LogicalPlan::Projection(Projection { schema, .. }) => schema,
             LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
+            LogicalPlan::Distinct(Distinct { input }) => input.schema(),
             LogicalPlan::Window(Window { schema, .. }) => schema,
             LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
             LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
@@ -188,6 +191,7 @@ impl LogicalPlan {
             | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
             | LogicalPlan::CreateView(CreateView { input, .. })
             | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
+            LogicalPlan::Distinct(Distinct { input, .. }) => 
input.all_schemas(),
             LogicalPlan::DropTable(_) => vec![],
         }
     }
@@ -250,7 +254,8 @@ impl LogicalPlan {
             | LogicalPlan::CrossJoin(_)
             | LogicalPlan::Analyze { .. }
             | LogicalPlan::Explain { .. }
-            | LogicalPlan::Union(_) => {
+            | LogicalPlan::Union(_)
+            | LogicalPlan::Distinct(_) => {
                 vec![]
             }
         }
@@ -273,6 +278,7 @@ impl LogicalPlan {
             LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => 
vec![input],
             LogicalPlan::Extension(extension) => extension.node.inputs(),
             LogicalPlan::Union(Union { inputs, .. }) => 
inputs.iter().collect(),
+            LogicalPlan::Distinct(Distinct { input }) => vec![input],
             LogicalPlan::Explain(explain) => vec![&explain.plan],
             LogicalPlan::Analyze(analyze) => vec![&analyze.input],
             LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
@@ -408,6 +414,7 @@ impl LogicalPlan {
                 }
                 true
             }
+            LogicalPlan::Distinct(Distinct { input }) => 
input.accept(visitor)?,
             LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
             LogicalPlan::Subquery(Subquery { subquery, .. }) => {
                 subquery.accept(visitor)?
@@ -853,6 +860,9 @@ impl LogicalPlan {
                     }) => {
                         write!(f, "DropTable: {:?} if not exist:={}", name, 
if_exists)
                     }
+                    LogicalPlan::Distinct(Distinct { .. }) => {
+                        write!(f, "Distinct:")
+                    }
                     LogicalPlan::Explain { .. } => write!(f, "Explain"),
                     LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
                     LogicalPlan::Union(_) => write!(f, "Union"),
@@ -1171,6 +1181,13 @@ pub struct Limit {
     pub input: Arc<LogicalPlan>,
 }
 
+/// Removes duplicate rows from the input
+#[derive(Clone)]
+pub struct Distinct {
+    /// The logical plan that is being DISTINCT'd
+    pub input: Arc<LogicalPlan>,
+}
+
 /// Aggregates its input based on a set of grouping and aggregate
 /// expressions (e.g. SUM).
 #[derive(Clone)]
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 75180189a..707316fa4 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -20,9 +20,9 @@
 use crate::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
 use crate::logical_plan::builder::build_join_schema;
 use crate::logical_plan::{
-    Aggregate, Analyze, CreateMemoryTable, CreateView, Extension, Filter, 
Join, Limit,
-    Partitioning, Projection, Repartition, Sort, Subquery, SubqueryAlias, 
Union, Values,
-    Window,
+    Aggregate, Analyze, CreateMemoryTable, CreateView, Distinct, Extension, 
Filter, Join,
+    Limit, Partitioning, Projection, Repartition, Sort, Subquery, 
SubqueryAlias, Union,
+    Values, Window,
 };
 use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder};
 use arrow::datatypes::{DataType, TimeUnit};
@@ -477,6 +477,9 @@ pub fn from_plan(
                 alias: alias.clone(),
             }))
         }
+        LogicalPlan::Distinct(Distinct { .. }) => 
Ok(LogicalPlan::Distinct(Distinct {
+            input: Arc::new(inputs[0].clone()),
+        })),
         LogicalPlan::Analyze(a) => {
             assert!(expr.is_empty());
             assert_eq!(inputs.len(), 1);
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs 
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index c15efb2a3..da7e22640 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -231,6 +231,7 @@ fn optimize(
         | LogicalPlan::CreateCatalogSchema(_)
         | LogicalPlan::CreateCatalog(_)
         | LogicalPlan::DropTable(_)
+        | LogicalPlan::Distinct(_)
         | LogicalPlan::Extension { .. } => {
             // apply the optimization to all inputs of the plan
             let expr = plan.expressions();
diff --git a/datafusion/optimizer/src/projection_push_down.rs 
b/datafusion/optimizer/src/projection_push_down.rs
index 83dee9717..a08bf698e 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -503,6 +503,7 @@ fn optimize_plan(
         | LogicalPlan::CreateCatalog(_)
         | LogicalPlan::DropTable(_)
         | LogicalPlan::CrossJoin(_)
+        | LogicalPlan::Distinct(_)
         | LogicalPlan::Extension { .. } => {
             let expr = plan.expressions();
             // collect all required columns by this plan
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index e12760d9a..b3d31400c 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -68,6 +68,7 @@ message LogicalPlanNode {
     CreateCatalogNode create_catalog = 20;
     SubqueryAliasNode subquery_alias = 21;
     CreateViewNode create_view = 22;
+    DistinctNode distinct = 23;
   }
 }
 
@@ -232,6 +233,10 @@ message JoinNode {
   LogicalExprNode filter = 8;
 }
 
+message DistinctNode {
+  LogicalPlanNode input = 1;
+}
+
 message UnionNode {
   repeated LogicalPlanNode inputs = 1;
 }
diff --git a/datafusion/proto/src/logical_plan.rs 
b/datafusion/proto/src/logical_plan.rs
index 1850d57fd..3ea3d3f02 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -38,8 +38,9 @@ use datafusion_common::{Column, DataFusionError};
 use datafusion_expr::{
     logical_plan::{
         Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, 
CreateView,
-        CrossJoin, EmptyRelation, Extension, Filter, Join, JoinConstraint, 
JoinType,
-        Limit, Projection, Repartition, Sort, SubqueryAlias, TableScan, 
Values, Window,
+        CrossJoin, Distinct, EmptyRelation, Extension, Filter, Join, 
JoinConstraint,
+        JoinType, Limit, Projection, Repartition, Sort, SubqueryAlias, 
TableScan, Values,
+        Window,
     },
     Expr, LogicalPlan, LogicalPlanBuilder,
 };
@@ -668,6 +669,11 @@ impl AsLogicalPlan for LogicalPlanNode {
                     extension_codec.try_decode(node, &input_plans, ctx)?;
                 Ok(LogicalPlan::Extension(extension_node))
             }
+            LogicalPlanType::Distinct(distinct) => {
+                let input: LogicalPlan =
+                    into_logical_plan!(distinct.input, ctx, extension_codec)?;
+                LogicalPlanBuilder::from(input).distinct()?.build()
+            }
         }
     }
 
@@ -823,6 +829,20 @@ impl AsLogicalPlan for LogicalPlanNode {
                     ))),
                 })
             }
+            LogicalPlan::Distinct(Distinct { input }) => {
+                let input: protobuf::LogicalPlanNode =
+                    protobuf::LogicalPlanNode::try_from_logical_plan(
+                        input.as_ref(),
+                        extension_codec,
+                    )?;
+                Ok(protobuf::LogicalPlanNode {
+                    logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
+                        protobuf::DistinctNode {
+                            input: Some(Box::new(input)),
+                        },
+                    ))),
+                })
+            }
             LogicalPlan::Window(Window {
                 input, window_expr, ..
             }) => {
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index a20e3a013..9452d2168 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -1072,17 +1072,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             LogicalPlanBuilder::window_plan(plan, window_func_exprs)?
         };
 
+        // final projection
+        let plan = project_with_alias(plan, select_exprs_post_aggr, alias)?;
+
         // process distinct clause
-        let plan = if select.distinct {
-            return LogicalPlanBuilder::from(plan)
-                .aggregate(select_exprs_post_aggr, iter::empty::<Expr>())?
-                .build();
+        if select.distinct {
+            LogicalPlanBuilder::from(plan).distinct()?.build()
         } else {
-            plan
-        };
-
-        // generate the final projection plan
-        project_with_alias(plan, select_exprs_post_aggr, alias)
+            Ok(plan)
+        }
     }
 
     /// Returns the `Expr`'s corresponding to a SQL query's SELECT expressions.
@@ -3963,12 +3961,13 @@ mod tests {
     #[test]
     fn union() {
         let sql = "SELECT order_id from orders UNION SELECT order_id FROM 
orders";
-        let expected = "Projection: #order_id\
-        \n  Aggregate: groupBy=[[#order_id]], aggr=[[]]\
-        \n    Union\n      Projection: #orders.order_id\
-        \n        TableScan: orders\
-        \n      Projection: #orders.order_id\
-        \n        TableScan: orders";
+        let expected = "\
+        Distinct:\
+        \n  Union\
+        \n    Projection: #orders.order_id\
+        \n      TableScan: orders\
+        \n    Projection: #orders.order_id\
+        \n      TableScan: orders";
         quick_test(sql, expected);
     }
 

Reply via email to