This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d4e32d1  Add Ballista support to DataFusion CLI (#889)
d4e32d1 is described below

commit d4e32d10e47ba0b1bbb6eddddced662ec4d335af
Author: Andy Grove <[email protected]>
AuthorDate: Mon Aug 16 07:24:08 2021 -0600

    Add Ballista support to DataFusion CLI (#889)
---
 ballista/rust/client/src/context.rs | 104 +++++++++++++++++++++++++-----------
 ballista/rust/core/src/utils.rs     |  23 +++++---
 datafusion-cli/Cargo.toml           |   5 +-
 datafusion-cli/README.md            |  74 +++++++++++++++++++++++++
 datafusion-cli/src/lib.rs           |   5 ++
 datafusion-cli/src/main.rs          |  88 +++++++++++++++++++++++-------
 6 files changed, 240 insertions(+), 59 deletions(-)

diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 162cd68..ee2f656 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -23,13 +23,17 @@ use std::path::PathBuf;
 use std::sync::{Arc, Mutex};
 
 use ballista_core::config::BallistaConfig;
-use ballista_core::{datasource::DfTableAdapter, utils::create_datafusion_context};
+use ballista_core::{
+    datasource::DfTableAdapter, utils::create_df_ctx_with_ballista_query_planner,
+};
 
 use datafusion::catalog::TableReference;
 use datafusion::dataframe::DataFrame;
-use datafusion::error::Result;
+use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::dataframe_impl::DataFrameImpl;
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::physical_plan::csv::CsvReadOptions;
+use datafusion::sql::parser::FileType;
 
 struct BallistaContextState {
     /// Ballista configuration
@@ -129,12 +133,14 @@ impl BallistaContext {
         let path = fs::canonicalize(&path)?;
 
         // use local DataFusion context for now but later this might call the scheduler
-        let guard = self.state.lock().unwrap();
-        let mut ctx = create_datafusion_context(
-            &guard.scheduler_host,
-            guard.scheduler_port,
-            guard.config(),
-        );
+        let mut ctx = {
+            let guard = self.state.lock().unwrap();
+            create_df_ctx_with_ballista_query_planner(
+                &guard.scheduler_host,
+                guard.scheduler_port,
+                guard.config(),
+            )
+        };
         let df = ctx.read_parquet(path.to_str().unwrap())?;
         Ok(df)
     }
@@ -151,12 +157,14 @@ impl BallistaContext {
         let path = fs::canonicalize(&path)?;
 
         // use local DataFusion context for now but later this might call the scheduler
-        let guard = self.state.lock().unwrap();
-        let mut ctx = create_datafusion_context(
-            &guard.scheduler_host,
-            guard.scheduler_port,
-            guard.config(),
-        );
+        let mut ctx = {
+            let guard = self.state.lock().unwrap();
+            create_df_ctx_with_ballista_query_planner(
+                &guard.scheduler_host,
+                guard.scheduler_port,
+                guard.config(),
+            )
+        };
         let df = ctx.read_csv(path.to_str().unwrap(), options)?;
         Ok(df)
     }
@@ -187,23 +195,59 @@ impl BallistaContext {
 
     /// Create a DataFrame from a SQL statement
     pub fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
-        // use local DataFusion context for now but later this might call the scheduler
-        // register tables
-        let state = self.state.lock().unwrap();
-        let mut ctx = create_datafusion_context(
-            &state.scheduler_host,
-            state.scheduler_port,
-            state.config(),
-        );
-        for (name, plan) in &state.tables {
-            let plan = ctx.optimize(plan)?;
-            let execution_plan = ctx.create_physical_plan(&plan)?;
-            ctx.register_table(
-                TableReference::Bare { table: name },
-                Arc::new(DfTableAdapter::new(plan, execution_plan)),
-            )?;
+        let mut ctx = {
+            let state = self.state.lock().unwrap();
+            create_df_ctx_with_ballista_query_planner(
+                &state.scheduler_host,
+                state.scheduler_port,
+                state.config(),
+            )
+        };
+
+        // register tables with DataFusion context
+        {
+            let state = self.state.lock().unwrap();
+            for (name, plan) in &state.tables {
+                let plan = ctx.optimize(plan)?;
+                let execution_plan = ctx.create_physical_plan(&plan)?;
+                ctx.register_table(
+                    TableReference::Bare { table: name },
+                    Arc::new(DfTableAdapter::new(plan, execution_plan)),
+                )?;
+            }
+        }
+
+        let plan = ctx.create_logical_plan(sql)?;
+        match plan {
+            LogicalPlan::CreateExternalTable {
+                ref schema,
+                ref name,
+                ref location,
+                ref file_type,
+                ref has_header,
+            } => match file_type {
+                FileType::CSV => {
+                    self.register_csv(
+                        name,
+                        location,
+                        CsvReadOptions::new()
+                            .schema(&schema.as_ref().to_owned().into())
+                            .has_header(*has_header),
+                    )?;
+                    Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
+                }
+                FileType::Parquet => {
+                    self.register_parquet(name, location)?;
+                    Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
+                }
+                _ => Err(DataFusionError::NotImplemented(format!(
+                    "Unsupported file type {:?}.",
+                    file_type
+                ))),
+            },
+
+            _ => ctx.sql(sql),
         }
-        ctx.sql(sql)
     }
 }
 
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 4187faa..d753f70 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -30,6 +30,7 @@ use crate::memory_stream::MemoryStream;
 use crate::serde::scheduler::PartitionStats;
 
 use crate::config::BallistaConfig;
+use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::error::Result as ArrowResult;
 use datafusion::arrow::{
     array::{
@@ -51,6 +52,7 @@ use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::csv::CsvExec;
+use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
@@ -236,8 +238,9 @@ fn build_exec_plan_diagram(
     Ok(node_id)
 }
 
-/// Create a DataFusion context that is compatible with Ballista
-pub fn create_datafusion_context(
+/// Create a DataFusion context that uses the BallistaQueryPlanner to send logical plans
+/// to a Ballista scheduler
+pub fn create_df_ctx_with_ballista_query_planner(
     scheduler_host: &str,
     scheduler_port: u16,
     config: &BallistaConfig,
@@ -272,11 +275,17 @@ impl QueryPlanner for BallistaQueryPlanner {
         logical_plan: &LogicalPlan,
         _ctx_state: &ExecutionContextState,
     ) -> std::result::Result<Arc<dyn ExecutionPlan>, DataFusionError> {
-        Ok(Arc::new(DistributedQueryExec::new(
-            self.scheduler_url.clone(),
-            self.config.clone(),
-            logical_plan.clone(),
-        )))
+        match logical_plan {
+            LogicalPlan::CreateExternalTable { .. } => {
+                // table state is managed locally in the BallistaContext, not in the scheduler
+                Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))))
+            }
+            _ => Ok(Arc::new(DistributedQueryExec::new(
+                self.scheduler_url.clone(),
+                self.config.clone(),
+                logical_plan.clone(),
+            ))),
+        }
     }
 }
 
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index d2f3045..9d40276 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -20,7 +20,7 @@ name = "datafusion-cli"
 version = "4.0.0-SNAPSHOT"
 authors = ["Apache Arrow <[email protected]>"]
 edition = "2018"
-keywords = [ "arrow", "query", "sql", "cli", "repl" ]
+keywords = [ "arrow", "datafusion", "ballista", "query", "sql", "cli", "repl" ]
 license = "Apache-2.0"
 homepage = "https://github.com/apache/arrow-datafusion";
 repository = "https://github.com/apache/arrow-datafusion";
@@ -31,4 +31,5 @@ clap = "2.33"
 rustyline = "8.0"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
 datafusion = { path = "../datafusion", version = "5.1.0" }
-arrow = { version = "5.0"  }
+ballista = { path = "../ballista/rust/client", version = "0.6.0" }
+arrow = { version = "5.0"  }
\ No newline at end of file
diff --git a/datafusion-cli/README.md b/datafusion-cli/README.md
new file mode 100644
index 0000000..6a4707e
--- /dev/null
+++ b/datafusion-cli/README.md
@@ -0,0 +1,74 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# DataFusion Command-line Interface
+
+The DataFusion CLI allows SQL queries to be executed by an in-process DataFusion context, or by a distributed
+Ballista context.
+
+```ignore
+USAGE:
+    datafusion-cli [FLAGS] [OPTIONS]
+
+FLAGS:
+    -h, --help       Prints help information
+    -q, --quiet      Reduce printing other than the results and work quietly
+    -V, --version    Prints version information
+
+OPTIONS:
+    -c, --batch-size <batch-size>    The batch size of each query, or use DataFusion default
+    -p, --data-path <data-path>      Path to your data, default to current directory
+    -f, --file <file>...             Execute commands from file(s), then exit
+        --format <format>            Output format [default: table]  [possible values: csv, tsv, table, json, ndjson]
+        --host <host>                Ballista scheduler host
+        --port <port>                Ballista scheduler port
+```
+
+## Example
+
+Create a CSV file to query.
+
+```bash,ignore
+$ echo "1,2" > data.csv
+```
+
+```sql,ignore
+$ datafusion-cli
+
+DataFusion CLI v4.0.0-SNAPSHOT
+
+> CREATE EXTERNAL TABLE foo (a INT, b INT) STORED AS CSV LOCATION 'data.csv';
+0 rows in set. Query took 0.001 seconds.
+
+> SELECT * FROM foo;
++---+---+
+| a | b |
++---+---+
+| 1 | 2 |
++---+---+
+1 row in set. Query took 0.017 seconds.
+```
+
+## Ballista
+
+The DataFusion CLI can connect to a Ballista scheduler for query execution.
+
+```bash,ignore
+datafusion-cli --host localhost --port 50050
+```
\ No newline at end of file
diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/lib.rs
index 5b110d3..74b91ac 100644
--- a/datafusion-cli/src/lib.rs
+++ b/datafusion-cli/src/lib.rs
@@ -14,6 +14,11 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
+#![doc = include_str!("../README.md")]
+#![allow(unused_imports)]
+pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");
+
 pub mod print_format;
 
 use datafusion::arrow::record_batch::RecordBatch;
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 7742051..4a45888 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -17,14 +17,6 @@
 
 #![allow(bare_trait_objects)]
 
-use clap::{crate_version, App, Arg};
-use datafusion::error::Result;
-use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
-use datafusion_cli::{
-    print_format::{all_print_formats, PrintFormat},
-    PrintOptions,
-};
-use rustyline::Editor;
 use std::env;
 use std::fs::File;
 use std::io::prelude::*;
@@ -32,8 +24,27 @@ use std::io::BufReader;
 use std::path::Path;
 use std::time::Instant;
 
+use ballista::context::BallistaContext;
+use ballista::prelude::BallistaConfig;
+use clap::{crate_version, App, Arg};
+use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
+use datafusion_cli::{
+    print_format::{all_print_formats, PrintFormat},
+    PrintOptions, DATAFUSION_CLI_VERSION,
+};
+use rustyline::Editor;
+
+/// The CLI supports using a local DataFusion context or a distributed BallistaContext
+enum Context {
+    /// In-process execution with DataFusion
+    Local(ExecutionContext),
+    /// Distributed execution with Ballista
+    Remote(BallistaContext),
+}
+
 #[tokio::main]
-pub async fn main() {
+pub async fn main() -> Result<()> {
     let matches = App::new("DataFusion")
         .version(crate_version!())
         .about(
@@ -83,6 +94,18 @@ pub async fn main() {
                 .takes_value(true),
         )
         .arg(
+            Arg::with_name("host")
+                .help("Ballista scheduler host")
+                .long("host")
+                .takes_value(true),
+        )
+        .arg(
+            Arg::with_name("port")
+                .help("Ballista scheduler port")
+                .long("port")
+                .takes_value(true),
+        )
+        .arg(
             Arg::with_name("quiet")
                 .help("Reduce printing other than the results and work quietly")
                 .short("q")
@@ -91,6 +114,17 @@ pub async fn main() {
         )
         .get_matches();
 
+    let quiet = matches.is_present("quiet");
+
+    if !quiet {
+        println!("DataFusion CLI v{}\n", DATAFUSION_CLI_VERSION);
+    }
+
+    let host = matches.value_of("host");
+    let port = matches
+        .value_of("port")
+        .and_then(|port| port.parse::<u16>().ok());
+
     if let Some(path) = matches.value_of("data-path") {
         let p = Path::new(path);
         env::set_current_dir(&p).unwrap();
@@ -105,31 +139,43 @@ pub async fn main() {
         execution_config = execution_config.with_batch_size(batch_size);
     };
 
+    let ctx: Result<Context> = match (host, port) {
+        (Some(h), Some(p)) => {
+            let config: BallistaConfig = BallistaConfig::new()
+                .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+            Ok(Context::Remote(BallistaContext::remote(h, p, &config)))
+        }
+        _ => Ok(Context::Local(ExecutionContext::with_config(
+            execution_config.clone(),
+        ))),
+    };
+    let mut ctx = ctx?;
+
     let format = matches
         .value_of("format")
         .expect("No format is specified")
         .parse::<PrintFormat>()
         .expect("Invalid format");
 
-    let quiet = matches.is_present("quiet");
     let print_options = PrintOptions { format, quiet };
 
     if let Some(file_paths) = matches.values_of("file") {
         let files = file_paths
             .map(|file_path| File::open(file_path).unwrap())
             .collect::<Vec<_>>();
-        let mut ctx = ExecutionContext::with_config(execution_config);
         for file in files {
             let mut reader = BufReader::new(file);
             exec_from_lines(&mut ctx, &mut reader, print_options.clone()).await;
         }
     } else {
-        exec_from_repl(execution_config, print_options).await;
+        exec_from_repl(&mut ctx, print_options).await;
     }
+
+    Ok(())
 }
 
 async fn exec_from_lines(
-    ctx: &mut ExecutionContext,
+    ctx: &mut Context,
     reader: &mut BufReader<File>,
     print_options: PrintOptions,
 ) {
@@ -168,9 +214,7 @@ async fn exec_from_lines(
     }
 }
 
-async fn exec_from_repl(execution_config: ExecutionConfig, print_options: PrintOptions) {
-    let mut ctx = ExecutionContext::with_config(execution_config);
-
+async fn exec_from_repl(ctx: &mut Context, print_options: PrintOptions) {
     let mut rl = Editor::<()>::new();
     rl.load_history(".history").ok();
 
@@ -186,7 +230,7 @@ async fn exec_from_repl(execution_config: ExecutionConfig, print_options: PrintO
             Ok(ref line) if line.trim_end().ends_with(';') => {
                 query.push_str(line.trim_end());
                 rl.add_history_entry(query.clone());
-                match exec_and_print(&mut ctx, print_options.clone(), query).await {
+                match exec_and_print(ctx, print_options.clone(), query).await {
                     Ok(_) => {}
                     Err(err) => println!("{:?}", err),
                 }
@@ -234,15 +278,19 @@ fn is_exit_command(line: &str) -> bool {
 }
 
 async fn exec_and_print(
-    ctx: &mut ExecutionContext,
+    ctx: &mut Context,
     print_options: PrintOptions,
     sql: String,
 ) -> Result<()> {
     let now = Instant::now();
 
-    let df = ctx.sql(&sql)?;
-    let results = df.collect().await?;
+    let df = match ctx {
+        Context::Local(datafusion) => datafusion.sql(&sql)?,
+        Context::Remote(ballista) => ballista.sql(&sql)?,
+    };
 
+    let results = df.collect().await?;
     print_options.print_batches(&results, now)?;
+
     Ok(())
 }

Reply via email to