This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d4e32d1 Add Ballista support to DataFusion CLI (#889)
d4e32d1 is described below
commit d4e32d10e47ba0b1bbb6eddddced662ec4d335af
Author: Andy Grove <[email protected]>
AuthorDate: Mon Aug 16 07:24:08 2021 -0600
Add Ballista support to DataFusion CLI (#889)
---
ballista/rust/client/src/context.rs | 104 +++++++++++++++++++++++++-----------
ballista/rust/core/src/utils.rs | 23 +++++---
datafusion-cli/Cargo.toml | 5 +-
datafusion-cli/README.md | 74 +++++++++++++++++++++++++
datafusion-cli/src/lib.rs | 5 ++
datafusion-cli/src/main.rs | 88 +++++++++++++++++++++++-------
6 files changed, 240 insertions(+), 59 deletions(-)
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 162cd68..ee2f656 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -23,13 +23,17 @@ use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use ballista_core::config::BallistaConfig;
-use ballista_core::{datasource::DfTableAdapter,
utils::create_datafusion_context};
+use ballista_core::{
+ datasource::DfTableAdapter,
utils::create_df_ctx_with_ballista_query_planner,
+};
use datafusion::catalog::TableReference;
use datafusion::dataframe::DataFrame;
-use datafusion::error::Result;
+use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::dataframe_impl::DataFrameImpl;
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::csv::CsvReadOptions;
+use datafusion::sql::parser::FileType;
struct BallistaContextState {
/// Ballista configuration
@@ -129,12 +133,14 @@ impl BallistaContext {
let path = fs::canonicalize(&path)?;
// use local DataFusion context for now but later this might call the
scheduler
- let guard = self.state.lock().unwrap();
- let mut ctx = create_datafusion_context(
- &guard.scheduler_host,
- guard.scheduler_port,
- guard.config(),
- );
+ let mut ctx = {
+ let guard = self.state.lock().unwrap();
+ create_df_ctx_with_ballista_query_planner(
+ &guard.scheduler_host,
+ guard.scheduler_port,
+ guard.config(),
+ )
+ };
let df = ctx.read_parquet(path.to_str().unwrap())?;
Ok(df)
}
@@ -151,12 +157,14 @@ impl BallistaContext {
let path = fs::canonicalize(&path)?;
// use local DataFusion context for now but later this might call the
scheduler
- let guard = self.state.lock().unwrap();
- let mut ctx = create_datafusion_context(
- &guard.scheduler_host,
- guard.scheduler_port,
- guard.config(),
- );
+ let mut ctx = {
+ let guard = self.state.lock().unwrap();
+ create_df_ctx_with_ballista_query_planner(
+ &guard.scheduler_host,
+ guard.scheduler_port,
+ guard.config(),
+ )
+ };
let df = ctx.read_csv(path.to_str().unwrap(), options)?;
Ok(df)
}
@@ -187,23 +195,59 @@ impl BallistaContext {
/// Create a DataFrame from a SQL statement
pub fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
- // use local DataFusion context for now but later this might call the
scheduler
- // register tables
- let state = self.state.lock().unwrap();
- let mut ctx = create_datafusion_context(
- &state.scheduler_host,
- state.scheduler_port,
- state.config(),
- );
- for (name, plan) in &state.tables {
- let plan = ctx.optimize(plan)?;
- let execution_plan = ctx.create_physical_plan(&plan)?;
- ctx.register_table(
- TableReference::Bare { table: name },
- Arc::new(DfTableAdapter::new(plan, execution_plan)),
- )?;
+ let mut ctx = {
+ let state = self.state.lock().unwrap();
+ create_df_ctx_with_ballista_query_planner(
+ &state.scheduler_host,
+ state.scheduler_port,
+ state.config(),
+ )
+ };
+
+ // register tables with DataFusion context
+ {
+ let state = self.state.lock().unwrap();
+ for (name, plan) in &state.tables {
+ let plan = ctx.optimize(plan)?;
+ let execution_plan = ctx.create_physical_plan(&plan)?;
+ ctx.register_table(
+ TableReference::Bare { table: name },
+ Arc::new(DfTableAdapter::new(plan, execution_plan)),
+ )?;
+ }
+ }
+
+ let plan = ctx.create_logical_plan(sql)?;
+ match plan {
+ LogicalPlan::CreateExternalTable {
+ ref schema,
+ ref name,
+ ref location,
+ ref file_type,
+ ref has_header,
+ } => match file_type {
+ FileType::CSV => {
+ self.register_csv(
+ name,
+ location,
+ CsvReadOptions::new()
+ .schema(&schema.as_ref().to_owned().into())
+ .has_header(*has_header),
+ )?;
+ Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
+ }
+ FileType::Parquet => {
+ self.register_parquet(name, location)?;
+ Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
+ }
+ _ => Err(DataFusionError::NotImplemented(format!(
+ "Unsupported file type {:?}.",
+ file_type
+ ))),
+ },
+
+ _ => ctx.sql(sql),
}
- ctx.sql(sql)
}
}
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 4187faa..d753f70 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -30,6 +30,7 @@ use crate::memory_stream::MemoryStream;
use crate::serde::scheduler::PartitionStats;
use crate::config::BallistaConfig;
+use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::{
array::{
@@ -51,6 +52,7 @@ use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::csv::CsvExec;
+use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
@@ -236,8 +238,9 @@ fn build_exec_plan_diagram(
Ok(node_id)
}
-/// Create a DataFusion context that is compatible with Ballista
-pub fn create_datafusion_context(
+/// Create a DataFusion context that uses the BallistaQueryPlanner to send
logical plans
+/// to a Ballista scheduler
+pub fn create_df_ctx_with_ballista_query_planner(
scheduler_host: &str,
scheduler_port: u16,
config: &BallistaConfig,
@@ -272,11 +275,17 @@ impl QueryPlanner for BallistaQueryPlanner {
logical_plan: &LogicalPlan,
_ctx_state: &ExecutionContextState,
) -> std::result::Result<Arc<dyn ExecutionPlan>, DataFusionError> {
- Ok(Arc::new(DistributedQueryExec::new(
- self.scheduler_url.clone(),
- self.config.clone(),
- logical_plan.clone(),
- )))
+ match logical_plan {
+ LogicalPlan::CreateExternalTable { .. } => {
+ // table state is managed locally in the BallistaContext, not
in the scheduler
+ Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))))
+ }
+ _ => Ok(Arc::new(DistributedQueryExec::new(
+ self.scheduler_url.clone(),
+ self.config.clone(),
+ logical_plan.clone(),
+ ))),
+ }
}
}
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index d2f3045..9d40276 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -20,7 +20,7 @@ name = "datafusion-cli"
version = "4.0.0-SNAPSHOT"
authors = ["Apache Arrow <[email protected]>"]
edition = "2018"
-keywords = [ "arrow", "query", "sql", "cli", "repl" ]
+keywords = [ "arrow", "datafusion", "ballista", "query", "sql", "cli", "repl" ]
license = "Apache-2.0"
homepage = "https://github.com/apache/arrow-datafusion"
repository = "https://github.com/apache/arrow-datafusion"
@@ -31,4 +31,5 @@ clap = "2.33"
rustyline = "8.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread",
"sync"] }
datafusion = { path = "../datafusion", version = "5.1.0" }
-arrow = { version = "5.0" }
+ballista = { path = "../ballista/rust/client", version = "0.6.0" }
+arrow = { version = "5.0" }
\ No newline at end of file
diff --git a/datafusion-cli/README.md b/datafusion-cli/README.md
new file mode 100644
index 0000000..6a4707e
--- /dev/null
+++ b/datafusion-cli/README.md
@@ -0,0 +1,74 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# DataFusion Command-line Interface
+
+The DataFusion CLI allows SQL queries to be executed by an in-process DataFusion context, or by a distributed
+Ballista context.
+
+```ignore
+USAGE:
+ datafusion-cli [FLAGS] [OPTIONS]
+
+FLAGS:
+ -h, --help Prints help information
+ -q, --quiet Reduce printing other than the results and work quietly
+ -V, --version Prints version information
+
+OPTIONS:
+ -c, --batch-size <batch-size> The batch size of each query, or use DataFusion default
+ -p, --data-path <data-path> Path to your data, default to current directory
+ -f, --file <file>... Execute commands from file(s), then exit
+ --format <format> Output format [default: table] [possible values: csv, tsv, table, json, ndjson]
+ --host <host> Ballista scheduler host
+ --port <port> Ballista scheduler port
+```
+
+## Example
+
+Create a CSV file to query.
+
+```bash,ignore
+$ echo "1,2" > data.csv
+```
+
+```sql,ignore
+$ datafusion-cli
+
+DataFusion CLI v4.0.0-SNAPSHOT
+
+> CREATE EXTERNAL TABLE foo (a INT, b INT) STORED AS CSV LOCATION 'data.csv';
+0 rows in set. Query took 0.001 seconds.
+
+> SELECT * FROM foo;
++---+---+
+| a | b |
++---+---+
+| 1 | 2 |
++---+---+
+1 row in set. Query took 0.017 seconds.
+```
+
+## Ballista
+
+The DataFusion CLI can connect to a Ballista scheduler for query execution.
+
+```bash,ignore
+datafusion-cli --host localhost --port 50050
+```
\ No newline at end of file
diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/lib.rs
index 5b110d3..74b91ac 100644
--- a/datafusion-cli/src/lib.rs
+++ b/datafusion-cli/src/lib.rs
@@ -14,6 +14,11 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+
+#![doc = include_str!("../README.md")]
+#![allow(unused_imports)]
+pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");
+
pub mod print_format;
use datafusion::arrow::record_batch::RecordBatch;
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 7742051..4a45888 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -17,14 +17,6 @@
#![allow(bare_trait_objects)]
-use clap::{crate_version, App, Arg};
-use datafusion::error::Result;
-use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
-use datafusion_cli::{
- print_format::{all_print_formats, PrintFormat},
- PrintOptions,
-};
-use rustyline::Editor;
use std::env;
use std::fs::File;
use std::io::prelude::*;
@@ -32,8 +24,27 @@ use std::io::BufReader;
use std::path::Path;
use std::time::Instant;
+use ballista::context::BallistaContext;
+use ballista::prelude::BallistaConfig;
+use clap::{crate_version, App, Arg};
+use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
+use datafusion_cli::{
+ print_format::{all_print_formats, PrintFormat},
+ PrintOptions, DATAFUSION_CLI_VERSION,
+};
+use rustyline::Editor;
+
+/// The CLI supports using a local DataFusion context or a distributed
BallistaContext
+enum Context {
+ /// In-process execution with DataFusion
+ Local(ExecutionContext),
+ /// Distributed execution with Ballista
+ Remote(BallistaContext),
+}
+
#[tokio::main]
-pub async fn main() {
+pub async fn main() -> Result<()> {
let matches = App::new("DataFusion")
.version(crate_version!())
.about(
@@ -83,6 +94,18 @@ pub async fn main() {
.takes_value(true),
)
.arg(
+ Arg::with_name("host")
+ .help("Ballista scheduler host")
+ .long("host")
+ .takes_value(true),
+ )
+ .arg(
+ Arg::with_name("port")
+ .help("Ballista scheduler port")
+ .long("port")
+ .takes_value(true),
+ )
+ .arg(
Arg::with_name("quiet")
.help("Reduce printing other than the results and work
quietly")
.short("q")
@@ -91,6 +114,17 @@ pub async fn main() {
)
.get_matches();
+ let quiet = matches.is_present("quiet");
+
+ if !quiet {
+ println!("DataFusion CLI v{}\n", DATAFUSION_CLI_VERSION);
+ }
+
+ let host = matches.value_of("host");
+ let port = matches
+ .value_of("port")
+ .and_then(|port| port.parse::<u16>().ok());
+
if let Some(path) = matches.value_of("data-path") {
let p = Path::new(path);
env::set_current_dir(&p).unwrap();
@@ -105,31 +139,43 @@ pub async fn main() {
execution_config = execution_config.with_batch_size(batch_size);
};
+ let ctx: Result<Context> = match (host, port) {
+ (Some(h), Some(p)) => {
+ let config: BallistaConfig = BallistaConfig::new()
+ .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+ Ok(Context::Remote(BallistaContext::remote(h, p, &config)))
+ }
+ _ => Ok(Context::Local(ExecutionContext::with_config(
+ execution_config.clone(),
+ ))),
+ };
+ let mut ctx = ctx?;
+
let format = matches
.value_of("format")
.expect("No format is specified")
.parse::<PrintFormat>()
.expect("Invalid format");
- let quiet = matches.is_present("quiet");
let print_options = PrintOptions { format, quiet };
if let Some(file_paths) = matches.values_of("file") {
let files = file_paths
.map(|file_path| File::open(file_path).unwrap())
.collect::<Vec<_>>();
- let mut ctx = ExecutionContext::with_config(execution_config);
for file in files {
let mut reader = BufReader::new(file);
exec_from_lines(&mut ctx, &mut reader,
print_options.clone()).await;
}
} else {
- exec_from_repl(execution_config, print_options).await;
+ exec_from_repl(&mut ctx, print_options).await;
}
+
+ Ok(())
}
async fn exec_from_lines(
- ctx: &mut ExecutionContext,
+ ctx: &mut Context,
reader: &mut BufReader<File>,
print_options: PrintOptions,
) {
@@ -168,9 +214,7 @@ async fn exec_from_lines(
}
}
-async fn exec_from_repl(execution_config: ExecutionConfig, print_options:
PrintOptions) {
- let mut ctx = ExecutionContext::with_config(execution_config);
-
+async fn exec_from_repl(ctx: &mut Context, print_options: PrintOptions) {
let mut rl = Editor::<()>::new();
rl.load_history(".history").ok();
@@ -186,7 +230,7 @@ async fn exec_from_repl(execution_config: ExecutionConfig,
print_options: PrintO
Ok(ref line) if line.trim_end().ends_with(';') => {
query.push_str(line.trim_end());
rl.add_history_entry(query.clone());
- match exec_and_print(&mut ctx, print_options.clone(),
query).await {
+ match exec_and_print(ctx, print_options.clone(), query).await {
Ok(_) => {}
Err(err) => println!("{:?}", err),
}
@@ -234,15 +278,19 @@ fn is_exit_command(line: &str) -> bool {
}
async fn exec_and_print(
- ctx: &mut ExecutionContext,
+ ctx: &mut Context,
print_options: PrintOptions,
sql: String,
) -> Result<()> {
let now = Instant::now();
- let df = ctx.sql(&sql)?;
- let results = df.collect().await?;
+ let df = match ctx {
+ Context::Local(datafusion) => datafusion.sql(&sql)?,
+ Context::Remote(ballista) => ballista.sql(&sql)?,
+ };
+ let results = df.collect().await?;
print_options.print_batches(&results, now)?;
+
Ok(())
}