This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4afd67a0e Use TableReference for TableScan (#5615)
4afd67a0e is described below

commit 4afd67a0e496e1834ad6184629f28e60f66b2777
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 16 14:09:04 2023 +0100

    Use TableReference for TableScan (#5615)
    
    * Use TableReference for TableScan
    
    * revert doc changes
    
    * Use TableReference::none
    
    * Apply suggestions from code review
    
    Co-authored-by: Jeffrey <[email protected]>
    
    * Add comment
    
    * Restore table check
    
    ---------
    
    Co-authored-by: Jeffrey <[email protected]>
---
 datafusion/common/src/dfschema.rs                 |  16 ++--
 datafusion/common/src/table_reference.rs          |   5 ++
 datafusion/core/src/execution/context.rs          |   7 +-
 datafusion/core/src/physical_plan/planner.rs      |   5 +-
 datafusion/core/src/test_util/mod.rs              |  16 ++--
 datafusion/expr/src/logical_plan/builder.rs       | 105 +++++++++++++++++-----
 datafusion/expr/src/logical_plan/plan.rs          |  15 ++--
 datafusion/expr/src/utils.rs                      |  12 +--
 datafusion/optimizer/src/inline_table_scan.rs     |  11 ++-
 datafusion/optimizer/src/push_down_filter.rs      |   6 +-
 datafusion/optimizer/src/push_down_projection.rs  |   3 +-
 datafusion/proto/proto/datafusion.proto           |   9 +-
 datafusion/proto/src/generated/pbjson.rs          |  30 +++----
 datafusion/proto/src/generated/prost.rs           |  12 +--
 datafusion/proto/src/logical_plan/mod.rs          |  23 +++--
 datafusion/sql/src/relation/mod.rs                |   7 +-
 datafusion/substrait/src/logical_plan/producer.rs |   2 +-
 17 files changed, 187 insertions(+), 97 deletions(-)

diff --git a/datafusion/common/src/dfschema.rs 
b/datafusion/common/src/dfschema.rs
index 52a0b3d40..67a367c5e 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -108,13 +108,17 @@ impl DFSchema {
         Ok(Self { fields, metadata })
     }
 
-    /// Create a `DFSchema` from an Arrow schema
-    pub fn try_from_qualified_schema(qualifier: &str, schema: &Schema) -> 
Result<Self> {
+    /// Create a `DFSchema` from an Arrow schema and a given qualifier
+    pub fn try_from_qualified_schema<'a>(
+        qualifier: impl Into<TableReference<'a>>,
+        schema: &Schema,
+    ) -> Result<Self> {
+        let qualifier = qualifier.into();
         Self::new_with_metadata(
             schema
                 .fields()
                 .iter()
-                .map(|f| DFField::from_qualified(qualifier.to_string(), 
f.clone()))
+                .map(|f| DFField::from_qualified(qualifier.clone(), f.clone()))
                 .collect(),
             schema.metadata().clone(),
         )
@@ -662,12 +666,12 @@ impl DFField {
     }
 
     /// Create a qualified field from an existing Arrow field
-    pub fn from_qualified(
-        qualifier: impl Into<OwnedTableReference>,
+    pub fn from_qualified<'a>(
+        qualifier: impl Into<TableReference<'a>>,
         field: Field,
     ) -> Self {
         Self {
-            qualifier: Some(qualifier.into()),
+            qualifier: Some(qualifier.into().to_owned_reference()),
             field,
         }
     }
diff --git a/datafusion/common/src/table_reference.rs 
b/datafusion/common/src/table_reference.rs
index 257073681..aad66473a 100644
--- a/datafusion/common/src/table_reference.rs
+++ b/datafusion/common/src/table_reference.rs
@@ -80,6 +80,11 @@ impl std::fmt::Display for TableReference<'_> {
 }
 
 impl<'a> TableReference<'a> {
+    /// Convenience method for creating a typed `None`
+    pub fn none() -> Option<TableReference<'a>> {
+        None
+    }
+
     /// Convenience method for creating a `Bare` variant of `TableReference`
     pub fn bare(table: impl Into<Cow<'a, str>>) -> TableReference<'a> {
         TableReference::Bare {
diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index 1466bcba0..1cb199c2b 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1020,10 +1020,9 @@ impl SessionContext {
         table_ref: impl Into<TableReference<'a>>,
     ) -> Result<DataFrame> {
         let table_ref = table_ref.into();
-        let table = table_ref.table().to_owned();
-        let provider = self.table_provider(table_ref).await?;
+        let provider = 
self.table_provider(table_ref.to_owned_reference()).await?;
         let plan = LogicalPlanBuilder::scan(
-            &table,
+            table_ref.to_owned_reference(),
             provider_as_source(Arc::clone(&provider)),
             None,
         )?
@@ -1037,7 +1036,7 @@ impl SessionContext {
         table_ref: impl Into<TableReference<'a>>,
     ) -> Result<Arc<dyn TableProvider>> {
         let table_ref = table_ref.into();
-        let table = table_ref.table().to_owned();
+        let table = table_ref.table().to_string();
         let schema = self.state.read().schema_for_ref(table_ref)?;
         match schema.table(&table).await {
             Some(ref provider) => Ok(Arc::clone(provider)),
diff --git a/datafusion/core/src/physical_plan/planner.rs 
b/datafusion/core/src/physical_plan/planner.rs
index 0dba182d8..9cc907b28 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1876,7 +1876,7 @@ mod tests {
     use arrow::array::{ArrayRef, DictionaryArray, Int32Array};
     use arrow::datatypes::{DataType, Field, Int32Type, SchemaRef};
     use arrow::record_batch::RecordBatch;
-    use datafusion_common::assert_contains;
+    use datafusion_common::{assert_contains, TableReference};
     use datafusion_common::{DFField, DFSchema, DFSchemaRef};
     use datafusion_expr::{
         col, lit, sum, Extension, GroupingSet, LogicalPlanBuilder,
@@ -2518,7 +2518,8 @@ Internal error: Optimizer rule 'type_coercion' failed due 
to unexpected error: E
             match ctx.read_csv(path, options).await?.into_optimized_plan()? {
                 LogicalPlan::TableScan(ref scan) => {
                     let mut scan = scan.clone();
-                    scan.table_name = name.to_string();
+                    let table_reference = 
TableReference::from(name).to_owned_reference();
+                    scan.table_name = table_reference;
                     let new_schema = scan
                         .projected_schema
                         .as_ref()
diff --git a/datafusion/core/src/test_util/mod.rs 
b/datafusion/core/src/test_util/mod.rs
index 61cdc188f..8c570cff8 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -39,7 +39,7 @@ use crate::prelude::{CsvReadOptions, SessionContext};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
-use datafusion_common::{DataFusionError, Statistics};
+use datafusion_common::{DataFusionError, Statistics, TableReference};
 use datafusion_expr::{CreateExternalTable, Expr, TableType};
 use datafusion_physical_expr::PhysicalSortExpr;
 use futures::Stream;
@@ -219,11 +219,8 @@ pub fn scan_empty(
 ) -> Result<LogicalPlanBuilder, DataFusionError> {
     let table_schema = Arc::new(table_schema.clone());
     let provider = Arc::new(EmptyTable::new(table_schema));
-    LogicalPlanBuilder::scan(
-        name.unwrap_or(UNNAMED_TABLE),
-        provider_as_source(provider),
-        projection,
-    )
+    let name = TableReference::bare(name.unwrap_or(UNNAMED_TABLE).to_string());
+    LogicalPlanBuilder::scan(name, provider_as_source(provider), projection)
 }
 
 /// Scan an empty data source with configured partition, mainly used in tests.
@@ -235,11 +232,8 @@ pub fn scan_empty_with_partitions(
 ) -> Result<LogicalPlanBuilder, DataFusionError> {
     let table_schema = Arc::new(table_schema.clone());
     let provider = 
Arc::new(EmptyTable::new(table_schema).with_partitions(partitions));
-    LogicalPlanBuilder::scan(
-        name.unwrap_or(UNNAMED_TABLE),
-        provider_as_source(provider),
-        projection,
-    )
+    let name = TableReference::bare(name.unwrap_or(UNNAMED_TABLE).to_string());
+    LogicalPlanBuilder::scan(name, provider_as_source(provider), projection)
 }
 
 /// Get the schema for the aggregate_test_* csv files
diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index 5616b7c52..c5e623785 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -41,7 +41,7 @@ use crate::{
 use arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion_common::{
     Column, DFField, DFSchema, DFSchemaRef, DataFusionError, 
OwnedTableReference, Result,
-    ScalarValue, ToDFSchema,
+    ScalarValue, TableReference, ToDFSchema,
 };
 use std::any::Any;
 use std::cmp::Ordering;
@@ -192,8 +192,39 @@ impl LogicalPlanBuilder {
     }
 
     /// Convert a table provider into a builder with a TableScan
+    ///
+    /// Note that if you pass a string as `table_name`, it is treated
+    /// as a SQL identifier, as described on [`TableReference`] and
+    /// thus is normalized
+    ///
+    /// # Example:
+    /// ```
+    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder,
+    /// #  logical_plan::builder::LogicalTableSource, logical_plan::table_scan
+    /// # };
+    /// # use std::sync::Arc;
+    /// # use arrow::datatypes::{Schema, DataType, Field};
+    /// # use datafusion_common::TableReference;
+    /// #
+    /// # let employee_schema = Arc::new(Schema::new(vec![
+    /// #           Field::new("id", DataType::Int32, false),
+    /// # ])) as _;
+    /// # let table_source = 
Arc::new(LogicalTableSource::new(employee_schema));
+    /// // Scan table_source with the name "mytable" (after normalization)
+    /// # let table = table_source.clone();
+    /// let scan = LogicalPlanBuilder::scan("MyTable", table, None);
+    ///
+    /// // Scan table_source with the name "MyTable" by enclosing in quotes
+    /// # let table = table_source.clone();
+    /// let scan = LogicalPlanBuilder::scan(r#""MyTable""#, table, None);
+    ///
+    /// // Scan table_source with the name "MyTable" by forming the table 
reference
+    /// # let table = table_source.clone();
+    /// let table_reference = TableReference::bare("MyTable");
+    /// let scan = LogicalPlanBuilder::scan(table_reference, table, None);
+    /// ```
     pub fn scan(
-        table_name: impl Into<String>,
+        table_name: impl Into<OwnedTableReference>,
         table_source: Arc<dyn TableSource>,
         projection: Option<Vec<usize>>,
     ) -> Result<Self> {
@@ -217,14 +248,14 @@ impl LogicalPlanBuilder {
 
     /// Convert a table provider into a builder with a TableScan
     pub fn scan_with_filters(
-        table_name: impl Into<String>,
+        table_name: impl Into<OwnedTableReference>,
         table_source: Arc<dyn TableSource>,
         projection: Option<Vec<usize>>,
         filters: Vec<Expr>,
     ) -> Result<Self> {
         let table_name = table_name.into();
 
-        if table_name.is_empty() {
+        if table_name.table().is_empty() {
             return Err(DataFusionError::Plan(
                 "table_name cannot be empty".to_string(),
             ));
@@ -239,7 +270,7 @@ impl LogicalPlanBuilder {
                     p.iter()
                         .map(|i| {
                             DFField::from_qualified(
-                                table_name.to_string(),
+                                table_name.clone(),
                                 schema.field(*i).clone(),
                             )
                         })
@@ -248,7 +279,7 @@ impl LogicalPlanBuilder {
                 )
             })
             .unwrap_or_else(|| {
-                DFSchema::try_from_qualified_schema(&table_name, &schema)
+                DFSchema::try_from_qualified_schema(table_name.clone(), 
&schema)
             })?;
 
         let table_scan = LogicalPlan::TableScan(TableScan {
@@ -1196,14 +1227,22 @@ pub fn subquery_alias(
 
 /// Create a LogicalPlanBuilder representing a scan of a table with the 
provided name and schema.
 /// This is mostly used for testing and documentation.
-pub fn table_scan(
-    name: Option<&str>,
+pub fn table_scan<'a>(
+    name: Option<impl Into<TableReference<'a>>>,
     table_schema: &Schema,
     projection: Option<Vec<usize>>,
 ) -> Result<LogicalPlanBuilder> {
+    let table_source = table_source(table_schema);
+    let name = name
+        .map(|n| n.into())
+        .unwrap_or_else(|| OwnedTableReference::bare(UNNAMED_TABLE))
+        .to_owned_reference();
+    LogicalPlanBuilder::scan(name, table_source, projection)
+}
+
+fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
     let table_schema = Arc::new(table_schema.clone());
-    let table_source = Arc::new(LogicalTableSource { table_schema });
-    LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), table_source, 
projection)
+    Arc::new(LogicalTableSource { table_schema })
 }
 
 /// Wrap projection for a plan, if the join keys contains normal expression.
@@ -1361,12 +1400,36 @@ mod tests {
     #[test]
     fn plan_builder_schema() {
         let schema = employee_schema();
-        let plan = table_scan(Some("employee_csv"), &schema, None).unwrap();
+        let projection = None;
+        let plan =
+            LogicalPlanBuilder::scan("employee_csv", table_source(&schema), 
projection)
+                .unwrap();
+        let expected = DFSchema::try_from_qualified_schema(
+            TableReference::bare("employee_csv"),
+            &schema,
+        )
+        .unwrap();
+        assert_eq!(&expected, plan.schema().as_ref());
 
-        let expected =
-            DFSchema::try_from_qualified_schema("employee_csv", 
&schema).unwrap();
+        // Note scan of "EMPLOYEE_CSV" is treated as a SQL identifier
+        // (and thus normalized to "employee_csv") as well
+        let projection = None;
+        let plan =
+            LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), 
projection)
+                .unwrap();
+        assert_eq!(&expected, plan.schema().as_ref());
+    }
 
-        assert_eq!(&expected, plan.schema().as_ref())
+    #[test]
+    fn plan_builder_empty_name() {
+        let schema = employee_schema();
+        let projection = None;
+        let err =
+            LogicalPlanBuilder::scan("", table_source(&schema), 
projection).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Error during planning: table_name cannot be empty"
+        );
     }
 
     #[test]
@@ -1481,9 +1544,10 @@ mod tests {
 
     #[test]
     fn plan_builder_union_different_num_columns_error() -> Result<()> {
-        let plan1 = table_scan(None, &employee_schema(), Some(vec![3]))?;
-
-        let plan2 = table_scan(None, &employee_schema(), Some(vec![3, 4]))?;
+        let plan1 =
+            table_scan(TableReference::none(), &employee_schema(), 
Some(vec![3]))?;
+        let plan2 =
+            table_scan(TableReference::none(), &employee_schema(), 
Some(vec![3, 4]))?;
 
         let expected = "Error during planning: Union queries must have the 
same number of columns, (left is 1, right is 2)";
         let err_msg1 = 
plan1.clone().union(plan2.clone().build()?).unwrap_err();
@@ -1707,9 +1771,10 @@ mod tests {
 
     #[test]
     fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
-        let plan1 = table_scan(None, &employee_schema(), Some(vec![3]))?;
-
-        let plan2 = table_scan(None, &employee_schema(), Some(vec![3, 4]))?;
+        let plan1 =
+            table_scan(TableReference::none(), &employee_schema(), 
Some(vec![3]))?;
+        let plan2 =
+            table_scan(TableReference::none(), &employee_schema(), 
Some(vec![3, 4]))?;
 
         let expected = "Error during planning: INTERSECT/EXCEPT query must 
have the same number of columns. \
          Left is 1 and right is 2.";
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 777a5871c..a6ba1c961 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -34,7 +34,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::{
     plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, 
OwnedTableReference,
-    ScalarValue,
+    ScalarValue, TableReference,
 };
 use std::collections::{HashMap, HashSet};
 use std::fmt::{self, Debug, Display, Formatter};
@@ -1437,9 +1437,12 @@ impl SubqueryAlias {
         alias: impl Into<String>,
     ) -> datafusion_common::Result<Self> {
         let alias = alias.into();
+        let table_ref = TableReference::bare(&alias);
         let schema: Schema = plan.schema().as_ref().clone().into();
-        let schema =
-            DFSchemaRef::new(DFSchema::try_from_qualified_schema(&alias, 
&schema)?);
+        let schema = DFSchemaRef::new(DFSchema::try_from_qualified_schema(
+            table_ref.to_owned_reference(),
+            &schema,
+        )?);
         Ok(SubqueryAlias {
             input: Arc::new(plan),
             alias,
@@ -1521,9 +1524,7 @@ pub struct Window {
 #[derive(Clone)]
 pub struct TableScan {
     /// The name of the table
-    // TODO: change to OwnedTableReference
-    // see: https://github.com/apache/arrow-datafusion/issues/5522
-    pub table_name: String,
+    pub table_name: OwnedTableReference,
     /// The source of the table
     pub source: Arc<dyn TableSource>,
     /// Optional column indices to use as a projection
@@ -2393,7 +2394,7 @@ mod tests {
             Field::new("state", DataType::Utf8, false),
         ]);
 
-        table_scan(None, &schema, Some(vec![0, 1]))
+        table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
             .unwrap()
             .filter(col("state").eq(lit("CO")))
             .unwrap()
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 3d8b44792..ea8607fee 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -644,14 +644,10 @@ pub fn from_plan(
             }))
         }
         LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
-            let schema = inputs[0].schema().as_ref().clone().into();
-            let schema =
-                DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, 
&schema)?);
-            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
-                alias: alias.clone(),
-                input: Arc::new(inputs[0].clone()),
-                schema,
-            }))
+            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
+                inputs[0].clone(),
+                alias.clone(),
+            )?))
         }
         LogicalPlan::Limit(Limit { skip, fetch, .. }) => 
Ok(LogicalPlan::Limit(Limit {
             skip: *skip,
diff --git a/datafusion/optimizer/src/inline_table_scan.rs 
b/datafusion/optimizer/src/inline_table_scan.rs
index 6b5839919..722a70cb3 100644
--- a/datafusion/optimizer/src/inline_table_scan.rs
+++ b/datafusion/optimizer/src/inline_table_scan.rs
@@ -57,7 +57,16 @@ impl OptimizerRule for InlineTableScan {
                         generate_projection_expr(projection, sub_plan)?;
                     let plan = LogicalPlanBuilder::from(sub_plan.clone())
                         .project(projection_exprs)?
-                        .alias(table_name)?;
+                        // Since this is creating a subquery like:
+                        // ```sql
+                        // ...
+                        // FROM <view definition> as "table_name"
+                        // ```
+                        //
+                        // it doesn't make sense to have a qualified
+                        // reference (e.g. "foo"."bar") -- so convert it to
+                        // a string
+                        .alias(table_name.to_string())?;
                     Ok(Some(plan.build()?))
                 } else {
                     Ok(None)
diff --git a/datafusion/optimizer/src/push_down_filter.rs 
b/datafusion/optimizer/src/push_down_filter.rs
index f910268df..55c77e51e 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -2082,7 +2082,7 @@ mod tests {
         let test_provider = PushDownProvider { filter_support };
 
         let table_scan = LogicalPlan::TableScan(TableScan {
-            table_name: "test".to_string(),
+            table_name: "test".into(),
             filters: vec![],
             projected_schema: Arc::new(DFSchema::try_from(
                 (*test_provider.schema()).clone(),
@@ -2154,7 +2154,7 @@ mod tests {
         };
 
         let table_scan = LogicalPlan::TableScan(TableScan {
-            table_name: "test".to_string(),
+            table_name: "test".into(),
             filters: vec![col("a").eq(lit(10i64)), col("b").gt(lit(11i64))],
             projected_schema: Arc::new(DFSchema::try_from(
                 (*test_provider.schema()).clone(),
@@ -2183,7 +2183,7 @@ mod tests {
         };
 
         let table_scan = LogicalPlan::TableScan(TableScan {
-            table_name: "test".to_string(),
+            table_name: "test".into(),
             filters: vec![],
             projected_schema: Arc::new(DFSchema::try_from(
                 (*test_provider.schema()).clone(),
diff --git a/datafusion/optimizer/src/push_down_projection.rs 
b/datafusion/optimizer/src/push_down_projection.rs
index 4e9d5f039..767077aa0 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -496,8 +496,7 @@ fn push_down_scan(
     let mut projection: BTreeSet<usize> = used_columns
         .iter()
         .filter(|c| {
-            c.relation.is_none()
-                || c.relation.as_ref().unwrap().to_string() == scan.table_name
+            c.relation.is_none() || c.relation.as_ref().unwrap() == 
&scan.table_name
         })
         .map(|c| schema.index_of(&c.name))
         .filter_map(ArrowResult::ok)
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index d0deb567f..93ae09194 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -97,7 +97,8 @@ message ParquetFormat {
 message AvroFormat {}
 
 message ListingTableScanNode {
-  string table_name = 1;
+  reserved 1; // was string table_name
+  OwnedTableReference table_name = 14;
   repeated string paths = 2;
   string file_extension = 3;
   ProjectionColumns projection = 4;
@@ -115,7 +116,8 @@ message ListingTableScanNode {
 }
 
 message ViewTableScanNode {
-  string table_name = 1;
+  reserved 1; // was string table_name
+  OwnedTableReference table_name = 6;
   LogicalPlanNode input = 2;
   Schema schema = 3;
   ProjectionColumns projection = 4;
@@ -124,7 +126,8 @@ message ViewTableScanNode {
 
 // Logical Plan to Scan a CustomTableProvider registered at runtime
 message CustomTableScanNode {
-  string table_name = 1;
+  reserved 1; // was string table_name
+  OwnedTableReference table_name = 6;
   ProjectionColumns projection = 2;
   Schema schema = 3;
   repeated LogicalExprNode filters = 4;
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index 7246b5f2b..0ccb62fc9 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -4286,7 +4286,7 @@ impl serde::Serialize for CustomTableScanNode {
     {
         use serde::ser::SerializeStruct;
         let mut len = 0;
-        if !self.table_name.is_empty() {
+        if self.table_name.is_some() {
             len += 1;
         }
         if self.projection.is_some() {
@@ -4302,8 +4302,8 @@ impl serde::Serialize for CustomTableScanNode {
             len += 1;
         }
         let mut struct_ser = 
serializer.serialize_struct("datafusion.CustomTableScanNode", len)?;
-        if !self.table_name.is_empty() {
-            struct_ser.serialize_field("tableName", &self.table_name)?;
+        if let Some(v) = self.table_name.as_ref() {
+            struct_ser.serialize_field("tableName", v)?;
         }
         if let Some(v) = self.projection.as_ref() {
             struct_ser.serialize_field("projection", v)?;
@@ -4399,7 +4399,7 @@ impl<'de> serde::Deserialize<'de> for CustomTableScanNode 
{
                             if table_name__.is_some() {
                                 return 
Err(serde::de::Error::duplicate_field("tableName"));
                             }
-                            table_name__ = Some(map.next_value()?);
+                            table_name__ = map.next_value()?;
                         }
                         GeneratedField::Projection => {
                             if projection__.is_some() {
@@ -4430,7 +4430,7 @@ impl<'de> serde::Deserialize<'de> for CustomTableScanNode 
{
                     }
                 }
                 Ok(CustomTableScanNode {
-                    table_name: table_name__.unwrap_or_default(),
+                    table_name: table_name__,
                     projection: projection__,
                     schema: schema__,
                     filters: filters__.unwrap_or_default(),
@@ -9586,7 +9586,7 @@ impl serde::Serialize for ListingTableScanNode {
     {
         use serde::ser::SerializeStruct;
         let mut len = 0;
-        if !self.table_name.is_empty() {
+        if self.table_name.is_some() {
             len += 1;
         }
         if !self.paths.is_empty() {
@@ -9620,8 +9620,8 @@ impl serde::Serialize for ListingTableScanNode {
             len += 1;
         }
         let mut struct_ser = 
serializer.serialize_struct("datafusion.ListingTableScanNode", len)?;
-        if !self.table_name.is_empty() {
-            struct_ser.serialize_field("tableName", &self.table_name)?;
+        if let Some(v) = self.table_name.as_ref() {
+            struct_ser.serialize_field("tableName", v)?;
         }
         if !self.paths.is_empty() {
             struct_ser.serialize_field("paths", &self.paths)?;
@@ -9779,7 +9779,7 @@ impl<'de> serde::Deserialize<'de> for 
ListingTableScanNode {
                             if table_name__.is_some() {
                                 return 
Err(serde::de::Error::duplicate_field("tableName"));
                             }
-                            table_name__ = Some(map.next_value()?);
+                            table_name__ = map.next_value()?;
                         }
                         GeneratedField::Paths => {
                             if paths__.is_some() {
@@ -9861,7 +9861,7 @@ impl<'de> serde::Deserialize<'de> for 
ListingTableScanNode {
                     }
                 }
                 Ok(ListingTableScanNode {
-                    table_name: table_name__.unwrap_or_default(),
+                    table_name: table_name__,
                     paths: paths__.unwrap_or_default(),
                     file_extension: file_extension__.unwrap_or_default(),
                     projection: projection__,
@@ -20863,7 +20863,7 @@ impl serde::Serialize for ViewTableScanNode {
     {
         use serde::ser::SerializeStruct;
         let mut len = 0;
-        if !self.table_name.is_empty() {
+        if self.table_name.is_some() {
             len += 1;
         }
         if self.input.is_some() {
@@ -20879,8 +20879,8 @@ impl serde::Serialize for ViewTableScanNode {
             len += 1;
         }
         let mut struct_ser = 
serializer.serialize_struct("datafusion.ViewTableScanNode", len)?;
-        if !self.table_name.is_empty() {
-            struct_ser.serialize_field("tableName", &self.table_name)?;
+        if let Some(v) = self.table_name.as_ref() {
+            struct_ser.serialize_field("tableName", v)?;
         }
         if let Some(v) = self.input.as_ref() {
             struct_ser.serialize_field("input", v)?;
@@ -20975,7 +20975,7 @@ impl<'de> serde::Deserialize<'de> for ViewTableScanNode 
{
                             if table_name__.is_some() {
                                 return 
Err(serde::de::Error::duplicate_field("tableName"));
                             }
-                            table_name__ = Some(map.next_value()?);
+                            table_name__ = map.next_value()?;
                         }
                         GeneratedField::Input => {
                             if input__.is_some() {
@@ -21004,7 +21004,7 @@ impl<'de> serde::Deserialize<'de> for ViewTableScanNode 
{
                     }
                 }
                 Ok(ViewTableScanNode {
-                    table_name: table_name__.unwrap_or_default(),
+                    table_name: table_name__,
                     input: input__,
                     schema: schema__,
                     projection: projection__,
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index da95fd558..6209989de 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -130,8 +130,8 @@ pub struct AvroFormat {}
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ListingTableScanNode {
-    #[prost(string, tag = "1")]
-    pub table_name: ::prost::alloc::string::String,
+    #[prost(message, optional, tag = "14")]
+    pub table_name: ::core::option::Option<OwnedTableReference>,
     #[prost(string, repeated, tag = "2")]
     pub paths: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
     #[prost(string, tag = "3")]
@@ -171,8 +171,8 @@ pub mod listing_table_scan_node {
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ViewTableScanNode {
-    #[prost(string, tag = "1")]
-    pub table_name: ::prost::alloc::string::String,
+    #[prost(message, optional, tag = "6")]
+    pub table_name: ::core::option::Option<OwnedTableReference>,
     #[prost(message, optional, boxed, tag = "2")]
     pub input: 
::core::option::Option<::prost::alloc::boxed::Box<LogicalPlanNode>>,
     #[prost(message, optional, tag = "3")]
@@ -186,8 +186,8 @@ pub struct ViewTableScanNode {
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CustomTableScanNode {
-    #[prost(string, tag = "1")]
-    pub table_name: ::prost::alloc::string::String,
+    #[prost(message, optional, tag = "6")]
+    pub table_name: ::core::option::Option<OwnedTableReference>,
     #[prost(message, optional, tag = "2")]
     pub projection: ::core::option::Option<ProjectionColumns>,
     #[prost(message, optional, tag = "3")]
diff --git a/datafusion/proto/src/logical_plan/mod.rs 
b/datafusion/proto/src/logical_plan/mod.rs
index 9c9be2732..cce62653b 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -398,8 +398,13 @@ impl AsLogicalPlan for LogicalPlanNode {
 
                 let provider = ListingTable::try_new(config)?;
 
+                let table_name = from_owned_table_reference(
+                    scan.table_name.as_ref(),
+                    "ListingTableScan",
+                )?;
+
                 LogicalPlanBuilder::scan_with_filters(
-                    &scan.table_name,
+                    table_name,
                     provider_as_source(Arc::new(provider)),
                     projection,
                     filters,
@@ -430,8 +435,11 @@ impl AsLogicalPlan for LogicalPlanNode {
                     ctx,
                 )?;
 
+                let table_name =
+                    from_owned_table_reference(scan.table_name.as_ref(), "CustomScan")?;
+
                 LogicalPlanBuilder::scan_with_filters(
-                    &scan.table_name,
+                    table_name,
                     provider_as_source(provider),
                     projection,
                     filters,
@@ -730,8 +738,11 @@ impl AsLogicalPlan for LogicalPlanNode {
 
                 let provider = ViewTable::try_new(input, definition)?;
 
+                let table_name =
+                    from_owned_table_reference(scan.table_name.as_ref(), "ViewScan")?;
+
                 LogicalPlanBuilder::scan(
-                    &scan.table_name,
+                    table_name,
                     provider_as_source(Arc::new(provider)),
                     projection,
                 )?
@@ -843,7 +854,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         logical_plan_type: Some(LogicalPlanType::ListingScan(
                             protobuf::ListingTableScanNode {
                                 file_format_type: Some(file_format_type),
-                                table_name: table_name.to_owned(),
+                                table_name: Some(table_name.clone().into()),
                                 collect_stat: options.collect_stat,
                                 file_extension: options.file_extension.clone(),
                                 table_partition_cols: options
@@ -868,7 +879,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     Ok(protobuf::LogicalPlanNode {
                         logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
                             protobuf::ViewTableScanNode {
-                                table_name: table_name.to_owned(),
+                                table_name: Some(table_name.clone().into()),
                                 input: Some(Box::new(
                                     protobuf::LogicalPlanNode::try_from_logical_plan(
                                         view_table.logical_plan(),
@@ -890,7 +901,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         .try_encode_table_provider(provider, &mut bytes)
                         .map_err(|e| context!("Error serializing custom table", e))?;
                     let scan = CustomScan(CustomTableScanNode {
-                        table_name: table_name.clone(),
+                        table_name: Some(table_name.clone().into()),
                         projection,
                         schema: Some(schema),
                         filters,
diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs
index 19a278666..1b369fedf 100644
--- a/datafusion/sql/src/relation/mod.rs
+++ b/datafusion/sql/src/relation/mod.rs
@@ -35,10 +35,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 let table_name = table_ref.to_string();
                 let cte = planner_context.ctes.get(&table_name);
                 (
-                    match (cte, self.schema_provider.get_table_provider(table_ref)) {
+                    match (
+                        cte,
+                        self.schema_provider.get_table_provider(table_ref.clone()),
+                    ) {
                         (Some(cte_plan), _) => Ok(cte_plan.clone()),
                         (_, Ok(provider)) => {
-                            LogicalPlanBuilder::scan(&table_name, provider, None)?.build()
+                            LogicalPlanBuilder::scan(table_ref, provider, None)?.build()
                         }
                         (None, Err(e)) => Err(e),
                     }?,
diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs
index cf4003c1c..f891cf6cc 100644
--- a/datafusion/substrait/src/logical_plan/producer.rs
+++ b/datafusion/substrait/src/logical_plan/producer.rs
@@ -124,7 +124,7 @@ pub fn to_substrait_rel(
                         }),
                         advanced_extension: None,
                         read_type: Some(ReadType::NamedTable(NamedTable {
-                            names: vec![scan.table_name.clone()],
+                            names: vec![scan.table_name.to_string()],
                             advanced_extension: None,
                         })),
                     }))),


Reply via email to