This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 75b8112 refactor datafusion cli to be more modular (#1215)
75b8112 is described below
commit 75b8112ee33af81d6085be4a83a096bf965dbc89
Author: Jiayu Liu <[email protected]>
AuthorDate: Tue Nov 2 15:28:23 2021 +0800
refactor datafusion cli to be more modular (#1215)
---
datafusion-cli/src/context.rs | 57 +++++++++
datafusion-cli/src/exec.rs | 125 ++++++++++++++++++++
datafusion-cli/src/lib.rs | 41 +------
datafusion-cli/src/main.rs | 148 +++---------------------
datafusion-cli/src/{lib.rs => print_options.rs} | 8 +-
5 files changed, 201 insertions(+), 178 deletions(-)
diff --git a/datafusion-cli/src/context.rs b/datafusion-cli/src/context.rs
new file mode 100644
index 0000000..6e46b9b
--- /dev/null
+++ b/datafusion-cli/src/context.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Context (remote or local)
+
+use ballista::context::BallistaContext;
+use ballista::prelude::BallistaConfig;
+use datafusion::dataframe::DataFrame;
+use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
+use std::sync::Arc;
+
+/// The CLI supports using a local DataFusion context or a distributed BallistaContext
+pub enum Context {
+ /// In-process execution with DataFusion
+ Local(ExecutionContext),
+ /// Distributed execution with Ballista
+ Remote(BallistaContext),
+}
+
+impl Context {
+ /// create a new remote context with given host and port
+ pub fn new_remote(host: &str, port: u16) -> Result<Context> {
+ let config: BallistaConfig = BallistaConfig::new()
+ .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+ Ok(Context::Remote(BallistaContext::remote(
+ host, port, &config,
+ )))
+ }
+
+ /// create a local context using the given config
+ pub fn new_local(config: &ExecutionConfig) -> Context {
+ Context::Local(ExecutionContext::with_config(config.clone()))
+ }
+
+ /// execute an SQL statement against the context
+ pub async fn sql(&mut self, sql: &str) -> Result<Arc<dyn DataFrame>> {
+ match self {
+ Context::Local(datafusion) => datafusion.sql(sql).await,
+ Context::Remote(ballista) => ballista.sql(sql).await,
+ }
+ }
+}
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
new file mode 100644
index 0000000..ed3ed98
--- /dev/null
+++ b/datafusion-cli/src/exec.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution functions
+
+use crate::{
+ context::Context,
+ print_format::{all_print_formats, PrintFormat},
+ print_options::PrintOptions,
+};
+use datafusion::error::{DataFusionError, Result};
+use rustyline::Editor;
+use std::fs::File;
+use std::io::prelude::*;
+use std::io::BufReader;
+use std::time::Instant;
+
+/// run and execute SQL statements and commands from a file, against a context with the given print options
+pub async fn exec_from_lines(
+ ctx: &mut Context,
+ reader: &mut BufReader<File>,
+ print_options: PrintOptions,
+) {
+ let mut query = "".to_owned();
+
+ for line in reader.lines() {
+ match line {
+ Ok(line) if line.starts_with("--") => {
+ continue;
+ }
+ Ok(line) => {
+ let line = line.trim_end();
+ query.push_str(line);
+ if line.ends_with(';') {
+ match exec_and_print(ctx, print_options.clone(), query).await {
+ Ok(_) => {}
+ Err(err) => println!("{:?}", err),
+ }
+ query = "".to_owned();
+ } else {
+ query.push('\n');
+ }
+ }
+ _ => {
+ break;
+ }
+ }
+ }
+
+ // run the left over query if the last statement doesn't contain ‘;’
+ if !query.is_empty() {
+ match exec_and_print(ctx, print_options, query).await {
+ Ok(_) => {}
+ Err(err) => println!("{:?}", err),
+ }
+ }
+}
+
+/// run and execute SQL statements and commands against a context with the given print options
+pub async fn exec_from_repl(ctx: &mut Context, print_options: PrintOptions) {
+ let mut rl = Editor::<()>::new();
+ rl.load_history(".history").ok();
+
+ let mut query = "".to_owned();
+ loop {
+ match rl.readline("> ") {
+ Ok(ref line) if is_exit_command(line) && query.is_empty() => {
+ break;
+ }
+ Ok(ref line) if line.starts_with("--") => {
+ continue;
+ }
+ Ok(ref line) if line.trim_end().ends_with(';') => {
+ query.push_str(line.trim_end());
+ rl.add_history_entry(query.clone());
+ match exec_and_print(ctx, print_options.clone(), query).await {
+ Ok(_) => {}
+ Err(err) => println!("{:?}", err),
+ }
+ query = "".to_owned();
+ }
+ Ok(ref line) => {
+ query.push_str(line);
+ query.push('\n');
+ }
+ Err(_) => {
+ break;
+ }
+ }
+ }
+
+ rl.save_history(".history").ok();
+}
+
+async fn exec_and_print(
+ ctx: &mut Context,
+ print_options: PrintOptions,
+ sql: String,
+) -> Result<()> {
+ let now = Instant::now();
+ let df = ctx.sql(&sql).await?;
+ let results = df.collect().await?;
+ print_options.print_batches(&results, now)?;
+
+ Ok(())
+}
+
+fn is_exit_command(line: &str) -> bool {
+ let line = line.trim_end().to_lowercase();
+ line == "quit" || line == "exit"
+}
diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/lib.rs
index 74b91ac..a116c35 100644
--- a/datafusion-cli/src/lib.rs
+++ b/datafusion-cli/src/lib.rs
@@ -19,42 +19,7 @@
#![allow(unused_imports)]
pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");
+pub mod context;
+pub mod exec;
pub mod print_format;
-
-use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::error::Result;
-use print_format::PrintFormat;
-use std::time::Instant;
-
-#[derive(Debug, Clone)]
-pub struct PrintOptions {
- pub format: PrintFormat,
- pub quiet: bool,
-}
-
-fn print_timing_info(row_count: usize, now: Instant) {
- println!(
- "{} {} in set. Query took {:.3} seconds.",
- row_count,
- if row_count == 1 { "row" } else { "rows" },
- now.elapsed().as_secs_f64()
- );
-}
-
-impl PrintOptions {
- /// print the batches to stdout using the specified format
- pub fn print_batches(&self, batches: &[RecordBatch], now: Instant) -> Result<()> {
- if batches.is_empty() {
- if !self.quiet {
- print_timing_info(0, now);
- }
- } else {
- self.format.print_batches(batches)?;
- if !self.quiet {
- let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
- print_timing_info(row_count, now);
- }
- }
- Ok(())
- }
-}
+pub mod print_options;
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 4814485..cd4caca 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -15,33 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-#![allow(bare_trait_objects)]
-
-use std::env;
-use std::fs::File;
-use std::io::prelude::*;
-use std::io::BufReader;
-use std::path::Path;
-use std::time::Instant;
-
-use ballista::context::BallistaContext;
-use ballista::prelude::BallistaConfig;
use clap::{crate_version, App, Arg};
-use datafusion::error::{DataFusionError, Result};
-use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
+use datafusion::error::Result;
+use datafusion::execution::context::ExecutionConfig;
use datafusion_cli::{
+ context::Context,
+ exec,
print_format::{all_print_formats, PrintFormat},
- PrintOptions, DATAFUSION_CLI_VERSION,
+ print_options::PrintOptions,
+ DATAFUSION_CLI_VERSION,
};
-use rustyline::Editor;
-
-/// The CLI supports using a local DataFusion context or a distributed BallistaContext
-enum Context {
- /// In-process execution with DataFusion
- Local(ExecutionContext),
- /// Distributed execution with Ballista
- Remote(BallistaContext),
-}
+use std::env;
+use std::fs::File;
+use std::io::BufReader;
+use std::path::Path;
#[tokio::main]
pub async fn main() -> Result<()> {
@@ -139,17 +126,10 @@ pub async fn main() -> Result<()> {
execution_config = execution_config.with_batch_size(batch_size);
};
- let ctx: Result<Context> = match (host, port) {
- (Some(h), Some(p)) => {
- let config: BallistaConfig = BallistaConfig::new()
- .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
- Ok(Context::Remote(BallistaContext::remote(h, p, &config)))
- }
- _ => Ok(Context::Local(ExecutionContext::with_config(
- execution_config.clone(),
- ))),
+ let mut ctx: Context = match (host, port) {
+ (Some(h), Some(p)) => Context::new_remote(h, p)?,
+ _ => Context::new_local(&execution_config),
};
- let mut ctx = ctx?;
let format = matches
.value_of("format")
@@ -165,90 +145,15 @@ pub async fn main() -> Result<()> {
.collect::<Vec<_>>();
for file in files {
let mut reader = BufReader::new(file);
- exec_from_lines(&mut ctx, &mut reader, print_options.clone()).await;
+ exec::exec_from_lines(&mut ctx, &mut reader, print_options.clone()).await;
}
} else {
- exec_from_repl(&mut ctx, print_options).await;
+ exec::exec_from_repl(&mut ctx, print_options).await;
}
Ok(())
}
-async fn exec_from_lines(
- ctx: &mut Context,
- reader: &mut BufReader<File>,
- print_options: PrintOptions,
-) {
- let mut query = "".to_owned();
-
- for line in reader.lines() {
- match line {
- Ok(line) if line.starts_with("--") => {
- continue;
- }
- Ok(line) => {
- let line = line.trim_end();
- query.push_str(line);
- if line.ends_with(';') {
- match exec_and_print(ctx, print_options.clone(), query).await {
- Ok(_) => {}
- Err(err) => println!("{:?}", err),
- }
- query = "".to_owned();
- } else {
- query.push('\n');
- }
- }
- _ => {
- break;
- }
- }
- }
-
- // run the left over query if the last statement doesn't contain ‘;’
- if !query.is_empty() {
- match exec_and_print(ctx, print_options, query).await {
- Ok(_) => {}
- Err(err) => println!("{:?}", err),
- }
- }
-}
-
-async fn exec_from_repl(ctx: &mut Context, print_options: PrintOptions) {
- let mut rl = Editor::<()>::new();
- rl.load_history(".history").ok();
-
- let mut query = "".to_owned();
- loop {
- match rl.readline("> ") {
- Ok(ref line) if is_exit_command(line) && query.is_empty() => {
- break;
- }
- Ok(ref line) if line.starts_with("--") => {
- continue;
- }
- Ok(ref line) if line.trim_end().ends_with(';') => {
- query.push_str(line.trim_end());
- rl.add_history_entry(query.clone());
- match exec_and_print(ctx, print_options.clone(), query).await {
- Ok(_) => {}
- Err(err) => println!("{:?}", err),
- }
- query = "".to_owned();
- }
- Ok(ref line) => {
- query.push_str(line);
- query.push('\n');
- }
- Err(_) => {
- break;
- }
- }
- }
-
- rl.save_history(".history").ok();
-}
-
fn is_valid_file(dir: String) -> std::result::Result<(), String> {
if Path::new(&dir).is_file() {
Ok(())
@@ -271,26 +176,3 @@ fn is_valid_batch_size(size: String) -> std::result::Result<(), String> {
_ => Err(format!("Invalid batch size '{}'", size)),
}
}
-
-fn is_exit_command(line: &str) -> bool {
- let line = line.trim_end().to_lowercase();
- line == "quit" || line == "exit"
-}
-
-async fn exec_and_print(
- ctx: &mut Context,
- print_options: PrintOptions,
- sql: String,
-) -> Result<()> {
- let now = Instant::now();
-
- let df = match ctx {
- Context::Local(datafusion) => datafusion.sql(&sql).await?,
- Context::Remote(ballista) => ballista.sql(&sql).await?,
- };
-
- let results = df.collect().await?;
- print_options.print_batches(&results, now)?;
-
- Ok(())
-}
diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/print_options.rs
similarity index 90%
copy from datafusion-cli/src/lib.rs
copy to datafusion-cli/src/print_options.rs
index 74b91ac..5e37926 100644
--- a/datafusion-cli/src/lib.rs
+++ b/datafusion-cli/src/print_options.rs
@@ -15,15 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-#![doc = include_str!("../README.md")]
-#![allow(unused_imports)]
-pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");
-
-pub mod print_format;
-
+use crate::print_format::PrintFormat;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
-use print_format::PrintFormat;
use std::time::Instant;
#[derive(Debug, Clone)]