alamb commented on code in PR #5343: URL: https://github.com/apache/arrow-datafusion/pull/5343#discussion_r1130008949
########## datafusion/common/src/column.rs: ########## @@ -53,26 +70,36 @@ impl Column { /// Deserialize a fully qualified name string into a column pub fn from_qualified_name(flat_name: impl Into<String>) -> Self { let flat_name = flat_name.into(); - use sqlparser::tokenizer::Token; - - let dialect = sqlparser::dialect::GenericDialect {}; Review Comment: this is one of the key changes in my mind -- use the full standard identifier normalization rules rather than some custom sqlparser based semantics ########## datafusion/core/src/execution/context.rs: ########## @@ -329,19 +329,19 @@ impl SessionContext { or_replace, }) => { let input = Arc::try_unwrap(input).unwrap_or_else(|e| e.as_ref().clone()); - let table = self.table(&name).await; + let table = self.table(name.clone()).await; Review Comment: I think we can avoid these clones. Here is one way (this is not required as I don't think these codepaths are particularly performance critical) This compiled locally for me: ```diff (arrow_dev) alamb@MacBook-Pro-8 arrow-datafusion2 % git diff | cat git diff | cat diff --git a/datafusion/common/src/table_reference.rs b/datafusion/common/src/table_reference.rs index 7fb3dfcc7..a710dfc03 100644 --- a/datafusion/common/src/table_reference.rs +++ b/datafusion/common/src/table_reference.rs @@ -189,8 +189,9 @@ impl<'a> TableReference<'a> { } } - /// Converts directly into an [`OwnedTableReference`] - pub fn to_owned_reference(self) -> OwnedTableReference { + /// Converts directly into an [`OwnedTableReference`] without + /// copying the underlying Strings + pub fn to_owned_reference(&self) -> OwnedTableReference { match self { Self::Full { catalog, @@ -260,6 +261,12 @@ impl<'a> TableReference<'a> { } } +impl <'a> From<&'a OwnedTableReference> for TableReference<'a> { + fn from(value: &'a OwnedTableReference) -> Self { + value.to_owned_reference() + } +} + /// Parse a `String` into a OwnedTableReference impl From<String> for OwnedTableReference { fn from(s: String) -> Self { 
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index f72ea7560..8807721f7 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -329,19 +329,19 @@ impl SessionContext { or_replace, }) => { let input = Arc::try_unwrap(input).unwrap_or_else(|e| e.as_ref().clone()); - let table = self.table(name.clone()).await; + let table = self.table(&name).await; match (if_not_exists, or_replace, table) { (true, false, Ok(_)) => self.return_empty_dataframe(), (false, true, Ok(_)) => { - self.deregister_table(name.clone())?; + self.deregister_table(&name)?; let schema = Arc::new(input.schema().as_ref().into()); let physical = DataFrame::new(self.state(), input); let batches: Vec<_> = physical.collect_partitioned().await?; let table = Arc::new(MemTable::try_new(schema, batches)?); - self.register_table(name.clone(), table)?; + self.register_table(&name, table)?; self.return_empty_dataframe() } (true, true, Ok(_)) => Err(DataFusionError::Execution( @@ -369,15 +369,15 @@ impl SessionContext { or_replace, definition, }) => { - let view = self.table(name.clone()).await; + let view = self.table(&name).await; match (or_replace, view) { (true, Ok(_)) => { - self.deregister_table(name.clone())?; + self.deregister_table(&name)?; let table = Arc::new(ViewTable::try_new((*input).clone(), definition)?); - self.register_table(name.clone(), table)?; + self.register_table(&name, table)?; self.return_empty_dataframe() } (_, Err(_)) => { @@ -397,7 +397,7 @@ impl SessionContext { name, if_exists, .. }) => { let result = self - .find_and_deregister(name.clone(), TableType::Base) + .find_and_deregister(&name, TableType::Base) .await; match (result, if_exists) { (Ok(true), _) => self.return_empty_dataframe(), @@ -412,7 +412,7 @@ impl SessionContext { name, if_exists, .. 
}) => { let result = self - .find_and_deregister(name.clone(), TableType::View) + .find_and_deregister(&name, TableType::View) .await; match (result, if_exists) { (Ok(true), _) => self.return_empty_dataframe(), @@ -571,7 +571,7 @@ impl SessionContext { &self, cmd: &CreateExternalTable, ) -> Result<DataFrame> { - let exist = self.table_exist(cmd.name.clone())?; + let exist = self.table_exist(&cmd.name)?; if exist { match cmd.if_not_exists { true => return self.return_empty_dataframe(), @@ -586,7 +586,7 @@ impl SessionContext { let table_provider: Arc<dyn TableProvider> = self.create_custom_table(cmd).await?; - self.register_table(cmd.name.clone(), table_provider)?; + self.register_table(&cmd.name, table_provider)?; self.return_empty_dataframe() } (arrow_dev) alamb@MacBook-Pro-8 arrow-datafusion2 % ``` ########## datafusion/common/src/table_reference.rs: ########## @@ -67,53 +61,53 @@ pub enum TableReference<'a> { }, } -/// Represents a path to a table that may require further resolution -/// that owns the underlying names -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub enum OwnedTableReference { - /// An unqualified table reference, e.g. "table" - Bare { - /// The table name - table: String, - }, - /// A partially resolved table reference, e.g. "schema.table" - Partial { - /// The schema containing the table - schema: String, - /// The table name - table: String, - }, - /// A fully resolved table reference, e.g. 
"catalog.schema.table" - Full { - /// The catalog (aka database) containing the table - catalog: String, - /// The schema containing the table - schema: String, - /// The table name - table: String, - }, -} +pub type OwnedTableReference = TableReference<'static>; Review Comment: 👍 ########## datafusion/expr/src/expr_fn.rs: ########## @@ -39,6 +39,12 @@ pub fn col(ident: impl Into<Column>) -> Expr { Expr::Column(ident.into()) } +/// Create an unqualified column expression from the provided name, without normalizing +/// the column Review Comment: ```suggestion /// the column /// /// For example `col("A")` refers to a column named 'a' (normalized via SQL rules) /// but `ident("A")` refers to a column named 'A' (not normalized) ``` ########## datafusion/expr/src/expr_fn.rs: ########## @@ -39,6 +39,12 @@ pub fn col(ident: impl Into<Column>) -> Expr { Expr::Column(ident.into()) } +/// Create an unqualified column expression from the provided name, without normalizing Review Comment: I think we should also update the docstring of `col` to note that the identifier is normalized according to SQL rules (and in particular, capital letters are reduced to lowercase) Perhaps some examples would help too: As in a doc comment showing: col("A") == col("a") col(r#""A""#) != col("a") I can help write this if you would like ########## datafusion/core/tests/sqllogictests/src/engines/datafusion/insert.rs: ########## @@ -56,9 +56,9 @@ pub async fn insert(ctx: &SessionContext, insert_stmt: SQLStatement) -> Result<D } // Second, get batches in table and destroy the old table - let mut origin_batches = ctx.table(&table_reference).await?.collect().await?; - let schema = ctx.table_provider(&table_reference).await?.schema(); - ctx.deregister_table(&table_reference)?; + let mut origin_batches = ctx.table(table_reference.clone()).await?.collect().await?; Review Comment: ```suggestion let mut origin_batches = ctx.table(&table_reference).await?.collect().await?; ``` If you make the changes to 
`owned_table_reference` I suggested above ########## datafusion/sql/src/expr/identifier.rs: ########## @@ -160,3 +180,243 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ))) } } + +// (relation, column name) +fn form_identifier(idents: &[String]) -> Result<(Option<TableReference>, &String)> { + match idents.len() { + 1 => Ok((None, &idents[0])), + 2 => Ok(( + Some(TableReference::Bare { + table: (&idents[0]).into(), + }), + &idents[1], + )), + 3 => Ok(( + Some(TableReference::Partial { + schema: (&idents[0]).into(), + table: (&idents[1]).into(), + }), + &idents[2], + )), + 4 => Ok(( + Some(TableReference::Full { + catalog: (&idents[0]).into(), + schema: (&idents[1]).into(), + table: (&idents[2]).into(), + }), + &idents[3], + )), + _ => Err(DataFusionError::Internal(format!( + "Incorrect number of identifiers: {}", + idents.len() + ))), + } +} + +fn search_dfschema<'ids, 'schema>( + ids: &'ids [String], + schema: &'schema DFSchema, +) -> Option<(&'schema DFField, &'ids [String])> { + generate_schema_search_terms(ids).find_map(|(qualifier, column, nested_names)| { + let field = schema.field_with_name(qualifier.as_ref(), column).ok(); + field.map(|f| (f, nested_names)) + }) +} + +// Possibilities we search with, in order from top to bottom for each len: +// +// len = 2: +// 1. (table.column) +// 2. (column).nested +// +// len = 3: +// 1. (schema.table.column) +// 2. (table.column).nested +// 3. (column).nested1.nested2 +// +// len = 4: +// 1. (catalog.schema.table.column) +// 2. (schema.table.column).nested1 +// 3. (table.column).nested1.nested2 +// 4. (column).nested1.nested2.nested3 +// +// len = 5: +// 1. (catalog.schema.table.column).nested +// 2. (schema.table.column).nested1.nested2 +// 3. (table.column).nested1.nested2.nested3 +// 4. (column).nested1.nested2.nested3.nested4 +// +// len > 5: +// 1. (catalog.schema.table.column).nested[.nestedN]+ +// 2. (schema.table.column).nested1.nested2[.nestedN]+ +// 3. 
(table.column).nested1.nested2.nested3[.nestedN]+ +// 4. (column).nested1.nested2.nested3.nested4[.nestedN]+ +fn generate_schema_search_terms( + ids: &[String], +) -> impl Iterator<Item = (Option<TableReference>, &String, &[String])> { + // take at most 4 identifiers to form a Column to search with + // - 1 for the column name + // - 0 to 3 for the TableReference + let bound = ids.len().min(4); + // search terms from most specific to least specific + (0..bound).rev().map(|i| { + let nested_names_index = i + 1; + let qualifier_and_column = &ids[0..nested_names_index]; + let (relation, column_name) = form_identifier(qualifier_and_column).unwrap(); + (relation, column_name, &ids[nested_names_index..]) + }) +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + // testing according to documentation of generate_schema_search_terms function + // where ensure generated search terms are in correct order with correct values + fn test_generate_schema_search_terms() -> Result<()> { + type ExpectedItem = ( + Option<TableReference<'static>>, + &'static str, + &'static [&'static str], + ); + fn assert_vec_eq( + expected: Vec<ExpectedItem>, + actual: Vec<(Option<TableReference>, &String, &[String])>, + ) { + for (expected, actual) in expected.into_iter().zip(actual) { + assert_eq!(expected.0, actual.0, "qualifier"); + assert_eq!(expected.1, actual.1, "column name"); + assert_eq!(expected.2, actual.2, "nested names"); + } + } + + let actual = generate_schema_search_terms(&[]).collect::<Vec<_>>(); + assert!(actual.is_empty()); + + let ids = vec!["a".to_string()]; + let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>(); + let expected: Vec<ExpectedItem> = vec![(None, "a", &[])]; + assert_vec_eq(expected, actual); + + let ids = vec!["a".to_string(), "b".to_string()]; + let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>(); + let expected: Vec<ExpectedItem> = vec![ + (Some(TableReference::bare("a")), "b", &[]), + (None, "a", &["b"]), + ]; + 
assert_vec_eq(expected, actual); + + let ids = vec!["a".to_string(), "b".to_string(), "c".to_string()]; + let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>(); + let expected: Vec<ExpectedItem> = vec![ + (Some(TableReference::partial("a", "b")), "c", &[]), + (Some(TableReference::bare("a")), "b", &["c"]), + (None, "a", &["b", "c"]), + ]; + assert_vec_eq(expected, actual); + + let ids = vec![ + "a".to_string(), + "b".to_string(), + "c".to_string(), + "d".to_string(), + ]; + let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>(); + let expected: Vec<ExpectedItem> = vec![ + (Some(TableReference::full("a", "b", "c")), "d", &[]), + (Some(TableReference::partial("a", "b")), "c", &["d"]), + (Some(TableReference::bare("a")), "b", &["c", "d"]), + (None, "a", &["b", "c", "d"]), + ]; + assert_vec_eq(expected, actual); + + let ids = vec![ + "a".to_string(), + "b".to_string(), + "c".to_string(), + "d".to_string(), + "e".to_string(), + ]; + let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>(); + let expected: Vec<ExpectedItem> = vec![ + (Some(TableReference::full("a", "b", "c")), "d", &["e"]), + (Some(TableReference::partial("a", "b")), "c", &["d", "e"]), + (Some(TableReference::bare("a")), "b", &["c", "d", "e"]), + (None, "a", &["b", "c", "d", "e"]), + ]; + assert_vec_eq(expected, actual); + + let ids = vec![ + "a".to_string(), + "b".to_string(), + "c".to_string(), + "d".to_string(), + "e".to_string(), + "f".to_string(), + ]; + let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>(); + let expected: Vec<ExpectedItem> = vec![ + (Some(TableReference::full("a", "b", "c")), "d", &["e", "f"]), + ( + Some(TableReference::partial("a", "b")), + "c", + &["d", "e", "f"], + ), + (Some(TableReference::bare("a")), "b", &["c", "d", "e", "f"]), + (None, "a", &["b", "c", "d", "e", "f"]), + ]; + assert_vec_eq(expected, actual); + + Ok(()) + } + + #[test] + fn test_form_identifier() -> Result<()> { + let err = 
form_identifier(&[]).expect_err("empty identifiers didn't fail"); + let expected = "Internal error: Incorrect number of identifiers: 0. \ Review Comment: If possible, I think errors that are due to user-supplied input (like SQL queries or inputs to `col()`) should return something other than `Internal` -- perhaps `DataFusionError::Plan`? I think Internal errors are supposed to signal a bug in DataFusion ########## datafusion/common/src/dfschema.rs: ########## @@ -677,7 +687,7 @@ mod tests { // lookup with unqualified name "t1.c0" let err = schema.index_of_column(&col).err().unwrap(); assert_eq!( - "Schema error: No field named 't1.c0'. Valid fields are 't1'.'c0', 't1'.'c1'.", + r#"Schema error: No field named "t1.c0". Valid fields are "t1"."c0", "t1"."c1"."#, Review Comment: I agree we can improve it as a follow-on PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org