This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e8fcd4  ARROW-9762: [Rust] [DataFusion] ExecutionContext::sql now returns DataFrame
2e8fcd4 is described below

commit 2e8fcd418229c8dcd86cd60952d8fef692ddc742
Author: Andy Grove <[email protected]>
AuthorDate: Sat Aug 22 13:25:48 2020 -0600

    ARROW-9762: [Rust] [DataFusion] ExecutionContext::sql now returns DataFrame
    
    I need this change so that I can have Ballista use the DataFusion DataFrame trait and start testing the extension points for the physical planner.
    
    Closes #8027 from andygrove/ARROW-9762
    
    Authored-by: Andy Grove <[email protected]>
    Signed-off-by: Andy Grove <[email protected]>
---
 rust/datafusion/benches/aggregate_query_sql.rs  |  3 ++-
 rust/datafusion/examples/csv_sql.rs             |  3 ++-
 rust/datafusion/examples/parquet_sql.rs         |  3 ++-
 rust/datafusion/src/bin/repl.rs                 |  3 ++-
 rust/datafusion/src/execution/context.rs        | 24 ++++++++----------------
 rust/datafusion/src/execution/dataframe_impl.rs |  8 ++++++--
 6 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/rust/datafusion/benches/aggregate_query_sql.rs b/rust/datafusion/benches/aggregate_query_sql.rs
index b42e7fc..d4a82c8 100644
--- a/rust/datafusion/benches/aggregate_query_sql.rs
+++ b/rust/datafusion/benches/aggregate_query_sql.rs
@@ -32,7 +32,8 @@ use datafusion::execution::context::ExecutionContext;
 
 fn aggregate_query(ctx: &mut ExecutionContext, sql: &str) {
     // execute the query
-    let results = ctx.sql(&sql).unwrap();
+    let df = ctx.sql(&sql).unwrap();
+    let results = df.collect().unwrap();
 
     // display the relation
     for _batch in results {}
diff --git a/rust/datafusion/examples/csv_sql.rs b/rust/datafusion/examples/csv_sql.rs
index 771d99b..97085f8 100644
--- a/rust/datafusion/examples/csv_sql.rs
+++ b/rust/datafusion/examples/csv_sql.rs
@@ -36,12 +36,13 @@ fn main() -> Result<()> {
     )?;
 
     // execute the query
-    let results = ctx.sql(
+    let df = ctx.sql(
         "SELECT c1, MIN(c12), MAX(c12) \
         FROM aggregate_test_100 \
         WHERE c11 > 0.1 AND c11 < 0.9 \
         GROUP BY c1",
     )?;
+    let results = df.collect()?;
 
     // print the results
     pretty::print_batches(&results)?;
diff --git a/rust/datafusion/examples/parquet_sql.rs b/rust/datafusion/examples/parquet_sql.rs
index 6359023..cc9ab968 100644
--- a/rust/datafusion/examples/parquet_sql.rs
+++ b/rust/datafusion/examples/parquet_sql.rs
@@ -36,11 +36,12 @@ fn main() -> Result<()> {
     )?;
 
     // execute the query
-    let results = ctx.sql(
+    let df = ctx.sql(
         "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \
         FROM alltypes_plain \
         WHERE id > 1 AND tinyint_col < double_col",
     )?;
+    let results = df.collect()?;
 
     // print the results
     pretty::print_batches(&results)?;
diff --git a/rust/datafusion/src/bin/repl.rs b/rust/datafusion/src/bin/repl.rs
index 74d4320..d93401b 100644
--- a/rust/datafusion/src/bin/repl.rs
+++ b/rust/datafusion/src/bin/repl.rs
@@ -103,7 +103,8 @@ fn is_exit_command(line: &str) -> bool {
 fn exec_and_print(ctx: &mut ExecutionContext, sql: String) -> Result<()> {
     let now = Instant::now();
 
-    let results = ctx.sql(&sql)?;
+    let df = ctx.sql(&sql)?;
+    let results = df.collect()?;
 
     if results.is_empty() {
         println!(
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 6a0b150..ca87c2e 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -133,15 +133,8 @@ impl ExecutionContext {
 
     /// Execute a SQL query and produce a Relation (a schema-aware iterator over a series
     /// of RecordBatch instances)
-    pub fn sql(&mut self, sql: &str) -> Result<Vec<RecordBatch>> {
+    pub fn sql(&mut self, sql: &str) -> Result<Arc<dyn DataFrame>> {
         let plan = self.create_logical_plan(sql)?;
-        return self.collect_plan(&plan);
-    }
-
-    /// Executes a logical plan and produce a Relation (a schema-aware iterator over a series
-    /// of RecordBatch instances). This function is intended for internal use and should not be
-    /// called directly.
-    pub fn collect_plan(&mut self, plan: &LogicalPlan) -> Result<Vec<RecordBatch>> {
         match plan {
             LogicalPlan::CreateExternalTable {
                 ref schema,
@@ -158,11 +151,13 @@ impl ExecutionContext {
                             .schema(&schema)
                             .has_header(*has_header),
                     )?;
-                    Ok(vec![])
+                    let plan = LogicalPlanBuilder::empty().build()?;
+                    Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
                 }
                 FileType::Parquet => {
                     self.register_parquet(name, location)?;
-                    Ok(vec![])
+                    let plan = LogicalPlanBuilder::empty().build()?;
+                    Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
                 }
                 _ => Err(ExecutionError::ExecutionError(format!(
                     "Unsupported file type {:?}.",
@@ -170,11 +165,7 @@ impl ExecutionContext {
                 ))),
             },
 
-            plan => {
-                let plan = self.optimize(&plan)?;
-                let plan = self.create_physical_plan(&plan)?;
-                Ok(self.collect(plan.as_ref())?)
-            }
+            plan => Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))),
         }
     }
 
@@ -1095,7 +1086,8 @@ mod tests {
         let mut ctx = ExecutionContext::with_config(
             ExecutionConfig::new().with_physical_planner(Arc::new(MyPhysicalPlanner {})),
         );
-        ctx.sql("SELECT 1").expect_err("query not supported");
+        let df = ctx.sql("SELECT 1")?;
+        df.collect().expect_err("query not supported");
         Ok(())
     }
 
diff --git a/rust/datafusion/src/execution/dataframe_impl.rs 
b/rust/datafusion/src/execution/dataframe_impl.rs
index 4698a84..d53fb38 100644
--- a/rust/datafusion/src/execution/dataframe_impl.rs
+++ b/rust/datafusion/src/execution/dataframe_impl.rs
@@ -104,9 +104,13 @@ impl DataFrame for DataFrameImpl {
         self.plan.clone()
     }
 
+    // Convert the logical plan represented by this DataFrame into a physical plan and
+    // execute it
     fn collect(&self) -> Result<Vec<RecordBatch>> {
-        let mut ctx = ExecutionContext::from(self.ctx_state.clone());
-        ctx.collect_plan(&self.plan.clone())
+        let ctx = ExecutionContext::from(self.ctx_state.clone());
+        let plan = ctx.optimize(&self.plan)?;
+        let plan = ctx.create_physical_plan(&plan)?;
+        Ok(ctx.collect(plan.as_ref())?)
     }
 
     /// Returns the schema from the logical plan

Reply via email to