This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a7b113c455 Support `AS`, `UNION`, `INTERSECTION`, `EXCEPT`,
`AGGREGATE` pipe operators (#17312)
a7b113c455 is described below
commit a7b113c45509aae34595b6a62469b3173cac91bd
Author: Simon Vandel Sillesen <[email protected]>
AuthorDate: Fri Oct 3 03:53:18 2025 +0200
Support `AS`, `UNION`, `INTERSECTION`, `EXCEPT`, `AGGREGATE` pipe operators
(#17312)
* support WHERE pipe operator
* support order by
* support limit
* select pipe
* extend support
* document supported pipe operators in user guide
* fmt
* fix where pipe before extend
* support AS
* support union
* support intersection
* support except
* support aggregate
* revert diff from main
* simplify using mut
* remove useless comments
* remove dummy data
* move docs to select.md
* simplify using alias_if_changed
* deduplicate fn
* add aggregate toc
* revert parquet testing
---
datafusion/sql/src/query.rs | 113 ++++++++++++++++++++-
.../sqllogictest/test_files/pipe_operator.slt | 88 ++++++++++++++++
docs/source/user-guide/sql/select.md | 91 +++++++++++++++++
3 files changed, 290 insertions(+), 2 deletions(-)
diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs
index 9f8b483b8f..633d933eb8 100644
--- a/datafusion/sql/src/query.rs
+++ b/datafusion/sql/src/query.rs
@@ -28,8 +28,9 @@ use datafusion_expr::{
CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan,
LogicalPlanBuilder,
};
use sqlparser::ast::{
- Expr as SQLExpr, Ident, LimitClause, Offset, OffsetRows, OrderBy,
OrderByExpr,
- OrderByKind, PipeOperator, Query, SelectInto, SetExpr,
+ Expr as SQLExpr, ExprWithAliasAndOrderBy, Ident, LimitClause, Offset,
OffsetRows,
+ OrderBy, OrderByExpr, OrderByKind, PipeOperator, Query, SelectInto,
SetExpr,
+ SetOperator, SetQuantifier, TableAlias,
};
use sqlparser::tokenizer::Span;
@@ -146,11 +147,80 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
.collect();
self.project(plan, all_exprs)
}
+ PipeOperator::As { alias } => self.apply_table_alias(
+ plan,
+ TableAlias {
+ name: alias,
+ // Apply to all fields
+ columns: vec![],
+ },
+ ),
+ PipeOperator::Union {
+ set_quantifier,
+ queries,
+ } => self.pipe_operator_set(
+ plan,
+ SetOperator::Union,
+ set_quantifier,
+ queries,
+ planner_context,
+ ),
+ PipeOperator::Intersect {
+ set_quantifier,
+ queries,
+ } => self.pipe_operator_set(
+ plan,
+ SetOperator::Intersect,
+ set_quantifier,
+ queries,
+ planner_context,
+ ),
+ PipeOperator::Except {
+ set_quantifier,
+ queries,
+ } => self.pipe_operator_set(
+ plan,
+ SetOperator::Except,
+ set_quantifier,
+ queries,
+ planner_context,
+ ),
+ PipeOperator::Aggregate {
+ full_table_exprs,
+ group_by_expr,
+ } => self.pipe_operator_aggregate(
+ plan,
+ full_table_exprs,
+ group_by_expr,
+ planner_context,
+ ),
x => not_impl_err!("`{x}` pipe operator is not supported yet"),
}
}
+    /// Plan the `UNION` / `INTERSECT` / `EXCEPT` pipe operators.
+    ///
+    /// Every query in `queries` is planned on its own and combined with the
+    /// running result using `set_operator` and `set_quantifier`, folding from
+    /// left to right: `((plan OP q1) OP q2) ...`. When `queries` is empty the
+    /// incoming plan is returned unchanged. The first planning error aborts
+    /// the fold and is returned to the caller.
+    fn pipe_operator_set(
+        &self,
+        plan: LogicalPlan,
+        set_operator: SetOperator,
+        set_quantifier: SetQuantifier,
+        queries: Vec<Query>,
+        planner_context: &mut PlannerContext,
+    ) -> Result<LogicalPlan> {
+        queries
+            .into_iter()
+            .try_fold(plan, |left, query| -> Result<LogicalPlan> {
+                let right = self.query_to_plan(query, planner_context)?;
+                self.set_operation_to_plan(set_operator, left, right, set_quantifier)
+            })
+    }
+
/// Wrap a plan in a limit
fn limit(
&self,
@@ -227,6 +297,45 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}
}
+    /// Plan the `AGGREGATE` pipe operator.
+    ///
+    /// `full_table_exprs` are the aggregate expressions (e.g. `SUM(x) AS total`)
+    /// and `group_by_expr` are the optional `GROUP BY` keys. Both are planned
+    /// against the schema of the incoming `plan` and handed to
+    /// `LogicalPlanBuilder::aggregate`, so the grouping columns come first in
+    /// the output, followed by the aggregate columns.
+    ///
+    /// Aliases go through the planner's identifier normalizer, exactly like a
+    /// `SELECT`-list alias. This lets later pipe operators reference the column
+    /// by the same name they would use after a regular `SELECT`.
+    ///
+    /// # Errors
+    ///
+    /// - Returns a "not implemented" error when an expression carries an
+    ///   ordering suffix (`ASC`/`DESC`/`NULLS FIRST|LAST`). Ordering is not
+    ///   planned yet, and silently dropping it would return rows in an order
+    ///   the user did not ask for.
+    /// - Propagates any error from converting the expressions or building the
+    ///   aggregate.
+    fn pipe_operator_aggregate(
+        &self,
+        plan: LogicalPlan,
+        full_table_exprs: Vec<ExprWithAliasAndOrderBy>,
+        group_by_expr: Vec<ExprWithAliasAndOrderBy>,
+        planner_context: &mut PlannerContext,
+    ) -> Result<LogicalPlan> {
+        let plan_schema = plan.schema();
+        let process_expr = |expr_with_alias_and_order_by: ExprWithAliasAndOrderBy,
+                            planner_context: &mut PlannerContext|
+         -> Result<Expr> {
+            let order_by = &expr_with_alias_and_order_by.order_by;
+            if order_by.asc.is_some() || order_by.nulls_first.is_some() {
+                return not_impl_err!(
+                    "ordering in the `AGGREGATE` pipe operator is not supported yet"
+                );
+            }
+
+            let expr_with_alias = expr_with_alias_and_order_by.expr;
+            let df_expr =
+                self.sql_to_expr(expr_with_alias.expr, plan_schema, planner_context)?;
+
+            match expr_with_alias.alias {
+                // Normalize like a SELECT-list alias (e.g. unquoted `Total` -> `total`)
+                // so downstream operators resolve the column consistently.
+                Some(alias_ident) => {
+                    df_expr.alias_if_changed(self.ident_normalizer.normalize(alias_ident))
+                }
+                None => Ok(df_expr),
+            }
+        };
+
+        let aggr_exprs: Vec<Expr> = full_table_exprs
+            .into_iter()
+            .map(|e| process_expr(e, planner_context))
+            .collect::<Result<Vec<_>>>()?;
+
+        let group_by_exprs: Vec<Expr> = group_by_expr
+            .into_iter()
+            .map(|e| process_expr(e, planner_context))
+            .collect::<Result<Vec<_>>>()?;
+
+        LogicalPlanBuilder::from(plan)
+            .aggregate(group_by_exprs, aggr_exprs)?
+            .build()
+    }
+
/// Wrap the logical plan in a `SelectInto`
fn select_into(
&self,
diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt
b/datafusion/sqllogictest/test_files/pipe_operator.slt
index 6b92df9431..57d1fc0642 100644
--- a/datafusion/sqllogictest/test_files/pipe_operator.slt
+++ b/datafusion/sqllogictest/test_files/pipe_operator.slt
@@ -89,3 +89,91 @@ FROM test
|> EXTEND a + b AS a_plus_b
----
1 1.1 2.1
+
+# AS pipe
+query I
+SELECT *
+FROM test
+|> as test_pipe
+|> select test_pipe.a
+----
+1
+2
+3
+
+# UNION pipe
+# UNION ALL gives no ordering guarantee across its inputs, so compare sorted rows
+query I rowsort
+SELECT *
+FROM test
+|> select a
+|> UNION ALL (
+  SELECT a FROM test
+);
+----
+1
+1
+2
+2
+3
+3
+
+# INTERSECT pipe
+query I rowsort
+SELECT * FROM range(0,3)
+|> INTERSECT DISTINCT
+  (SELECT * FROM range(1,3));
+----
+1
+2
+
+# EXCEPT pipe
+query I rowsort
+select * from range(0,10)
+|> EXCEPT DISTINCT (select * from range(5,10));
+----
+0
+1
+2
+3
+4
+
+# AGGREGATE pipe (no GROUP BY: a single row over the whole input)
+query II
+(
+  SELECT 'apples' AS item, 2 AS sales
+  UNION ALL
+  SELECT 'bananas' AS item, 5 AS sales
+  UNION ALL
+  SELECT 'apples' AS item, 7 AS sales
+)
+|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales;
+----
+3 14
+
+# AGGREGATE pipe with GROUP BY: grouping columns come first in the output
+query TII rowsort
+(
+  SELECT 'apples' AS item, 2 AS sales
+  UNION ALL
+  SELECT 'bananas' AS item, 5 AS sales
+  UNION ALL
+  SELECT 'apples' AS item, 7 AS sales
+)
+|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales
+   GROUP BY item;
+----
+apples 2 9
+bananas 1 5
+
+# Aggregate aliases are visible to later pipe operators
+query TII rowsort
+(
+  SELECT 'apples' AS item, 2 AS sales
+  UNION ALL
+  SELECT 'bananas' AS item, 5 AS sales
+  UNION ALL
+  SELECT 'apples' AS item, 7 AS sales
+)
+|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales
+   GROUP BY item
+|> WHERE num_items > 1;
+----
+apples 2 9
diff --git a/docs/source/user-guide/sql/select.md
b/docs/source/user-guide/sql/select.md
index 87e940245b..8c1bc401d3 100644
--- a/docs/source/user-guide/sql/select.md
+++ b/docs/source/user-guide/sql/select.md
@@ -345,6 +345,11 @@ DataFusion currently supports the following pipe operators:
- [LIMIT](#pipe_limit)
- [SELECT](#pipe_select)
- [EXTEND](#pipe_extend)
+- [AS](#pipe_as)
+- [UNION](#pipe_union)
+- [INTERSECT](#pipe_intersect)
+- [EXCEPT](#pipe_except)
+- [AGGREGATE](#pipe_aggregate)
(pipe_where)=
@@ -423,3 +428,89 @@ select * from range(0,3)
| 2 | -2 |
+-------+-------------+
```
+
+(pipe_as)=
+
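+### AS
+
+Assigns a table alias to the current pipe input so that later operators can qualify its columns with that name.
+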
+```sql
+select * from range(0,3)
+|> as my_range
+|> SELECT my_range.value;
++-------+
+| value |
++-------+
+| 0 |
+| 1 |
+| 2 |
++-------+
+```
+
+(pipe_union)=
+
+### UNION
+
+```sql
+select * from range(0,3)
+|> union all (
+ select * from range(3,6)
+);
++-------+
+| value |
++-------+
+| 0 |
+| 1 |
+| 2 |
+| 3 |
+| 4 |
+| 5 |
++-------+
+```
+
+(pipe_intersect)=
+
+### INTERSECT
+
+```sql
+select * from range(0,100)
+|> INTERSECT DISTINCT (
+ select 3
+);
++-------+
+| value |
++-------+
+| 3 |
++-------+
+```
+
+(pipe_except)=
+
+### EXCEPT
+
+```sql
+select * from range(0,10)
+|> EXCEPT DISTINCT (select * from range(5,10));
++-------+
+| value |
++-------+
+| 0 |
+| 1 |
+| 2 |
+| 3 |
+| 4 |
++-------+
+```
+
+(pipe_aggregate)=
+
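+### AGGREGATE
+
+Computes the listed aggregate expressions over the whole input, producing a single row. Append `GROUP BY` to aggregate per group instead, e.g. `|> AGGREGATE COUNT(*) AS num_items GROUP BY item`; the grouping columns appear before the aggregate columns in the output.
+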
+```sql
+select * from range(0,3)
+|> aggregate sum(value) AS total;
++-------+
+| total |
++-------+
+| 3 |
++-------+
+```
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]