This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 7a22ba9 ARROW-10859: [Rust] [DataFusion] Made collect not require
ExecutionContext
7a22ba9 is described below
commit 7a22ba9f4e04041e9be38824b405b3431d800294
Author: Jorge C. Leitao <[email protected]>
AuthorDate: Sun Dec 13 07:41:34 2020 -0500
ARROW-10859: [Rust] [DataFusion] Made collect not require ExecutionContext
This PR observes that `ExecutionContext::collect(&self, plan: Arc<dyn
ExecutionPlan>)` does not use `self` in its implementation.
Using this observation, it refactors `collect` out of
`ExecutionContext` (into `physical_plan::collect`), thereby simplifying the
execution of the plans, by not requiring creating an `ExecutionContext` just to
execute a physical plan.
From a design perspective, this makes it obvious that the execution of a
physical plan is entirely independent of a context and its state.
Closes #8874 from jorgecarleitao/simplify_collect
Authored-by: Jorge C. Leitao <[email protected]>
Signed-off-by: Andrew Lamb <[email protected]>
---
rust/benchmarks/src/bin/nyctaxi.rs | 3 +-
rust/benchmarks/src/bin/tpch.rs | 3 +-
rust/datafusion/examples/flight_server.rs | 8 +--
rust/datafusion/src/execution/context.rs | 75 ++++++++++---------------
rust/datafusion/src/execution/dataframe_impl.rs | 4 +-
rust/datafusion/src/physical_plan/filter.rs | 4 +-
rust/datafusion/src/physical_plan/mod.rs | 20 +++++++
rust/datafusion/src/physical_plan/sort.rs | 9 ++-
rust/datafusion/src/test/mod.rs | 8 ---
rust/datafusion/tests/dataframe.rs | 4 +-
rust/datafusion/tests/sql.rs | 13 +++--
11 files changed, 76 insertions(+), 75 deletions(-)
diff --git a/rust/benchmarks/src/bin/nyctaxi.rs
b/rust/benchmarks/src/bin/nyctaxi.rs
index 1ffa684..a3cac44 100644
--- a/rust/benchmarks/src/bin/nyctaxi.rs
+++ b/rust/benchmarks/src/bin/nyctaxi.rs
@@ -27,6 +27,7 @@ use arrow::util::pretty;
use datafusion::error::Result;
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
+use datafusion::physical_plan::collect;
use datafusion::physical_plan::csv::CsvReadOptions;
use structopt::StructOpt;
@@ -116,7 +117,7 @@ async fn execute_sql(ctx: &mut ExecutionContext, sql: &str,
debug: bool) -> Resu
println!("Optimized logical plan:\n{:?}", plan);
}
let physical_plan = ctx.create_physical_plan(&plan)?;
- let result = ctx.collect(physical_plan).await?;
+ let result = collect(physical_plan).await?;
if debug {
pretty::print_batches(&result)?;
}
diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs
index cd3d9d8..1d7f4cf 100644
--- a/rust/benchmarks/src/bin/tpch.rs
+++ b/rust/benchmarks/src/bin/tpch.rs
@@ -27,6 +27,7 @@ use datafusion::datasource::parquet::ParquetTable;
use datafusion::datasource::{CsvFile, MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::LogicalPlan;
+use datafusion::physical_plan::collect;
use datafusion::physical_plan::csv::CsvExec;
use datafusion::prelude::*;
@@ -987,7 +988,7 @@ async fn execute_query(
println!("Optimized logical plan:\n{:?}", plan);
}
let physical_plan = ctx.create_physical_plan(&plan)?;
- let result = ctx.collect(physical_plan).await?;
+ let result = collect(physical_plan).await?;
if debug {
pretty::print_batches(&result)?;
}
diff --git a/rust/datafusion/examples/flight_server.rs
b/rust/datafusion/examples/flight_server.rs
index d835ab0..549c005 100644
--- a/rust/datafusion/examples/flight_server.rs
+++ b/rust/datafusion/examples/flight_server.rs
@@ -21,9 +21,9 @@ use futures::Stream;
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};
-use datafusion::datasource::parquet::ParquetTable;
use datafusion::datasource::TableProvider;
use datafusion::prelude::*;
+use datafusion::{datasource::parquet::ParquetTable, physical_plan::collect};
use arrow_flight::{
flight_service_server::FlightService,
flight_service_server::FlightServiceServer,
@@ -105,10 +105,8 @@ impl FlightService for FlightServiceImpl {
.map_err(|e| to_tonic_err(&e))?;
// execute the query
- let results = ctx
- .collect(plan.clone())
- .await
- .map_err(|e| to_tonic_err(&e))?;
+ let results =
+ collect(plan.clone()).await.map_err(|e| to_tonic_err(&e))?;
if results.is_empty() {
return Err(Status::internal("There were no results from
ticket"));
}
diff --git a/rust/datafusion/src/execution/context.rs
b/rust/datafusion/src/execution/context.rs
index d74c7db..d01bf6f 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -27,7 +27,6 @@ use futures::{StreamExt, TryStreamExt};
use arrow::csv;
use arrow::datatypes::*;
-use arrow::record_batch::RecordBatch;
use crate::datasource::csv::CsvFile;
use crate::datasource::parquet::ParquetTable;
@@ -40,9 +39,7 @@ use crate::logical_plan::{
use crate::optimizer::filter_push_down::FilterPushDown;
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::projection_push_down::ProjectionPushDown;
-use crate::physical_plan::common;
use crate::physical_plan::csv::CsvReadOptions;
-use crate::physical_plan::merge::MergeExec;
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udf::ScalarUDF;
use crate::physical_plan::ExecutionPlan;
@@ -321,27 +318,6 @@ impl ExecutionContext {
.create_physical_plan(logical_plan, &self.state)
}
- /// Execute a physical plan and collect the results in memory
- pub async fn collect(
- &self,
- plan: Arc<dyn ExecutionPlan>,
- ) -> Result<Vec<RecordBatch>> {
- match plan.output_partitioning().partition_count() {
- 0 => Ok(vec![]),
- 1 => {
- let it = plan.execute(0).await?;
- common::collect(it).await
- }
- _ => {
- // merge into a single partition
- let plan = MergeExec::new(plan.clone());
- // MergeExec must produce a single partition
- assert_eq!(1, plan.output_partitioning().partition_count());
- common::collect(plan.execute(0).await?).await
- }
- }
- }
-
/// Execute a query and write the results to a partitioned CSV file
pub async fn write_csv(
&self,
@@ -554,6 +530,7 @@ mod tests {
use super::*;
use crate::logical_plan::{col, create_udf, sum};
+ use crate::physical_plan::collect;
use crate::physical_plan::functions::ScalarFunctionImplementation;
use crate::test;
use crate::variable::VarType;
@@ -563,6 +540,7 @@ mod tests {
};
use arrow::array::{ArrayRef, Float64Array, Int32Array, StringArray};
use arrow::compute::add;
+ use arrow::record_batch::RecordBatch;
use std::fs::File;
use std::thread::{self, JoinHandle};
use std::{io::prelude::*, sync::Mutex};
@@ -602,7 +580,8 @@ mod tests {
let provider = test::create_table_dual();
ctx.register_table("dual", provider);
- let results = collect(&mut ctx, "SELECT @@version, @name FROM
dual").await?;
+ let results =
+ plan_and_collect(&mut ctx, "SELECT @@version, @name FROM
dual").await?;
let batch = &results[0];
assert_eq!(2, batch.num_columns());
@@ -638,7 +617,7 @@ mod tests {
let physical_plan = ctx.create_physical_plan(&logical_plan)?;
- let results = ctx.collect(physical_plan).await?;
+ let results = collect(physical_plan).await?;
// there should be one batch per partition
assert_eq!(results.len(), partition_count);
@@ -685,7 +664,7 @@ mod tests {
assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
- let batches = ctx.collect(physical_plan).await?;
+ let batches = collect(physical_plan).await?;
assert_eq!(4, batches.len());
assert_eq!(1, batches[0].num_columns());
assert_eq!(10, batches[0].num_rows());
@@ -763,7 +742,7 @@ mod tests {
assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("b", physical_plan.schema().field(0).name().as_str());
- let batches = ctx.collect(physical_plan).await?;
+ let batches = collect(physical_plan).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
assert_eq!(4, batches[0].num_rows());
@@ -1032,7 +1011,7 @@ mod tests {
CsvReadOptions::new().schema(&schema).has_header(false),
)?;
- let results = collect(
+ let results = plan_and_collect(
&mut ctx,
"
SELECT
@@ -1175,11 +1154,11 @@ mod tests {
ctx.register_csv("part3", &format!("{}/part-3.csv", out_dir),
csv_read_option)?;
ctx.register_csv("allparts", &out_dir, csv_read_option)?;
- let part0 = collect(&mut ctx, "SELECT c1, c2 FROM part0").await?;
- let part1 = collect(&mut ctx, "SELECT c1, c2 FROM part1").await?;
- let part2 = collect(&mut ctx, "SELECT c1, c2 FROM part2").await?;
- let part3 = collect(&mut ctx, "SELECT c1, c2 FROM part3").await?;
- let allparts = collect(&mut ctx, "SELECT c1, c2 FROM allparts").await?;
+ let part0 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM
part0").await?;
+ let part1 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM
part1").await?;
+ let part2 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM
part2").await?;
+ let part3 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM
part3").await?;
+ let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM
allparts").await?;
let part0_count: usize = part0.iter().map(|batch|
batch.num_rows()).sum();
let part1_count: usize = part1.iter().map(|batch|
batch.num_rows()).sum();
@@ -1216,11 +1195,11 @@ mod tests {
ctx.register_parquet("part3", &format!("{}/part-3.parquet", out_dir))?;
ctx.register_parquet("allparts", &out_dir)?;
- let part0 = collect(&mut ctx, "SELECT c1, c2 FROM part0").await?;
- let part1 = collect(&mut ctx, "SELECT c1, c2 FROM part1").await?;
- let part2 = collect(&mut ctx, "SELECT c1, c2 FROM part2").await?;
- let part3 = collect(&mut ctx, "SELECT c1, c2 FROM part3").await?;
- let allparts = collect(&mut ctx, "SELECT c1, c2 FROM allparts").await?;
+ let part0 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM
part0").await?;
+ let part1 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM
part1").await?;
+ let part2 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM
part2").await?;
+ let part3 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM
part3").await?;
+ let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM
allparts").await?;
let part0_count: usize = part0.iter().map(|batch|
batch.num_rows()).sum();
let part1_count: usize = part1.iter().map(|batch|
batch.num_rows()).sum();
@@ -1254,7 +1233,8 @@ mod tests {
.file_extension(file_extension),
)?;
let results =
- collect(&mut ctx, "SELECT SUM(c1), SUM(c2), COUNT(*) FROM
test").await?;
+ plan_and_collect(&mut ctx, "SELECT SUM(c1), SUM(c2), COUNT(*) FROM
test")
+ .await?;
assert_eq!(results.len(), 1);
assert_eq!(results[0].num_rows(), 1);
@@ -1349,7 +1329,7 @@ mod tests {
let plan = ctx.optimize(&plan)?;
let plan = ctx.create_physical_plan(&plan)?;
- let result = ctx.collect(plan).await?;
+ let result = collect(plan).await?;
let batch = &result[0];
assert_eq!(3, batch.num_columns());
@@ -1401,7 +1381,7 @@ mod tests {
MemTable::try_new(Arc::new(schema), vec![vec![batch1],
vec![batch2]])?;
ctx.register_table("t", Box::new(provider));
- let result = collect(&mut ctx, "SELECT AVG(a) FROM t").await?;
+ let result = plan_and_collect(&mut ctx, "SELECT AVG(a) FROM t").await?;
let batch = &result[0];
assert_eq!(1, batch.num_columns());
@@ -1449,7 +1429,7 @@ mod tests {
ctx.register_udaf(my_avg);
- let result = collect(&mut ctx, "SELECT MY_AVG(a) FROM t").await?;
+ let result = plan_and_collect(&mut ctx, "SELECT MY_AVG(a) FROM
t").await?;
let batch = &result[0];
assert_eq!(1, batch.num_columns());
@@ -1505,11 +1485,14 @@ mod tests {
}
/// Execute SQL and return results
- async fn collect(ctx: &mut ExecutionContext, sql: &str) ->
Result<Vec<RecordBatch>> {
+ async fn plan_and_collect(
+ ctx: &mut ExecutionContext,
+ sql: &str,
+ ) -> Result<Vec<RecordBatch>> {
let logical_plan = ctx.create_logical_plan(sql)?;
let logical_plan = ctx.optimize(&logical_plan)?;
let physical_plan = ctx.create_physical_plan(&logical_plan)?;
- ctx.collect(physical_plan).await
+ collect(physical_plan).await
}
fn field_names(result: &RecordBatch) -> Vec<String> {
@@ -1525,7 +1508,7 @@ mod tests {
async fn execute(sql: &str, partition_count: usize) ->
Result<Vec<RecordBatch>> {
let tmp_dir = TempDir::new()?;
let mut ctx = create_ctx(&tmp_dir, partition_count)?;
- collect(&mut ctx, sql).await
+ plan_and_collect(&mut ctx, sql).await
}
/// Execute SQL and write results to partitioned csv files
diff --git a/rust/datafusion/src/execution/dataframe_impl.rs
b/rust/datafusion/src/execution/dataframe_impl.rs
index ce5d876..395b923 100644
--- a/rust/datafusion/src/execution/dataframe_impl.rs
+++ b/rust/datafusion/src/execution/dataframe_impl.rs
@@ -19,13 +19,13 @@
use std::sync::Arc;
-use crate::arrow::record_batch::RecordBatch;
use crate::dataframe::*;
use crate::error::Result;
use crate::execution::context::{ExecutionContext, ExecutionContextState};
use crate::logical_plan::{
col, DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan,
LogicalPlanBuilder,
};
+use crate::{arrow::record_batch::RecordBatch, physical_plan::collect};
use async_trait::async_trait;
@@ -122,7 +122,7 @@ impl DataFrame for DataFrameImpl {
let ctx = ExecutionContext::from(self.ctx_state.clone());
let plan = ctx.optimize(&self.plan)?;
let plan = ctx.create_physical_plan(&plan)?;
- Ok(ctx.collect(plan).await?)
+ Ok(collect(plan).await?)
}
/// Returns the schema from the logical plan
diff --git a/rust/datafusion/src/physical_plan/filter.rs
b/rust/datafusion/src/physical_plan/filter.rs
index 3607997..b24abc2 100644
--- a/rust/datafusion/src/physical_plan/filter.rs
+++ b/rust/datafusion/src/physical_plan/filter.rs
@@ -174,12 +174,12 @@ impl RecordBatchStream for FilterExecStream {
mod tests {
use super::*;
- use crate::logical_plan::Operator;
use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
use crate::physical_plan::expressions::*;
use crate::physical_plan::ExecutionPlan;
use crate::scalar::ScalarValue;
use crate::test;
+ use crate::{logical_plan::Operator, physical_plan::collect};
use std::iter::Iterator;
#[tokio::test]
@@ -212,7 +212,7 @@ mod tests {
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, Arc::new(csv))?);
- let results = test::execute(filter).await?;
+ let results = collect(filter).await?;
results
.iter()
diff --git a/rust/datafusion/src/physical_plan/mod.rs
b/rust/datafusion/src/physical_plan/mod.rs
index b7cddbc..1a8d1cc 100644
--- a/rust/datafusion/src/physical_plan/mod.rs
+++ b/rust/datafusion/src/physical_plan/mod.rs
@@ -32,6 +32,8 @@ use arrow::{array::ArrayRef, datatypes::Field};
use async_trait::async_trait;
use futures::stream::Stream;
+use self::merge::MergeExec;
+
/// Trait for types that stream [arrow::record_batch::RecordBatch]
pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
/// Returns the schema of this `RecordBatchStream`.
@@ -84,6 +86,24 @@ pub trait ExecutionPlan: Debug + Send + Sync {
async fn execute(&self, partition: usize) ->
Result<SendableRecordBatchStream>;
}
+/// Execute the [ExecutionPlan] and collect the results in memory
+pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>>
{
+ match plan.output_partitioning().partition_count() {
+ 0 => Ok(vec![]),
+ 1 => {
+ let it = plan.execute(0).await?;
+ common::collect(it).await
+ }
+ _ => {
+ // merge into a single partition
+ let plan = MergeExec::new(plan.clone());
+ // MergeExec must produce a single partition
+ assert_eq!(1, plan.output_partitioning().partition_count());
+ common::collect(plan.execute(0).await?).await
+ }
+ }
+}
+
/// Partitioning schemes supported by operators.
#[derive(Debug, Clone)]
pub enum Partitioning {
diff --git a/rust/datafusion/src/physical_plan/sort.rs
b/rust/datafusion/src/physical_plan/sort.rs
index 267fb7d..cb364c5 100644
--- a/rust/datafusion/src/physical_plan/sort.rs
+++ b/rust/datafusion/src/physical_plan/sort.rs
@@ -249,10 +249,13 @@ impl RecordBatchStream for SortStream {
#[cfg(test)]
mod tests {
use super::*;
- use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
use crate::physical_plan::expressions::col;
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::merge::MergeExec;
+ use crate::physical_plan::{
+ collect,
+ csv::{CsvExec, CsvReadOptions},
+ };
use crate::test;
use arrow::array::*;
use arrow::datatypes::*;
@@ -287,7 +290,7 @@ mod tests {
2,
)?);
- let result: Vec<RecordBatch> = test::execute(sort_exec).await?;
+ let result: Vec<RecordBatch> = collect(sort_exec).await?;
assert_eq!(result.len(), 1);
let columns = result[0].columns();
@@ -365,7 +368,7 @@ mod tests {
assert_eq!(DataType::Float32,
*sort_exec.schema().field(0).data_type());
assert_eq!(DataType::Float64,
*sort_exec.schema().field(1).data_type());
- let result: Vec<RecordBatch> = test::execute(sort_exec).await?;
+ let result: Vec<RecordBatch> = collect(sort_exec).await?;
assert_eq!(result.len(), 1);
let columns = result[0].columns();
diff --git a/rust/datafusion/src/test/mod.rs b/rust/datafusion/src/test/mod.rs
index 42bdf37..fb7dbdb 100644
--- a/rust/datafusion/src/test/mod.rs
+++ b/rust/datafusion/src/test/mod.rs
@@ -19,9 +19,7 @@
use crate::datasource::{MemTable, TableProvider};
use crate::error::Result;
-use crate::execution::context::ExecutionContext;
use crate::logical_plan::{LogicalPlan, LogicalPlanBuilder};
-use crate::physical_plan::ExecutionPlan;
use arrow::array::{self, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
@@ -54,12 +52,6 @@ pub fn arrow_testdata_path() -> String {
env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined")
}
-/// Execute a physical plan and collect the results
-pub async fn execute(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>>
{
- let ctx = ExecutionContext::new();
- ctx.collect(plan).await
-}
-
/// Generated partitioned copy of a CSV file
pub fn create_partitioned_csv(filename: &str, partitions: usize) ->
Result<String> {
let testdata = arrow_testdata_path();
diff --git a/rust/datafusion/tests/dataframe.rs
b/rust/datafusion/tests/dataframe.rs
index 472de2d..af6aeda 100644
--- a/rust/datafusion/tests/dataframe.rs
+++ b/rust/datafusion/tests/dataframe.rs
@@ -21,8 +21,8 @@ use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use datafusion::datasource::datasource::Statistics;
-use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
+use datafusion::{datasource::TableProvider, physical_plan::collect};
use datafusion::execution::context::ExecutionContext;
use datafusion::logical_plan::{col, LogicalPlan, LogicalPlanBuilder};
@@ -186,7 +186,7 @@ async fn custom_source_dataframe() -> Result<()> {
assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
- let batches = ctx.collect(physical_plan).await?;
+ let batches = collect(physical_plan).await?;
let origin_rec_batch = TEST_CUSTOM_RECORD_BATCH!()?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 1ba8c04..84c90a5 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -29,11 +29,14 @@ use arrow::{
util::display::array_value_to_string,
};
-use datafusion::datasource::{csv::CsvReadOptions, MemTable};
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use datafusion::logical_plan::{LogicalPlan, ToDFSchema};
use datafusion::prelude::create_udf;
+use datafusion::{
+ datasource::{csv::CsvReadOptions, MemTable},
+ physical_plan::collect,
+};
#[tokio::test]
async fn nyc() -> Result<()> {
@@ -122,7 +125,7 @@ async fn parquet_single_nan_schema() {
let plan = ctx.create_logical_plan(&sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
let plan = ctx.create_physical_plan(&plan).unwrap();
- let results = ctx.collect(plan).await.unwrap();
+ let results = collect(plan).await.unwrap();
for batch in results {
assert_eq!(1, batch.num_rows());
assert_eq!(1, batch.num_columns());
@@ -156,7 +159,7 @@ async fn parquet_list_columns() {
let plan = ctx.create_logical_plan(&sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
let plan = ctx.create_physical_plan(&plan).unwrap();
- let results = ctx.collect(plan).await.unwrap();
+ let results = collect(plan).await.unwrap();
// int64_list utf8_list
// 0 [1, 2, 3] [abc, efg, hij]
@@ -535,7 +538,7 @@ async fn csv_query_avg_multi_batch() -> Result<()> {
let plan = ctx.create_logical_plan(&sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
let plan = ctx.create_physical_plan(&plan).unwrap();
- let results = ctx.collect(plan).await.unwrap();
+ let results = collect(plan).await.unwrap();
let batch = &results[0];
let column = batch.column(0);
let array = column.as_any().downcast_ref::<Float64Array>().unwrap();
@@ -1347,7 +1350,7 @@ async fn execute(ctx: &mut ExecutionContext, sql: &str)
-> Vec<Vec<String>> {
let physical_schema = plan.schema();
let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
- let results = ctx.collect(plan).await.expect(&msg);
+ let results = collect(plan).await.expect(&msg);
assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref());
assert_eq!(