This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 75b8112  refactor datafusion cli to be more modular (#1215)
75b8112 is described below

commit 75b8112ee33af81d6085be4a83a096bf965dbc89
Author: Jiayu Liu <[email protected]>
AuthorDate: Tue Nov 2 15:28:23 2021 +0800

    refactor datafusion cli to be more modular (#1215)
---
 datafusion-cli/src/context.rs                   |  57 +++++++++
 datafusion-cli/src/exec.rs                      | 125 ++++++++++++++++++++
 datafusion-cli/src/lib.rs                       |  41 +------
 datafusion-cli/src/main.rs                      | 148 +++---------------------
 datafusion-cli/src/{lib.rs => print_options.rs} |   8 +-
 5 files changed, 201 insertions(+), 178 deletions(-)

diff --git a/datafusion-cli/src/context.rs b/datafusion-cli/src/context.rs
new file mode 100644
index 0000000..6e46b9b
--- /dev/null
+++ b/datafusion-cli/src/context.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Context (remote or local)
+
+use ballista::context::BallistaContext;
+use ballista::prelude::BallistaConfig;
+use datafusion::dataframe::DataFrame;
+use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
+use std::sync::Arc;
+
+/// The CLI supports using a local DataFusion context or a distributed BallistaContext
+pub enum Context {
+    /// In-process execution with DataFusion
+    Local(ExecutionContext),
+    /// Distributed execution with Ballista
+    Remote(BallistaContext),
+}
+
+impl Context {
+    /// create a new remote context with given host and port
+    pub fn new_remote(host: &str, port: u16) -> Result<Context> {
+        let config: BallistaConfig = BallistaConfig::new()
+            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+        Ok(Context::Remote(BallistaContext::remote(
+            host, port, &config,
+        )))
+    }
+
+    /// create a local context using the given config
+    pub fn new_local(config: &ExecutionConfig) -> Context {
+        Context::Local(ExecutionContext::with_config(config.clone()))
+    }
+
+    /// execute an SQL statement against the context
+    pub async fn sql(&mut self, sql: &str) -> Result<Arc<dyn DataFrame>> {
+        match self {
+            Context::Local(datafusion) => datafusion.sql(sql).await,
+            Context::Remote(ballista) => ballista.sql(sql).await,
+        }
+    }
+}
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
new file mode 100644
index 0000000..ed3ed98
--- /dev/null
+++ b/datafusion-cli/src/exec.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution functions
+
+use crate::{
+    context::Context,
+    print_format::{all_print_formats, PrintFormat},
+    print_options::PrintOptions,
+};
+use datafusion::error::{DataFusionError, Result};
+use rustyline::Editor;
+use std::fs::File;
+use std::io::prelude::*;
+use std::io::BufReader;
+use std::time::Instant;
+
+/// run and execute SQL statements and commands from a file, against a context with the given print options
+pub async fn exec_from_lines(
+    ctx: &mut Context,
+    reader: &mut BufReader<File>,
+    print_options: PrintOptions,
+) {
+    let mut query = "".to_owned();
+
+    for line in reader.lines() {
+        match line {
+            Ok(line) if line.starts_with("--") => {
+                continue;
+            }
+            Ok(line) => {
+                let line = line.trim_end();
+                query.push_str(line);
+                if line.ends_with(';') {
+                    match exec_and_print(ctx, print_options.clone(), query).await {
+                        Ok(_) => {}
+                        Err(err) => println!("{:?}", err),
+                    }
+                    query = "".to_owned();
+                } else {
+                    query.push('\n');
+                }
+            }
+            _ => {
+                break;
+            }
+        }
+    }
+
+    // run the left over query if the last statement doesn't contain ‘;’
+    if !query.is_empty() {
+        match exec_and_print(ctx, print_options, query).await {
+            Ok(_) => {}
+            Err(err) => println!("{:?}", err),
+        }
+    }
+}
+
+/// run and execute SQL statements and commands against a context with the given print options
+pub async fn exec_from_repl(ctx: &mut Context, print_options: PrintOptions) {
+    let mut rl = Editor::<()>::new();
+    rl.load_history(".history").ok();
+
+    let mut query = "".to_owned();
+    loop {
+        match rl.readline("> ") {
+            Ok(ref line) if is_exit_command(line) && query.is_empty() => {
+                break;
+            }
+            Ok(ref line) if line.starts_with("--") => {
+                continue;
+            }
+            Ok(ref line) if line.trim_end().ends_with(';') => {
+                query.push_str(line.trim_end());
+                rl.add_history_entry(query.clone());
+                match exec_and_print(ctx, print_options.clone(), query).await {
+                    Ok(_) => {}
+                    Err(err) => println!("{:?}", err),
+                }
+                query = "".to_owned();
+            }
+            Ok(ref line) => {
+                query.push_str(line);
+                query.push('\n');
+            }
+            Err(_) => {
+                break;
+            }
+        }
+    }
+
+    rl.save_history(".history").ok();
+}
+
+async fn exec_and_print(
+    ctx: &mut Context,
+    print_options: PrintOptions,
+    sql: String,
+) -> Result<()> {
+    let now = Instant::now();
+    let df = ctx.sql(&sql).await?;
+    let results = df.collect().await?;
+    print_options.print_batches(&results, now)?;
+
+    Ok(())
+}
+
+fn is_exit_command(line: &str) -> bool {
+    let line = line.trim_end().to_lowercase();
+    line == "quit" || line == "exit"
+}
diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/lib.rs
index 74b91ac..a116c35 100644
--- a/datafusion-cli/src/lib.rs
+++ b/datafusion-cli/src/lib.rs
@@ -19,42 +19,7 @@
 #![allow(unused_imports)]
 pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");
 
+pub mod context;
+pub mod exec;
 pub mod print_format;
-
-use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::error::Result;
-use print_format::PrintFormat;
-use std::time::Instant;
-
-#[derive(Debug, Clone)]
-pub struct PrintOptions {
-    pub format: PrintFormat,
-    pub quiet: bool,
-}
-
-fn print_timing_info(row_count: usize, now: Instant) {
-    println!(
-        "{} {} in set. Query took {:.3} seconds.",
-        row_count,
-        if row_count == 1 { "row" } else { "rows" },
-        now.elapsed().as_secs_f64()
-    );
-}
-
-impl PrintOptions {
-    /// print the batches to stdout using the specified format
-    pub fn print_batches(&self, batches: &[RecordBatch], now: Instant) -> Result<()> {
-        if batches.is_empty() {
-            if !self.quiet {
-                print_timing_info(0, now);
-            }
-        } else {
-            self.format.print_batches(batches)?;
-            if !self.quiet {
-                let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
-                print_timing_info(row_count, now);
-            }
-        }
-        Ok(())
-    }
-}
+pub mod print_options;
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 4814485..cd4caca 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -15,33 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#![allow(bare_trait_objects)]
-
-use std::env;
-use std::fs::File;
-use std::io::prelude::*;
-use std::io::BufReader;
-use std::path::Path;
-use std::time::Instant;
-
-use ballista::context::BallistaContext;
-use ballista::prelude::BallistaConfig;
 use clap::{crate_version, App, Arg};
-use datafusion::error::{DataFusionError, Result};
-use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
+use datafusion::error::Result;
+use datafusion::execution::context::ExecutionConfig;
 use datafusion_cli::{
+    context::Context,
+    exec,
     print_format::{all_print_formats, PrintFormat},
-    PrintOptions, DATAFUSION_CLI_VERSION,
+    print_options::PrintOptions,
+    DATAFUSION_CLI_VERSION,
 };
-use rustyline::Editor;
-
-/// The CLI supports using a local DataFusion context or a distributed BallistaContext
-enum Context {
-    /// In-process execution with DataFusion
-    Local(ExecutionContext),
-    /// Distributed execution with Ballista
-    Remote(BallistaContext),
-}
+use std::env;
+use std::fs::File;
+use std::io::BufReader;
+use std::path::Path;
 
 #[tokio::main]
 pub async fn main() -> Result<()> {
@@ -139,17 +126,10 @@ pub async fn main() -> Result<()> {
         execution_config = execution_config.with_batch_size(batch_size);
     };
 
-    let ctx: Result<Context> = match (host, port) {
-        (Some(h), Some(p)) => {
-            let config: BallistaConfig = BallistaConfig::new()
-                .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-            Ok(Context::Remote(BallistaContext::remote(h, p, &config)))
-        }
-        _ => Ok(Context::Local(ExecutionContext::with_config(
-            execution_config.clone(),
-        ))),
+    let mut ctx: Context = match (host, port) {
+        (Some(h), Some(p)) => Context::new_remote(h, p)?,
+        _ => Context::new_local(&execution_config),
     };
-    let mut ctx = ctx?;
 
     let format = matches
         .value_of("format")
@@ -165,90 +145,15 @@ pub async fn main() -> Result<()> {
             .collect::<Vec<_>>();
         for file in files {
             let mut reader = BufReader::new(file);
-            exec_from_lines(&mut ctx, &mut reader, print_options.clone()).await;
+            exec::exec_from_lines(&mut ctx, &mut reader, print_options.clone()).await;
         }
     } else {
-        exec_from_repl(&mut ctx, print_options).await;
+        exec::exec_from_repl(&mut ctx, print_options).await;
     }
 
     Ok(())
 }
 
-async fn exec_from_lines(
-    ctx: &mut Context,
-    reader: &mut BufReader<File>,
-    print_options: PrintOptions,
-) {
-    let mut query = "".to_owned();
-
-    for line in reader.lines() {
-        match line {
-            Ok(line) if line.starts_with("--") => {
-                continue;
-            }
-            Ok(line) => {
-                let line = line.trim_end();
-                query.push_str(line);
-                if line.ends_with(';') {
-                    match exec_and_print(ctx, print_options.clone(), query).await {
-                        Ok(_) => {}
-                        Err(err) => println!("{:?}", err),
-                    }
-                    query = "".to_owned();
-                } else {
-                    query.push('\n');
-                }
-            }
-            _ => {
-                break;
-            }
-        }
-    }
-
-    // run the left over query if the last statement doesn't contain ‘;’
-    if !query.is_empty() {
-        match exec_and_print(ctx, print_options, query).await {
-            Ok(_) => {}
-            Err(err) => println!("{:?}", err),
-        }
-    }
-}
-
-async fn exec_from_repl(ctx: &mut Context, print_options: PrintOptions) {
-    let mut rl = Editor::<()>::new();
-    rl.load_history(".history").ok();
-
-    let mut query = "".to_owned();
-    loop {
-        match rl.readline("> ") {
-            Ok(ref line) if is_exit_command(line) && query.is_empty() => {
-                break;
-            }
-            Ok(ref line) if line.starts_with("--") => {
-                continue;
-            }
-            Ok(ref line) if line.trim_end().ends_with(';') => {
-                query.push_str(line.trim_end());
-                rl.add_history_entry(query.clone());
-                match exec_and_print(ctx, print_options.clone(), query).await {
-                    Ok(_) => {}
-                    Err(err) => println!("{:?}", err),
-                }
-                query = "".to_owned();
-            }
-            Ok(ref line) => {
-                query.push_str(line);
-                query.push('\n');
-            }
-            Err(_) => {
-                break;
-            }
-        }
-    }
-
-    rl.save_history(".history").ok();
-}
-
 fn is_valid_file(dir: String) -> std::result::Result<(), String> {
     if Path::new(&dir).is_file() {
         Ok(())
@@ -271,26 +176,3 @@ fn is_valid_batch_size(size: String) -> std::result::Result<(), String> {
         _ => Err(format!("Invalid batch size '{}'", size)),
     }
 }
-
-fn is_exit_command(line: &str) -> bool {
-    let line = line.trim_end().to_lowercase();
-    line == "quit" || line == "exit"
-}
-
-async fn exec_and_print(
-    ctx: &mut Context,
-    print_options: PrintOptions,
-    sql: String,
-) -> Result<()> {
-    let now = Instant::now();
-
-    let df = match ctx {
-        Context::Local(datafusion) => datafusion.sql(&sql).await?,
-        Context::Remote(ballista) => ballista.sql(&sql).await?,
-    };
-
-    let results = df.collect().await?;
-    print_options.print_batches(&results, now)?;
-
-    Ok(())
-}
diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/print_options.rs
similarity index 90%
copy from datafusion-cli/src/lib.rs
copy to datafusion-cli/src/print_options.rs
index 74b91ac..5e37926 100644
--- a/datafusion-cli/src/lib.rs
+++ b/datafusion-cli/src/print_options.rs
@@ -15,15 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#![doc = include_str!("../README.md")]
-#![allow(unused_imports)]
-pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");
-
-pub mod print_format;
-
+use crate::print_format::PrintFormat;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::Result;
-use print_format::PrintFormat;
 use std::time::Instant;
 
 #[derive(Debug, Clone)]

Reply via email to