This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a22ba9  ARROW-10859: [Rust] [DataFusion] Made collect not require 
ExecutionContext
7a22ba9 is described below

commit 7a22ba9f4e04041e9be38824b405b3431d800294
Author: Jorge C. Leitao <[email protected]>
AuthorDate: Sun Dec 13 07:41:34 2020 -0500

    ARROW-10859: [Rust] [DataFusion] Made collect not require ExecutionContext
    
    This PR observes that `ExecutionContext::collect(&self, plan: Arc<dyn 
ExecutionPlan>)` does not use `self` in its implementation.
    
    Based on this observation, it moves `collect` out of `ExecutionContext` 
(into `physical_plan::collect`), which simplifies executing plans: callers no 
longer need to create an `ExecutionContext` just to execute a physical plan.
    
    From a design perspective, this makes it explicit that the execution of a 
physical plan is entirely independent of a context and its state.
    
    Closes #8874 from jorgecarleitao/simplify_collect
    
    Authored-by: Jorge C. Leitao <[email protected]>
    Signed-off-by: Andrew Lamb <[email protected]>
---
 rust/benchmarks/src/bin/nyctaxi.rs              |  3 +-
 rust/benchmarks/src/bin/tpch.rs                 |  3 +-
 rust/datafusion/examples/flight_server.rs       |  8 +--
 rust/datafusion/src/execution/context.rs        | 75 ++++++++++---------------
 rust/datafusion/src/execution/dataframe_impl.rs |  4 +-
 rust/datafusion/src/physical_plan/filter.rs     |  4 +-
 rust/datafusion/src/physical_plan/mod.rs        | 20 +++++++
 rust/datafusion/src/physical_plan/sort.rs       |  9 ++-
 rust/datafusion/src/test/mod.rs                 |  8 ---
 rust/datafusion/tests/dataframe.rs              |  4 +-
 rust/datafusion/tests/sql.rs                    | 13 +++--
 11 files changed, 76 insertions(+), 75 deletions(-)

diff --git a/rust/benchmarks/src/bin/nyctaxi.rs 
b/rust/benchmarks/src/bin/nyctaxi.rs
index 1ffa684..a3cac44 100644
--- a/rust/benchmarks/src/bin/nyctaxi.rs
+++ b/rust/benchmarks/src/bin/nyctaxi.rs
@@ -27,6 +27,7 @@ use arrow::util::pretty;
 use datafusion::error::Result;
 use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
 
+use datafusion::physical_plan::collect;
 use datafusion::physical_plan::csv::CsvReadOptions;
 use structopt::StructOpt;
 
@@ -116,7 +117,7 @@ async fn execute_sql(ctx: &mut ExecutionContext, sql: &str, 
debug: bool) -> Resu
         println!("Optimized logical plan:\n{:?}", plan);
     }
     let physical_plan = ctx.create_physical_plan(&plan)?;
-    let result = ctx.collect(physical_plan).await?;
+    let result = collect(physical_plan).await?;
     if debug {
         pretty::print_batches(&result)?;
     }
diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs
index cd3d9d8..1d7f4cf 100644
--- a/rust/benchmarks/src/bin/tpch.rs
+++ b/rust/benchmarks/src/bin/tpch.rs
@@ -27,6 +27,7 @@ use datafusion::datasource::parquet::ParquetTable;
 use datafusion::datasource::{CsvFile, MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_plan::LogicalPlan;
+use datafusion::physical_plan::collect;
 use datafusion::physical_plan::csv::CsvExec;
 use datafusion::prelude::*;
 
@@ -987,7 +988,7 @@ async fn execute_query(
         println!("Optimized logical plan:\n{:?}", plan);
     }
     let physical_plan = ctx.create_physical_plan(&plan)?;
-    let result = ctx.collect(physical_plan).await?;
+    let result = collect(physical_plan).await?;
     if debug {
         pretty::print_batches(&result)?;
     }
diff --git a/rust/datafusion/examples/flight_server.rs 
b/rust/datafusion/examples/flight_server.rs
index d835ab0..549c005 100644
--- a/rust/datafusion/examples/flight_server.rs
+++ b/rust/datafusion/examples/flight_server.rs
@@ -21,9 +21,9 @@ use futures::Stream;
 use tonic::transport::Server;
 use tonic::{Request, Response, Status, Streaming};
 
-use datafusion::datasource::parquet::ParquetTable;
 use datafusion::datasource::TableProvider;
 use datafusion::prelude::*;
+use datafusion::{datasource::parquet::ParquetTable, physical_plan::collect};
 
 use arrow_flight::{
     flight_service_server::FlightService, 
flight_service_server::FlightServiceServer,
@@ -105,10 +105,8 @@ impl FlightService for FlightServiceImpl {
                     .map_err(|e| to_tonic_err(&e))?;
 
                 // execute the query
-                let results = ctx
-                    .collect(plan.clone())
-                    .await
-                    .map_err(|e| to_tonic_err(&e))?;
+                let results =
+                    collect(plan.clone()).await.map_err(|e| to_tonic_err(&e))?;
                 if results.is_empty() {
                     return Err(Status::internal("There were no results from 
ticket"));
                 }
diff --git a/rust/datafusion/src/execution/context.rs 
b/rust/datafusion/src/execution/context.rs
index d74c7db..d01bf6f 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -27,7 +27,6 @@ use futures::{StreamExt, TryStreamExt};
 
 use arrow::csv;
 use arrow::datatypes::*;
-use arrow::record_batch::RecordBatch;
 
 use crate::datasource::csv::CsvFile;
 use crate::datasource::parquet::ParquetTable;
@@ -40,9 +39,7 @@ use crate::logical_plan::{
 use crate::optimizer::filter_push_down::FilterPushDown;
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::projection_push_down::ProjectionPushDown;
-use crate::physical_plan::common;
 use crate::physical_plan::csv::CsvReadOptions;
-use crate::physical_plan::merge::MergeExec;
 use crate::physical_plan::planner::DefaultPhysicalPlanner;
 use crate::physical_plan::udf::ScalarUDF;
 use crate::physical_plan::ExecutionPlan;
@@ -321,27 +318,6 @@ impl ExecutionContext {
             .create_physical_plan(logical_plan, &self.state)
     }
 
-    /// Execute a physical plan and collect the results in memory
-    pub async fn collect(
-        &self,
-        plan: Arc<dyn ExecutionPlan>,
-    ) -> Result<Vec<RecordBatch>> {
-        match plan.output_partitioning().partition_count() {
-            0 => Ok(vec![]),
-            1 => {
-                let it = plan.execute(0).await?;
-                common::collect(it).await
-            }
-            _ => {
-                // merge into a single partition
-                let plan = MergeExec::new(plan.clone());
-                // MergeExec must produce a single partition
-                assert_eq!(1, plan.output_partitioning().partition_count());
-                common::collect(plan.execute(0).await?).await
-            }
-        }
-    }
-
     /// Execute a query and write the results to a partitioned CSV file
     pub async fn write_csv(
         &self,
@@ -554,6 +530,7 @@ mod tests {
 
     use super::*;
     use crate::logical_plan::{col, create_udf, sum};
+    use crate::physical_plan::collect;
     use crate::physical_plan::functions::ScalarFunctionImplementation;
     use crate::test;
     use crate::variable::VarType;
@@ -563,6 +540,7 @@ mod tests {
     };
     use arrow::array::{ArrayRef, Float64Array, Int32Array, StringArray};
     use arrow::compute::add;
+    use arrow::record_batch::RecordBatch;
     use std::fs::File;
     use std::thread::{self, JoinHandle};
     use std::{io::prelude::*, sync::Mutex};
@@ -602,7 +580,8 @@ mod tests {
         let provider = test::create_table_dual();
         ctx.register_table("dual", provider);
 
-        let results = collect(&mut ctx, "SELECT @@version, @name FROM 
dual").await?;
+        let results =
+            plan_and_collect(&mut ctx, "SELECT @@version, @name FROM 
dual").await?;
 
         let batch = &results[0];
         assert_eq!(2, batch.num_columns());
@@ -638,7 +617,7 @@ mod tests {
 
         let physical_plan = ctx.create_physical_plan(&logical_plan)?;
 
-        let results = ctx.collect(physical_plan).await?;
+        let results = collect(physical_plan).await?;
 
         // there should be one batch per partition
         assert_eq!(results.len(), partition_count);
@@ -685,7 +664,7 @@ mod tests {
         assert_eq!(1, physical_plan.schema().fields().len());
         assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
 
-        let batches = ctx.collect(physical_plan).await?;
+        let batches = collect(physical_plan).await?;
         assert_eq!(4, batches.len());
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(10, batches[0].num_rows());
@@ -763,7 +742,7 @@ mod tests {
         assert_eq!(1, physical_plan.schema().fields().len());
         assert_eq!("b", physical_plan.schema().field(0).name().as_str());
 
-        let batches = ctx.collect(physical_plan).await?;
+        let batches = collect(physical_plan).await?;
         assert_eq!(1, batches.len());
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(4, batches[0].num_rows());
@@ -1032,7 +1011,7 @@ mod tests {
             CsvReadOptions::new().schema(&schema).has_header(false),
         )?;
 
-        let results = collect(
+        let results = plan_and_collect(
             &mut ctx,
             "
               SELECT
@@ -1175,11 +1154,11 @@ mod tests {
         ctx.register_csv("part3", &format!("{}/part-3.csv", out_dir), 
csv_read_option)?;
         ctx.register_csv("allparts", &out_dir, csv_read_option)?;
 
-        let part0 = collect(&mut ctx, "SELECT c1, c2 FROM part0").await?;
-        let part1 = collect(&mut ctx, "SELECT c1, c2 FROM part1").await?;
-        let part2 = collect(&mut ctx, "SELECT c1, c2 FROM part2").await?;
-        let part3 = collect(&mut ctx, "SELECT c1, c2 FROM part3").await?;
-        let allparts = collect(&mut ctx, "SELECT c1, c2 FROM allparts").await?;
+        let part0 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM 
part0").await?;
+        let part1 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM 
part1").await?;
+        let part2 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM 
part2").await?;
+        let part3 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM 
part3").await?;
+        let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM 
allparts").await?;
 
         let part0_count: usize = part0.iter().map(|batch| 
batch.num_rows()).sum();
         let part1_count: usize = part1.iter().map(|batch| 
batch.num_rows()).sum();
@@ -1216,11 +1195,11 @@ mod tests {
         ctx.register_parquet("part3", &format!("{}/part-3.parquet", out_dir))?;
         ctx.register_parquet("allparts", &out_dir)?;
 
-        let part0 = collect(&mut ctx, "SELECT c1, c2 FROM part0").await?;
-        let part1 = collect(&mut ctx, "SELECT c1, c2 FROM part1").await?;
-        let part2 = collect(&mut ctx, "SELECT c1, c2 FROM part2").await?;
-        let part3 = collect(&mut ctx, "SELECT c1, c2 FROM part3").await?;
-        let allparts = collect(&mut ctx, "SELECT c1, c2 FROM allparts").await?;
+        let part0 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM 
part0").await?;
+        let part1 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM 
part1").await?;
+        let part2 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM 
part2").await?;
+        let part3 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM 
part3").await?;
+        let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM 
allparts").await?;
 
         let part0_count: usize = part0.iter().map(|batch| 
batch.num_rows()).sum();
         let part1_count: usize = part1.iter().map(|batch| 
batch.num_rows()).sum();
@@ -1254,7 +1233,8 @@ mod tests {
                 .file_extension(file_extension),
         )?;
         let results =
-            collect(&mut ctx, "SELECT SUM(c1), SUM(c2), COUNT(*) FROM 
test").await?;
+            plan_and_collect(&mut ctx, "SELECT SUM(c1), SUM(c2), COUNT(*) FROM 
test")
+                .await?;
 
         assert_eq!(results.len(), 1);
         assert_eq!(results[0].num_rows(), 1);
@@ -1349,7 +1329,7 @@ mod tests {
 
         let plan = ctx.optimize(&plan)?;
         let plan = ctx.create_physical_plan(&plan)?;
-        let result = ctx.collect(plan).await?;
+        let result = collect(plan).await?;
 
         let batch = &result[0];
         assert_eq!(3, batch.num_columns());
@@ -1401,7 +1381,7 @@ mod tests {
             MemTable::try_new(Arc::new(schema), vec![vec![batch1], 
vec![batch2]])?;
         ctx.register_table("t", Box::new(provider));
 
-        let result = collect(&mut ctx, "SELECT AVG(a) FROM t").await?;
+        let result = plan_and_collect(&mut ctx, "SELECT AVG(a) FROM t").await?;
 
         let batch = &result[0];
         assert_eq!(1, batch.num_columns());
@@ -1449,7 +1429,7 @@ mod tests {
 
         ctx.register_udaf(my_avg);
 
-        let result = collect(&mut ctx, "SELECT MY_AVG(a) FROM t").await?;
+        let result = plan_and_collect(&mut ctx, "SELECT MY_AVG(a) FROM 
t").await?;
 
         let batch = &result[0];
         assert_eq!(1, batch.num_columns());
@@ -1505,11 +1485,14 @@ mod tests {
     }
 
     /// Execute SQL and return results
-    async fn collect(ctx: &mut ExecutionContext, sql: &str) -> 
Result<Vec<RecordBatch>> {
+    async fn plan_and_collect(
+        ctx: &mut ExecutionContext,
+        sql: &str,
+    ) -> Result<Vec<RecordBatch>> {
         let logical_plan = ctx.create_logical_plan(sql)?;
         let logical_plan = ctx.optimize(&logical_plan)?;
         let physical_plan = ctx.create_physical_plan(&logical_plan)?;
-        ctx.collect(physical_plan).await
+        collect(physical_plan).await
     }
 
     fn field_names(result: &RecordBatch) -> Vec<String> {
@@ -1525,7 +1508,7 @@ mod tests {
     async fn execute(sql: &str, partition_count: usize) -> 
Result<Vec<RecordBatch>> {
         let tmp_dir = TempDir::new()?;
         let mut ctx = create_ctx(&tmp_dir, partition_count)?;
-        collect(&mut ctx, sql).await
+        plan_and_collect(&mut ctx, sql).await
     }
 
     /// Execute SQL and write results to partitioned csv files
diff --git a/rust/datafusion/src/execution/dataframe_impl.rs 
b/rust/datafusion/src/execution/dataframe_impl.rs
index ce5d876..395b923 100644
--- a/rust/datafusion/src/execution/dataframe_impl.rs
+++ b/rust/datafusion/src/execution/dataframe_impl.rs
@@ -19,13 +19,13 @@
 
 use std::sync::Arc;
 
-use crate::arrow::record_batch::RecordBatch;
 use crate::dataframe::*;
 use crate::error::Result;
 use crate::execution::context::{ExecutionContext, ExecutionContextState};
 use crate::logical_plan::{
     col, DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, 
LogicalPlanBuilder,
 };
+use crate::{arrow::record_batch::RecordBatch, physical_plan::collect};
 
 use async_trait::async_trait;
 
@@ -122,7 +122,7 @@ impl DataFrame for DataFrameImpl {
         let ctx = ExecutionContext::from(self.ctx_state.clone());
         let plan = ctx.optimize(&self.plan)?;
         let plan = ctx.create_physical_plan(&plan)?;
-        Ok(ctx.collect(plan).await?)
+        Ok(collect(plan).await?)
     }
 
     /// Returns the schema from the logical plan
diff --git a/rust/datafusion/src/physical_plan/filter.rs 
b/rust/datafusion/src/physical_plan/filter.rs
index 3607997..b24abc2 100644
--- a/rust/datafusion/src/physical_plan/filter.rs
+++ b/rust/datafusion/src/physical_plan/filter.rs
@@ -174,12 +174,12 @@ impl RecordBatchStream for FilterExecStream {
 mod tests {
 
     use super::*;
-    use crate::logical_plan::Operator;
     use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
     use crate::physical_plan::expressions::*;
     use crate::physical_plan::ExecutionPlan;
     use crate::scalar::ScalarValue;
     use crate::test;
+    use crate::{logical_plan::Operator, physical_plan::collect};
     use std::iter::Iterator;
 
     #[tokio::test]
@@ -212,7 +212,7 @@ mod tests {
         let filter: Arc<dyn ExecutionPlan> =
             Arc::new(FilterExec::try_new(predicate, Arc::new(csv))?);
 
-        let results = test::execute(filter).await?;
+        let results = collect(filter).await?;
 
         results
             .iter()
diff --git a/rust/datafusion/src/physical_plan/mod.rs 
b/rust/datafusion/src/physical_plan/mod.rs
index b7cddbc..1a8d1cc 100644
--- a/rust/datafusion/src/physical_plan/mod.rs
+++ b/rust/datafusion/src/physical_plan/mod.rs
@@ -32,6 +32,8 @@ use arrow::{array::ArrayRef, datatypes::Field};
 use async_trait::async_trait;
 use futures::stream::Stream;
 
+use self::merge::MergeExec;
+
 /// Trait for types that stream [arrow::record_batch::RecordBatch]
 pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
     /// Returns the schema of this `RecordBatchStream`.
@@ -84,6 +86,24 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     async fn execute(&self, partition: usize) -> 
Result<SendableRecordBatchStream>;
 }
 
+/// Execute the [ExecutionPlan] and collect the results in memory
+pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> 
{
+    match plan.output_partitioning().partition_count() {
+        0 => Ok(vec![]),
+        1 => {
+            let it = plan.execute(0).await?;
+            common::collect(it).await
+        }
+        _ => {
+            // merge into a single partition
+            let plan = MergeExec::new(plan.clone());
+            // MergeExec must produce a single partition
+            assert_eq!(1, plan.output_partitioning().partition_count());
+            common::collect(plan.execute(0).await?).await
+        }
+    }
+}
+
 /// Partitioning schemes supported by operators.
 #[derive(Debug, Clone)]
 pub enum Partitioning {
diff --git a/rust/datafusion/src/physical_plan/sort.rs 
b/rust/datafusion/src/physical_plan/sort.rs
index 267fb7d..cb364c5 100644
--- a/rust/datafusion/src/physical_plan/sort.rs
+++ b/rust/datafusion/src/physical_plan/sort.rs
@@ -249,10 +249,13 @@ impl RecordBatchStream for SortStream {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
     use crate::physical_plan::expressions::col;
     use crate::physical_plan::memory::MemoryExec;
     use crate::physical_plan::merge::MergeExec;
+    use crate::physical_plan::{
+        collect,
+        csv::{CsvExec, CsvReadOptions},
+    };
     use crate::test;
     use arrow::array::*;
     use arrow::datatypes::*;
@@ -287,7 +290,7 @@ mod tests {
             2,
         )?);
 
-        let result: Vec<RecordBatch> = test::execute(sort_exec).await?;
+        let result: Vec<RecordBatch> = collect(sort_exec).await?;
         assert_eq!(result.len(), 1);
 
         let columns = result[0].columns();
@@ -365,7 +368,7 @@ mod tests {
         assert_eq!(DataType::Float32, 
*sort_exec.schema().field(0).data_type());
         assert_eq!(DataType::Float64, 
*sort_exec.schema().field(1).data_type());
 
-        let result: Vec<RecordBatch> = test::execute(sort_exec).await?;
+        let result: Vec<RecordBatch> = collect(sort_exec).await?;
         assert_eq!(result.len(), 1);
 
         let columns = result[0].columns();
diff --git a/rust/datafusion/src/test/mod.rs b/rust/datafusion/src/test/mod.rs
index 42bdf37..fb7dbdb 100644
--- a/rust/datafusion/src/test/mod.rs
+++ b/rust/datafusion/src/test/mod.rs
@@ -19,9 +19,7 @@
 
 use crate::datasource::{MemTable, TableProvider};
 use crate::error::Result;
-use crate::execution::context::ExecutionContext;
 use crate::logical_plan::{LogicalPlan, LogicalPlanBuilder};
-use crate::physical_plan::ExecutionPlan;
 use arrow::array::{self, Int32Array};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
@@ -54,12 +52,6 @@ pub fn arrow_testdata_path() -> String {
     env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined")
 }
 
-/// Execute a physical plan and collect the results
-pub async fn execute(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> 
{
-    let ctx = ExecutionContext::new();
-    ctx.collect(plan).await
-}
-
 /// Generated partitioned copy of a CSV file
 pub fn create_partitioned_csv(filename: &str, partitions: usize) -> 
Result<String> {
     let testdata = arrow_testdata_path();
diff --git a/rust/datafusion/tests/dataframe.rs 
b/rust/datafusion/tests/dataframe.rs
index 472de2d..af6aeda 100644
--- a/rust/datafusion/tests/dataframe.rs
+++ b/rust/datafusion/tests/dataframe.rs
@@ -21,8 +21,8 @@ use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 
 use datafusion::datasource::datasource::Statistics;
-use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result};
+use datafusion::{datasource::TableProvider, physical_plan::collect};
 
 use datafusion::execution::context::ExecutionContext;
 use datafusion::logical_plan::{col, LogicalPlan, LogicalPlanBuilder};
@@ -186,7 +186,7 @@ async fn custom_source_dataframe() -> Result<()> {
     assert_eq!(1, physical_plan.schema().fields().len());
     assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
 
-    let batches = ctx.collect(physical_plan).await?;
+    let batches = collect(physical_plan).await?;
     let origin_rec_batch = TEST_CUSTOM_RECORD_BATCH!()?;
     assert_eq!(1, batches.len());
     assert_eq!(1, batches[0].num_columns());
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 1ba8c04..84c90a5 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -29,11 +29,14 @@ use arrow::{
     util::display::array_value_to_string,
 };
 
-use datafusion::datasource::{csv::CsvReadOptions, MemTable};
 use datafusion::error::Result;
 use datafusion::execution::context::ExecutionContext;
 use datafusion::logical_plan::{LogicalPlan, ToDFSchema};
 use datafusion::prelude::create_udf;
+use datafusion::{
+    datasource::{csv::CsvReadOptions, MemTable},
+    physical_plan::collect,
+};
 
 #[tokio::test]
 async fn nyc() -> Result<()> {
@@ -122,7 +125,7 @@ async fn parquet_single_nan_schema() {
     let plan = ctx.create_logical_plan(&sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
     let plan = ctx.create_physical_plan(&plan).unwrap();
-    let results = ctx.collect(plan).await.unwrap();
+    let results = collect(plan).await.unwrap();
     for batch in results {
         assert_eq!(1, batch.num_rows());
         assert_eq!(1, batch.num_columns());
@@ -156,7 +159,7 @@ async fn parquet_list_columns() {
     let plan = ctx.create_logical_plan(&sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
     let plan = ctx.create_physical_plan(&plan).unwrap();
-    let results = ctx.collect(plan).await.unwrap();
+    let results = collect(plan).await.unwrap();
 
     //   int64_list              utf8_list
     // 0  [1, 2, 3]        [abc, efg, hij]
@@ -535,7 +538,7 @@ async fn csv_query_avg_multi_batch() -> Result<()> {
     let plan = ctx.create_logical_plan(&sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
     let plan = ctx.create_physical_plan(&plan).unwrap();
-    let results = ctx.collect(plan).await.unwrap();
+    let results = collect(plan).await.unwrap();
     let batch = &results[0];
     let column = batch.column(0);
     let array = column.as_any().downcast_ref::<Float64Array>().unwrap();
@@ -1347,7 +1350,7 @@ async fn execute(ctx: &mut ExecutionContext, sql: &str) 
-> Vec<Vec<String>> {
     let physical_schema = plan.schema();
 
     let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
-    let results = ctx.collect(plan).await.expect(&msg);
+    let results = collect(plan).await.expect(&msg);
 
     assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref());
     assert_eq!(

Reply via email to