This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3438b35530 Move wildcard expansions to the analyzer (#11681)
3438b35530 is described below

commit 3438b355308afa23dba399f4aec5760969d054c5
Author: Jax Liu <[email protected]>
AuthorDate: Wed Aug 14 02:28:02 2024 +0800

    Move wildcard expansions to the analyzer (#11681)
    
    * allow qualified wildcard in the logical plan
    
    * move wildcard expansions to the analyzer
    
    * fix fmt
    
    * fix the view tests
    
    * expand wildcard for schema
    
    * fix for union query
    
    * cargo fmt clippy
    
    * move wildcard expanding tests to expand_wildcard_rule.rs
    
    * coercion the expanded wildcard expression in union
    
    * remove debug message
    
    * move wildcard options to logical plan
    
    * remove unused function
    
    * add the doc for expression function
    
    * fix cargo check
    
    * fix cargo fmt
    
    * fix test
    
    * extract expand_exprlist
    
    * expand wildcard for functional_dependencies
    
    * refine the doc
    
    * fix tests
    
    * fix expand exclude and except
    
    * remove unused import
    
    * fix check and update function
    
    * fix check
    
    * throw the error when exprlist to field
    
    * fix functional_dependency and exclude
    
    * fix projection_schema
    
    * fix the window functions
    
    * fix clippy and support unparsing wildcard
    
    * fix clippy and fmt
    
    * add the doc for util functions
    
    * fix unique expression check for projection
    
    * cargo fmt
    
    * move test and solve dependency issue
    
    * address review comments
    
    * add the fail reason
    
    * enhance the doc
    
    * add more doc
---
 datafusion/core/src/datasource/view.rs             |  44 ++-
 datafusion/core/src/execution/context/mod.rs       |   1 -
 datafusion/expr/src/expr.rs                        | 221 ++++++++++++++-
 datafusion/expr/src/expr_fn.rs                     |  45 ++-
 datafusion/expr/src/expr_rewriter/mod.rs           |   1 +
 datafusion/expr/src/expr_schema.rs                 |  21 +-
 datafusion/expr/src/logical_plan/builder.rs        |  41 +--
 datafusion/expr/src/logical_plan/plan.rs           |  55 +++-
 datafusion/expr/src/utils.rs                       | 150 ++++++++--
 .../optimizer/src/analyzer/count_wildcard_rule.rs  |   8 +-
 .../optimizer/src/analyzer/expand_wildcard_rule.rs | 304 +++++++++++++++++++++
 .../optimizer/src/analyzer/inline_table_scan.rs    |   8 +-
 datafusion/optimizer/src/analyzer/mod.rs           |   5 +
 datafusion/optimizer/src/analyzer/type_coercion.rs |  47 +++-
 .../optimizer/tests/optimizer_integration.rs       |  21 ++
 datafusion/proto/src/logical_plan/from_proto.rs    |   7 +-
 datafusion/proto/src/logical_plan/to_proto.rs      |   2 +-
 .../proto/tests/cases/roundtrip_logical_plan.rs    |   8 +-
 datafusion/sql/src/expr/function.rs                |  13 +-
 datafusion/sql/src/expr/mod.rs                     |  12 +-
 datafusion/sql/src/select.rs                       | 120 ++++----
 datafusion/sql/src/unparser/expr.rs                |  54 +++-
 datafusion/sql/tests/sql_integration.rs            |  78 +++---
 datafusion/sqllogictest/test_files/explain.slt     |   1 +
 datafusion/sqllogictest/test_files/select.slt      |   6 +-
 datafusion/sqllogictest/test_files/union.slt       |  35 +++
 datafusion/sqllogictest/test_files/window.slt      |   3 +-
 27 files changed, 1057 insertions(+), 254 deletions(-)

diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs
index 98d118c027..a81942bf76 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -19,17 +19,19 @@
 
 use std::{any::Any, sync::Arc};
 
-use arrow::datatypes::SchemaRef;
-use async_trait::async_trait;
-use datafusion_catalog::Session;
-use datafusion_common::Column;
-use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
-
 use crate::{
     error::Result,
     logical_expr::{Expr, LogicalPlan},
     physical_plan::ExecutionPlan,
 };
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+use datafusion_catalog::Session;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::Column;
+use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
+use datafusion_optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule;
+use datafusion_optimizer::Analyzer;
 
 use crate::datasource::{TableProvider, TableType};
 
@@ -50,6 +52,7 @@ impl ViewTable {
         logical_plan: LogicalPlan,
         definition: Option<String>,
     ) -> Result<Self> {
+        let logical_plan = Self::apply_required_rule(logical_plan)?;
         let table_schema = logical_plan.schema().as_ref().to_owned().into();
 
         let view = Self {
@@ -61,6 +64,15 @@ impl ViewTable {
         Ok(view)
     }
 
+    fn apply_required_rule(logical_plan: LogicalPlan) -> Result<LogicalPlan> {
+        let options = ConfigOptions::default();
+        Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]).execute_and_check(
+            logical_plan,
+            &options,
+            |_, _| {},
+        )
+    }
+
     /// Get definition ref
     pub fn definition(&self) -> Option<&String> {
         self.definition.as_ref()
@@ -232,6 +244,26 @@ mod tests {
 
         assert_batches_eq!(expected, &results);
 
+        let view_sql =
+            "CREATE VIEW replace_xyz AS SELECT * REPLACE (column1*2 as column1) FROM xyz";
+        session_ctx.sql(view_sql).await?.collect().await?;
+
+        let results = session_ctx
+            .sql("SELECT * FROM replace_xyz")
+            .await?
+            .collect()
+            .await?;
+
+        let expected = [
+            "+---------+---------+---------+",
+            "| column1 | column2 | column3 |",
+            "+---------+---------+---------+",
+            "| 2       | 2       | 3       |",
+            "| 8       | 5       | 6       |",
+            "+---------+---------+---------+",
+        ];
+
+        assert_batches_eq!(expected, &results);
         Ok(())
     }
 
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index c63ffddd81..972a6f6437 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -718,7 +718,6 @@ impl SessionContext {
             }
             (_, Err(_)) => {
                 let table = Arc::new(ViewTable::try_new((*input).clone(), definition)?);
-
                 self.register_table(name, table)?;
                 self.return_empty_dataframe()
             }
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 5030a95d3c..b4d489cc7c 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -41,7 +41,10 @@ use datafusion_common::{
     internal_err, not_impl_err, plan_err, Column, DFSchema, Result, 
ScalarValue,
     TableReference,
 };
-use sqlparser::ast::NullTreatment;
+use sqlparser::ast::{
+    display_comma_separated, ExceptSelectItem, ExcludeSelectItem, 
IlikeSelectItem,
+    NullTreatment, RenameSelectItem, ReplaceSelectElement,
+};
 
 /// Represents logical expressions such as `A + 1`, or `CAST(c1 AS int)`.
 ///
@@ -315,7 +318,10 @@ pub enum Expr {
     ///
     /// This expr has to be resolved to a list of columns before translating 
logical
     /// plan into physical plan.
-    Wildcard { qualifier: Option<TableReference> },
+    Wildcard {
+        qualifier: Option<TableReference>,
+        options: WildcardOptions,
+    },
     /// List of grouping set expressions. Only valid in the context of an 
aggregate
     /// GROUP BY expression list
     GroupingSet(GroupingSet),
@@ -970,6 +976,89 @@ impl GroupingSet {
     }
 }
 
+/// Additional options for wildcards, e.g. Snowflake `EXCLUDE`/`RENAME` and 
Bigquery `EXCEPT`.
+#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
+pub struct WildcardOptions {
+    /// `[ILIKE...]`.
+    ///  Snowflake syntax: 
<https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
+    pub ilike: Option<IlikeSelectItem>,
+    /// `[EXCLUDE...]`.
+    ///  Snowflake syntax: 
<https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
+    pub exclude: Option<ExcludeSelectItem>,
+    /// `[EXCEPT...]`.
+    ///  BigQuery syntax: 
<https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#select_except>
+    ///  Clickhouse syntax: 
<https://clickhouse.com/docs/en/sql-reference/statements/select#except>
+    pub except: Option<ExceptSelectItem>,
+    /// `[REPLACE]`
+    ///  BigQuery syntax: 
<https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#select_replace>
+    ///  Clickhouse syntax: 
<https://clickhouse.com/docs/en/sql-reference/statements/select#replace>
+    ///  Snowflake syntax: 
<https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
+    pub replace: Option<PlannedReplaceSelectItem>,
+    /// `[RENAME ...]`.
+    ///  Snowflake syntax: 
<https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
+    pub rename: Option<RenameSelectItem>,
+}
+
+impl WildcardOptions {
+    pub fn with_replace(self, replace: PlannedReplaceSelectItem) -> Self {
+        WildcardOptions {
+            ilike: self.ilike,
+            exclude: self.exclude,
+            except: self.except,
+            replace: Some(replace),
+            rename: self.rename,
+        }
+    }
+}
+
+impl Display for WildcardOptions {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        if let Some(ilike) = &self.ilike {
+            write!(f, " {ilike}")?;
+        }
+        if let Some(exclude) = &self.exclude {
+            write!(f, " {exclude}")?;
+        }
+        if let Some(except) = &self.except {
+            write!(f, " {except}")?;
+        }
+        if let Some(replace) = &self.replace {
+            write!(f, " {replace}")?;
+        }
+        if let Some(rename) = &self.rename {
+            write!(f, " {rename}")?;
+        }
+        Ok(())
+    }
+}
+
+/// The planned expressions for `REPLACE`
+#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
+pub struct PlannedReplaceSelectItem {
+    /// The original ast nodes
+    pub items: Vec<ReplaceSelectElement>,
+    /// The expression planned from the ast nodes. They will be used when 
expanding the wildcard.
+    pub planned_expressions: Vec<Expr>,
+}
+
+impl Display for PlannedReplaceSelectItem {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(f, "REPLACE")?;
+        write!(f, " ({})", display_comma_separated(&self.items))?;
+        Ok(())
+    }
+}
+
+impl PlannedReplaceSelectItem {
+    pub fn items(&self) -> &[ReplaceSelectElement] {
+        &self.items
+    }
+
+    pub fn expressions(&self) -> &[Expr] {
+        &self.planned_expressions
+    }
+}
+
 /// Fixed seed for the hashing so that Ords are consistent across runs
 const SEED: ahash::RandomState = ahash::RandomState::with_seeds(0, 0, 0, 0);
 
@@ -1720,8 +1809,9 @@ impl Expr {
             Expr::ScalarSubquery(subquery) => {
                 subquery.hash(hasher);
             }
-            Expr::Wildcard { qualifier } => {
+            Expr::Wildcard { qualifier, options } => {
                 qualifier.hash(hasher);
+                options.hash(hasher);
             }
             Expr::GroupingSet(grouping_set) => {
                 mem::discriminant(grouping_set).hash(hasher);
@@ -2242,9 +2332,9 @@ impl fmt::Display for Expr {
                     write!(f, "{expr} IN ([{}])", expr_vec_fmt!(list))
                 }
             }
-            Expr::Wildcard { qualifier } => match qualifier {
-                Some(qualifier) => write!(f, "{qualifier}.*"),
-                None => write!(f, "*"),
+            Expr::Wildcard { qualifier, options } => match qualifier {
+                Some(qualifier) => write!(f, "{qualifier}.*{options}"),
+                None => write!(f, "*{options}"),
             },
             Expr::GroupingSet(grouping_sets) => match grouping_sets {
                 GroupingSet::Rollup(exprs) => {
@@ -2543,9 +2633,10 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) 
-> Result<String> {
         Expr::Sort { .. } => {
             internal_err!("Create physical name does not support sort 
expression")
         }
-        Expr::Wildcard { .. } => {
-            internal_err!("Create physical name does not support wildcard")
-        }
+        Expr::Wildcard { qualifier, options } => match qualifier {
+            Some(qualifier) => Ok(format!("{}.*{}", qualifier, options)),
+            None => Ok(format!("*{}", options)),
+        },
         Expr::Placeholder(_) => {
             internal_err!("Create physical name does not support placeholder")
         }
@@ -2558,7 +2649,12 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) 
-> Result<String> {
 #[cfg(test)]
 mod test {
     use crate::expr_fn::col;
-    use crate::{case, lit, ColumnarValue, ScalarUDF, ScalarUDFImpl, 
Volatility};
+    use crate::{
+        case, lit, qualified_wildcard, wildcard, wildcard_with_options, 
ColumnarValue,
+        ScalarUDF, ScalarUDFImpl, Volatility,
+    };
+    use sqlparser::ast;
+    use sqlparser::ast::{Ident, IdentWithAlias};
     use std::any::Any;
 
     #[test]
@@ -2859,4 +2955,109 @@ mod test {
         );
         assert_eq!(find_df_window_func("not_exist"), None)
     }
+
+    #[test]
+    fn test_display_wildcard() {
+        assert_eq!(format!("{}", wildcard()), "*");
+        assert_eq!(format!("{}", qualified_wildcard("t1")), "t1.*");
+        assert_eq!(
+            format!(
+                "{}",
+                wildcard_with_options(wildcard_options(
+                    Some(IlikeSelectItem {
+                        pattern: "c1".to_string()
+                    }),
+                    None,
+                    None,
+                    None,
+                    None
+                ))
+            ),
+            "* ILIKE 'c1'"
+        );
+        assert_eq!(
+            format!(
+                "{}",
+                wildcard_with_options(wildcard_options(
+                    None,
+                    Some(ExcludeSelectItem::Multiple(vec![
+                        Ident::from("c1"),
+                        Ident::from("c2")
+                    ])),
+                    None,
+                    None,
+                    None
+                ))
+            ),
+            "* EXCLUDE (c1, c2)"
+        );
+        assert_eq!(
+            format!(
+                "{}",
+                wildcard_with_options(wildcard_options(
+                    None,
+                    None,
+                    Some(ExceptSelectItem {
+                        first_element: Ident::from("c1"),
+                        additional_elements: vec![Ident::from("c2")]
+                    }),
+                    None,
+                    None
+                ))
+            ),
+            "* EXCEPT (c1, c2)"
+        );
+        assert_eq!(
+            format!(
+                "{}",
+                wildcard_with_options(wildcard_options(
+                    None,
+                    None,
+                    None,
+                    Some(PlannedReplaceSelectItem {
+                        items: vec![ReplaceSelectElement {
+                            expr: ast::Expr::Identifier(Ident::from("c1")),
+                            column_name: Ident::from("a1"),
+                            as_keyword: false
+                        }],
+                        planned_expressions: vec![]
+                    }),
+                    None
+                ))
+            ),
+            "* REPLACE (c1 a1)"
+        );
+        assert_eq!(
+            format!(
+                "{}",
+                wildcard_with_options(wildcard_options(
+                    None,
+                    None,
+                    None,
+                    None,
+                    Some(RenameSelectItem::Multiple(vec![IdentWithAlias {
+                        ident: Ident::from("c1"),
+                        alias: Ident::from("a1")
+                    }]))
+                ))
+            ),
+            "* RENAME (c1 AS a1)"
+        )
+    }
+
+    fn wildcard_options(
+        opt_ilike: Option<IlikeSelectItem>,
+        opt_exclude: Option<ExcludeSelectItem>,
+        opt_except: Option<ExceptSelectItem>,
+        opt_replace: Option<PlannedReplaceSelectItem>,
+        opt_rename: Option<RenameSelectItem>,
+    ) -> WildcardOptions {
+        WildcardOptions {
+            ilike: opt_ilike,
+            exclude: opt_exclude,
+            except: opt_except,
+            replace: opt_replace,
+            rename: opt_rename,
+        }
+    }
 }
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index e9c5485656..4e60223996 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -19,7 +19,7 @@
 
 use crate::expr::{
     AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList, 
InSubquery,
-    Placeholder, TryCast, Unnest, WindowFunction,
+    Placeholder, TryCast, Unnest, WildcardOptions, WindowFunction,
 };
 use crate::function::{
     AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory,
@@ -37,7 +37,7 @@ use arrow::compute::kernels::cast_utils::{
     parse_interval_day_time, parse_interval_month_day_nano, 
parse_interval_year_month,
 };
 use arrow::datatypes::{DataType, Field};
-use datafusion_common::{plan_err, Column, Result, ScalarValue};
+use datafusion_common::{plan_err, Column, Result, ScalarValue, TableReference};
 use sqlparser::ast::NullTreatment;
 use std::any::Any;
 use std::fmt::Debug;
@@ -119,7 +119,46 @@ pub fn placeholder(id: impl Into<String>) -> Expr {
 /// assert_eq!(p.to_string(), "*")
 /// ```
 pub fn wildcard() -> Expr {
-    Expr::Wildcard { qualifier: None }
+    Expr::Wildcard {
+        qualifier: None,
+        options: WildcardOptions::default(),
+    }
+}
+
+/// Create an '*' [`Expr::Wildcard`] expression with the wildcard options
+pub fn wildcard_with_options(options: WildcardOptions) -> Expr {
+    Expr::Wildcard {
+        qualifier: None,
+        options,
+    }
+}
+
+/// Create an 't.*' [`Expr::Wildcard`] expression that matches all columns 
from a specific table
+///
+/// # Example
+///
+/// ```rust
+/// # use datafusion_common::TableReference;
+/// # use datafusion_expr::{qualified_wildcard};
+/// let p = qualified_wildcard(TableReference::bare("t"));
+/// assert_eq!(p.to_string(), "t.*")
+/// ```
+pub fn qualified_wildcard(qualifier: impl Into<TableReference>) -> Expr {
+    Expr::Wildcard {
+        qualifier: Some(qualifier.into()),
+        options: WildcardOptions::default(),
+    }
+}
+
+/// Create an 't.*' [`Expr::Wildcard`] expression with the wildcard options
+pub fn qualified_wildcard_with_options(
+    qualifier: impl Into<TableReference>,
+    options: WildcardOptions,
+) -> Expr {
+    Expr::Wildcard {
+        qualifier: Some(qualifier.into()),
+        options,
+    }
 }
 
 /// Return a new expression `left <op> right`
diff --git a/datafusion/expr/src/expr_rewriter/mod.rs 
b/datafusion/expr/src/expr_rewriter/mod.rs
index 0dc41d4a9a..32e621350e 100644
--- a/datafusion/expr/src/expr_rewriter/mod.rs
+++ b/datafusion/expr/src/expr_rewriter/mod.rs
@@ -248,6 +248,7 @@ fn coerce_exprs_for_schema(
                     Expr::Alias(Alias { expr, name, .. }) => {
                         Ok(expr.cast_to(new_type, src_schema)?.alias(name))
                     }
+                    Expr::Wildcard { .. } => Ok(expr),
                     _ => expr.cast_to(new_type, src_schema),
                 }
             } else {
diff --git a/datafusion/expr/src/expr_schema.rs 
b/datafusion/expr/src/expr_schema.rs
index 73123819ba..af35b9a991 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -28,8 +28,8 @@ use crate::{utils, LogicalPlan, Projection, Subquery, 
WindowFunctionDefinition};
 use arrow::compute::can_cast_types;
 use arrow::datatypes::{DataType, Field};
 use datafusion_common::{
-    internal_err, not_impl_err, plan_datafusion_err, plan_err, Column, 
ExprSchema,
-    Result, TableReference,
+    not_impl_err, plan_datafusion_err, plan_err, Column, ExprSchema, Result,
+    TableReference,
 };
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -244,13 +244,7 @@ impl ExprSchemable for Expr {
                     )
                 })
             }
-            Expr::Wildcard { qualifier } => {
-                // Wildcard do not really have a type and do not appear in 
projections
-                match qualifier {
-                    Some(_) => internal_err!("QualifiedWildcard expressions 
are not valid in a logical query plan"),
-                    None => Ok(DataType::Null)
-                }
-            }
+            Expr::Wildcard { .. } => Ok(DataType::Null),
             Expr::GroupingSet(_) => {
                 // grouping sets do not really have a type and do not appear 
in projections
                 Ok(DataType::Null)
@@ -362,12 +356,7 @@ impl ExprSchemable for Expr {
             | Expr::SimilarTo(Like { expr, pattern, .. }) => {
                 Ok(expr.nullable(input_schema)? || 
pattern.nullable(input_schema)?)
             }
-            Expr::Wildcard { qualifier } => match qualifier {
-                Some(_) => internal_err!(
-                    "QualifiedWildcard expressions are not valid in a logical 
query plan"
-                ),
-                None => Ok(false),
-            },
+            Expr::Wildcard { .. } => Ok(false),
             Expr::GroupingSet(_) => {
                 // grouping sets do not really have the concept of nullable 
and do not appear
                 // in projections
@@ -548,7 +537,7 @@ mod tests {
     use super::*;
     use crate::{col, lit};
 
-    use datafusion_common::{DFSchema, ScalarValue};
+    use datafusion_common::{internal_err, DFSchema, ScalarValue};
 
     macro_rules! test_is_expr_nullable {
         ($EXPR_TYPE:ident) => {{
diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index 4ef346656f..e95fcdd128 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -38,9 +38,8 @@ use crate::logical_plan::{
 };
 use crate::type_coercion::binary::{comparison_coercion, values_coercion};
 use crate::utils::{
-    can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
-    expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
-    group_window_expr_by_sort_keys,
+    can_hash, columnize_expr, compare_sort_expr, expr_to_columns,
+    find_valid_equijoin_key_pair, group_window_expr_by_sort_keys,
 };
 use crate::{
     and, binary_expr, logical_plan::tree_node::unwrap_arc, DmlStatement, Expr,
@@ -1316,7 +1315,7 @@ fn add_group_by_exprs_from_dependencies(
     Ok(group_expr)
 }
 /// Errors if one or more expressions have equal names.
-pub(crate) fn validate_unique_names<'a>(
+pub fn validate_unique_names<'a>(
     node_name: &str,
     expressions: impl IntoIterator<Item = &'a Expr>,
 ) -> Result<()> {
@@ -1356,6 +1355,7 @@ pub fn project_with_column_index(
                 ref name,
             }) if name != schema.field(i).name() => 
e.alias(schema.field(i).name()),
             Expr::Alias { .. } | Expr::Column { .. } => e,
+            Expr::Wildcard { .. } => e,
             _ => e.alias(schema.field(i).name()),
         })
         .collect::<Vec<_>>();
@@ -1440,22 +1440,11 @@ pub fn project(
     plan: LogicalPlan,
     expr: impl IntoIterator<Item = impl Into<Expr>>,
 ) -> Result<LogicalPlan> {
-    // TODO: move it into analyzer
-    let input_schema = plan.schema();
     let mut projected_expr = vec![];
     for e in expr {
         let e = e.into();
         match e {
-            Expr::Wildcard { qualifier: None } => {
-                projected_expr.extend(expand_wildcard(input_schema, &plan, 
None)?)
-            }
-            Expr::Wildcard {
-                qualifier: Some(qualifier),
-            } => projected_expr.extend(expand_qualified_wildcard(
-                &qualifier,
-                input_schema,
-                None,
-            )?),
+            Expr::Wildcard { .. } => projected_expr.push(e),
             _ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?, 
&plan)?),
         }
     }
@@ -1807,26 +1796,6 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn plan_using_join_wildcard_projection() -> Result<()> {
-        let t2 = table_scan(Some("t2"), &employee_schema(), None)?.build()?;
-
-        let plan = table_scan(Some("t1"), &employee_schema(), None)?
-            .join_using(t2, JoinType::Inner, vec!["id"])?
-            .project(vec![Expr::Wildcard { qualifier: None }])?
-            .build()?;
-
-        // id column should only show up once in projection
-        let expected = "Projection: t1.id, t1.first_name, t1.last_name, 
t1.state, t1.salary, t2.first_name, t2.last_name, t2.state, t2.salary\
-        \n  Inner Join: Using t1.id = t2.id\
-        \n    TableScan: t1\
-        \n    TableScan: t2";
-
-        assert_eq!(expected, format!("{plan}"));
-
-        Ok(())
-    }
-
     #[test]
     fn plan_builder_union() -> Result<()> {
         let plan =
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index c5538d8880..2bab6d516a 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -31,8 +31,9 @@ use crate::logical_plan::display::{GraphvizVisitor, 
IndentVisitor};
 use crate::logical_plan::extension::UserDefinedLogicalNode;
 use crate::logical_plan::{DmlStatement, Statement};
 use crate::utils::{
-    enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
-    grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
+    enumerate_grouping_sets, exprlist_len, exprlist_to_fields, find_base_plan,
+    find_out_reference_exprs, grouping_set_expr_count, 
grouping_set_to_exprlist,
+    split_conjunction,
 };
 use crate::{
     build_join_schema, expr_vec_fmt, BinaryExpr, BuiltInWindowFunction,
@@ -1977,7 +1978,9 @@ impl Projection {
         input: Arc<LogicalPlan>,
         schema: DFSchemaRef,
     ) -> Result<Self> {
-        if expr.len() != schema.fields().len() {
+        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
+            && expr.len() != schema.fields().len()
+        {
             return plan_err!("Projection has mismatch between number of 
expressions ({}) and number of fields in schema ({})", expr.len(), 
schema.fields().len());
         }
         Ok(Self {
@@ -2763,20 +2766,48 @@ fn calc_func_dependencies_for_project(
     // Calculate expression indices (if present) in the input schema.
     let proj_indices = exprs
         .iter()
-        .filter_map(|expr| {
-            let expr_name = match expr {
-                Expr::Alias(alias) => {
-                    format!("{}", alias.expr)
-                }
-                _ => format!("{}", expr),
-            };
-            input_fields.iter().position(|item| *item == expr_name)
+        .map(|expr| match expr {
+            Expr::Wildcard { qualifier, options } => {
+                let wildcard_fields = exprlist_to_fields(
+                    vec![&Expr::Wildcard {
+                        qualifier: qualifier.clone(),
+                        options: options.clone(),
+                    }],
+                    input,
+                )?;
+                Ok::<_, DataFusionError>(
+                    wildcard_fields
+                        .into_iter()
+                        .filter_map(|(qualifier, f)| {
+                            let flat_name = qualifier
+                                .map(|t| format!("{}.{}", t, f.name()))
+                                .unwrap_or(f.name().clone());
+                            input_fields.iter().position(|item| *item == 
flat_name)
+                        })
+                        .collect::<Vec<_>>(),
+                )
+            }
+            Expr::Alias(alias) => Ok(input_fields
+                .iter()
+                .position(|item| *item == format!("{}", alias.expr))
+                .map(|i| vec![i])
+                .unwrap_or(vec![])),
+            _ => Ok(input_fields
+                .iter()
+                .position(|item| *item == format!("{}", expr))
+                .map(|i| vec![i])
+                .unwrap_or(vec![])),
         })
+        .collect::<Result<Vec<_>>>()?
+        .into_iter()
+        .flatten()
         .collect::<Vec<_>>();
+
+    let len = exprlist_len(exprs, input.schema(), Some(find_base_plan(input).schema()))?;
     Ok(input
         .schema()
         .functional_dependencies()
-        .project_functional_dependencies(&proj_indices, exprs.len()))
+        .project_functional_dependencies(&proj_indices, len))
 }
 
 /// Sorts its input according to a list of sort expressions.
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 7b650d1ab4..4db5061e8f 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -21,7 +21,7 @@ use std::cmp::Ordering;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
-use crate::expr::{Alias, Sort, WindowFunction};
+use crate::expr::{Alias, Sort, WildcardOptions, WindowFunction};
 use crate::expr_rewriter::strip_outer_reference;
 use crate::{
     and, BinaryExpr, Expr, ExprSchemable, Filter, GroupingSet, LogicalPlan, 
Operator,
@@ -34,11 +34,11 @@ use datafusion_common::tree_node::{
 };
 use datafusion_common::utils::get_at_indices;
 use datafusion_common::{
-    internal_err, plan_datafusion_err, plan_err, Column, DFSchema, 
DFSchemaRef, Result,
-    TableReference,
+    internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef,
+    DataFusionError, Result, TableReference,
 };
 
-use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem, 
WildcardAdditionalOptions};
+use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem};
 
 pub use 
datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
 
@@ -377,7 +377,7 @@ fn get_exprs_except_skipped(
 pub fn expand_wildcard(
     schema: &DFSchema,
     plan: &LogicalPlan,
-    wildcard_options: Option<&WildcardAdditionalOptions>,
+    wildcard_options: Option<&WildcardOptions>,
 ) -> Result<Vec<Expr>> {
     let using_columns = plan.using_columns()?;
     let mut columns_to_skip = using_columns
@@ -401,9 +401,9 @@ pub fn expand_wildcard(
                 .collect::<Vec<_>>()
         })
         .collect::<HashSet<_>>();
-    let excluded_columns = if let Some(WildcardAdditionalOptions {
-        opt_exclude,
-        opt_except,
+    let excluded_columns = if let Some(WildcardOptions {
+        exclude: opt_exclude,
+        except: opt_except,
         ..
     }) = wildcard_options
     {
@@ -420,7 +420,7 @@ pub fn expand_wildcard(
 pub fn expand_qualified_wildcard(
     qualifier: &TableReference,
     schema: &DFSchema,
-    wildcard_options: Option<&WildcardAdditionalOptions>,
+    wildcard_options: Option<&WildcardOptions>,
 ) -> Result<Vec<Expr>> {
     let qualified_indices = schema.fields_indices_with_qualified(qualifier);
     let projected_func_dependencies = schema
@@ -435,9 +435,9 @@ pub fn expand_qualified_wildcard(
     let qualified_dfschema =
         DFSchema::try_from_qualified_schema(qualifier.clone(), 
&qualified_schema)?
             .with_functional_dependencies(projected_func_dependencies)?;
-    let excluded_columns = if let Some(WildcardAdditionalOptions {
-        opt_exclude,
-        opt_except,
+    let excluded_columns = if let Some(WildcardOptions {
+        exclude: opt_exclude,
+        except: opt_except,
         ..
     }) = wildcard_options
     {
@@ -731,11 +731,129 @@ pub fn exprlist_to_fields<'a>(
     plan: &LogicalPlan,
 ) -> Result<Vec<(Option<TableReference>, Arc<Field>)>> {
     // look for exact match in plan's output schema
-    let input_schema = &plan.schema();
-    exprs
+    let wildcard_schema = find_base_plan(plan).schema();
+    let input_schema = plan.schema();
+    let result = exprs
         .into_iter()
-        .map(|e| e.to_field(input_schema))
-        .collect()
+        .map(|e| match e {
+            Expr::Wildcard { qualifier, options } => match qualifier {
+                None => {
+                    let excluded: Vec<String> = get_excluded_columns(
+                        options.exclude.as_ref(),
+                        options.except.as_ref(),
+                        wildcard_schema,
+                        None,
+                    )?
+                    .into_iter()
+                    .map(|c| c.flat_name())
+                    .collect();
+                    Ok::<_, DataFusionError>(
+                        wildcard_schema
+                            .field_names()
+                            .iter()
+                            .enumerate()
+                            .filter(|(_, s)| !excluded.contains(s))
+                            .map(|(i, _)| wildcard_schema.qualified_field(i))
+                            .map(|(qualifier, f)| {
+                                (qualifier.cloned(), Arc::new(f.to_owned()))
+                            })
+                            .collect::<Vec<_>>(),
+                    )
+                }
+                Some(qualifier) => {
+                    let excluded: Vec<String> = get_excluded_columns(
+                        options.exclude.as_ref(),
+                        options.except.as_ref(),
+                        wildcard_schema,
+                        Some(qualifier),
+                    )?
+                    .into_iter()
+                    .map(|c| c.flat_name())
+                    .collect();
+                    Ok(wildcard_schema
+                        .fields_with_qualified(qualifier)
+                        .into_iter()
+                        .filter_map(|field| {
+                            let flat_name = format!("{}.{}", qualifier, 
field.name());
+                            if excluded.contains(&flat_name) {
+                                None
+                            } else {
+                                Some((
+                                    Some(qualifier.clone()),
+                                    Arc::new(field.to_owned()),
+                                ))
+                            }
+                        })
+                        .collect::<Vec<_>>())
+                }
+            },
+            _ => Ok(vec![e.to_field(input_schema)?]),
+        })
+        .collect::<Result<Vec<_>>>()?
+        .into_iter()
+        .flatten()
+        .collect();
+    Ok(result)
+}
+
+/// Find the suitable base plan to expand the wildcard expression recursively.
+/// When planning [LogicalPlan::Window] and [LogicalPlan::Aggregate], we will 
generate
+/// an intermediate plan based on the relation plan (e.g. 
[LogicalPlan::TableScan], [LogicalPlan::Subquery], ...).
+/// If we expand a wildcard expression based on the intermediate plan, we could 
get some duplicate fields.
+pub fn find_base_plan(input: &LogicalPlan) -> &LogicalPlan {
+    match input {
+        LogicalPlan::Window(window) => find_base_plan(&window.input),
+        LogicalPlan::Aggregate(agg) => find_base_plan(&agg.input),
+        _ => input,
+    }
+}
+
+/// Count the number of real fields. We should expand the wildcard expression 
to get the actual number.
+pub fn exprlist_len(
+    exprs: &[Expr],
+    schema: &DFSchemaRef,
+    wildcard_schema: Option<&DFSchemaRef>,
+) -> Result<usize> {
+    exprs
+        .iter()
+        .map(|e| match e {
+            Expr::Wildcard {
+                qualifier: None,
+                options,
+            } => {
+                let excluded = get_excluded_columns(
+                    options.exclude.as_ref(),
+                    options.except.as_ref(),
+                    wildcard_schema.unwrap_or(schema),
+                    None,
+                )?
+                .into_iter()
+                .collect::<HashSet<Column>>();
+                Ok(
+                    
get_exprs_except_skipped(wildcard_schema.unwrap_or(schema), excluded)
+                        .len(),
+                )
+            }
+            Expr::Wildcard {
+                qualifier: Some(qualifier),
+                options,
+            } => {
+                let excluded = get_excluded_columns(
+                    options.exclude.as_ref(),
+                    options.except.as_ref(),
+                    wildcard_schema.unwrap_or(schema),
+                    Some(qualifier),
+                )?
+                .into_iter()
+                .collect::<HashSet<Column>>();
+                Ok(
+                    
get_exprs_except_skipped(wildcard_schema.unwrap_or(schema), excluded)
+                        .len(),
+                )
+            }
+            _ => Ok(1),
+        })
+        .sum()
 }
 
 /// Convert an expression into Column expression if it's already provided as 
input plan.
diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs 
b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
index 8ff00917dc..593dab2bc9 100644
--- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
+++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
@@ -48,7 +48,13 @@ impl AnalyzerRule for CountWildcardRule {
 }
 
 fn is_wildcard(expr: &Expr) -> bool {
-    matches!(expr, Expr::Wildcard { qualifier: None })
+    matches!(
+        expr,
+        Expr::Wildcard {
+            qualifier: None,
+            ..
+        }
+    )
 }
 
 fn is_count_star_aggregate(aggregate_function: &AggregateFunction) -> bool {
diff --git a/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs 
b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs
new file mode 100644
index 0000000000..53ba3042f5
--- /dev/null
+++ b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::AnalyzerRule;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult};
+use datafusion_common::{Column, Result};
+use datafusion_expr::builder::validate_unique_names;
+use datafusion_expr::expr::PlannedReplaceSelectItem;
+use datafusion_expr::utils::{
+    expand_qualified_wildcard, expand_wildcard, find_base_plan,
+};
+use datafusion_expr::{Expr, LogicalPlan, Projection, SubqueryAlias};
+
+#[derive(Default)]
+pub struct ExpandWildcardRule {}
+
+impl ExpandWildcardRule {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl AnalyzerRule for ExpandWildcardRule {
+    fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> 
Result<LogicalPlan> {
+        // Because the wildcard expansion is based on the schema of the input 
plan,
+        // using `transform_up_with_subqueries` here.
+        plan.transform_up_with_subqueries(expand_internal).data()
+    }
+
+    fn name(&self) -> &str {
+        "expand_wildcard_rule"
+    }
+}
+
+fn expand_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
+    match plan {
+        LogicalPlan::Projection(Projection { expr, input, .. }) => {
+            let projected_expr = expand_exprlist(&input, expr)?;
+            validate_unique_names("Projections", projected_expr.iter())?;
+            Ok(Transformed::yes(
+                Projection::try_new(projected_expr, Arc::clone(&input))
+                    .map(LogicalPlan::Projection)?,
+            ))
+        }
+        // The schema of the plan should also be updated if the child plan is 
transformed.
+        LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
+            Ok(Transformed::yes(
+                SubqueryAlias::try_new(input, 
alias).map(LogicalPlan::SubqueryAlias)?,
+            ))
+        }
+        _ => Ok(Transformed::no(plan)),
+    }
+}
+
+fn expand_exprlist(input: &LogicalPlan, expr: Vec<Expr>) -> Result<Vec<Expr>> {
+    let mut projected_expr = vec![];
+    let input = find_base_plan(input);
+    for e in expr {
+        match e {
+            Expr::Wildcard { qualifier, options } => {
+                if let Some(qualifier) = qualifier {
+                    let expanded = expand_qualified_wildcard(
+                        &qualifier,
+                        input.schema(),
+                        Some(&options),
+                    )?;
+                    // If there is a REPLACE statement, replace that column 
with the given
+                    // replace expression. Column name remains the same.
+                    let replaced = if let Some(replace) = options.replace {
+                        replace_columns(expanded, replace)?
+                    } else {
+                        expanded
+                    };
+                    projected_expr.extend(replaced);
+                } else {
+                    let expanded =
+                        expand_wildcard(input.schema(), input, 
Some(&options))?;
+                    // If there is a REPLACE statement, replace that column 
with the given
+                    // replace expression. Column name remains the same.
+                    let replaced = if let Some(replace) = options.replace {
+                        replace_columns(expanded, replace)?
+                    } else {
+                        expanded
+                    };
+                    projected_expr.extend(replaced);
+                }
+            }
+            // A workaround to handle the case when the column name is "*".
+            // We transform the expression to a Expr::Column through 
[Column::from_name] in many places.
+            // It would also convert the wildcard expression to a column 
expression with name "*".
+            Expr::Column(Column {
+                ref relation,
+                ref name,
+            }) => {
+                if name.eq("*") {
+                    if let Some(qualifier) = relation {
+                        projected_expr.extend(expand_qualified_wildcard(
+                            qualifier,
+                            input.schema(),
+                            None,
+                        )?);
+                    } else {
+                        projected_expr.extend(expand_wildcard(
+                            input.schema(),
+                            input,
+                            None,
+                        )?);
+                    }
+                } else {
+                    projected_expr.push(e.clone());
+                }
+            }
+            _ => projected_expr.push(e),
+        }
+    }
+    Ok(projected_expr)
+}
+
+/// If there is a REPLACE statement in the projected expression in the form of
+/// "REPLACE (some_column_within_an_expr AS some_column)", this function 
replaces
+/// that column with the given replace expression. Column name remains the 
same.
+/// Multiple REPLACEs are also possible with comma separations.
+fn replace_columns(
+    mut exprs: Vec<Expr>,
+    replace: PlannedReplaceSelectItem,
+) -> Result<Vec<Expr>> {
+    for expr in exprs.iter_mut() {
+        if let Expr::Column(Column { name, .. }) = expr {
+            if let Some((_, new_expr)) = replace
+                .items()
+                .iter()
+                .zip(replace.expressions().iter())
+                .find(|(item, _)| item.column_name.value == *name)
+            {
+                *expr = new_expr.clone().alias(name.clone())
+            }
+        }
+    }
+    Ok(exprs)
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::datatypes::{DataType, Field, Schema};
+
+    use datafusion_common::{JoinType, TableReference};
+    use datafusion_expr::{
+        col, in_subquery, qualified_wildcard, table_scan, wildcard, 
LogicalPlanBuilder,
+    };
+
+    use crate::test::{assert_analyzed_plan_eq_display_indent, test_table_scan};
+    use crate::Analyzer;
+
+    use super::*;
+
+    fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> {
+        assert_analyzed_plan_eq_display_indent(
+            Arc::new(ExpandWildcardRule::new()),
+            plan,
+            expected,
+        )
+    }
+
+    #[test]
+    fn test_expand_wildcard() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .project(vec![wildcard()])?
+            .build()?;
+        let expected =
+            "Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n  TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+        assert_plan_eq(plan, expected)
+    }
+
+    #[test]
+    fn test_expand_qualified_wildcard() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .project(vec![qualified_wildcard(TableReference::bare("test"))])?
+            .build()?;
+        let expected =
+            "Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n  TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+        assert_plan_eq(plan, expected)
+    }
+
+    #[test]
+    fn test_expand_qualified_wildcard_in_subquery() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .project(vec![qualified_wildcard(TableReference::bare("test"))])?
+            .build()?;
+        let plan = LogicalPlanBuilder::from(plan)
+            .project(vec![wildcard()])?
+            .build()?;
+        let expected =
+            "Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n  Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+        assert_plan_eq(plan, expected)
+    }
+
+    #[test]
+    fn test_expand_wildcard_in_subquery() -> Result<()> {
+        let projection_a = LogicalPlanBuilder::from(test_table_scan()?)
+            .project(vec![col("a")])?
+            .build()?;
+        let subquery = LogicalPlanBuilder::from(projection_a)
+            .project(vec![wildcard()])?
+            .build()?;
+        let plan = LogicalPlanBuilder::from(test_table_scan()?)
+            .filter(in_subquery(col("a"), Arc::new(subquery)))?
+            .project(vec![wildcard()])?
+            .build()?;
+        let expected = "\
+        Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n  Filter: test.a IN (<subquery>) [a:UInt32, b:UInt32, c:UInt32]\
+        \n    Subquery: [a:UInt32]\
+        \n      Projection: test.a [a:UInt32]\
+        \n        Projection: test.a [a:UInt32]\
+        \n          TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+        assert_plan_eq(plan, expected)
+    }
+
+    #[test]
+    fn test_subquery_schema() -> Result<()> {
+        let analyzer = 
Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]);
+        let options = ConfigOptions::default();
+        let subquery = LogicalPlanBuilder::from(test_table_scan()?)
+            .project(vec![wildcard()])?
+            .build()?;
+        let plan = LogicalPlanBuilder::from(subquery)
+            .alias("sub")?
+            .project(vec![wildcard()])?
+            .build()?;
+        let analyzed_plan = analyzer.execute_and_check(plan, &options, |_, _| 
{})?;
+        for x in analyzed_plan.inputs() {
+            for field in x.schema().fields() {
+                assert_ne!(field.name(), "*");
+            }
+        }
+        Ok(())
+    }
+
+    fn employee_schema() -> Schema {
+        Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("first_name", DataType::Utf8, false),
+            Field::new("last_name", DataType::Utf8, false),
+            Field::new("state", DataType::Utf8, false),
+            Field::new("salary", DataType::Int32, false),
+        ])
+    }
+
+    #[test]
+    fn plan_using_join_wildcard_projection() -> Result<()> {
+        let t2 = table_scan(Some("t2"), &employee_schema(), None)?.build()?;
+
+        let plan = table_scan(Some("t1"), &employee_schema(), None)?
+            .join_using(t2, JoinType::Inner, vec!["id"])?
+            .project(vec![wildcard()])?
+            .build()?;
+
+        let expected = "Projection: *\
+        \n  Inner Join: Using t1.id = t2.id\
+        \n    TableScan: t1\
+        \n    TableScan: t2";
+
+        assert_eq!(expected, format!("{plan}"));
+
+        let analyzer = 
Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]);
+        let options = ConfigOptions::default();
+
+        let analyzed_plan = analyzer.execute_and_check(plan, &options, |_, _| 
{})?;
+
+        // id column should only show up once in projection
+        let expected = "Projection: t1.id, t1.first_name, t1.last_name, 
t1.state, t1.salary, t2.first_name, t2.last_name, t2.state, t2.salary\
+        \n  Inner Join: Using t1.id = t2.id\
+        \n    TableScan: t1\
+        \n    TableScan: t2";
+        assert_eq!(expected, format!("{analyzed_plan}"));
+
+        Ok(())
+    }
+}
diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs 
b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
index 73ab37cb11..b69b8410da 100644
--- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs
+++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
@@ -23,6 +23,7 @@ use crate::analyzer::AnalyzerRule;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{Column, Result};
+use datafusion_expr::expr::WildcardOptions;
 use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder, 
TableScan};
 
 /// Analyzed rule that inlines TableScan that provide a [`LogicalPlan`]
@@ -93,7 +94,10 @@ fn generate_projection_expr(
             )));
         }
     } else {
-        exprs.push(Expr::Wildcard { qualifier: None });
+        exprs.push(Expr::Wildcard {
+            qualifier: None,
+            options: WildcardOptions::default(),
+        });
     }
     Ok(exprs)
 }
@@ -178,7 +182,7 @@ mod tests {
         let plan = scan.filter(col("x.a").eq(lit(1)))?.build()?;
         let expected = "Filter: x.a = Int32(1)\
         \n  SubqueryAlias: x\
-        \n    Projection: y.a, y.b\
+        \n    Projection: *\
         \n      TableScan: y";
 
         assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan, 
expected)
diff --git a/datafusion/optimizer/src/analyzer/mod.rs 
b/datafusion/optimizer/src/analyzer/mod.rs
index 91ee8a9e10..6e2afeca88 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -30,6 +30,7 @@ use datafusion_expr::expr_rewriter::FunctionRewrite;
 use datafusion_expr::{Expr, LogicalPlan};
 
 use crate::analyzer::count_wildcard_rule::CountWildcardRule;
+use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule;
 use crate::analyzer::inline_table_scan::InlineTableScan;
 use crate::analyzer::subquery::check_subquery_expr;
 use crate::analyzer::type_coercion::TypeCoercion;
@@ -38,6 +39,7 @@ use crate::utils::log_plan;
 use self::function_rewrite::ApplyFunctionRewrites;
 
 pub mod count_wildcard_rule;
+pub mod expand_wildcard_rule;
 pub mod function_rewrite;
 pub mod inline_table_scan;
 pub mod subquery;
@@ -89,6 +91,9 @@ impl Analyzer {
     pub fn new() -> Self {
         let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
             Arc::new(InlineTableScan::new()),
+            // Every rule that will generate [Expr::Wildcard] should be placed 
in front of [ExpandWildcardRule].
+            Arc::new(ExpandWildcardRule::new()),
+            // [Expr::Wildcard] should be expanded before [TypeCoercion]
             Arc::new(TypeCoercion::new()),
             Arc::new(CountWildcardRule::new()),
         ];
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs 
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 2bb859d84a..7392028ba7 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -21,16 +21,20 @@ use std::sync::Arc;
 
 use arrow::datatypes::{DataType, IntervalUnit};
 
+use crate::analyzer::AnalyzerRule;
+use crate::utils::NamePreserver;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
 use datafusion_common::{
     exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, 
DFSchema,
     DataFusionError, Result, ScalarValue,
 };
+use datafusion_expr::builder::project_with_column_index;
 use datafusion_expr::expr::{
     self, Between, BinaryExpr, Case, Exists, InList, InSubquery, Like, 
ScalarFunction,
     WindowFunction,
 };
+use datafusion_expr::expr_rewriter::coerce_plan_expr_for_schema;
 use datafusion_expr::expr_schema::cast_subquery;
 use datafusion_expr::logical_plan::tree_node::unwrap_arc;
 use datafusion_expr::logical_plan::Subquery;
@@ -47,13 +51,10 @@ use datafusion_expr::type_coercion::{is_datetime, 
is_utf8_or_large_utf8};
 use datafusion_expr::utils::merge_schema;
 use datafusion_expr::{
     is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown, 
not,
-    AggregateUDF, Expr, ExprFunctionExt, ExprSchemable, LogicalPlan, Operator, 
ScalarUDF,
-    WindowFrame, WindowFrameBound, WindowFrameUnits,
+    AggregateUDF, Expr, ExprFunctionExt, ExprSchemable, LogicalPlan, Operator,
+    Projection, ScalarUDF, Union, WindowFrame, WindowFrameBound, 
WindowFrameUnits,
 };
 
-use crate::analyzer::AnalyzerRule;
-use crate::utils::NamePreserver;
-
 #[derive(Default)]
 pub struct TypeCoercion {}
 
@@ -122,6 +123,7 @@ fn analyze_internal(
     })?
     // coerce join expressions specially
     .map_data(|plan| expr_rewrite.coerce_joins(plan))?
+    .map_data(|plan| expr_rewrite.coerce_union(plan))?
     // recompute the schema after the expressions have been rewritten as the 
types may have changed
     .map_data(|plan| plan.recompute_schema())
 }
@@ -168,6 +170,39 @@ impl<'a> TypeCoercionRewriter<'a> {
         Ok(LogicalPlan::Join(join))
     }
 
+    /// Coerce the union inputs after expanding the wildcard expressions
+    ///
+    /// Union inputs must have the same schema, so we coerce the expressions 
to match the schema
+    /// after expanding the wildcard expressions
+    fn coerce_union(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
+        let LogicalPlan::Union(union) = plan else {
+            return Ok(plan);
+        };
+
+        let inputs = union
+            .inputs
+            .into_iter()
+            .map(|p| {
+                let plan = coerce_plan_expr_for_schema(&p, &union.schema)?;
+                match plan {
+                    LogicalPlan::Projection(Projection { expr, input, .. }) => 
{
+                        Ok(Arc::new(project_with_column_index(
+                            expr,
+                            input,
+                            Arc::clone(&union.schema),
+                        )?))
+                    }
+                    other_plan => Ok(Arc::new(other_plan)),
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(LogicalPlan::Union(Union {
+            inputs,
+            schema: Arc::clone(&union.schema),
+        }))
+    }
+
     fn coerce_join_filter(&self, expr: Expr) -> Result<Expr> {
         let expr_type = expr.get_type(self.schema)?;
         match expr_type {
@@ -1286,7 +1321,6 @@ mod test {
         .eq(cast(lit("1998-03-18"), DataType::Date32));
         let empty = empty();
         let plan = LogicalPlan::Projection(Projection::try_new(vec![expr], 
empty)?);
-        dbg!(&plan);
         let expected =
             "Projection: CAST(Utf8(\"1998-03-18\") AS Timestamp(Nanosecond, 
None)) = CAST(CAST(Utf8(\"1998-03-18\") AS Date32) AS Timestamp(Nanosecond, 
None))\n  EmptyRelation";
         assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, 
expected)?;
@@ -1473,7 +1507,6 @@ mod test {
         ));
         let empty = empty();
         let plan = LogicalPlan::Projection(Projection::try_new(vec![expr], 
empty)?);
-        dbg!(&plan);
         let expected =
             "Projection: CAST(Utf8(\"1998-03-18\") AS Timestamp(Nanosecond, 
None)) - CAST(Utf8(\"1998-03-18\") AS Timestamp(Nanosecond, None))\n  
EmptyRelation";
         assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, 
expected)?;
diff --git a/datafusion/optimizer/tests/optimizer_integration.rs 
b/datafusion/optimizer/tests/optimizer_integration.rs
index aaa5eec395..93dd49b174 100644
--- a/datafusion/optimizer/tests/optimizer_integration.rs
+++ b/datafusion/optimizer/tests/optimizer_integration.rs
@@ -335,6 +335,27 @@ fn test_propagate_empty_relation_inner_join_and_unions() {
     assert_eq!(expected, format!("{plan}"));
 }
 
+#[test]
+fn select_wildcard_with_repeated_column() {
+    let sql = "SELECT *, col_int32 FROM test";
+    let err = test_sql(sql).expect_err("query should have failed");
+    assert_eq!(
+        "expand_wildcard_rule\ncaused by\nError during planning: Projections 
require unique expression names but the expression \"test.col_int32\" at 
position 0 and \"test.col_int32\" at position 7 have the same name. Consider 
aliasing (\"AS\") one of them.",
+        err.strip_backtrace()
+    );
+}
+
+#[test]
+fn select_wildcard_with_repeated_column_but_is_aliased() {
+    let sql = "SELECT *, col_int32 as col_32 FROM test";
+
+    let plan = test_sql(sql).unwrap();
+    let expected = "Projection: test.col_int32, test.col_uint32, 
test.col_utf8, test.col_date32, test.col_date64, test.col_ts_nano_none, 
test.col_ts_nano_utc, test.col_int32 AS col_32\
+    \n  TableScan: test projection=[col_int32, col_uint32, col_utf8, 
col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]";
+
+    assert_eq!(expected, format!("{plan}"));
+}
+
 fn test_sql(sql: &str) -> Result<LogicalPlan> {
     // parse the SQL
     let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ...
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs 
b/datafusion/proto/src/logical_plan/from_proto.rs
index 6c4c07428b..6cbea5f0cf 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -22,8 +22,8 @@ use datafusion_common::{
     exec_datafusion_err, internal_err, plan_datafusion_err, Result, 
ScalarValue,
     TableReference, UnnestOptions,
 };
-use datafusion_expr::expr::Unnest;
 use datafusion_expr::expr::{Alias, Placeholder};
+use datafusion_expr::expr::{Unnest, WildcardOptions};
 use datafusion_expr::ExprFunctionExt;
 use datafusion_expr::{
     expr::{self, InList, Sort, WindowFunction},
@@ -556,7 +556,10 @@ pub fn parse_expr(
         ))),
         ExprType::Wildcard(protobuf::Wildcard { qualifier }) => {
             let qualifier = qualifier.to_owned().map(|x| 
x.try_into()).transpose()?;
-            Ok(Expr::Wildcard { qualifier })
+            Ok(Expr::Wildcard {
+                qualifier,
+                options: WildcardOptions::default(),
+            })
         }
         ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
             fun_name,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs 
b/datafusion/proto/src/logical_plan/to_proto.rs
index ab81ce8af9..c7361c89c3 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -582,7 +582,7 @@ pub fn serialize_expr(
                 expr_type: Some(ExprType::InList(expr)),
             }
         }
-        Expr::Wildcard { qualifier } => protobuf::LogicalExprNode {
+        Expr::Wildcard { qualifier, .. } => protobuf::LogicalExprNode {
             expr_type: Some(ExprType::Wildcard(protobuf::Wildcard {
                 qualifier: qualifier.to_owned().map(|x| x.into()),
             })),
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index a18fa03b2d..eb7cc5c4b9 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -58,7 +58,7 @@ use datafusion_common::{
 use datafusion_expr::dml::CopyTo;
 use datafusion_expr::expr::{
     self, Between, BinaryExpr, Case, Cast, GroupingSet, InList, Like, 
ScalarFunction,
-    Sort, Unnest,
+    Sort, Unnest, WildcardOptions,
 };
 use datafusion_expr::logical_plan::{Extension, UserDefinedLogicalNodeCore};
 use datafusion_expr::{
@@ -1977,7 +1977,10 @@ fn roundtrip_unnest() {
 
 #[test]
 fn roundtrip_wildcard() {
-    let test_expr = Expr::Wildcard { qualifier: None };
+    let test_expr = Expr::Wildcard {
+        qualifier: None,
+        options: WildcardOptions::default(),
+    };
 
     let ctx = SessionContext::new();
     roundtrip_expr_test(test_expr, ctx);
@@ -1987,6 +1990,7 @@ fn roundtrip_wildcard() {
 fn roundtrip_qualified_wildcard() {
     let test_expr = Expr::Wildcard {
         qualifier: Some("foo".into()),
+        options: WildcardOptions::default(),
     };
 
     let ctx = SessionContext::new();
diff --git a/datafusion/sql/src/expr/function.rs 
b/datafusion/sql/src/expr/function.rs
index d16d08b041..b95414a8ca 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -22,6 +22,7 @@ use datafusion_common::{
     internal_datafusion_err, not_impl_err, plan_datafusion_err, plan_err, 
DFSchema,
     Dependency, Result,
 };
+use datafusion_expr::expr::WildcardOptions;
 use datafusion_expr::planner::PlannerResult;
 use datafusion_expr::{
     expr, Expr, ExprFunctionExt, ExprSchemable, WindowFrame, 
WindowFunctionDefinition,
@@ -420,13 +421,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 name: _,
                 arg: FunctionArgExpr::Wildcard,
                 operator: _,
-            } => Ok(Expr::Wildcard { qualifier: None }),
+            } => Ok(Expr::Wildcard {
+                qualifier: None,
+                options: WildcardOptions::default(),
+            }),
             FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
                 self.sql_expr_to_logical_expr(arg, schema, planner_context)
             }
-            FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
-                Ok(Expr::Wildcard { qualifier: None })
-            }
+            FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => 
Ok(Expr::Wildcard {
+                qualifier: None,
+                options: WildcardOptions::default(),
+            }),
             _ => not_impl_err!("Unsupported qualified wildcard argument: 
{sql:?}"),
         }
     }
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index f2b4e0b4e4..7c94e5ead5 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -27,10 +27,10 @@ use sqlparser::ast::{
 
 use datafusion_common::{
     internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, 
Result,
-    ScalarValue,
+    ScalarValue, TableReference,
 };
-use datafusion_expr::expr::InList;
 use datafusion_expr::expr::ScalarFunction;
+use datafusion_expr::expr::{InList, WildcardOptions};
 use datafusion_expr::{
     lit, Between, BinaryExpr, Cast, Expr, ExprSchemable, GetFieldAccess, Like, 
Literal,
     Operator, TryCast,
@@ -661,6 +661,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 }
                 not_impl_err!("AnyOp not supported by ExprPlanner: 
{binary_expr:?}")
             }
+            SQLExpr::Wildcard => Ok(Expr::Wildcard {
+                qualifier: None,
+                options: WildcardOptions::default(),
+            }),
+            SQLExpr::QualifiedWildcard(object_name) => Ok(Expr::Wildcard {
+                qualifier: Some(TableReference::from(object_name.to_string())),
+                options: WildcardOptions::default(),
+            }),
             SQLExpr::Tuple(values) => self.parse_tuple(schema, 
planner_context, values),
             _ => not_impl_err!("Unsupported ast node in sqltorel: {sql:?}"),
         }
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 95a44dace3..339234d996 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -27,23 +27,23 @@ use crate::utils::{
 };
 
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
+use datafusion_common::UnnestOptions;
 use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
-use datafusion_common::{Column, UnnestOptions};
-use datafusion_expr::expr::Alias;
+use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
 use datafusion_expr::expr_rewriter::{
     normalize_col, normalize_col_with_schemas_and_ambiguity_check, 
normalize_cols,
 };
 use datafusion_expr::logical_plan::tree_node::unwrap_arc;
 use datafusion_expr::utils::{
-    expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, 
expr_to_columns,
-    find_aggregate_exprs, find_window_exprs,
+    expr_as_column_expr, expr_to_columns, find_aggregate_exprs, 
find_window_exprs,
 };
 use datafusion_expr::{
-    Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, 
Partitioning,
+    qualified_wildcard_with_options, wildcard_with_options, Aggregate, Expr, 
Filter,
+    GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
 };
 use sqlparser::ast::{
     Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr,
-    ReplaceSelectItem, WildcardAdditionalOptions, WindowType,
+    WildcardAdditionalOptions, WindowType,
 };
 use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, 
TableWithJoins};
 
@@ -82,7 +82,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         // handle named windows before processing the projection expression
         check_conflicting_windows(&select.named_window)?;
         match_window_definitions(&mut select.projection, 
&select.named_window)?;
-        // process the SELECT expressions, with wildcards expanded.
+        // process the SELECT expressions
         let select_exprs = self.prepare_select_exprs(
             &base_plan,
             select.projection,
@@ -515,8 +515,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     }
 
     /// Returns the `Expr`'s corresponding to a SQL query's SELECT expressions.
-    ///
-    /// Wildcards are expanded into the concrete list of columns.
     fn prepare_select_exprs(
         &self,
         plan: &LogicalPlan,
@@ -570,49 +568,30 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             }
             SelectItem::Wildcard(options) => {
                 Self::check_wildcard_options(&options)?;
-
                 if empty_from {
                     return plan_err!("SELECT * with no tables specified is not 
valid");
                 }
-                // do not expand from outer schema
-                let expanded_exprs =
-                    expand_wildcard(plan.schema().as_ref(), plan, 
Some(&options))?;
-                // If there is a REPLACE statement, replace that column with 
the given
-                // replace expression. Column name remains the same.
-                if let Some(replace) = options.opt_replace {
-                    self.replace_columns(
-                        plan,
-                        empty_from,
-                        planner_context,
-                        expanded_exprs,
-                        replace,
-                    )
-                } else {
-                    Ok(expanded_exprs)
-                }
+                let planned_options = self.plan_wildcard_options(
+                    plan,
+                    empty_from,
+                    planner_context,
+                    options,
+                )?;
+                Ok(vec![wildcard_with_options(planned_options)])
             }
             SelectItem::QualifiedWildcard(object_name, options) => {
                 Self::check_wildcard_options(&options)?;
                 let qualifier = idents_to_table_reference(object_name.0, 
false)?;
-                // do not expand from outer schema
-                let expanded_exprs = expand_qualified_wildcard(
-                    &qualifier,
-                    plan.schema().as_ref(),
-                    Some(&options),
+                let planned_options = self.plan_wildcard_options(
+                    plan,
+                    empty_from,
+                    planner_context,
+                    options,
                 )?;
-                // If there is a REPLACE statement, replace that column with 
the given
-                // replace expression. Column name remains the same.
-                if let Some(replace) = options.opt_replace {
-                    self.replace_columns(
-                        plan,
-                        empty_from,
-                        planner_context,
-                        expanded_exprs,
-                        replace,
-                    )
-                } else {
-                    Ok(expanded_exprs)
-                }
+                Ok(vec![qualified_wildcard_with_options(
+                    qualifier,
+                    planned_options,
+                )])
             }
         }
     }
@@ -637,40 +616,44 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     }
 
     /// If there is a REPLACE statement in the projected expression in the 
form of
-    /// "REPLACE (some_column_within_an_expr AS some_column)", this function 
replaces
-    /// that column with the given replace expression. Column name remains the 
same.
-    /// Multiple REPLACEs are also possible with comma separations.
-    fn replace_columns(
+    /// "REPLACE (some_column_within_an_expr AS some_column)", we should plan 
the
+    /// replace expressions first.
+    fn plan_wildcard_options(
         &self,
         plan: &LogicalPlan,
         empty_from: bool,
         planner_context: &mut PlannerContext,
-        mut exprs: Vec<Expr>,
-        replace: ReplaceSelectItem,
-    ) -> Result<Vec<Expr>> {
-        for expr in exprs.iter_mut() {
-            if let Expr::Column(Column { name, .. }) = expr {
-                if let Some(item) = replace
-                    .items
-                    .iter()
-                    .find(|item| item.column_name.value == *name)
-                {
-                    let new_expr = self.sql_select_to_rex(
+        options: WildcardAdditionalOptions,
+    ) -> Result<WildcardOptions> {
+        let planned_option = WildcardOptions {
+            ilike: options.opt_ilike,
+            exclude: options.opt_exclude,
+            except: options.opt_except,
+            replace: None,
+            rename: options.opt_rename,
+        };
+        if let Some(replace) = options.opt_replace {
+            let replace_expr = replace
+                .items
+                .iter()
+                .map(|item| {
+                    Ok(self.sql_select_to_rex(
                         SelectItem::UnnamedExpr(item.expr.clone()),
                         plan,
                         empty_from,
                         planner_context,
                     )?[0]
-                        .clone();
-                    *expr = Expr::Alias(Alias {
-                        expr: Box::new(new_expr),
-                        relation: None,
-                        name: name.clone(),
-                    });
-                }
-            }
+                        .clone())
+                })
+                .collect::<Result<Vec<_>>>()?;
+            let planned_replace = PlannedReplaceSelectItem {
+                items: replace.items.into_iter().map(|i| *i).collect(),
+                planned_expressions: replace_expr,
+            };
+            Ok(planned_option.with_replace(planned_replace))
+        } else {
+            Ok(planned_option)
         }
-        Ok(exprs)
     }
 
     /// Wrap a plan in a projection
@@ -715,7 +698,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let plan = LogicalPlanBuilder::from(input.clone())
             .aggregate(group_by_exprs.to_vec(), aggr_exprs.to_vec())?
             .build()?;
-
         let group_by_exprs = if let LogicalPlan::Aggregate(agg) = &plan {
             &agg.group_expr
         } else {
diff --git a/datafusion/sql/src/unparser/expr.rs 
b/datafusion/sql/src/unparser/expr.rs
index de130754ab..39511ea4d0 100644
--- a/datafusion/sql/src/unparser/expr.rs
+++ b/datafusion/sql/src/unparser/expr.rs
@@ -21,11 +21,13 @@ use datafusion_expr::ScalarUDF;
 use sqlparser::ast::Value::SingleQuotedString;
 use sqlparser::ast::{
     self, BinaryOperator, Expr as AstExpr, Function, FunctionArg, Ident, 
Interval,
-    TimezoneInfo, UnaryOperator,
+    ObjectName, TimezoneInfo, UnaryOperator,
 };
 use std::sync::Arc;
 use std::{fmt::Display, vec};
 
+use super::dialect::{DateFieldExtractStyle, IntervalStyle};
+use super::Unparser;
 use arrow::datatypes::{Decimal128Type, Decimal256Type, DecimalType};
 use arrow::util::display::array_value_to_string;
 use arrow_array::types::{
@@ -44,9 +46,6 @@ use datafusion_expr::{
     Between, BinaryExpr, Case, Cast, Expr, GroupingSet, Like, Operator, 
TryCast,
 };
 
-use super::dialect::{DateFieldExtractStyle, IntervalStyle};
-use super::Unparser;
-
 /// DataFusion's Exprs can represent either an `Expr` or an `OrderByExpr`
 pub enum Unparsed {
     // SQL Expression
@@ -159,7 +158,13 @@ impl Unparser<'_> {
                 let args = args
                     .iter()
                     .map(|e| {
-                        if matches!(e, Expr::Wildcard { qualifier: None }) {
+                        if matches!(
+                            e,
+                            Expr::Wildcard {
+                                qualifier: None,
+                                ..
+                            }
+                        ) {
                             
Ok(FunctionArg::Unnamed(ast::FunctionArgExpr::Wildcard))
                         } else {
                             self.expr_to_sql_inner(e).map(|e| {
@@ -477,8 +482,15 @@ impl Unparser<'_> {
                     format: None,
                 })
             }
-            Expr::Wildcard { qualifier: _ } => {
-                not_impl_err!("Unsupported Expr conversion: {expr:?}")
+            // TODO: unparsing wildcard addition options
+            Expr::Wildcard { qualifier, .. } => {
+                if let Some(qualifier) = qualifier {
+                    let idents: Vec<Ident> =
+                        
qualifier.to_vec().into_iter().map(Ident::new).collect();
+                    Ok(ast::Expr::QualifiedWildcard(ObjectName(idents)))
+                } else {
+                    Ok(ast::Expr::Wildcard)
+                }
             }
             Expr::GroupingSet(grouping_set) => match grouping_set {
                 GroupingSet::GroupingSets(grouping_sets) => {
@@ -643,7 +655,13 @@ impl Unparser<'_> {
     fn function_args_to_sql(&self, args: &[Expr]) -> 
Result<Vec<ast::FunctionArg>> {
         args.iter()
             .map(|e| {
-                if matches!(e, Expr::Wildcard { qualifier: None }) {
+                if matches!(
+                    e,
+                    Expr::Wildcard {
+                        qualifier: None,
+                        ..
+                    }
+                ) {
                     
Ok(ast::FunctionArg::Unnamed(ast::FunctionArgExpr::Wildcard))
                 } else {
                     self.expr_to_sql(e)
@@ -1503,6 +1521,7 @@ mod tests {
     use arrow_schema::DataType::Int8;
     use ast::ObjectName;
     use datafusion_common::TableReference;
+    use datafusion_expr::expr::WildcardOptions;
     use datafusion_expr::{
         case, col, cube, exists, grouping_set, interval_datetime_lit,
         interval_year_month_lit, lit, not, not_exists, out_ref_col, 
placeholder, rollup,
@@ -1558,7 +1577,10 @@ mod tests {
     fn expr_to_sql_ok() -> Result<()> {
         let dummy_schema = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
         let dummy_logical_plan = table_scan(Some("t"), &dummy_schema, None)?
-            .project(vec![Expr::Wildcard { qualifier: None }])?
+            .project(vec![Expr::Wildcard {
+                qualifier: None,
+                options: WildcardOptions::default(),
+            }])?
             .filter(col("a").eq(lit(1)))?
             .build()?;
 
@@ -1749,7 +1771,10 @@ mod tests {
             (sum(col("a")), r#"sum(a)"#),
             (
                 count_udaf()
-                    .call(vec![Expr::Wildcard { qualifier: None }])
+                    .call(vec![Expr::Wildcard {
+                        qualifier: None,
+                        options: WildcardOptions::default(),
+                    }])
                     .distinct()
                     .build()
                     .unwrap(),
@@ -1757,7 +1782,10 @@ mod tests {
             ),
             (
                 count_udaf()
-                    .call(vec![Expr::Wildcard { qualifier: None }])
+                    .call(vec![Expr::Wildcard {
+                        qualifier: None,
+                        options: WildcardOptions::default(),
+                    }])
                     .filter(lit(true))
                     .build()
                     .unwrap(),
@@ -1833,11 +1861,11 @@ mod tests {
             (Expr::Negative(Box::new(col("a"))), r#"-a"#),
             (
                 exists(Arc::new(dummy_logical_plan.clone())),
-                r#"EXISTS (SELECT t.a FROM t WHERE (t.a = 1))"#,
+                r#"EXISTS (SELECT * FROM t WHERE (t.a = 1))"#,
             ),
             (
                 not_exists(Arc::new(dummy_logical_plan.clone())),
-                r#"NOT EXISTS (SELECT t.a FROM t WHERE (t.a = 1))"#,
+                r#"NOT EXISTS (SELECT * FROM t WHERE (t.a = 1))"#,
             ),
             (
                 try_cast(col("a"), DataType::Date64),
diff --git a/datafusion/sql/tests/sql_integration.rs 
b/datafusion/sql/tests/sql_integration.rs
index 5a0317c47c..15efe2d2f0 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -58,7 +58,7 @@ mod common;
 fn test_schema_support() {
     quick_test(
         "SELECT * FROM s1.test",
-        "Projection: s1.test.t_date32, s1.test.t_date64\
+        "Projection: *\
              \n  TableScan: s1.test",
     );
 }
@@ -517,7 +517,7 @@ fn plan_copy_to_query() {
     let plan = r#"
 CopyTo: format=csv output_url=output.csv options: ()
   Limit: skip=0, fetch=10
-    Projection: test_decimal.id, test_decimal.price
+    Projection: *
       TableScan: test_decimal
     "#
     .trim();
@@ -637,23 +637,13 @@ fn select_repeated_column() {
     );
 }
 
-#[test]
-fn select_wildcard_with_repeated_column() {
-    let sql = "SELECT *, age FROM person";
-    let err = logical_plan(sql).expect_err("query should have failed");
-    assert_eq!(
-        "Error during planning: Projections require unique expression names 
but the expression \"person.age\" at position 3 and \"person.age\" at position 
8 have the same name. Consider aliasing (\"AS\") one of them.",
-        err.strip_backtrace()
-    );
-}
-
 #[test]
 fn select_wildcard_with_repeated_column_but_is_aliased() {
     quick_test(
-            "SELECT *, first_name AS fn from person",
-            "Projection: person.id, person.first_name, person.last_name, 
person.age, person.state, person.salary, person.birth_date, person.😀, 
person.first_name AS fn\
+        "SELECT *, first_name AS fn from person",
+        "Projection: *, person.first_name AS fn\
             \n  TableScan: person",
-        );
+    );
 }
 
 #[test]
@@ -870,7 +860,7 @@ fn where_selection_with_ambiguous_column() {
 #[test]
 fn natural_join() {
     let sql = "SELECT * FROM lineitem a NATURAL JOIN lineitem b";
-    let expected = "Projection: a.l_item_id, a.l_description, a.price\
+    let expected = "Projection: *\
                         \n  Inner Join: Using a.l_item_id = b.l_item_id, 
a.l_description = b.l_description, a.price = b.price\
                         \n    SubqueryAlias: a\
                         \n      TableScan: lineitem\
@@ -906,7 +896,7 @@ fn natural_right_join() {
 #[test]
 fn natural_join_no_common_becomes_cross_join() {
     let sql = "SELECT * FROM person a NATURAL JOIN lineitem b";
-    let expected = "Projection: a.id, a.first_name, a.last_name, a.age, 
a.state, a.salary, a.birth_date, a.😀, b.l_item_id, b.l_description, b.price\
+    let expected = "Projection: *\
                         \n  CrossJoin:\
                         \n    SubqueryAlias: a\
                         \n      TableScan: person\
@@ -918,8 +908,7 @@ fn natural_join_no_common_becomes_cross_join() {
 #[test]
 fn using_join_multiple_keys() {
     let sql = "SELECT * FROM person a join person b using (id, age)";
-    let expected = "Projection: a.id, a.first_name, a.last_name, a.age, 
a.state, a.salary, a.birth_date, a.😀, \
-        b.first_name, b.last_name, b.state, b.salary, b.birth_date, b.😀\
+    let expected = "Projection: *\
                         \n  Inner Join: Using a.id = b.id, a.age = b.age\
                         \n    SubqueryAlias: a\
                         \n      TableScan: person\
@@ -933,8 +922,7 @@ fn using_join_multiple_keys_subquery() {
     let sql =
         "SELECT age FROM (SELECT * FROM person a join person b using (id, age, 
state))";
     let expected = "Projection: a.age\
-                        \n  Projection: a.id, a.first_name, a.last_name, 
a.age, a.state, a.salary, a.birth_date, a.😀, \
-        b.first_name, b.last_name, b.salary, b.birth_date, b.😀\
+                        \n  Projection: *\
                         \n    Inner Join: Using a.id = b.id, a.age = b.age, 
a.state = b.state\
                         \n      SubqueryAlias: a\
                         \n        TableScan: person\
@@ -946,8 +934,7 @@ fn using_join_multiple_keys_subquery() {
 #[test]
 fn using_join_multiple_keys_qualified_wildcard_select() {
     let sql = "SELECT a.* FROM person a join person b using (id, age)";
-    let expected =
-        "Projection: a.id, a.first_name, a.last_name, a.age, a.state, 
a.salary, a.birth_date, a.😀\
+    let expected = "Projection: a.*\
                         \n  Inner Join: Using a.id = b.id, a.age = b.age\
                         \n    SubqueryAlias: a\
                         \n      TableScan: person\
@@ -959,8 +946,7 @@ fn using_join_multiple_keys_qualified_wildcard_select() {
 #[test]
 fn using_join_multiple_keys_select_all_columns() {
     let sql = "SELECT a.*, b.* FROM person a join person b using (id, age)";
-    let expected = "Projection: a.id, a.first_name, a.last_name, a.age, 
a.state, a.salary, a.birth_date, a.😀, \
-        b.id, b.first_name, b.last_name, b.age, b.state, b.salary, 
b.birth_date, b.😀\
+    let expected = "Projection: a.*, b.*\
                         \n  Inner Join: Using a.id = b.id, a.age = b.age\
                         \n    SubqueryAlias: a\
                         \n      TableScan: person\
@@ -972,9 +958,7 @@ fn using_join_multiple_keys_select_all_columns() {
 #[test]
 fn using_join_multiple_keys_multiple_joins() {
     let sql = "SELECT * FROM person a join person b using (id, age, state) 
join person c using (id, age, state)";
-    let expected = "Projection: a.id, a.first_name, a.last_name, a.age, 
a.state, a.salary, a.birth_date, a.😀, \
-        b.first_name, b.last_name, b.salary, b.birth_date, b.😀, \
-        c.first_name, c.last_name, c.salary, c.birth_date, c.😀\
+    let expected = "Projection: *\
                         \n  Inner Join: Using a.id = c.id, a.age = c.age, 
a.state = c.state\
                         \n    Inner Join: Using a.id = b.id, a.age = b.age, 
a.state = b.state\
                         \n      SubqueryAlias: a\
@@ -1305,13 +1289,13 @@ fn select_binary_expr_nested() {
 fn select_wildcard_with_groupby() {
     quick_test(
             r#"SELECT * FROM person GROUP BY id, first_name, last_name, age, 
state, salary, birth_date, "😀""#,
-            "Projection: person.id, person.first_name, person.last_name, 
person.age, person.state, person.salary, person.birth_date, person.😀\
+            "Projection: *\
              \n  Aggregate: groupBy=[[person.id, person.first_name, 
person.last_name, person.age, person.state, person.salary, person.birth_date, 
person.😀]], aggr=[[]]\
              \n    TableScan: person",
         );
     quick_test(
             "SELECT * FROM (SELECT first_name, last_name FROM person) AS a 
GROUP BY first_name, last_name",
-            "Projection: a.first_name, a.last_name\
+            "Projection: *\
             \n  Aggregate: groupBy=[[a.first_name, a.last_name]], aggr=[[]]\
             \n    SubqueryAlias: a\
             \n      Projection: person.first_name, person.last_name\
@@ -1474,7 +1458,7 @@ fn recursive_ctes() {
         select * from numbers;";
     quick_test(
         sql,
-        "Projection: numbers.n\
+        "Projection: *\
     \n  SubqueryAlias: numbers\
     \n    RecursiveQuery: is_distinct=false\
     \n      Projection: Int64(1) AS n\
@@ -1687,10 +1671,10 @@ fn 
select_aggregate_with_non_column_inner_expression_with_groupby() {
 #[test]
 fn test_wildcard() {
     quick_test(
-            "SELECT * from person",
-            "Projection: person.id, person.first_name, person.last_name, 
person.age, person.state, person.salary, person.birth_date, person.😀\
+        "SELECT * from person",
+        "Projection: *\
             \n  TableScan: person",
-        );
+    );
 }
 
 #[test]
@@ -2118,7 +2102,7 @@ fn project_wildcard_on_join_with_using() {
             FROM lineitem \
             JOIN lineitem as lineitem2 \
             USING (l_item_id)";
-    let expected = "Projection: lineitem.l_item_id, lineitem.l_description, 
lineitem.price, lineitem2.l_description, lineitem2.price\
+    let expected = "Projection: *\
         \n  Inner Join: Using lineitem.l_item_id = lineitem2.l_item_id\
         \n    TableScan: lineitem\
         \n    SubqueryAlias: lineitem2\
@@ -3005,7 +2989,7 @@ fn exists_subquery_wildcard() {
     let expected = "Projection: p.id\
         \n  Filter: EXISTS (<subquery>)\
         \n    Subquery:\
-        \n      Projection: person.id, person.first_name, person.last_name, 
person.age, person.state, person.salary, person.birth_date, person.😀\
+        \n      Projection: *\
         \n        Filter: person.last_name = outer_ref(p.last_name) AND 
person.state = outer_ref(p.state)\
         \n          TableScan: person\
         \n    SubqueryAlias: p\
@@ -3092,13 +3076,13 @@ fn subquery_references_cte() {
         cte AS (SELECT * FROM person) \
         SELECT * FROM person WHERE EXISTS (SELECT * FROM cte WHERE id = 
person.id)";
 
-    let expected = "Projection: person.id, person.first_name, 
person.last_name, person.age, person.state, person.salary, person.birth_date, 
person.😀\
+    let expected = "Projection: *\
         \n  Filter: EXISTS (<subquery>)\
         \n    Subquery:\
-        \n      Projection: cte.id, cte.first_name, cte.last_name, cte.age, 
cte.state, cte.salary, cte.birth_date, cte.😀\
+        \n      Projection: *\
         \n        Filter: cte.id = outer_ref(person.id)\
         \n          SubqueryAlias: cte\
-        \n            Projection: person.id, person.first_name, 
person.last_name, person.age, person.state, person.salary, person.birth_date, 
person.😀\
+        \n            Projection: *\
         \n              TableScan: person\
         \n    TableScan: person";
 
@@ -3113,7 +3097,7 @@ fn cte_with_no_column_names() {
         ) \
         SELECT * FROM numbers;";
 
-    let expected = "Projection: numbers.a, numbers.b, numbers.c\
+    let expected = "Projection: *\
         \n  SubqueryAlias: numbers\
         \n    Projection: Int64(1) AS a, Int64(2) AS b, Int64(3) AS c\
         \n      EmptyRelation";
@@ -3129,7 +3113,7 @@ fn cte_with_column_names() {
         ) \
         SELECT * FROM numbers;";
 
-    let expected = "Projection: numbers.a, numbers.b, numbers.c\
+    let expected = "Projection: *\
         \n  SubqueryAlias: numbers\
         \n    Projection: Int64(1) AS a, Int64(2) AS b, Int64(3) AS c\
         \n      Projection: Int64(1), Int64(2), Int64(3)\
@@ -3147,7 +3131,7 @@ fn cte_with_column_aliases_precedence() {
         ) \
         SELECT * FROM numbers;";
 
-    let expected = "Projection: numbers.a, numbers.b, numbers.c\
+    let expected = "Projection: *\
         \n  SubqueryAlias: numbers\
         \n    Projection: x AS a, y AS b, z AS c\
         \n      Projection: Int64(1) AS x, Int64(2) AS y, Int64(3) AS z\
@@ -3528,7 +3512,7 @@ fn test_select_all_inner_join() {
             INNER JOIN orders \
             ON orders.customer_id * 2 = person.id + 10";
 
-    let expected = "Projection: person.id, person.first_name, 
person.last_name, person.age, person.state, person.salary, person.birth_date, 
person.😀, orders.order_id, orders.customer_id, orders.o_item_id, orders.qty, 
orders.price, orders.delivered\
+    let expected = "Projection: *\
             \n  Inner Join:  Filter: orders.customer_id * Int64(2) = person.id 
+ Int64(10)\
             \n    TableScan: person\
             \n    TableScan: orders";
@@ -4245,7 +4229,7 @@ fn test_prepare_statement_to_plan_value_list() {
     let sql = "PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, 
$1), (2, $2)) AS t (num, letter);";
 
     let expected_plan = "Prepare: \"my_plan\" [Utf8, Utf8] \
-        \n  Projection: t.num, t.letter\
+        \n  Projection: *\
         \n    SubqueryAlias: t\
         \n      Projection: column1 AS num, column2 AS letter\
         \n        Values: (Int64(1), $1), (Int64(2), $2)";
@@ -4260,7 +4244,7 @@ fn test_prepare_statement_to_plan_value_list() {
         ScalarValue::from("a".to_string()),
         ScalarValue::from("b".to_string()),
     ];
-    let expected_plan = "Projection: t.num, t.letter\
+    let expected_plan = "Projection: *\
         \n  SubqueryAlias: t\
         \n    Projection: column1 AS num, column2 AS letter\
         \n      Values: (Int64(1), Utf8(\"a\")), (Int64(2), Utf8(\"b\"))";
@@ -4310,7 +4294,7 @@ fn test_table_alias() {
           (select age from person) t2 \
         ) as f";
 
-    let expected = "Projection: f.id, f.age\
+    let expected = "Projection: *\
         \n  SubqueryAlias: f\
         \n    CrossJoin:\
         \n      SubqueryAlias: t1\
@@ -4327,7 +4311,7 @@ fn test_table_alias() {
           (select age from person) t2 \
         ) as f (c1, c2)";
 
-    let expected = "Projection: f.c1, f.c2\
+    let expected = "Projection: *\
         \n  SubqueryAlias: f\
         \n    Projection: t1.id AS c1, t2.age AS c2\
         \n      CrossJoin:\
diff --git a/datafusion/sqllogictest/test_files/explain.slt 
b/datafusion/sqllogictest/test_files/explain.slt
index eae4f428b4..1e8850efad 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -177,6 +177,7 @@ initial_logical_plan
 01)Projection: simple_explain_test.a, simple_explain_test.b, 
simple_explain_test.c
 02)--TableScan: simple_explain_test
 logical_plan after inline_table_scan SAME TEXT AS ABOVE
+logical_plan after expand_wildcard_rule SAME TEXT AS ABOVE
 logical_plan after type_coercion SAME TEXT AS ABOVE
 logical_plan after count_wildcard_rule SAME TEXT AS ABOVE
 analyzed_logical_plan SAME TEXT AS ABOVE
diff --git a/datafusion/sqllogictest/test_files/select.slt 
b/datafusion/sqllogictest/test_files/select.slt
index f217cbab07..49a18ca09d 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1195,12 +1195,12 @@ LIMIT 5
 200 2000
 
 # Trying to exclude non-existing column should give error
-statement error DataFusion error: Schema error: No field named e. Valid fields 
are table1.a, table1.b, table1.c, table1.d.
+statement error
 SELECT * EXCLUDE e
 FROM table1
 
 # similarly, except should raise error if excluded column is not in the table
-statement error DataFusion error: Schema error: No field named e. Valid fields 
are table1.a, table1.b, table1.c, table1.d.
+statement error
 SELECT * EXCEPT(e)
 FROM table1
 
@@ -1214,7 +1214,7 @@ FROM table1
 2 20 20 200 2000
 
 # EXCEPT, or EXCLUDE shouldn't contain duplicate column names
-statement error DataFusion error: Error during planning: EXCLUDE or EXCEPT 
contains duplicate column names
+statement error
 SELECT * EXCLUDE(a, a)
 FROM table1
 
diff --git a/datafusion/sqllogictest/test_files/union.slt 
b/datafusion/sqllogictest/test_files/union.slt
index 476ebe7ebe..ffbf54c4d9 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -637,8 +637,43 @@ SELECT t1.v2, t1.v0 FROM t2 NATURAL JOIN t1
 SELECT t1.v2, t1.v0 FROM t2 NATURAL JOIN t1 WHERE (t1.v2 IS NULL);
 ----
 
+statement ok
+CREATE TABLE t3 (
+  id INT
+) as VALUES
+  (1),
+  (2),
+  (3)
+;
+
+statement ok
+CREATE TABLE t4 (
+  id TEXT
+) as VALUES
+  ('4'),
+  ('5'),
+  ('6')
+;
+
+# test type coercion for wildcard expansion
+query T rowsort
+(SELECT * FROM t3 ) UNION ALL (SELECT * FROM t4)
+----
+1
+2
+3
+4
+5
+6
+
 statement ok
 DROP TABLE t1;
 
 statement ok
 DROP TABLE t2;
+
+statement ok
+DROP TABLE t3;
+
+statement ok
+DROP TABLE t4;
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index dfc8826676..ddf6a7aabf 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3929,7 +3929,8 @@ b 1 3
 a 1 4
 b 5 5
 
-statement error DataFusion error: Error during planning: Projection references 
non-aggregate values: Expression aggregate_test_100.c1 could not be resolved 
from available columns: rn
+# Schema error: No field named aggregate_test_100.c1. Valid fields are rn.
+statement error
 SELECT *
 FROM (SELECT c1, c2, ROW_NUMBER() OVER(PARTITION BY c1) as rn
     FROM aggregate_test_100


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to