This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1ba1e539b0 Unparse `SubqueryAlias` without projections to SQL (#12896)
1ba1e539b0 is described below
commit 1ba1e539b01bbfc7f9001423cfe1ff0015a99db7
Author: Jax Liu <[email protected]>
AuthorDate: Fri Oct 18 01:21:59 2024 +0800
Unparse `SubqueryAlias` without projections to SQL (#12896)
* change pub function comment to doc
* unparse subquery alias without projections
* fix tests
* rollback the empty line
* rollback the empty line
* exclude the table_scan with pushdown case
* fmt and clippy
* simplify the ast to string and remove unused debug code
---
datafusion/sql/src/unparser/plan.rs | 64 ++++++++-------
datafusion/sql/src/unparser/rewrite.rs | 93 +++++++++++-----------
datafusion/sql/tests/cases/plan_to_sql.rs | 124 ++++++++++++++++++++++++------
3 files changed, 184 insertions(+), 97 deletions(-)
diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs
index d150f0e532..9b4818b98c 100644
--- a/datafusion/sql/src/unparser/plan.rs
+++ b/datafusion/sql/src/unparser/plan.rs
@@ -15,19 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use crate::unparser::utils::unproject_agg_exprs;
-use datafusion_common::{
- internal_err, not_impl_err,
- tree_node::{TransformedResult, TreeNode},
- Column, DataFusionError, Result, TableReference,
-};
-use datafusion_expr::{
- expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
- LogicalPlanBuilder, Projection, SortExpr,
-};
-use sqlparser::ast::{self, Ident, SetExpr};
-use std::sync::Arc;
-
use super::{
ast::{
BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
@@ -44,6 +31,18 @@ use super::{
},
Unparser,
};
+use crate::unparser::utils::unproject_agg_exprs;
+use datafusion_common::{
+ internal_err, not_impl_err,
+ tree_node::{TransformedResult, TreeNode},
+ Column, DataFusionError, Result, TableReference,
+};
+use datafusion_expr::{
+ expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
+ LogicalPlanBuilder, Projection, SortExpr, TableScan,
+};
+use sqlparser::ast::{self, Ident, SetExpr};
+use std::sync::Arc;
/// Convert a DataFusion [`LogicalPlan`] to [`ast::Statement`]
///
@@ -249,12 +248,9 @@ impl Unparser<'_> {
) -> Result<()> {
match plan {
LogicalPlan::TableScan(scan) => {
- if scan.projection.is_some()
- || !scan.filters.is_empty()
- || scan.fetch.is_some()
+ if let Some(unparsed_table_scan) =
+ Self::unparse_table_scan_pushdown(plan, None)?
{
- let unparsed_table_scan =
- Self::unparse_table_scan_pushdown(plan, None)?;
return self.select_to_sql_recursively(
&unparsed_table_scan,
query,
@@ -498,10 +494,18 @@ impl Unparser<'_> {
LogicalPlan::SubqueryAlias(plan_alias) => {
let (plan, mut columns) =
subquery_alias_inner_query_and_columns(plan_alias);
- let plan = Self::unparse_table_scan_pushdown(
+ let unparsed_table_scan = Self::unparse_table_scan_pushdown(
plan,
Some(plan_alias.alias.clone()),
)?;
+ // if the child plan is a TableScan with pushdown operations, we don't need to
+ // create an additional subquery for it
+ if !select.already_projected() && unparsed_table_scan.is_none() {
+ select.projection(vec![ast::SelectItem::Wildcard(
+ ast::WildcardAdditionalOptions::default(),
+ )]);
+ }
+ let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
if !columns.is_empty()
&& !self.dialect.supports_column_alias_in_table_alias()
{
@@ -582,12 +586,21 @@ impl Unparser<'_> {
}
}
+ fn is_scan_with_pushdown(scan: &TableScan) -> bool {
+ scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
+ }
+
+ /// Try to unparse a table scan with pushdown operations into a new subquery plan.
+ /// If the table scan is without any pushdown operations, return None.
fn unparse_table_scan_pushdown(
plan: &LogicalPlan,
alias: Option<TableReference>,
- ) -> Result<LogicalPlan> {
+ ) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::TableScan(table_scan) => {
+ if !Self::is_scan_with_pushdown(table_scan) {
+ return Ok(None);
+ }
let mut filter_alias_rewriter =
alias.as_ref().map(|alias_name| TableAliasRewriter {
table_schema: table_scan.source.schema(),
@@ -648,18 +661,15 @@ impl Unparser<'_> {
builder = builder.limit(0, Some(fetch))?;
}
- builder.build()
+ Ok(Some(builder.build()?))
}
LogicalPlan::SubqueryAlias(subquery_alias) => {
- let new_plan = Self::unparse_table_scan_pushdown(
+ Self::unparse_table_scan_pushdown(
&subquery_alias.input,
Some(subquery_alias.alias.clone()),
- )?;
- LogicalPlanBuilder::from(new_plan)
- .alias(subquery_alias.alias.clone())?
- .build()
+ )
}
- _ => Ok(plan.clone()),
+ _ => Ok(None),
}
}
diff --git a/datafusion/sql/src/unparser/rewrite.rs b/datafusion/sql/src/unparser/rewrite.rs
index 304a02f037..3049df9396 100644
--- a/datafusion/sql/src/unparser/rewrite.rs
+++ b/datafusion/sql/src/unparser/rewrite.rs
@@ -101,25 +101,25 @@ fn rewrite_sort_expr_for_union(exprs: Vec<SortExpr>) -> Result<Vec<SortExpr>> {
Ok(sort_exprs)
}
-// Rewrite logic plan for query that order by columns are not in projections
-// Plan before rewrite:
-//
-// Projection: j1.j1_string, j2.j2_string
-// Sort: j1.j1_id DESC NULLS FIRST, j2.j2_id DESC NULLS FIRST
-// Projection: j1.j1_string, j2.j2_string, j1.j1_id, j2.j2_id
-// Inner Join: Filter: j1.j1_id = j2.j2_id
-// TableScan: j1
-// TableScan: j2
-//
-// Plan after rewrite
-//
-// Sort: j1.j1_id DESC NULLS FIRST, j2.j2_id DESC NULLS FIRST
-// Projection: j1.j1_string, j2.j2_string
-// Inner Join: Filter: j1.j1_id = j2.j2_id
-// TableScan: j1
-// TableScan: j2
-//
-// This prevents the original plan generate query with derived table but missing alias.
+/// Rewrite the logical plan for queries whose ORDER BY columns are not in the projection.
+/// Plan before rewrite:
+///
+/// Projection: j1.j1_string, j2.j2_string
+/// Sort: j1.j1_id DESC NULLS FIRST, j2.j2_id DESC NULLS FIRST
+/// Projection: j1.j1_string, j2.j2_string, j1.j1_id, j2.j2_id
+/// Inner Join: Filter: j1.j1_id = j2.j2_id
+/// TableScan: j1
+/// TableScan: j2
+///
+/// Plan after rewrite:
+///
+/// Sort: j1.j1_id DESC NULLS FIRST, j2.j2_id DESC NULLS FIRST
+/// Projection: j1.j1_string, j2.j2_string
+/// Inner Join: Filter: j1.j1_id = j2.j2_id
+/// TableScan: j1
+/// TableScan: j2
+///
+/// This prevents the original plan from generating a query with a derived table but a missing alias.
pub(super) fn rewrite_plan_for_sort_on_non_projected_fields(
p: &Projection,
) -> Option<LogicalPlan> {
@@ -191,33 +191,33 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields(
}
}
-// This logic is to work out the columns and inner query for SubqueryAlias plan for both types of
-// subquery
-// - `(SELECT column_a as a from table) AS A`
-// - `(SELECT column_a from table) AS A (a)`
-//
-// A roundtrip example for table alias with columns
-//
-// query: SELECT id FROM (SELECT j1_id from j1) AS c (id)
-//
-// LogicPlan:
-// Projection: c.id
-// SubqueryAlias: c
-// Projection: j1.j1_id AS id
-// Projection: j1.j1_id
-// TableScan: j1
-//
-// Before introducing this logic, the unparsed query would be `SELECT c.id FROM (SELECT j1.j1_id AS
-// id FROM (SELECT j1.j1_id FROM j1)) AS c`.
-// The query is invalid as `j1.j1_id` is not a valid identifier in the derived table
-// `(SELECT j1.j1_id FROM j1)`
-//
-// With this logic, the unparsed query will be:
-// `SELECT c.id FROM (SELECT j1.j1_id FROM j1) AS c (id)`
-//
-// Caveat: this won't handle the case like `select * from (select 1, 2) AS a (b, c)`
-// as the parser gives a wrong plan which has mismatch `Int(1)` types: Literal and
-// Column in the Projections. Once the parser side is fixed, this logic should work
+/// This logic works out the columns and inner query of a SubqueryAlias plan for both types of
+/// subquery:
+/// - `(SELECT column_a as a from table) AS A`
+/// - `(SELECT column_a from table) AS A (a)`
+///
+/// A roundtrip example for table alias with columns
+///
+/// query: SELECT id FROM (SELECT j1_id from j1) AS c (id)
+///
+/// LogicalPlan:
+/// Projection: c.id
+/// SubqueryAlias: c
+/// Projection: j1.j1_id AS id
+/// Projection: j1.j1_id
+/// TableScan: j1
+///
+/// Before introducing this logic, the unparsed query would be `SELECT c.id FROM (SELECT j1.j1_id AS
+/// id FROM (SELECT j1.j1_id FROM j1)) AS c`.
+/// The query is invalid as `j1.j1_id` is not a valid identifier in the derived table
+/// `(SELECT j1.j1_id FROM j1)`
+///
+/// With this logic, the unparsed query will be:
+/// `SELECT c.id FROM (SELECT j1.j1_id FROM j1) AS c (id)`
+///
+/// Caveat: this won't handle a case like `select * from (select 1, 2) AS a (b, c)`
+/// as the parser gives a wrong plan which has mismatched `Int(1)` types: Literal and
+/// Column in the Projections. Once the parser side is fixed, this logic should work.
pub(super) fn subquery_alias_inner_query_and_columns(
subquery_alias: &datafusion_expr::SubqueryAlias,
) -> (&LogicalPlan, Vec<Ident>) {
@@ -330,6 +330,7 @@ fn find_projection(logical_plan: &LogicalPlan) -> Option<&Projection> {
_ => None,
}
}
+
/// A `TreeNodeRewriter` implementation that rewrites `Expr::Column` expressions by
/// replacing the column's name with an alias if the column exists in the provided schema.
///
diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs
index aff9f99c8c..e4e5d6a929 100644
--- a/datafusion/sql/tests/cases/plan_to_sql.rs
+++ b/datafusion/sql/tests/cases/plan_to_sql.rs
@@ -71,7 +71,7 @@ fn roundtrip_expr() {
let ast = expr_to_sql(&expr)?;
- Ok(format!("{}", ast))
+ Ok(ast.to_string())
};
for (table, query, expected) in tests {
@@ -192,7 +192,7 @@ fn roundtrip_statement() -> Result<()> {
let roundtrip_statement = plan_to_sql(&plan)?;
- let actual = format!("{}", &roundtrip_statement);
+ let actual = &roundtrip_statement.to_string();
println!("roundtrip sql: {actual}");
println!("plan {}", plan.display_indent());
@@ -224,7 +224,7 @@ fn roundtrip_crossjoin() -> Result<()> {
let roundtrip_statement = plan_to_sql(&plan)?;
- let actual = format!("{}", &roundtrip_statement);
+ let actual = &roundtrip_statement.to_string();
println!("roundtrip sql: {actual}");
println!("plan {}", plan.display_indent());
@@ -237,7 +237,7 @@ fn roundtrip_crossjoin() -> Result<()> {
\n TableScan: j1\
\n TableScan: j2";
- assert_eq!(format!("{plan_roundtrip}"), expected);
+ assert_eq!(plan_roundtrip.to_string(), expected);
Ok(())
}
@@ -478,7 +478,7 @@ fn roundtrip_statement_with_dialect() -> Result<()> {
let unparser = Unparser::new(&*query.unparser_dialect);
let roundtrip_statement = unparser.plan_to_sql(&plan)?;
- let actual = format!("{}", &roundtrip_statement);
+ let actual = &roundtrip_statement.to_string();
println!("roundtrip sql: {actual}");
println!("plan {}", plan.display_indent());
@@ -508,7 +508,7 @@ Projection: unnest_placeholder(unnest_table.struct_col).field1, unnest_placehold
Projection: unnest_table.struct_col AS unnest_placeholder(unnest_table.struct_col), unnest_table.array_col AS unnest_placeholder(unnest_table.array_col), unnest_table.struct_col, unnest_table.array_col
TableScan: unnest_table"#.trim_start();
- assert_eq!(format!("{plan}"), expected);
+ assert_eq!(plan.to_string(), expected);
Ok(())
}
@@ -528,7 +528,7 @@ fn test_table_references_in_plan_to_sql() {
.unwrap();
let sql = plan_to_sql(&plan).unwrap();
- assert_eq!(format!("{}", sql), expected_sql)
+ assert_eq!(sql.to_string(), expected_sql)
}
test(
@@ -558,7 +558,7 @@ fn test_table_scan_with_no_projection_in_plan_to_sql() {
.build()
.unwrap();
let sql = plan_to_sql(&plan).unwrap();
- assert_eq!(format!("{}", sql), expected_sql)
+ assert_eq!(sql.to_string(), expected_sql)
}
test(
@@ -667,27 +667,103 @@ where
}
#[test]
-fn test_table_scan_pushdown() -> Result<()> {
+fn test_table_scan_alias() -> Result<()> {
let schema = Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("age", DataType::Utf8, false),
]);
+ let plan = table_scan(Some("t1"), &schema, None)?
+ .project(vec![col("id")])?
+ .alias("a")?
+ .build()?;
+ let sql = plan_to_sql(&plan)?;
+ assert_eq!(sql.to_string(), "SELECT * FROM (SELECT t1.id FROM t1) AS a");
+
+ let plan = table_scan(Some("t1"), &schema, None)?
+ .project(vec![col("id")])?
+ .alias("a")?
+ .build()?;
+
+ let sql = plan_to_sql(&plan)?;
+ assert_eq!(sql.to_string(), "SELECT * FROM (SELECT t1.id FROM t1) AS a");
+
+ let plan = table_scan(Some("t1"), &schema, None)?
+ .filter(col("id").gt(lit(5)))?
+ .project(vec![col("id")])?
+ .alias("a")?
+ .build()?;
+ let sql = plan_to_sql(&plan)?;
+ assert_eq!(
+ sql.to_string(),
+ "SELECT * FROM (SELECT t1.id FROM t1 WHERE (t1.id > 5)) AS a"
+ );
+
+ let table_scan_with_two_filter = table_scan_with_filters(
+ Some("t1"),
+ &schema,
+ None,
+ vec![col("id").gt(lit(1)), col("age").lt(lit(2))],
+ )?
+ .project(vec![col("id")])?
+ .alias("a")?
+ .build()?;
+ let table_scan_with_two_filter = plan_to_sql(&table_scan_with_two_filter)?;
+ assert_eq!(
+ table_scan_with_two_filter.to_string(),
+ "SELECT * FROM (SELECT t1.id FROM t1 WHERE ((t1.id > 1) AND (t1.age < 2))) AS a"
+ );
+
+ let table_scan_with_fetch =
+ table_scan_with_filter_and_fetch(Some("t1"), &schema, None, vec![], Some(10))?
+ .project(vec![col("id")])?
+ .alias("a")?
+ .build()?;
+ let table_scan_with_fetch = plan_to_sql(&table_scan_with_fetch)?;
+ assert_eq!(
+ table_scan_with_fetch.to_string(),
+ "SELECT * FROM (SELECT t1.id FROM (SELECT * FROM t1 LIMIT 10)) AS a"
+ );
+
+ let table_scan_with_pushdown_all = table_scan_with_filter_and_fetch(
+ Some("t1"),
+ &schema,
+ Some(vec![0, 1]),
+ vec![col("id").gt(lit(1))],
+ Some(10),
+ )?
+ .project(vec![col("id")])?
+ .alias("a")?
+ .build()?;
+ let table_scan_with_pushdown_all = plan_to_sql(&table_scan_with_pushdown_all)?;
+ assert_eq!(
+ table_scan_with_pushdown_all.to_string(),
+ "SELECT * FROM (SELECT t1.id FROM (SELECT t1.id, t1.age FROM t1 WHERE (t1.id > 1) LIMIT 10)) AS a"
+ );
+ Ok(())
+}
+
+#[test]
+fn test_table_scan_pushdown() -> Result<()> {
+ let schema = Schema::new(vec![
+ Field::new("id", DataType::Utf8, false),
+ Field::new("age", DataType::Utf8, false),
+ ]);
let scan_with_projection =
table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?;
let scan_with_projection = plan_to_sql(&scan_with_projection)?;
assert_eq!(
- format!("{}", scan_with_projection),
+ scan_with_projection.to_string(),
"SELECT t1.id, t1.age FROM t1"
);
let scan_with_projection = table_scan(Some("t1"), &schema, Some(vec![1]))?.build()?;
let scan_with_projection = plan_to_sql(&scan_with_projection)?;
- assert_eq!(format!("{}", scan_with_projection), "SELECT t1.age FROM t1");
+ assert_eq!(scan_with_projection.to_string(), "SELECT t1.age FROM t1");
let scan_with_no_projection = table_scan(Some("t1"), &schema, None)?.build()?;
let scan_with_no_projection = plan_to_sql(&scan_with_no_projection)?;
- assert_eq!(format!("{}", scan_with_no_projection), "SELECT * FROM t1");
+ assert_eq!(scan_with_no_projection.to_string(), "SELECT * FROM t1");
let table_scan_with_projection_alias =
table_scan(Some("t1"), &schema, Some(vec![0, 1]))?
@@ -696,7 +772,7 @@ fn test_table_scan_pushdown() -> Result<()> {
let table_scan_with_projection_alias =
plan_to_sql(&table_scan_with_projection_alias)?;
assert_eq!(
- format!("{}", table_scan_with_projection_alias),
+ table_scan_with_projection_alias.to_string(),
"SELECT ta.id, ta.age FROM t1 AS ta"
);
@@ -707,7 +783,7 @@ fn test_table_scan_pushdown() -> Result<()> {
let table_scan_with_projection_alias =
plan_to_sql(&table_scan_with_projection_alias)?;
assert_eq!(
- format!("{}", table_scan_with_projection_alias),
+ table_scan_with_projection_alias.to_string(),
"SELECT ta.age FROM t1 AS ta"
);
@@ -717,7 +793,7 @@ fn test_table_scan_pushdown() -> Result<()> {
let table_scan_with_no_projection_alias =
plan_to_sql(&table_scan_with_no_projection_alias)?;
assert_eq!(
- format!("{}", table_scan_with_no_projection_alias),
+ table_scan_with_no_projection_alias.to_string(),
"SELECT * FROM t1 AS ta"
);
@@ -729,7 +805,7 @@ fn test_table_scan_pushdown() -> Result<()> {
let query_from_table_scan_with_projection =
plan_to_sql(&query_from_table_scan_with_projection)?;
assert_eq!(
- format!("{}", query_from_table_scan_with_projection),
+ query_from_table_scan_with_projection.to_string(),
"SELECT * FROM (SELECT t1.id, t1.age FROM t1)"
);
@@ -742,7 +818,7 @@ fn test_table_scan_pushdown() -> Result<()> {
.build()?;
let table_scan_with_filter = plan_to_sql(&table_scan_with_filter)?;
assert_eq!(
- format!("{}", table_scan_with_filter),
+ table_scan_with_filter.to_string(),
"SELECT * FROM t1 WHERE (t1.id > t1.age)"
);
@@ -755,7 +831,7 @@ fn test_table_scan_pushdown() -> Result<()> {
.build()?;
let table_scan_with_two_filter = plan_to_sql(&table_scan_with_two_filter)?;
assert_eq!(
- format!("{}", table_scan_with_two_filter),
+ table_scan_with_two_filter.to_string(),
"SELECT * FROM t1 WHERE ((t1.id > 1) AND (t1.age < 2))"
);
@@ -769,7 +845,7 @@ fn test_table_scan_pushdown() -> Result<()> {
.build()?;
let table_scan_with_filter_alias =
plan_to_sql(&table_scan_with_filter_alias)?;
assert_eq!(
- format!("{}", table_scan_with_filter_alias),
+ table_scan_with_filter_alias.to_string(),
"SELECT * FROM t1 AS ta WHERE (ta.id > ta.age)"
);
@@ -783,7 +859,7 @@ fn test_table_scan_pushdown() -> Result<()> {
let table_scan_with_projection_and_filter =
plan_to_sql(&table_scan_with_projection_and_filter)?;
assert_eq!(
- format!("{}", table_scan_with_projection_and_filter),
+ table_scan_with_projection_and_filter.to_string(),
"SELECT t1.id, t1.age FROM t1 WHERE (t1.id > t1.age)"
);
@@ -797,7 +873,7 @@ fn test_table_scan_pushdown() -> Result<()> {
let table_scan_with_projection_and_filter =
plan_to_sql(&table_scan_with_projection_and_filter)?;
assert_eq!(
- format!("{}", table_scan_with_projection_and_filter),
+ table_scan_with_projection_and_filter.to_string(),
"SELECT t1.age FROM t1 WHERE (t1.id > t1.age)"
);
@@ -806,7 +882,7 @@ fn test_table_scan_pushdown() -> Result<()> {
.build()?;
let table_scan_with_inline_fetch =
plan_to_sql(&table_scan_with_inline_fetch)?;
assert_eq!(
- format!("{}", table_scan_with_inline_fetch),
+ table_scan_with_inline_fetch.to_string(),
"SELECT * FROM t1 LIMIT 10"
);
@@ -821,7 +897,7 @@ fn test_table_scan_pushdown() -> Result<()> {
let table_scan_with_projection_and_inline_fetch =
plan_to_sql(&table_scan_with_projection_and_inline_fetch)?;
assert_eq!(
- format!("{}", table_scan_with_projection_and_inline_fetch),
+ table_scan_with_projection_and_inline_fetch.to_string(),
"SELECT t1.id, t1.age FROM t1 LIMIT 10"
);
@@ -835,7 +911,7 @@ fn test_table_scan_pushdown() -> Result<()> {
.build()?;
let table_scan_with_all = plan_to_sql(&table_scan_with_all)?;
assert_eq!(
- format!("{}", table_scan_with_all),
+ table_scan_with_all.to_string(),
"SELECT t1.id, t1.age FROM t1 WHERE (t1.id > t1.age) LIMIT 10"
);
Ok(())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]