This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4d07360e6 Support catalog.schema.table.column in SQL SELECT and WHERE
(#5343)
4d07360e6 is described below
commit 4d07360e6f67012571237ab1a773ac1fec7122d9
Author: Jeffrey <[email protected]>
AuthorDate: Wed Mar 15 01:50:10 2023 +1100
Support catalog.schema.table.column in SQL SELECT and WHERE (#5343)
* Support catalog.schema.table.column in SQL SELECT and WHERE
* Merge branch 'main' into support_catalog_schema_in_ident
* Update column new() docstring
* Add tests for dfschema search
* Introduce DFField::new_unqualified
* Introduce new_unqualified methods for simpler syntax
* Fix merge
* New ident() expr function
* Refactor OwnedTableReference to be a type alias of TableReference<'static>
* Update comments
* Comments
* Update docstrings
* From OwnedTableReference to TableReference impl
---
datafusion/common/src/column.rs | 109 ++++---
datafusion/common/src/dfschema.rs | 157 ++++++----
datafusion/common/src/error.rs | 70 +++--
datafusion/common/src/lib.rs | 5 +-
datafusion/common/src/table_reference.rs | 335 +++++++++-----------
datafusion/common/src/utils.rs | 134 ++++++++
datafusion/core/src/catalog/listing_schema.rs | 4 +-
datafusion/core/src/dataframe.rs | 2 +-
datafusion/core/src/execution/context.rs | 4 +-
.../core/src/physical_plan/file_format/parquet.rs | 4 +
datafusion/core/src/physical_plan/planner.rs | 4 +-
datafusion/core/tests/dataframe.rs | 8 +-
datafusion/core/tests/sql/idenfifers.rs | 8 +-
datafusion/core/tests/sql/references.rs | 2 +-
.../test_files/information_schema.slt | 24 ++
.../core/tests/sqllogictests/test_files/join.slt | 2 +-
datafusion/expr/src/expr_fn.rs | 40 ++-
datafusion/expr/src/expr_rewriter.rs | 7 +-
datafusion/expr/src/expr_schema.rs | 5 +-
datafusion/expr/src/logical_plan/builder.rs | 36 ++-
datafusion/expr/src/logical_plan/plan.rs | 2 +
datafusion/expr/src/utils.rs | 4 +-
.../optimizer/src/common_subexpr_eliminate.rs | 6 +-
datafusion/optimizer/src/optimizer.rs | 8 +-
datafusion/optimizer/src/push_down_projection.rs | 7 +-
.../optimizer/src/scalar_subquery_to_join.rs | 10 +-
.../src/simplify_expressions/expr_simplifier.rs | 16 +-
datafusion/optimizer/src/type_coercion.rs | 20 +-
.../optimizer/src/unwrap_cast_in_comparison.rs | 24 +-
.../physical-expr/src/intervals/cp_solver.rs | 2 +-
datafusion/proto/src/logical_plan/from_proto.rs | 17 +-
datafusion/proto/src/logical_plan/to_proto.rs | 20 +-
datafusion/sql/src/expr/arrow_cast.rs | 2 +-
datafusion/sql/src/expr/identifier.rs | 338 ++++++++++++++++++---
datafusion/sql/src/expr/mod.rs | 2 +-
datafusion/sql/src/planner.rs | 21 +-
datafusion/sql/src/relation/mod.rs | 5 +-
datafusion/sql/src/statement.rs | 22 +-
datafusion/sql/tests/integration_test.rs | 33 +-
docs/source/user-guide/example-usage.md | 9 +-
40 files changed, 1018 insertions(+), 510 deletions(-)
diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs
index 4a0e392f6..78c185723 100644
--- a/datafusion/common/src/column.rs
+++ b/datafusion/common/src/column.rs
@@ -17,7 +17,8 @@
//! Column
-use crate::{DFSchema, DataFusionError, Result, SchemaError};
+use crate::utils::{parse_identifiers_normalized, quote_identifier};
+use crate::{DFSchema, DataFusionError, OwnedTableReference, Result,
SchemaError};
use std::collections::HashSet;
use std::convert::Infallible;
use std::fmt;
@@ -27,21 +28,37 @@ use std::sync::Arc;
/// A named reference to a qualified field in a schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Column {
- /// relation/table name.
- pub relation: Option<String>,
+ /// relation/table reference.
+ pub relation: Option<OwnedTableReference>,
/// field/column name.
pub name: String,
}
impl Column {
- /// Create Column from optional qualifier and name
- pub fn new(relation: Option<impl Into<String>>, name: impl Into<String>)
-> Self {
+ /// Create Column from optional qualifier and name. The optional
qualifier, if present,
+ /// will be parsed and normalized by default.
+ ///
+ /// See full details on [`TableReference::parse_str`]
+ ///
+ /// [`TableReference::parse_str`]: crate::TableReference::parse_str
+ pub fn new(
+ relation: Option<impl Into<OwnedTableReference>>,
+ name: impl Into<String>,
+ ) -> Self {
Self {
relation: relation.map(|r| r.into()),
name: name.into(),
}
}
+ /// Convenience method for when there is no qualifier
+ pub fn new_unqualified(name: impl Into<String>) -> Self {
+ Self {
+ relation: None,
+ name: name.into(),
+ }
+ }
+
/// Create Column from unqualified name.
pub fn from_name(name: impl Into<String>) -> Self {
Self {
@@ -53,26 +70,36 @@ impl Column {
/// Deserialize a fully qualified name string into a column
pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
let flat_name = flat_name.into();
- use sqlparser::tokenizer::Token;
-
- let dialect = sqlparser::dialect::GenericDialect {};
- let mut tokenizer = sqlparser::tokenizer::Tokenizer::new(&dialect,
&flat_name);
- if let Ok(tokens) = tokenizer.tokenize() {
- if let [Token::Word(relation), Token::Period, Token::Word(name)] =
- tokens.as_slice()
- {
- return Column {
- relation: Some(relation.value.clone()),
- name: name.value.clone(),
- };
- }
- }
- // any expression that's not in the form of `foo.bar` will be treated
as unqualified column
- // name
- Column {
- relation: None,
- name: flat_name,
- }
+ let mut idents = parse_identifiers_normalized(&flat_name);
+
+ let (relation, name) = match idents.len() {
+ 1 => (None, idents.remove(0)),
+ 2 => (
+ Some(OwnedTableReference::Bare {
+ table: idents.remove(0).into(),
+ }),
+ idents.remove(0),
+ ),
+ 3 => (
+ Some(OwnedTableReference::Partial {
+ schema: idents.remove(0).into(),
+ table: idents.remove(0).into(),
+ }),
+ idents.remove(0),
+ ),
+ 4 => (
+ Some(OwnedTableReference::Full {
+ catalog: idents.remove(0).into(),
+ schema: idents.remove(0).into(),
+ table: idents.remove(0).into(),
+ }),
+ idents.remove(0),
+ ),
+ // any expression that failed to parse or has more than 4 period
delimited
+ // identifiers will be treated as an unqualified column name
+ _ => (None, flat_name),
+ };
+ Self { relation, name }
}
/// Serialize column into a flat name string
@@ -83,6 +110,18 @@ impl Column {
}
}
+ /// Serialize column into a quoted flat name string
+ pub fn quoted_flat_name(&self) -> String {
+ // TODO: quote identifiers only when special characters present
+ // see: https://github.com/apache/arrow-datafusion/issues/5523
+ match &self.relation {
+ Some(r) => {
+ format!("{}.{}", r.to_quoted_string(),
quote_identifier(&self.name))
+ }
+ None => quote_identifier(&self.name),
+ }
+ }
+
/// Qualify column if not done yet.
///
/// If this column already has a [relation](Self::relation), it will be
returned as is and the given parameters are
@@ -151,7 +190,7 @@ impl Column {
}
Err(DataFusionError::SchemaError(SchemaError::FieldNotFound {
- field: Column::new(self.relation.clone(), self.name),
+ field: Box::new(Column::new(self.relation.clone(), self.name)),
valid_fields: schemas
.iter()
.flat_map(|s| s.fields().iter().map(|f| f.qualified_column()))
@@ -240,8 +279,7 @@ impl Column {
// If not due to USING columns then due to ambiguous
column name
return Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
- qualifier: None,
- name: self.name,
+ field: Column::new_unqualified(self.name),
},
));
}
@@ -249,7 +287,7 @@ impl Column {
}
Err(DataFusionError::SchemaError(SchemaError::FieldNotFound {
- field: self,
+ field: Box::new(self),
valid_fields: schemas
.iter()
.flat_map(|s| s.iter())
@@ -304,7 +342,12 @@ mod tests {
let fields = names
.iter()
.map(|(qualifier, name)| {
- DFField::new(qualifier.to_owned(), name, DataType::Boolean,
true)
+ DFField::new(
+ qualifier.to_owned().map(|s| s.to_string()),
+ name,
+ DataType::Boolean,
+ true,
+ )
})
.collect::<Vec<_>>();
DFSchema::new_with_metadata(fields, HashMap::new())
@@ -362,9 +405,7 @@ mod tests {
&[],
)
.expect_err("should've failed to find field");
- let expected = "Schema error: No field named 'z'. \
- Valid fields are 't1'.'a', 't1'.'b', 't2'.'c', \
- 't2'.'d', 't3'.'a', 't3'.'b', 't3'.'c', 't3'.'d', 't3'.'e'.";
+ let expected = r#"Schema error: No field named "z". Valid fields are
"t1"."a", "t1"."b", "t2"."c", "t2"."d", "t3"."a", "t3"."b", "t3"."c", "t3"."d",
"t3"."e"."#;
assert_eq!(err.to_string(), expected);
// ambiguous column reference
@@ -375,7 +416,7 @@ mod tests {
&[],
)
.expect_err("should've found ambiguous field");
- let expected = "Schema error: Ambiguous reference to unqualified field
'a'";
+ let expected = "Schema error: Ambiguous reference to unqualified field
\"a\"";
assert_eq!(err.to_string(), expected);
Ok(())
diff --git a/datafusion/common/src/dfschema.rs
b/datafusion/common/src/dfschema.rs
index bb443f8aa..52a0b3d40 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -23,8 +23,9 @@ use std::convert::TryFrom;
use std::hash::Hash;
use std::sync::Arc;
-use crate::error::{DataFusionError, Result, SchemaError};
-use crate::{field_not_found, Column, TableReference};
+use crate::error::{unqualified_field_not_found, DataFusionError, Result,
SchemaError};
+use crate::utils::quote_identifier;
+use crate::{field_not_found, Column, OwnedTableReference, TableReference};
use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -70,7 +71,7 @@ impl DFSchema {
if !qualified_names.insert((qualifier, field.name())) {
return Err(DataFusionError::SchemaError(
SchemaError::DuplicateQualifiedField {
- qualifier: qualifier.to_string(),
+ qualifier: Box::new(qualifier.clone()),
name: field.name().to_string(),
},
));
@@ -90,18 +91,16 @@ impl DFSchema {
let mut qualified_names = qualified_names
.iter()
.map(|(l, r)| (l.to_owned(), r.to_owned()))
- .collect::<Vec<(&String, &String)>>();
- qualified_names.sort_by(|a, b| {
- let a = format!("{}.{}", a.0, a.1);
- let b = format!("{}.{}", b.0, b.1);
- a.cmp(&b)
- });
+ .collect::<Vec<(&OwnedTableReference, &String)>>();
+ qualified_names.sort();
for (qualifier, name) in &qualified_names {
if unqualified_names.contains(name) {
return Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
- qualifier: Some(qualifier.to_string()),
- name: name.to_string(),
+ field: Column {
+ relation: Some((*qualifier).clone()),
+ name: name.to_string(),
+ },
},
));
}
@@ -115,7 +114,7 @@ impl DFSchema {
schema
.fields()
.iter()
- .map(|f| DFField::from_qualified(qualifier, f.clone()))
+ .map(|f| DFField::from_qualified(qualifier.to_string(),
f.clone()))
.collect(),
schema.metadata().clone(),
)
@@ -140,7 +139,7 @@ impl DFSchema {
for field in other_schema.fields() {
// skip duplicate columns
let duplicated_field = match field.qualifier() {
- Some(q) => self.field_with_name(Some(q.as_str()),
field.name()).is_ok(),
+ Some(q) => self.field_with_name(Some(q), field.name()).is_ok(),
// for unqualified columns, check as unqualified name
None => self.field_with_unqualified_name(field.name()).is_ok(),
};
@@ -173,7 +172,7 @@ impl DFSchema {
// a fully qualified field name is provided.
match &self.fields[i].qualifier {
Some(qualifier) => {
- if (qualifier.to_owned() + "." +
self.fields[i].name()) == name {
+ if (qualifier.to_string() + "." +
self.fields[i].name()) == name {
return Err(DataFusionError::Plan(format!(
"Fully qualified field name '{name}' was
supplied to `index_of` \
which is deprecated. Please use
`index_of_column_by_name` instead"
@@ -185,12 +184,12 @@ impl DFSchema {
}
}
- Err(field_not_found(None, name, self))
+ Err(unqualified_field_not_found(name, self))
}
pub fn index_of_column_by_name(
&self,
- qualifier: Option<&str>,
+ qualifier: Option<&TableReference>,
name: &str,
) -> Result<Option<usize>> {
let mut matches = self
@@ -201,19 +200,19 @@ impl DFSchema {
// field to lookup is qualified.
// current field is qualified and not shared between
relations, compare both
// qualifier and name.
- (Some(q), Some(field_q)) => q == field_q && field.name() ==
name,
+ (Some(q), Some(field_q)) => {
+ q.resolved_eq(field_q) && field.name() == name
+ }
// field to lookup is qualified but current field is
unqualified.
(Some(qq), None) => {
// the original field may now be aliased with a name that
matches the
// original qualified name
- let table_ref =
TableReference::parse_str(field.name().as_str());
- match table_ref {
- TableReference::Partial { schema, table } => {
- schema == qq && table == name
- }
- TableReference::Full { schema, table, .. } => {
- schema == qq && table == name
- }
+ let column = Column::from_qualified_name(field.name());
+ match column {
+ Column {
+ relation: Some(r),
+ name: column_name,
+ } => &r == qq && column_name == name,
_ => false,
}
}
@@ -227,9 +226,11 @@ impl DFSchema {
None => Ok(Some(idx)),
// found more than one matches
Some(_) => Err(DataFusionError::Internal(format!(
- "Ambiguous reference to qualified field named '{}.{}'",
- qualifier.unwrap_or("<unqualified>"),
- name
+ "Ambiguous reference to qualified field named {}.{}",
+ qualifier
+ .map(|q| q.to_quoted_string())
+ .unwrap_or("<unqualified>".to_string()),
+ quote_identifier(name)
))),
},
}
@@ -237,23 +238,20 @@ impl DFSchema {
/// Find the index of the column with the given qualifier and name
pub fn index_of_column(&self, col: &Column) -> Result<usize> {
- let qualifier = col.relation.as_deref();
- self.index_of_column_by_name(col.relation.as_deref(), &col.name)?
- .ok_or_else(|| {
- field_not_found(qualifier.map(|s| s.to_string()), &col.name,
self)
- })
+ self.index_of_column_by_name(col.relation.as_ref(), &col.name)?
+ .ok_or_else(|| field_not_found(col.relation.clone(), &col.name,
self))
}
/// Check if the column is in the current schema
pub fn is_column_from_schema(&self, col: &Column) -> Result<bool> {
- self.index_of_column_by_name(col.relation.as_deref(), &col.name)
+ self.index_of_column_by_name(col.relation.as_ref(), &col.name)
.map(|idx| idx.is_some())
}
/// Find the field with the given name
pub fn field_with_name(
&self,
- qualifier: Option<&str>,
+ qualifier: Option<&TableReference>,
name: &str,
) -> Result<&DFField> {
if let Some(qualifier) = qualifier {
@@ -264,7 +262,7 @@ impl DFSchema {
}
/// Find all fields having the given qualifier
- pub fn fields_with_qualified(&self, qualifier: &str) -> Vec<&DFField> {
+ pub fn fields_with_qualified(&self, qualifier: &TableReference) ->
Vec<&DFField> {
self.fields
.iter()
.filter(|field| field.qualifier().map(|q|
q.eq(qualifier)).unwrap_or(false))
@@ -283,7 +281,7 @@ impl DFSchema {
pub fn field_with_unqualified_name(&self, name: &str) -> Result<&DFField> {
let matches = self.fields_with_unqualified_name(name);
match matches.len() {
- 0 => Err(field_not_found(None, name, self)),
+ 0 => Err(unqualified_field_not_found(name, self)),
1 => Ok(matches[0]),
_ => {
// When `matches` size > 1, it doesn't necessarily mean an
`ambiguous name` problem.
@@ -302,8 +300,10 @@ impl DFSchema {
} else {
Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
- qualifier: None,
- name: name.to_string(),
+ field: Column {
+ relation: None,
+ name: name.to_string(),
+ },
},
))
}
@@ -314,7 +314,7 @@ impl DFSchema {
/// Find the field with the given qualified name
pub fn field_with_qualified_name(
&self,
- qualifier: &str,
+ qualifier: &TableReference,
name: &str,
) -> Result<&DFField> {
let idx = self
@@ -338,7 +338,11 @@ impl DFSchema {
}
/// Find if the field exists with the given qualified name
- pub fn has_column_with_qualified_name(&self, qualifier: &str, name: &str)
-> bool {
+ pub fn has_column_with_qualified_name(
+ &self,
+ qualifier: &TableReference,
+ name: &str,
+ ) -> bool {
self.fields().iter().any(|field| {
field.qualifier().map(|q| q.eq(qualifier)).unwrap_or(false)
&& field.name() == name
@@ -452,12 +456,13 @@ impl DFSchema {
}
/// Replace all field qualifier with new value in schema
- pub fn replace_qualifier(self, qualifier: &str) -> Self {
+ pub fn replace_qualifier(self, qualifier: impl Into<OwnedTableReference>)
-> Self {
+ let qualifier = qualifier.into();
DFSchema {
fields: self
.fields
.into_iter()
- .map(|f| DFField::from_qualified(qualifier, f.field))
+ .map(|f| DFField::from_qualified(qualifier.clone(), f.field))
.collect(),
..self
}
@@ -621,21 +626,29 @@ impl ExprSchema for DFSchema {
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DFField {
/// Optional qualifier (usually a table or relation name)
- qualifier: Option<String>,
+ qualifier: Option<OwnedTableReference>,
/// Arrow field definition
field: Field,
}
impl DFField {
/// Creates a new `DFField`
- pub fn new(
- qualifier: Option<&str>,
+ pub fn new<R: Into<OwnedTableReference>>(
+ qualifier: Option<R>,
name: &str,
data_type: DataType,
nullable: bool,
) -> Self {
DFField {
- qualifier: qualifier.map(|s| s.to_owned()),
+ qualifier: qualifier.map(|s| s.into()),
+ field: Field::new(name, data_type, nullable),
+ }
+ }
+
+ /// Convenience method for creating new `DFField` without a qualifier
+ pub fn new_unqualified(name: &str, data_type: DataType, nullable: bool) ->
Self {
+ DFField {
+ qualifier: None,
field: Field::new(name, data_type, nullable),
}
}
@@ -649,9 +662,12 @@ impl DFField {
}
/// Create a qualified field from an existing Arrow field
- pub fn from_qualified(qualifier: &str, field: Field) -> Self {
+ pub fn from_qualified(
+ qualifier: impl Into<OwnedTableReference>,
+ field: Field,
+ ) -> Self {
Self {
- qualifier: Some(qualifier.to_owned()),
+ qualifier: Some(qualifier.into()),
field,
}
}
@@ -697,7 +713,7 @@ impl DFField {
}
/// Get the optional qualifier
- pub fn qualifier(&self) -> Option<&String> {
+ pub fn qualifier(&self) -> Option<&OwnedTableReference> {
self.qualifier.as_ref()
}
@@ -723,9 +739,9 @@ mod tests {
let col = Column::from_name("t1.c0");
let schema = DFSchema::try_from_qualified_schema("t1",
&test_schema_1())?;
// lookup with unqualified name "t1.c0"
- let err = schema.index_of_column(&col).err().unwrap();
+ let err = schema.index_of_column(&col).unwrap_err();
assert_eq!(
- "Schema error: No field named 't1.c0'. Valid fields are 't1'.'c0',
't1'.'c1'.",
+ r#"Schema error: No field named "t1.c0". Valid fields are
"t1"."c0", "t1"."c1"."#,
&format!("{err}")
);
Ok(())
@@ -781,8 +797,12 @@ mod tests {
join.to_string()
);
// test valid access
- assert!(join.field_with_qualified_name("t1", "c0").is_ok());
- assert!(join.field_with_qualified_name("t2", "c0").is_ok());
+ assert!(join
+ .field_with_qualified_name(&TableReference::bare("t1"), "c0")
+ .is_ok());
+ assert!(join
+ .field_with_qualified_name(&TableReference::bare("t2"), "c0")
+ .is_ok());
// test invalid access
assert!(join.field_with_unqualified_name("c0").is_err());
assert!(join.field_with_unqualified_name("t1.c0").is_err());
@@ -798,7 +818,7 @@ mod tests {
assert!(join.is_err());
assert_eq!(
"Schema error: Schema contains duplicate \
- qualified field name \'t1\'.\'c0\'",
+ qualified field name \"t1\".\"c0\"",
&format!("{}", join.err().unwrap())
);
Ok(())
@@ -812,7 +832,7 @@ mod tests {
assert!(join.is_err());
assert_eq!(
"Schema error: Schema contains duplicate \
- unqualified field name \'c0\'",
+ unqualified field name \"c0\"",
&format!("{}", join.err().unwrap())
);
Ok(())
@@ -828,14 +848,18 @@ mod tests {
join.to_string()
);
// test valid access
- assert!(join.field_with_qualified_name("t1", "c0").is_ok());
+ assert!(join
+ .field_with_qualified_name(&TableReference::bare("t1"), "c0")
+ .is_ok());
assert!(join.field_with_unqualified_name("c0").is_ok());
assert!(join.field_with_unqualified_name("c100").is_ok());
assert!(join.field_with_name(None, "c100").is_ok());
// test invalid access
assert!(join.field_with_unqualified_name("t1.c0").is_err());
assert!(join.field_with_unqualified_name("t1.c100").is_err());
- assert!(join.field_with_qualified_name("", "c100").is_err());
+ assert!(join
+ .field_with_qualified_name(&TableReference::bare(""), "c100")
+ .is_err());
Ok(())
}
@@ -847,7 +871,7 @@ mod tests {
assert!(join.is_err());
assert_eq!(
"Schema error: Schema contains qualified \
- field name \'t1\'.\'c0\' and unqualified field name \'c0\' which would
be ambiguous",
+ field name \"t1\".\"c0\" and unqualified field name \"c0\" which would
be ambiguous",
&format!("{}", join.err().unwrap())
);
Ok(())
@@ -857,11 +881,11 @@ mod tests {
#[test]
fn helpful_error_messages() -> Result<()> {
let schema = DFSchema::try_from_qualified_schema("t1",
&test_schema_1())?;
- let expected_help = "Valid fields are \'t1\'.\'c0\', \'t1\'.\'c1\'.";
+ let expected_help = "Valid fields are \"t1\".\"c0\", \"t1\".\"c1\".";
// Pertinent message parts
- let expected_err_msg = "Fully qualified field name \'t1.c0\'";
+ let expected_err_msg = "Fully qualified field name 't1.c0'";
assert!(schema
- .field_with_qualified_name("x", "y")
+ .field_with_qualified_name(&TableReference::bare("x"), "y")
.unwrap_err()
.to_string()
.contains(expected_help));
@@ -889,12 +913,15 @@ mod tests {
let col = Column::from_qualified_name("t1.c0");
let err = schema.index_of_column(&col).err().unwrap();
- assert_eq!("Schema error: No field named 't1'.'c0'.",
&format!("{err}"));
+ assert_eq!(
+ r#"Schema error: No field named "t1"."c0"."#,
+ &format!("{err}")
+ );
// the same check without qualifier
let col = Column::from_name("c0");
let err = schema.index_of_column(&col).err().unwrap();
- assert_eq!("Schema error: No field named 'c0'.", &format!("{err}"));
+ assert_eq!(r#"Schema error: No field named "c0"."#, &format!("{err}"));
}
#[test]
@@ -1127,7 +1154,7 @@ mod tests {
let arrow_schema_ref = Arc::new(arrow_schema.clone());
let df_schema = DFSchema::new_with_metadata(
- vec![DFField::new(None, "c0", DataType::Int64, true)],
+ vec![DFField::new_unqualified("c0", DataType::Int64, true)],
metadata,
)
.unwrap();
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 3f10c1261..0231895f7 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -23,7 +23,8 @@ use std::io;
use std::result;
use std::sync::Arc;
-use crate::{Column, DFSchema};
+use crate::utils::quote_identifier;
+use crate::{Column, DFSchema, OwnedTableReference};
#[cfg(feature = "avro")]
use apache_avro::Error as AvroError;
use arrow::error::ArrowError;
@@ -120,29 +121,41 @@ macro_rules! plan_err {
#[derive(Debug)]
pub enum SchemaError {
/// Schema contains a (possibly) qualified and unqualified field with same
unqualified name
- AmbiguousReference {
- qualifier: Option<String>,
+ AmbiguousReference { field: Column },
+ /// Schema contains duplicate qualified field name
+ DuplicateQualifiedField {
+ qualifier: Box<OwnedTableReference>,
name: String,
},
- /// Schema contains duplicate qualified field name
- DuplicateQualifiedField { qualifier: String, name: String },
/// Schema contains duplicate unqualified field name
DuplicateUnqualifiedField { name: String },
/// No field with this name
FieldNotFound {
- field: Column,
+ field: Box<Column>,
valid_fields: Vec<Column>,
},
}
/// Create a "field not found" DataFusion::SchemaError
-pub fn field_not_found(
- qualifier: Option<String>,
+pub fn field_not_found<R: Into<OwnedTableReference>>(
+ qualifier: Option<R>,
name: &str,
schema: &DFSchema,
) -> DataFusionError {
DataFusionError::SchemaError(SchemaError::FieldNotFound {
- field: Column::new(qualifier, name),
+ field: Box::new(Column::new(qualifier, name)),
+ valid_fields: schema
+ .fields()
+ .iter()
+ .map(|f| f.qualified_column())
+ .collect(),
+ })
+}
+
+/// Convenience wrapper over [`field_not_found`] for when there is no qualifier
+pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) ->
DataFusionError {
+ DataFusionError::SchemaError(SchemaError::FieldNotFound {
+ field: Box::new(Column::new_unqualified(name)),
valid_fields: schema
.fields()
.iter()
@@ -158,25 +171,14 @@ impl Display for SchemaError {
field,
valid_fields,
} => {
- write!(f, "No field named ")?;
- if let Some(q) = &field.relation {
- write!(f, "'{}'.'{}'", q, field.name)?;
- } else {
- write!(f, "'{}'", field.name)?;
- }
+ write!(f, "No field named {}", field.quoted_flat_name())?;
if !valid_fields.is_empty() {
write!(
f,
". Valid fields are {}",
valid_fields
.iter()
- .map(|field| {
- if let Some(q) = &field.relation {
- format!("'{}'.'{}'", q, field.name)
- } else {
- format!("'{}'", field.name)
- }
- })
+ .map(|field| field.quoted_flat_name())
.collect::<Vec<String>>()
.join(", ")
)?;
@@ -186,20 +188,32 @@ impl Display for SchemaError {
Self::DuplicateQualifiedField { qualifier, name } => {
write!(
f,
- "Schema contains duplicate qualified field name
'{qualifier}'.'{name}'"
+ "Schema contains duplicate qualified field name {}.{}",
+ qualifier.to_quoted_string(),
+ quote_identifier(name)
)
}
Self::DuplicateUnqualifiedField { name } => {
write!(
f,
- "Schema contains duplicate unqualified field name '{name}'"
+ "Schema contains duplicate unqualified field name {}",
+ quote_identifier(name)
)
}
- Self::AmbiguousReference { qualifier, name } => {
- if let Some(q) = qualifier {
- write!(f, "Schema contains qualified field name
'{q}'.'{name}' and unqualified field name '{name}' which would be ambiguous")
+ Self::AmbiguousReference { field } => {
+ if field.relation.is_some() {
+ write!(
+ f,
+ "Schema contains qualified field name {} and
unqualified field name {} which would be ambiguous",
+ field.quoted_flat_name(),
+ quote_identifier(&field.name)
+ )
} else {
- write!(f, "Ambiguous reference to unqualified field
'{name}'")
+ write!(
+ f,
+ "Ambiguous reference to unqualified field {}",
+ field.quoted_flat_name()
+ )
}
}
}
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 636feb21a..4af8720b0 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -34,7 +34,10 @@ pub mod utils;
use arrow::compute::SortOptions;
pub use column::Column;
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
-pub use error::{field_not_found, DataFusionError, Result, SchemaError,
SharedResult};
+pub use error::{
+ field_not_found, unqualified_field_not_found, DataFusionError, Result,
SchemaError,
+ SharedResult,
+};
pub use parsers::parse_interval;
pub use scalar::{ScalarType, ScalarValue};
pub use stats::{ColumnStatistics, Statistics};
diff --git a/datafusion/common/src/table_reference.rs
b/datafusion/common/src/table_reference.rs
index 34656bc11..257073681 100644
--- a/datafusion/common/src/table_reference.rs
+++ b/datafusion/common/src/table_reference.rs
@@ -15,13 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::error::Result;
-use sqlparser::{
- ast::Ident,
- dialect::GenericDialect,
- parser::{Parser, ParserError},
- tokenizer::{Token, TokenWithLocation},
-};
+use crate::utils::{parse_identifiers_normalized, quote_identifier};
use std::borrow::Cow;
/// A resolved path to a table of the form "catalog.schema.table"
@@ -42,7 +36,7 @@ impl<'a> std::fmt::Display for ResolvedTableReference<'a> {
}
/// Represents a path to a table that may require further resolution
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum TableReference<'a> {
/// An unqualified table reference, e.g. "table"
Bare {
@@ -67,53 +61,53 @@ pub enum TableReference<'a> {
},
}
-/// Represents a path to a table that may require further resolution
-/// that owns the underlying names
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub enum OwnedTableReference {
- /// An unqualified table reference, e.g. "table"
- Bare {
- /// The table name
- table: String,
- },
- /// A partially resolved table reference, e.g. "schema.table"
- Partial {
- /// The schema containing the table
- schema: String,
- /// The table name
- table: String,
- },
- /// A fully resolved table reference, e.g. "catalog.schema.table"
- Full {
- /// The catalog (aka database) containing the table
- catalog: String,
- /// The schema containing the table
- schema: String,
- /// The table name
- table: String,
- },
-}
+pub type OwnedTableReference = TableReference<'static>;
-impl OwnedTableReference {
- /// Return a `TableReference` view of this `OwnedTableReference`
- pub fn as_table_reference(&self) -> TableReference<'_> {
+impl std::fmt::Display for TableReference<'_> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
- Self::Bare { table } => TableReference::Bare {
- table: table.into(),
- },
- Self::Partial { schema, table } => TableReference::Partial {
- schema: schema.into(),
- table: table.into(),
- },
- Self::Full {
+ TableReference::Bare { table } => write!(f, "{table}"),
+ TableReference::Partial { schema, table } => {
+ write!(f, "{schema}.{table}")
+ }
+ TableReference::Full {
catalog,
schema,
table,
- } => TableReference::Full {
- catalog: catalog.into(),
- schema: schema.into(),
- table: table.into(),
- },
+ } => write!(f, "{catalog}.{schema}.{table}"),
+ }
+ }
+}
+
+impl<'a> TableReference<'a> {
+ /// Convenience method for creating a `Bare` variant of `TableReference`
+ pub fn bare(table: impl Into<Cow<'a, str>>) -> TableReference<'a> {
+ TableReference::Bare {
+ table: table.into(),
+ }
+ }
+
+ /// Convenience method for creating a `Partial` variant of `TableReference`
+ pub fn partial(
+ schema: impl Into<Cow<'a, str>>,
+ table: impl Into<Cow<'a, str>>,
+ ) -> TableReference<'a> {
+ TableReference::Partial {
+ schema: schema.into(),
+ table: table.into(),
+ }
+ }
+
+ /// Convenience method for creating a `Full` variant of `TableReference`
+ pub fn full(
+ catalog: impl Into<Cow<'a, str>>,
+ schema: impl Into<Cow<'a, str>>,
+ table: impl Into<Cow<'a, str>>,
+ ) -> TableReference<'a> {
+ TableReference::Full {
+ catalog: catalog.into(),
+ schema: schema.into(),
+ table: table.into(),
}
}
@@ -125,39 +119,44 @@ impl OwnedTableReference {
| Self::Bare { table } => table,
}
}
-}
-impl std::fmt::Display for OwnedTableReference {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ /// Retrieve the schema name if in the `Partial` or `Full` qualification
+ pub fn schema(&self) -> Option<&str> {
match self {
- OwnedTableReference::Bare { table } => write!(f, "{table}"),
- OwnedTableReference::Partial { schema, table } => {
- write!(f, "{schema}.{table}")
- }
- OwnedTableReference::Full {
- catalog,
- schema,
- table,
- } => write!(f, "{catalog}.{schema}.{table}"),
+ Self::Full { schema, .. } | Self::Partial { schema, .. } =>
Some(schema),
+ _ => None,
}
}
-}
-/// Convert `OwnedTableReference` into a `TableReference`. Somewhat
-/// awkward to use but 'idiomatic': `(&table_ref).into()`
-impl<'a> From<&'a OwnedTableReference> for TableReference<'a> {
- fn from(r: &'a OwnedTableReference) -> Self {
- r.as_table_reference()
+ /// Retrieve the catalog name if in the `Full` qualification
+ pub fn catalog(&self) -> Option<&str> {
+ match self {
+ Self::Full { catalog, .. } => Some(catalog),
+ _ => None,
+ }
}
-}
-impl<'a> TableReference<'a> {
- /// Retrieve the actual table name, regardless of qualification
- pub fn table(&self) -> &str {
+ /// Compare with another `TableReference` as if both are resolved.
+ /// This allows comparing across variants, where if a field is not present
+ /// in both variants being compared then it is ignored in the comparison.
+ ///
+ /// e.g. this allows a `TableReference::Bare` to be considered equal to a
+ /// fully qualified `TableReference::Full` if the table names match.
+ pub fn resolved_eq(&self, other: &Self) -> bool {
match self {
- Self::Full { table, .. }
- | Self::Partial { table, .. }
- | Self::Bare { table } => table,
+ TableReference::Bare { table } => table == other.table(),
+ TableReference::Partial { schema, table } => {
+ table == other.table() && other.schema().map_or(true, |s| s ==
schema)
+ }
+ TableReference::Full {
+ catalog,
+ schema,
+ table,
+ } => {
+ table == other.table()
+ && other.schema().map_or(true, |s| s == schema)
+ && other.catalog().map_or(true, |c| c == catalog)
+ }
}
}
@@ -190,6 +189,48 @@ impl<'a> TableReference<'a> {
}
}
+ /// Converts directly into an [`OwnedTableReference`]
+ pub fn to_owned_reference(&self) -> OwnedTableReference {
+ match self {
+ Self::Full {
+ catalog,
+ schema,
+ table,
+ } => OwnedTableReference::Full {
+ catalog: catalog.to_string().into(),
+ schema: schema.to_string().into(),
+ table: table.to_string().into(),
+ },
+ Self::Partial { schema, table } => OwnedTableReference::Partial {
+ schema: schema.to_string().into(),
+ table: table.to_string().into(),
+ },
+ Self::Bare { table } => OwnedTableReference::Bare {
+ table: table.to_string().into(),
+ },
+ }
+ }
+
+ /// Forms a string where the identifiers are quoted
+ pub fn to_quoted_string(&self) -> String {
+ match self {
+ TableReference::Bare { table } => quote_identifier(table),
+ TableReference::Partial { schema, table } => {
+ format!("{}.{}", quote_identifier(schema),
quote_identifier(table))
+ }
+ TableReference::Full {
+ catalog,
+ schema,
+ table,
+ } => format!(
+ "{}.{}.{}",
+ quote_identifier(catalog),
+ quote_identifier(schema),
+ quote_identifier(table)
+ ),
+ }
+ }
+
/// Forms a [`TableReference`] by attempting to parse `s` as a multipart
identifier,
/// failing that then taking the entire unnormalized input as the
identifier itself.
///
@@ -199,14 +240,7 @@ impl<'a> TableReference<'a> {
/// `Foo".bar` (note the preserved case and requiring two double quotes to
represent
/// a single double quote in the identifier)
pub fn parse_str(s: &'a str) -> Self {
- let mut parts = parse_identifiers(s)
- .unwrap_or_default()
- .into_iter()
- .map(|id| match id.quote_style {
- Some(_) => id.value,
- None => id.value.to_ascii_lowercase(),
- })
- .collect::<Vec<_>>();
+ let mut parts = parse_identifiers_normalized(s);
match parts.len() {
1 => Self::Bare {
@@ -226,57 +260,34 @@ impl<'a> TableReference<'a> {
}
}
-// TODO: remove when can use
https://github.com/sqlparser-rs/sqlparser-rs/issues/805
-fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
- let dialect = GenericDialect;
- let mut parser = Parser::new(&dialect).try_with_sql(s)?;
- let mut idents = vec![];
-
- // expecting at least one word for identifier
- match parser.next_token_no_skip() {
- Some(TokenWithLocation {
- token: Token::Word(w),
- ..
- }) => idents.push(w.to_ident()),
- Some(TokenWithLocation { token, .. }) => {
- return Err(ParserError::ParserError(format!(
- "Unexpected token in identifier: {token}"
- )))?
- }
- None => {
- return Err(ParserError::ParserError(
- "Empty input when parsing identifier".to_string(),
- ))?
- }
- };
+/// Parse a `String` into an `OwnedTableReference`
+impl From<String> for OwnedTableReference {
+ fn from(s: String) -> Self {
+ TableReference::parse_str(&s).to_owned_reference()
+ }
+}
- while let Some(TokenWithLocation { token, .. }) =
parser.next_token_no_skip() {
- match token {
- // ensure that optional period is succeeded by another identifier
- Token::Period => match parser.next_token_no_skip() {
- Some(TokenWithLocation {
- token: Token::Word(w),
- ..
- }) => idents.push(w.to_ident()),
- Some(TokenWithLocation { token, .. }) => {
- return Err(ParserError::ParserError(format!(
- "Unexpected token following period in identifier:
{token}"
- )))?
- }
- None => {
- return Err(ParserError::ParserError(
- "Trailing period in identifier".to_string(),
- ))?
- }
+impl<'a> From<&'a OwnedTableReference> for TableReference<'a> {
+ fn from(value: &'a OwnedTableReference) -> Self {
+ match value {
+ OwnedTableReference::Bare { table } => TableReference::Bare {
+ table: Cow::Borrowed(table),
+ },
+ OwnedTableReference::Partial { schema, table } =>
TableReference::Partial {
+ schema: Cow::Borrowed(schema),
+ table: Cow::Borrowed(table),
+ },
+ OwnedTableReference::Full {
+ catalog,
+ schema,
+ table,
+ } => TableReference::Full {
+ catalog: Cow::Borrowed(catalog),
+ schema: Cow::Borrowed(schema),
+ table: Cow::Borrowed(table),
},
- _ => {
- return Err(ParserError::ParserError(format!(
- "Unexpected token in identifier: {token}"
- )))?
- }
}
}
- Ok(idents)
}
/// Parse a string into a TableReference, normalizing where appropriate
@@ -288,6 +299,12 @@ impl<'a> From<&'a str> for TableReference<'a> {
}
}
+impl<'a> From<&'a String> for TableReference<'a> {
+ fn from(s: &'a String) -> Self {
+ Self::parse_str(s)
+ }
+}
+
impl<'a> From<ResolvedTableReference<'a>> for TableReference<'a> {
fn from(resolved: ResolvedTableReference<'a>) -> Self {
Self::Full {
@@ -302,64 +319,6 @@ impl<'a> From<ResolvedTableReference<'a>> for
TableReference<'a> {
mod tests {
use super::*;
- #[test]
- fn test_parse_identifiers() -> Result<()> {
- let s = "CATALOG.\"F(o)o. \"\"bar\".table";
- let actual = parse_identifiers(s)?;
- let expected = vec![
- Ident {
- value: "CATALOG".to_string(),
- quote_style: None,
- },
- Ident {
- value: "F(o)o. \"bar".to_string(),
- quote_style: Some('"'),
- },
- Ident {
- value: "table".to_string(),
- quote_style: None,
- },
- ];
- assert_eq!(expected, actual);
-
- let s = "";
- let err = parse_identifiers(s).expect_err("didn't fail to parse");
- assert_eq!(
- "SQL(ParserError(\"Empty input when parsing identifier\"))",
- format!("{err:?}")
- );
-
- let s = "*schema.table";
- let err = parse_identifiers(s).expect_err("didn't fail to parse");
- assert_eq!(
- "SQL(ParserError(\"Unexpected token in identifier: *\"))",
- format!("{err:?}")
- );
-
- let s = "schema.table*";
- let err = parse_identifiers(s).expect_err("didn't fail to parse");
- assert_eq!(
- "SQL(ParserError(\"Unexpected token in identifier: *\"))",
- format!("{err:?}")
- );
-
- let s = "schema.table.";
- let err = parse_identifiers(s).expect_err("didn't fail to parse");
- assert_eq!(
- "SQL(ParserError(\"Trailing period in identifier\"))",
- format!("{err:?}")
- );
-
- let s = "schema.*";
- let err = parse_identifiers(s).expect_err("didn't fail to parse");
- assert_eq!(
- "SQL(ParserError(\"Unexpected token following period in
identifier: *\"))",
- format!("{err:?}")
- );
-
- Ok(())
- }
-
#[test]
fn test_table_reference_from_str_normalizes() {
let expected = TableReference::Full {
diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs
index 3c0730153..a1226def8 100644
--- a/datafusion/common/src/utils.rs
+++ b/datafusion/common/src/utils.rs
@@ -20,6 +20,10 @@
use crate::{DataFusionError, Result, ScalarValue};
use arrow::array::ArrayRef;
use arrow::compute::SortOptions;
+use sqlparser::ast::Ident;
+use sqlparser::dialect::GenericDialect;
+use sqlparser::parser::{Parser, ParserError};
+use sqlparser::tokenizer::{Token, TokenWithLocation};
use std::cmp::Ordering;
/// Given column vectors, returns row at `idx`.
@@ -158,6 +162,78 @@ where
Ok(low)
}
+/// Wraps an identifier string in double quotes, escaping any double quotes in
+/// the identifier by replacing them with two double quotes
+///
+/// e.g. identifier `tab.le"name` becomes `"tab.le""name"`
+pub fn quote_identifier(s: &str) -> String {
+ format!("\"{}\"", s.replace('"', "\"\""))
+}
+
+// TODO: remove when we can use
https://github.com/sqlparser-rs/sqlparser-rs/issues/805
+pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
+ let dialect = GenericDialect;
+ let mut parser = Parser::new(&dialect).try_with_sql(s)?;
+ let mut idents = vec![];
+
+ // expecting at least one word for identifier
+ match parser.next_token_no_skip() {
+ Some(TokenWithLocation {
+ token: Token::Word(w),
+ ..
+ }) => idents.push(w.to_ident()),
+ Some(TokenWithLocation { token, .. }) => {
+ return Err(ParserError::ParserError(format!(
+ "Unexpected token in identifier: {token}"
+ )))?
+ }
+ None => {
+ return Err(ParserError::ParserError(
+ "Empty input when parsing identifier".to_string(),
+ ))?
+ }
+ };
+
+ while let Some(TokenWithLocation { token, .. }) =
parser.next_token_no_skip() {
+ match token {
+ // ensure that optional period is succeeded by another identifier
+ Token::Period => match parser.next_token_no_skip() {
+ Some(TokenWithLocation {
+ token: Token::Word(w),
+ ..
+ }) => idents.push(w.to_ident()),
+ Some(TokenWithLocation { token, .. }) => {
+ return Err(ParserError::ParserError(format!(
+ "Unexpected token following period in identifier:
{token}"
+ )))?
+ }
+ None => {
+ return Err(ParserError::ParserError(
+ "Trailing period in identifier".to_string(),
+ ))?
+ }
+ },
+ _ => {
+ return Err(ParserError::ParserError(format!(
+ "Unexpected token in identifier: {token}"
+ )))?
+ }
+ }
+ }
+ Ok(idents)
+}
+
+pub(crate) fn parse_identifiers_normalized(s: &str) -> Vec<String> {
+ parse_identifiers(s)
+ .unwrap_or_default()
+ .into_iter()
+ .map(|id| match id.quote_style {
+ Some(_) => id.value,
+ None => id.value.to_ascii_lowercase(),
+ })
+ .collect::<Vec<_>>()
+}
+
#[cfg(test)]
mod tests {
use arrow::array::Float64Array;
@@ -330,4 +406,62 @@ mod tests {
assert_eq!(res, 2);
Ok(())
}
+
+ #[test]
+ fn test_parse_identifiers() -> Result<()> {
+ let s = "CATALOG.\"F(o)o. \"\"bar\".table";
+ let actual = parse_identifiers(s)?;
+ let expected = vec![
+ Ident {
+ value: "CATALOG".to_string(),
+ quote_style: None,
+ },
+ Ident {
+ value: "F(o)o. \"bar".to_string(),
+ quote_style: Some('"'),
+ },
+ Ident {
+ value: "table".to_string(),
+ quote_style: None,
+ },
+ ];
+ assert_eq!(expected, actual);
+
+ let s = "";
+ let err = parse_identifiers(s).expect_err("didn't fail to parse");
+ assert_eq!(
+ "SQL(ParserError(\"Empty input when parsing identifier\"))",
+ format!("{err:?}")
+ );
+
+ let s = "*schema.table";
+ let err = parse_identifiers(s).expect_err("didn't fail to parse");
+ assert_eq!(
+ "SQL(ParserError(\"Unexpected token in identifier: *\"))",
+ format!("{err:?}")
+ );
+
+ let s = "schema.table*";
+ let err = parse_identifiers(s).expect_err("didn't fail to parse");
+ assert_eq!(
+ "SQL(ParserError(\"Unexpected token in identifier: *\"))",
+ format!("{err:?}")
+ );
+
+ let s = "schema.table.";
+ let err = parse_identifiers(s).expect_err("didn't fail to parse");
+ assert_eq!(
+ "SQL(ParserError(\"Trailing period in identifier\"))",
+ format!("{err:?}")
+ );
+
+ let s = "schema.*";
+ let err = parse_identifiers(s).expect_err("didn't fail to parse");
+ assert_eq!(
+ "SQL(ParserError(\"Unexpected token following period in
identifier: *\"))",
+ format!("{err:?}")
+ );
+
+ Ok(())
+ }
}
diff --git a/datafusion/core/src/catalog/listing_schema.rs
b/datafusion/core/src/catalog/listing_schema.rs
index 32ee9f62a..3ea7a1098 100644
--- a/datafusion/core/src/catalog/listing_schema.rs
+++ b/datafusion/core/src/catalog/listing_schema.rs
@@ -128,9 +128,7 @@ impl ListingSchemaProvider {
if !self.table_exist(table_name) {
let table_url = format!("{}/{}", self.authority, table_path);
- let name = OwnedTableReference::Bare {
- table: table_name.to_string(),
- };
+ let name = OwnedTableReference::bare(table_name.to_string());
let provider = self
.factory
.create(
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index eab56432e..af41725db 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -1372,7 +1372,7 @@ mod tests {
let join = left
.join_on(right, JoinType::Inner, [col("c1").eq(col("c1"))])
.expect_err("join didn't fail check");
- let expected = "Schema error: Ambiguous reference to unqualified field
'c1'";
+ let expected = "Schema error: Ambiguous reference to unqualified field
\"c1\"";
assert_eq!(join.to_string(), expected);
Ok(())
diff --git a/datafusion/core/src/execution/context.rs
b/datafusion/core/src/execution/context.rs
index aa02e4be3..1466bcba0 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -282,7 +282,7 @@ impl SessionContext {
self.session_id.clone()
}
- /// Return the [`TableFactoryProvider`] that is registered for the
+ /// Return the [`TableProviderFactory`] that is registered for the
/// specified file type, if any.
pub fn table_factory(
&self,
@@ -1933,7 +1933,7 @@ impl SessionState {
self.config.options.sql_parser.parse_float_as_decimal;
for reference in references {
let table = reference.table();
- let resolved =
self.resolve_table_ref(reference.as_table_reference());
+ let resolved = self.resolve_table_ref(&reference);
if let Entry::Vacant(v) =
provider.tables.entry(resolved.to_string()) {
if let Ok(schema) = self.schema_for_ref(resolved) {
if let Some(table) = schema.table(table).await {
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 2c1bfc9ca..3f3b0bb74 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -204,6 +204,8 @@ impl ParquetExec {
/// `ParquetRecordBatchStream`. These filters are applied by the
/// parquet decoder to skip unecessairly decoding other columns
/// which would not pass the predicate. Defaults to false
+ ///
+ /// [`Expr`]: datafusion_expr::Expr
pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
self.pushdown_filters = Some(pushdown_filters);
self
@@ -219,6 +221,8 @@ impl ParquetExec {
/// minimize the cost of filter evaluation by reordering the
/// predicate [`Expr`]s. If false, the predicates are applied in
/// the same order as specified in the query. Defaults to false.
+ ///
+ /// [`Expr`]: datafusion_expr::Expr
pub fn with_reorder_filters(mut self, reorder_filters: bool) -> Self {
self.reorder_filters = Some(reorder_filters);
self
diff --git a/datafusion/core/src/physical_plan/planner.rs
b/datafusion/core/src/physical_plan/planner.rs
index 38830c685..0dba182d8 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -2388,7 +2388,7 @@ Internal error: Optimizer rule 'type_coercion' failed due
to unexpected error: E
Self {
schema: DFSchemaRef::new(
DFSchema::new_with_metadata(
- vec![DFField::new(None, "a", DataType::Int32, false)],
+ vec![DFField::new_unqualified("a", DataType::Int32,
false)],
HashMap::new(),
)
.unwrap(),
@@ -2523,7 +2523,7 @@ Internal error: Optimizer rule 'type_coercion' failed due
to unexpected error: E
.projected_schema
.as_ref()
.clone()
- .replace_qualifier(name);
+ .replace_qualifier(name.to_string());
scan.projected_schema = Arc::new(new_schema);
LogicalPlan::TableScan(scan)
}
diff --git a/datafusion/core/tests/dataframe.rs
b/datafusion/core/tests/dataframe.rs
index 19ceebe17..82bd0d844 100644
--- a/datafusion/core/tests/dataframe.rs
+++ b/datafusion/core/tests/dataframe.rs
@@ -249,7 +249,7 @@ async fn sort_on_ambiguous_column() -> Result<()> {
.sort(vec![col("b").sort(true, true)])
.unwrap_err();
- let expected = "Schema error: Ambiguous reference to unqualified field
'b'";
+ let expected = "Schema error: Ambiguous reference to unqualified field
\"b\"";
assert_eq!(err.to_string(), expected);
Ok(())
}
@@ -268,7 +268,7 @@ async fn group_by_ambiguous_column() -> Result<()> {
.aggregate(vec![col("b")], vec![max(col("a"))])
.unwrap_err();
- let expected = "Schema error: Ambiguous reference to unqualified field
'b'";
+ let expected = "Schema error: Ambiguous reference to unqualified field
\"b\"";
assert_eq!(err.to_string(), expected);
Ok(())
}
@@ -287,7 +287,7 @@ async fn filter_on_ambiguous_column() -> Result<()> {
.filter(col("b").eq(lit(1)))
.unwrap_err();
- let expected = "Schema error: Ambiguous reference to unqualified field
'b'";
+ let expected = "Schema error: Ambiguous reference to unqualified field
\"b\"";
assert_eq!(err.to_string(), expected);
Ok(())
}
@@ -306,7 +306,7 @@ async fn select_ambiguous_column() -> Result<()> {
.select(vec![col("b")])
.unwrap_err();
- let expected = "Schema error: Ambiguous reference to unqualified field
'b'";
+ let expected = "Schema error: Ambiguous reference to unqualified field
\"b\"";
assert_eq!(err.to_string(), expected);
Ok(())
}
diff --git a/datafusion/core/tests/sql/idenfifers.rs
b/datafusion/core/tests/sql/idenfifers.rs
index a305f23b4..1b57f60bd 100644
--- a/datafusion/core/tests/sql/idenfifers.rs
+++ b/datafusion/core/tests/sql/idenfifers.rs
@@ -211,28 +211,28 @@ async fn case_insensitive_in_sql_errors() {
.await
.unwrap_err()
.to_string();
- assert_contains!(actual, "No field named 'column1'");
+ assert_contains!(actual, r#"No field named "column1""#);
let actual = ctx
.sql("SELECT Column1 from test")
.await
.unwrap_err()
.to_string();
- assert_contains!(actual, "No field named 'column1'");
+ assert_contains!(actual, r#"No field named "column1""#);
let actual = ctx
.sql("SELECT column1 from test")
.await
.unwrap_err()
.to_string();
- assert_contains!(actual, "No field named 'column1'");
+ assert_contains!(actual, r#"No field named "column1""#);
let actual = ctx
.sql(r#"SELECT "column1" from test"#)
.await
.unwrap_err()
.to_string();
- assert_contains!(actual, "No field named 'column1'");
+ assert_contains!(actual, r#"No field named "column1""#);
// This should pass (note the quotes)
ctx.sql(r#"SELECT "Column1" from test"#).await.unwrap();
diff --git a/datafusion/core/tests/sql/references.rs
b/datafusion/core/tests/sql/references.rs
index f006cbb45..335bc6308 100644
--- a/datafusion/core/tests/sql/references.rs
+++ b/datafusion/core/tests/sql/references.rs
@@ -67,7 +67,7 @@ async fn qualified_table_references_and_fields() ->
Result<()> {
let error = ctx.sql(sql).await.unwrap_err();
assert_contains!(
error.to_string(),
- "No field named 'f1'.'c1'. Valid fields are 'test'.'f.c1',
'test'.'test.c2'"
+ r#"No field named "f1"."c1". Valid fields are "test"."f.c1",
"test"."test.c2""#
);
// however, enclosing it in double quotes is ok
diff --git
a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index 81ce141a8..25e4195fb 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -84,6 +84,30 @@ datafusion information_schema views VIEW
datafusion public t BASE TABLE
datafusion public t2 BASE TABLE
+query TTTT rowsort
+SELECT * from information_schema.tables WHERE
tables.table_schema='information_schema';
+----
+datafusion information_schema columns VIEW
+datafusion information_schema df_settings VIEW
+datafusion information_schema tables VIEW
+datafusion information_schema views VIEW
+
+query TTTT rowsort
+SELECT * from information_schema.tables WHERE
information_schema.tables.table_schema='information_schema';
+----
+datafusion information_schema columns VIEW
+datafusion information_schema df_settings VIEW
+datafusion information_schema tables VIEW
+datafusion information_schema views VIEW
+
+query TTTT rowsort
+SELECT * from information_schema.tables WHERE
datafusion.information_schema.tables.table_schema='information_schema';
+----
+datafusion information_schema columns VIEW
+datafusion information_schema df_settings VIEW
+datafusion information_schema tables VIEW
+datafusion information_schema views VIEW
+
# Cleanup
statement ok
drop table t
diff --git a/datafusion/core/tests/sqllogictests/test_files/join.slt
b/datafusion/core/tests/sqllogictests/test_files/join.slt
index 8d5f57343..49f7cf49b 100644
--- a/datafusion/core/tests/sqllogictests/test_files/join.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/join.slt
@@ -65,7 +65,7 @@ CREATE TABLE t2(t2_id INT, t2_name TEXT, t2_int INT) AS VALUES
(55, 'w', 3);
# left semi with wrong where clause
-query error DataFusion error: Schema error: No field named 't2'.'t2_id'. Valid
fields are 't1'.'t1_id', 't1'.'t1_name', 't1'.'t1_int'.
+query error DataFusion error: Schema error: No field named "t2"."t2_id". Valid
fields are "t1"."t1_id", "t1"."t1_name", "t1"."t1_int".
SELECT t1.t1_id,
t1.t1_name,
t1.t1_int
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 6465ca80b..ef6f8ac50 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -28,17 +28,47 @@ use arrow::datatypes::DataType;
use datafusion_common::{Column, Result};
use std::sync::Arc;
-/// Create a column expression based on a qualified or unqualified column name
+/// Create a column expression based on a qualified or unqualified column
name. Will
+/// normalize unquoted identifiers according to SQL rules (identifiers will
become lowercase).
///
-/// example:
-/// ```
+/// For example:
+///
+/// ```rust
/// # use datafusion_expr::col;
-/// let c = col("my_column");
+/// let c1 = col("a");
+/// let c2 = col("A");
+/// assert_eq!(c1, c2);
+///
+/// // note how quoting with double quotes preserves the case
+/// let c3 = col(r#""A""#);
+/// assert_ne!(c1, c3);
/// ```
pub fn col(ident: impl Into<Column>) -> Expr {
Expr::Column(ident.into())
}
+/// Create an unqualified column expression from the provided name, without
normalizing
+/// the column.
+///
+/// For example:
+///
+/// ```rust
+/// # use datafusion_expr::{col, ident};
+/// let c1 = ident("A"); // not normalized staying as column 'A'
+/// let c2 = col("A"); // normalized via SQL rules becoming column 'a'
+/// assert_ne!(c1, c2);
+///
+/// let c3 = col(r#""A""#);
+/// assert_eq!(c1, c3);
+///
+/// let c4 = col("t1.a"); // parses as relation 't1' column 'a'
+/// let c5 = ident("t1.a"); // parses as column 't1.a'
+/// assert_ne!(c4, c5);
+/// ```
+pub fn ident(name: impl Into<String>) -> Expr {
+ Expr::Column(Column::from_name(name))
+}
+
/// Return a new expression `left <op> right`
pub fn binary_expr(left: Expr, op: Operator, right: Expr) -> Expr {
Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
@@ -652,7 +682,7 @@ mod test {
#[test]
fn filter_is_null_and_is_not_null() {
let col_null = col("col1");
- let col_not_null = col("col2");
+ let col_not_null = ident("col2");
assert_eq!(format!("{:?}", col_null.is_null()), "col1 IS NULL");
assert_eq!(
format!("{:?}", col_not_null.is_not_null()),
diff --git a/datafusion/expr/src/expr_rewriter.rs
b/datafusion/expr/src/expr_rewriter.rs
index 3ae9e43e0..ade276bab 100644
--- a/datafusion/expr/src/expr_rewriter.rs
+++ b/datafusion/expr/src/expr_rewriter.rs
@@ -664,7 +664,8 @@ mod test {
fn normalize_cols_non_exist() {
// test normalizing columns when the name doesn't exist
let expr = col("a") + col("b");
- let schema_a =
make_schema_with_empty_metadata(vec![make_field("tableA", "a")]);
+ let schema_a =
+ make_schema_with_empty_metadata(vec![make_field("\"tableA\"",
"a")]);
let schemas = vec![schema_a];
let schemas = schemas.iter().collect::<Vec<_>>();
@@ -674,7 +675,7 @@ mod test {
.to_string();
assert_eq!(
error,
- "Schema error: No field named 'b'. Valid fields are 'tableA'.'a'."
+ r#"Schema error: No field named "b". Valid fields are
"tableA"."a"."#
);
}
@@ -690,7 +691,7 @@ mod test {
}
fn make_field(relation: &str, column: &str) -> DFField {
- DFField::new(Some(relation), column, DataType::Int8, false)
+ DFField::new(Some(relation.to_string()), column, DataType::Int8, false)
}
#[test]
diff --git a/datafusion/expr/src/expr_schema.rs
b/datafusion/expr/src/expr_schema.rs
index 493c425d7..5ceaf1668 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -247,13 +247,12 @@ impl ExprSchemable for Expr {
fn to_field(&self, input_schema: &DFSchema) -> Result<DFField> {
match self {
Expr::Column(c) => Ok(DFField::new(
- c.relation.as_deref(),
+ c.relation.clone(),
&c.name,
self.get_type(input_schema)?,
self.nullable(input_schema)?,
)),
- _ => Ok(DFField::new(
- None,
+ _ => Ok(DFField::new_unqualified(
&self.display_name()?,
self.get_type(input_schema)?,
self.nullable(input_schema)?,
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index 9be046934..5616b7c52 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -176,8 +176,7 @@ impl LogicalPlanBuilder {
.map(|(j, data_type)| {
// naming is following convention
https://www.postgresql.org/docs/current/queries-values.html
let name = &format!("column{}", j + 1);
- DFField::new(
- None,
+ DFField::new_unqualified(
name,
data_type.clone().unwrap_or(DataType::Utf8),
true,
@@ -239,7 +238,10 @@ impl LogicalPlanBuilder {
DFSchema::new_with_metadata(
p.iter()
.map(|i| {
- DFField::from_qualified(&table_name,
schema.field(*i).clone())
+ DFField::from_qualified(
+ table_name.to_string(),
+ schema.field(*i).clone(),
+ )
})
.collect(),
schema.metadata().clone(),
@@ -1105,7 +1107,7 @@ pub fn union(left_plan: LogicalPlan, right_plan:
LogicalPlan) -> Result<LogicalP
})?;
Ok(DFField::new(
- left_field.qualifier().map(|x| x.as_ref()),
+ left_field.qualifier().cloned(),
left_field.name(),
data_type,
nullable,
@@ -1291,7 +1293,7 @@ pub fn unnest(input: LogicalPlan, column: Column) ->
Result<LogicalPlan> {
DataType::List(field)
| DataType::FixedSizeList(field, _)
| DataType::LargeList(field) => DFField::new(
- unnest_field.qualifier().map(String::as_str),
+ unnest_field.qualifier().cloned(),
unnest_field.name(),
field.data_type().clone(),
unnest_field.is_nullable(),
@@ -1332,7 +1334,7 @@ pub fn unnest(input: LogicalPlan, column: Column) ->
Result<LogicalPlan> {
mod tests {
use crate::{expr, expr_fn::exists};
use arrow::datatypes::{DataType, Field};
- use datafusion_common::SchemaError;
+ use datafusion_common::{OwnedTableReference, SchemaError, TableReference};
use crate::logical_plan::StringifiedPlan;
@@ -1607,10 +1609,13 @@ mod tests {
match plan {
Err(DataFusionError::SchemaError(SchemaError::AmbiguousReference {
- qualifier,
- name,
+ field:
+ Column {
+ relation: Some(OwnedTableReference::Bare { table }),
+ name,
+ },
})) => {
- assert_eq!("employee_csv", qualifier.unwrap().as_str());
+ assert_eq!("employee_csv", table);
assert_eq!("id", &name);
Ok(())
}
@@ -1633,10 +1638,13 @@ mod tests {
match plan {
Err(DataFusionError::SchemaError(SchemaError::AmbiguousReference {
- qualifier,
- name,
+ field:
+ Column {
+ relation: Some(OwnedTableReference::Bare { table }),
+ name,
+ },
})) => {
- assert_eq!("employee_csv", qualifier.unwrap().as_str());
+ assert_eq!("employee_csv", table);
assert_eq!("state", &name);
Ok(())
}
@@ -1737,7 +1745,7 @@ mod tests {
// Check unnested field is a scalar
let field = plan
.schema()
- .field_with_name(Some("test_table"), "strings")
+ .field_with_name(Some(&TableReference::bare("test_table")),
"strings")
.unwrap();
assert_eq!(&DataType::Utf8, field.data_type());
@@ -1756,7 +1764,7 @@ mod tests {
// Check unnested struct list field should be a struct.
let field = plan
.schema()
- .field_with_name(Some("test_table"), "structs")
+ .field_with_name(Some(&TableReference::bare("test_table")),
"structs")
.unwrap();
assert!(matches!(field.data_type(), DataType::Struct(_)));
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 83bda0612..777a5871c 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1521,6 +1521,8 @@ pub struct Window {
#[derive(Clone)]
pub struct TableScan {
/// The name of the table
+ // TODO: change to OwnedTableReference
+ // see: https://github.com/apache/arrow-datafusion/issues/5522
pub table_name: String,
/// The source of the table
pub source: Arc<dyn TableSource>,
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index a0745025d..3d8b44792 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -35,6 +35,7 @@ use crate::{
use arrow::datatypes::{DataType, TimeUnit};
use datafusion_common::{
Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result,
ScalarValue,
+ TableReference,
};
use std::cmp::Ordering;
use std::collections::HashSet;
@@ -191,8 +192,9 @@ pub fn expand_qualified_wildcard(
qualifier: &str,
schema: &DFSchema,
) -> Result<Vec<Expr>> {
+ let qualifier = TableReference::from(qualifier);
let qualified_fields: Vec<DFField> = schema
- .fields_with_qualified(qualifier)
+ .fields_with_qualified(&qualifier)
.into_iter()
.cloned()
.collect();
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index e1830390d..33bf676db 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -312,7 +312,7 @@ fn build_common_expr_project_plan(
match expr_set.get(&id) {
Some((expr, _, data_type)) => {
// todo: check `nullable`
- let field = DFField::new(None, &id, data_type.clone(), true);
+ let field = DFField::new_unqualified(&id, data_type.clone(),
true);
fields_set.insert(field.name().to_owned());
project_exprs.push(expr.clone().alias(&id));
}
@@ -624,8 +624,8 @@ mod test {
let schema = Arc::new(DFSchema::new_with_metadata(
vec![
- DFField::new(None, "a", DataType::Int64, false),
- DFField::new(None, "c", DataType::Int64, false),
+ DFField::new_unqualified("a", DataType::Int64, false),
+ DFField::new_unqualified("c", DataType::Int64, false),
],
Default::default(),
)?);
diff --git a/datafusion/optimizer/src/optimizer.rs
b/datafusion/optimizer/src/optimizer.rs
index c1baa25d4..6091c4f6f 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -476,9 +476,9 @@ mod tests {
Internal error: Optimizer rule 'get table_scan rule' failed, due
to generate a different schema, \
original schema: DFSchema { fields: [], metadata: {} }, \
new schema: DFSchema { fields: [\
- DFField { qualifier: Some(\"test\"), field: Field { name: \"a\",
data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} } }, \
- DFField { qualifier: Some(\"test\"), field: Field { name: \"b\",
data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} } }, \
- DFField { qualifier: Some(\"test\"), field: Field { name: \"c\",
data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} } }], \
+ DFField { qualifier: Some(Bare { table: \"test\" }), field: Field
{ name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered:
false, metadata: {} } }, \
+ DFField { qualifier: Some(Bare { table: \"test\" }), field: Field
{ name: \"b\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered:
false, metadata: {} } }, \
+ DFField { qualifier: Some(Bare { table: \"test\" }), field: Field
{ name: \"c\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered:
false, metadata: {} } }], \
metadata: {} }. \
This was likely caused by a bug in DataFusion's code \
and we would welcome that you file an bug report in our issue
tracker",
@@ -521,7 +521,7 @@ mod tests {
let new_arrow_field =
f.field().clone().with_metadata(metadata);
if let Some(qualifier) = f.qualifier() {
- DFField::from_qualified(qualifier, new_arrow_field)
+ DFField::from_qualified(qualifier.clone(), new_arrow_field)
} else {
DFField::from(new_arrow_field)
}
diff --git a/datafusion/optimizer/src/push_down_projection.rs
b/datafusion/optimizer/src/push_down_projection.rs
index 6d7ab481b..4e9d5f039 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -496,7 +496,8 @@ fn push_down_scan(
let mut projection: BTreeSet<usize> = used_columns
.iter()
.filter(|c| {
- c.relation.is_none() || c.relation.as_ref().unwrap() ==
&scan.table_name
+ c.relation.is_none()
+ || c.relation.as_ref().unwrap().to_string() == scan.table_name
})
.map(|c| schema.index_of(&c.name))
.filter_map(ArrowResult::ok)
@@ -536,7 +537,9 @@ fn push_down_scan(
// create the projected schema
let projected_fields: Vec<DFField> = projection
.iter()
- .map(|i| DFField::from_qualified(&scan.table_name,
schema.fields()[*i].clone()))
+ .map(|i| {
+ DFField::from_qualified(scan.table_name.clone(),
schema.fields()[*i].clone())
+ })
.collect();
let projected_schema = projected_fields.to_dfschema_ref()?;
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs
b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index cfec3cb74..34ea208b3 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -272,16 +272,10 @@ fn optimize_scalar(
// qualify the join columns for outside the subquery
let mut subqry_cols: Vec<_> = subqry_cols
.iter()
- .map(|it| Column {
- relation: Some(subqry_alias.clone()),
- name: it.name.clone(),
- })
+ .map(|it| Column::new(Some(subqry_alias.clone()), it.name.clone()))
.collect();
- let qry_expr = Expr::Column(Column {
- relation: Some(subqry_alias),
- name: "__value".to_string(),
- });
+ let qry_expr = Expr::Column(Column::new(Some(subqry_alias),
"__value".to_string()));
// if correlated subquery's operation is column equality, put the clause
into join on clause.
let mut restore_where_clause = true;
diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
index c2e96bf97..2b015acd7 100644
--- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
+++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
@@ -2427,14 +2427,14 @@ mod tests {
Arc::new(
DFSchema::new_with_metadata(
vec![
- DFField::new(None, "c1", DataType::Utf8, true),
- DFField::new(None, "c2", DataType::Boolean, true),
- DFField::new(None, "c3", DataType::Int64, true),
- DFField::new(None, "c4", DataType::UInt32, true),
- DFField::new(None, "c1_non_null", DataType::Utf8, false),
- DFField::new(None, "c2_non_null", DataType::Boolean,
false),
- DFField::new(None, "c3_non_null", DataType::Int64, false),
- DFField::new(None, "c4_non_null", DataType::UInt32, false),
+ DFField::new_unqualified("c1", DataType::Utf8, true),
+ DFField::new_unqualified("c2", DataType::Boolean, true),
+ DFField::new_unqualified("c3", DataType::Int64, true),
+ DFField::new_unqualified("c4", DataType::UInt32, true),
+ DFField::new_unqualified("c1_non_null", DataType::Utf8,
false),
+ DFField::new_unqualified("c2_non_null", DataType::Boolean,
false),
+ DFField::new_unqualified("c3_non_null", DataType::Int64,
false),
+ DFField::new_unqualified("c4_non_null", DataType::UInt32,
false),
],
HashMap::new(),
)
diff --git a/datafusion/optimizer/src/type_coercion.rs
b/datafusion/optimizer/src/type_coercion.rs
index c4c5c29d3..7cfd6cc8d 100644
--- a/datafusion/optimizer/src/type_coercion.rs
+++ b/datafusion/optimizer/src/type_coercion.rs
@@ -664,7 +664,7 @@ mod test {
produce_one_row: false,
schema: Arc::new(
DFSchema::new_with_metadata(
- vec![DFField::new(None, "a", DataType::Float64, true)],
+ vec![DFField::new_unqualified("a", DataType::Float64,
true)],
std::collections::HashMap::new(),
)
.unwrap(),
@@ -682,7 +682,7 @@ mod test {
produce_one_row: false,
schema: Arc::new(
DFSchema::new_with_metadata(
- vec![DFField::new(None, "a", DataType::Float64, true)],
+ vec![DFField::new_unqualified("a", DataType::Float64,
true)],
std::collections::HashMap::new(),
)
.unwrap(),
@@ -881,7 +881,7 @@ mod test {
produce_one_row: false,
schema: Arc::new(
DFSchema::new_with_metadata(
- vec![DFField::new(None, "a", DataType::Int64, true)],
+ vec![DFField::new_unqualified("a", DataType::Int64, true)],
std::collections::HashMap::new(),
)
.unwrap(),
@@ -899,7 +899,11 @@ mod test {
produce_one_row: false,
schema: Arc::new(
DFSchema::new_with_metadata(
- vec![DFField::new(None, "a", DataType::Decimal128(12, 4),
true)],
+ vec![DFField::new_unqualified(
+ "a",
+ DataType::Decimal128(12, 4),
+ true,
+ )],
std::collections::HashMap::new(),
)
.unwrap(),
@@ -1082,7 +1086,7 @@ mod test {
produce_one_row: false,
schema: Arc::new(
DFSchema::new_with_metadata(
- vec![DFField::new(None, "a", data_type, true)],
+ vec![DFField::new_unqualified("a", data_type, true)],
std::collections::HashMap::new(),
)
.unwrap(),
@@ -1095,7 +1099,7 @@ mod test {
// gt
let schema = Arc::new(
DFSchema::new_with_metadata(
- vec![DFField::new(None, "a", DataType::Int64, true)],
+ vec![DFField::new_unqualified("a", DataType::Int64, true)],
std::collections::HashMap::new(),
)
.unwrap(),
@@ -1109,7 +1113,7 @@ mod test {
// eq
let schema = Arc::new(
DFSchema::new_with_metadata(
- vec![DFField::new(None, "a", DataType::Int64, true)],
+ vec![DFField::new_unqualified("a", DataType::Int64, true)],
std::collections::HashMap::new(),
)
.unwrap(),
@@ -1123,7 +1127,7 @@ mod test {
// lt
let schema = Arc::new(
DFSchema::new_with_metadata(
- vec![DFField::new(None, "a", DataType::Int64, true)],
+ vec![DFField::new_unqualified("a", DataType::Int64, true)],
std::collections::HashMap::new(),
)
.unwrap(),
diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
index a940cf272..4c2a24f05 100644
--- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
+++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
@@ -743,14 +743,22 @@ mod tests {
Arc::new(
DFSchema::new_with_metadata(
vec![
- DFField::new(None, "c1", DataType::Int32, false),
- DFField::new(None, "c2", DataType::Int64, false),
- DFField::new(None, "c3", DataType::Decimal128(18, 2),
false),
- DFField::new(None, "c4", DataType::Decimal128(38, 37),
false),
- DFField::new(None, "c5", DataType::Float32, false),
- DFField::new(None, "c6", DataType::UInt32, false),
- DFField::new(None, "ts_nano_none",
timestamp_nano_none_type(), false),
- DFField::new(None, "ts_nano_utf",
timestamp_nano_utc_type(), false),
+ DFField::new_unqualified("c1", DataType::Int32, false),
+ DFField::new_unqualified("c2", DataType::Int64, false),
+ DFField::new_unqualified("c3", DataType::Decimal128(18,
2), false),
+ DFField::new_unqualified("c4", DataType::Decimal128(38,
37), false),
+ DFField::new_unqualified("c5", DataType::Float32, false),
+ DFField::new_unqualified("c6", DataType::UInt32, false),
+ DFField::new_unqualified(
+ "ts_nano_none",
+ timestamp_nano_none_type(),
+ false,
+ ),
+ DFField::new_unqualified(
+ "ts_nano_utf",
+ timestamp_nano_utc_type(),
+ false,
+ ),
],
HashMap::new(),
)
diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs
b/datafusion/physical-expr/src/intervals/cp_solver.rs
index 302a86cdc..66367001c 100644
--- a/datafusion/physical-expr/src/intervals/cp_solver.rs
+++ b/datafusion/physical-expr/src/intervals/cp_solver.rs
@@ -326,7 +326,7 @@ impl ExprIntervalGraph {
// ```
/// This function associates stable node indices with [PhysicalExpr]s so
- /// that we can match Arc<dyn PhysicalExpr> and NodeIndex objects during
+ /// that we can match `Arc<dyn PhysicalExpr>` and NodeIndex objects during
/// membership tests.
pub fn gather_node_indices(
&mut self,
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index 1b704f3aa..aa416e63b 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -145,10 +145,7 @@ impl From<protobuf::Column> for Column {
fn from(c: protobuf::Column) -> Self {
let protobuf::Column { relation, name } = c;
- Self {
- relation: relation.map(|r| r.relation),
- name,
- }
+ Self::new(relation.map(|r| r.relation), name)
}
}
@@ -190,7 +187,7 @@ impl TryFrom<&protobuf::DfField> for DFField {
let field = df_field.field.as_ref().required("field")?;
Ok(match &df_field.qualifier {
- Some(q) => DFField::from_qualified(&q.relation, field),
+ Some(q) => DFField::from_qualified(q.relation.clone(), field),
None => DFField::from(field),
})
}
@@ -217,21 +214,17 @@ impl TryFrom<protobuf::OwnedTableReference> for
OwnedTableReference {
match table_reference_enum {
TableReferenceEnum::Bare(protobuf::BareTableReference { table })
=> {
- Ok(OwnedTableReference::Bare { table })
+ Ok(OwnedTableReference::bare(table))
}
TableReferenceEnum::Partial(protobuf::PartialTableReference {
schema,
table,
- }) => Ok(OwnedTableReference::Partial { schema, table }),
+ }) => Ok(OwnedTableReference::partial(schema, table)),
TableReferenceEnum::Full(protobuf::FullTableReference {
catalog,
schema,
table,
- }) => Ok(OwnedTableReference::Full {
- catalog,
- schema,
- table,
- }),
+ }) => Ok(OwnedTableReference::full(catalog, schema, table)),
}
}
}
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index c240ef853..a794c9bd0 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -240,9 +240,9 @@ impl TryFrom<&DataType> for
protobuf::arrow_type::ArrowTypeEnum {
impl From<Column> for protobuf::Column {
fn from(c: Column) -> Self {
Self {
- relation: c
- .relation
- .map(|relation| protobuf::ColumnRelation { relation }),
+ relation: c.relation.map(|relation| protobuf::ColumnRelation {
+ relation: relation.to_string(),
+ }),
name: c.name,
}
}
@@ -1321,12 +1321,14 @@ impl From<OwnedTableReference> for
protobuf::OwnedTableReference {
use protobuf::owned_table_reference::TableReferenceEnum;
let table_reference_enum = match t {
OwnedTableReference::Bare { table } => {
- TableReferenceEnum::Bare(protobuf::BareTableReference { table
})
+ TableReferenceEnum::Bare(protobuf::BareTableReference {
+ table: table.to_string(),
+ })
}
OwnedTableReference::Partial { schema, table } => {
TableReferenceEnum::Partial(protobuf::PartialTableReference {
- schema,
- table,
+ schema: schema.to_string(),
+ table: table.to_string(),
})
}
OwnedTableReference::Full {
@@ -1334,9 +1336,9 @@ impl From<OwnedTableReference> for
protobuf::OwnedTableReference {
schema,
table,
} => TableReferenceEnum::Full(protobuf::FullTableReference {
- catalog,
- schema,
- table,
+ catalog: catalog.to_string(),
+ schema: schema.to_string(),
+ table: table.to_string(),
}),
};
diff --git a/datafusion/sql/src/expr/arrow_cast.rs
b/datafusion/sql/src/expr/arrow_cast.rs
index 49104ee05..9a2ac28c4 100644
--- a/datafusion/sql/src/expr/arrow_cast.rs
+++ b/datafusion/sql/src/expr/arrow_cast.rs
@@ -93,7 +93,7 @@ pub fn create_arrow_cast(mut args: Vec<Expr>, schema:
&DFSchema) -> Result<Expr>
/// assert_eq!(data_type, DataType::Int32);
/// ```
///
-/// Remove if added to arrow: https://github.com/apache/arrow-rs/issues/3821
+/// Remove if added to arrow: <https://github.com/apache/arrow-rs/issues/3821>
pub fn parse_data_type(val: &str) -> Result<DataType> {
Parser::new(val).parse()
}
diff --git a/datafusion/sql/src/expr/identifier.rs
b/datafusion/sql/src/expr/identifier.rs
index a581d5f47..7548fc8bd 100644
--- a/datafusion/sql/src/expr/identifier.rs
+++ b/datafusion/sql/src/expr/identifier.rs
@@ -15,12 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use crate::planner::{
- idents_to_table_reference, ContextProvider, PlannerContext, SqlToRel,
-};
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::utils::normalize_ident;
use datafusion_common::{
- Column, DFSchema, DataFusionError, OwnedTableReference, Result,
ScalarValue,
+ Column, DFField, DFSchema, DataFusionError, Result, ScalarValue,
TableReference,
};
use datafusion_expr::{Case, Expr, GetIndexedField};
use sqlparser::ast::{Expr as SQLExpr, Ident};
@@ -57,6 +55,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
ids: Vec<Ident>,
schema: &DFSchema,
) -> Result<Expr> {
+ if ids.len() < 2 {
+ return Err(DataFusionError::Internal(format!(
+ "Not a compound identifier: {ids:?}"
+ )));
+ }
+
if ids[0].value.starts_with('@') {
let var_names: Vec<_> =
ids.into_iter().map(normalize_ident).collect();
let ty = self
@@ -69,44 +73,61 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
})?;
Ok(Expr::ScalarVariable(ty, var_names))
} else {
- // only support "schema.table" type identifiers here
- let (name, relation) = match idents_to_table_reference(
- ids,
- self.options.enable_ident_normalization,
- )? {
- OwnedTableReference::Partial { schema, table } => (table,
schema),
- r @ OwnedTableReference::Bare { .. }
- | r @ OwnedTableReference::Full { .. } => {
- return Err(DataFusionError::Plan(format!(
- "Unsupported compound identifier '{r:?}'",
- )));
+ let ids = ids
+ .into_iter()
+ .map(|id| {
+ if self.options.enable_ident_normalization {
+ normalize_ident(id)
+ } else {
+ id.value
+ }
+ })
+ .collect::<Vec<_>>();
+
+            // Currently no more than one nested level is supported.
+            // Ideally, once that support is in place, this code should
work with it unchanged.
+            // TODO: remove when multiple nested identifiers are supported
+ if ids.len() > 5 {
+ return Err(DataFusionError::Internal(format!(
+ "Unsupported compound identifier: {ids:?}"
+ )));
+ }
+
+ let search_result = search_dfschema(&ids, schema);
+ match search_result {
+ // found matching field with spare identifier(s) for nested
field(s) in structure
+ Some((field, nested_names)) if !nested_names.is_empty() => {
+                    // TODO: remove when multiple nested identifiers are
supported
+ if nested_names.len() > 1 {
+ return Err(DataFusionError::Internal(format!(
+ "Nested identifiers not yet supported for column
{}",
+ field.qualified_column().quoted_flat_name()
+ )));
+ }
+ let nested_name = nested_names[0].to_string();
+ Ok(Expr::GetIndexedField(GetIndexedField::new(
+ Box::new(Expr::Column(field.qualified_column())),
+ ScalarValue::Utf8(Some(nested_name)),
+ )))
}
- };
-
- // Try and find the reference in schema
- match schema.field_with_qualified_name(&relation, &name) {
- Ok(_) => {
- // found an exact match on a qualified name so this is a
table.column identifier
- Ok(Expr::Column(Column {
- relation: Some(relation),
- name,
- }))
+ // found matching field with no spare identifier(s)
+ Some((field, _nested_names)) => {
+ Ok(Expr::Column(field.qualified_column()))
}
- Err(_) => {
- if let Some(field) =
- schema.fields().iter().find(|f| f.name().eq(&relation))
- {
- // Access to a field of a column which is a structure,
example: SELECT my_struct.key
- Ok(Expr::GetIndexedField(GetIndexedField::new(
- Box::new(Expr::Column(field.qualified_column())),
- ScalarValue::Utf8(Some(name)),
+ // found no matching field, will return a default
+ None => {
+                    // fall back to treating all identifiers as the column
reference (no nested field)
+                    // this length check exists because 5 identifiers must
include a nested field
+ if ids.len() == 5 {
+ Err(DataFusionError::Internal(format!(
+ "Unsupported compound identifier: {ids:?}"
)))
} else {
- // table.column identifier
- Ok(Expr::Column(Column {
- relation: Some(relation),
- name,
- }))
+ let s = &ids[0..ids.len()];
+ // safe unwrap as s can never be empty or exceed the
bounds
+ let (relation, column_name) =
form_identifier(s).unwrap();
+ let relation = relation.map(|r|
r.to_owned_reference());
+ Ok(Expr::Column(Column::new(relation, column_name)))
}
}
}
@@ -160,3 +181,244 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
)))
}
}
+
+// (relation, column name)
+fn form_identifier(idents: &[String]) -> Result<(Option<TableReference>,
&String)> {
+ match idents.len() {
+ 1 => Ok((None, &idents[0])),
+ 2 => Ok((
+ Some(TableReference::Bare {
+ table: (&idents[0]).into(),
+ }),
+ &idents[1],
+ )),
+ 3 => Ok((
+ Some(TableReference::Partial {
+ schema: (&idents[0]).into(),
+ table: (&idents[1]).into(),
+ }),
+ &idents[2],
+ )),
+ 4 => Ok((
+ Some(TableReference::Full {
+ catalog: (&idents[0]).into(),
+ schema: (&idents[1]).into(),
+ table: (&idents[2]).into(),
+ }),
+ &idents[3],
+ )),
+ _ => Err(DataFusionError::Internal(format!(
+ "Incorrect number of identifiers: {}",
+ idents.len()
+ ))),
+ }
+}
+
+fn search_dfschema<'ids, 'schema>(
+ ids: &'ids [String],
+ schema: &'schema DFSchema,
+) -> Option<(&'schema DFField, &'ids [String])> {
+ generate_schema_search_terms(ids).find_map(|(qualifier, column,
nested_names)| {
+ let field = schema.field_with_name(qualifier.as_ref(), column).ok();
+ field.map(|f| (f, nested_names))
+ })
+}
+
+// Possibilities we search with, in order from top to bottom for each len:
+//
+// len = 2:
+// 1. (table.column)
+// 2. (column).nested
+//
+// len = 3:
+// 1. (schema.table.column)
+// 2. (table.column).nested
+// 3. (column).nested1.nested2
+//
+// len = 4:
+// 1. (catalog.schema.table.column)
+// 2. (schema.table.column).nested1
+// 3. (table.column).nested1.nested2
+// 4. (column).nested1.nested2.nested3
+//
+// len = 5:
+// 1. (catalog.schema.table.column).nested
+// 2. (schema.table.column).nested1.nested2
+// 3. (table.column).nested1.nested2.nested3
+// 4. (column).nested1.nested2.nested3.nested4
+//
+// len > 5:
+// 1. (catalog.schema.table.column).nested[.nestedN]+
+// 2. (schema.table.column).nested1.nested2[.nestedN]+
+// 3. (table.column).nested1.nested2.nested3[.nestedN]+
+// 4. (column).nested1.nested2.nested3.nested4[.nestedN]+
+fn generate_schema_search_terms(
+ ids: &[String],
+) -> impl Iterator<Item = (Option<TableReference>, &String, &[String])> {
+ // take at most 4 identifiers to form a Column to search with
+ // - 1 for the column name
+ // - 0 to 3 for the TableReference
+ let bound = ids.len().min(4);
+ // search terms from most specific to least specific
+ (0..bound).rev().map(|i| {
+ let nested_names_index = i + 1;
+ let qualifier_and_column = &ids[0..nested_names_index];
+ // safe unwrap as qualifier_and_column can never be empty or exceed
the bounds
+ let (relation, column_name) =
form_identifier(qualifier_and_column).unwrap();
+ (relation, column_name, &ids[nested_names_index..])
+ })
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+    // tests the behaviour documented on the generate_schema_search_terms
function,
+    // ensuring the generated search terms are in the correct order with the
correct values
+ fn test_generate_schema_search_terms() -> Result<()> {
+ type ExpectedItem = (
+ Option<TableReference<'static>>,
+ &'static str,
+ &'static [&'static str],
+ );
+ fn assert_vec_eq(
+ expected: Vec<ExpectedItem>,
+ actual: Vec<(Option<TableReference>, &String, &[String])>,
+ ) {
+ for (expected, actual) in expected.into_iter().zip(actual) {
+ assert_eq!(expected.0, actual.0, "qualifier");
+ assert_eq!(expected.1, actual.1, "column name");
+ assert_eq!(expected.2, actual.2, "nested names");
+ }
+ }
+
+ let actual = generate_schema_search_terms(&[]).collect::<Vec<_>>();
+ assert!(actual.is_empty());
+
+ let ids = vec!["a".to_string()];
+ let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
+ let expected: Vec<ExpectedItem> = vec![(None, "a", &[])];
+ assert_vec_eq(expected, actual);
+
+ let ids = vec!["a".to_string(), "b".to_string()];
+ let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
+ let expected: Vec<ExpectedItem> = vec![
+ (Some(TableReference::bare("a")), "b", &[]),
+ (None, "a", &["b"]),
+ ];
+ assert_vec_eq(expected, actual);
+
+ let ids = vec!["a".to_string(), "b".to_string(), "c".to_string()];
+ let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
+ let expected: Vec<ExpectedItem> = vec![
+ (Some(TableReference::partial("a", "b")), "c", &[]),
+ (Some(TableReference::bare("a")), "b", &["c"]),
+ (None, "a", &["b", "c"]),
+ ];
+ assert_vec_eq(expected, actual);
+
+ let ids = vec![
+ "a".to_string(),
+ "b".to_string(),
+ "c".to_string(),
+ "d".to_string(),
+ ];
+ let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
+ let expected: Vec<ExpectedItem> = vec![
+ (Some(TableReference::full("a", "b", "c")), "d", &[]),
+ (Some(TableReference::partial("a", "b")), "c", &["d"]),
+ (Some(TableReference::bare("a")), "b", &["c", "d"]),
+ (None, "a", &["b", "c", "d"]),
+ ];
+ assert_vec_eq(expected, actual);
+
+ let ids = vec![
+ "a".to_string(),
+ "b".to_string(),
+ "c".to_string(),
+ "d".to_string(),
+ "e".to_string(),
+ ];
+ let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
+ let expected: Vec<ExpectedItem> = vec![
+ (Some(TableReference::full("a", "b", "c")), "d", &["e"]),
+ (Some(TableReference::partial("a", "b")), "c", &["d", "e"]),
+ (Some(TableReference::bare("a")), "b", &["c", "d", "e"]),
+ (None, "a", &["b", "c", "d", "e"]),
+ ];
+ assert_vec_eq(expected, actual);
+
+ let ids = vec![
+ "a".to_string(),
+ "b".to_string(),
+ "c".to_string(),
+ "d".to_string(),
+ "e".to_string(),
+ "f".to_string(),
+ ];
+ let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
+ let expected: Vec<ExpectedItem> = vec![
+ (Some(TableReference::full("a", "b", "c")), "d", &["e", "f"]),
+ (
+ Some(TableReference::partial("a", "b")),
+ "c",
+ &["d", "e", "f"],
+ ),
+ (Some(TableReference::bare("a")), "b", &["c", "d", "e", "f"]),
+ (None, "a", &["b", "c", "d", "e", "f"]),
+ ];
+ assert_vec_eq(expected, actual);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_form_identifier() -> Result<()> {
+ let err = form_identifier(&[]).expect_err("empty identifiers didn't
fail");
+ let expected = "Internal error: Incorrect number of identifiers: 0. \
+ This was likely caused by a bug in DataFusion's code and we would \
+ welcome that you file an bug report in our issue tracker";
+ assert_eq!(err.to_string(), expected);
+
+ let ids = vec!["a".to_string()];
+ let (qualifier, column) = form_identifier(&ids)?;
+ assert_eq!(qualifier, None);
+ assert_eq!(column, "a");
+
+ let ids = vec!["a".to_string(), "b".to_string()];
+ let (qualifier, column) = form_identifier(&ids)?;
+ assert_eq!(qualifier, Some(TableReference::bare("a")));
+ assert_eq!(column, "b");
+
+ let ids = vec!["a".to_string(), "b".to_string(), "c".to_string()];
+ let (qualifier, column) = form_identifier(&ids)?;
+ assert_eq!(qualifier, Some(TableReference::partial("a", "b")));
+ assert_eq!(column, "c");
+
+ let ids = vec![
+ "a".to_string(),
+ "b".to_string(),
+ "c".to_string(),
+ "d".to_string(),
+ ];
+ let (qualifier, column) = form_identifier(&ids)?;
+ assert_eq!(qualifier, Some(TableReference::full("a", "b", "c")));
+ assert_eq!(column, "d");
+
+ let err = form_identifier(&[
+ "a".to_string(),
+ "b".to_string(),
+ "c".to_string(),
+ "d".to_string(),
+ "e".to_string(),
+ ])
+ .expect_err("too many identifiers didn't fail");
+ let expected = "Internal error: Incorrect number of identifiers: 5. \
+ This was likely caused by a bug in DataFusion's code and we would \
+ welcome that you file an bug report in our issue tracker";
+ assert_eq!(err.to_string(), expected);
+
+ Ok(())
+ }
+}
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index ad05fbcc1..555344166 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -93,7 +93,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.find(|field| match field.qualifier() {
Some(field_q) => {
field.name() == &col.name
- && field_q.ends_with(&format!(".{q}"))
+ &&
field_q.to_string().ends_with(&format!(".{q}"))
}
_ => false,
}) {
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index bf418d0b4..ac3a34cab 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -21,13 +21,14 @@ use std::sync::Arc;
use std::vec;
use arrow_schema::*;
+use datafusion_common::field_not_found;
use sqlparser::ast::ExactNumberInfo;
use sqlparser::ast::TimezoneInfo;
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
use datafusion_common::config::ConfigOptions;
-use datafusion_common::{field_not_found, DFSchema, DataFusionError, Result};
+use datafusion_common::{unqualified_field_not_found, DFSchema,
DataFusionError, Result};
use datafusion_common::{OwnedTableReference, TableReference};
use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use datafusion_expr::utils::find_column_exprs;
@@ -201,16 +202,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if
!schema.fields_with_unqualified_name(&col.name).is_empty() {
Ok(())
} else {
- Err(field_not_found(None, col.name.as_str(),
schema))
+ Err(unqualified_field_not_found(col.name.as_str(),
schema))
}
}
}
.map_err(|_: DataFusionError| {
- field_not_found(
- col.relation.as_ref().map(|s| s.to_owned()),
- col.name.as_str(),
- schema,
- )
+ field_not_found(col.relation.clone(), col.name.as_str(),
schema)
}),
_ => Err(DataFusionError::Internal("Not a
column".to_string())),
})
@@ -377,22 +374,18 @@ pub(crate) fn idents_to_table_reference(
match taker.0.len() {
1 => {
let table = taker.take(enable_normalization);
- Ok(OwnedTableReference::Bare { table })
+ Ok(OwnedTableReference::bare(table))
}
2 => {
let table = taker.take(enable_normalization);
let schema = taker.take(enable_normalization);
- Ok(OwnedTableReference::Partial { schema, table })
+ Ok(OwnedTableReference::partial(schema, table))
}
3 => {
let table = taker.take(enable_normalization);
let schema = taker.take(enable_normalization);
let catalog = taker.take(enable_normalization);
- Ok(OwnedTableReference::Full {
- catalog,
- schema,
- table,
- })
+ Ok(OwnedTableReference::full(catalog, schema, table))
}
_ => Err(DataFusionError::Plan(format!(
"Unsupported compound identifier '{:?}'",
diff --git a/datafusion/sql/src/relation/mod.rs
b/datafusion/sql/src/relation/mod.rs
index 12f5bd6b4..19a278666 100644
--- a/datafusion/sql/src/relation/mod.rs
+++ b/datafusion/sql/src/relation/mod.rs
@@ -35,10 +35,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let table_name = table_ref.to_string();
let cte = planner_context.ctes.get(&table_name);
(
- match (
- cte,
-
self.schema_provider.get_table_provider((&table_ref).into()),
- ) {
+ match (cte,
self.schema_provider.get_table_provider(table_ref)) {
(Some(cte_plan), _) => Ok(cte_plan.clone()),
(_, Ok(provider)) => {
LogicalPlanBuilder::scan(&table_name, provider,
None)?.build()
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index fa0eed1b9..56d0700e0 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -413,9 +413,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let DescribeTableStmt { table_name } = statement;
let table_ref = self.object_name_to_table_reference(table_name)?;
- let table_source = self
- .schema_provider
- .get_table_provider((&table_ref).into())?;
+ let table_source = self.schema_provider.get_table_provider(table_ref)?;
let schema = table_source.schema();
@@ -463,7 +461,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let schema = self.build_schema(columns)?;
// External tables do not support schemas at the moment, so the name
is just a table name
- let name = OwnedTableReference::Bare { table: name };
+ let name = OwnedTableReference::bare(name);
Ok(LogicalPlan::CreateExternalTable(PlanCreateExternalTable {
schema: schema.to_dfschema_ref()?,
@@ -634,9 +632,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// Do a table lookup to verify the table exists
let table_ref =
self.object_name_to_table_reference(table_name.clone())?;
- let provider = self
- .schema_provider
- .get_table_provider((&table_ref).into())?;
+ let provider =
self.schema_provider.get_table_provider(table_ref.clone())?;
let schema = (*provider.schema()).clone();
let schema = DFSchema::try_from(schema)?;
let scan =
@@ -688,7 +684,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let table_name = self.object_name_to_table_reference(table_name)?;
let provider = self
.schema_provider
- .get_table_provider((&table_name).into())?;
+ .get_table_provider(table_name.clone())?;
let arrow_schema = (*provider.schema()).clone();
let table_schema = Arc::new(DFSchema::try_from(arrow_schema)?);
let values = table_schema.fields().iter().map(|f| {
@@ -790,7 +786,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let table_name = self.object_name_to_table_reference(table_name)?;
let provider = self
.schema_provider
- .get_table_provider((&table_name).into())?;
+ .get_table_provider(table_name.clone())?;
let arrow_schema = (*provider.schema()).clone();
let table_schema = DFSchema::try_from(arrow_schema)?;
@@ -896,9 +892,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// Do a table lookup to verify the table exists
let table_ref = self.object_name_to_table_reference(sql_table_name)?;
- let _ = self
- .schema_provider
- .get_table_provider((&table_ref).into())?;
+ let _ = self.schema_provider.get_table_provider(table_ref)?;
// treat both FULL and EXTENDED as the same
let select_list = if full || extended {
@@ -934,9 +928,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// Do a table lookup to verify the table exists
let table_ref = self.object_name_to_table_reference(sql_table_name)?;
- let _ = self
- .schema_provider
- .get_table_provider((&table_ref).into())?;
+ let _ = self.schema_provider.get_table_provider(table_ref)?;
let query = format!(
"SELECT table_catalog, table_schema, table_name, definition FROM
information_schema.views WHERE {where_clause}"
diff --git a/datafusion/sql/tests/integration_test.rs
b/datafusion/sql/tests/integration_test.rs
index 124ab0c69..4d5d56fdf 100644
--- a/datafusion/sql/tests/integration_test.rs
+++ b/datafusion/sql/tests/integration_test.rs
@@ -226,11 +226,11 @@ Dml: op=[Insert] table=[test_decimal]
#[rstest]
#[case::duplicate_columns(
"INSERT INTO test_decimal (id, price, price) VALUES (1, 2, 3), (4, 5, 6)",
- "Schema error: Schema contains duplicate unqualified field name 'price'"
+ "Schema error: Schema contains duplicate unqualified field name \"price\""
)]
#[case::non_existing_column(
"INSERT INTO test_decimal (nonexistent, price) VALUES (1, 2), (4, 5)",
- "Schema error: No field named 'nonexistent'. Valid fields are 'id',
'price'."
+ "Schema error: No field named \"nonexistent\". Valid fields are \"id\",
\"price\"."
)]
#[case::type_mismatch(
"INSERT INTO test_decimal SELECT '2022-01-01',
to_timestamp('2022-01-01T12:00:00')",
@@ -515,7 +515,7 @@ fn select_with_ambiguous_column() {
let sql = "SELECT id FROM person a, person b";
let err = logical_plan(sql).expect_err("query should have failed");
assert_eq!(
- "SchemaError(AmbiguousReference { qualifier: None, name: \"id\" })",
+ "SchemaError(AmbiguousReference { field: Column { relation: None,
name: \"id\" } })",
format!("{err:?}")
);
}
@@ -538,7 +538,7 @@ fn where_selection_with_ambiguous_column() {
let sql = "SELECT * FROM person a, person b WHERE id = id + 1";
let err = logical_plan(sql).expect_err("query should have failed");
assert_eq!(
- "SchemaError(AmbiguousReference { qualifier: None, name: \"id\" })",
+ "SchemaError(AmbiguousReference { field: Column { relation: None,
name: \"id\" } })",
format!("{err:?}")
);
}
@@ -1121,9 +1121,9 @@ fn
select_simple_aggregate_with_groupby_column_unselected() {
fn
select_simple_aggregate_with_groupby_and_column_in_group_by_does_not_exist() {
let sql = "SELECT SUM(age) FROM person GROUP BY doesnotexist";
let err = logical_plan(sql).expect_err("query should have failed");
- assert_eq!("Schema error: No field named 'doesnotexist'. Valid fields are
'SUM(person.age)', \
- 'person'.'id', 'person'.'first_name', 'person'.'last_name',
'person'.'age', 'person'.'state', \
- 'person'.'salary', 'person'.'birth_date', 'person'.'😀'.",
format!("{err}"));
+ assert_eq!("Schema error: No field named \"doesnotexist\". Valid fields
are \"SUM(person.age)\", \
+ \"person\".\"id\", \"person\".\"first_name\",
\"person\".\"last_name\", \"person\".\"age\", \"person\".\"state\", \
+ \"person\".\"salary\", \"person\".\"birth_date\", \"person\".\"😀\".",
format!("{err}"));
}
#[test]
@@ -1432,6 +1432,17 @@ fn select_where_with_positive_operator() {
quick_test(sql, expected);
}
+#[test]
+fn select_where_compound_identifiers() {
+ let sql = "SELECT aggregate_test_100.c3 \
+ FROM public.aggregate_test_100 \
+ WHERE aggregate_test_100.c3 > 0.1";
+ let expected = "Projection: public.aggregate_test_100.c3\
+ \n Filter: public.aggregate_test_100.c3 > Float64(0.1)\
+ \n TableScan: public.aggregate_test_100";
+ quick_test(sql, expected);
+}
+
#[test]
fn select_order_by_index() {
let sql = "SELECT id FROM person ORDER BY 1";
@@ -2964,7 +2975,7 @@ fn order_by_unaliased_name() {
#[test]
fn order_by_ambiguous_name() {
let sql = "select * from person a join person b using (id) order by age";
- let expected = "Schema error: Ambiguous reference to unqualified field
'age'";
+ let expected = "Schema error: Ambiguous reference to unqualified field
\"age\"";
let err = logical_plan(sql).unwrap_err();
assert_eq!(err.to_string(), expected);
@@ -2973,7 +2984,7 @@ fn order_by_ambiguous_name() {
#[test]
fn group_by_ambiguous_name() {
let sql = "select max(id) from person a join person b using (id) group by
age";
- let expected = "Schema error: Ambiguous reference to unqualified field
'age'";
+ let expected = "Schema error: Ambiguous reference to unqualified field
\"age\"";
let err = logical_plan(sql).unwrap_err();
assert_eq!(err.to_string(), expected);
@@ -3278,7 +3289,7 @@ fn test_ambiguous_column_references_in_on_join() {
INNER JOIN person as p2
ON id = 1";
- let expected = "Schema error: Ambiguous reference to unqualified field
'id'";
+ let expected = "Schema error: Ambiguous reference to unqualified field
\"id\"";
// It should return error.
let result = logical_plan(sql);
@@ -3875,7 +3886,7 @@ fn assert_field_not_found(err: DataFusionError, name:
&str) {
match err {
DataFusionError::SchemaError { .. } => {
let msg = format!("{err}");
- let expected = format!("Schema error: No field named '{name}'.");
+ let expected = format!("Schema error: No field named \"{name}\".");
if !msg.starts_with(&expected) {
panic!("error [{msg}] did not start with [{expected}]");
}
diff --git a/docs/source/user-guide/example-usage.md
b/docs/source/user-guide/example-usage.md
index c497c66e5..a2cd109a6 100644
--- a/docs/source/user-guide/example-usage.md
+++ b/docs/source/user-guide/example-usage.md
@@ -118,9 +118,12 @@ async fn main() -> datafusion::error::Result<()> {
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/capitalized_example.csv",
CsvReadOptions::new()).await?;
- let df = df.filter(col("A").lt_eq(col("c")))?
- .aggregate(vec![col("A")], vec![min(col("b"))])?
- .limit(0, Some(100))?;
+ let df = df
+ // col will parse the input string, hence requiring double quotes to
maintain the capitalization
+ .filter(col("\"A\"").lt_eq(col("c")))?
+ // alternatively use ident to pass in an unqualified column name
directly without parsing
+ .aggregate(vec![ident("A")], vec![min(col("b"))])?
+ .limit(0, Some(100))?;
// execute and print results
df.show().await?;