This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3438b35530 Move wildcard expansions to the analyzer (#11681)
3438b35530 is described below
commit 3438b355308afa23dba399f4aec5760969d054c5
Author: Jax Liu <[email protected]>
AuthorDate: Wed Aug 14 02:28:02 2024 +0800
Move wildcard expansions to the analyzer (#11681)
* allow qualified wildcard in the logical plan
* move wildcard expansions to the analyzer
* fix fmt
* fix the view tests
* expand wildcard for schema
* fix for union query
* cargo fmt clippy
* move wildcard expanding tests to expand_wildcard_rule.rs
* coerce the expanded wildcard expressions in union
* remove debug message
* move wildcard options to logical plan
* remove unused function
* add the doc for expression function
* fix cargo check
* fix cargo fmt
* fix test
* extract expand_exprlist
* expand wildcard for functional_dependencies
* refine the doc
* fix tests
* fix expand exclude and except
* remove unused import
* fix check and update function
* fix check
* throw the error when converting exprlist to fields
* fix functional_dependency and exclude
* fix projection_schema
* fix the window functions
* fix clippy and support unparsing wildcard
* fix clippy and fmt
* add the doc for util functions
* fix unique expression check for projection
* cargo fmt
* move test and solve dependency issue
* address review comments
* add the fail reason
* enhance the doc
* add more doc
---
datafusion/core/src/datasource/view.rs | 44 ++-
datafusion/core/src/execution/context/mod.rs | 1 -
datafusion/expr/src/expr.rs | 221 ++++++++++++++-
datafusion/expr/src/expr_fn.rs | 45 ++-
datafusion/expr/src/expr_rewriter/mod.rs | 1 +
datafusion/expr/src/expr_schema.rs | 21 +-
datafusion/expr/src/logical_plan/builder.rs | 41 +--
datafusion/expr/src/logical_plan/plan.rs | 55 +++-
datafusion/expr/src/utils.rs | 150 ++++++++--
.../optimizer/src/analyzer/count_wildcard_rule.rs | 8 +-
.../optimizer/src/analyzer/expand_wildcard_rule.rs | 304 +++++++++++++++++++++
.../optimizer/src/analyzer/inline_table_scan.rs | 8 +-
datafusion/optimizer/src/analyzer/mod.rs | 5 +
datafusion/optimizer/src/analyzer/type_coercion.rs | 47 +++-
.../optimizer/tests/optimizer_integration.rs | 21 ++
datafusion/proto/src/logical_plan/from_proto.rs | 7 +-
datafusion/proto/src/logical_plan/to_proto.rs | 2 +-
.../proto/tests/cases/roundtrip_logical_plan.rs | 8 +-
datafusion/sql/src/expr/function.rs | 13 +-
datafusion/sql/src/expr/mod.rs | 12 +-
datafusion/sql/src/select.rs | 120 ++++----
datafusion/sql/src/unparser/expr.rs | 54 +++-
datafusion/sql/tests/sql_integration.rs | 78 +++---
datafusion/sqllogictest/test_files/explain.slt | 1 +
datafusion/sqllogictest/test_files/select.slt | 6 +-
datafusion/sqllogictest/test_files/union.slt | 35 +++
datafusion/sqllogictest/test_files/window.slt | 3 +-
27 files changed, 1057 insertions(+), 254 deletions(-)
diff --git a/datafusion/core/src/datasource/view.rs
b/datafusion/core/src/datasource/view.rs
index 98d118c027..a81942bf76 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -19,17 +19,19 @@
use std::{any::Any, sync::Arc};
-use arrow::datatypes::SchemaRef;
-use async_trait::async_trait;
-use datafusion_catalog::Session;
-use datafusion_common::Column;
-use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
-
use crate::{
error::Result,
logical_expr::{Expr, LogicalPlan},
physical_plan::ExecutionPlan,
};
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+use datafusion_catalog::Session;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::Column;
+use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
+use datafusion_optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule;
+use datafusion_optimizer::Analyzer;
use crate::datasource::{TableProvider, TableType};
@@ -50,6 +52,7 @@ impl ViewTable {
logical_plan: LogicalPlan,
definition: Option<String>,
) -> Result<Self> {
+ let logical_plan = Self::apply_required_rule(logical_plan)?;
let table_schema = logical_plan.schema().as_ref().to_owned().into();
let view = Self {
@@ -61,6 +64,15 @@ impl ViewTable {
Ok(view)
}
+ fn apply_required_rule(logical_plan: LogicalPlan) -> Result<LogicalPlan> {
+ let options = ConfigOptions::default();
+
Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]).execute_and_check(
+ logical_plan,
+ &options,
+ |_, _| {},
+ )
+ }
+
/// Get definition ref
pub fn definition(&self) -> Option<&String> {
self.definition.as_ref()
@@ -232,6 +244,26 @@ mod tests {
assert_batches_eq!(expected, &results);
+ let view_sql =
+ "CREATE VIEW replace_xyz AS SELECT * REPLACE (column1*2 as
column1) FROM xyz";
+ session_ctx.sql(view_sql).await?.collect().await?;
+
+ let results = session_ctx
+ .sql("SELECT * FROM replace_xyz")
+ .await?
+ .collect()
+ .await?;
+
+ let expected = [
+ "+---------+---------+---------+",
+ "| column1 | column2 | column3 |",
+ "+---------+---------+---------+",
+ "| 2 | 2 | 3 |",
+ "| 8 | 5 | 6 |",
+ "+---------+---------+---------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
Ok(())
}
diff --git a/datafusion/core/src/execution/context/mod.rs
b/datafusion/core/src/execution/context/mod.rs
index c63ffddd81..972a6f6437 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -718,7 +718,6 @@ impl SessionContext {
}
(_, Err(_)) => {
let table = Arc::new(ViewTable::try_new((*input).clone(),
definition)?);
-
self.register_table(name, table)?;
self.return_empty_dataframe()
}
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 5030a95d3c..b4d489cc7c 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -41,7 +41,10 @@ use datafusion_common::{
internal_err, not_impl_err, plan_err, Column, DFSchema, Result,
ScalarValue,
TableReference,
};
-use sqlparser::ast::NullTreatment;
+use sqlparser::ast::{
+ display_comma_separated, ExceptSelectItem, ExcludeSelectItem,
IlikeSelectItem,
+ NullTreatment, RenameSelectItem, ReplaceSelectElement,
+};
/// Represents logical expressions such as `A + 1`, or `CAST(c1 AS int)`.
///
@@ -315,7 +318,10 @@ pub enum Expr {
///
/// This expr has to be resolved to a list of columns before translating
logical
/// plan into physical plan.
- Wildcard { qualifier: Option<TableReference> },
+ Wildcard {
+ qualifier: Option<TableReference>,
+ options: WildcardOptions,
+ },
/// List of grouping set expressions. Only valid in the context of an
aggregate
/// GROUP BY expression list
GroupingSet(GroupingSet),
@@ -970,6 +976,89 @@ impl GroupingSet {
}
}
+/// Additional options for wildcards, e.g. Snowflake `EXCLUDE`/`RENAME` and
Bigquery `EXCEPT`.
+#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
+pub struct WildcardOptions {
+ /// `[ILIKE...]`.
+ /// Snowflake syntax:
<https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
+ pub ilike: Option<IlikeSelectItem>,
+ /// `[EXCLUDE...]`.
+ /// Snowflake syntax:
<https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
+ pub exclude: Option<ExcludeSelectItem>,
+ /// `[EXCEPT...]`.
+ /// BigQuery syntax:
<https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#select_except>
+ /// Clickhouse syntax:
<https://clickhouse.com/docs/en/sql-reference/statements/select#except>
+ pub except: Option<ExceptSelectItem>,
+ /// `[REPLACE]`
+ /// BigQuery syntax:
<https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#select_replace>
+ /// Clickhouse syntax:
<https://clickhouse.com/docs/en/sql-reference/statements/select#replace>
+ /// Snowflake syntax:
<https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
+ pub replace: Option<PlannedReplaceSelectItem>,
+ /// `[RENAME ...]`.
+ /// Snowflake syntax:
<https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
+ pub rename: Option<RenameSelectItem>,
+}
+
+impl WildcardOptions {
+ pub fn with_replace(self, replace: PlannedReplaceSelectItem) -> Self {
+ WildcardOptions {
+ ilike: self.ilike,
+ exclude: self.exclude,
+ except: self.except,
+ replace: Some(replace),
+ rename: self.rename,
+ }
+ }
+}
+
+impl Display for WildcardOptions {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ if let Some(ilike) = &self.ilike {
+ write!(f, " {ilike}")?;
+ }
+ if let Some(exclude) = &self.exclude {
+ write!(f, " {exclude}")?;
+ }
+ if let Some(except) = &self.except {
+ write!(f, " {except}")?;
+ }
+ if let Some(replace) = &self.replace {
+ write!(f, " {replace}")?;
+ }
+ if let Some(rename) = &self.rename {
+ write!(f, " {rename}")?;
+ }
+ Ok(())
+ }
+}
+
+/// The planned expressions for `REPLACE`
+#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
+pub struct PlannedReplaceSelectItem {
+ /// The original ast nodes
+ pub items: Vec<ReplaceSelectElement>,
+ /// The expression planned from the ast nodes. They will be used when
expanding the wildcard.
+ pub planned_expressions: Vec<Expr>,
+}
+
+impl Display for PlannedReplaceSelectItem {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ write!(f, "REPLACE")?;
+ write!(f, " ({})", display_comma_separated(&self.items))?;
+ Ok(())
+ }
+}
+
+impl PlannedReplaceSelectItem {
+ pub fn items(&self) -> &[ReplaceSelectElement] {
+ &self.items
+ }
+
+ pub fn expressions(&self) -> &[Expr] {
+ &self.planned_expressions
+ }
+}
+
/// Fixed seed for the hashing so that Ords are consistent across runs
const SEED: ahash::RandomState = ahash::RandomState::with_seeds(0, 0, 0, 0);
@@ -1720,8 +1809,9 @@ impl Expr {
Expr::ScalarSubquery(subquery) => {
subquery.hash(hasher);
}
- Expr::Wildcard { qualifier } => {
+ Expr::Wildcard { qualifier, options } => {
qualifier.hash(hasher);
+ options.hash(hasher);
}
Expr::GroupingSet(grouping_set) => {
mem::discriminant(grouping_set).hash(hasher);
@@ -2242,9 +2332,9 @@ impl fmt::Display for Expr {
write!(f, "{expr} IN ([{}])", expr_vec_fmt!(list))
}
}
- Expr::Wildcard { qualifier } => match qualifier {
- Some(qualifier) => write!(f, "{qualifier}.*"),
- None => write!(f, "*"),
+ Expr::Wildcard { qualifier, options } => match qualifier {
+ Some(qualifier) => write!(f, "{qualifier}.*{options}"),
+ None => write!(f, "*{options}"),
},
Expr::GroupingSet(grouping_sets) => match grouping_sets {
GroupingSet::Rollup(exprs) => {
@@ -2543,9 +2633,10 @@ fn create_physical_name(e: &Expr, is_first_expr: bool)
-> Result<String> {
Expr::Sort { .. } => {
internal_err!("Create physical name does not support sort
expression")
}
- Expr::Wildcard { .. } => {
- internal_err!("Create physical name does not support wildcard")
- }
+ Expr::Wildcard { qualifier, options } => match qualifier {
+ Some(qualifier) => Ok(format!("{}.*{}", qualifier, options)),
+ None => Ok(format!("*{}", options)),
+ },
Expr::Placeholder(_) => {
internal_err!("Create physical name does not support placeholder")
}
@@ -2558,7 +2649,12 @@ fn create_physical_name(e: &Expr, is_first_expr: bool)
-> Result<String> {
#[cfg(test)]
mod test {
use crate::expr_fn::col;
- use crate::{case, lit, ColumnarValue, ScalarUDF, ScalarUDFImpl,
Volatility};
+ use crate::{
+ case, lit, qualified_wildcard, wildcard, wildcard_with_options,
ColumnarValue,
+ ScalarUDF, ScalarUDFImpl, Volatility,
+ };
+ use sqlparser::ast;
+ use sqlparser::ast::{Ident, IdentWithAlias};
use std::any::Any;
#[test]
@@ -2859,4 +2955,109 @@ mod test {
);
assert_eq!(find_df_window_func("not_exist"), None)
}
+
+ #[test]
+ fn test_display_wildcard() {
+ assert_eq!(format!("{}", wildcard()), "*");
+ assert_eq!(format!("{}", qualified_wildcard("t1")), "t1.*");
+ assert_eq!(
+ format!(
+ "{}",
+ wildcard_with_options(wildcard_options(
+ Some(IlikeSelectItem {
+ pattern: "c1".to_string()
+ }),
+ None,
+ None,
+ None,
+ None
+ ))
+ ),
+ "* ILIKE 'c1'"
+ );
+ assert_eq!(
+ format!(
+ "{}",
+ wildcard_with_options(wildcard_options(
+ None,
+ Some(ExcludeSelectItem::Multiple(vec![
+ Ident::from("c1"),
+ Ident::from("c2")
+ ])),
+ None,
+ None,
+ None
+ ))
+ ),
+ "* EXCLUDE (c1, c2)"
+ );
+ assert_eq!(
+ format!(
+ "{}",
+ wildcard_with_options(wildcard_options(
+ None,
+ None,
+ Some(ExceptSelectItem {
+ first_element: Ident::from("c1"),
+ additional_elements: vec![Ident::from("c2")]
+ }),
+ None,
+ None
+ ))
+ ),
+ "* EXCEPT (c1, c2)"
+ );
+ assert_eq!(
+ format!(
+ "{}",
+ wildcard_with_options(wildcard_options(
+ None,
+ None,
+ None,
+ Some(PlannedReplaceSelectItem {
+ items: vec![ReplaceSelectElement {
+ expr: ast::Expr::Identifier(Ident::from("c1")),
+ column_name: Ident::from("a1"),
+ as_keyword: false
+ }],
+ planned_expressions: vec![]
+ }),
+ None
+ ))
+ ),
+ "* REPLACE (c1 a1)"
+ );
+ assert_eq!(
+ format!(
+ "{}",
+ wildcard_with_options(wildcard_options(
+ None,
+ None,
+ None,
+ None,
+ Some(RenameSelectItem::Multiple(vec![IdentWithAlias {
+ ident: Ident::from("c1"),
+ alias: Ident::from("a1")
+ }]))
+ ))
+ ),
+ "* RENAME (c1 AS a1)"
+ )
+ }
+
+ fn wildcard_options(
+ opt_ilike: Option<IlikeSelectItem>,
+ opt_exclude: Option<ExcludeSelectItem>,
+ opt_except: Option<ExceptSelectItem>,
+ opt_replace: Option<PlannedReplaceSelectItem>,
+ opt_rename: Option<RenameSelectItem>,
+ ) -> WildcardOptions {
+ WildcardOptions {
+ ilike: opt_ilike,
+ exclude: opt_exclude,
+ except: opt_except,
+ replace: opt_replace,
+ rename: opt_rename,
+ }
+ }
}
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index e9c5485656..4e60223996 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -19,7 +19,7 @@
use crate::expr::{
AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList,
InSubquery,
- Placeholder, TryCast, Unnest, WindowFunction,
+ Placeholder, TryCast, Unnest, WildcardOptions, WindowFunction,
};
use crate::function::{
AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory,
@@ -37,7 +37,7 @@ use arrow::compute::kernels::cast_utils::{
parse_interval_day_time, parse_interval_month_day_nano,
parse_interval_year_month,
};
use arrow::datatypes::{DataType, Field};
-use datafusion_common::{plan_err, Column, Result, ScalarValue};
+use datafusion_common::{plan_err, Column, Result, ScalarValue, TableReference};
use sqlparser::ast::NullTreatment;
use std::any::Any;
use std::fmt::Debug;
@@ -119,7 +119,46 @@ pub fn placeholder(id: impl Into<String>) -> Expr {
/// assert_eq!(p.to_string(), "*")
/// ```
pub fn wildcard() -> Expr {
- Expr::Wildcard { qualifier: None }
+ Expr::Wildcard {
+ qualifier: None,
+ options: WildcardOptions::default(),
+ }
+}
+
+/// Create an '*' [`Expr::Wildcard`] expression with the wildcard options
+pub fn wildcard_with_options(options: WildcardOptions) -> Expr {
+ Expr::Wildcard {
+ qualifier: None,
+ options,
+ }
+}
+
+/// Create an 't.*' [`Expr::Wildcard`] expression that matches all columns
from a specific table
+///
+/// # Example
+///
+/// ```rust
+/// # use datafusion_common::TableReference;
+/// # use datafusion_expr::{qualified_wildcard};
+/// let p = qualified_wildcard(TableReference::bare("t"));
+/// assert_eq!(p.to_string(), "t.*")
+/// ```
+pub fn qualified_wildcard(qualifier: impl Into<TableReference>) -> Expr {
+ Expr::Wildcard {
+ qualifier: Some(qualifier.into()),
+ options: WildcardOptions::default(),
+ }
+}
+
+/// Create an 't.*' [`Expr::Wildcard`] expression with the wildcard options
+pub fn qualified_wildcard_with_options(
+ qualifier: impl Into<TableReference>,
+ options: WildcardOptions,
+) -> Expr {
+ Expr::Wildcard {
+ qualifier: Some(qualifier.into()),
+ options,
+ }
}
/// Return a new expression `left <op> right`
diff --git a/datafusion/expr/src/expr_rewriter/mod.rs
b/datafusion/expr/src/expr_rewriter/mod.rs
index 0dc41d4a9a..32e621350e 100644
--- a/datafusion/expr/src/expr_rewriter/mod.rs
+++ b/datafusion/expr/src/expr_rewriter/mod.rs
@@ -248,6 +248,7 @@ fn coerce_exprs_for_schema(
Expr::Alias(Alias { expr, name, .. }) => {
Ok(expr.cast_to(new_type, src_schema)?.alias(name))
}
+ Expr::Wildcard { .. } => Ok(expr),
_ => expr.cast_to(new_type, src_schema),
}
} else {
diff --git a/datafusion/expr/src/expr_schema.rs
b/datafusion/expr/src/expr_schema.rs
index 73123819ba..af35b9a991 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -28,8 +28,8 @@ use crate::{utils, LogicalPlan, Projection, Subquery,
WindowFunctionDefinition};
use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field};
use datafusion_common::{
- internal_err, not_impl_err, plan_datafusion_err, plan_err, Column,
ExprSchema,
- Result, TableReference,
+ not_impl_err, plan_datafusion_err, plan_err, Column, ExprSchema, Result,
+ TableReference,
};
use std::collections::HashMap;
use std::sync::Arc;
@@ -244,13 +244,7 @@ impl ExprSchemable for Expr {
)
})
}
- Expr::Wildcard { qualifier } => {
- // Wildcard do not really have a type and do not appear in
projections
- match qualifier {
- Some(_) => internal_err!("QualifiedWildcard expressions
are not valid in a logical query plan"),
- None => Ok(DataType::Null)
- }
- }
+ Expr::Wildcard { .. } => Ok(DataType::Null),
Expr::GroupingSet(_) => {
// grouping sets do not really have a type and do not appear
in projections
Ok(DataType::Null)
@@ -362,12 +356,7 @@ impl ExprSchemable for Expr {
| Expr::SimilarTo(Like { expr, pattern, .. }) => {
Ok(expr.nullable(input_schema)? ||
pattern.nullable(input_schema)?)
}
- Expr::Wildcard { qualifier } => match qualifier {
- Some(_) => internal_err!(
- "QualifiedWildcard expressions are not valid in a logical
query plan"
- ),
- None => Ok(false),
- },
+ Expr::Wildcard { .. } => Ok(false),
Expr::GroupingSet(_) => {
// grouping sets do not really have the concept of nullable
and do not appear
// in projections
@@ -548,7 +537,7 @@ mod tests {
use super::*;
use crate::{col, lit};
- use datafusion_common::{DFSchema, ScalarValue};
+ use datafusion_common::{internal_err, DFSchema, ScalarValue};
macro_rules! test_is_expr_nullable {
($EXPR_TYPE:ident) => {{
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index 4ef346656f..e95fcdd128 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -38,9 +38,8 @@ use crate::logical_plan::{
};
use crate::type_coercion::binary::{comparison_coercion, values_coercion};
use crate::utils::{
- can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
- expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
- group_window_expr_by_sort_keys,
+ can_hash, columnize_expr, compare_sort_expr, expr_to_columns,
+ find_valid_equijoin_key_pair, group_window_expr_by_sort_keys,
};
use crate::{
and, binary_expr, logical_plan::tree_node::unwrap_arc, DmlStatement, Expr,
@@ -1316,7 +1315,7 @@ fn add_group_by_exprs_from_dependencies(
Ok(group_expr)
}
/// Errors if one or more expressions have equal names.
-pub(crate) fn validate_unique_names<'a>(
+pub fn validate_unique_names<'a>(
node_name: &str,
expressions: impl IntoIterator<Item = &'a Expr>,
) -> Result<()> {
@@ -1356,6 +1355,7 @@ pub fn project_with_column_index(
ref name,
}) if name != schema.field(i).name() =>
e.alias(schema.field(i).name()),
Expr::Alias { .. } | Expr::Column { .. } => e,
+ Expr::Wildcard { .. } => e,
_ => e.alias(schema.field(i).name()),
})
.collect::<Vec<_>>();
@@ -1440,22 +1440,11 @@ pub fn project(
plan: LogicalPlan,
expr: impl IntoIterator<Item = impl Into<Expr>>,
) -> Result<LogicalPlan> {
- // TODO: move it into analyzer
- let input_schema = plan.schema();
let mut projected_expr = vec![];
for e in expr {
let e = e.into();
match e {
- Expr::Wildcard { qualifier: None } => {
- projected_expr.extend(expand_wildcard(input_schema, &plan,
None)?)
- }
- Expr::Wildcard {
- qualifier: Some(qualifier),
- } => projected_expr.extend(expand_qualified_wildcard(
- &qualifier,
- input_schema,
- None,
- )?),
+ Expr::Wildcard { .. } => projected_expr.push(e),
_ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?,
&plan)?),
}
}
@@ -1807,26 +1796,6 @@ mod tests {
Ok(())
}
- #[test]
- fn plan_using_join_wildcard_projection() -> Result<()> {
- let t2 = table_scan(Some("t2"), &employee_schema(), None)?.build()?;
-
- let plan = table_scan(Some("t1"), &employee_schema(), None)?
- .join_using(t2, JoinType::Inner, vec!["id"])?
- .project(vec![Expr::Wildcard { qualifier: None }])?
- .build()?;
-
- // id column should only show up once in projection
- let expected = "Projection: t1.id, t1.first_name, t1.last_name,
t1.state, t1.salary, t2.first_name, t2.last_name, t2.state, t2.salary\
- \n Inner Join: Using t1.id = t2.id\
- \n TableScan: t1\
- \n TableScan: t2";
-
- assert_eq!(expected, format!("{plan}"));
-
- Ok(())
- }
-
#[test]
fn plan_builder_union() -> Result<()> {
let plan =
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index c5538d8880..2bab6d516a 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -31,8 +31,9 @@ use crate::logical_plan::display::{GraphvizVisitor,
IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
use crate::logical_plan::{DmlStatement, Statement};
use crate::utils::{
- enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
- grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
+ enumerate_grouping_sets, exprlist_len, exprlist_to_fields, find_base_plan,
+ find_out_reference_exprs, grouping_set_expr_count,
grouping_set_to_exprlist,
+ split_conjunction,
};
use crate::{
build_join_schema, expr_vec_fmt, BinaryExpr, BuiltInWindowFunction,
@@ -1977,7 +1978,9 @@ impl Projection {
input: Arc<LogicalPlan>,
schema: DFSchemaRef,
) -> Result<Self> {
- if expr.len() != schema.fields().len() {
+ if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
+ && expr.len() != schema.fields().len()
+ {
return plan_err!("Projection has mismatch between number of
expressions ({}) and number of fields in schema ({})", expr.len(),
schema.fields().len());
}
Ok(Self {
@@ -2763,20 +2766,48 @@ fn calc_func_dependencies_for_project(
// Calculate expression indices (if present) in the input schema.
let proj_indices = exprs
.iter()
- .filter_map(|expr| {
- let expr_name = match expr {
- Expr::Alias(alias) => {
- format!("{}", alias.expr)
- }
- _ => format!("{}", expr),
- };
- input_fields.iter().position(|item| *item == expr_name)
+ .map(|expr| match expr {
+ Expr::Wildcard { qualifier, options } => {
+ let wildcard_fields = exprlist_to_fields(
+ vec![&Expr::Wildcard {
+ qualifier: qualifier.clone(),
+ options: options.clone(),
+ }],
+ input,
+ )?;
+ Ok::<_, DataFusionError>(
+ wildcard_fields
+ .into_iter()
+ .filter_map(|(qualifier, f)| {
+ let flat_name = qualifier
+ .map(|t| format!("{}.{}", t, f.name()))
+ .unwrap_or(f.name().clone());
+ input_fields.iter().position(|item| *item ==
flat_name)
+ })
+ .collect::<Vec<_>>(),
+ )
+ }
+ Expr::Alias(alias) => Ok(input_fields
+ .iter()
+ .position(|item| *item == format!("{}", alias.expr))
+ .map(|i| vec![i])
+ .unwrap_or(vec![])),
+ _ => Ok(input_fields
+ .iter()
+ .position(|item| *item == format!("{}", expr))
+ .map(|i| vec![i])
+ .unwrap_or(vec![])),
})
+ .collect::<Result<Vec<_>>>()?
+ .into_iter()
+ .flatten()
.collect::<Vec<_>>();
+
+ let len = exprlist_len(exprs, input.schema(),
Some(find_base_plan(input).schema()))?;
Ok(input
.schema()
.functional_dependencies()
- .project_functional_dependencies(&proj_indices, exprs.len()))
+ .project_functional_dependencies(&proj_indices, len))
}
/// Sorts its input according to a list of sort expressions.
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 7b650d1ab4..4db5061e8f 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -21,7 +21,7 @@ use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
-use crate::expr::{Alias, Sort, WindowFunction};
+use crate::expr::{Alias, Sort, WildcardOptions, WindowFunction};
use crate::expr_rewriter::strip_outer_reference;
use crate::{
and, BinaryExpr, Expr, ExprSchemable, Filter, GroupingSet, LogicalPlan,
Operator,
@@ -34,11 +34,11 @@ use datafusion_common::tree_node::{
};
use datafusion_common::utils::get_at_indices;
use datafusion_common::{
- internal_err, plan_datafusion_err, plan_err, Column, DFSchema,
DFSchemaRef, Result,
- TableReference,
+ internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef,
+ DataFusionError, Result, TableReference,
};
-use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem,
WildcardAdditionalOptions};
+use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem};
pub use
datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
@@ -377,7 +377,7 @@ fn get_exprs_except_skipped(
pub fn expand_wildcard(
schema: &DFSchema,
plan: &LogicalPlan,
- wildcard_options: Option<&WildcardAdditionalOptions>,
+ wildcard_options: Option<&WildcardOptions>,
) -> Result<Vec<Expr>> {
let using_columns = plan.using_columns()?;
let mut columns_to_skip = using_columns
@@ -401,9 +401,9 @@ pub fn expand_wildcard(
.collect::<Vec<_>>()
})
.collect::<HashSet<_>>();
- let excluded_columns = if let Some(WildcardAdditionalOptions {
- opt_exclude,
- opt_except,
+ let excluded_columns = if let Some(WildcardOptions {
+ exclude: opt_exclude,
+ except: opt_except,
..
}) = wildcard_options
{
@@ -420,7 +420,7 @@ pub fn expand_wildcard(
pub fn expand_qualified_wildcard(
qualifier: &TableReference,
schema: &DFSchema,
- wildcard_options: Option<&WildcardAdditionalOptions>,
+ wildcard_options: Option<&WildcardOptions>,
) -> Result<Vec<Expr>> {
let qualified_indices = schema.fields_indices_with_qualified(qualifier);
let projected_func_dependencies = schema
@@ -435,9 +435,9 @@ pub fn expand_qualified_wildcard(
let qualified_dfschema =
DFSchema::try_from_qualified_schema(qualifier.clone(),
&qualified_schema)?
.with_functional_dependencies(projected_func_dependencies)?;
- let excluded_columns = if let Some(WildcardAdditionalOptions {
- opt_exclude,
- opt_except,
+ let excluded_columns = if let Some(WildcardOptions {
+ exclude: opt_exclude,
+ except: opt_except,
..
}) = wildcard_options
{
@@ -731,11 +731,129 @@ pub fn exprlist_to_fields<'a>(
plan: &LogicalPlan,
) -> Result<Vec<(Option<TableReference>, Arc<Field>)>> {
// look for exact match in plan's output schema
- let input_schema = &plan.schema();
- exprs
+ let wildcard_schema = find_base_plan(plan).schema();
+ let input_schema = plan.schema();
+ let result = exprs
.into_iter()
- .map(|e| e.to_field(input_schema))
- .collect()
+ .map(|e| match e {
+ Expr::Wildcard { qualifier, options } => match qualifier {
+ None => {
+ let excluded: Vec<String> = get_excluded_columns(
+ options.exclude.as_ref(),
+ options.except.as_ref(),
+ wildcard_schema,
+ None,
+ )?
+ .into_iter()
+ .map(|c| c.flat_name())
+ .collect();
+ Ok::<_, DataFusionError>(
+ wildcard_schema
+ .field_names()
+ .iter()
+ .enumerate()
+ .filter(|(_, s)| !excluded.contains(s))
+ .map(|(i, _)| wildcard_schema.qualified_field(i))
+ .map(|(qualifier, f)| {
+ (qualifier.cloned(), Arc::new(f.to_owned()))
+ })
+ .collect::<Vec<_>>(),
+ )
+ }
+ Some(qualifier) => {
+ let excluded: Vec<String> = get_excluded_columns(
+ options.exclude.as_ref(),
+ options.except.as_ref(),
+ wildcard_schema,
+ Some(qualifier),
+ )?
+ .into_iter()
+ .map(|c| c.flat_name())
+ .collect();
+ Ok(wildcard_schema
+ .fields_with_qualified(qualifier)
+ .into_iter()
+ .filter_map(|field| {
+ let flat_name = format!("{}.{}", qualifier,
field.name());
+ if excluded.contains(&flat_name) {
+ None
+ } else {
+ Some((
+ Some(qualifier.clone()),
+ Arc::new(field.to_owned()),
+ ))
+ }
+ })
+ .collect::<Vec<_>>())
+ }
+ },
+ _ => Ok(vec![e.to_field(input_schema)?]),
+ })
+ .collect::<Result<Vec<_>>>()?
+ .into_iter()
+ .flatten()
+ .collect();
+ Ok(result)
+}
+
+/// Find the suitable base plan to expand the wildcard expression recursively.
+/// When planning [LogicalPlan::Window] and [LogicalPlan::Aggregate], we will
generate
+/// an intermediate plan based on the relation plan (e.g.
[LogicalPlan::TableScan], [LogicalPlan::Subquery], ...).
+/// If we expand a wildcard expression basing the intermediate plan, we could
get some duplicate fields.
+pub fn find_base_plan(input: &LogicalPlan) -> &LogicalPlan {
+ match input {
+ LogicalPlan::Window(window) => find_base_plan(&window.input),
+ LogicalPlan::Aggregate(agg) => find_base_plan(&agg.input),
+ _ => input,
+ }
+}
+
+/// Count the number of real fields. We should expand the wildcard expression
to get the actual number.
+pub fn exprlist_len(
+ exprs: &[Expr],
+ schema: &DFSchemaRef,
+ wildcard_schema: Option<&DFSchemaRef>,
+) -> Result<usize> {
+ exprs
+ .iter()
+ .map(|e| match e {
+ Expr::Wildcard {
+ qualifier: None,
+ options,
+ } => {
+ let excluded = get_excluded_columns(
+ options.exclude.as_ref(),
+ options.except.as_ref(),
+ wildcard_schema.unwrap_or(schema),
+ None,
+ )?
+ .into_iter()
+ .collect::<HashSet<Column>>();
+ Ok(
+
get_exprs_except_skipped(wildcard_schema.unwrap_or(schema), excluded)
+ .len(),
+ )
+ }
+ Expr::Wildcard {
+ qualifier: Some(qualifier),
+ options,
+ } => {
+ let excluded = get_excluded_columns(
+ options.exclude.as_ref(),
+ options.except.as_ref(),
+ wildcard_schema.unwrap_or(schema),
+ Some(qualifier),
+ )?
+ .into_iter()
+ .collect::<HashSet<Column>>();
+ Ok(
+
get_exprs_except_skipped(wildcard_schema.unwrap_or(schema), excluded)
+ .len(),
+ )
+ }
+ _ => Ok(1),
+ })
+ .sum()
}
/// Convert an expression into Column expression if it's already provided as
input plan.
diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
index 8ff00917dc..593dab2bc9 100644
--- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
+++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
@@ -48,7 +48,13 @@ impl AnalyzerRule for CountWildcardRule {
}
fn is_wildcard(expr: &Expr) -> bool {
- matches!(expr, Expr::Wildcard { qualifier: None })
+ matches!(
+ expr,
+ Expr::Wildcard {
+ qualifier: None,
+ ..
+ }
+ )
}
fn is_count_star_aggregate(aggregate_function: &AggregateFunction) -> bool {
diff --git a/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs
b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs
new file mode 100644
index 0000000000..53ba3042f5
--- /dev/null
+++ b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::AnalyzerRule;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult};
+use datafusion_common::{Column, Result};
+use datafusion_expr::builder::validate_unique_names;
+use datafusion_expr::expr::PlannedReplaceSelectItem;
+use datafusion_expr::utils::{
+ expand_qualified_wildcard, expand_wildcard, find_base_plan,
+};
+use datafusion_expr::{Expr, LogicalPlan, Projection, SubqueryAlias};
+
+#[derive(Default)]
+pub struct ExpandWildcardRule {}
+
+impl ExpandWildcardRule {
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl AnalyzerRule for ExpandWildcardRule {
+ fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) ->
Result<LogicalPlan> {
+ // Because the wildcard expansion is based on the schema of the input
plan,
+ // using `transform_up_with_subqueries` here.
+ plan.transform_up_with_subqueries(expand_internal).data()
+ }
+
+ fn name(&self) -> &str {
+ "expand_wildcard_rule"
+ }
+}
+
+fn expand_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
+ match plan {
+ LogicalPlan::Projection(Projection { expr, input, .. }) => {
+ let projected_expr = expand_exprlist(&input, expr)?;
+ validate_unique_names("Projections", projected_expr.iter())?;
+ Ok(Transformed::yes(
+ Projection::try_new(projected_expr, Arc::clone(&input))
+ .map(LogicalPlan::Projection)?,
+ ))
+ }
+        // The schema of the plan should also be updated if the child plan is
transformed.
+ LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
+ Ok(Transformed::yes(
+ SubqueryAlias::try_new(input,
alias).map(LogicalPlan::SubqueryAlias)?,
+ ))
+ }
+ _ => Ok(Transformed::no(plan)),
+ }
+}
+
+fn expand_exprlist(input: &LogicalPlan, expr: Vec<Expr>) -> Result<Vec<Expr>> {
+ let mut projected_expr = vec![];
+ let input = find_base_plan(input);
+ for e in expr {
+ match e {
+ Expr::Wildcard { qualifier, options } => {
+ if let Some(qualifier) = qualifier {
+ let expanded = expand_qualified_wildcard(
+ &qualifier,
+ input.schema(),
+ Some(&options),
+ )?;
+ // If there is a REPLACE statement, replace that column
with the given
+ // replace expression. Column name remains the same.
+ let replaced = if let Some(replace) = options.replace {
+ replace_columns(expanded, replace)?
+ } else {
+ expanded
+ };
+ projected_expr.extend(replaced);
+ } else {
+ let expanded =
+ expand_wildcard(input.schema(), input,
Some(&options))?;
+ // If there is a REPLACE statement, replace that column
with the given
+ // replace expression. Column name remains the same.
+ let replaced = if let Some(replace) = options.replace {
+ replace_columns(expanded, replace)?
+ } else {
+ expanded
+ };
+ projected_expr.extend(replaced);
+ }
+ }
+ // A workaround to handle the case when the column name is "*".
+            // We transform the expression to an Expr::Column through
[Column::from_name] in many places.
+ // It would also convert the wildcard expression to a column
expression with name "*".
+ Expr::Column(Column {
+ ref relation,
+ ref name,
+ }) => {
+ if name.eq("*") {
+ if let Some(qualifier) = relation {
+ projected_expr.extend(expand_qualified_wildcard(
+ qualifier,
+ input.schema(),
+ None,
+ )?);
+ } else {
+ projected_expr.extend(expand_wildcard(
+ input.schema(),
+ input,
+ None,
+ )?);
+ }
+ } else {
+ projected_expr.push(e.clone());
+ }
+ }
+ _ => projected_expr.push(e),
+ }
+ }
+ Ok(projected_expr)
+}
+
+/// If there is a REPLACE statement in the projected expression in the form of
+/// "REPLACE (some_column_within_an_expr AS some_column)", this function
replaces
+/// that column with the given replace expression. Column name remains the
same.
+/// Multiple REPLACEs are also possible with comma separations.
+fn replace_columns(
+ mut exprs: Vec<Expr>,
+ replace: PlannedReplaceSelectItem,
+) -> Result<Vec<Expr>> {
+ for expr in exprs.iter_mut() {
+ if let Expr::Column(Column { name, .. }) = expr {
+ if let Some((_, new_expr)) = replace
+ .items()
+ .iter()
+ .zip(replace.expressions().iter())
+ .find(|(item, _)| item.column_name.value == *name)
+ {
+ *expr = new_expr.clone().alias(name.clone())
+ }
+ }
+ }
+ Ok(exprs)
+}
+
+#[cfg(test)]
+mod tests {
+ use arrow::datatypes::{DataType, Field, Schema};
+
+ use datafusion_common::{JoinType, TableReference};
+ use datafusion_expr::{
+ col, in_subquery, qualified_wildcard, table_scan, wildcard,
LogicalPlanBuilder,
+ };
+
+ use crate::test::{assert_analyzed_plan_eq_display_indent, test_table_scan};
+ use crate::Analyzer;
+
+ use super::*;
+
+ fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> {
+ assert_analyzed_plan_eq_display_indent(
+ Arc::new(ExpandWildcardRule::new()),
+ plan,
+ expected,
+ )
+ }
+
+ #[test]
+ fn test_expand_wildcard() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![wildcard()])?
+ .build()?;
+ let expected =
+ "Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+ assert_plan_eq(plan, expected)
+ }
+
+ #[test]
+ fn test_expand_qualified_wildcard() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![qualified_wildcard(TableReference::bare("test"))])?
+ .build()?;
+ let expected =
+ "Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+ assert_plan_eq(plan, expected)
+ }
+
+ #[test]
+ fn test_expand_qualified_wildcard_in_subquery() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![qualified_wildcard(TableReference::bare("test"))])?
+ .build()?;
+ let plan = LogicalPlanBuilder::from(plan)
+ .project(vec![wildcard()])?
+ .build()?;
+ let expected =
+ "Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+ assert_plan_eq(plan, expected)
+ }
+
+ #[test]
+ fn test_expand_wildcard_in_subquery() -> Result<()> {
+ let projection_a = LogicalPlanBuilder::from(test_table_scan()?)
+ .project(vec![col("a")])?
+ .build()?;
+ let subquery = LogicalPlanBuilder::from(projection_a)
+ .project(vec![wildcard()])?
+ .build()?;
+ let plan = LogicalPlanBuilder::from(test_table_scan()?)
+ .filter(in_subquery(col("a"), Arc::new(subquery)))?
+ .project(vec![wildcard()])?
+ .build()?;
+ let expected = "\
+ Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n Filter: test.a IN (<subquery>) [a:UInt32, b:UInt32, c:UInt32]\
+ \n Subquery: [a:UInt32]\
+ \n Projection: test.a [a:UInt32]\
+ \n Projection: test.a [a:UInt32]\
+ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+ assert_plan_eq(plan, expected)
+ }
+
+ #[test]
+ fn test_subquery_schema() -> Result<()> {
+ let analyzer =
Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]);
+ let options = ConfigOptions::default();
+ let subquery = LogicalPlanBuilder::from(test_table_scan()?)
+ .project(vec![wildcard()])?
+ .build()?;
+ let plan = LogicalPlanBuilder::from(subquery)
+ .alias("sub")?
+ .project(vec![wildcard()])?
+ .build()?;
+ let analyzed_plan = analyzer.execute_and_check(plan, &options, |_, _|
{})?;
+ for x in analyzed_plan.inputs() {
+ for field in x.schema().fields() {
+ assert_ne!(field.name(), "*");
+ }
+ }
+ Ok(())
+ }
+
+ fn employee_schema() -> Schema {
+ Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("first_name", DataType::Utf8, false),
+ Field::new("last_name", DataType::Utf8, false),
+ Field::new("state", DataType::Utf8, false),
+ Field::new("salary", DataType::Int32, false),
+ ])
+ }
+
+ #[test]
+ fn plan_using_join_wildcard_projection() -> Result<()> {
+ let t2 = table_scan(Some("t2"), &employee_schema(), None)?.build()?;
+
+ let plan = table_scan(Some("t1"), &employee_schema(), None)?
+ .join_using(t2, JoinType::Inner, vec!["id"])?
+ .project(vec![wildcard()])?
+ .build()?;
+
+ let expected = "Projection: *\
+ \n Inner Join: Using t1.id = t2.id\
+ \n TableScan: t1\
+ \n TableScan: t2";
+
+ assert_eq!(expected, format!("{plan}"));
+
+ let analyzer =
Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]);
+ let options = ConfigOptions::default();
+
+ let analyzed_plan = analyzer.execute_and_check(plan, &options, |_, _|
{})?;
+
+ // id column should only show up once in projection
+ let expected = "Projection: t1.id, t1.first_name, t1.last_name,
t1.state, t1.salary, t2.first_name, t2.last_name, t2.state, t2.salary\
+ \n Inner Join: Using t1.id = t2.id\
+ \n TableScan: t1\
+ \n TableScan: t2";
+ assert_eq!(expected, format!("{analyzed_plan}"));
+
+ Ok(())
+ }
+}
diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs
b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
index 73ab37cb11..b69b8410da 100644
--- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs
+++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
@@ -23,6 +23,7 @@ use crate::analyzer::AnalyzerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Column, Result};
+use datafusion_expr::expr::WildcardOptions;
use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder,
TableScan};
/// Analyzed rule that inlines TableScan that provide a [`LogicalPlan`]
@@ -93,7 +94,10 @@ fn generate_projection_expr(
)));
}
} else {
- exprs.push(Expr::Wildcard { qualifier: None });
+ exprs.push(Expr::Wildcard {
+ qualifier: None,
+ options: WildcardOptions::default(),
+ });
}
Ok(exprs)
}
@@ -178,7 +182,7 @@ mod tests {
let plan = scan.filter(col("x.a").eq(lit(1)))?.build()?;
let expected = "Filter: x.a = Int32(1)\
\n SubqueryAlias: x\
- \n Projection: y.a, y.b\
+ \n Projection: *\
\n TableScan: y";
assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan,
expected)
diff --git a/datafusion/optimizer/src/analyzer/mod.rs
b/datafusion/optimizer/src/analyzer/mod.rs
index 91ee8a9e10..6e2afeca88 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -30,6 +30,7 @@ use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::{Expr, LogicalPlan};
use crate::analyzer::count_wildcard_rule::CountWildcardRule;
+use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule;
use crate::analyzer::inline_table_scan::InlineTableScan;
use crate::analyzer::subquery::check_subquery_expr;
use crate::analyzer::type_coercion::TypeCoercion;
@@ -38,6 +39,7 @@ use crate::utils::log_plan;
use self::function_rewrite::ApplyFunctionRewrites;
pub mod count_wildcard_rule;
+pub mod expand_wildcard_rule;
pub mod function_rewrite;
pub mod inline_table_scan;
pub mod subquery;
@@ -89,6 +91,9 @@ impl Analyzer {
pub fn new() -> Self {
let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
Arc::new(InlineTableScan::new()),
+ // Every rule that will generate [Expr::Wildcard] should be placed
in front of [ExpandWildcardRule].
+ Arc::new(ExpandWildcardRule::new()),
+ // [Expr::Wildcard] should be expanded before [TypeCoercion]
Arc::new(TypeCoercion::new()),
Arc::new(CountWildcardRule::new()),
];
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 2bb859d84a..7392028ba7 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -21,16 +21,20 @@ use std::sync::Arc;
use arrow::datatypes::{DataType, IntervalUnit};
+use crate::analyzer::AnalyzerRule;
+use crate::utils::NamePreserver;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::{
exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err,
DFSchema,
DataFusionError, Result, ScalarValue,
};
+use datafusion_expr::builder::project_with_column_index;
use datafusion_expr::expr::{
self, Between, BinaryExpr, Case, Exists, InList, InSubquery, Like,
ScalarFunction,
WindowFunction,
};
+use datafusion_expr::expr_rewriter::coerce_plan_expr_for_schema;
use datafusion_expr::expr_schema::cast_subquery;
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::logical_plan::Subquery;
@@ -47,13 +51,10 @@ use datafusion_expr::type_coercion::{is_datetime,
is_utf8_or_large_utf8};
use datafusion_expr::utils::merge_schema;
use datafusion_expr::{
is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown,
not,
- AggregateUDF, Expr, ExprFunctionExt, ExprSchemable, LogicalPlan, Operator,
ScalarUDF,
- WindowFrame, WindowFrameBound, WindowFrameUnits,
+ AggregateUDF, Expr, ExprFunctionExt, ExprSchemable, LogicalPlan, Operator,
+ Projection, ScalarUDF, Union, WindowFrame, WindowFrameBound,
WindowFrameUnits,
};
-use crate::analyzer::AnalyzerRule;
-use crate::utils::NamePreserver;
-
#[derive(Default)]
pub struct TypeCoercion {}
@@ -122,6 +123,7 @@ fn analyze_internal(
})?
// coerce join expressions specially
.map_data(|plan| expr_rewrite.coerce_joins(plan))?
+ .map_data(|plan| expr_rewrite.coerce_union(plan))?
// recompute the schema after the expressions have been rewritten as the
types may have changed
.map_data(|plan| plan.recompute_schema())
}
@@ -168,6 +170,39 @@ impl<'a> TypeCoercionRewriter<'a> {
Ok(LogicalPlan::Join(join))
}
+    /// Coerce the union inputs after expanding the wildcard expressions
+ ///
+ /// Union inputs must have the same schema, so we coerce the expressions
to match the schema
+ /// after expanding the wildcard expressions
+ fn coerce_union(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
+ let LogicalPlan::Union(union) = plan else {
+ return Ok(plan);
+ };
+
+ let inputs = union
+ .inputs
+ .into_iter()
+ .map(|p| {
+ let plan = coerce_plan_expr_for_schema(&p, &union.schema)?;
+ match plan {
+ LogicalPlan::Projection(Projection { expr, input, .. }) =>
{
+ Ok(Arc::new(project_with_column_index(
+ expr,
+ input,
+ Arc::clone(&union.schema),
+ )?))
+ }
+ other_plan => Ok(Arc::new(other_plan)),
+ }
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(LogicalPlan::Union(Union {
+ inputs,
+ schema: Arc::clone(&union.schema),
+ }))
+ }
+
fn coerce_join_filter(&self, expr: Expr) -> Result<Expr> {
let expr_type = expr.get_type(self.schema)?;
match expr_type {
@@ -1286,7 +1321,6 @@ mod test {
.eq(cast(lit("1998-03-18"), DataType::Date32));
let empty = empty();
let plan = LogicalPlan::Projection(Projection::try_new(vec![expr],
empty)?);
- dbg!(&plan);
let expected =
"Projection: CAST(Utf8(\"1998-03-18\") AS Timestamp(Nanosecond,
None)) = CAST(CAST(Utf8(\"1998-03-18\") AS Date32) AS Timestamp(Nanosecond,
None))\n EmptyRelation";
assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan,
expected)?;
@@ -1473,7 +1507,6 @@ mod test {
));
let empty = empty();
let plan = LogicalPlan::Projection(Projection::try_new(vec![expr],
empty)?);
- dbg!(&plan);
let expected =
"Projection: CAST(Utf8(\"1998-03-18\") AS Timestamp(Nanosecond,
None)) - CAST(Utf8(\"1998-03-18\") AS Timestamp(Nanosecond, None))\n
EmptyRelation";
assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan,
expected)?;
diff --git a/datafusion/optimizer/tests/optimizer_integration.rs
b/datafusion/optimizer/tests/optimizer_integration.rs
index aaa5eec395..93dd49b174 100644
--- a/datafusion/optimizer/tests/optimizer_integration.rs
+++ b/datafusion/optimizer/tests/optimizer_integration.rs
@@ -335,6 +335,27 @@ fn test_propagate_empty_relation_inner_join_and_unions() {
assert_eq!(expected, format!("{plan}"));
}
+#[test]
+fn select_wildcard_with_repeated_column() {
+ let sql = "SELECT *, col_int32 FROM test";
+ let err = test_sql(sql).expect_err("query should have failed");
+ assert_eq!(
+ "expand_wildcard_rule\ncaused by\nError during planning: Projections
require unique expression names but the expression \"test.col_int32\" at
position 0 and \"test.col_int32\" at position 7 have the same name. Consider
aliasing (\"AS\") one of them.",
+ err.strip_backtrace()
+ );
+}
+
+#[test]
+fn select_wildcard_with_repeated_column_but_is_aliased() {
+ let sql = "SELECT *, col_int32 as col_32 FROM test";
+
+ let plan = test_sql(sql).unwrap();
+ let expected = "Projection: test.col_int32, test.col_uint32,
test.col_utf8, test.col_date32, test.col_date64, test.col_ts_nano_none,
test.col_ts_nano_utc, test.col_int32 AS col_32\
+ \n TableScan: test projection=[col_int32, col_uint32, col_utf8,
col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]";
+
+ assert_eq!(expected, format!("{plan}"));
+}
+
fn test_sql(sql: &str) -> Result<LogicalPlan> {
// parse the SQL
let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ...
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index 6c4c07428b..6cbea5f0cf 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -22,8 +22,8 @@ use datafusion_common::{
exec_datafusion_err, internal_err, plan_datafusion_err, Result,
ScalarValue,
TableReference, UnnestOptions,
};
-use datafusion_expr::expr::Unnest;
use datafusion_expr::expr::{Alias, Placeholder};
+use datafusion_expr::expr::{Unnest, WildcardOptions};
use datafusion_expr::ExprFunctionExt;
use datafusion_expr::{
expr::{self, InList, Sort, WindowFunction},
@@ -556,7 +556,10 @@ pub fn parse_expr(
))),
ExprType::Wildcard(protobuf::Wildcard { qualifier }) => {
let qualifier = qualifier.to_owned().map(|x|
x.try_into()).transpose()?;
- Ok(Expr::Wildcard { qualifier })
+ Ok(Expr::Wildcard {
+ qualifier,
+ options: WildcardOptions::default(),
+ })
}
ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
fun_name,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index ab81ce8af9..c7361c89c3 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -582,7 +582,7 @@ pub fn serialize_expr(
expr_type: Some(ExprType::InList(expr)),
}
}
- Expr::Wildcard { qualifier } => protobuf::LogicalExprNode {
+ Expr::Wildcard { qualifier, .. } => protobuf::LogicalExprNode {
expr_type: Some(ExprType::Wildcard(protobuf::Wildcard {
qualifier: qualifier.to_owned().map(|x| x.into()),
})),
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index a18fa03b2d..eb7cc5c4b9 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -58,7 +58,7 @@ use datafusion_common::{
use datafusion_expr::dml::CopyTo;
use datafusion_expr::expr::{
self, Between, BinaryExpr, Case, Cast, GroupingSet, InList, Like,
ScalarFunction,
- Sort, Unnest,
+ Sort, Unnest, WildcardOptions,
};
use datafusion_expr::logical_plan::{Extension, UserDefinedLogicalNodeCore};
use datafusion_expr::{
@@ -1977,7 +1977,10 @@ fn roundtrip_unnest() {
#[test]
fn roundtrip_wildcard() {
- let test_expr = Expr::Wildcard { qualifier: None };
+ let test_expr = Expr::Wildcard {
+ qualifier: None,
+ options: WildcardOptions::default(),
+ };
let ctx = SessionContext::new();
roundtrip_expr_test(test_expr, ctx);
@@ -1987,6 +1990,7 @@ fn roundtrip_wildcard() {
fn roundtrip_qualified_wildcard() {
let test_expr = Expr::Wildcard {
qualifier: Some("foo".into()),
+ options: WildcardOptions::default(),
};
let ctx = SessionContext::new();
diff --git a/datafusion/sql/src/expr/function.rs
b/datafusion/sql/src/expr/function.rs
index d16d08b041..b95414a8ca 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -22,6 +22,7 @@ use datafusion_common::{
internal_datafusion_err, not_impl_err, plan_datafusion_err, plan_err,
DFSchema,
Dependency, Result,
};
+use datafusion_expr::expr::WildcardOptions;
use datafusion_expr::planner::PlannerResult;
use datafusion_expr::{
expr, Expr, ExprFunctionExt, ExprSchemable, WindowFrame,
WindowFunctionDefinition,
@@ -420,13 +421,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
name: _,
arg: FunctionArgExpr::Wildcard,
operator: _,
- } => Ok(Expr::Wildcard { qualifier: None }),
+ } => Ok(Expr::Wildcard {
+ qualifier: None,
+ options: WildcardOptions::default(),
+ }),
FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
self.sql_expr_to_logical_expr(arg, schema, planner_context)
}
- FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
- Ok(Expr::Wildcard { qualifier: None })
- }
+ FunctionArg::Unnamed(FunctionArgExpr::Wildcard) =>
Ok(Expr::Wildcard {
+ qualifier: None,
+ options: WildcardOptions::default(),
+ }),
_ => not_impl_err!("Unsupported qualified wildcard argument:
{sql:?}"),
}
}
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index f2b4e0b4e4..7c94e5ead5 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -27,10 +27,10 @@ use sqlparser::ast::{
use datafusion_common::{
internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
Result,
- ScalarValue,
+ ScalarValue, TableReference,
};
-use datafusion_expr::expr::InList;
use datafusion_expr::expr::ScalarFunction;
+use datafusion_expr::expr::{InList, WildcardOptions};
use datafusion_expr::{
lit, Between, BinaryExpr, Cast, Expr, ExprSchemable, GetFieldAccess, Like,
Literal,
Operator, TryCast,
@@ -661,6 +661,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
not_impl_err!("AnyOp not supported by ExprPlanner:
{binary_expr:?}")
}
+ SQLExpr::Wildcard => Ok(Expr::Wildcard {
+ qualifier: None,
+ options: WildcardOptions::default(),
+ }),
+ SQLExpr::QualifiedWildcard(object_name) => Ok(Expr::Wildcard {
+ qualifier: Some(TableReference::from(object_name.to_string())),
+ options: WildcardOptions::default(),
+ }),
SQLExpr::Tuple(values) => self.parse_tuple(schema,
planner_context, values),
_ => not_impl_err!("Unsupported ast node in sqltorel: {sql:?}"),
}
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 95a44dace3..339234d996 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -27,23 +27,23 @@ use crate::utils::{
};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
+use datafusion_common::UnnestOptions;
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
-use datafusion_common::{Column, UnnestOptions};
-use datafusion_expr::expr::Alias;
+use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
use datafusion_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check,
normalize_cols,
};
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::utils::{
- expand_qualified_wildcard, expand_wildcard, expr_as_column_expr,
expr_to_columns,
- find_aggregate_exprs, find_window_exprs,
+ expr_as_column_expr, expr_to_columns, find_aggregate_exprs,
find_window_exprs,
};
use datafusion_expr::{
- Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder,
Partitioning,
+ qualified_wildcard_with_options, wildcard_with_options, Aggregate, Expr,
Filter,
+ GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
};
use sqlparser::ast::{
Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr,
- ReplaceSelectItem, WildcardAdditionalOptions, WindowType,
+ WildcardAdditionalOptions, WindowType,
};
use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem,
TableWithJoins};
@@ -82,7 +82,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// handle named windows before processing the projection expression
check_conflicting_windows(&select.named_window)?;
match_window_definitions(&mut select.projection,
&select.named_window)?;
- // process the SELECT expressions, with wildcards expanded.
+ // process the SELECT expressions
let select_exprs = self.prepare_select_exprs(
&base_plan,
select.projection,
@@ -515,8 +515,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
/// Returns the `Expr`'s corresponding to a SQL query's SELECT expressions.
- ///
- /// Wildcards are expanded into the concrete list of columns.
fn prepare_select_exprs(
&self,
plan: &LogicalPlan,
@@ -570,49 +568,30 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
SelectItem::Wildcard(options) => {
Self::check_wildcard_options(&options)?;
-
if empty_from {
return plan_err!("SELECT * with no tables specified is not
valid");
}
- // do not expand from outer schema
- let expanded_exprs =
- expand_wildcard(plan.schema().as_ref(), plan,
Some(&options))?;
- // If there is a REPLACE statement, replace that column with
the given
- // replace expression. Column name remains the same.
- if let Some(replace) = options.opt_replace {
- self.replace_columns(
- plan,
- empty_from,
- planner_context,
- expanded_exprs,
- replace,
- )
- } else {
- Ok(expanded_exprs)
- }
+ let planned_options = self.plan_wildcard_options(
+ plan,
+ empty_from,
+ planner_context,
+ options,
+ )?;
+ Ok(vec![wildcard_with_options(planned_options)])
}
SelectItem::QualifiedWildcard(object_name, options) => {
Self::check_wildcard_options(&options)?;
let qualifier = idents_to_table_reference(object_name.0,
false)?;
- // do not expand from outer schema
- let expanded_exprs = expand_qualified_wildcard(
- &qualifier,
- plan.schema().as_ref(),
- Some(&options),
+ let planned_options = self.plan_wildcard_options(
+ plan,
+ empty_from,
+ planner_context,
+ options,
)?;
- // If there is a REPLACE statement, replace that column with
the given
- // replace expression. Column name remains the same.
- if let Some(replace) = options.opt_replace {
- self.replace_columns(
- plan,
- empty_from,
- planner_context,
- expanded_exprs,
- replace,
- )
- } else {
- Ok(expanded_exprs)
- }
+ Ok(vec![qualified_wildcard_with_options(
+ qualifier,
+ planned_options,
+ )])
}
}
}
@@ -637,40 +616,44 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
/// If there is a REPLACE statement in the projected expression in the
form of
- /// "REPLACE (some_column_within_an_expr AS some_column)", this function
replaces
- /// that column with the given replace expression. Column name remains the
same.
- /// Multiple REPLACEs are also possible with comma separations.
- fn replace_columns(
+ /// "REPLACE (some_column_within_an_expr AS some_column)", we should plan
the
+ /// replace expressions first.
+ fn plan_wildcard_options(
&self,
plan: &LogicalPlan,
empty_from: bool,
planner_context: &mut PlannerContext,
- mut exprs: Vec<Expr>,
- replace: ReplaceSelectItem,
- ) -> Result<Vec<Expr>> {
- for expr in exprs.iter_mut() {
- if let Expr::Column(Column { name, .. }) = expr {
- if let Some(item) = replace
- .items
- .iter()
- .find(|item| item.column_name.value == *name)
- {
- let new_expr = self.sql_select_to_rex(
+ options: WildcardAdditionalOptions,
+ ) -> Result<WildcardOptions> {
+ let planned_option = WildcardOptions {
+ ilike: options.opt_ilike,
+ exclude: options.opt_exclude,
+ except: options.opt_except,
+ replace: None,
+ rename: options.opt_rename,
+ };
+ if let Some(replace) = options.opt_replace {
+ let replace_expr = replace
+ .items
+ .iter()
+ .map(|item| {
+ Ok(self.sql_select_to_rex(
SelectItem::UnnamedExpr(item.expr.clone()),
plan,
empty_from,
planner_context,
)?[0]
- .clone();
- *expr = Expr::Alias(Alias {
- expr: Box::new(new_expr),
- relation: None,
- name: name.clone(),
- });
- }
- }
+ .clone())
+ })
+ .collect::<Result<Vec<_>>>()?;
+ let planned_replace = PlannedReplaceSelectItem {
+ items: replace.items.into_iter().map(|i| *i).collect(),
+ planned_expressions: replace_expr,
+ };
+ Ok(planned_option.with_replace(planned_replace))
+ } else {
+ Ok(planned_option)
}
- Ok(exprs)
}
/// Wrap a plan in a projection
@@ -715,7 +698,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let plan = LogicalPlanBuilder::from(input.clone())
.aggregate(group_by_exprs.to_vec(), aggr_exprs.to_vec())?
.build()?;
-
let group_by_exprs = if let LogicalPlan::Aggregate(agg) = &plan {
&agg.group_expr
} else {
diff --git a/datafusion/sql/src/unparser/expr.rs
b/datafusion/sql/src/unparser/expr.rs
index de130754ab..39511ea4d0 100644
--- a/datafusion/sql/src/unparser/expr.rs
+++ b/datafusion/sql/src/unparser/expr.rs
@@ -21,11 +21,13 @@ use datafusion_expr::ScalarUDF;
use sqlparser::ast::Value::SingleQuotedString;
use sqlparser::ast::{
self, BinaryOperator, Expr as AstExpr, Function, FunctionArg, Ident,
Interval,
- TimezoneInfo, UnaryOperator,
+ ObjectName, TimezoneInfo, UnaryOperator,
};
use std::sync::Arc;
use std::{fmt::Display, vec};
+use super::dialect::{DateFieldExtractStyle, IntervalStyle};
+use super::Unparser;
use arrow::datatypes::{Decimal128Type, Decimal256Type, DecimalType};
use arrow::util::display::array_value_to_string;
use arrow_array::types::{
@@ -44,9 +46,6 @@ use datafusion_expr::{
Between, BinaryExpr, Case, Cast, Expr, GroupingSet, Like, Operator,
TryCast,
};
-use super::dialect::{DateFieldExtractStyle, IntervalStyle};
-use super::Unparser;
-
/// DataFusion's Exprs can represent either an `Expr` or an `OrderByExpr`
pub enum Unparsed {
// SQL Expression
@@ -159,7 +158,13 @@ impl Unparser<'_> {
let args = args
.iter()
.map(|e| {
- if matches!(e, Expr::Wildcard { qualifier: None }) {
+ if matches!(
+ e,
+ Expr::Wildcard {
+ qualifier: None,
+ ..
+ }
+ ) {
Ok(FunctionArg::Unnamed(ast::FunctionArgExpr::Wildcard))
} else {
self.expr_to_sql_inner(e).map(|e| {
@@ -477,8 +482,15 @@ impl Unparser<'_> {
format: None,
})
}
- Expr::Wildcard { qualifier: _ } => {
- not_impl_err!("Unsupported Expr conversion: {expr:?}")
+ // TODO: unparsing wildcard addition options
+ Expr::Wildcard { qualifier, .. } => {
+ if let Some(qualifier) = qualifier {
+ let idents: Vec<Ident> =
+
qualifier.to_vec().into_iter().map(Ident::new).collect();
+ Ok(ast::Expr::QualifiedWildcard(ObjectName(idents)))
+ } else {
+ Ok(ast::Expr::Wildcard)
+ }
}
Expr::GroupingSet(grouping_set) => match grouping_set {
GroupingSet::GroupingSets(grouping_sets) => {
@@ -643,7 +655,13 @@ impl Unparser<'_> {
fn function_args_to_sql(&self, args: &[Expr]) ->
Result<Vec<ast::FunctionArg>> {
args.iter()
.map(|e| {
- if matches!(e, Expr::Wildcard { qualifier: None }) {
+ if matches!(
+ e,
+ Expr::Wildcard {
+ qualifier: None,
+ ..
+ }
+ ) {
Ok(ast::FunctionArg::Unnamed(ast::FunctionArgExpr::Wildcard))
} else {
self.expr_to_sql(e)
@@ -1503,6 +1521,7 @@ mod tests {
use arrow_schema::DataType::Int8;
use ast::ObjectName;
use datafusion_common::TableReference;
+ use datafusion_expr::expr::WildcardOptions;
use datafusion_expr::{
case, col, cube, exists, grouping_set, interval_datetime_lit,
interval_year_month_lit, lit, not, not_exists, out_ref_col,
placeholder, rollup,
@@ -1558,7 +1577,10 @@ mod tests {
fn expr_to_sql_ok() -> Result<()> {
let dummy_schema = Schema::new(vec![Field::new("a", DataType::Int32,
false)]);
let dummy_logical_plan = table_scan(Some("t"), &dummy_schema, None)?
- .project(vec![Expr::Wildcard { qualifier: None }])?
+ .project(vec![Expr::Wildcard {
+ qualifier: None,
+ options: WildcardOptions::default(),
+ }])?
.filter(col("a").eq(lit(1)))?
.build()?;
@@ -1749,7 +1771,10 @@ mod tests {
(sum(col("a")), r#"sum(a)"#),
(
count_udaf()
- .call(vec![Expr::Wildcard { qualifier: None }])
+ .call(vec![Expr::Wildcard {
+ qualifier: None,
+ options: WildcardOptions::default(),
+ }])
.distinct()
.build()
.unwrap(),
@@ -1757,7 +1782,10 @@ mod tests {
),
(
count_udaf()
- .call(vec![Expr::Wildcard { qualifier: None }])
+ .call(vec![Expr::Wildcard {
+ qualifier: None,
+ options: WildcardOptions::default(),
+ }])
.filter(lit(true))
.build()
.unwrap(),
@@ -1833,11 +1861,11 @@ mod tests {
(Expr::Negative(Box::new(col("a"))), r#"-a"#),
(
exists(Arc::new(dummy_logical_plan.clone())),
- r#"EXISTS (SELECT t.a FROM t WHERE (t.a = 1))"#,
+ r#"EXISTS (SELECT * FROM t WHERE (t.a = 1))"#,
),
(
not_exists(Arc::new(dummy_logical_plan.clone())),
- r#"NOT EXISTS (SELECT t.a FROM t WHERE (t.a = 1))"#,
+ r#"NOT EXISTS (SELECT * FROM t WHERE (t.a = 1))"#,
),
(
try_cast(col("a"), DataType::Date64),
diff --git a/datafusion/sql/tests/sql_integration.rs
b/datafusion/sql/tests/sql_integration.rs
index 5a0317c47c..15efe2d2f0 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -58,7 +58,7 @@ mod common;
fn test_schema_support() {
quick_test(
"SELECT * FROM s1.test",
- "Projection: s1.test.t_date32, s1.test.t_date64\
+ "Projection: *\
\n TableScan: s1.test",
);
}
@@ -517,7 +517,7 @@ fn plan_copy_to_query() {
let plan = r#"
CopyTo: format=csv output_url=output.csv options: ()
Limit: skip=0, fetch=10
- Projection: test_decimal.id, test_decimal.price
+ Projection: *
TableScan: test_decimal
"#
.trim();
@@ -637,23 +637,13 @@ fn select_repeated_column() {
);
}
-#[test]
-fn select_wildcard_with_repeated_column() {
- let sql = "SELECT *, age FROM person";
- let err = logical_plan(sql).expect_err("query should have failed");
- assert_eq!(
- "Error during planning: Projections require unique expression names
but the expression \"person.age\" at position 3 and \"person.age\" at position
8 have the same name. Consider aliasing (\"AS\") one of them.",
- err.strip_backtrace()
- );
-}
-
#[test]
fn select_wildcard_with_repeated_column_but_is_aliased() {
quick_test(
- "SELECT *, first_name AS fn from person",
- "Projection: person.id, person.first_name, person.last_name,
person.age, person.state, person.salary, person.birth_date, person.😀,
person.first_name AS fn\
+ "SELECT *, first_name AS fn from person",
+ "Projection: *, person.first_name AS fn\
\n TableScan: person",
- );
+ );
}
#[test]
@@ -870,7 +860,7 @@ fn where_selection_with_ambiguous_column() {
#[test]
fn natural_join() {
let sql = "SELECT * FROM lineitem a NATURAL JOIN lineitem b";
- let expected = "Projection: a.l_item_id, a.l_description, a.price\
+ let expected = "Projection: *\
\n Inner Join: Using a.l_item_id = b.l_item_id,
a.l_description = b.l_description, a.price = b.price\
\n SubqueryAlias: a\
\n TableScan: lineitem\
@@ -906,7 +896,7 @@ fn natural_right_join() {
#[test]
fn natural_join_no_common_becomes_cross_join() {
let sql = "SELECT * FROM person a NATURAL JOIN lineitem b";
- let expected = "Projection: a.id, a.first_name, a.last_name, a.age,
a.state, a.salary, a.birth_date, a.😀, b.l_item_id, b.l_description, b.price\
+ let expected = "Projection: *\
\n CrossJoin:\
\n SubqueryAlias: a\
\n TableScan: person\
@@ -918,8 +908,7 @@ fn natural_join_no_common_becomes_cross_join() {
#[test]
fn using_join_multiple_keys() {
let sql = "SELECT * FROM person a join person b using (id, age)";
- let expected = "Projection: a.id, a.first_name, a.last_name, a.age,
a.state, a.salary, a.birth_date, a.😀, \
- b.first_name, b.last_name, b.state, b.salary, b.birth_date, b.😀\
+ let expected = "Projection: *\
\n Inner Join: Using a.id = b.id, a.age = b.age\
\n SubqueryAlias: a\
\n TableScan: person\
@@ -933,8 +922,7 @@ fn using_join_multiple_keys_subquery() {
let sql =
"SELECT age FROM (SELECT * FROM person a join person b using (id, age,
state))";
let expected = "Projection: a.age\
- \n Projection: a.id, a.first_name, a.last_name,
a.age, a.state, a.salary, a.birth_date, a.😀, \
- b.first_name, b.last_name, b.salary, b.birth_date, b.😀\
+ \n Projection: *\
\n Inner Join: Using a.id = b.id, a.age = b.age,
a.state = b.state\
\n SubqueryAlias: a\
\n TableScan: person\
@@ -946,8 +934,7 @@ fn using_join_multiple_keys_subquery() {
#[test]
fn using_join_multiple_keys_qualified_wildcard_select() {
let sql = "SELECT a.* FROM person a join person b using (id, age)";
- let expected =
- "Projection: a.id, a.first_name, a.last_name, a.age, a.state,
a.salary, a.birth_date, a.😀\
+ let expected = "Projection: a.*\
\n Inner Join: Using a.id = b.id, a.age = b.age\
\n SubqueryAlias: a\
\n TableScan: person\
@@ -959,8 +946,7 @@ fn using_join_multiple_keys_qualified_wildcard_select() {
#[test]
fn using_join_multiple_keys_select_all_columns() {
let sql = "SELECT a.*, b.* FROM person a join person b using (id, age)";
- let expected = "Projection: a.id, a.first_name, a.last_name, a.age,
a.state, a.salary, a.birth_date, a.😀, \
- b.id, b.first_name, b.last_name, b.age, b.state, b.salary,
b.birth_date, b.😀\
+ let expected = "Projection: a.*, b.*\
\n Inner Join: Using a.id = b.id, a.age = b.age\
\n SubqueryAlias: a\
\n TableScan: person\
@@ -972,9 +958,7 @@ fn using_join_multiple_keys_select_all_columns() {
#[test]
fn using_join_multiple_keys_multiple_joins() {
let sql = "SELECT * FROM person a join person b using (id, age, state)
join person c using (id, age, state)";
- let expected = "Projection: a.id, a.first_name, a.last_name, a.age,
a.state, a.salary, a.birth_date, a.😀, \
- b.first_name, b.last_name, b.salary, b.birth_date, b.😀, \
- c.first_name, c.last_name, c.salary, c.birth_date, c.😀\
+ let expected = "Projection: *\
\n Inner Join: Using a.id = c.id, a.age = c.age,
a.state = c.state\
\n Inner Join: Using a.id = b.id, a.age = b.age,
a.state = b.state\
\n SubqueryAlias: a\
@@ -1305,13 +1289,13 @@ fn select_binary_expr_nested() {
fn select_wildcard_with_groupby() {
quick_test(
r#"SELECT * FROM person GROUP BY id, first_name, last_name, age,
state, salary, birth_date, "😀""#,
- "Projection: person.id, person.first_name, person.last_name,
person.age, person.state, person.salary, person.birth_date, person.😀\
+ "Projection: *\
\n Aggregate: groupBy=[[person.id, person.first_name,
person.last_name, person.age, person.state, person.salary, person.birth_date,
person.😀]], aggr=[[]]\
\n TableScan: person",
);
quick_test(
"SELECT * FROM (SELECT first_name, last_name FROM person) AS a
GROUP BY first_name, last_name",
- "Projection: a.first_name, a.last_name\
+ "Projection: *\
\n Aggregate: groupBy=[[a.first_name, a.last_name]], aggr=[[]]\
\n SubqueryAlias: a\
\n Projection: person.first_name, person.last_name\
@@ -1474,7 +1458,7 @@ fn recursive_ctes() {
select * from numbers;";
quick_test(
sql,
- "Projection: numbers.n\
+ "Projection: *\
\n SubqueryAlias: numbers\
\n RecursiveQuery: is_distinct=false\
\n Projection: Int64(1) AS n\
@@ -1687,10 +1671,10 @@ fn
select_aggregate_with_non_column_inner_expression_with_groupby() {
#[test]
fn test_wildcard() {
quick_test(
- "SELECT * from person",
- "Projection: person.id, person.first_name, person.last_name,
person.age, person.state, person.salary, person.birth_date, person.😀\
+ "SELECT * from person",
+ "Projection: *\
\n TableScan: person",
- );
+ );
}
#[test]
@@ -2118,7 +2102,7 @@ fn project_wildcard_on_join_with_using() {
FROM lineitem \
JOIN lineitem as lineitem2 \
USING (l_item_id)";
- let expected = "Projection: lineitem.l_item_id, lineitem.l_description,
lineitem.price, lineitem2.l_description, lineitem2.price\
+ let expected = "Projection: *\
\n Inner Join: Using lineitem.l_item_id = lineitem2.l_item_id\
\n TableScan: lineitem\
\n SubqueryAlias: lineitem2\
@@ -3005,7 +2989,7 @@ fn exists_subquery_wildcard() {
let expected = "Projection: p.id\
\n Filter: EXISTS (<subquery>)\
\n Subquery:\
- \n Projection: person.id, person.first_name, person.last_name,
person.age, person.state, person.salary, person.birth_date, person.😀\
+ \n Projection: *\
\n Filter: person.last_name = outer_ref(p.last_name) AND
person.state = outer_ref(p.state)\
\n TableScan: person\
\n SubqueryAlias: p\
@@ -3092,13 +3076,13 @@ fn subquery_references_cte() {
cte AS (SELECT * FROM person) \
SELECT * FROM person WHERE EXISTS (SELECT * FROM cte WHERE id =
person.id)";
- let expected = "Projection: person.id, person.first_name,
person.last_name, person.age, person.state, person.salary, person.birth_date,
person.😀\
+ let expected = "Projection: *\
\n Filter: EXISTS (<subquery>)\
\n Subquery:\
- \n Projection: cte.id, cte.first_name, cte.last_name, cte.age,
cte.state, cte.salary, cte.birth_date, cte.😀\
+ \n Projection: *\
\n Filter: cte.id = outer_ref(person.id)\
\n SubqueryAlias: cte\
- \n Projection: person.id, person.first_name,
person.last_name, person.age, person.state, person.salary, person.birth_date,
person.😀\
+ \n Projection: *\
\n TableScan: person\
\n TableScan: person";
@@ -3113,7 +3097,7 @@ fn cte_with_no_column_names() {
) \
SELECT * FROM numbers;";
- let expected = "Projection: numbers.a, numbers.b, numbers.c\
+ let expected = "Projection: *\
\n SubqueryAlias: numbers\
\n Projection: Int64(1) AS a, Int64(2) AS b, Int64(3) AS c\
\n EmptyRelation";
@@ -3129,7 +3113,7 @@ fn cte_with_column_names() {
) \
SELECT * FROM numbers;";
- let expected = "Projection: numbers.a, numbers.b, numbers.c\
+ let expected = "Projection: *\
\n SubqueryAlias: numbers\
\n Projection: Int64(1) AS a, Int64(2) AS b, Int64(3) AS c\
\n Projection: Int64(1), Int64(2), Int64(3)\
@@ -3147,7 +3131,7 @@ fn cte_with_column_aliases_precedence() {
) \
SELECT * FROM numbers;";
- let expected = "Projection: numbers.a, numbers.b, numbers.c\
+ let expected = "Projection: *\
\n SubqueryAlias: numbers\
\n Projection: x AS a, y AS b, z AS c\
\n Projection: Int64(1) AS x, Int64(2) AS y, Int64(3) AS z\
@@ -3528,7 +3512,7 @@ fn test_select_all_inner_join() {
INNER JOIN orders \
ON orders.customer_id * 2 = person.id + 10";
- let expected = "Projection: person.id, person.first_name,
person.last_name, person.age, person.state, person.salary, person.birth_date,
person.😀, orders.order_id, orders.customer_id, orders.o_item_id, orders.qty,
orders.price, orders.delivered\
+ let expected = "Projection: *\
\n Inner Join: Filter: orders.customer_id * Int64(2) = person.id
+ Int64(10)\
\n TableScan: person\
\n TableScan: orders";
@@ -4245,7 +4229,7 @@ fn test_prepare_statement_to_plan_value_list() {
let sql = "PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1,
$1), (2, $2)) AS t (num, letter);";
let expected_plan = "Prepare: \"my_plan\" [Utf8, Utf8] \
- \n Projection: t.num, t.letter\
+ \n Projection: *\
\n SubqueryAlias: t\
\n Projection: column1 AS num, column2 AS letter\
\n Values: (Int64(1), $1), (Int64(2), $2)";
@@ -4260,7 +4244,7 @@ fn test_prepare_statement_to_plan_value_list() {
ScalarValue::from("a".to_string()),
ScalarValue::from("b".to_string()),
];
- let expected_plan = "Projection: t.num, t.letter\
+ let expected_plan = "Projection: *\
\n SubqueryAlias: t\
\n Projection: column1 AS num, column2 AS letter\
\n Values: (Int64(1), Utf8(\"a\")), (Int64(2), Utf8(\"b\"))";
@@ -4310,7 +4294,7 @@ fn test_table_alias() {
(select age from person) t2 \
) as f";
- let expected = "Projection: f.id, f.age\
+ let expected = "Projection: *\
\n SubqueryAlias: f\
\n CrossJoin:\
\n SubqueryAlias: t1\
@@ -4327,7 +4311,7 @@ fn test_table_alias() {
(select age from person) t2 \
) as f (c1, c2)";
- let expected = "Projection: f.c1, f.c2\
+ let expected = "Projection: *\
\n SubqueryAlias: f\
\n Projection: t1.id AS c1, t2.age AS c2\
\n CrossJoin:\
diff --git a/datafusion/sqllogictest/test_files/explain.slt
b/datafusion/sqllogictest/test_files/explain.slt
index eae4f428b4..1e8850efad 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -177,6 +177,7 @@ initial_logical_plan
01)Projection: simple_explain_test.a, simple_explain_test.b,
simple_explain_test.c
02)--TableScan: simple_explain_test
logical_plan after inline_table_scan SAME TEXT AS ABOVE
+logical_plan after expand_wildcard_rule SAME TEXT AS ABOVE
logical_plan after type_coercion SAME TEXT AS ABOVE
logical_plan after count_wildcard_rule SAME TEXT AS ABOVE
analyzed_logical_plan SAME TEXT AS ABOVE
diff --git a/datafusion/sqllogictest/test_files/select.slt
b/datafusion/sqllogictest/test_files/select.slt
index f217cbab07..49a18ca09d 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1195,12 +1195,12 @@ LIMIT 5
200 2000
# Trying to exclude non-existing column should give error
-statement error DataFusion error: Schema error: No field named e. Valid fields
are table1.a, table1.b, table1.c, table1.d.
+statement error
SELECT * EXCLUDE e
FROM table1
# similarly, except should raise error if excluded column is not in the table
-statement error DataFusion error: Schema error: No field named e. Valid fields
are table1.a, table1.b, table1.c, table1.d.
+statement error
SELECT * EXCEPT(e)
FROM table1
@@ -1214,7 +1214,7 @@ FROM table1
2 20 20 200 2000
# EXCEPT, or EXCLUDE shouldn't contain duplicate column names
-statement error DataFusion error: Error during planning: EXCLUDE or EXCEPT
contains duplicate column names
+statement error
SELECT * EXCLUDE(a, a)
FROM table1
diff --git a/datafusion/sqllogictest/test_files/union.slt
b/datafusion/sqllogictest/test_files/union.slt
index 476ebe7ebe..ffbf54c4d9 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -637,8 +637,43 @@ SELECT t1.v2, t1.v0 FROM t2 NATURAL JOIN t1
SELECT t1.v2, t1.v0 FROM t2 NATURAL JOIN t1 WHERE (t1.v2 IS NULL);
----
+statement ok
+CREATE TABLE t3 (
+ id INT
+) as VALUES
+ (1),
+ (2),
+ (3)
+;
+
+statement ok
+CREATE TABLE t4 (
+ id TEXT
+) as VALUES
+ ('4'),
+ ('5'),
+ ('6')
+;
+
+# test type coercion for wildcard expansion
+query T rowsort
+(SELECT * FROM t3 ) UNION ALL (SELECT * FROM t4)
+----
+1
+2
+3
+4
+5
+6
+
statement ok
DROP TABLE t1;
statement ok
DROP TABLE t2;
+
+statement ok
+DROP TABLE t3;
+
+statement ok
+DROP TABLE t4;
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index dfc8826676..ddf6a7aabf 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3929,7 +3929,8 @@ b 1 3
a 1 4
b 5 5
-statement error DataFusion error: Error during planning: Projection references
non-aggregate values: Expression aggregate_test_100.c1 could not be resolved
from available columns: rn
+# Schema error: No field named aggregate_test_100.c1. Valid fields are rn.
+statement error
SELECT *
FROM (SELECT c1, c2, ROW_NUMBER() OVER(PARTITION BY c1) as rn
FROM aggregate_test_100
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]