This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 444c153  Add support show tables and show columns for ballista (#1593)
444c153 is described below

commit 444c153863520072ea22d4f8c498dee39437516d
Author: gaojun2048 <[email protected]>
AuthorDate: Wed Jan 19 01:27:46 2022 +0800

    Add support show tables and show columns for ballista (#1593)
    
    * add support show tables and show columns for ballista
    
    * format code style
    
    * fmt code style
    
    * fmt code style
    
    * fmt code
    
    * fmt code
    
    * add ballista.with_information_schema to BallistaConfig
    
    * add ballista.with_information_schema to BallistaConfig
    
    * fix test clippy
    
    * fix test clippy
    
    * fix test clippy
---
 ballista/rust/client/Cargo.toml     |   2 +
 ballista/rust/client/src/context.rs | 137 +++++++++++++++++++++++++++++++++++-
 ballista/rust/core/src/config.rs    |  59 +++++++++++++++-
 ballista/rust/core/src/utils.rs     |   5 +-
 4 files changed, 197 insertions(+), 6 deletions(-)

diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index b8ed26e..aa8297f 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -33,6 +33,8 @@ ballista-scheduler = { path = "../scheduler", version = 
"0.6.0", optional = true
 futures = "0.3"
 log = "0.4"
 tokio = "1.0"
+tempfile = "3"
+sqlparser = "0.13"
 
 datafusion = { path = "../../../datafusion", version = "6.0.0" }
 
diff --git a/ballista/rust/client/src/context.rs 
b/ballista/rust/client/src/context.rs
index fff6f26..7a11c68 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -17,6 +17,7 @@
 
 //! Distributed execution context.
 
+use sqlparser::ast::Statement;
 use std::collections::HashMap;
 use std::fs;
 use std::path::PathBuf;
@@ -31,8 +32,10 @@ use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::dataframe_impl::DataFrameImpl;
 use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScan};
-use datafusion::prelude::{AvroReadOptions, CsvReadOptions};
-use datafusion::sql::parser::FileType;
+use datafusion::prelude::{
+    AvroReadOptions, CsvReadOptions, ExecutionConfig, ExecutionContext,
+};
+use datafusion::sql::parser::{DFParser, FileType, Statement as DFStatement};
 
 struct BallistaContextState {
     /// Ballista configuration
@@ -242,6 +245,35 @@ impl BallistaContext {
         }
     }
 
+    /// Returns `Ok(true)` when `sql` parses to a `SHOW` statement (`ShowVariable` or `ShowColumns`).
+    pub async fn is_show_statement(&self, sql: &str) -> Result<bool> {
+        let mut is_show_variable: bool = false;
+        let statements = DFParser::parse_sql(sql)?;
+
+        if statements.len() != 1 {
+            return Err(DataFusionError::NotImplemented(
+                "The context currently only supports a single SQL 
statement".to_string(),
+            ));
+        }
+
+        if let DFStatement::Statement(s) = &statements[0] {
+            let st: &Statement = s;
+            match st {
+                Statement::ShowVariable { .. } => {
+                    is_show_variable = true;
+                }
+                Statement::ShowColumns { .. } => {
+                    is_show_variable = true;
+                }
+                _ => {
+                    is_show_variable = false;
+                }
+            }
+        };
+
+        Ok(is_show_variable)
+    }
+
     /// Create a DataFrame from a SQL statement.
     ///
     /// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
@@ -256,6 +288,17 @@ impl BallistaContext {
             )
         };
 
+        let is_show = self.is_show_statement(sql).await?;
+        // `SHOW TABLES` and `SHOW COLUMNS` cannot run on the scheduler because 
the tables are stored on the client
+        if is_show {
+            let state = self.state.lock().unwrap();
+            ctx = ExecutionContext::with_config(
+                ExecutionConfig::new().with_information_schema(
+                    state.config.default_with_information_schema(),
+                ),
+            );
+        }
+
         // register tables with DataFusion context
         {
             let state = self.state.lock().unwrap();
@@ -268,6 +311,7 @@ impl BallistaContext {
         }
 
         let plan = ctx.create_logical_plan(sql)?;
+
         match plan {
             LogicalPlan::CreateExternalTable(CreateExternalTable {
                 ref schema,
@@ -319,4 +363,93 @@ mod tests {
         let df = context.sql("SELECT 1;").await.unwrap();
         df.collect().await.unwrap();
     }
+
+    #[tokio::test]
+    #[cfg(feature = "standalone")]
+    async fn test_ballista_show_tables() {
+        use super::*;
+        use std::fs::File;
+        use std::io::Write;
+        use tempfile::TempDir;
+        let context = 
BallistaContext::standalone(&BallistaConfig::new().unwrap(), 1)
+            .await
+            .unwrap();
+
+        let data = "Jorge,2018-12-13T12:12:10.011Z\n\
+                    Andrew,2018-11-13T17:11:10.011Z";
+
+        let tmp_dir = TempDir::new().unwrap();
+        let file_path = tmp_dir.path().join("timestamps.csv");
+
+        // scope to ensure the file is closed and written
+        {
+            File::create(&file_path)
+                .expect("creating temp file")
+                .write_all(data.as_bytes())
+                .expect("writing data");
+        }
+
+        let sql = format!(
+            "CREATE EXTERNAL TABLE csv_with_timestamps (
+                  name VARCHAR,
+                  ts TIMESTAMP
+              )
+              STORED AS CSV
+              LOCATION '{}'
+              ",
+            file_path.to_str().expect("path is utf8")
+        );
+
+        context.sql(sql.as_str()).await.unwrap();
+
+        let df = context.sql("show columns from csv_with_timestamps;").await;
+
+        assert!(df.is_err());
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "standalone")]
+    async fn test_show_tables_not_with_information_schema() {
+        use super::*;
+        use ballista_core::config::{
+            BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
+        };
+        use std::fs::File;
+        use std::io::Write;
+        use tempfile::TempDir;
+        let config = BallistaConfigBuilder::default()
+            .set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
+            .build()
+            .unwrap();
+        let context = BallistaContext::standalone(&config, 1).await.unwrap();
+
+        let data = "Jorge,2018-12-13T12:12:10.011Z\n\
+                    Andrew,2018-11-13T17:11:10.011Z";
+
+        let tmp_dir = TempDir::new().unwrap();
+        let file_path = tmp_dir.path().join("timestamps.csv");
+
+        // scope to ensure the file is closed and written
+        {
+            File::create(&file_path)
+                .expect("creating temp file")
+                .write_all(data.as_bytes())
+                .expect("writing data");
+        }
+
+        let sql = format!(
+            "CREATE EXTERNAL TABLE csv_with_timestamps (
+                  name VARCHAR,
+                  ts TIMESTAMP
+              )
+              STORED AS CSV
+              LOCATION '{}'
+              ",
+            file_path.to_str().expect("path is utf8")
+        );
+
+        context.sql(sql.as_str()).await.unwrap();
+        let df = context.sql("show tables;").await;
+        assert!(df.is_ok());
+    }
 }
diff --git a/ballista/rust/core/src/config.rs b/ballista/rust/core/src/config.rs
index 5d7b3c5..2256808 100644
--- a/ballista/rust/core/src/config.rs
+++ b/ballista/rust/core/src/config.rs
@@ -18,7 +18,10 @@
 
 //! Ballista configuration
 
+use core::fmt;
 use std::collections::HashMap;
+use std::result;
+use std::string::ParseError;
 
 use crate::error::{BallistaError, Result};
 
@@ -26,6 +29,9 @@ use datafusion::arrow::datatypes::DataType;
 use log::warn;
 
 pub const BALLISTA_DEFAULT_SHUFFLE_PARTITIONS: &str = 
"ballista.shuffle.partitions";
+pub const BALLISTA_WITH_INFORMATION_SCHEMA: &str = 
"ballista.with_information_schema";
+
+pub type ParseResult<T> = result::Result<T, String>;
 
 /// Configuration option meta-data
 #[derive(Debug, Clone)]
@@ -103,9 +109,9 @@ impl BallistaConfig {
         for (name, entry) in &supported_entries {
             if let Some(v) = settings.get(name) {
                 // validate that we can parse the user-supplied value
-                let _ = v.parse::<usize>().map_err(|e| 
BallistaError::General(format!("Failed to parse user-supplied value '{}' for 
configuration setting '{}': {:?}", name, v, e)))?;
+                let _ = Self::parse_value(v.as_str(), 
entry._data_type.clone()).map_err(|e| BallistaError::General(format!("Failed to 
parse user-supplied value '{}' for configuration setting '{}': {}", name, v, 
e)))?;
             } else if let Some(v) = entry.default_value.clone() {
-                let _ = v.parse::<usize>().map_err(|e| 
BallistaError::General(format!("Failed to parse default value '{}' for 
configuration setting '{}': {:?}", name, v, e)))?;
+                let _ = Self::parse_value(v.as_str(), 
entry._data_type.clone()).map_err(|e| BallistaError::General(format!("Failed to 
parse default value '{}' for configuration setting '{}': {}", name, v, e)))?;
             } else {
                 return Err(BallistaError::General(format!(
                     "No value specified for mandatory configuration setting 
'{}'",
@@ -117,12 +123,35 @@ impl BallistaConfig {
         Ok(Self { settings })
     }
 
+    pub fn parse_value(val: &str, data_type: DataType) -> ParseResult<()> {
+        match data_type {
+            DataType::UInt16 => {
+                val.to_string()
+                    .parse::<usize>()
+                    .map_err(|e| format!("{:?}", e))?;
+            }
+            DataType::Boolean => {
+                val.to_string()
+                    .parse::<bool>()
+                    .map_err(|e| format!("{:?}", e))?;
+            }
+            _ => {
+                return Err(format!("not support data type: {}", data_type));
+            }
+        }
+
+        Ok(())
+    }
+
     /// All available configuration options
     pub fn valid_entries() -> HashMap<String, ConfigEntry> {
         let entries = vec![
             ConfigEntry::new(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS.to_string(),
                 "Sets the default number of partitions to create when 
repartitioning query stages".to_string(),
                 DataType::UInt16, Some("2".to_string())),
+            ConfigEntry::new(BALLISTA_WITH_INFORMATION_SCHEMA.to_string(),
+                "Sets whether enable information_schema".to_string(),
+                DataType::Boolean,Some("false".to_string())),
         ];
         entries
             .iter()
@@ -138,6 +167,10 @@ impl BallistaConfig {
         self.get_usize_setting(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS)
     }
 
+    pub fn default_with_information_schema(&self) -> bool {
+        self.get_bool_setting(BALLISTA_WITH_INFORMATION_SCHEMA)
+    }
+
     fn get_usize_setting(&self, key: &str) -> usize {
         if let Some(v) = self.settings.get(key) {
             // infallible because we validate all configs in the constructor
@@ -149,6 +182,18 @@ impl BallistaConfig {
             v.parse().unwrap()
         }
     }
+
+    fn get_bool_setting(&self, key: &str) -> bool {
+        if let Some(v) = self.settings.get(key) {
+            // infallible because we validate all configs in the constructor
+            v.parse::<bool>().unwrap()
+        } else {
+            let entries = Self::valid_entries();
+            // infallible because we validate all configs in the constructor
+            let v = entries.get(key).unwrap().default_value.as_ref().unwrap();
+            v.parse::<bool>().unwrap()
+        }
+    }
 }
 
 #[cfg(test)]
@@ -159,6 +204,7 @@ mod tests {
     fn default_config() -> Result<()> {
         let config = BallistaConfig::new()?;
         assert_eq!(2, config.default_shuffle_partitions());
+        assert!(!config.default_with_information_schema());
         Ok(())
     }
 
@@ -166,8 +212,10 @@ mod tests {
     fn custom_config() -> Result<()> {
         let config = BallistaConfig::builder()
             .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "123")
+            .set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
             .build()?;
         assert_eq!(123, config.default_shuffle_partitions());
+        assert!(config.default_with_information_schema());
         Ok(())
     }
 
@@ -178,6 +226,13 @@ mod tests {
             .build();
         assert!(config.is_err());
         assert_eq!("General(\"Failed to parse user-supplied value 
'ballista.shuffle.partitions' for configuration setting 'true': ParseIntError { 
kind: InvalidDigit }\")", format!("{:?}", config.unwrap_err()));
+
+        let config = BallistaConfig::builder()
+            .set(BALLISTA_WITH_INFORMATION_SCHEMA, "123")
+            .build();
+        assert!(config.is_err());
+        assert_eq!("General(\"Failed to parse user-supplied value 
'ballista.with_information_schema' for configuration setting '123': 
ParseBoolError\")", format!("{:?}", config.unwrap_err()));
+
         Ok(())
     }
 }
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index bdb92c5..e208461 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -46,7 +46,7 @@ use datafusion::error::DataFusionError;
 use datafusion::execution::context::{
     ExecutionConfig, ExecutionContext, ExecutionContextState, QueryPlanner,
 };
-use datafusion::logical_plan::{LogicalPlan, Operator};
+use datafusion::logical_plan::{LogicalPlan, Operator, TableScan};
 use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
 use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
 use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
@@ -248,7 +248,8 @@ pub fn create_df_ctx_with_ballista_query_planner(
             scheduler_url,
             config.clone(),
         )))
-        .with_target_partitions(config.default_shuffle_partitions());
+        .with_target_partitions(config.default_shuffle_partitions())
+        .with_information_schema(true);
     ExecutionContext::with_config(config)
 }
 

Reply via email to