This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a7b113c455 Support `AS`, `UNION`, `INTERSECTION`, `EXCEPT`, `AGGREGATE` pipe operators (#17312)
a7b113c455 is described below

commit a7b113c45509aae34595b6a62469b3173cac91bd
Author: Simon Vandel Sillesen <[email protected]>
AuthorDate: Fri Oct 3 03:53:18 2025 +0200

    Support `AS`, `UNION`, `INTERSECTION`, `EXCEPT`, `AGGREGATE` pipe operators (#17312)
    
    * support WHERE pipe operator
    
    * support order by
    
    * support limit
    
    * select pipe
    
    * extend support
    
    * document supported pipe operators in user guide
    
    * fmt
    
    * fix where pipe before extend
    
    * support AS
    
    * support union
    
    * support intersection
    
    * support except
    
    * support aggregate
    
    * revert diff from main
    
    * simplify using mut
    
    * remove useless comments
    
    * remove dummy data
    
    * move docs to select.md
    
    * simplify using alias_if_changed
    
    * deduplicate fn
    
    * add aggregate toc
    
    * revert parquet testing
---
 datafusion/sql/src/query.rs                        | 113 ++++++++++++++++++++-
 .../sqllogictest/test_files/pipe_operator.slt      |  88 ++++++++++++++++
 docs/source/user-guide/sql/select.md               |  91 +++++++++++++++++
 3 files changed, 290 insertions(+), 2 deletions(-)

diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs
index 9f8b483b8f..633d933eb8 100644
--- a/datafusion/sql/src/query.rs
+++ b/datafusion/sql/src/query.rs
@@ -28,8 +28,9 @@ use datafusion_expr::{
     CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, 
LogicalPlanBuilder,
 };
 use sqlparser::ast::{
-    Expr as SQLExpr, Ident, LimitClause, Offset, OffsetRows, OrderBy, 
OrderByExpr,
-    OrderByKind, PipeOperator, Query, SelectInto, SetExpr,
+    Expr as SQLExpr, ExprWithAliasAndOrderBy, Ident, LimitClause, Offset, 
OffsetRows,
+    OrderBy, OrderByExpr, OrderByKind, PipeOperator, Query, SelectInto, 
SetExpr,
+    SetOperator, SetQuantifier, TableAlias,
 };
 use sqlparser::tokenizer::Span;
 
@@ -146,11 +147,80 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                         .collect();
                 self.project(plan, all_exprs)
             }
+            PipeOperator::As { alias } => self.apply_table_alias(
+                plan,
+                TableAlias {
+                    name: alias,
+                    // Apply to all fields
+                    columns: vec![],
+                },
+            ),
+            PipeOperator::Union {
+                set_quantifier,
+                queries,
+            } => self.pipe_operator_set(
+                plan,
+                SetOperator::Union,
+                set_quantifier,
+                queries,
+                planner_context,
+            ),
+            PipeOperator::Intersect {
+                set_quantifier,
+                queries,
+            } => self.pipe_operator_set(
+                plan,
+                SetOperator::Intersect,
+                set_quantifier,
+                queries,
+                planner_context,
+            ),
+            PipeOperator::Except {
+                set_quantifier,
+                queries,
+            } => self.pipe_operator_set(
+                plan,
+                SetOperator::Except,
+                set_quantifier,
+                queries,
+                planner_context,
+            ),
+            PipeOperator::Aggregate {
+                full_table_exprs,
+                group_by_expr,
+            } => self.pipe_operator_aggregate(
+                plan,
+                full_table_exprs,
+                group_by_expr,
+                planner_context,
+            ),
 
             x => not_impl_err!("`{x}` pipe operator is not supported yet"),
         }
     }
 
+    /// Plan a set-operation pipe operator (`UNION`, `INTERSECT` or `EXCEPT`).
+    ///
+    /// The incoming `plan` is the left input. Each query in `queries` is
+    /// planned in order and combined with the result accumulated so far using
+    /// `set_operator` and `set_quantifier`, so multiple queries are applied
+    /// left-associatively: `((plan OP q1) OP q2) ...`.
+    fn pipe_operator_set(
+        &self,
+        plan: LogicalPlan,
+        set_operator: SetOperator,
+        set_quantifier: SetQuantifier,
+        queries: Vec<Query>,
+        planner_context: &mut PlannerContext,
+    ) -> Result<LogicalPlan> {
+        queries.into_iter().try_fold(plan, |left_plan, query| {
+            let right_plan = self.query_to_plan(query, planner_context)?;
+            self.set_operation_to_plan(set_operator, left_plan, right_plan, set_quantifier)
+        })
+    }
+
+
     /// Wrap a plan in a limit
     fn limit(
         &self,
@@ -227,6 +297,45 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         }
     }
 
+    /// Plan the `AGGREGATE` pipe operator.
+    ///
+    /// `full_table_exprs` are the aggregate expressions and `group_by_expr` the
+    /// (possibly empty) grouping expressions. Both are resolved against the
+    /// schema of the incoming `plan`. An optional `AS` alias is applied with
+    /// `alias_if_changed`. The result is a single aggregate node on top of `plan`.
+    ///
+    /// Per-expression ordering options (`ASC`/`DESC`, `NULLS FIRST`/`NULLS LAST`)
+    /// are not supported yet. They are rejected with a not-implemented error
+    /// rather than silently ignored, which would otherwise yield output in an
+    /// order the query did not ask for.
+    fn pipe_operator_aggregate(
+        &self,
+        plan: LogicalPlan,
+        full_table_exprs: Vec<ExprWithAliasAndOrderBy>,
+        group_by_expr: Vec<ExprWithAliasAndOrderBy>,
+        planner_context: &mut PlannerContext,
+    ) -> Result<LogicalPlan> {
+        let plan_schema = plan.schema();
+        let process_expr =
+            |expr_with_alias_and_order_by: ExprWithAliasAndOrderBy,
+             planner_context: &mut PlannerContext|
+             -> Result<Expr> {
+                // The parser accepts ordering options on each expression, but the
+                // planner has no way to honor them here: fail loudly instead of
+                // dropping them.
+                let order_by = &expr_with_alias_and_order_by.order_by;
+                if order_by.asc.is_some() || order_by.nulls_first.is_some() {
+                    return not_impl_err!(
+                        "Ordering options (ASC/DESC, NULLS FIRST/LAST) in the `AGGREGATE` pipe operator are not supported yet"
+                    );
+                }
+
+                let expr_with_alias = expr_with_alias_and_order_by.expr;
+                let sql_expr = expr_with_alias.expr;
+                let alias = expr_with_alias.alias;
+
+                let df_expr = self.sql_to_expr(sql_expr, plan_schema, planner_context)?;
+
+                match alias {
+                    Some(alias_ident) => df_expr.alias_if_changed(alias_ident.value),
+                    None => Ok(df_expr),
+                }
+            };
+
+        let aggr_exprs: Vec<Expr> = full_table_exprs
+            .into_iter()
+            .map(|e| process_expr(e, planner_context))
+            .collect::<Result<Vec<_>>>()?;
+
+        let group_by_exprs: Vec<Expr> = group_by_expr
+            .into_iter()
+            .map(|e| process_expr(e, planner_context))
+            .collect::<Result<Vec<_>>>()?;
+
+        // Grouping columns precede the aggregate columns in the output
+        // (e.g. `item, num_items, total_sales` in the GROUP BY tests).
+        LogicalPlanBuilder::from(plan)
+            .aggregate(group_by_exprs, aggr_exprs)?
+            .build()
+    }
+
     /// Wrap the logical plan in a `SelectInto`
     fn select_into(
         &self,
diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt b/datafusion/sqllogictest/test_files/pipe_operator.slt
index 6b92df9431..57d1fc0642 100644
--- a/datafusion/sqllogictest/test_files/pipe_operator.slt
+++ b/datafusion/sqllogictest/test_files/pipe_operator.slt
@@ -89,3 +89,91 @@ FROM test
 |> EXTEND a + b AS a_plus_b
 ----
 1 1.1 2.1
+
+# AS pipe
+query I
+SELECT *
+FROM test
+|> as test_pipe
+|> select test_pipe.a
+----
+1
+2
+3
+
+# UNION pipe
+query I
+SELECT *
+FROM test
+|> select a
+|> UNION ALL (
+  SELECT a FROM test
+);
+----
+1
+2
+3
+1
+2
+3
+
+# INTERSECT pipe
+query I rowsort
+SELECT * FROM range(0,3)
+|> INTERSECT DISTINCT
+    (SELECT * FROM range(1,3));
+----
+1
+2
+
+# EXCEPT pipe
+query I rowsort
+select * from range(0,10)
+|> EXCEPT DISTINCT (select * from range(5,10));
+----
+0
+1
+2
+3
+4
+
+# AGGREGATE pipe
+query II
+(
+  SELECT 'apples' AS item, 2 AS sales
+  UNION ALL
+  SELECT 'bananas' AS item, 5 AS sales
+  UNION ALL
+  SELECT 'apples' AS item, 7 AS sales
+)
+|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales;
+----
+3 14
+
+query TII rowsort
+(
+  SELECT 'apples' AS item, 2 AS sales
+  UNION ALL
+  SELECT 'bananas' AS item, 5 AS sales
+  UNION ALL
+  SELECT 'apples' AS item, 7 AS sales
+)
+|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales
+   GROUP BY item;
+----
+apples 2 9
+bananas 1 5
+
+query TII rowsort
+(
+  SELECT 'apples' AS item, 2 AS sales
+  UNION ALL
+  SELECT 'bananas' AS item, 5 AS sales
+  UNION ALL
+  SELECT 'apples' AS item, 7 AS sales
+)
+|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales
+   GROUP BY item
+|> WHERE num_items > 1;
+----
+apples 2 9
diff --git a/docs/source/user-guide/sql/select.md b/docs/source/user-guide/sql/select.md
index 87e940245b..8c1bc401d3 100644
--- a/docs/source/user-guide/sql/select.md
+++ b/docs/source/user-guide/sql/select.md
@@ -345,6 +345,11 @@ DataFusion currently supports the following pipe operators:
 - [LIMIT](#pipe_limit)
 - [SELECT](#pipe_select)
 - [EXTEND](#pipe_extend)
+- [AS](#pipe_as)
+- [UNION](#pipe_union)
+- [INTERSECT](#pipe_intersect)
+- [EXCEPT](#pipe_except)
+- [AGGREGATE](#pipe_aggregate)
 
 (pipe_where)=
 
@@ -423,3 +428,89 @@ select * from range(0,3)
 | 2     | -2          |
 +-------+-------------+
 ```
+
+(pipe_as)=
+
+### AS
+
+```sql
+select * from range(0,3)
+|> as my_range
+|> SELECT my_range.value;
++-------+
+| value |
++-------+
+| 0     |
+| 1     |
+| 2     |
++-------+
+```
+
+(pipe_union)=
+
+### UNION
+
+```sql
+select * from range(0,3)
+|> union all (
+  select * from range(3,6)
+);
++-------+
+| value |
++-------+
+| 0     |
+| 1     |
+| 2     |
+| 3     |
+| 4     |
+| 5     |
++-------+
+```
+
+(pipe_intersect)=
+
+### INTERSECT
+
+```sql
+select * from range(0,100)
+|> INTERSECT DISTINCT (
+  select 3
+);
++-------+
+| value |
++-------+
+| 3     |
++-------+
+```
+
+(pipe_except)=
+
+### EXCEPT
+
+```sql
+select * from range(0,10)
+|> EXCEPT DISTINCT (select * from range(5,10));
++-------+
+| value |
++-------+
+| 0     |
+| 1     |
+| 2     |
+| 3     |
+| 4     |
++-------+
+```
+
+(pipe_aggregate)=
+
+### AGGREGATE
+
+```sql
+select * from range(0,3)
+|> aggregate sum(value) AS total;
++-------+
+| total |
++-------+
+| 3     |
++-------+
+```


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to