alamb commented on code in PR #8306:
URL: https://github.com/apache/arrow-datafusion/pull/8306#discussion_r1407648293
##########
datafusion/sql/src/planner.rs:
##########
@@ -51,6 +51,15 @@ pub trait ContextProvider {
}
/// Getter for a datasource
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn
TableSource>>;
+ /// Getter for a table function
+ fn get_table_function_source(
+ &self,
+ _name: &str,
+ _args: Vec<Expr>,
+ ) -> Result<Arc<dyn TableSource>> {
+ unimplemented!()
Review Comment:
Can you please return a `NotImplementedError` here (rather than this panic)?
I think you can use this macro:
https://docs.rs/datafusion/latest/datafusion/common/macro.not_impl_err.html
##########
datafusion/core/src/datasource/function.rs:
##########
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A table that uses a function to generate data
+
+use super::TableProvider;
+
+use datafusion_common::Result;
+use datafusion_expr::Expr;
+
+use std::sync::Arc;
+
+/// A trait for table function implementations
+pub trait TableFunctionImpl: Sync + Send {
+ /// Create a table provider
+ fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
+}
+
+/// A table that uses a function to generate data
+pub struct TableFunction {
+ /// Name of the table function
+ name: String,
+ /// Function implementation
+ fun: Arc<dyn TableFunctionImpl>,
+}
+
+impl TableFunction {
+ /// Create a new table function
+ pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
+ Self { name, fun }
+ }
+
+ /// Get the name of the table function
+ pub fn name(&self) -> String {
+ self.name.clone()
+ }
+
+ /// Get the function implementation and generate a table
+ pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn
TableProvider>> {
Review Comment:
👍
##########
datafusion/core/src/datasource/function.rs:
##########
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A table that uses a function to generate data
+
+use super::TableProvider;
+
+use datafusion_common::Result;
+use datafusion_expr::Expr;
+
+use std::sync::Arc;
+
+/// A trait for table function implementations
+pub trait TableFunctionImpl: Sync + Send {
+ /// Create a table provider
+ fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
Review Comment:
This API is nice and is as specified in
https://github.com/apache/arrow-datafusion/issues/7926. I think it will work
for using a table function as a relation in the query (aka like a table with
parameters)
The one thing I don't think this API supports is TableFunctions that take
other arguments (aka that are fed the result of a table / can use the value of
correlated subqueries), as mentioned by @yukkit and @Jesse-Bakker in
https://github.com/apache/arrow-datafusion/issues/7926#issuecomment-1813692801.
I can think of two options:
1. Leave this API as is, and add a follow-on / new API somehow to support
that use case
2. Try to extend this API somehow to support table inputs
I personally prefer 1 as I think it offers several additional use cases,
even though it doesn't cover "take a table input".
Any other thoughts?
##########
datafusion/sql/src/relation/mod.rs:
##########
@@ -30,24 +32,65 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
let (plan, alias) = match relation {
- TableFactor::Table { name, alias, .. } => {
- // normalize name and alias
- let table_ref = self.object_name_to_table_reference(name)?;
- let table_name = table_ref.to_string();
- let cte = planner_context.get_cte(&table_name);
- (
- match (
- cte,
-
self.context_provider.get_table_source(table_ref.clone()),
- ) {
- (Some(cte_plan), _) => Ok(cte_plan.clone()),
- (_, Ok(provider)) => {
- LogicalPlanBuilder::scan(table_ref, provider,
None)?.build()
+ TableFactor::Table {
+ name, alias, args, ..
+ } => {
+ // this maybe a little diffcult to resolve others tables'
schema, so we only supprt value and scalar functions now
+ if let Some(func_args) = args {
+ let tbl_func_name =
name.0.get(0).unwrap().value.to_string();
+ let mut args = vec![];
+ for arg in func_args {
+ match arg {
+ FunctionArg::Unnamed(FunctionArgExpr::Expr(expr))
=> {
+ let expr = self.sql_expr_to_logical_expr(
+ expr,
+ // TODO(veeupup): for now, maybe it's
little diffcult to resolve tables' schema before create provider
+ // maybe we can put all
relations schema in
+ &DFSchema::empty(),
+ planner_context,
+ )?;
+ args.push(expr);
+ }
+ arg => {
+ unimplemented!(
Review Comment:
`unimplemented!` results in a panic, rather than an error that can be caught
by users.
Can you please return a NotImplementedError here (rather than this panic)?
I think you can use this macro:
https://docs.rs/datafusion/latest/datafusion/common/macro.not_impl_err.html
##########
datafusion/core/src/datasource/function.rs:
##########
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A table that uses a function to generate data
+
+use super::TableProvider;
+
+use datafusion_common::Result;
+use datafusion_expr::Expr;
+
+use std::sync::Arc;
+
+/// A trait for table function implementations
+pub trait TableFunctionImpl: Sync + Send {
+ /// Create a table provider
+ fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
+}
+
+/// A table that uses a function to generate data
+pub struct TableFunction {
+ /// Name of the table function
+ name: String,
+ /// Function implementation
+ fun: Arc<dyn TableFunctionImpl>,
+}
+
+impl TableFunction {
+ /// Create a new table function
+ pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
+ Self { name, fun }
+ }
+
+ /// Get the name of the table function
+ pub fn name(&self) -> String {
+ self.name.clone()
+ }
+
+ /// Get the function implementation and generate a table
+ pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn
TableProvider>> {
Review Comment:
👍
##########
datafusion/sql/src/relation/mod.rs:
##########
@@ -30,24 +32,65 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
let (plan, alias) = match relation {
- TableFactor::Table { name, alias, .. } => {
- // normalize name and alias
- let table_ref = self.object_name_to_table_reference(name)?;
- let table_name = table_ref.to_string();
- let cte = planner_context.get_cte(&table_name);
- (
- match (
- cte,
-
self.context_provider.get_table_source(table_ref.clone()),
- ) {
- (Some(cte_plan), _) => Ok(cte_plan.clone()),
- (_, Ok(provider)) => {
- LogicalPlanBuilder::scan(table_ref, provider,
None)?.build()
+ TableFactor::Table {
+ name, alias, args, ..
+ } => {
+ // this maybe a little diffcult to resolve others tables'
schema, so we only supprt value and scalar functions now
+ if let Some(func_args) = args {
+ let tbl_func_name =
name.0.get(0).unwrap().value.to_string();
+ let mut args = vec![];
+ for arg in func_args {
+ match arg {
+ FunctionArg::Unnamed(FunctionArgExpr::Expr(expr))
=> {
+ let expr = self.sql_expr_to_logical_expr(
+ expr,
+ // TODO(veeupup): for now, maybe it's
little diffcult to resolve tables' schema before create provider
Review Comment:
When the function is used like a base relation (aka a TableFactor / item in
the `FROM` clause) I don't think there is any schema to use to resolve the
input arguments (aka this code is correct and probably shouldn't be a todo).
I don't really know how we would represent the correlated subquery case in
https://github.com/apache/arrow-datafusion/issues/7926#issuecomment-1813692801
🤔 I think it would need some sort of analysis pass after the initial plan is
built
##########
datafusion-examples/examples/simple_udtf.rs:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::csv::reader::Format;
+use arrow::csv::ReaderBuilder;
+use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::datasource::function::TableFunctionImpl;
+use datafusion::datasource::streaming::StreamingTable;
+use datafusion::datasource::TableProvider;
+use datafusion::error::Result;
+use datafusion::execution::context::SessionState;
+use datafusion::execution::TaskContext;
+use datafusion::physical_plan::memory::MemoryExec;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::streaming::PartitionStream;
+use datafusion::physical_plan::{collect, ExecutionPlan};
+use datafusion::prelude::SessionContext;
+use datafusion_common::{DFSchema, ScalarValue};
+use datafusion_expr::{EmptyRelation, Expr, LogicalPlan, Projection, TableType};
+use std::fs::File;
+use std::io::Seek;
+use std::path::Path;
+use std::sync::Arc;
+
+// To define your own table function, you only need to do the following 3
things:
+// 1. Implement your own TableProvider
+// 2. Implement your own TableFunctionImpl and return your TableProvider
+// 3. Register the function using ctx.register_udtf
+
+/// This example demonstrates how to register a TableFunction
+#[tokio::main]
+async fn main() -> Result<()> {
+ // create local execution context
+ let ctx = SessionContext::new();
+
+ ctx.register_udtf("read_csv", Arc::new(LocalCsvTableFunc {}));
+ ctx.register_udtf("read_csv_stream", Arc::new(LocalStreamCsvTable {}));
+
+ let testdata = datafusion::test_util::arrow_test_data();
+ let csv_file = format!("{testdata}/csv/aggregate_test_100.csv");
+
+ // run it with println now()
+ let df = ctx
+ .sql(format!("SELECT * FROM read_csv('{csv_file}', now());").as_str())
+ .await?;
+ df.show().await?;
+
+ // just run
+ let df = ctx
+ .sql(format!("SELECT * FROM read_csv('{csv_file}');").as_str())
+ .await?;
+ df.show().await?;
+
+ // stream csv table
+ let df2 = ctx
+ .sql(format!("SELECT * FROM read_csv_stream('{csv_file}');").as_str())
+ .await?;
+ df2.show().await?;
+
+ Ok(())
+}
+
+// Option1: (full implmentation of a TableProvider)
+struct LocalCsvTable {
+ schema: SchemaRef,
+ exprs: Vec<Expr>,
+ batches: Vec<RecordBatch>,
+}
+
+#[async_trait]
+impl TableProvider for LocalCsvTable {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ state: &SessionState,
+ projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ if !self.exprs.is_empty() {
+ self.interpreter_expr(state).await?;
+ }
+ Ok(Arc::new(MemoryExec::try_new(
+ &[self.batches.clone()],
+ TableProvider::schema(self),
+ projection.cloned(),
+ )?))
+ }
+}
+
+impl LocalCsvTable {
+ // TODO(veeupup): maybe we can make interpreter Expr this more simpler for
users
+ // TODO(veeupup): maybe we can support more type of exprs
+ async fn interpreter_expr(&self, state: &SessionState) -> Result<()> {
+ use datafusion::logical_expr::expr_rewriter::normalize_col;
Review Comment:
Perhaps for this example, we implement something simpler like a projection --
so like `read_csv('filename', 2)` would read only the first 2 rows from the
file?
That would be more in line with my expected use case of changing the output of
the table based on the input arguments.
##########
datafusion/core/src/datasource/function.rs:
##########
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A table that uses a function to generate data
+
+use super::TableProvider;
+
+use datafusion_common::Result;
+use datafusion_expr::Expr;
+
+use std::sync::Arc;
+
+/// A trait for table function implementations
+pub trait TableFunctionImpl: Sync + Send {
+ /// Create a table provider
+ fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
+}
+
+/// A table that uses a function to generate data
+pub struct TableFunction {
+ /// Name of the table function
+ name: String,
+ /// Function implementation
+ fun: Arc<dyn TableFunctionImpl>,
+}
+
+impl TableFunction {
+ /// Create a new table function
+ pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
+ Self { name, fun }
+ }
+
+ /// Get the name of the table function
+ pub fn name(&self) -> String {
+ self.name.clone()
+ }
Review Comment:
I think the more common pattern is to return `&str` and allow the caller to
clone the value if necessary, thus avoiding clones when the caller doesn't need
an owned string
```suggestion
pub fn name(&self) -> &str {
&self.name
}
```
##########
datafusion/sql/src/relation/mod.rs:
##########
@@ -30,24 +32,65 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
let (plan, alias) = match relation {
- TableFactor::Table { name, alias, .. } => {
- // normalize name and alias
- let table_ref = self.object_name_to_table_reference(name)?;
- let table_name = table_ref.to_string();
- let cte = planner_context.get_cte(&table_name);
- (
- match (
- cte,
-
self.context_provider.get_table_source(table_ref.clone()),
- ) {
- (Some(cte_plan), _) => Ok(cte_plan.clone()),
- (_, Ok(provider)) => {
- LogicalPlanBuilder::scan(table_ref, provider,
None)?.build()
+ TableFactor::Table {
+ name, alias, args, ..
+ } => {
+ // this maybe a little diffcult to resolve others tables'
schema, so we only supprt value and scalar functions now
+ if let Some(func_args) = args {
+ let tbl_func_name =
name.0.get(0).unwrap().value.to_string();
+ let mut args = vec![];
+ for arg in func_args {
+ match arg {
+ FunctionArg::Unnamed(FunctionArgExpr::Expr(expr))
=> {
+ let expr = self.sql_expr_to_logical_expr(
+ expr,
+ // TODO(veeupup): for now, maybe it's
little diffcult to resolve tables' schema before create provider
Review Comment:
When the function is used like a base relation (aka a TableFactor / item in
the `FROM` clause) I don't think there is any schema to use to resolve the
input arguments (aka this code is correct and probably shouldn't be a todo).
I don't really know how we would represent the correlated subquery case in
https://github.com/apache/arrow-datafusion/issues/7926#issuecomment-1813692801
🤔 I think it would need some sort of analysis pass after the initial plan is
built
##########
datafusion/sql/src/relation/mod.rs:
##########
@@ -30,24 +32,65 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
let (plan, alias) = match relation {
- TableFactor::Table { name, alias, .. } => {
- // normalize name and alias
- let table_ref = self.object_name_to_table_reference(name)?;
- let table_name = table_ref.to_string();
- let cte = planner_context.get_cte(&table_name);
- (
- match (
- cte,
-
self.context_provider.get_table_source(table_ref.clone()),
- ) {
- (Some(cte_plan), _) => Ok(cte_plan.clone()),
- (_, Ok(provider)) => {
- LogicalPlanBuilder::scan(table_ref, provider,
None)?.build()
+ TableFactor::Table {
+ name, alias, args, ..
+ } => {
+ // this maybe a little diffcult to resolve others tables'
schema, so we only supprt value and scalar functions now
+ if let Some(func_args) = args {
Review Comment:
You might be able to improve this code by reducing the indent
Something like
```rust
// plan all arguments
let args = args.map(|args| {
args.iter().map(|arg| {
self.sql_expr_to_logical_expr(....)
}).collect::<Result<Vec<_>>>()
})
// swap Option<Result> -> Result<Option> and return error if any
.transpose()?
```
##########
datafusion-examples/examples/simple_udtf.rs:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::csv::reader::Format;
+use arrow::csv::ReaderBuilder;
+use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::datasource::function::TableFunctionImpl;
+use datafusion::datasource::streaming::StreamingTable;
+use datafusion::datasource::TableProvider;
+use datafusion::error::Result;
+use datafusion::execution::context::SessionState;
+use datafusion::execution::TaskContext;
+use datafusion::physical_plan::memory::MemoryExec;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::streaming::PartitionStream;
+use datafusion::physical_plan::{collect, ExecutionPlan};
+use datafusion::prelude::SessionContext;
+use datafusion_common::{DFSchema, ScalarValue};
+use datafusion_expr::{EmptyRelation, Expr, LogicalPlan, Projection, TableType};
+use std::fs::File;
+use std::io::Seek;
+use std::path::Path;
+use std::sync::Arc;
+
+// To define your own table function, you only need to do the following 3
things:
+// 1. Implement your own TableProvider
+// 2. Implement your own TableFunctionImpl and return your TableProvider
+// 3. Register the function using ctx.register_udtf
+
+/// This example demonstrates how to register a TableFunction
+#[tokio::main]
+async fn main() -> Result<()> {
+ // create local execution context
+ let ctx = SessionContext::new();
+
+ ctx.register_udtf("read_csv", Arc::new(LocalCsvTableFunc {}));
+ ctx.register_udtf("read_csv_stream", Arc::new(LocalStreamCsvTable {}));
+
+ let testdata = datafusion::test_util::arrow_test_data();
+ let csv_file = format!("{testdata}/csv/aggregate_test_100.csv");
+
+ // run it with println now()
+ let df = ctx
+ .sql(format!("SELECT * FROM read_csv('{csv_file}', now());").as_str())
+ .await?;
+ df.show().await?;
+
+ // just run
+ let df = ctx
+ .sql(format!("SELECT * FROM read_csv('{csv_file}');").as_str())
+ .await?;
+ df.show().await?;
+
+ // stream csv table
+ let df2 = ctx
+ .sql(format!("SELECT * FROM read_csv_stream('{csv_file}');").as_str())
+ .await?;
+ df2.show().await?;
+
+ Ok(())
+}
+
+// Option1: (full implmentation of a TableProvider)
+struct LocalCsvTable {
+ schema: SchemaRef,
+ exprs: Vec<Expr>,
+ batches: Vec<RecordBatch>,
+}
+
+#[async_trait]
+impl TableProvider for LocalCsvTable {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ state: &SessionState,
+ projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ if !self.exprs.is_empty() {
+ self.interpreter_expr(state).await?;
+ }
+ Ok(Arc::new(MemoryExec::try_new(
+ &[self.batches.clone()],
+ TableProvider::schema(self),
+ projection.cloned(),
+ )?))
+ }
+}
+
+impl LocalCsvTable {
+ // TODO(veeupup): maybe we can make interpreter Expr this more simpler for
users
+ // TODO(veeupup): maybe we can support more type of exprs
+ async fn interpreter_expr(&self, state: &SessionState) -> Result<()> {
+ use datafusion::logical_expr::expr_rewriter::normalize_col;
+ use datafusion::logical_expr::utils::columnize_expr;
+ let plan = LogicalPlan::EmptyRelation(EmptyRelation {
+ produce_one_row: true,
+ schema: Arc::new(DFSchema::empty()),
+ });
+ let logical_plan = Projection::try_new(
+ vec![columnize_expr(
+ normalize_col(self.exprs[0].clone(), &plan)?,
+ plan.schema(),
+ )],
+ Arc::new(plan),
+ )
+ .map(LogicalPlan::Projection)?;
+ let rbs = collect(
+ state.create_physical_plan(&logical_plan).await?,
+ Arc::new(TaskContext::from(state)),
+ )
+ .await?;
+ println!("time now: {:?}", rbs[0].column(0));
+ Ok(())
+ }
+}
+
+struct LocalCsvTableFunc {}
+
+impl TableFunctionImpl for LocalCsvTableFunc {
+ fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+ let mut new_exprs = vec![];
+ let mut filepath = String::new();
+ for expr in exprs {
+ match expr {
+ Expr::Literal(ScalarValue::Utf8(Some(ref path))) => {
+ filepath = path.clone()
+ }
+ expr => new_exprs.push(expr.clone()),
+ }
+ }
+ let (schema, batches) = read_csv_batches(filepath)?;
+ let table = LocalCsvTable {
+ schema,
+ exprs: new_exprs.clone(),
+ batches,
+ };
+ Ok(Arc::new(table))
+ }
+}
+
+// Option2: (use StreamingTable to make it simpler)
Review Comment:
I actually think adding a second option is more confusing in this example,
so I would recommend removing this part (perhaps you can just add a note that
it is possible)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]