This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4afd67a0e Use TableReference for TableScan (#5615)
4afd67a0e is described below
commit 4afd67a0e496e1834ad6184629f28e60f66b2777
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 16 14:09:04 2023 +0100
Use TableReference for TableScan (#5615)
* Use TableReference for TableScan
* revert doc changes
* Use TableReference::none
* Apply suggestions from code review
Co-authored-by: Jeffrey <[email protected]>
* Add comment
* Restore table check
---------
Co-authored-by: Jeffrey <[email protected]>
---
datafusion/common/src/dfschema.rs | 16 ++--
datafusion/common/src/table_reference.rs | 5 ++
datafusion/core/src/execution/context.rs | 7 +-
datafusion/core/src/physical_plan/planner.rs | 5 +-
datafusion/core/src/test_util/mod.rs | 16 ++--
datafusion/expr/src/logical_plan/builder.rs | 105 +++++++++++++++++-----
datafusion/expr/src/logical_plan/plan.rs | 15 ++--
datafusion/expr/src/utils.rs | 12 +--
datafusion/optimizer/src/inline_table_scan.rs | 11 ++-
datafusion/optimizer/src/push_down_filter.rs | 6 +-
datafusion/optimizer/src/push_down_projection.rs | 3 +-
datafusion/proto/proto/datafusion.proto | 9 +-
datafusion/proto/src/generated/pbjson.rs | 30 +++----
datafusion/proto/src/generated/prost.rs | 12 +--
datafusion/proto/src/logical_plan/mod.rs | 23 +++--
datafusion/sql/src/relation/mod.rs | 7 +-
datafusion/substrait/src/logical_plan/producer.rs | 2 +-
17 files changed, 187 insertions(+), 97 deletions(-)
diff --git a/datafusion/common/src/dfschema.rs
b/datafusion/common/src/dfschema.rs
index 52a0b3d40..67a367c5e 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -108,13 +108,17 @@ impl DFSchema {
Ok(Self { fields, metadata })
}
- /// Create a `DFSchema` from an Arrow schema
- pub fn try_from_qualified_schema(qualifier: &str, schema: &Schema) ->
Result<Self> {
+ /// Create a `DFSchema` from an Arrow schema and a given qualifier
+ pub fn try_from_qualified_schema<'a>(
+ qualifier: impl Into<TableReference<'a>>,
+ schema: &Schema,
+ ) -> Result<Self> {
+ let qualifier = qualifier.into();
Self::new_with_metadata(
schema
.fields()
.iter()
- .map(|f| DFField::from_qualified(qualifier.to_string(),
f.clone()))
+ .map(|f| DFField::from_qualified(qualifier.clone(), f.clone()))
.collect(),
schema.metadata().clone(),
)
@@ -662,12 +666,12 @@ impl DFField {
}
/// Create a qualified field from an existing Arrow field
- pub fn from_qualified(
- qualifier: impl Into<OwnedTableReference>,
+ pub fn from_qualified<'a>(
+ qualifier: impl Into<TableReference<'a>>,
field: Field,
) -> Self {
Self {
- qualifier: Some(qualifier.into()),
+ qualifier: Some(qualifier.into().to_owned_reference()),
field,
}
}
diff --git a/datafusion/common/src/table_reference.rs
b/datafusion/common/src/table_reference.rs
index 257073681..aad66473a 100644
--- a/datafusion/common/src/table_reference.rs
+++ b/datafusion/common/src/table_reference.rs
@@ -80,6 +80,11 @@ impl std::fmt::Display for TableReference<'_> {
}
impl<'a> TableReference<'a> {
+ /// Convenience method for creating a typed `None` (`Option<TableReference>`)
+ pub fn none() -> Option<TableReference<'a>> {
+ None
+ }
+
/// Convenience method for creating a `Bare` variant of `TableReference`
pub fn bare(table: impl Into<Cow<'a, str>>) -> TableReference<'a> {
TableReference::Bare {
diff --git a/datafusion/core/src/execution/context.rs
b/datafusion/core/src/execution/context.rs
index 1466bcba0..1cb199c2b 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1020,10 +1020,9 @@ impl SessionContext {
table_ref: impl Into<TableReference<'a>>,
) -> Result<DataFrame> {
let table_ref = table_ref.into();
- let table = table_ref.table().to_owned();
- let provider = self.table_provider(table_ref).await?;
+ let provider =
self.table_provider(table_ref.to_owned_reference()).await?;
let plan = LogicalPlanBuilder::scan(
- &table,
+ table_ref.to_owned_reference(),
provider_as_source(Arc::clone(&provider)),
None,
)?
@@ -1037,7 +1036,7 @@ impl SessionContext {
table_ref: impl Into<TableReference<'a>>,
) -> Result<Arc<dyn TableProvider>> {
let table_ref = table_ref.into();
- let table = table_ref.table().to_owned();
+ let table = table_ref.table().to_string();
let schema = self.state.read().schema_for_ref(table_ref)?;
match schema.table(&table).await {
Some(ref provider) => Ok(Arc::clone(provider)),
diff --git a/datafusion/core/src/physical_plan/planner.rs
b/datafusion/core/src/physical_plan/planner.rs
index 0dba182d8..9cc907b28 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1876,7 +1876,7 @@ mod tests {
use arrow::array::{ArrayRef, DictionaryArray, Int32Array};
use arrow::datatypes::{DataType, Field, Int32Type, SchemaRef};
use arrow::record_batch::RecordBatch;
- use datafusion_common::assert_contains;
+ use datafusion_common::{assert_contains, TableReference};
use datafusion_common::{DFField, DFSchema, DFSchemaRef};
use datafusion_expr::{
col, lit, sum, Extension, GroupingSet, LogicalPlanBuilder,
@@ -2518,7 +2518,8 @@ Internal error: Optimizer rule 'type_coercion' failed due
to unexpected error: E
match ctx.read_csv(path, options).await?.into_optimized_plan()? {
LogicalPlan::TableScan(ref scan) => {
let mut scan = scan.clone();
- scan.table_name = name.to_string();
+ let table_reference =
TableReference::from(name).to_owned_reference();
+ scan.table_name = table_reference;
let new_schema = scan
.projected_schema
.as_ref()
diff --git a/datafusion/core/src/test_util/mod.rs
b/datafusion/core/src/test_util/mod.rs
index 61cdc188f..8c570cff8 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -39,7 +39,7 @@ use crate::prelude::{CsvReadOptions, SessionContext};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
-use datafusion_common::{DataFusionError, Statistics};
+use datafusion_common::{DataFusionError, Statistics, TableReference};
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_expr::PhysicalSortExpr;
use futures::Stream;
@@ -219,11 +219,8 @@ pub fn scan_empty(
) -> Result<LogicalPlanBuilder, DataFusionError> {
let table_schema = Arc::new(table_schema.clone());
let provider = Arc::new(EmptyTable::new(table_schema));
- LogicalPlanBuilder::scan(
- name.unwrap_or(UNNAMED_TABLE),
- provider_as_source(provider),
- projection,
- )
+ let name = TableReference::bare(name.unwrap_or(UNNAMED_TABLE).to_string());
+ LogicalPlanBuilder::scan(name, provider_as_source(provider), projection)
}
/// Scan an empty data source with configured partition, mainly used in tests.
@@ -235,11 +232,8 @@ pub fn scan_empty_with_partitions(
) -> Result<LogicalPlanBuilder, DataFusionError> {
let table_schema = Arc::new(table_schema.clone());
let provider =
Arc::new(EmptyTable::new(table_schema).with_partitions(partitions));
- LogicalPlanBuilder::scan(
- name.unwrap_or(UNNAMED_TABLE),
- provider_as_source(provider),
- projection,
- )
+ let name = TableReference::bare(name.unwrap_or(UNNAMED_TABLE).to_string());
+ LogicalPlanBuilder::scan(name, provider_as_source(provider), projection)
}
/// Get the schema for the aggregate_test_* csv files
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index 5616b7c52..c5e623785 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -41,7 +41,7 @@ use crate::{
use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion_common::{
Column, DFField, DFSchema, DFSchemaRef, DataFusionError,
OwnedTableReference, Result,
- ScalarValue, ToDFSchema,
+ ScalarValue, TableReference, ToDFSchema,
};
use std::any::Any;
use std::cmp::Ordering;
@@ -192,8 +192,39 @@ impl LogicalPlanBuilder {
}
/// Convert a table provider into a builder with a TableScan
+ ///
+ /// Note that if you pass a string as `table_name`, it is treated
+ /// as a SQL identifier, as described on [`TableReference`] and
+ /// thus is normalized
+ ///
+ /// # Example:
+ /// ```
+ /// # use datafusion_expr::{lit, col, LogicalPlanBuilder,
+ /// # logical_plan::builder::LogicalTableSource, logical_plan::table_scan
+ /// # };
+ /// # use std::sync::Arc;
+ /// # use arrow::datatypes::{Schema, DataType, Field};
+ /// # use datafusion_common::TableReference;
+ /// #
+ /// # let employee_schema = Arc::new(Schema::new(vec![
+ /// # Field::new("id", DataType::Int32, false),
+ /// # ])) as _;
+ /// # let table_source =
Arc::new(LogicalTableSource::new(employee_schema));
+ /// // Scan table_source with the name "mytable" (after normalization)
+ /// # let table = table_source.clone();
+ /// let scan = LogicalPlanBuilder::scan("MyTable", table, None);
+ ///
+ /// // Scan table_source with the name "MyTable" by enclosing in quotes
+ /// # let table = table_source.clone();
+ /// let scan = LogicalPlanBuilder::scan(r#""MyTable""#, table, None);
+ ///
+ /// // Scan table_source with the name "MyTable" by forming the table
reference
+ /// # let table = table_source.clone();
+ /// let table_reference = TableReference::bare("MyTable");
+ /// let scan = LogicalPlanBuilder::scan(table_reference, table, None);
+ /// ```
pub fn scan(
- table_name: impl Into<String>,
+ table_name: impl Into<OwnedTableReference>,
table_source: Arc<dyn TableSource>,
projection: Option<Vec<usize>>,
) -> Result<Self> {
@@ -217,14 +248,14 @@ impl LogicalPlanBuilder {
/// Convert a table provider into a builder with a TableScan
pub fn scan_with_filters(
- table_name: impl Into<String>,
+ table_name: impl Into<OwnedTableReference>,
table_source: Arc<dyn TableSource>,
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
) -> Result<Self> {
let table_name = table_name.into();
- if table_name.is_empty() {
+ if table_name.table().is_empty() {
return Err(DataFusionError::Plan(
"table_name cannot be empty".to_string(),
));
@@ -239,7 +270,7 @@ impl LogicalPlanBuilder {
p.iter()
.map(|i| {
DFField::from_qualified(
- table_name.to_string(),
+ table_name.clone(),
schema.field(*i).clone(),
)
})
@@ -248,7 +279,7 @@ impl LogicalPlanBuilder {
)
})
.unwrap_or_else(|| {
- DFSchema::try_from_qualified_schema(&table_name, &schema)
+ DFSchema::try_from_qualified_schema(table_name.clone(),
&schema)
})?;
let table_scan = LogicalPlan::TableScan(TableScan {
@@ -1196,14 +1227,22 @@ pub fn subquery_alias(
/// Create a LogicalPlanBuilder representing a scan of a table with the
provided name and schema.
/// This is mostly used for testing and documentation.
-pub fn table_scan(
- name: Option<&str>,
+pub fn table_scan<'a>(
+ name: Option<impl Into<TableReference<'a>>>,
table_schema: &Schema,
projection: Option<Vec<usize>>,
) -> Result<LogicalPlanBuilder> {
+ let table_source = table_source(table_schema);
+ let name = name
+ .map(|n| n.into())
+ .unwrap_or_else(|| OwnedTableReference::bare(UNNAMED_TABLE))
+ .to_owned_reference();
+ LogicalPlanBuilder::scan(name, table_source, projection)
+}
+
+fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
let table_schema = Arc::new(table_schema.clone());
- let table_source = Arc::new(LogicalTableSource { table_schema });
- LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), table_source,
projection)
+ Arc::new(LogicalTableSource { table_schema })
}
/// Wrap projection for a plan, if the join keys contains normal expression.
@@ -1361,12 +1400,36 @@ mod tests {
#[test]
fn plan_builder_schema() {
let schema = employee_schema();
- let plan = table_scan(Some("employee_csv"), &schema, None).unwrap();
+ let projection = None;
+ let plan =
+ LogicalPlanBuilder::scan("employee_csv", table_source(&schema),
projection)
+ .unwrap();
+ let expected = DFSchema::try_from_qualified_schema(
+ TableReference::bare("employee_csv"),
+ &schema,
+ )
+ .unwrap();
+ assert_eq!(&expected, plan.schema().as_ref());
- let expected =
- DFSchema::try_from_qualified_schema("employee_csv",
&schema).unwrap();
+ // Note scan of "EMPLOYEE_CSV" is treated as a SQL identifier
+ // (and thus normalized to "employee_csv") as well
+ let projection = None;
+ let plan =
+ LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema),
projection)
+ .unwrap();
+ assert_eq!(&expected, plan.schema().as_ref());
+ }
- assert_eq!(&expected, plan.schema().as_ref())
+ #[test]
+ fn plan_builder_empty_name() {
+ let schema = employee_schema();
+ let projection = None;
+ let err =
+ LogicalPlanBuilder::scan("", table_source(&schema),
projection).unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Error during planning: table_name cannot be empty"
+ );
}
#[test]
@@ -1481,9 +1544,10 @@ mod tests {
#[test]
fn plan_builder_union_different_num_columns_error() -> Result<()> {
- let plan1 = table_scan(None, &employee_schema(), Some(vec![3]))?;
-
- let plan2 = table_scan(None, &employee_schema(), Some(vec![3, 4]))?;
+ let plan1 =
+ table_scan(TableReference::none(), &employee_schema(),
Some(vec![3]))?;
+ let plan2 =
+ table_scan(TableReference::none(), &employee_schema(),
Some(vec![3, 4]))?;
let expected = "Error during planning: Union queries must have the
same number of columns, (left is 1, right is 2)";
let err_msg1 =
plan1.clone().union(plan2.clone().build()?).unwrap_err();
@@ -1707,9 +1771,10 @@ mod tests {
#[test]
fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
- let plan1 = table_scan(None, &employee_schema(), Some(vec![3]))?;
-
- let plan2 = table_scan(None, &employee_schema(), Some(vec![3, 4]))?;
+ let plan1 =
+ table_scan(TableReference::none(), &employee_schema(),
Some(vec![3]))?;
+ let plan2 =
+ table_scan(TableReference::none(), &employee_schema(),
Some(vec![3, 4]))?;
let expected = "Error during planning: INTERSECT/EXCEPT query must
have the same number of columns. \
Left is 1 and right is 2.";
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 777a5871c..a6ba1c961 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -34,7 +34,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
plan_err, Column, DFSchema, DFSchemaRef, DataFusionError,
OwnedTableReference,
- ScalarValue,
+ ScalarValue, TableReference,
};
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Debug, Display, Formatter};
@@ -1437,9 +1437,12 @@ impl SubqueryAlias {
alias: impl Into<String>,
) -> datafusion_common::Result<Self> {
let alias = alias.into();
+ let table_ref = TableReference::bare(&alias);
let schema: Schema = plan.schema().as_ref().clone().into();
- let schema =
- DFSchemaRef::new(DFSchema::try_from_qualified_schema(&alias,
&schema)?);
+ let schema = DFSchemaRef::new(DFSchema::try_from_qualified_schema(
+ table_ref.to_owned_reference(),
+ &schema,
+ )?);
Ok(SubqueryAlias {
input: Arc::new(plan),
alias,
@@ -1521,9 +1524,7 @@ pub struct Window {
#[derive(Clone)]
pub struct TableScan {
/// The name of the table
- // TODO: change to OwnedTableReference
- // see: https://github.com/apache/arrow-datafusion/issues/5522
- pub table_name: String,
+ pub table_name: OwnedTableReference,
/// The source of the table
pub source: Arc<dyn TableSource>,
/// Optional column indices to use as a projection
@@ -2393,7 +2394,7 @@ mod tests {
Field::new("state", DataType::Utf8, false),
]);
- table_scan(None, &schema, Some(vec![0, 1]))
+ table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
.unwrap()
.filter(col("state").eq(lit("CO")))
.unwrap()
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 3d8b44792..ea8607fee 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -644,14 +644,10 @@ pub fn from_plan(
}))
}
LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
- let schema = inputs[0].schema().as_ref().clone().into();
- let schema =
- DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias,
&schema)?);
- Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
- alias: alias.clone(),
- input: Arc::new(inputs[0].clone()),
- schema,
- }))
+ Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
+ inputs[0].clone(),
+ alias.clone(),
+ )?))
}
LogicalPlan::Limit(Limit { skip, fetch, .. }) =>
Ok(LogicalPlan::Limit(Limit {
skip: *skip,
diff --git a/datafusion/optimizer/src/inline_table_scan.rs
b/datafusion/optimizer/src/inline_table_scan.rs
index 6b5839919..722a70cb3 100644
--- a/datafusion/optimizer/src/inline_table_scan.rs
+++ b/datafusion/optimizer/src/inline_table_scan.rs
@@ -57,7 +57,16 @@ impl OptimizerRule for InlineTableScan {
generate_projection_expr(projection, sub_plan)?;
let plan = LogicalPlanBuilder::from(sub_plan.clone())
.project(projection_exprs)?
- .alias(table_name)?;
+ // Since this is creating a subquery like:
+ // ```sql
+ // ...
+ // FROM <view definition> as "table_name"
+ // ```
+ //
+ // it doesn't make sense to have a qualified
+ // reference (e.g. "foo"."bar") -- so convert the
+ // reference to a string
+ .alias(table_name.to_string())?;
Ok(Some(plan.build()?))
} else {
Ok(None)
diff --git a/datafusion/optimizer/src/push_down_filter.rs
b/datafusion/optimizer/src/push_down_filter.rs
index f910268df..55c77e51e 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -2082,7 +2082,7 @@ mod tests {
let test_provider = PushDownProvider { filter_support };
let table_scan = LogicalPlan::TableScan(TableScan {
- table_name: "test".to_string(),
+ table_name: "test".into(),
filters: vec![],
projected_schema: Arc::new(DFSchema::try_from(
(*test_provider.schema()).clone(),
@@ -2154,7 +2154,7 @@ mod tests {
};
let table_scan = LogicalPlan::TableScan(TableScan {
- table_name: "test".to_string(),
+ table_name: "test".into(),
filters: vec![col("a").eq(lit(10i64)), col("b").gt(lit(11i64))],
projected_schema: Arc::new(DFSchema::try_from(
(*test_provider.schema()).clone(),
@@ -2183,7 +2183,7 @@ mod tests {
};
let table_scan = LogicalPlan::TableScan(TableScan {
- table_name: "test".to_string(),
+ table_name: "test".into(),
filters: vec![],
projected_schema: Arc::new(DFSchema::try_from(
(*test_provider.schema()).clone(),
diff --git a/datafusion/optimizer/src/push_down_projection.rs
b/datafusion/optimizer/src/push_down_projection.rs
index 4e9d5f039..767077aa0 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -496,8 +496,7 @@ fn push_down_scan(
let mut projection: BTreeSet<usize> = used_columns
.iter()
.filter(|c| {
- c.relation.is_none()
- || c.relation.as_ref().unwrap().to_string() == scan.table_name
+ c.relation.is_none() || c.relation.as_ref().unwrap() ==
&scan.table_name
})
.map(|c| schema.index_of(&c.name))
.filter_map(ArrowResult::ok)
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index d0deb567f..93ae09194 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -97,7 +97,8 @@ message ParquetFormat {
message AvroFormat {}
message ListingTableScanNode {
- string table_name = 1;
+ reserved 1; // was string table_name
+ OwnedTableReference table_name = 14;
repeated string paths = 2;
string file_extension = 3;
ProjectionColumns projection = 4;
@@ -115,7 +116,8 @@ message ListingTableScanNode {
}
message ViewTableScanNode {
- string table_name = 1;
+ reserved 1; // was string table_name
+ OwnedTableReference table_name = 6;
LogicalPlanNode input = 2;
Schema schema = 3;
ProjectionColumns projection = 4;
@@ -124,7 +126,8 @@ message ViewTableScanNode {
// Logical Plan to Scan a CustomTableProvider registered at runtime
message CustomTableScanNode {
- string table_name = 1;
+ reserved 1; // was string table_name
+ OwnedTableReference table_name = 6;
ProjectionColumns projection = 2;
Schema schema = 3;
repeated LogicalExprNode filters = 4;
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 7246b5f2b..0ccb62fc9 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -4286,7 +4286,7 @@ impl serde::Serialize for CustomTableScanNode {
{
use serde::ser::SerializeStruct;
let mut len = 0;
- if !self.table_name.is_empty() {
+ if self.table_name.is_some() {
len += 1;
}
if self.projection.is_some() {
@@ -4302,8 +4302,8 @@ impl serde::Serialize for CustomTableScanNode {
len += 1;
}
let mut struct_ser =
serializer.serialize_struct("datafusion.CustomTableScanNode", len)?;
- if !self.table_name.is_empty() {
- struct_ser.serialize_field("tableName", &self.table_name)?;
+ if let Some(v) = self.table_name.as_ref() {
+ struct_ser.serialize_field("tableName", v)?;
}
if let Some(v) = self.projection.as_ref() {
struct_ser.serialize_field("projection", v)?;
@@ -4399,7 +4399,7 @@ impl<'de> serde::Deserialize<'de> for CustomTableScanNode
{
if table_name__.is_some() {
return
Err(serde::de::Error::duplicate_field("tableName"));
}
- table_name__ = Some(map.next_value()?);
+ table_name__ = map.next_value()?;
}
GeneratedField::Projection => {
if projection__.is_some() {
@@ -4430,7 +4430,7 @@ impl<'de> serde::Deserialize<'de> for CustomTableScanNode
{
}
}
Ok(CustomTableScanNode {
- table_name: table_name__.unwrap_or_default(),
+ table_name: table_name__,
projection: projection__,
schema: schema__,
filters: filters__.unwrap_or_default(),
@@ -9586,7 +9586,7 @@ impl serde::Serialize for ListingTableScanNode {
{
use serde::ser::SerializeStruct;
let mut len = 0;
- if !self.table_name.is_empty() {
+ if self.table_name.is_some() {
len += 1;
}
if !self.paths.is_empty() {
@@ -9620,8 +9620,8 @@ impl serde::Serialize for ListingTableScanNode {
len += 1;
}
let mut struct_ser =
serializer.serialize_struct("datafusion.ListingTableScanNode", len)?;
- if !self.table_name.is_empty() {
- struct_ser.serialize_field("tableName", &self.table_name)?;
+ if let Some(v) = self.table_name.as_ref() {
+ struct_ser.serialize_field("tableName", v)?;
}
if !self.paths.is_empty() {
struct_ser.serialize_field("paths", &self.paths)?;
@@ -9779,7 +9779,7 @@ impl<'de> serde::Deserialize<'de> for
ListingTableScanNode {
if table_name__.is_some() {
return
Err(serde::de::Error::duplicate_field("tableName"));
}
- table_name__ = Some(map.next_value()?);
+ table_name__ = map.next_value()?;
}
GeneratedField::Paths => {
if paths__.is_some() {
@@ -9861,7 +9861,7 @@ impl<'de> serde::Deserialize<'de> for
ListingTableScanNode {
}
}
Ok(ListingTableScanNode {
- table_name: table_name__.unwrap_or_default(),
+ table_name: table_name__,
paths: paths__.unwrap_or_default(),
file_extension: file_extension__.unwrap_or_default(),
projection: projection__,
@@ -20863,7 +20863,7 @@ impl serde::Serialize for ViewTableScanNode {
{
use serde::ser::SerializeStruct;
let mut len = 0;
- if !self.table_name.is_empty() {
+ if self.table_name.is_some() {
len += 1;
}
if self.input.is_some() {
@@ -20879,8 +20879,8 @@ impl serde::Serialize for ViewTableScanNode {
len += 1;
}
let mut struct_ser =
serializer.serialize_struct("datafusion.ViewTableScanNode", len)?;
- if !self.table_name.is_empty() {
- struct_ser.serialize_field("tableName", &self.table_name)?;
+ if let Some(v) = self.table_name.as_ref() {
+ struct_ser.serialize_field("tableName", v)?;
}
if let Some(v) = self.input.as_ref() {
struct_ser.serialize_field("input", v)?;
@@ -20975,7 +20975,7 @@ impl<'de> serde::Deserialize<'de> for ViewTableScanNode
{
if table_name__.is_some() {
return
Err(serde::de::Error::duplicate_field("tableName"));
}
- table_name__ = Some(map.next_value()?);
+ table_name__ = map.next_value()?;
}
GeneratedField::Input => {
if input__.is_some() {
@@ -21004,7 +21004,7 @@ impl<'de> serde::Deserialize<'de> for ViewTableScanNode
{
}
}
Ok(ViewTableScanNode {
- table_name: table_name__.unwrap_or_default(),
+ table_name: table_name__,
input: input__,
schema: schema__,
projection: projection__,
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index da95fd558..6209989de 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -130,8 +130,8 @@ pub struct AvroFormat {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListingTableScanNode {
- #[prost(string, tag = "1")]
- pub table_name: ::prost::alloc::string::String,
+ #[prost(message, optional, tag = "14")]
+ pub table_name: ::core::option::Option<OwnedTableReference>,
#[prost(string, repeated, tag = "2")]
pub paths: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(string, tag = "3")]
@@ -171,8 +171,8 @@ pub mod listing_table_scan_node {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ViewTableScanNode {
- #[prost(string, tag = "1")]
- pub table_name: ::prost::alloc::string::String,
+ #[prost(message, optional, tag = "6")]
+ pub table_name: ::core::option::Option<OwnedTableReference>,
#[prost(message, optional, boxed, tag = "2")]
pub input:
::core::option::Option<::prost::alloc::boxed::Box<LogicalPlanNode>>,
#[prost(message, optional, tag = "3")]
@@ -186,8 +186,8 @@ pub struct ViewTableScanNode {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CustomTableScanNode {
- #[prost(string, tag = "1")]
- pub table_name: ::prost::alloc::string::String,
+ #[prost(message, optional, tag = "6")]
+ pub table_name: ::core::option::Option<OwnedTableReference>,
#[prost(message, optional, tag = "2")]
pub projection: ::core::option::Option<ProjectionColumns>,
#[prost(message, optional, tag = "3")]
diff --git a/datafusion/proto/src/logical_plan/mod.rs
b/datafusion/proto/src/logical_plan/mod.rs
index 9c9be2732..cce62653b 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -398,8 +398,13 @@ impl AsLogicalPlan for LogicalPlanNode {
let provider = ListingTable::try_new(config)?;
+ let table_name = from_owned_table_reference(
+ scan.table_name.as_ref(),
+ "ListingTableScan",
+ )?;
+
LogicalPlanBuilder::scan_with_filters(
- &scan.table_name,
+ table_name,
provider_as_source(Arc::new(provider)),
projection,
filters,
@@ -430,8 +435,11 @@ impl AsLogicalPlan for LogicalPlanNode {
ctx,
)?;
+ let table_name =
+ from_owned_table_reference(scan.table_name.as_ref(),
"CustomScan")?;
+
LogicalPlanBuilder::scan_with_filters(
- &scan.table_name,
+ table_name,
provider_as_source(provider),
projection,
filters,
@@ -730,8 +738,11 @@ impl AsLogicalPlan for LogicalPlanNode {
let provider = ViewTable::try_new(input, definition)?;
+ let table_name =
+ from_owned_table_reference(scan.table_name.as_ref(),
"ViewScan")?;
+
LogicalPlanBuilder::scan(
- &scan.table_name,
+ table_name,
provider_as_source(Arc::new(provider)),
projection,
)?
@@ -843,7 +854,7 @@ impl AsLogicalPlan for LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::ListingScan(
protobuf::ListingTableScanNode {
file_format_type: Some(file_format_type),
- table_name: table_name.to_owned(),
+ table_name: Some(table_name.clone().into()),
collect_stat: options.collect_stat,
file_extension: options.file_extension.clone(),
table_partition_cols: options
@@ -868,7 +879,7 @@ impl AsLogicalPlan for LogicalPlanNode {
Ok(protobuf::LogicalPlanNode {
logical_plan_type:
Some(LogicalPlanType::ViewScan(Box::new(
protobuf::ViewTableScanNode {
- table_name: table_name.to_owned(),
+ table_name: Some(table_name.clone().into()),
input: Some(Box::new(
protobuf::LogicalPlanNode::try_from_logical_plan(
view_table.logical_plan(),
@@ -890,7 +901,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.try_encode_table_provider(provider, &mut bytes)
.map_err(|e| context!("Error serializing custom
table", e))?;
let scan = CustomScan(CustomTableScanNode {
- table_name: table_name.clone(),
+ table_name: Some(table_name.clone().into()),
projection,
schema: Some(schema),
filters,
diff --git a/datafusion/sql/src/relation/mod.rs
b/datafusion/sql/src/relation/mod.rs
index 19a278666..1b369fedf 100644
--- a/datafusion/sql/src/relation/mod.rs
+++ b/datafusion/sql/src/relation/mod.rs
@@ -35,10 +35,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let table_name = table_ref.to_string();
let cte = planner_context.ctes.get(&table_name);
(
- match (cte,
self.schema_provider.get_table_provider(table_ref)) {
+ match (
+ cte,
+
self.schema_provider.get_table_provider(table_ref.clone()),
+ ) {
(Some(cte_plan), _) => Ok(cte_plan.clone()),
(_, Ok(provider)) => {
- LogicalPlanBuilder::scan(&table_name, provider,
None)?.build()
+ LogicalPlanBuilder::scan(table_ref, provider,
None)?.build()
}
(None, Err(e)) => Err(e),
}?,
diff --git a/datafusion/substrait/src/logical_plan/producer.rs
b/datafusion/substrait/src/logical_plan/producer.rs
index cf4003c1c..f891cf6cc 100644
--- a/datafusion/substrait/src/logical_plan/producer.rs
+++ b/datafusion/substrait/src/logical_plan/producer.rs
@@ -124,7 +124,7 @@ pub fn to_substrait_rel(
}),
advanced_extension: None,
read_type: Some(ReadType::NamedTable(NamedTable {
- names: vec![scan.table_name.clone()],
+ names: vec![scan.table_name.to_string()],
advanced_extension: None,
})),
}))),