This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 0224f49ca Add view table scan support for datafusion-proto (#3875)
0224f49ca is described below
commit 0224f49caebdceeb1475358aaacf4b1a88bd90d4
Author: r.4ntix <[email protected]>
AuthorDate: Tue Oct 18 21:45:38 2022 +0800
Add view table scan support for datafusion-proto (#3875)
---
datafusion/core/src/datasource/view.rs | 10 +++++++
datafusion/proto/proto/datafusion.proto | 9 ++++++
datafusion/proto/src/lib.rs | 14 +++++++++
datafusion/proto/src/logical_plan.rs | 52 +++++++++++++++++++++++++++++++++
4 files changed, 85 insertions(+)
diff --git a/datafusion/core/src/datasource/view.rs
b/datafusion/core/src/datasource/view.rs
index 42e847b53..3cd9189db 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -59,6 +59,16 @@ impl ViewTable {
Ok(view)
}
+
+ /// Get definition ref
+ pub fn definition(&self) -> &Option<String> {
+ &self.definition
+ }
+
+ /// Get logical_plan ref
+ pub fn logical_plan(&self) -> &LogicalPlan {
+ &self.logical_plan
+ }
}
#[async_trait]
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 9f2cc2d07..d61f52ee7 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -69,6 +69,7 @@ message LogicalPlanNode {
SubqueryAliasNode subquery_alias = 21;
CreateViewNode create_view = 22;
DistinctNode distinct = 23;
+ ViewTableScanNode view_scan = 24;
}
}
@@ -109,6 +110,14 @@ message ListingTableScanNode {
}
}
+message ViewTableScanNode {
+ string table_name = 1;
+ LogicalPlanNode input = 2;
+ datafusion.Schema schema = 3;
+ ProjectionColumns projection = 4;
+ string definition = 5;
+}
+
message ProjectionNode {
LogicalPlanNode input = 1;
repeated datafusion.LogicalExprNode expr = 2;
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index 5f366b561..5f665c9f2 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -170,6 +170,20 @@ mod roundtrip_tests {
Ok(())
}
+ #[tokio::test]
+ async fn roundtrip_logical_plan_with_view_scan() -> Result<(),
DataFusionError> {
+ let ctx = SessionContext::new();
+ ctx.register_csv("t1", "testdata/test.csv", CsvReadOptions::default())
+ .await?;
+ ctx.sql("CREATE VIEW view_t1(a, b) AS SELECT a, b FROM t1")
+ .await?;
+ let plan = ctx.sql("SELECT * FROM view_t1").await?.to_logical_plan()?;
+ let bytes = logical_plan_to_bytes(&plan)?;
+ let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+ assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
+ Ok(())
+ }
+
pub mod proto {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TopKPlanProto {
diff --git a/datafusion/proto/src/logical_plan.rs
b/datafusion/proto/src/logical_plan.rs
index d0d3505fe..d61bb2d65 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -30,6 +30,7 @@ use datafusion::{
avro::AvroFormat, csv::CsvFormat, parquet::ParquetFormat,
FileFormat,
},
listing::{ListingOptions, ListingTable, ListingTableConfig,
ListingTableUrl},
+ view::ViewTable,
},
datasource::{provider_as_source, source_as_provider},
prelude::SessionContext,
@@ -666,6 +667,37 @@ impl AsLogicalPlan for LogicalPlanNode {
into_logical_plan!(distinct.input, ctx, extension_codec)?;
LogicalPlanBuilder::from(input).distinct()?.build()
}
+ LogicalPlanType::ViewScan(scan) => {
+ let schema: Schema = convert_required!(scan.schema)?;
+
+ let mut projection = None;
+ if let Some(columns) = &scan.projection {
+ let column_indices = columns
+ .columns
+ .iter()
+ .map(|name| schema.index_of(name))
+ .collect::<Result<Vec<usize>, _>>()?;
+ projection = Some(column_indices);
+ }
+
+ let input: LogicalPlan =
+ into_logical_plan!(scan.input, ctx, extension_codec)?;
+
+ let definition = if !scan.definition.is_empty() {
+ Some(scan.definition.clone())
+ } else {
+ None
+ };
+
+ let provider = ViewTable::try_new(input, definition)?;
+
+ LogicalPlanBuilder::scan(
+ &scan.table_name,
+ provider_as_source(Arc::new(provider)),
+ projection,
+ )?
+ .build()
+ }
}
}
@@ -777,6 +809,26 @@ impl AsLogicalPlan for LogicalPlanNode {
},
)),
})
+ } else if let Some(view_table) =
source.downcast_ref::<ViewTable>() {
+ Ok(protobuf::LogicalPlanNode {
+ logical_plan_type:
Some(LogicalPlanType::ViewScan(Box::new(
+ protobuf::ViewTableScanNode {
+ table_name: table_name.to_owned(),
+ input: Some(Box::new(
+
protobuf::LogicalPlanNode::try_from_logical_plan(
+ view_table.logical_plan(),
+ extension_codec,
+ )?,
+ )),
+ schema: Some(schema),
+ projection,
+ definition: view_table
+ .definition()
+ .clone()
+ .unwrap_or_else(|| "".to_string()),
+ },
+ ))),
+ })
} else {
Err(DataFusionError::Internal(format!(
"logical plan to_proto unsupported table provider
{:?}",