This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d07360e6 Support catalog.schema.table.column in SQL SELECT and WHERE (#5343)
4d07360e6 is described below

commit 4d07360e6f67012571237ab1a773ac1fec7122d9
Author: Jeffrey <[email protected]>
AuthorDate: Wed Mar 15 01:50:10 2023 +1100

    Support catalog.schema.table.column in SQL SELECT and WHERE (#5343)
    
    * Support catalog.schema.table.column in SQL SELECT and WHERE
    
    * Merge branch 'main' into support_catalog_schema_in_ident
    
    * Update column new() docstring
    
    * Add tests for dfschema search
    
    * Introduce DFField::new_unqualified
    
    * Introduce new_unqualified methods for simpler syntax
    
    * Fix merge
    
    * New ident() expr function
    
    * Refactor OwnedTableReference to be a type alias of TableReference<'static>
    
    * Update comments
    
    * Comments
    
    * Update docstrings
    
    * From OwnedTableReference to TableReference impl
---
 datafusion/common/src/column.rs                    | 109 ++++---
 datafusion/common/src/dfschema.rs                  | 157 ++++++----
 datafusion/common/src/error.rs                     |  70 +++--
 datafusion/common/src/lib.rs                       |   5 +-
 datafusion/common/src/table_reference.rs           | 335 +++++++++-----------
 datafusion/common/src/utils.rs                     | 134 ++++++++
 datafusion/core/src/catalog/listing_schema.rs      |   4 +-
 datafusion/core/src/dataframe.rs                   |   2 +-
 datafusion/core/src/execution/context.rs           |   4 +-
 .../core/src/physical_plan/file_format/parquet.rs  |   4 +
 datafusion/core/src/physical_plan/planner.rs       |   4 +-
 datafusion/core/tests/dataframe.rs                 |   8 +-
 datafusion/core/tests/sql/idenfifers.rs            |   8 +-
 datafusion/core/tests/sql/references.rs            |   2 +-
 .../test_files/information_schema.slt              |  24 ++
 .../core/tests/sqllogictests/test_files/join.slt   |   2 +-
 datafusion/expr/src/expr_fn.rs                     |  40 ++-
 datafusion/expr/src/expr_rewriter.rs               |   7 +-
 datafusion/expr/src/expr_schema.rs                 |   5 +-
 datafusion/expr/src/logical_plan/builder.rs        |  36 ++-
 datafusion/expr/src/logical_plan/plan.rs           |   2 +
 datafusion/expr/src/utils.rs                       |   4 +-
 .../optimizer/src/common_subexpr_eliminate.rs      |   6 +-
 datafusion/optimizer/src/optimizer.rs              |   8 +-
 datafusion/optimizer/src/push_down_projection.rs   |   7 +-
 .../optimizer/src/scalar_subquery_to_join.rs       |  10 +-
 .../src/simplify_expressions/expr_simplifier.rs    |  16 +-
 datafusion/optimizer/src/type_coercion.rs          |  20 +-
 .../optimizer/src/unwrap_cast_in_comparison.rs     |  24 +-
 .../physical-expr/src/intervals/cp_solver.rs       |   2 +-
 datafusion/proto/src/logical_plan/from_proto.rs    |  17 +-
 datafusion/proto/src/logical_plan/to_proto.rs      |  20 +-
 datafusion/sql/src/expr/arrow_cast.rs              |   2 +-
 datafusion/sql/src/expr/identifier.rs              | 338 ++++++++++++++++++---
 datafusion/sql/src/expr/mod.rs                     |   2 +-
 datafusion/sql/src/planner.rs                      |  21 +-
 datafusion/sql/src/relation/mod.rs                 |   5 +-
 datafusion/sql/src/statement.rs                    |  22 +-
 datafusion/sql/tests/integration_test.rs           |  33 +-
 docs/source/user-guide/example-usage.md            |   9 +-
 40 files changed, 1018 insertions(+), 510 deletions(-)

diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs
index 4a0e392f6..78c185723 100644
--- a/datafusion/common/src/column.rs
+++ b/datafusion/common/src/column.rs
@@ -17,7 +17,8 @@
 
 //! Column
 
-use crate::{DFSchema, DataFusionError, Result, SchemaError};
+use crate::utils::{parse_identifiers_normalized, quote_identifier};
+use crate::{DFSchema, DataFusionError, OwnedTableReference, Result, 
SchemaError};
 use std::collections::HashSet;
 use std::convert::Infallible;
 use std::fmt;
@@ -27,21 +28,37 @@ use std::sync::Arc;
 /// A named reference to a qualified field in a schema.
 #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
 pub struct Column {
-    /// relation/table name.
-    pub relation: Option<String>,
+    /// relation/table reference.
+    pub relation: Option<OwnedTableReference>,
     /// field/column name.
     pub name: String,
 }
 
 impl Column {
-    /// Create Column from optional qualifier and name
-    pub fn new(relation: Option<impl Into<String>>, name: impl Into<String>) 
-> Self {
+    /// Create Column from optional qualifier and name. The optional 
qualifier, if present,
+    /// will be parsed and normalized by default.
+    ///
+    /// See full details on [`TableReference::parse_str`]
+    ///
+    /// [`TableReference::parse_str`]: crate::TableReference::parse_str
+    pub fn new(
+        relation: Option<impl Into<OwnedTableReference>>,
+        name: impl Into<String>,
+    ) -> Self {
         Self {
             relation: relation.map(|r| r.into()),
             name: name.into(),
         }
     }
 
+    /// Convenience method for when there is no qualifier
+    pub fn new_unqualified(name: impl Into<String>) -> Self {
+        Self {
+            relation: None,
+            name: name.into(),
+        }
+    }
+
     /// Create Column from unqualified name.
     pub fn from_name(name: impl Into<String>) -> Self {
         Self {
@@ -53,26 +70,36 @@ impl Column {
     /// Deserialize a fully qualified name string into a column
     pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
         let flat_name = flat_name.into();
-        use sqlparser::tokenizer::Token;
-
-        let dialect = sqlparser::dialect::GenericDialect {};
-        let mut tokenizer = sqlparser::tokenizer::Tokenizer::new(&dialect, 
&flat_name);
-        if let Ok(tokens) = tokenizer.tokenize() {
-            if let [Token::Word(relation), Token::Period, Token::Word(name)] =
-                tokens.as_slice()
-            {
-                return Column {
-                    relation: Some(relation.value.clone()),
-                    name: name.value.clone(),
-                };
-            }
-        }
-        // any expression that's not in the form of `foo.bar` will be treated 
as unqualified column
-        // name
-        Column {
-            relation: None,
-            name: flat_name,
-        }
+        let mut idents = parse_identifiers_normalized(&flat_name);
+
+        let (relation, name) = match idents.len() {
+            1 => (None, idents.remove(0)),
+            2 => (
+                Some(OwnedTableReference::Bare {
+                    table: idents.remove(0).into(),
+                }),
+                idents.remove(0),
+            ),
+            3 => (
+                Some(OwnedTableReference::Partial {
+                    schema: idents.remove(0).into(),
+                    table: idents.remove(0).into(),
+                }),
+                idents.remove(0),
+            ),
+            4 => (
+                Some(OwnedTableReference::Full {
+                    catalog: idents.remove(0).into(),
+                    schema: idents.remove(0).into(),
+                    table: idents.remove(0).into(),
+                }),
+                idents.remove(0),
+            ),
+            // any expression that failed to parse or has more than 4 period 
delimited
+            // identifiers will be treated as an unqualified column name
+            _ => (None, flat_name),
+        };
+        Self { relation, name }
     }
 
     /// Serialize column into a flat name string
@@ -83,6 +110,18 @@ impl Column {
         }
     }
 
+    /// Serialize column into a quoted flat name string
+    pub fn quoted_flat_name(&self) -> String {
+        // TODO: quote identifiers only when special characters present
+        // see: https://github.com/apache/arrow-datafusion/issues/5523
+        match &self.relation {
+            Some(r) => {
+                format!("{}.{}", r.to_quoted_string(), 
quote_identifier(&self.name))
+            }
+            None => quote_identifier(&self.name),
+        }
+    }
+
     /// Qualify column if not done yet.
     ///
     /// If this column already has a [relation](Self::relation), it will be 
returned as is and the given parameters are
@@ -151,7 +190,7 @@ impl Column {
         }
 
         Err(DataFusionError::SchemaError(SchemaError::FieldNotFound {
-            field: Column::new(self.relation.clone(), self.name),
+            field: Box::new(Column::new(self.relation.clone(), self.name)),
             valid_fields: schemas
                 .iter()
                 .flat_map(|s| s.fields().iter().map(|f| f.qualified_column()))
@@ -240,8 +279,7 @@ impl Column {
                     // If not due to USING columns then due to ambiguous 
column name
                     return Err(DataFusionError::SchemaError(
                         SchemaError::AmbiguousReference {
-                            qualifier: None,
-                            name: self.name,
+                            field: Column::new_unqualified(self.name),
                         },
                     ));
                 }
@@ -249,7 +287,7 @@ impl Column {
         }
 
         Err(DataFusionError::SchemaError(SchemaError::FieldNotFound {
-            field: self,
+            field: Box::new(self),
             valid_fields: schemas
                 .iter()
                 .flat_map(|s| s.iter())
@@ -304,7 +342,12 @@ mod tests {
         let fields = names
             .iter()
             .map(|(qualifier, name)| {
-                DFField::new(qualifier.to_owned(), name, DataType::Boolean, 
true)
+                DFField::new(
+                    qualifier.to_owned().map(|s| s.to_string()),
+                    name,
+                    DataType::Boolean,
+                    true,
+                )
             })
             .collect::<Vec<_>>();
         DFSchema::new_with_metadata(fields, HashMap::new())
@@ -362,9 +405,7 @@ mod tests {
                 &[],
             )
             .expect_err("should've failed to find field");
-        let expected = "Schema error: No field named 'z'. \
-        Valid fields are 't1'.'a', 't1'.'b', 't2'.'c', \
-        't2'.'d', 't3'.'a', 't3'.'b', 't3'.'c', 't3'.'d', 't3'.'e'.";
+        let expected = r#"Schema error: No field named "z". Valid fields are 
"t1"."a", "t1"."b", "t2"."c", "t2"."d", "t3"."a", "t3"."b", "t3"."c", "t3"."d", 
"t3"."e"."#;
         assert_eq!(err.to_string(), expected);
 
         // ambiguous column reference
@@ -375,7 +416,7 @@ mod tests {
                 &[],
             )
             .expect_err("should've found ambiguous field");
-        let expected = "Schema error: Ambiguous reference to unqualified field 
'a'";
+        let expected = "Schema error: Ambiguous reference to unqualified field 
\"a\"";
         assert_eq!(err.to_string(), expected);
 
         Ok(())
diff --git a/datafusion/common/src/dfschema.rs 
b/datafusion/common/src/dfschema.rs
index bb443f8aa..52a0b3d40 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -23,8 +23,9 @@ use std::convert::TryFrom;
 use std::hash::Hash;
 use std::sync::Arc;
 
-use crate::error::{DataFusionError, Result, SchemaError};
-use crate::{field_not_found, Column, TableReference};
+use crate::error::{unqualified_field_not_found, DataFusionError, Result, 
SchemaError};
+use crate::utils::quote_identifier;
+use crate::{field_not_found, Column, OwnedTableReference, TableReference};
 
 use arrow::compute::can_cast_types;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -70,7 +71,7 @@ impl DFSchema {
                 if !qualified_names.insert((qualifier, field.name())) {
                     return Err(DataFusionError::SchemaError(
                         SchemaError::DuplicateQualifiedField {
-                            qualifier: qualifier.to_string(),
+                            qualifier: Box::new(qualifier.clone()),
                             name: field.name().to_string(),
                         },
                     ));
@@ -90,18 +91,16 @@ impl DFSchema {
         let mut qualified_names = qualified_names
             .iter()
             .map(|(l, r)| (l.to_owned(), r.to_owned()))
-            .collect::<Vec<(&String, &String)>>();
-        qualified_names.sort_by(|a, b| {
-            let a = format!("{}.{}", a.0, a.1);
-            let b = format!("{}.{}", b.0, b.1);
-            a.cmp(&b)
-        });
+            .collect::<Vec<(&OwnedTableReference, &String)>>();
+        qualified_names.sort();
         for (qualifier, name) in &qualified_names {
             if unqualified_names.contains(name) {
                 return Err(DataFusionError::SchemaError(
                     SchemaError::AmbiguousReference {
-                        qualifier: Some(qualifier.to_string()),
-                        name: name.to_string(),
+                        field: Column {
+                            relation: Some((*qualifier).clone()),
+                            name: name.to_string(),
+                        },
                     },
                 ));
             }
@@ -115,7 +114,7 @@ impl DFSchema {
             schema
                 .fields()
                 .iter()
-                .map(|f| DFField::from_qualified(qualifier, f.clone()))
+                .map(|f| DFField::from_qualified(qualifier.to_string(), 
f.clone()))
                 .collect(),
             schema.metadata().clone(),
         )
@@ -140,7 +139,7 @@ impl DFSchema {
         for field in other_schema.fields() {
             // skip duplicate columns
             let duplicated_field = match field.qualifier() {
-                Some(q) => self.field_with_name(Some(q.as_str()), 
field.name()).is_ok(),
+                Some(q) => self.field_with_name(Some(q), field.name()).is_ok(),
                 // for unqualified columns, check as unqualified name
                 None => self.field_with_unqualified_name(field.name()).is_ok(),
             };
@@ -173,7 +172,7 @@ impl DFSchema {
                 // a fully qualified field name is provided.
                 match &self.fields[i].qualifier {
                     Some(qualifier) => {
-                        if (qualifier.to_owned() + "." + 
self.fields[i].name()) == name {
+                        if (qualifier.to_string() + "." + 
self.fields[i].name()) == name {
                             return Err(DataFusionError::Plan(format!(
                                 "Fully qualified field name '{name}' was 
supplied to `index_of` \
                                 which is deprecated. Please use 
`index_of_column_by_name` instead"
@@ -185,12 +184,12 @@ impl DFSchema {
             }
         }
 
-        Err(field_not_found(None, name, self))
+        Err(unqualified_field_not_found(name, self))
     }
 
     pub fn index_of_column_by_name(
         &self,
-        qualifier: Option<&str>,
+        qualifier: Option<&TableReference>,
         name: &str,
     ) -> Result<Option<usize>> {
         let mut matches = self
@@ -201,19 +200,19 @@ impl DFSchema {
                 // field to lookup is qualified.
                 // current field is qualified and not shared between 
relations, compare both
                 // qualifier and name.
-                (Some(q), Some(field_q)) => q == field_q && field.name() == 
name,
+                (Some(q), Some(field_q)) => {
+                    q.resolved_eq(field_q) && field.name() == name
+                }
                 // field to lookup is qualified but current field is 
unqualified.
                 (Some(qq), None) => {
                     // the original field may now be aliased with a name that 
matches the
                     // original qualified name
-                    let table_ref = 
TableReference::parse_str(field.name().as_str());
-                    match table_ref {
-                        TableReference::Partial { schema, table } => {
-                            schema == qq && table == name
-                        }
-                        TableReference::Full { schema, table, .. } => {
-                            schema == qq && table == name
-                        }
+                    let column = Column::from_qualified_name(field.name());
+                    match column {
+                        Column {
+                            relation: Some(r),
+                            name: column_name,
+                        } => &r == qq && column_name == name,
                         _ => false,
                     }
                 }
@@ -227,9 +226,11 @@ impl DFSchema {
                 None => Ok(Some(idx)),
                 // found more than one matches
                 Some(_) => Err(DataFusionError::Internal(format!(
-                    "Ambiguous reference to qualified field named '{}.{}'",
-                    qualifier.unwrap_or("<unqualified>"),
-                    name
+                    "Ambiguous reference to qualified field named {}.{}",
+                    qualifier
+                        .map(|q| q.to_quoted_string())
+                        .unwrap_or("<unqualified>".to_string()),
+                    quote_identifier(name)
                 ))),
             },
         }
@@ -237,23 +238,20 @@ impl DFSchema {
 
     /// Find the index of the column with the given qualifier and name
     pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        let qualifier = col.relation.as_deref();
-        self.index_of_column_by_name(col.relation.as_deref(), &col.name)?
-            .ok_or_else(|| {
-                field_not_found(qualifier.map(|s| s.to_string()), &col.name, 
self)
-            })
+        self.index_of_column_by_name(col.relation.as_ref(), &col.name)?
+            .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, 
self))
     }
 
     /// Check if the column is in the current schema
     pub fn is_column_from_schema(&self, col: &Column) -> Result<bool> {
-        self.index_of_column_by_name(col.relation.as_deref(), &col.name)
+        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
             .map(|idx| idx.is_some())
     }
 
     /// Find the field with the given name
     pub fn field_with_name(
         &self,
-        qualifier: Option<&str>,
+        qualifier: Option<&TableReference>,
         name: &str,
     ) -> Result<&DFField> {
         if let Some(qualifier) = qualifier {
@@ -264,7 +262,7 @@ impl DFSchema {
     }
 
     /// Find all fields having the given qualifier
-    pub fn fields_with_qualified(&self, qualifier: &str) -> Vec<&DFField> {
+    pub fn fields_with_qualified(&self, qualifier: &TableReference) -> 
Vec<&DFField> {
         self.fields
             .iter()
             .filter(|field| field.qualifier().map(|q| 
q.eq(qualifier)).unwrap_or(false))
@@ -283,7 +281,7 @@ impl DFSchema {
     pub fn field_with_unqualified_name(&self, name: &str) -> Result<&DFField> {
         let matches = self.fields_with_unqualified_name(name);
         match matches.len() {
-            0 => Err(field_not_found(None, name, self)),
+            0 => Err(unqualified_field_not_found(name, self)),
             1 => Ok(matches[0]),
             _ => {
                 // When `matches` size > 1, it doesn't necessarily mean an 
`ambiguous name` problem.
@@ -302,8 +300,10 @@ impl DFSchema {
                 } else {
                     Err(DataFusionError::SchemaError(
                         SchemaError::AmbiguousReference {
-                            qualifier: None,
-                            name: name.to_string(),
+                            field: Column {
+                                relation: None,
+                                name: name.to_string(),
+                            },
                         },
                     ))
                 }
@@ -314,7 +314,7 @@ impl DFSchema {
     /// Find the field with the given qualified name
     pub fn field_with_qualified_name(
         &self,
-        qualifier: &str,
+        qualifier: &TableReference,
         name: &str,
     ) -> Result<&DFField> {
         let idx = self
@@ -338,7 +338,11 @@ impl DFSchema {
     }
 
     /// Find if the field exists with the given qualified name
-    pub fn has_column_with_qualified_name(&self, qualifier: &str, name: &str) 
-> bool {
+    pub fn has_column_with_qualified_name(
+        &self,
+        qualifier: &TableReference,
+        name: &str,
+    ) -> bool {
         self.fields().iter().any(|field| {
             field.qualifier().map(|q| q.eq(qualifier)).unwrap_or(false)
                 && field.name() == name
@@ -452,12 +456,13 @@ impl DFSchema {
     }
 
     /// Replace all field qualifier with new value in schema
-    pub fn replace_qualifier(self, qualifier: &str) -> Self {
+    pub fn replace_qualifier(self, qualifier: impl Into<OwnedTableReference>) 
-> Self {
+        let qualifier = qualifier.into();
         DFSchema {
             fields: self
                 .fields
                 .into_iter()
-                .map(|f| DFField::from_qualified(qualifier, f.field))
+                .map(|f| DFField::from_qualified(qualifier.clone(), f.field))
                 .collect(),
             ..self
         }
@@ -621,21 +626,29 @@ impl ExprSchema for DFSchema {
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct DFField {
     /// Optional qualifier (usually a table or relation name)
-    qualifier: Option<String>,
+    qualifier: Option<OwnedTableReference>,
     /// Arrow field definition
     field: Field,
 }
 
 impl DFField {
     /// Creates a new `DFField`
-    pub fn new(
-        qualifier: Option<&str>,
+    pub fn new<R: Into<OwnedTableReference>>(
+        qualifier: Option<R>,
         name: &str,
         data_type: DataType,
         nullable: bool,
     ) -> Self {
         DFField {
-            qualifier: qualifier.map(|s| s.to_owned()),
+            qualifier: qualifier.map(|s| s.into()),
+            field: Field::new(name, data_type, nullable),
+        }
+    }
+
+    /// Convenience method for creating new `DFField` without a qualifier
+    pub fn new_unqualified(name: &str, data_type: DataType, nullable: bool) -> 
Self {
+        DFField {
+            qualifier: None,
             field: Field::new(name, data_type, nullable),
         }
     }
@@ -649,9 +662,12 @@ impl DFField {
     }
 
     /// Create a qualified field from an existing Arrow field
-    pub fn from_qualified(qualifier: &str, field: Field) -> Self {
+    pub fn from_qualified(
+        qualifier: impl Into<OwnedTableReference>,
+        field: Field,
+    ) -> Self {
         Self {
-            qualifier: Some(qualifier.to_owned()),
+            qualifier: Some(qualifier.into()),
             field,
         }
     }
@@ -697,7 +713,7 @@ impl DFField {
     }
 
     /// Get the optional qualifier
-    pub fn qualifier(&self) -> Option<&String> {
+    pub fn qualifier(&self) -> Option<&OwnedTableReference> {
         self.qualifier.as_ref()
     }
 
@@ -723,9 +739,9 @@ mod tests {
         let col = Column::from_name("t1.c0");
         let schema = DFSchema::try_from_qualified_schema("t1", 
&test_schema_1())?;
         // lookup with unqualified name "t1.c0"
-        let err = schema.index_of_column(&col).err().unwrap();
+        let err = schema.index_of_column(&col).unwrap_err();
         assert_eq!(
-            "Schema error: No field named 't1.c0'. Valid fields are 't1'.'c0', 
't1'.'c1'.",
+            r#"Schema error: No field named "t1.c0". Valid fields are 
"t1"."c0", "t1"."c1"."#,
             &format!("{err}")
         );
         Ok(())
@@ -781,8 +797,12 @@ mod tests {
             join.to_string()
         );
         // test valid access
-        assert!(join.field_with_qualified_name("t1", "c0").is_ok());
-        assert!(join.field_with_qualified_name("t2", "c0").is_ok());
+        assert!(join
+            .field_with_qualified_name(&TableReference::bare("t1"), "c0")
+            .is_ok());
+        assert!(join
+            .field_with_qualified_name(&TableReference::bare("t2"), "c0")
+            .is_ok());
         // test invalid access
         assert!(join.field_with_unqualified_name("c0").is_err());
         assert!(join.field_with_unqualified_name("t1.c0").is_err());
@@ -798,7 +818,7 @@ mod tests {
         assert!(join.is_err());
         assert_eq!(
             "Schema error: Schema contains duplicate \
-        qualified field name \'t1\'.\'c0\'",
+        qualified field name \"t1\".\"c0\"",
             &format!("{}", join.err().unwrap())
         );
         Ok(())
@@ -812,7 +832,7 @@ mod tests {
         assert!(join.is_err());
         assert_eq!(
             "Schema error: Schema contains duplicate \
-        unqualified field name \'c0\'",
+        unqualified field name \"c0\"",
             &format!("{}", join.err().unwrap())
         );
         Ok(())
@@ -828,14 +848,18 @@ mod tests {
             join.to_string()
         );
         // test valid access
-        assert!(join.field_with_qualified_name("t1", "c0").is_ok());
+        assert!(join
+            .field_with_qualified_name(&TableReference::bare("t1"), "c0")
+            .is_ok());
         assert!(join.field_with_unqualified_name("c0").is_ok());
         assert!(join.field_with_unqualified_name("c100").is_ok());
         assert!(join.field_with_name(None, "c100").is_ok());
         // test invalid access
         assert!(join.field_with_unqualified_name("t1.c0").is_err());
         assert!(join.field_with_unqualified_name("t1.c100").is_err());
-        assert!(join.field_with_qualified_name("", "c100").is_err());
+        assert!(join
+            .field_with_qualified_name(&TableReference::bare(""), "c100")
+            .is_err());
         Ok(())
     }
 
@@ -847,7 +871,7 @@ mod tests {
         assert!(join.is_err());
         assert_eq!(
             "Schema error: Schema contains qualified \
-        field name \'t1\'.\'c0\' and unqualified field name \'c0\' which would 
be ambiguous",
+        field name \"t1\".\"c0\" and unqualified field name \"c0\" which would 
be ambiguous",
             &format!("{}", join.err().unwrap())
         );
         Ok(())
@@ -857,11 +881,11 @@ mod tests {
     #[test]
     fn helpful_error_messages() -> Result<()> {
         let schema = DFSchema::try_from_qualified_schema("t1", 
&test_schema_1())?;
-        let expected_help = "Valid fields are \'t1\'.\'c0\', \'t1\'.\'c1\'.";
+        let expected_help = "Valid fields are \"t1\".\"c0\", \"t1\".\"c1\".";
         // Pertinent message parts
-        let expected_err_msg = "Fully qualified field name \'t1.c0\'";
+        let expected_err_msg = "Fully qualified field name 't1.c0'";
         assert!(schema
-            .field_with_qualified_name("x", "y")
+            .field_with_qualified_name(&TableReference::bare("x"), "y")
             .unwrap_err()
             .to_string()
             .contains(expected_help));
@@ -889,12 +913,15 @@ mod tests {
 
         let col = Column::from_qualified_name("t1.c0");
         let err = schema.index_of_column(&col).err().unwrap();
-        assert_eq!("Schema error: No field named 't1'.'c0'.", 
&format!("{err}"));
+        assert_eq!(
+            r#"Schema error: No field named "t1"."c0"."#,
+            &format!("{err}")
+        );
 
         // the same check without qualifier
         let col = Column::from_name("c0");
         let err = schema.index_of_column(&col).err().unwrap();
-        assert_eq!("Schema error: No field named 'c0'.", &format!("{err}"));
+        assert_eq!(r#"Schema error: No field named "c0"."#, &format!("{err}"));
     }
 
     #[test]
@@ -1127,7 +1154,7 @@ mod tests {
         let arrow_schema_ref = Arc::new(arrow_schema.clone());
 
         let df_schema = DFSchema::new_with_metadata(
-            vec![DFField::new(None, "c0", DataType::Int64, true)],
+            vec![DFField::new_unqualified("c0", DataType::Int64, true)],
             metadata,
         )
         .unwrap();
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 3f10c1261..0231895f7 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -23,7 +23,8 @@ use std::io;
 use std::result;
 use std::sync::Arc;
 
-use crate::{Column, DFSchema};
+use crate::utils::quote_identifier;
+use crate::{Column, DFSchema, OwnedTableReference};
 #[cfg(feature = "avro")]
 use apache_avro::Error as AvroError;
 use arrow::error::ArrowError;
@@ -120,29 +121,41 @@ macro_rules! plan_err {
 #[derive(Debug)]
 pub enum SchemaError {
     /// Schema contains a (possibly) qualified and unqualified field with same 
unqualified name
-    AmbiguousReference {
-        qualifier: Option<String>,
+    AmbiguousReference { field: Column },
+    /// Schema contains duplicate qualified field name
+    DuplicateQualifiedField {
+        qualifier: Box<OwnedTableReference>,
         name: String,
     },
-    /// Schema contains duplicate qualified field name
-    DuplicateQualifiedField { qualifier: String, name: String },
     /// Schema contains duplicate unqualified field name
     DuplicateUnqualifiedField { name: String },
     /// No field with this name
     FieldNotFound {
-        field: Column,
+        field: Box<Column>,
         valid_fields: Vec<Column>,
     },
 }
 
 /// Create a "field not found" DataFusion::SchemaError
-pub fn field_not_found(
-    qualifier: Option<String>,
+pub fn field_not_found<R: Into<OwnedTableReference>>(
+    qualifier: Option<R>,
     name: &str,
     schema: &DFSchema,
 ) -> DataFusionError {
     DataFusionError::SchemaError(SchemaError::FieldNotFound {
-        field: Column::new(qualifier, name),
+        field: Box::new(Column::new(qualifier, name)),
+        valid_fields: schema
+            .fields()
+            .iter()
+            .map(|f| f.qualified_column())
+            .collect(),
+    })
+}
+
+/// Convenience wrapper over [`field_not_found`] for when there is no qualifier
+pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> 
DataFusionError {
+    DataFusionError::SchemaError(SchemaError::FieldNotFound {
+        field: Box::new(Column::new_unqualified(name)),
         valid_fields: schema
             .fields()
             .iter()
@@ -158,25 +171,14 @@ impl Display for SchemaError {
                 field,
                 valid_fields,
             } => {
-                write!(f, "No field named ")?;
-                if let Some(q) = &field.relation {
-                    write!(f, "'{}'.'{}'", q, field.name)?;
-                } else {
-                    write!(f, "'{}'", field.name)?;
-                }
+                write!(f, "No field named {}", field.quoted_flat_name())?;
                 if !valid_fields.is_empty() {
                     write!(
                         f,
                         ". Valid fields are {}",
                         valid_fields
                             .iter()
-                            .map(|field| {
-                                if let Some(q) = &field.relation {
-                                    format!("'{}'.'{}'", q, field.name)
-                                } else {
-                                    format!("'{}'", field.name)
-                                }
-                            })
+                            .map(|field| field.quoted_flat_name())
                             .collect::<Vec<String>>()
                             .join(", ")
                     )?;
@@ -186,20 +188,32 @@ impl Display for SchemaError {
             Self::DuplicateQualifiedField { qualifier, name } => {
                 write!(
                     f,
-                    "Schema contains duplicate qualified field name 
'{qualifier}'.'{name}'"
+                    "Schema contains duplicate qualified field name {}.{}",
+                    qualifier.to_quoted_string(),
+                    quote_identifier(name)
                 )
             }
             Self::DuplicateUnqualifiedField { name } => {
                 write!(
                     f,
-                    "Schema contains duplicate unqualified field name '{name}'"
+                    "Schema contains duplicate unqualified field name {}",
+                    quote_identifier(name)
                 )
             }
-            Self::AmbiguousReference { qualifier, name } => {
-                if let Some(q) = qualifier {
-                    write!(f, "Schema contains qualified field name 
'{q}'.'{name}' and unqualified field name '{name}' which would be ambiguous")
+            Self::AmbiguousReference { field } => {
+                if field.relation.is_some() {
+                    write!(
+                        f,
+                        "Schema contains qualified field name {} and 
unqualified field name {} which would be ambiguous",
+                        field.quoted_flat_name(),
+                        quote_identifier(&field.name)
+                    )
                 } else {
-                    write!(f, "Ambiguous reference to unqualified field 
'{name}'")
+                    write!(
+                        f,
+                        "Ambiguous reference to unqualified field {}",
+                        field.quoted_flat_name()
+                    )
                 }
             }
         }
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 636feb21a..4af8720b0 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -34,7 +34,10 @@ pub mod utils;
 use arrow::compute::SortOptions;
 pub use column::Column;
 pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
-pub use error::{field_not_found, DataFusionError, Result, SchemaError, 
SharedResult};
+pub use error::{
+    field_not_found, unqualified_field_not_found, DataFusionError, Result, 
SchemaError,
+    SharedResult,
+};
 pub use parsers::parse_interval;
 pub use scalar::{ScalarType, ScalarValue};
 pub use stats::{ColumnStatistics, Statistics};
diff --git a/datafusion/common/src/table_reference.rs 
b/datafusion/common/src/table_reference.rs
index 34656bc11..257073681 100644
--- a/datafusion/common/src/table_reference.rs
+++ b/datafusion/common/src/table_reference.rs
@@ -15,13 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::error::Result;
-use sqlparser::{
-    ast::Ident,
-    dialect::GenericDialect,
-    parser::{Parser, ParserError},
-    tokenizer::{Token, TokenWithLocation},
-};
+use crate::utils::{parse_identifiers_normalized, quote_identifier};
 use std::borrow::Cow;
 
 /// A resolved path to a table of the form "catalog.schema.table"
@@ -42,7 +36,7 @@ impl<'a> std::fmt::Display for ResolvedTableReference<'a> {
 }
 
 /// Represents a path to a table that may require further resolution
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
 pub enum TableReference<'a> {
     /// An unqualified table reference, e.g. "table"
     Bare {
@@ -67,53 +61,53 @@ pub enum TableReference<'a> {
     },
 }
 
-/// Represents a path to a table that may require further resolution
-/// that owns the underlying names
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub enum OwnedTableReference {
-    /// An unqualified table reference, e.g. "table"
-    Bare {
-        /// The table name
-        table: String,
-    },
-    /// A partially resolved table reference, e.g. "schema.table"
-    Partial {
-        /// The schema containing the table
-        schema: String,
-        /// The table name
-        table: String,
-    },
-    /// A fully resolved table reference, e.g. "catalog.schema.table"
-    Full {
-        /// The catalog (aka database) containing the table
-        catalog: String,
-        /// The schema containing the table
-        schema: String,
-        /// The table name
-        table: String,
-    },
-}
+pub type OwnedTableReference = TableReference<'static>;
 
-impl OwnedTableReference {
-    /// Return a `TableReference` view of this `OwnedTableReference`
-    pub fn as_table_reference(&self) -> TableReference<'_> {
+impl std::fmt::Display for TableReference<'_> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            Self::Bare { table } => TableReference::Bare {
-                table: table.into(),
-            },
-            Self::Partial { schema, table } => TableReference::Partial {
-                schema: schema.into(),
-                table: table.into(),
-            },
-            Self::Full {
+            TableReference::Bare { table } => write!(f, "{table}"),
+            TableReference::Partial { schema, table } => {
+                write!(f, "{schema}.{table}")
+            }
+            TableReference::Full {
                 catalog,
                 schema,
                 table,
-            } => TableReference::Full {
-                catalog: catalog.into(),
-                schema: schema.into(),
-                table: table.into(),
-            },
+            } => write!(f, "{catalog}.{schema}.{table}"),
+        }
+    }
+}
+
+impl<'a> TableReference<'a> {
+    /// Convenience method for creating a `Bare` variant of `TableReference`
+    pub fn bare(table: impl Into<Cow<'a, str>>) -> TableReference<'a> {
+        TableReference::Bare {
+            table: table.into(),
+        }
+    }
+
+    /// Convenience method for creating a `Partial` variant of `TableReference`
+    pub fn partial(
+        schema: impl Into<Cow<'a, str>>,
+        table: impl Into<Cow<'a, str>>,
+    ) -> TableReference<'a> {
+        TableReference::Partial {
+            schema: schema.into(),
+            table: table.into(),
+        }
+    }
+
+    /// Convenience method for creating a `Full` variant of `TableReference`
+    pub fn full(
+        catalog: impl Into<Cow<'a, str>>,
+        schema: impl Into<Cow<'a, str>>,
+        table: impl Into<Cow<'a, str>>,
+    ) -> TableReference<'a> {
+        TableReference::Full {
+            catalog: catalog.into(),
+            schema: schema.into(),
+            table: table.into(),
         }
     }
 
@@ -125,39 +119,44 @@ impl OwnedTableReference {
             | Self::Bare { table } => table,
         }
     }
-}
 
-impl std::fmt::Display for OwnedTableReference {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+    /// Retrieve the schema name if in the `Partial` or `Full` qualification
+    pub fn schema(&self) -> Option<&str> {
         match self {
-            OwnedTableReference::Bare { table } => write!(f, "{table}"),
-            OwnedTableReference::Partial { schema, table } => {
-                write!(f, "{schema}.{table}")
-            }
-            OwnedTableReference::Full {
-                catalog,
-                schema,
-                table,
-            } => write!(f, "{catalog}.{schema}.{table}"),
+            Self::Full { schema, .. } | Self::Partial { schema, .. } => 
Some(schema),
+            _ => None,
         }
     }
-}
 
-/// Convert `OwnedTableReference` into a `TableReference`. Somewhat
-/// awkward to use but 'idiomatic': `(&table_ref).into()`
-impl<'a> From<&'a OwnedTableReference> for TableReference<'a> {
-    fn from(r: &'a OwnedTableReference) -> Self {
-        r.as_table_reference()
+    /// Retrieve the catalog name if in the `Full` qualification
+    pub fn catalog(&self) -> Option<&str> {
+        match self {
+            Self::Full { catalog, .. } => Some(catalog),
+            _ => None,
+        }
     }
-}
 
-impl<'a> TableReference<'a> {
-    /// Retrieve the actual table name, regardless of qualification
-    pub fn table(&self) -> &str {
+    /// Compare with another `TableReference` as if both are resolved.
+    /// This allows comparing across variants, where if a field is not present
+    /// in both variants being compared then it is ignored in the comparison.
+    ///
+    /// e.g. this allows a `TableReference::Bare` to be considered equal to a
+    /// fully qualified `TableReference::Full` if the table names match.
+    pub fn resolved_eq(&self, other: &Self) -> bool {
         match self {
-            Self::Full { table, .. }
-            | Self::Partial { table, .. }
-            | Self::Bare { table } => table,
+            TableReference::Bare { table } => table == other.table(),
+            TableReference::Partial { schema, table } => {
+                table == other.table() && other.schema().map_or(true, |s| s == 
schema)
+            }
+            TableReference::Full {
+                catalog,
+                schema,
+                table,
+            } => {
+                table == other.table()
+                    && other.schema().map_or(true, |s| s == schema)
+                    && other.catalog().map_or(true, |c| c == catalog)
+            }
         }
     }
 
@@ -190,6 +189,48 @@ impl<'a> TableReference<'a> {
         }
     }
 
+    /// Converts directly into an [`OwnedTableReference`]
+    pub fn to_owned_reference(&self) -> OwnedTableReference {
+        match self {
+            Self::Full {
+                catalog,
+                schema,
+                table,
+            } => OwnedTableReference::Full {
+                catalog: catalog.to_string().into(),
+                schema: schema.to_string().into(),
+                table: table.to_string().into(),
+            },
+            Self::Partial { schema, table } => OwnedTableReference::Partial {
+                schema: schema.to_string().into(),
+                table: table.to_string().into(),
+            },
+            Self::Bare { table } => OwnedTableReference::Bare {
+                table: table.to_string().into(),
+            },
+        }
+    }
+
+    /// Forms a string where the identifiers are quoted
+    pub fn to_quoted_string(&self) -> String {
+        match self {
+            TableReference::Bare { table } => quote_identifier(table),
+            TableReference::Partial { schema, table } => {
+                format!("{}.{}", quote_identifier(schema), 
quote_identifier(table))
+            }
+            TableReference::Full {
+                catalog,
+                schema,
+                table,
+            } => format!(
+                "{}.{}.{}",
+                quote_identifier(catalog),
+                quote_identifier(schema),
+                quote_identifier(table)
+            ),
+        }
+    }
+
     /// Forms a [`TableReference`] by attempting to parse `s` as a multipart 
identifier,
     /// failing that then taking the entire unnormalized input as the 
identifier itself.
     ///
@@ -199,14 +240,7 @@ impl<'a> TableReference<'a> {
     /// `Foo".bar` (note the preserved case and requiring two double quotes to 
represent
     /// a single double quote in the identifier)
     pub fn parse_str(s: &'a str) -> Self {
-        let mut parts = parse_identifiers(s)
-            .unwrap_or_default()
-            .into_iter()
-            .map(|id| match id.quote_style {
-                Some(_) => id.value,
-                None => id.value.to_ascii_lowercase(),
-            })
-            .collect::<Vec<_>>();
+        let mut parts = parse_identifiers_normalized(s);
 
         match parts.len() {
             1 => Self::Bare {
@@ -226,57 +260,34 @@ impl<'a> TableReference<'a> {
     }
 }
 
-// TODO: remove when can use 
https://github.com/sqlparser-rs/sqlparser-rs/issues/805
-fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
-    let dialect = GenericDialect;
-    let mut parser = Parser::new(&dialect).try_with_sql(s)?;
-    let mut idents = vec![];
-
-    // expecting at least one word for identifier
-    match parser.next_token_no_skip() {
-        Some(TokenWithLocation {
-            token: Token::Word(w),
-            ..
-        }) => idents.push(w.to_ident()),
-        Some(TokenWithLocation { token, .. }) => {
-            return Err(ParserError::ParserError(format!(
-                "Unexpected token in identifier: {token}"
-            )))?
-        }
-        None => {
-            return Err(ParserError::ParserError(
-                "Empty input when parsing identifier".to_string(),
-            ))?
-        }
-    };
+/// Parse a `String` into an `OwnedTableReference`
+impl From<String> for OwnedTableReference {
+    fn from(s: String) -> Self {
+        TableReference::parse_str(&s).to_owned_reference()
+    }
+}
 
-    while let Some(TokenWithLocation { token, .. }) = 
parser.next_token_no_skip() {
-        match token {
-            // ensure that optional period is succeeded by another identifier
-            Token::Period => match parser.next_token_no_skip() {
-                Some(TokenWithLocation {
-                    token: Token::Word(w),
-                    ..
-                }) => idents.push(w.to_ident()),
-                Some(TokenWithLocation { token, .. }) => {
-                    return Err(ParserError::ParserError(format!(
-                        "Unexpected token following period in identifier: 
{token}"
-                    )))?
-                }
-                None => {
-                    return Err(ParserError::ParserError(
-                        "Trailing period in identifier".to_string(),
-                    ))?
-                }
+impl<'a> From<&'a OwnedTableReference> for TableReference<'a> {
+    fn from(value: &'a OwnedTableReference) -> Self {
+        match value {
+            OwnedTableReference::Bare { table } => TableReference::Bare {
+                table: Cow::Borrowed(table),
+            },
+            OwnedTableReference::Partial { schema, table } => 
TableReference::Partial {
+                schema: Cow::Borrowed(schema),
+                table: Cow::Borrowed(table),
+            },
+            OwnedTableReference::Full {
+                catalog,
+                schema,
+                table,
+            } => TableReference::Full {
+                catalog: Cow::Borrowed(catalog),
+                schema: Cow::Borrowed(schema),
+                table: Cow::Borrowed(table),
             },
-            _ => {
-                return Err(ParserError::ParserError(format!(
-                    "Unexpected token in identifier: {token}"
-                )))?
-            }
         }
     }
-    Ok(idents)
 }
 
 /// Parse a string into a TableReference, normalizing where appropriate
@@ -288,6 +299,12 @@ impl<'a> From<&'a str> for TableReference<'a> {
     }
 }
 
+impl<'a> From<&'a String> for TableReference<'a> {
+    fn from(s: &'a String) -> Self {
+        Self::parse_str(s)
+    }
+}
+
 impl<'a> From<ResolvedTableReference<'a>> for TableReference<'a> {
     fn from(resolved: ResolvedTableReference<'a>) -> Self {
         Self::Full {
@@ -302,64 +319,6 @@ impl<'a> From<ResolvedTableReference<'a>> for 
TableReference<'a> {
 mod tests {
     use super::*;
 
-    #[test]
-    fn test_parse_identifiers() -> Result<()> {
-        let s = "CATALOG.\"F(o)o. \"\"bar\".table";
-        let actual = parse_identifiers(s)?;
-        let expected = vec![
-            Ident {
-                value: "CATALOG".to_string(),
-                quote_style: None,
-            },
-            Ident {
-                value: "F(o)o. \"bar".to_string(),
-                quote_style: Some('"'),
-            },
-            Ident {
-                value: "table".to_string(),
-                quote_style: None,
-            },
-        ];
-        assert_eq!(expected, actual);
-
-        let s = "";
-        let err = parse_identifiers(s).expect_err("didn't fail to parse");
-        assert_eq!(
-            "SQL(ParserError(\"Empty input when parsing identifier\"))",
-            format!("{err:?}")
-        );
-
-        let s = "*schema.table";
-        let err = parse_identifiers(s).expect_err("didn't fail to parse");
-        assert_eq!(
-            "SQL(ParserError(\"Unexpected token in identifier: *\"))",
-            format!("{err:?}")
-        );
-
-        let s = "schema.table*";
-        let err = parse_identifiers(s).expect_err("didn't fail to parse");
-        assert_eq!(
-            "SQL(ParserError(\"Unexpected token in identifier: *\"))",
-            format!("{err:?}")
-        );
-
-        let s = "schema.table.";
-        let err = parse_identifiers(s).expect_err("didn't fail to parse");
-        assert_eq!(
-            "SQL(ParserError(\"Trailing period in identifier\"))",
-            format!("{err:?}")
-        );
-
-        let s = "schema.*";
-        let err = parse_identifiers(s).expect_err("didn't fail to parse");
-        assert_eq!(
-            "SQL(ParserError(\"Unexpected token following period in 
identifier: *\"))",
-            format!("{err:?}")
-        );
-
-        Ok(())
-    }
-
     #[test]
     fn test_table_reference_from_str_normalizes() {
         let expected = TableReference::Full {
diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs
index 3c0730153..a1226def8 100644
--- a/datafusion/common/src/utils.rs
+++ b/datafusion/common/src/utils.rs
@@ -20,6 +20,10 @@
 use crate::{DataFusionError, Result, ScalarValue};
 use arrow::array::ArrayRef;
 use arrow::compute::SortOptions;
+use sqlparser::ast::Ident;
+use sqlparser::dialect::GenericDialect;
+use sqlparser::parser::{Parser, ParserError};
+use sqlparser::tokenizer::{Token, TokenWithLocation};
 use std::cmp::Ordering;
 
 /// Given column vectors, returns row at `idx`.
@@ -158,6 +162,78 @@ where
     Ok(low)
 }
 
+/// Wraps an identifier string in double quotes, escaping each double quote in
+/// the identifier by replacing it with two double quotes
+///
+/// e.g. identifier `tab.le"name` becomes `"tab.le""name"`
+pub fn quote_identifier(s: &str) -> String {
+    format!("\"{}\"", s.replace('"', "\"\""))
+}
+
+// TODO: remove when can use 
https://github.com/sqlparser-rs/sqlparser-rs/issues/805
+pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
+    let dialect = GenericDialect;
+    let mut parser = Parser::new(&dialect).try_with_sql(s)?;
+    let mut idents = vec![];
+
+    // expecting at least one word for identifier
+    match parser.next_token_no_skip() {
+        Some(TokenWithLocation {
+            token: Token::Word(w),
+            ..
+        }) => idents.push(w.to_ident()),
+        Some(TokenWithLocation { token, .. }) => {
+            return Err(ParserError::ParserError(format!(
+                "Unexpected token in identifier: {token}"
+            )))?
+        }
+        None => {
+            return Err(ParserError::ParserError(
+                "Empty input when parsing identifier".to_string(),
+            ))?
+        }
+    };
+
+    while let Some(TokenWithLocation { token, .. }) = 
parser.next_token_no_skip() {
+        match token {
+            // ensure that optional period is succeeded by another identifier
+            Token::Period => match parser.next_token_no_skip() {
+                Some(TokenWithLocation {
+                    token: Token::Word(w),
+                    ..
+                }) => idents.push(w.to_ident()),
+                Some(TokenWithLocation { token, .. }) => {
+                    return Err(ParserError::ParserError(format!(
+                        "Unexpected token following period in identifier: 
{token}"
+                    )))?
+                }
+                None => {
+                    return Err(ParserError::ParserError(
+                        "Trailing period in identifier".to_string(),
+                    ))?
+                }
+            },
+            _ => {
+                return Err(ParserError::ParserError(format!(
+                    "Unexpected token in identifier: {token}"
+                )))?
+            }
+        }
+    }
+    Ok(idents)
+}
+
+pub(crate) fn parse_identifiers_normalized(s: &str) -> Vec<String> {
+    parse_identifiers(s)
+        .unwrap_or_default()
+        .into_iter()
+        .map(|id| match id.quote_style {
+            Some(_) => id.value,
+            None => id.value.to_ascii_lowercase(),
+        })
+        .collect::<Vec<_>>()
+}
+
 #[cfg(test)]
 mod tests {
     use arrow::array::Float64Array;
@@ -330,4 +406,62 @@ mod tests {
         assert_eq!(res, 2);
         Ok(())
     }
+
+    #[test]
+    fn test_parse_identifiers() -> Result<()> {
+        let s = "CATALOG.\"F(o)o. \"\"bar\".table";
+        let actual = parse_identifiers(s)?;
+        let expected = vec![
+            Ident {
+                value: "CATALOG".to_string(),
+                quote_style: None,
+            },
+            Ident {
+                value: "F(o)o. \"bar".to_string(),
+                quote_style: Some('"'),
+            },
+            Ident {
+                value: "table".to_string(),
+                quote_style: None,
+            },
+        ];
+        assert_eq!(expected, actual);
+
+        let s = "";
+        let err = parse_identifiers(s).expect_err("didn't fail to parse");
+        assert_eq!(
+            "SQL(ParserError(\"Empty input when parsing identifier\"))",
+            format!("{err:?}")
+        );
+
+        let s = "*schema.table";
+        let err = parse_identifiers(s).expect_err("didn't fail to parse");
+        assert_eq!(
+            "SQL(ParserError(\"Unexpected token in identifier: *\"))",
+            format!("{err:?}")
+        );
+
+        let s = "schema.table*";
+        let err = parse_identifiers(s).expect_err("didn't fail to parse");
+        assert_eq!(
+            "SQL(ParserError(\"Unexpected token in identifier: *\"))",
+            format!("{err:?}")
+        );
+
+        let s = "schema.table.";
+        let err = parse_identifiers(s).expect_err("didn't fail to parse");
+        assert_eq!(
+            "SQL(ParserError(\"Trailing period in identifier\"))",
+            format!("{err:?}")
+        );
+
+        let s = "schema.*";
+        let err = parse_identifiers(s).expect_err("didn't fail to parse");
+        assert_eq!(
+            "SQL(ParserError(\"Unexpected token following period in 
identifier: *\"))",
+            format!("{err:?}")
+        );
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/catalog/listing_schema.rs 
b/datafusion/core/src/catalog/listing_schema.rs
index 32ee9f62a..3ea7a1098 100644
--- a/datafusion/core/src/catalog/listing_schema.rs
+++ b/datafusion/core/src/catalog/listing_schema.rs
@@ -128,9 +128,7 @@ impl ListingSchemaProvider {
             if !self.table_exist(table_name) {
                 let table_url = format!("{}/{}", self.authority, table_path);
 
-                let name = OwnedTableReference::Bare {
-                    table: table_name.to_string(),
-                };
+                let name = OwnedTableReference::bare(table_name.to_string());
                 let provider = self
                     .factory
                     .create(
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index eab56432e..af41725db 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -1372,7 +1372,7 @@ mod tests {
         let join = left
             .join_on(right, JoinType::Inner, [col("c1").eq(col("c1"))])
             .expect_err("join didn't fail check");
-        let expected = "Schema error: Ambiguous reference to unqualified field 
'c1'";
+        let expected = "Schema error: Ambiguous reference to unqualified field 
\"c1\"";
         assert_eq!(join.to_string(), expected);
 
         Ok(())
diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index aa02e4be3..1466bcba0 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -282,7 +282,7 @@ impl SessionContext {
         self.session_id.clone()
     }
 
-    /// Return the [`TableFactoryProvider`] that is registered for the
+    /// Return the [`TableProviderFactory`] that is registered for the
     /// specified file type, if any.
     pub fn table_factory(
         &self,
@@ -1933,7 +1933,7 @@ impl SessionState {
             self.config.options.sql_parser.parse_float_as_decimal;
         for reference in references {
             let table = reference.table();
-            let resolved = 
self.resolve_table_ref(reference.as_table_reference());
+            let resolved = self.resolve_table_ref(&reference);
             if let Entry::Vacant(v) = 
provider.tables.entry(resolved.to_string()) {
                 if let Ok(schema) = self.schema_for_ref(resolved) {
                     if let Some(table) = schema.table(table).await {
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs 
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 2c1bfc9ca..3f3b0bb74 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -204,6 +204,8 @@ impl ParquetExec {
     /// `ParquetRecordBatchStream`. These filters are applied by the
     /// parquet decoder to skip unecessairly decoding other columns
     /// which would not pass the predicate. Defaults to false
+    ///
+    /// [`Expr`]: datafusion_expr::Expr
     pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
         self.pushdown_filters = Some(pushdown_filters);
         self
@@ -219,6 +221,8 @@ impl ParquetExec {
     /// minimize the cost of filter evaluation by reordering the
     /// predicate [`Expr`]s. If false, the predicates are applied in
     /// the same order as specified in the query. Defaults to false.
+    ///
+    /// [`Expr`]: datafusion_expr::Expr
     pub fn with_reorder_filters(mut self, reorder_filters: bool) -> Self {
         self.reorder_filters = Some(reorder_filters);
         self
diff --git a/datafusion/core/src/physical_plan/planner.rs 
b/datafusion/core/src/physical_plan/planner.rs
index 38830c685..0dba182d8 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -2388,7 +2388,7 @@ Internal error: Optimizer rule 'type_coercion' failed due 
to unexpected error: E
             Self {
                 schema: DFSchemaRef::new(
                     DFSchema::new_with_metadata(
-                        vec![DFField::new(None, "a", DataType::Int32, false)],
+                        vec![DFField::new_unqualified("a", DataType::Int32, 
false)],
                         HashMap::new(),
                     )
                     .unwrap(),
@@ -2523,7 +2523,7 @@ Internal error: Optimizer rule 'type_coercion' failed due 
to unexpected error: E
                         .projected_schema
                         .as_ref()
                         .clone()
-                        .replace_qualifier(name);
+                        .replace_qualifier(name.to_string());
                     scan.projected_schema = Arc::new(new_schema);
                     LogicalPlan::TableScan(scan)
                 }
diff --git a/datafusion/core/tests/dataframe.rs 
b/datafusion/core/tests/dataframe.rs
index 19ceebe17..82bd0d844 100644
--- a/datafusion/core/tests/dataframe.rs
+++ b/datafusion/core/tests/dataframe.rs
@@ -249,7 +249,7 @@ async fn sort_on_ambiguous_column() -> Result<()> {
         .sort(vec![col("b").sort(true, true)])
         .unwrap_err();
 
-    let expected = "Schema error: Ambiguous reference to unqualified field 
'b'";
+    let expected = "Schema error: Ambiguous reference to unqualified field 
\"b\"";
     assert_eq!(err.to_string(), expected);
     Ok(())
 }
@@ -268,7 +268,7 @@ async fn group_by_ambiguous_column() -> Result<()> {
         .aggregate(vec![col("b")], vec![max(col("a"))])
         .unwrap_err();
 
-    let expected = "Schema error: Ambiguous reference to unqualified field 
'b'";
+    let expected = "Schema error: Ambiguous reference to unqualified field 
\"b\"";
     assert_eq!(err.to_string(), expected);
     Ok(())
 }
@@ -287,7 +287,7 @@ async fn filter_on_ambiguous_column() -> Result<()> {
         .filter(col("b").eq(lit(1)))
         .unwrap_err();
 
-    let expected = "Schema error: Ambiguous reference to unqualified field 
'b'";
+    let expected = "Schema error: Ambiguous reference to unqualified field 
\"b\"";
     assert_eq!(err.to_string(), expected);
     Ok(())
 }
@@ -306,7 +306,7 @@ async fn select_ambiguous_column() -> Result<()> {
         .select(vec![col("b")])
         .unwrap_err();
 
-    let expected = "Schema error: Ambiguous reference to unqualified field 
'b'";
+    let expected = "Schema error: Ambiguous reference to unqualified field 
\"b\"";
     assert_eq!(err.to_string(), expected);
     Ok(())
 }
diff --git a/datafusion/core/tests/sql/idenfifers.rs 
b/datafusion/core/tests/sql/idenfifers.rs
index a305f23b4..1b57f60bd 100644
--- a/datafusion/core/tests/sql/idenfifers.rs
+++ b/datafusion/core/tests/sql/idenfifers.rs
@@ -211,28 +211,28 @@ async fn case_insensitive_in_sql_errors() {
         .await
         .unwrap_err()
         .to_string();
-    assert_contains!(actual, "No field named 'column1'");
+    assert_contains!(actual, r#"No field named "column1""#);
 
     let actual = ctx
         .sql("SELECT Column1 from test")
         .await
         .unwrap_err()
         .to_string();
-    assert_contains!(actual, "No field named 'column1'");
+    assert_contains!(actual, r#"No field named "column1""#);
 
     let actual = ctx
         .sql("SELECT column1 from test")
         .await
         .unwrap_err()
         .to_string();
-    assert_contains!(actual, "No field named 'column1'");
+    assert_contains!(actual, r#"No field named "column1""#);
 
     let actual = ctx
         .sql(r#"SELECT "column1" from test"#)
         .await
         .unwrap_err()
         .to_string();
-    assert_contains!(actual, "No field named 'column1'");
+    assert_contains!(actual, r#"No field named "column1""#);
 
     // This should pass (note the quotes)
     ctx.sql(r#"SELECT "Column1" from test"#).await.unwrap();
diff --git a/datafusion/core/tests/sql/references.rs 
b/datafusion/core/tests/sql/references.rs
index f006cbb45..335bc6308 100644
--- a/datafusion/core/tests/sql/references.rs
+++ b/datafusion/core/tests/sql/references.rs
@@ -67,7 +67,7 @@ async fn qualified_table_references_and_fields() -> 
Result<()> {
     let error = ctx.sql(sql).await.unwrap_err();
     assert_contains!(
         error.to_string(),
-        "No field named 'f1'.'c1'. Valid fields are 'test'.'f.c1', 
'test'.'test.c2'"
+        r#"No field named "f1"."c1". Valid fields are "test"."f.c1", 
"test"."test.c2""#
     );
 
     // however, enclosing it in double quotes is ok
diff --git 
a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt 
b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index 81ce141a8..25e4195fb 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -84,6 +84,30 @@ datafusion information_schema views VIEW
 datafusion public t BASE TABLE
 datafusion public t2 BASE TABLE
 
+query TTTT rowsort
+SELECT * from information_schema.tables WHERE 
tables.table_schema='information_schema';
+----
+datafusion information_schema columns VIEW
+datafusion information_schema df_settings VIEW
+datafusion information_schema tables VIEW
+datafusion information_schema views VIEW
+
+query TTTT rowsort
+SELECT * from information_schema.tables WHERE 
information_schema.tables.table_schema='information_schema';
+----
+datafusion information_schema columns VIEW
+datafusion information_schema df_settings VIEW
+datafusion information_schema tables VIEW
+datafusion information_schema views VIEW
+
+query TTTT rowsort
+SELECT * from information_schema.tables WHERE 
datafusion.information_schema.tables.table_schema='information_schema';
+----
+datafusion information_schema columns VIEW
+datafusion information_schema df_settings VIEW
+datafusion information_schema tables VIEW
+datafusion information_schema views VIEW
+
 # Cleanup
 statement ok
 drop table t
diff --git a/datafusion/core/tests/sqllogictests/test_files/join.slt 
b/datafusion/core/tests/sqllogictests/test_files/join.slt
index 8d5f57343..49f7cf49b 100644
--- a/datafusion/core/tests/sqllogictests/test_files/join.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/join.slt
@@ -65,7 +65,7 @@ CREATE TABLE t2(t2_id INT, t2_name TEXT, t2_int INT) AS VALUES
 (55, 'w', 3);
 
 # left semi with wrong where clause
-query error DataFusion error: Schema error: No field named 't2'.'t2_id'. Valid 
fields are 't1'.'t1_id', 't1'.'t1_name', 't1'.'t1_int'.
+query error DataFusion error: Schema error: No field named "t2"."t2_id". Valid 
fields are "t1"."t1_id", "t1"."t1_name", "t1"."t1_int".
 SELECT t1.t1_id,
        t1.t1_name,
        t1.t1_int
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 6465ca80b..ef6f8ac50 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -28,17 +28,47 @@ use arrow::datatypes::DataType;
 use datafusion_common::{Column, Result};
 use std::sync::Arc;
 
-/// Create a column expression based on a qualified or unqualified column name
+/// Create a column expression based on a qualified or unqualified column 
name. Will
+/// normalize unquoted identifiers according to SQL rules (identifiers will 
become lowercase).
 ///
-/// example:
-/// ```
+/// For example:
+///
+/// ```rust
 /// # use datafusion_expr::col;
-/// let c = col("my_column");
+/// let c1 = col("a");
+/// let c2 = col("A");
+/// assert_eq!(c1, c2);
+///
+/// // note how quoting with double quotes preserves the case
+/// let c3 = col(r#""A""#);
+/// assert_ne!(c1, c3);
 /// ```
 pub fn col(ident: impl Into<Column>) -> Expr {
     Expr::Column(ident.into())
 }
 
+/// Create an unqualified column expression from the provided name, without 
normalizing
+/// the column.
+///
+/// For example:
+///
+/// ```rust
+/// # use datafusion_expr::{col, ident};
+/// let c1 = ident("A"); // not normalized staying as column 'A'
+/// let c2 = col("A"); // normalized via SQL rules becoming column 'a'
+/// assert_ne!(c1, c2);
+///
+/// let c3 = col(r#""A""#);
+/// assert_eq!(c1, c3);
+///
+/// let c4 = col("t1.a"); // parses as relation 't1' column 'a'
+/// let c5 = ident("t1.a"); // parses as column 't1.a'
+/// assert_ne!(c4, c5);
+/// ```
+pub fn ident(name: impl Into<String>) -> Expr {
+    Expr::Column(Column::from_name(name))
+}
+
 /// Return a new expression `left <op> right`
 pub fn binary_expr(left: Expr, op: Operator, right: Expr) -> Expr {
     Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
@@ -652,7 +682,7 @@ mod test {
     #[test]
     fn filter_is_null_and_is_not_null() {
         let col_null = col("col1");
-        let col_not_null = col("col2");
+        let col_not_null = ident("col2");
         assert_eq!(format!("{:?}", col_null.is_null()), "col1 IS NULL");
         assert_eq!(
             format!("{:?}", col_not_null.is_not_null()),
diff --git a/datafusion/expr/src/expr_rewriter.rs 
b/datafusion/expr/src/expr_rewriter.rs
index 3ae9e43e0..ade276bab 100644
--- a/datafusion/expr/src/expr_rewriter.rs
+++ b/datafusion/expr/src/expr_rewriter.rs
@@ -664,7 +664,8 @@ mod test {
     fn normalize_cols_non_exist() {
         // test normalizing columns when the name doesn't exist
         let expr = col("a") + col("b");
-        let schema_a = 
make_schema_with_empty_metadata(vec![make_field("tableA", "a")]);
+        let schema_a =
+            make_schema_with_empty_metadata(vec![make_field("\"tableA\"", 
"a")]);
         let schemas = vec![schema_a];
         let schemas = schemas.iter().collect::<Vec<_>>();
 
@@ -674,7 +675,7 @@ mod test {
                 .to_string();
         assert_eq!(
             error,
-            "Schema error: No field named 'b'. Valid fields are 'tableA'.'a'."
+            r#"Schema error: No field named "b". Valid fields are 
"tableA"."a"."#
         );
     }
 
@@ -690,7 +691,7 @@ mod test {
     }
 
     fn make_field(relation: &str, column: &str) -> DFField {
-        DFField::new(Some(relation), column, DataType::Int8, false)
+        DFField::new(Some(relation.to_string()), column, DataType::Int8, false)
     }
 
     #[test]
diff --git a/datafusion/expr/src/expr_schema.rs 
b/datafusion/expr/src/expr_schema.rs
index 493c425d7..5ceaf1668 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -247,13 +247,12 @@ impl ExprSchemable for Expr {
     fn to_field(&self, input_schema: &DFSchema) -> Result<DFField> {
         match self {
             Expr::Column(c) => Ok(DFField::new(
-                c.relation.as_deref(),
+                c.relation.clone(),
                 &c.name,
                 self.get_type(input_schema)?,
                 self.nullable(input_schema)?,
             )),
-            _ => Ok(DFField::new(
-                None,
+            _ => Ok(DFField::new_unqualified(
                 &self.display_name()?,
                 self.get_type(input_schema)?,
                 self.nullable(input_schema)?,
diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index 9be046934..5616b7c52 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -176,8 +176,7 @@ impl LogicalPlanBuilder {
             .map(|(j, data_type)| {
                 // naming is following convention 
https://www.postgresql.org/docs/current/queries-values.html
                 let name = &format!("column{}", j + 1);
-                DFField::new(
-                    None,
+                DFField::new_unqualified(
                     name,
                     data_type.clone().unwrap_or(DataType::Utf8),
                     true,
@@ -239,7 +238,10 @@ impl LogicalPlanBuilder {
                 DFSchema::new_with_metadata(
                     p.iter()
                         .map(|i| {
-                            DFField::from_qualified(&table_name, 
schema.field(*i).clone())
+                            DFField::from_qualified(
+                                table_name.to_string(),
+                                schema.field(*i).clone(),
+                            )
                         })
                         .collect(),
                     schema.metadata().clone(),
@@ -1105,7 +1107,7 @@ pub fn union(left_plan: LogicalPlan, right_plan: 
LogicalPlan) -> Result<LogicalP
                     })?;
 
             Ok(DFField::new(
-                left_field.qualifier().map(|x| x.as_ref()),
+                left_field.qualifier().cloned(),
                 left_field.name(),
                 data_type,
                 nullable,
@@ -1291,7 +1293,7 @@ pub fn unnest(input: LogicalPlan, column: Column) -> 
Result<LogicalPlan> {
         DataType::List(field)
         | DataType::FixedSizeList(field, _)
         | DataType::LargeList(field) => DFField::new(
-            unnest_field.qualifier().map(String::as_str),
+            unnest_field.qualifier().cloned(),
             unnest_field.name(),
             field.data_type().clone(),
             unnest_field.is_nullable(),
@@ -1332,7 +1334,7 @@ pub fn unnest(input: LogicalPlan, column: Column) -> 
Result<LogicalPlan> {
 mod tests {
     use crate::{expr, expr_fn::exists};
     use arrow::datatypes::{DataType, Field};
-    use datafusion_common::SchemaError;
+    use datafusion_common::{OwnedTableReference, SchemaError, TableReference};
 
     use crate::logical_plan::StringifiedPlan;
 
@@ -1607,10 +1609,13 @@ mod tests {
 
         match plan {
             Err(DataFusionError::SchemaError(SchemaError::AmbiguousReference {
-                qualifier,
-                name,
+                field:
+                    Column {
+                        relation: Some(OwnedTableReference::Bare { table }),
+                        name,
+                    },
             })) => {
-                assert_eq!("employee_csv", qualifier.unwrap().as_str());
+                assert_eq!("employee_csv", table);
                 assert_eq!("id", &name);
                 Ok(())
             }
@@ -1633,10 +1638,13 @@ mod tests {
 
         match plan {
             Err(DataFusionError::SchemaError(SchemaError::AmbiguousReference {
-                qualifier,
-                name,
+                field:
+                    Column {
+                        relation: Some(OwnedTableReference::Bare { table }),
+                        name,
+                    },
             })) => {
-                assert_eq!("employee_csv", qualifier.unwrap().as_str());
+                assert_eq!("employee_csv", table);
                 assert_eq!("state", &name);
                 Ok(())
             }
@@ -1737,7 +1745,7 @@ mod tests {
         // Check unnested field is a scalar
         let field = plan
             .schema()
-            .field_with_name(Some("test_table"), "strings")
+            .field_with_name(Some(&TableReference::bare("test_table")), 
"strings")
             .unwrap();
         assert_eq!(&DataType::Utf8, field.data_type());
 
@@ -1756,7 +1764,7 @@ mod tests {
         // Check unnested struct list field should be a struct.
         let field = plan
             .schema()
-            .field_with_name(Some("test_table"), "structs")
+            .field_with_name(Some(&TableReference::bare("test_table")), 
"structs")
             .unwrap();
         assert!(matches!(field.data_type(), DataType::Struct(_)));
 
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 83bda0612..777a5871c 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1521,6 +1521,8 @@ pub struct Window {
 #[derive(Clone)]
 pub struct TableScan {
     /// The name of the table
+    // TODO: change to OwnedTableReference
+    // see: https://github.com/apache/arrow-datafusion/issues/5522
     pub table_name: String,
     /// The source of the table
     pub source: Arc<dyn TableSource>,
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index a0745025d..3d8b44792 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -35,6 +35,7 @@ use crate::{
 use arrow::datatypes::{DataType, TimeUnit};
 use datafusion_common::{
     Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, 
ScalarValue,
+    TableReference,
 };
 use std::cmp::Ordering;
 use std::collections::HashSet;
@@ -191,8 +192,9 @@ pub fn expand_qualified_wildcard(
     qualifier: &str,
     schema: &DFSchema,
 ) -> Result<Vec<Expr>> {
+    let qualifier = TableReference::from(qualifier);
     let qualified_fields: Vec<DFField> = schema
-        .fields_with_qualified(qualifier)
+        .fields_with_qualified(&qualifier)
         .into_iter()
         .cloned()
         .collect();
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs 
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index e1830390d..33bf676db 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -312,7 +312,7 @@ fn build_common_expr_project_plan(
         match expr_set.get(&id) {
             Some((expr, _, data_type)) => {
                 // todo: check `nullable`
-                let field = DFField::new(None, &id, data_type.clone(), true);
+                let field = DFField::new_unqualified(&id, data_type.clone(), 
true);
                 fields_set.insert(field.name().to_owned());
                 project_exprs.push(expr.clone().alias(&id));
             }
@@ -624,8 +624,8 @@ mod test {
 
         let schema = Arc::new(DFSchema::new_with_metadata(
             vec![
-                DFField::new(None, "a", DataType::Int64, false),
-                DFField::new(None, "c", DataType::Int64, false),
+                DFField::new_unqualified("a", DataType::Int64, false),
+                DFField::new_unqualified("c", DataType::Int64, false),
             ],
             Default::default(),
         )?);
diff --git a/datafusion/optimizer/src/optimizer.rs 
b/datafusion/optimizer/src/optimizer.rs
index c1baa25d4..6091c4f6f 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -476,9 +476,9 @@ mod tests {
              Internal error: Optimizer rule 'get table_scan rule' failed, due 
to generate a different schema, \
              original schema: DFSchema { fields: [], metadata: {} }, \
              new schema: DFSchema { fields: [\
-             DFField { qualifier: Some(\"test\"), field: Field { name: \"a\", 
data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} } }, \
-             DFField { qualifier: Some(\"test\"), field: Field { name: \"b\", 
data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} } }, \
-             DFField { qualifier: Some(\"test\"), field: Field { name: \"c\", 
data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} } }], \
+             DFField { qualifier: Some(Bare { table: \"test\" }), field: Field 
{ name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: 
false, metadata: {} } }, \
+             DFField { qualifier: Some(Bare { table: \"test\" }), field: Field 
{ name: \"b\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: 
false, metadata: {} } }, \
+             DFField { qualifier: Some(Bare { table: \"test\" }), field: Field 
{ name: \"c\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: 
false, metadata: {} } }], \
              metadata: {} }. \
              This was likely caused by a bug in DataFusion's code \
              and we would welcome that you file an bug report in our issue 
tracker",
@@ -521,7 +521,7 @@ mod tests {
 
                 let new_arrow_field = 
f.field().clone().with_metadata(metadata);
                 if let Some(qualifier) = f.qualifier() {
-                    DFField::from_qualified(qualifier, new_arrow_field)
+                    DFField::from_qualified(qualifier.clone(), new_arrow_field)
                 } else {
                     DFField::from(new_arrow_field)
                 }
diff --git a/datafusion/optimizer/src/push_down_projection.rs 
b/datafusion/optimizer/src/push_down_projection.rs
index 6d7ab481b..4e9d5f039 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -496,7 +496,8 @@ fn push_down_scan(
     let mut projection: BTreeSet<usize> = used_columns
         .iter()
         .filter(|c| {
-            c.relation.is_none() || c.relation.as_ref().unwrap() == 
&scan.table_name
+            c.relation.is_none()
+                || c.relation.as_ref().unwrap().to_string() == scan.table_name
         })
         .map(|c| schema.index_of(&c.name))
         .filter_map(ArrowResult::ok)
@@ -536,7 +537,9 @@ fn push_down_scan(
     // create the projected schema
     let projected_fields: Vec<DFField> = projection
         .iter()
-        .map(|i| DFField::from_qualified(&scan.table_name, 
schema.fields()[*i].clone()))
+        .map(|i| {
+            DFField::from_qualified(scan.table_name.clone(), 
schema.fields()[*i].clone())
+        })
         .collect();
 
     let projected_schema = projected_fields.to_dfschema_ref()?;
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs 
b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index cfec3cb74..34ea208b3 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -272,16 +272,10 @@ fn optimize_scalar(
     // qualify the join columns for outside the subquery
     let mut subqry_cols: Vec<_> = subqry_cols
         .iter()
-        .map(|it| Column {
-            relation: Some(subqry_alias.clone()),
-            name: it.name.clone(),
-        })
+        .map(|it| Column::new(Some(subqry_alias.clone()), it.name.clone()))
         .collect();
 
-    let qry_expr = Expr::Column(Column {
-        relation: Some(subqry_alias),
-        name: "__value".to_string(),
-    });
+    let qry_expr = Expr::Column(Column::new(Some(subqry_alias), 
"__value".to_string()));
 
     // if correlated subquery's operation is column equality, put the clause 
into join on clause.
     let mut restore_where_clause = true;
diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs 
b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
index c2e96bf97..2b015acd7 100644
--- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
+++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
@@ -2427,14 +2427,14 @@ mod tests {
         Arc::new(
             DFSchema::new_with_metadata(
                 vec![
-                    DFField::new(None, "c1", DataType::Utf8, true),
-                    DFField::new(None, "c2", DataType::Boolean, true),
-                    DFField::new(None, "c3", DataType::Int64, true),
-                    DFField::new(None, "c4", DataType::UInt32, true),
-                    DFField::new(None, "c1_non_null", DataType::Utf8, false),
-                    DFField::new(None, "c2_non_null", DataType::Boolean, 
false),
-                    DFField::new(None, "c3_non_null", DataType::Int64, false),
-                    DFField::new(None, "c4_non_null", DataType::UInt32, false),
+                    DFField::new_unqualified("c1", DataType::Utf8, true),
+                    DFField::new_unqualified("c2", DataType::Boolean, true),
+                    DFField::new_unqualified("c3", DataType::Int64, true),
+                    DFField::new_unqualified("c4", DataType::UInt32, true),
+                    DFField::new_unqualified("c1_non_null", DataType::Utf8, 
false),
+                    DFField::new_unqualified("c2_non_null", DataType::Boolean, 
false),
+                    DFField::new_unqualified("c3_non_null", DataType::Int64, 
false),
+                    DFField::new_unqualified("c4_non_null", DataType::UInt32, 
false),
                 ],
                 HashMap::new(),
             )
diff --git a/datafusion/optimizer/src/type_coercion.rs 
b/datafusion/optimizer/src/type_coercion.rs
index c4c5c29d3..7cfd6cc8d 100644
--- a/datafusion/optimizer/src/type_coercion.rs
+++ b/datafusion/optimizer/src/type_coercion.rs
@@ -664,7 +664,7 @@ mod test {
             produce_one_row: false,
             schema: Arc::new(
                 DFSchema::new_with_metadata(
-                    vec![DFField::new(None, "a", DataType::Float64, true)],
+                    vec![DFField::new_unqualified("a", DataType::Float64, 
true)],
                     std::collections::HashMap::new(),
                 )
                 .unwrap(),
@@ -682,7 +682,7 @@ mod test {
             produce_one_row: false,
             schema: Arc::new(
                 DFSchema::new_with_metadata(
-                    vec![DFField::new(None, "a", DataType::Float64, true)],
+                    vec![DFField::new_unqualified("a", DataType::Float64, 
true)],
                     std::collections::HashMap::new(),
                 )
                 .unwrap(),
@@ -881,7 +881,7 @@ mod test {
             produce_one_row: false,
             schema: Arc::new(
                 DFSchema::new_with_metadata(
-                    vec![DFField::new(None, "a", DataType::Int64, true)],
+                    vec![DFField::new_unqualified("a", DataType::Int64, true)],
                     std::collections::HashMap::new(),
                 )
                 .unwrap(),
@@ -899,7 +899,11 @@ mod test {
             produce_one_row: false,
             schema: Arc::new(
                 DFSchema::new_with_metadata(
-                    vec![DFField::new(None, "a", DataType::Decimal128(12, 4), 
true)],
+                    vec![DFField::new_unqualified(
+                        "a",
+                        DataType::Decimal128(12, 4),
+                        true,
+                    )],
                     std::collections::HashMap::new(),
                 )
                 .unwrap(),
@@ -1082,7 +1086,7 @@ mod test {
             produce_one_row: false,
             schema: Arc::new(
                 DFSchema::new_with_metadata(
-                    vec![DFField::new(None, "a", data_type, true)],
+                    vec![DFField::new_unqualified("a", data_type, true)],
                     std::collections::HashMap::new(),
                 )
                 .unwrap(),
@@ -1095,7 +1099,7 @@ mod test {
         // gt
         let schema = Arc::new(
             DFSchema::new_with_metadata(
-                vec![DFField::new(None, "a", DataType::Int64, true)],
+                vec![DFField::new_unqualified("a", DataType::Int64, true)],
                 std::collections::HashMap::new(),
             )
             .unwrap(),
@@ -1109,7 +1113,7 @@ mod test {
         // eq
         let schema = Arc::new(
             DFSchema::new_with_metadata(
-                vec![DFField::new(None, "a", DataType::Int64, true)],
+                vec![DFField::new_unqualified("a", DataType::Int64, true)],
                 std::collections::HashMap::new(),
             )
             .unwrap(),
@@ -1123,7 +1127,7 @@ mod test {
         // lt
         let schema = Arc::new(
             DFSchema::new_with_metadata(
-                vec![DFField::new(None, "a", DataType::Int64, true)],
+                vec![DFField::new_unqualified("a", DataType::Int64, true)],
                 std::collections::HashMap::new(),
             )
             .unwrap(),
diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs 
b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
index a940cf272..4c2a24f05 100644
--- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
+++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
@@ -743,14 +743,22 @@ mod tests {
         Arc::new(
             DFSchema::new_with_metadata(
                 vec![
-                    DFField::new(None, "c1", DataType::Int32, false),
-                    DFField::new(None, "c2", DataType::Int64, false),
-                    DFField::new(None, "c3", DataType::Decimal128(18, 2), 
false),
-                    DFField::new(None, "c4", DataType::Decimal128(38, 37), 
false),
-                    DFField::new(None, "c5", DataType::Float32, false),
-                    DFField::new(None, "c6", DataType::UInt32, false),
-                    DFField::new(None, "ts_nano_none", 
timestamp_nano_none_type(), false),
-                    DFField::new(None, "ts_nano_utf", 
timestamp_nano_utc_type(), false),
+                    DFField::new_unqualified("c1", DataType::Int32, false),
+                    DFField::new_unqualified("c2", DataType::Int64, false),
+                    DFField::new_unqualified("c3", DataType::Decimal128(18, 
2), false),
+                    DFField::new_unqualified("c4", DataType::Decimal128(38, 
37), false),
+                    DFField::new_unqualified("c5", DataType::Float32, false),
+                    DFField::new_unqualified("c6", DataType::UInt32, false),
+                    DFField::new_unqualified(
+                        "ts_nano_none",
+                        timestamp_nano_none_type(),
+                        false,
+                    ),
+                    DFField::new_unqualified(
+                        "ts_nano_utf",
+                        timestamp_nano_utc_type(),
+                        false,
+                    ),
                 ],
                 HashMap::new(),
             )
diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs 
b/datafusion/physical-expr/src/intervals/cp_solver.rs
index 302a86cdc..66367001c 100644
--- a/datafusion/physical-expr/src/intervals/cp_solver.rs
+++ b/datafusion/physical-expr/src/intervals/cp_solver.rs
@@ -326,7 +326,7 @@ impl ExprIntervalGraph {
     // ```
 
     /// This function associates stable node indices with [PhysicalExpr]s so
-    /// that we can match Arc<dyn PhysicalExpr> and NodeIndex objects during
+    /// that we can match `Arc<dyn PhysicalExpr>` and NodeIndex objects during
     /// membership tests.
     pub fn gather_node_indices(
         &mut self,
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs 
b/datafusion/proto/src/logical_plan/from_proto.rs
index 1b704f3aa..aa416e63b 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -145,10 +145,7 @@ impl From<protobuf::Column> for Column {
     fn from(c: protobuf::Column) -> Self {
         let protobuf::Column { relation, name } = c;
 
-        Self {
-            relation: relation.map(|r| r.relation),
-            name,
-        }
+        Self::new(relation.map(|r| r.relation), name)
     }
 }
 
@@ -190,7 +187,7 @@ impl TryFrom<&protobuf::DfField> for DFField {
         let field = df_field.field.as_ref().required("field")?;
 
         Ok(match &df_field.qualifier {
-            Some(q) => DFField::from_qualified(&q.relation, field),
+            Some(q) => DFField::from_qualified(q.relation.clone(), field),
             None => DFField::from(field),
         })
     }
@@ -217,21 +214,17 @@ impl TryFrom<protobuf::OwnedTableReference> for 
OwnedTableReference {
 
         match table_reference_enum {
             TableReferenceEnum::Bare(protobuf::BareTableReference { table }) 
=> {
-                Ok(OwnedTableReference::Bare { table })
+                Ok(OwnedTableReference::bare(table))
             }
             TableReferenceEnum::Partial(protobuf::PartialTableReference {
                 schema,
                 table,
-            }) => Ok(OwnedTableReference::Partial { schema, table }),
+            }) => Ok(OwnedTableReference::partial(schema, table)),
             TableReferenceEnum::Full(protobuf::FullTableReference {
                 catalog,
                 schema,
                 table,
-            }) => Ok(OwnedTableReference::Full {
-                catalog,
-                schema,
-                table,
-            }),
+            }) => Ok(OwnedTableReference::full(catalog, schema, table)),
         }
     }
 }
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs 
b/datafusion/proto/src/logical_plan/to_proto.rs
index c240ef853..a794c9bd0 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -240,9 +240,9 @@ impl TryFrom<&DataType> for 
protobuf::arrow_type::ArrowTypeEnum {
 impl From<Column> for protobuf::Column {
     fn from(c: Column) -> Self {
         Self {
-            relation: c
-                .relation
-                .map(|relation| protobuf::ColumnRelation { relation }),
+            relation: c.relation.map(|relation| protobuf::ColumnRelation {
+                relation: relation.to_string(),
+            }),
             name: c.name,
         }
     }
@@ -1321,12 +1321,14 @@ impl From<OwnedTableReference> for 
protobuf::OwnedTableReference {
         use protobuf::owned_table_reference::TableReferenceEnum;
         let table_reference_enum = match t {
             OwnedTableReference::Bare { table } => {
-                TableReferenceEnum::Bare(protobuf::BareTableReference { table 
})
+                TableReferenceEnum::Bare(protobuf::BareTableReference {
+                    table: table.to_string(),
+                })
             }
             OwnedTableReference::Partial { schema, table } => {
                 TableReferenceEnum::Partial(protobuf::PartialTableReference {
-                    schema,
-                    table,
+                    schema: schema.to_string(),
+                    table: table.to_string(),
                 })
             }
             OwnedTableReference::Full {
@@ -1334,9 +1336,9 @@ impl From<OwnedTableReference> for 
protobuf::OwnedTableReference {
                 schema,
                 table,
             } => TableReferenceEnum::Full(protobuf::FullTableReference {
-                catalog,
-                schema,
-                table,
+                catalog: catalog.to_string(),
+                schema: schema.to_string(),
+                table: table.to_string(),
             }),
         };
 
diff --git a/datafusion/sql/src/expr/arrow_cast.rs 
b/datafusion/sql/src/expr/arrow_cast.rs
index 49104ee05..9a2ac28c4 100644
--- a/datafusion/sql/src/expr/arrow_cast.rs
+++ b/datafusion/sql/src/expr/arrow_cast.rs
@@ -93,7 +93,7 @@ pub fn create_arrow_cast(mut args: Vec<Expr>, schema: 
&DFSchema) -> Result<Expr>
 /// assert_eq!(data_type, DataType::Int32);
 /// ```
 ///
-/// Remove if added to arrow: https://github.com/apache/arrow-rs/issues/3821
+/// Remove if added to arrow: <https://github.com/apache/arrow-rs/issues/3821>
 pub fn parse_data_type(val: &str) -> Result<DataType> {
     Parser::new(val).parse()
 }
diff --git a/datafusion/sql/src/expr/identifier.rs 
b/datafusion/sql/src/expr/identifier.rs
index a581d5f47..7548fc8bd 100644
--- a/datafusion/sql/src/expr/identifier.rs
+++ b/datafusion/sql/src/expr/identifier.rs
@@ -15,12 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::planner::{
-    idents_to_table_reference, ContextProvider, PlannerContext, SqlToRel,
-};
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
 use crate::utils::normalize_ident;
 use datafusion_common::{
-    Column, DFSchema, DataFusionError, OwnedTableReference, Result, 
ScalarValue,
+    Column, DFField, DFSchema, DataFusionError, Result, ScalarValue, 
TableReference,
 };
 use datafusion_expr::{Case, Expr, GetIndexedField};
 use sqlparser::ast::{Expr as SQLExpr, Ident};
@@ -57,6 +55,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         ids: Vec<Ident>,
         schema: &DFSchema,
     ) -> Result<Expr> {
+        if ids.len() < 2 {
+            return Err(DataFusionError::Internal(format!(
+                "Not a compound identifier: {ids:?}"
+            )));
+        }
+
         if ids[0].value.starts_with('@') {
             let var_names: Vec<_> = 
ids.into_iter().map(normalize_ident).collect();
             let ty = self
@@ -69,44 +73,61 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 })?;
             Ok(Expr::ScalarVariable(ty, var_names))
         } else {
-            // only support "schema.table" type identifiers here
-            let (name, relation) = match idents_to_table_reference(
-                ids,
-                self.options.enable_ident_normalization,
-            )? {
-                OwnedTableReference::Partial { schema, table } => (table, 
schema),
-                r @ OwnedTableReference::Bare { .. }
-                | r @ OwnedTableReference::Full { .. } => {
-                    return Err(DataFusionError::Plan(format!(
-                        "Unsupported compound identifier '{r:?}'",
-                    )));
+            let ids = ids
+                .into_iter()
+                .map(|id| {
+                    if self.options.enable_ident_normalization {
+                        normalize_ident(id)
+                    } else {
+                        id.value
+                    }
+                })
+                .collect::<Vec<_>>();
+
+            // Only one level of nested field access is currently supported,
+            // though ideally this code should work unchanged once deeper nesting is supported
+            // TODO: remove this check once multiple nested identifiers are supported
+            if ids.len() > 5 {
+                return Err(DataFusionError::Internal(format!(
+                    "Unsupported compound identifier: {ids:?}"
+                )));
+            }
+
+            let search_result = search_dfschema(&ids, schema);
+            match search_result {
+                // found matching field with spare identifier(s) for nested 
field(s) in structure
+                Some((field, nested_names)) if !nested_names.is_empty() => {
+                    // TODO: remove this check once multiple nested identifiers are supported
+                    if nested_names.len() > 1 {
+                        return Err(DataFusionError::Internal(format!(
+                            "Nested identifiers not yet supported for column 
{}",
+                            field.qualified_column().quoted_flat_name()
+                        )));
+                    }
+                    let nested_name = nested_names[0].to_string();
+                    Ok(Expr::GetIndexedField(GetIndexedField::new(
+                        Box::new(Expr::Column(field.qualified_column())),
+                        ScalarValue::Utf8(Some(nested_name)),
+                    )))
                 }
-            };
-
-            // Try and find the reference in schema
-            match schema.field_with_qualified_name(&relation, &name) {
-                Ok(_) => {
-                    // found an exact match on a qualified name so this is a 
table.column identifier
-                    Ok(Expr::Column(Column {
-                        relation: Some(relation),
-                        name,
-                    }))
+                // found matching field with no spare identifier(s)
+                Some((field, _nested_names)) => {
+                    Ok(Expr::Column(field.qualified_column()))
                 }
-                Err(_) => {
-                    if let Some(field) =
-                        schema.fields().iter().find(|f| f.name().eq(&relation))
-                    {
-                        // Access to a field of a column which is a structure, 
example: SELECT my_struct.key
-                        Ok(Expr::GetIndexedField(GetIndexedField::new(
-                            Box::new(Expr::Column(field.qualified_column())),
-                            ScalarValue::Utf8(Some(name)),
+                // found no matching field, will return a default
+                None => {
+                    // fall back to a default column that uses all identifiers as qualifier and column name, i.e. with no nested field
+                    // the len check is needed because with 5 identifiers one of them would have to be a nested field
+                    if ids.len() == 5 {
+                        Err(DataFusionError::Internal(format!(
+                            "Unsupported compound identifier: {ids:?}"
                         )))
                     } else {
-                        // table.column identifier
-                        Ok(Expr::Column(Column {
-                            relation: Some(relation),
-                            name,
-                        }))
+                        let s = &ids[0..ids.len()];
+                        // unwrap is safe: s holds between 2 and 4 identifiers here, which form_identifier accepts
+                        let (relation, column_name) = 
form_identifier(s).unwrap();
+                        let relation = relation.map(|r| 
r.to_owned_reference());
+                        Ok(Expr::Column(Column::new(relation, column_name)))
                     }
                 }
             }
@@ -160,3 +181,244 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         )))
     }
 }
+
+// Splits identifiers into (relation, column name); the last identifier is the column name
+fn form_identifier(idents: &[String]) -> Result<(Option<TableReference>, 
&String)> {
+    match idents.len() {
+        1 => Ok((None, &idents[0])),
+        2 => Ok((
+            Some(TableReference::Bare {
+                table: (&idents[0]).into(),
+            }),
+            &idents[1],
+        )),
+        3 => Ok((
+            Some(TableReference::Partial {
+                schema: (&idents[0]).into(),
+                table: (&idents[1]).into(),
+            }),
+            &idents[2],
+        )),
+        4 => Ok((
+            Some(TableReference::Full {
+                catalog: (&idents[0]).into(),
+                schema: (&idents[1]).into(),
+                table: (&idents[2]).into(),
+            }),
+            &idents[3],
+        )),
+        _ => Err(DataFusionError::Internal(format!(
+            "Incorrect number of identifiers: {}",
+            idents.len()
+        ))),
+    }
+}
+
+fn search_dfschema<'ids, 'schema>(
+    ids: &'ids [String],
+    schema: &'schema DFSchema,
+) -> Option<(&'schema DFField, &'ids [String])> {
+    generate_schema_search_terms(ids).find_map(|(qualifier, column, 
nested_names)| {
+        let field = schema.field_with_name(qualifier.as_ref(), column).ok();
+        field.map(|f| (f, nested_names))
+    })
+}
+
+// Possibilities we search with, in order from top to bottom for each len:
+//
+// len = 2:
+// 1. (table.column)
+// 2. (column).nested
+//
+// len = 3:
+// 1. (schema.table.column)
+// 2. (table.column).nested
+// 3. (column).nested1.nested2
+//
+// len = 4:
+// 1. (catalog.schema.table.column)
+// 2. (schema.table.column).nested1
+// 3. (table.column).nested1.nested2
+// 4. (column).nested1.nested2.nested3
+//
+// len = 5:
+// 1. (catalog.schema.table.column).nested
+// 2. (schema.table.column).nested1.nested2
+// 3. (table.column).nested1.nested2.nested3
+// 4. (column).nested1.nested2.nested3.nested4
+//
+// len > 5:
+// 1. (catalog.schema.table.column).nested[.nestedN]+
+// 2. (schema.table.column).nested1.nested2[.nestedN]+
+// 3. (table.column).nested1.nested2.nested3[.nestedN]+
+// 4. (column).nested1.nested2.nested3.nested4[.nestedN]+
+fn generate_schema_search_terms(
+    ids: &[String],
+) -> impl Iterator<Item = (Option<TableReference>, &String, &[String])> {
+    // take at most 4 identifiers to form a Column to search with
+    // - 1 for the column name
+    // - 0 to 3 for the TableReference
+    let bound = ids.len().min(4);
+    // search terms from most specific to least specific
+    (0..bound).rev().map(|i| {
+        let nested_names_index = i + 1;
+        let qualifier_and_column = &ids[0..nested_names_index];
+        // unwrap is safe: qualifier_and_column always holds between 1 and 4 identifiers, which form_identifier accepts
+        let (relation, column_name) = 
form_identifier(qualifier_and_column).unwrap();
+        (relation, column_name, &ids[nested_names_index..])
+    })
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    // tests the behavior documented on the generate_schema_search_terms function:
+    // ensures the generated search terms are in the correct order with the correct values
+    fn test_generate_schema_search_terms() -> Result<()> {
+        type ExpectedItem = (
+            Option<TableReference<'static>>,
+            &'static str,
+            &'static [&'static str],
+        );
+        fn assert_vec_eq(
+            expected: Vec<ExpectedItem>,
+            actual: Vec<(Option<TableReference>, &String, &[String])>,
+        ) {
+            for (expected, actual) in expected.into_iter().zip(actual) {
+                assert_eq!(expected.0, actual.0, "qualifier");
+                assert_eq!(expected.1, actual.1, "column name");
+                assert_eq!(expected.2, actual.2, "nested names");
+            }
+        }
+
+        let actual = generate_schema_search_terms(&[]).collect::<Vec<_>>();
+        assert!(actual.is_empty());
+
+        let ids = vec!["a".to_string()];
+        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
+        let expected: Vec<ExpectedItem> = vec![(None, "a", &[])];
+        assert_vec_eq(expected, actual);
+
+        let ids = vec!["a".to_string(), "b".to_string()];
+        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
+        let expected: Vec<ExpectedItem> = vec![
+            (Some(TableReference::bare("a")), "b", &[]),
+            (None, "a", &["b"]),
+        ];
+        assert_vec_eq(expected, actual);
+
+        let ids = vec!["a".to_string(), "b".to_string(), "c".to_string()];
+        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
+        let expected: Vec<ExpectedItem> = vec![
+            (Some(TableReference::partial("a", "b")), "c", &[]),
+            (Some(TableReference::bare("a")), "b", &["c"]),
+            (None, "a", &["b", "c"]),
+        ];
+        assert_vec_eq(expected, actual);
+
+        let ids = vec![
+            "a".to_string(),
+            "b".to_string(),
+            "c".to_string(),
+            "d".to_string(),
+        ];
+        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
+        let expected: Vec<ExpectedItem> = vec![
+            (Some(TableReference::full("a", "b", "c")), "d", &[]),
+            (Some(TableReference::partial("a", "b")), "c", &["d"]),
+            (Some(TableReference::bare("a")), "b", &["c", "d"]),
+            (None, "a", &["b", "c", "d"]),
+        ];
+        assert_vec_eq(expected, actual);
+
+        let ids = vec![
+            "a".to_string(),
+            "b".to_string(),
+            "c".to_string(),
+            "d".to_string(),
+            "e".to_string(),
+        ];
+        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
+        let expected: Vec<ExpectedItem> = vec![
+            (Some(TableReference::full("a", "b", "c")), "d", &["e"]),
+            (Some(TableReference::partial("a", "b")), "c", &["d", "e"]),
+            (Some(TableReference::bare("a")), "b", &["c", "d", "e"]),
+            (None, "a", &["b", "c", "d", "e"]),
+        ];
+        assert_vec_eq(expected, actual);
+
+        let ids = vec![
+            "a".to_string(),
+            "b".to_string(),
+            "c".to_string(),
+            "d".to_string(),
+            "e".to_string(),
+            "f".to_string(),
+        ];
+        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
+        let expected: Vec<ExpectedItem> = vec![
+            (Some(TableReference::full("a", "b", "c")), "d", &["e", "f"]),
+            (
+                Some(TableReference::partial("a", "b")),
+                "c",
+                &["d", "e", "f"],
+            ),
+            (Some(TableReference::bare("a")), "b", &["c", "d", "e", "f"]),
+            (None, "a", &["b", "c", "d", "e", "f"]),
+        ];
+        assert_vec_eq(expected, actual);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_form_identifier() -> Result<()> {
+        let err = form_identifier(&[]).expect_err("empty identifiers didn't 
fail");
+        let expected = "Internal error: Incorrect number of identifiers: 0. \
+        This was likely caused by a bug in DataFusion's code and we would \
+        welcome that you file an bug report in our issue tracker";
+        assert_eq!(err.to_string(), expected);
+
+        let ids = vec!["a".to_string()];
+        let (qualifier, column) = form_identifier(&ids)?;
+        assert_eq!(qualifier, None);
+        assert_eq!(column, "a");
+
+        let ids = vec!["a".to_string(), "b".to_string()];
+        let (qualifier, column) = form_identifier(&ids)?;
+        assert_eq!(qualifier, Some(TableReference::bare("a")));
+        assert_eq!(column, "b");
+
+        let ids = vec!["a".to_string(), "b".to_string(), "c".to_string()];
+        let (qualifier, column) = form_identifier(&ids)?;
+        assert_eq!(qualifier, Some(TableReference::partial("a", "b")));
+        assert_eq!(column, "c");
+
+        let ids = vec![
+            "a".to_string(),
+            "b".to_string(),
+            "c".to_string(),
+            "d".to_string(),
+        ];
+        let (qualifier, column) = form_identifier(&ids)?;
+        assert_eq!(qualifier, Some(TableReference::full("a", "b", "c")));
+        assert_eq!(column, "d");
+
+        let err = form_identifier(&[
+            "a".to_string(),
+            "b".to_string(),
+            "c".to_string(),
+            "d".to_string(),
+            "e".to_string(),
+        ])
+        .expect_err("too many identifiers didn't fail");
+        let expected = "Internal error: Incorrect number of identifiers: 5. \
+        This was likely caused by a bug in DataFusion's code and we would \
+        welcome that you file an bug report in our issue tracker";
+        assert_eq!(err.to_string(), expected);
+
+        Ok(())
+    }
+}
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index ad05fbcc1..555344166 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -93,7 +93,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                         .find(|field| match field.qualifier() {
                             Some(field_q) => {
                                 field.name() == &col.name
-                                    && field_q.ends_with(&format!(".{q}"))
+                                    && 
field_q.to_string().ends_with(&format!(".{q}"))
                             }
                             _ => false,
                         }) {
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index bf418d0b4..ac3a34cab 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -21,13 +21,14 @@ use std::sync::Arc;
 use std::vec;
 
 use arrow_schema::*;
+use datafusion_common::field_not_found;
 use sqlparser::ast::ExactNumberInfo;
 use sqlparser::ast::TimezoneInfo;
 use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
 use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
 
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::{field_not_found, DFSchema, DataFusionError, Result};
+use datafusion_common::{unqualified_field_not_found, DFSchema, 
DataFusionError, Result};
 use datafusion_common::{OwnedTableReference, TableReference};
 use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
 use datafusion_expr::utils::find_column_exprs;
@@ -201,16 +202,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                         if 
!schema.fields_with_unqualified_name(&col.name).is_empty() {
                             Ok(())
                         } else {
-                            Err(field_not_found(None, col.name.as_str(), 
schema))
+                            Err(unqualified_field_not_found(col.name.as_str(), 
schema))
                         }
                     }
                 }
                 .map_err(|_: DataFusionError| {
-                    field_not_found(
-                        col.relation.as_ref().map(|s| s.to_owned()),
-                        col.name.as_str(),
-                        schema,
-                    )
+                    field_not_found(col.relation.clone(), col.name.as_str(), 
schema)
                 }),
                 _ => Err(DataFusionError::Internal("Not a 
column".to_string())),
             })
@@ -377,22 +374,18 @@ pub(crate) fn idents_to_table_reference(
     match taker.0.len() {
         1 => {
             let table = taker.take(enable_normalization);
-            Ok(OwnedTableReference::Bare { table })
+            Ok(OwnedTableReference::bare(table))
         }
         2 => {
             let table = taker.take(enable_normalization);
             let schema = taker.take(enable_normalization);
-            Ok(OwnedTableReference::Partial { schema, table })
+            Ok(OwnedTableReference::partial(schema, table))
         }
         3 => {
             let table = taker.take(enable_normalization);
             let schema = taker.take(enable_normalization);
             let catalog = taker.take(enable_normalization);
-            Ok(OwnedTableReference::Full {
-                catalog,
-                schema,
-                table,
-            })
+            Ok(OwnedTableReference::full(catalog, schema, table))
         }
         _ => Err(DataFusionError::Plan(format!(
             "Unsupported compound identifier '{:?}'",
diff --git a/datafusion/sql/src/relation/mod.rs 
b/datafusion/sql/src/relation/mod.rs
index 12f5bd6b4..19a278666 100644
--- a/datafusion/sql/src/relation/mod.rs
+++ b/datafusion/sql/src/relation/mod.rs
@@ -35,10 +35,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 let table_name = table_ref.to_string();
                 let cte = planner_context.ctes.get(&table_name);
                 (
-                    match (
-                        cte,
-                        
self.schema_provider.get_table_provider((&table_ref).into()),
-                    ) {
+                    match (cte, 
self.schema_provider.get_table_provider(table_ref)) {
                         (Some(cte_plan), _) => Ok(cte_plan.clone()),
                         (_, Ok(provider)) => {
                             LogicalPlanBuilder::scan(&table_name, provider, 
None)?.build()
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index fa0eed1b9..56d0700e0 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -413,9 +413,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let DescribeTableStmt { table_name } = statement;
         let table_ref = self.object_name_to_table_reference(table_name)?;
 
-        let table_source = self
-            .schema_provider
-            .get_table_provider((&table_ref).into())?;
+        let table_source = self.schema_provider.get_table_provider(table_ref)?;
 
         let schema = table_source.schema();
 
@@ -463,7 +461,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let schema = self.build_schema(columns)?;
 
         // External tables do not support schemas at the moment, so the name 
is just a table name
-        let name = OwnedTableReference::Bare { table: name };
+        let name = OwnedTableReference::bare(name);
 
         Ok(LogicalPlan::CreateExternalTable(PlanCreateExternalTable {
             schema: schema.to_dfschema_ref()?,
@@ -634,9 +632,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
         // Do a table lookup to verify the table exists
         let table_ref = 
self.object_name_to_table_reference(table_name.clone())?;
-        let provider = self
-            .schema_provider
-            .get_table_provider((&table_ref).into())?;
+        let provider = 
self.schema_provider.get_table_provider(table_ref.clone())?;
         let schema = (*provider.schema()).clone();
         let schema = DFSchema::try_from(schema)?;
         let scan =
@@ -688,7 +684,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let table_name = self.object_name_to_table_reference(table_name)?;
         let provider = self
             .schema_provider
-            .get_table_provider((&table_name).into())?;
+            .get_table_provider(table_name.clone())?;
         let arrow_schema = (*provider.schema()).clone();
         let table_schema = Arc::new(DFSchema::try_from(arrow_schema)?);
         let values = table_schema.fields().iter().map(|f| {
@@ -790,7 +786,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let table_name = self.object_name_to_table_reference(table_name)?;
         let provider = self
             .schema_provider
-            .get_table_provider((&table_name).into())?;
+            .get_table_provider(table_name.clone())?;
         let arrow_schema = (*provider.schema()).clone();
         let table_schema = DFSchema::try_from(arrow_schema)?;
 
@@ -896,9 +892,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
         // Do a table lookup to verify the table exists
         let table_ref = self.object_name_to_table_reference(sql_table_name)?;
-        let _ = self
-            .schema_provider
-            .get_table_provider((&table_ref).into())?;
+        let _ = self.schema_provider.get_table_provider(table_ref)?;
 
         // treat both FULL and EXTENDED as the same
         let select_list = if full || extended {
@@ -934,9 +928,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
         // Do a table lookup to verify the table exists
         let table_ref = self.object_name_to_table_reference(sql_table_name)?;
-        let _ = self
-            .schema_provider
-            .get_table_provider((&table_ref).into())?;
+        let _ = self.schema_provider.get_table_provider(table_ref)?;
 
         let query = format!(
             "SELECT table_catalog, table_schema, table_name, definition FROM 
information_schema.views WHERE {where_clause}"
diff --git a/datafusion/sql/tests/integration_test.rs 
b/datafusion/sql/tests/integration_test.rs
index 124ab0c69..4d5d56fdf 100644
--- a/datafusion/sql/tests/integration_test.rs
+++ b/datafusion/sql/tests/integration_test.rs
@@ -226,11 +226,11 @@ Dml: op=[Insert] table=[test_decimal]
 #[rstest]
 #[case::duplicate_columns(
     "INSERT INTO test_decimal (id, price, price) VALUES (1, 2, 3), (4, 5, 6)",
-    "Schema error: Schema contains duplicate unqualified field name 'price'"
+    "Schema error: Schema contains duplicate unqualified field name \"price\""
 )]
 #[case::non_existing_column(
     "INSERT INTO test_decimal (nonexistent, price) VALUES (1, 2), (4, 5)",
-    "Schema error: No field named 'nonexistent'. Valid fields are 'id', 
'price'."
+    "Schema error: No field named \"nonexistent\". Valid fields are \"id\", 
\"price\"."
 )]
 #[case::type_mismatch(
     "INSERT INTO test_decimal SELECT '2022-01-01', 
to_timestamp('2022-01-01T12:00:00')",
@@ -515,7 +515,7 @@ fn select_with_ambiguous_column() {
     let sql = "SELECT id FROM person a, person b";
     let err = logical_plan(sql).expect_err("query should have failed");
     assert_eq!(
-        "SchemaError(AmbiguousReference { qualifier: None, name: \"id\" })",
+        "SchemaError(AmbiguousReference { field: Column { relation: None, 
name: \"id\" } })",
         format!("{err:?}")
     );
 }
@@ -538,7 +538,7 @@ fn where_selection_with_ambiguous_column() {
     let sql = "SELECT * FROM person a, person b WHERE id = id + 1";
     let err = logical_plan(sql).expect_err("query should have failed");
     assert_eq!(
-        "SchemaError(AmbiguousReference { qualifier: None, name: \"id\" })",
+        "SchemaError(AmbiguousReference { field: Column { relation: None, 
name: \"id\" } })",
         format!("{err:?}")
     );
 }
@@ -1121,9 +1121,9 @@ fn 
select_simple_aggregate_with_groupby_column_unselected() {
 fn 
select_simple_aggregate_with_groupby_and_column_in_group_by_does_not_exist() {
     let sql = "SELECT SUM(age) FROM person GROUP BY doesnotexist";
     let err = logical_plan(sql).expect_err("query should have failed");
-    assert_eq!("Schema error: No field named 'doesnotexist'. Valid fields are 
'SUM(person.age)', \
-        'person'.'id', 'person'.'first_name', 'person'.'last_name', 
'person'.'age', 'person'.'state', \
-        'person'.'salary', 'person'.'birth_date', 'person'.'😀'.", 
format!("{err}"));
+    assert_eq!("Schema error: No field named \"doesnotexist\". Valid fields 
are \"SUM(person.age)\", \
+        \"person\".\"id\", \"person\".\"first_name\", 
\"person\".\"last_name\", \"person\".\"age\", \"person\".\"state\", \
+        \"person\".\"salary\", \"person\".\"birth_date\", \"person\".\"😀\".", 
format!("{err}"));
 }
 
 #[test]
@@ -1432,6 +1432,17 @@ fn select_where_with_positive_operator() {
     quick_test(sql, expected);
 }
 
+#[test]
+fn select_where_compound_identifiers() {
+    let sql = "SELECT aggregate_test_100.c3 \
+    FROM public.aggregate_test_100 \
+    WHERE aggregate_test_100.c3 > 0.1";
+    let expected = "Projection: public.aggregate_test_100.c3\
+            \n  Filter: public.aggregate_test_100.c3 > Float64(0.1)\
+            \n    TableScan: public.aggregate_test_100";
+    quick_test(sql, expected);
+}
+
 #[test]
 fn select_order_by_index() {
     let sql = "SELECT id FROM person ORDER BY 1";
@@ -2964,7 +2975,7 @@ fn order_by_unaliased_name() {
 #[test]
 fn order_by_ambiguous_name() {
     let sql = "select * from person a join person b using (id) order by age";
-    let expected = "Schema error: Ambiguous reference to unqualified field 
'age'";
+    let expected = "Schema error: Ambiguous reference to unqualified field 
\"age\"";
 
     let err = logical_plan(sql).unwrap_err();
     assert_eq!(err.to_string(), expected);
@@ -2973,7 +2984,7 @@ fn order_by_ambiguous_name() {
 #[test]
 fn group_by_ambiguous_name() {
     let sql = "select max(id) from person a join person b using (id) group by 
age";
-    let expected = "Schema error: Ambiguous reference to unqualified field 
'age'";
+    let expected = "Schema error: Ambiguous reference to unqualified field 
\"age\"";
 
     let err = logical_plan(sql).unwrap_err();
     assert_eq!(err.to_string(), expected);
@@ -3278,7 +3289,7 @@ fn test_ambiguous_column_references_in_on_join() {
             INNER JOIN person as p2
             ON id = 1";
 
-    let expected = "Schema error: Ambiguous reference to unqualified field 
'id'";
+    let expected = "Schema error: Ambiguous reference to unqualified field 
\"id\"";
 
     // It should return error.
     let result = logical_plan(sql);
@@ -3875,7 +3886,7 @@ fn assert_field_not_found(err: DataFusionError, name: 
&str) {
     match err {
         DataFusionError::SchemaError { .. } => {
             let msg = format!("{err}");
-            let expected = format!("Schema error: No field named '{name}'.");
+            let expected = format!("Schema error: No field named \"{name}\".");
             if !msg.starts_with(&expected) {
                 panic!("error [{msg}] did not start with [{expected}]");
             }
diff --git a/docs/source/user-guide/example-usage.md 
b/docs/source/user-guide/example-usage.md
index c497c66e5..a2cd109a6 100644
--- a/docs/source/user-guide/example-usage.md
+++ b/docs/source/user-guide/example-usage.md
@@ -118,9 +118,12 @@ async fn main() -> datafusion::error::Result<()> {
   let ctx = SessionContext::new();
   let df = ctx.read_csv("tests/data/capitalized_example.csv", 
CsvReadOptions::new()).await?;
 
-  let df = df.filter(col("A").lt_eq(col("c")))?
-           .aggregate(vec![col("A")], vec![min(col("b"))])?
-           .limit(0, Some(100))?;
+  let df = df
+      // col will parse the input string, hence requiring double quotes to 
maintain the capitalization
+      .filter(col("\"A\"").lt_eq(col("c")))?
+      // alternatively use ident to pass in an unqualified column name 
directly without parsing
+      .aggregate(vec![ident("A")], vec![min(col("b"))])?
+      .limit(0, Some(100))?;
 
   // execute and print results
   df.show().await?;

Reply via email to