This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 93e14a7079 Fix `describe <table>` to work without SessionContext (#7441)
93e14a7079 is described below
commit 93e14a70793ebf091ee39e2fb3ac72918ada2e13
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Sep 14 15:28:34 2023 -0400
Fix `describe <table>` to work without SessionContext (#7441)
* Fix describe to work without SessionContext
* fix: fmt
* fix schema reporting
* fix schema, add comments
---
datafusion/core/src/execution/context.rs | 58 +----------------
datafusion/core/src/physical_planner.rs | 49 +++++++++++++--
datafusion/core/tests/dataframe/describe.rs | 96 +++++++++++++++++++++++++++++
datafusion/core/tests/dataframe/mod.rs | 59 +-----------------
datafusion/core/tests/sql/describe.rs | 72 ++++++++++++++++++++++
datafusion/core/tests/sql/mod.rs | 1 +
datafusion/expr/src/logical_plan/plan.rs | 38 ++++++++++--
datafusion/sql/src/statement.rs | 4 +-
8 files changed, 254 insertions(+), 123 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs
b/datafusion/core/src/execution/context.rs
index 12b0f27229..78b79680b4 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -36,7 +36,7 @@ use datafusion_common::{
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::{
logical_plan::{DdlStatement, Statement},
- DescribeTable, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
+ StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
};
pub use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::var_provider::is_system_variables;
@@ -50,11 +50,8 @@ use std::{
};
use std::{ops::ControlFlow, sync::Weak};
+use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
-use arrow::{
- array::StringBuilder,
- datatypes::{DataType, Field, Schema, SchemaRef},
-};
use crate::catalog::{
schema::{MemorySchemaProvider, SchemaProvider},
@@ -472,9 +469,6 @@ impl SessionContext {
LogicalPlan::Statement(Statement::SetVariable(stmt)) => {
self.set_variable(stmt).await
}
- LogicalPlan::DescribeTable(DescribeTable { schema, .. }) => {
- self.return_describe_table_dataframe(schema).await
- }
plan => Ok(DataFrame::new(self.state(), plan)),
}
@@ -486,53 +480,6 @@ impl SessionContext {
Ok(DataFrame::new(self.state(), plan))
}
- // return an record_batch which describe table
- async fn return_describe_table_record_batch(
- &self,
- schema: Arc<Schema>,
- ) -> Result<RecordBatch> {
- let record_batch_schema = Arc::new(Schema::new(vec![
- Field::new("column_name", DataType::Utf8, false),
- Field::new("data_type", DataType::Utf8, false),
- Field::new("is_nullable", DataType::Utf8, false),
- ]));
-
- let mut column_names = StringBuilder::new();
- let mut data_types = StringBuilder::new();
- let mut is_nullables = StringBuilder::new();
- for (_, field) in schema.fields().iter().enumerate() {
- column_names.append_value(field.name());
-
- // "System supplied type" --> Use debug format of the datatype
- let data_type = field.data_type();
- data_types.append_value(format!("{data_type:?}"));
-
- // "YES if the column is possibly nullable, NO if it is known not
nullable. "
- let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
- is_nullables.append_value(nullable_str);
- }
-
- let record_batch = RecordBatch::try_new(
- record_batch_schema,
- vec![
- Arc::new(column_names.finish()),
- Arc::new(data_types.finish()),
- Arc::new(is_nullables.finish()),
- ],
- )?;
-
- Ok(record_batch)
- }
-
- // return an dataframe which describe file
- async fn return_describe_table_dataframe(
- &self,
- schema: Arc<Schema>,
- ) -> Result<DataFrame> {
- let record_batch =
self.return_describe_table_record_batch(schema).await?;
- self.read_batch(record_batch)
- }
-
async fn create_external_table(
&self,
cmd: &CreateExternalTable,
@@ -2276,6 +2223,7 @@ mod tests {
use crate::variable::VarType;
use arrow::array::ArrayRef;
use arrow::record_batch::RecordBatch;
+ use arrow_schema::{Field, Schema};
use async_trait::async_trait;
use datafusion_expr::{create_udaf, create_udf, Expr, Volatility};
use datafusion_physical_expr::functions::make_scalar_function;
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index def4d59873..1a9dd7bff0 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -37,6 +37,9 @@ use crate::logical_expr::{
CrossJoin, Expr, LogicalPlan, Partitioning as LogicalPartitioning,
PlanType,
Repartition, Union, UserDefinedLogicalNode,
};
+use crate::physical_plan::memory::MemoryExec;
+use arrow_array::builder::StringBuilder;
+use arrow_array::RecordBatch;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::file_options::FileTypeWriterOptions;
use datafusion_common::FileType;
@@ -84,7 +87,7 @@ use datafusion_expr::expr::{
};
use datafusion_expr::expr_rewriter::{unalias, unnormalize_cols};
use
datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
-use datafusion_expr::{DmlStatement, StringifiedPlan, WriteOp};
+use datafusion_expr::{DescribeTable, DmlStatement, StringifiedPlan, WriteOp};
use datafusion_expr::{WindowFrame, WindowFrameBound};
use datafusion_physical_expr::expressions::Literal;
use datafusion_sql::utils::window_expr_common_partition_keys;
@@ -1261,10 +1264,9 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: Statement({name})"
)
}
- LogicalPlan::DescribeTable(_) => {
- internal_err!(
- "Unsupported logical plan: DescribeTable must be root
of the plan"
- )
+ LogicalPlan::DescribeTable(DescribeTable { schema,
output_schema}) => {
+ let output_schema: Schema = output_schema.as_ref().into();
+ self.plan_describe(schema.clone(), Arc::new(output_schema))
}
LogicalPlan::Explain(_) => internal_err!(
"Unsupported logical plan: Explain must be root of the
plan"
@@ -2003,6 +2005,43 @@ impl DefaultPhysicalPlanner {
trace!("Detailed optimized physical plan:\n{:?}", new_plan);
Ok(new_plan)
}
+
+ // Returns an in-memory execution plan that yields a single record batch describing a table's schema.
+ fn plan_describe(
+ &self,
+ table_schema: Arc<Schema>,
+ output_schema: Arc<Schema>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let mut column_names = StringBuilder::new();
+ let mut data_types = StringBuilder::new();
+ let mut is_nullables = StringBuilder::new();
+ for (_, field) in table_schema.fields().iter().enumerate() {
+ column_names.append_value(field.name());
+
+ // "System supplied type" --> Use debug format of the datatype
+ let data_type = field.data_type();
+ data_types.append_value(format!("{data_type:?}"));
+
+ // "YES if the column is possibly nullable, NO if it is known not nullable. "
+ let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
+ is_nullables.append_value(nullable_str);
+ }
+
+ let record_batch = RecordBatch::try_new(
+ output_schema,
+ vec![
+ Arc::new(column_names.finish()),
+ Arc::new(data_types.finish()),
+ Arc::new(is_nullables.finish()),
+ ],
+ )?;
+
+ let schema = record_batch.schema();
+ let partitions = vec![vec![record_batch]];
+ let projection = None;
+ let mem_exec = MemoryExec::try_new(&partitions, schema, projection)?;
+ Ok(Arc::new(mem_exec))
+ }
}
fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
diff --git a/datafusion/core/tests/dataframe/describe.rs
b/datafusion/core/tests/dataframe/describe.rs
new file mode 100644
index 0000000000..da7589072b
--- /dev/null
+++ b/datafusion/core/tests/dataframe/describe.rs
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::{
+ assert_batches_eq,
+ prelude::{ParquetReadOptions, SessionContext},
+};
+use datafusion_common::{test_util::parquet_test_data, Result};
+
+#[tokio::test]
+async fn describe() -> Result<()> {
+ let ctx = parquet_context().await;
+
+ let describe_record_batch = ctx
+ .table("alltypes_tiny_pages")
+ .await?
+ .describe()
+ .await?
+ .collect()
+ .await?;
+
+ #[rustfmt::skip]
+ let expected = [
+
"+------------+-------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+-------------------------+--------------------+-------------------+",
+ "| describe | id | bool_col | tinyint_col |
smallint_col | int_col | bigint_col | float_col
| double_col | date_string_col | string_col | timestamp_col
| year | month |",
+
"+------------+-------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+-------------------------+--------------------+-------------------+",
+ "| count | 7300.0 | 7300 | 7300.0 |
7300.0 | 7300.0 | 7300.0 | 7300.0
| 7300.0 | 7300 | 7300 | 7300
| 7300.0 | 7300.0 |",
+ "| null_count | 7300.0 | 7300 | 7300.0 |
7300.0 | 7300.0 | 7300.0 | 7300.0
| 7300.0 | 7300 | 7300 | 7300
| 7300.0 | 7300.0 |",
+ "| mean | 3649.5 | null | 4.5 |
4.5 | 4.5 | 45.0 |
4.949999964237213 | 45.45000000000001 | null | null | null
| 2009.5 | 6.526027397260274 |",
+ "| std | 2107.472815166704 | null | 2.8724780750809518 |
2.8724780750809518 | 2.8724780750809518 | 28.724780750809533 |
3.1597258182544645 | 29.012028558317645 | null | null | null
| 0.5000342500942125 | 3.44808750051728 |",
+ "| min | 0.0 | null | 0.0 |
0.0 | 0.0 | 0.0 | 0.0
| 0.0 | 01/01/09 | 0 | 2008-12-31T23:00:00
| 2009.0 | 1.0 |",
+ "| max | 7299.0 | null | 9.0 |
9.0 | 9.0 | 90.0 |
9.899999618530273 | 90.89999999999999 | 12/31/10 | 9 |
2010-12-31T04:09:13.860 | 2010.0 | 12.0 |",
+ "| median | 3649.0 | null | 4.0 |
4.0 | 4.0 | 45.0 |
4.949999809265137 | 45.45 | null | null | null
| 2009.0 | 7.0 |",
+
"+------------+-------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+-------------------------+--------------------+-------------------+"
+ ];
+ assert_batches_eq!(expected, &describe_record_batch);
+ Ok(())
+}
+
+#[tokio::test]
+async fn describe_boolean_binary() -> Result<()> {
+ let ctx = parquet_context().await;
+
+ // add test case for only boolean/binary columns
+ let result = ctx
+ .sql("select 'a' as a,true as b")
+ .await?
+ .describe()
+ .await?
+ .collect()
+ .await?;
+ #[rustfmt::skip]
+ let expected = [
+ "+------------+------+------+",
+ "| describe | a | b |",
+ "+------------+------+------+",
+ "| count | 1 | 1 |",
+ "| null_count | 1 | 1 |",
+ "| mean | null | null |",
+ "| std | null | null |",
+ "| min | a | null |",
+ "| max | a | null |",
+ "| median | null | null |",
+ "+------------+------+------+"
+ ];
+ assert_batches_eq!(expected, &result);
+ Ok(())
+}
+
+/// Return a SessionContext with parquet file registered
+async fn parquet_context() -> SessionContext {
+ let ctx = SessionContext::new();
+ let testdata = parquet_test_data();
+ ctx.register_parquet(
+ "alltypes_tiny_pages",
+ &format!("{testdata}/alltypes_tiny_pages.parquet"),
+ ParquetReadOptions::default(),
+ )
+ .await
+ .unwrap();
+ ctx
+}
diff --git a/datafusion/core/tests/dataframe/mod.rs
b/datafusion/core/tests/dataframe/mod.rs
index 3e0a2ec826..e1982761f0 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -17,6 +17,7 @@
// Include tests in dataframe_functions
mod dataframe_functions;
+mod describe;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::util::pretty::pretty_format_batches;
@@ -267,64 +268,6 @@ async fn test_count_wildcard_on_where_scalar_subquery() ->
Result<()> {
Ok(())
}
-#[tokio::test]
-async fn describe() -> Result<()> {
- let ctx = SessionContext::new();
- let testdata = parquet_test_data();
- ctx.register_parquet(
- "alltypes_tiny_pages",
- &format!("{testdata}/alltypes_tiny_pages.parquet"),
- ParquetReadOptions::default(),
- )
- .await?;
-
- let describe_record_batch = ctx
- .table("alltypes_tiny_pages")
- .await?
- .describe()
- .await?
- .collect()
- .await?;
-
- #[rustfmt::skip]
- let expected =
["+------------+-------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+-------------------------+--------------------+-------------------+",
- "| describe | id | bool_col | tinyint_col |
smallint_col | int_col | bigint_col | float_col
| double_col | date_string_col | string_col | timestamp_col
| year | month |",
-
"+------------+-------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+-------------------------+--------------------+-------------------+",
- "| count | 7300.0 | 7300 | 7300.0 |
7300.0 | 7300.0 | 7300.0 | 7300.0
| 7300.0 | 7300 | 7300 | 7300
| 7300.0 | 7300.0 |",
- "| null_count | 7300.0 | 7300 | 7300.0 |
7300.0 | 7300.0 | 7300.0 | 7300.0
| 7300.0 | 7300 | 7300 | 7300
| 7300.0 | 7300.0 |",
- "| mean | 3649.5 | null | 4.5 |
4.5 | 4.5 | 45.0 |
4.949999964237213 | 45.45000000000001 | null | null | null
| 2009.5 | 6.526027397260274 |",
- "| std | 2107.472815166704 | null | 2.8724780750809518 |
2.8724780750809518 | 2.8724780750809518 | 28.724780750809533 |
3.1597258182544645 | 29.012028558317645 | null | null | null
| 0.5000342500942125 | 3.44808750051728 |",
- "| min | 0.0 | null | 0.0 |
0.0 | 0.0 | 0.0 | 0.0
| 0.0 | 01/01/09 | 0 | 2008-12-31T23:00:00
| 2009.0 | 1.0 |",
- "| max | 7299.0 | null | 9.0 |
9.0 | 9.0 | 90.0 |
9.899999618530273 | 90.89999999999999 | 12/31/10 | 9 |
2010-12-31T04:09:13.860 | 2010.0 | 12.0 |",
- "| median | 3649.0 | null | 4.0 |
4.0 | 4.0 | 45.0 |
4.949999809265137 | 45.45 | null | null | null
| 2009.0 | 7.0 |",
-
"+------------+-------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+-------------------------+--------------------+-------------------+"];
- assert_batches_eq!(expected, &describe_record_batch);
-
- //add test case for only boolean boolean/binary column
- let result = ctx
- .sql("select 'a' as a,true as b")
- .await?
- .describe()
- .await?
- .collect()
- .await?;
- #[rustfmt::skip]
- let expected = ["+------------+------+------+",
- "| describe | a | b |",
- "+------------+------+------+",
- "| count | 1 | 1 |",
- "| null_count | 1 | 1 |",
- "| mean | null | null |",
- "| std | null | null |",
- "| min | a | null |",
- "| max | a | null |",
- "| median | null | null |",
- "+------------+------+------+"];
- assert_batches_eq!(expected, &result);
-
- Ok(())
-}
-
#[tokio::test]
async fn join() -> Result<()> {
let schema1 = Arc::new(Schema::new(vec![
diff --git a/datafusion/core/tests/sql/describe.rs
b/datafusion/core/tests/sql/describe.rs
new file mode 100644
index 0000000000..cd8e79b2c9
--- /dev/null
+++ b/datafusion/core/tests/sql/describe.rs
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::assert_batches_eq;
+use datafusion::prelude::*;
+use datafusion_common::test_util::parquet_test_data;
+
+#[tokio::test]
+async fn describe_plan() {
+ let ctx = parquet_context().await;
+
+ let query = "describe alltypes_tiny_pages";
+ let results = ctx.sql(query).await.unwrap().collect().await.unwrap();
+
+ let expected = vec![
+ "+-----------------+-----------------------------+-------------+",
+ "| column_name | data_type | is_nullable |",
+ "+-----------------+-----------------------------+-------------+",
+ "| id | Int32 | YES |",
+ "| bool_col | Boolean | YES |",
+ "| tinyint_col | Int8 | YES |",
+ "| smallint_col | Int16 | YES |",
+ "| int_col | Int32 | YES |",
+ "| bigint_col | Int64 | YES |",
+ "| float_col | Float32 | YES |",
+ "| double_col | Float64 | YES |",
+ "| date_string_col | Utf8 | YES |",
+ "| string_col | Utf8 | YES |",
+ "| timestamp_col | Timestamp(Nanosecond, None) | YES |",
+ "| year | Int32 | YES |",
+ "| month | Int32 | YES |",
+ "+-----------------+-----------------------------+-------------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ // also ensure we plan Describe via SessionState
+ let state = ctx.state();
+ let plan = state.create_logical_plan(query).await.unwrap();
+ let df = DataFrame::new(state, plan);
+ let results = df.collect().await.unwrap();
+
+ assert_batches_eq!(expected, &results);
+}
+
+/// Return a SessionContext with parquet file registered
+async fn parquet_context() -> SessionContext {
+ let ctx = SessionContext::new();
+ let testdata = parquet_test_data();
+ ctx.register_parquet(
+ "alltypes_tiny_pages",
+ &format!("{testdata}/alltypes_tiny_pages.parquet"),
+ ParquetReadOptions::default(),
+ )
+ .await
+ .unwrap();
+ ctx
+}
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 35423234db..173c422942 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -82,6 +82,7 @@ pub mod arrow_files;
#[cfg(feature = "avro")]
pub mod create_drop;
pub mod csv_files;
+pub mod describe;
pub mod explain_analyze;
pub mod expr;
pub mod group_by;
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 4e196a7b96..dfc83f9eec 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -181,8 +181,8 @@ impl LogicalPlan {
LogicalPlan::Analyze(analyze) => &analyze.schema,
LogicalPlan::Extension(extension) => extension.node.schema(),
LogicalPlan::Union(Union { schema, .. }) => schema,
- LogicalPlan::DescribeTable(DescribeTable { dummy_schema, .. }) => {
- dummy_schema
+ LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) =>
{
+ output_schema
}
LogicalPlan::Dml(DmlStatement { table_schema, .. }) =>
table_schema,
LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
@@ -263,6 +263,15 @@ impl LogicalPlan {
]))
}
+ /// Returns the (fixed) output schema for `DESCRIBE` plans
+ pub fn describe_schema() -> Schema {
+ Schema::new(vec![
+ Field::new("column_name", DataType::Utf8, false),
+ Field::new("data_type", DataType::Utf8, false),
+ Field::new("is_nullable", DataType::Utf8, false),
+ ])
+ }
+
/// returns all expressions (non-recursively) in the current
/// logical plan node. This does not include expressions in any
/// children
@@ -1947,12 +1956,33 @@ pub struct Prepare {
}
/// Describe the schema of table
+///
+/// # Example output:
+///
+/// ```sql
+/// ❯ describe traces;
+/// +--------------------+-----------------------------+-------------+
+/// | column_name | data_type | is_nullable |
+/// +--------------------+-----------------------------+-------------+
+/// | attributes | Utf8 | YES |
+/// | duration_nano | Int64 | YES |
+/// | end_time_unix_nano | Int64 | YES |
+/// | service.name | Dictionary(Int32, Utf8) | YES |
+/// | span.kind | Utf8 | YES |
+/// | span.name | Utf8 | YES |
+/// | span_id | Dictionary(Int32, Utf8) | YES |
+/// | time | Timestamp(Nanosecond, None) | NO |
+/// | trace_id | Dictionary(Int32, Utf8) | YES |
+/// | otel.status_code | Utf8 | YES |
+/// | parent_span_id | Utf8 | YES |
+/// +--------------------+-----------------------------+-------------+
+/// ```
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
/// Table schema
pub schema: Arc<Schema>,
- /// Dummy schema
- pub dummy_schema: DFSchemaRef,
+ /// schema of describe table output
+ pub output_schema: DFSchemaRef,
}
/// Produces a relation with string representations of
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 435cbdf0b2..ab19fa716c 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -576,9 +576,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let schema = table_source.schema();
+ let output_schema =
DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
+
Ok(LogicalPlan::DescribeTable(DescribeTable {
schema,
- dummy_schema: DFSchemaRef::new(DFSchema::empty()),
+ output_schema: Arc::new(output_schema),
}))
}