This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 0224f49ca Add view table scan support for datafusion-proto (#3875)
0224f49ca is described below

commit 0224f49caebdceeb1475358aaacf4b1a88bd90d4
Author: r.4ntix <[email protected]>
AuthorDate: Tue Oct 18 21:45:38 2022 +0800

    Add view table scan support for datafusion-proto (#3875)
---
 datafusion/core/src/datasource/view.rs  | 10 +++++++
 datafusion/proto/proto/datafusion.proto |  9 ++++++
 datafusion/proto/src/lib.rs             | 14 +++++++++
 datafusion/proto/src/logical_plan.rs    | 52 +++++++++++++++++++++++++++++++++
 4 files changed, 85 insertions(+)

diff --git a/datafusion/core/src/datasource/view.rs 
b/datafusion/core/src/datasource/view.rs
index 42e847b53..3cd9189db 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -59,6 +59,16 @@ impl ViewTable {
 
         Ok(view)
     }
+
+    /// Get definition ref
+    pub fn definition(&self) -> &Option<String> {
+        &self.definition
+    }
+
+    /// Get logical_plan ref
+    pub fn logical_plan(&self) -> &LogicalPlan {
+        &self.logical_plan
+    }
 }
 
 #[async_trait]
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index 9f2cc2d07..d61f52ee7 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -69,6 +69,7 @@ message LogicalPlanNode {
     SubqueryAliasNode subquery_alias = 21;
     CreateViewNode create_view = 22;
     DistinctNode distinct = 23;
+    ViewTableScanNode view_scan = 24;
   }
 }
 
@@ -109,6 +110,14 @@ message ListingTableScanNode {
   }
 }
 
+message ViewTableScanNode {
+  string table_name = 1;
+  LogicalPlanNode input = 2;
+  datafusion.Schema schema = 3;
+  ProjectionColumns projection = 4;
+  string definition = 5;
+}
+
 message ProjectionNode {
   LogicalPlanNode input = 1;
   repeated datafusion.LogicalExprNode expr = 2;
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index 5f366b561..5f665c9f2 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -170,6 +170,20 @@ mod roundtrip_tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn roundtrip_logical_plan_with_view_scan() -> Result<(), 
DataFusionError> {
+        let ctx = SessionContext::new();
+        ctx.register_csv("t1", "testdata/test.csv", CsvReadOptions::default())
+            .await?;
+        ctx.sql("CREATE VIEW view_t1(a, b) AS SELECT a, b FROM t1")
+            .await?;
+        let plan = ctx.sql("SELECT * FROM view_t1").await?.to_logical_plan()?;
+        let bytes = logical_plan_to_bytes(&plan)?;
+        let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+        assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
+        Ok(())
+    }
+
     pub mod proto {
         #[derive(Clone, PartialEq, ::prost::Message)]
         pub struct TopKPlanProto {
diff --git a/datafusion/proto/src/logical_plan.rs 
b/datafusion/proto/src/logical_plan.rs
index d0d3505fe..d61bb2d65 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -30,6 +30,7 @@ use datafusion::{
             avro::AvroFormat, csv::CsvFormat, parquet::ParquetFormat, 
FileFormat,
         },
         listing::{ListingOptions, ListingTable, ListingTableConfig, 
ListingTableUrl},
+        view::ViewTable,
     },
     datasource::{provider_as_source, source_as_provider},
     prelude::SessionContext,
@@ -666,6 +667,37 @@ impl AsLogicalPlan for LogicalPlanNode {
                     into_logical_plan!(distinct.input, ctx, extension_codec)?;
                 LogicalPlanBuilder::from(input).distinct()?.build()
             }
+            LogicalPlanType::ViewScan(scan) => {
+                let schema: Schema = convert_required!(scan.schema)?;
+
+                let mut projection = None;
+                if let Some(columns) = &scan.projection {
+                    let column_indices = columns
+                        .columns
+                        .iter()
+                        .map(|name| schema.index_of(name))
+                        .collect::<Result<Vec<usize>, _>>()?;
+                    projection = Some(column_indices);
+                }
+
+                let input: LogicalPlan =
+                    into_logical_plan!(scan.input, ctx, extension_codec)?;
+
+                let definition = if !scan.definition.is_empty() {
+                    Some(scan.definition.clone())
+                } else {
+                    None
+                };
+
+                let provider = ViewTable::try_new(input, definition)?;
+
+                LogicalPlanBuilder::scan(
+                    &scan.table_name,
+                    provider_as_source(Arc::new(provider)),
+                    projection,
+                )?
+                .build()
+            }
         }
     }
 
@@ -777,6 +809,26 @@ impl AsLogicalPlan for LogicalPlanNode {
                             },
                         )),
                     })
+                } else if let Some(view_table) = 
source.downcast_ref::<ViewTable>() {
+                    Ok(protobuf::LogicalPlanNode {
+                        logical_plan_type: 
Some(LogicalPlanType::ViewScan(Box::new(
+                            protobuf::ViewTableScanNode {
+                                table_name: table_name.to_owned(),
+                                input: Some(Box::new(
+                                    
protobuf::LogicalPlanNode::try_from_logical_plan(
+                                        view_table.logical_plan(),
+                                        extension_codec,
+                                    )?,
+                                )),
+                                schema: Some(schema),
+                                projection,
+                                definition: view_table
+                                    .definition()
+                                    .clone()
+                                    .unwrap_or_else(|| "".to_string()),
+                            },
+                        ))),
+                    })
                 } else {
                     Err(DataFusionError::Internal(format!(
                         "logical plan to_proto unsupported table provider 
{:?}",

Reply via email to