This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 444c153 Add support show tables and show columns for ballista (#1593)
444c153 is described below
commit 444c153863520072ea22d4f8c498dee39437516d
Author: gaojun2048 <[email protected]>
AuthorDate: Wed Jan 19 01:27:46 2022 +0800
Add support show tables and show columns for ballista (#1593)
* add support show tables and show columns for ballista
* format code style
* fmt code style
* fmt code style
* fmt code
* fmt code
* add ballista.with_information_schema to BallistaConfig
* add ballista.with_information_schema to BallistaConfig
* fix test clippy
* fix test clippy
* fix test clippy
---
ballista/rust/client/Cargo.toml | 2 +
ballista/rust/client/src/context.rs | 137 +++++++++++++++++++++++++++++++++++-
ballista/rust/core/src/config.rs | 59 +++++++++++++++-
ballista/rust/core/src/utils.rs | 5 +-
4 files changed, 197 insertions(+), 6 deletions(-)
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index b8ed26e..aa8297f 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -33,6 +33,8 @@ ballista-scheduler = { path = "../scheduler", version =
"0.6.0", optional = true
futures = "0.3"
log = "0.4"
tokio = "1.0"
+tempfile = "3"
+sqlparser = "0.13"
datafusion = { path = "../../../datafusion", version = "6.0.0" }
diff --git a/ballista/rust/client/src/context.rs
b/ballista/rust/client/src/context.rs
index fff6f26..7a11c68 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -17,6 +17,7 @@
//! Distributed execution context.
+use sqlparser::ast::Statement;
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
@@ -31,8 +32,10 @@ use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::dataframe_impl::DataFrameImpl;
use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScan};
-use datafusion::prelude::{AvroReadOptions, CsvReadOptions};
-use datafusion::sql::parser::FileType;
+use datafusion::prelude::{
+ AvroReadOptions, CsvReadOptions, ExecutionConfig, ExecutionContext,
+};
+use datafusion::sql::parser::{DFParser, FileType, Statement as DFStatement};
struct BallistaContextState {
/// Ballista configuration
@@ -242,6 +245,35 @@ impl BallistaContext {
}
}
+ /// Returns true if the given SQL is a `SHOW` statement (e.g. SHOW TABLES, SHOW COLUMNS).
+ pub async fn is_show_statement(&self, sql: &str) -> Result<bool> {
+ let mut is_show_variable: bool = false;
+ let statements = DFParser::parse_sql(sql)?;
+
+ if statements.len() != 1 {
+ return Err(DataFusionError::NotImplemented(
+ "The context currently only supports a single SQL
statement".to_string(),
+ ));
+ }
+
+ if let DFStatement::Statement(s) = &statements[0] {
+ let st: &Statement = s;
+ match st {
+ Statement::ShowVariable { .. } => {
+ is_show_variable = true;
+ }
+ Statement::ShowColumns { .. } => {
+ is_show_variable = true;
+ }
+ _ => {
+ is_show_variable = false;
+ }
+ }
+ };
+
+ Ok(is_show_variable)
+ }
+
/// Create a DataFrame from a SQL statement.
///
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
@@ -256,6 +288,17 @@ impl BallistaContext {
)
};
+ let is_show = self.is_show_statement(sql).await?;
+ // SHOW TABLES / SHOW COLUMNS queries cannot run on the scheduler because
the tables are registered on the client
+ if is_show {
+ let state = self.state.lock().unwrap();
+ ctx = ExecutionContext::with_config(
+ ExecutionConfig::new().with_information_schema(
+ state.config.default_with_information_schema(),
+ ),
+ );
+ }
+
// register tables with DataFusion context
{
let state = self.state.lock().unwrap();
@@ -268,6 +311,7 @@ impl BallistaContext {
}
let plan = ctx.create_logical_plan(sql)?;
+
match plan {
LogicalPlan::CreateExternalTable(CreateExternalTable {
ref schema,
@@ -319,4 +363,93 @@ mod tests {
let df = context.sql("SELECT 1;").await.unwrap();
df.collect().await.unwrap();
}
+
+ #[tokio::test]
+ #[cfg(feature = "standalone")]
+ async fn test_ballista_show_tables() {
+ use super::*;
+ use std::fs::File;
+ use std::io::Write;
+ use tempfile::TempDir;
+ let context =
BallistaContext::standalone(&BallistaConfig::new().unwrap(), 1)
+ .await
+ .unwrap();
+
+ let data = "Jorge,2018-12-13T12:12:10.011Z\n\
+ Andrew,2018-11-13T17:11:10.011Z";
+
+ let tmp_dir = TempDir::new().unwrap();
+ let file_path = tmp_dir.path().join("timestamps.csv");
+
+ // scope to ensure the file is closed and written
+ {
+ File::create(&file_path)
+ .expect("creating temp file")
+ .write_all(data.as_bytes())
+ .expect("writing data");
+ }
+
+ let sql = format!(
+ "CREATE EXTERNAL TABLE csv_with_timestamps (
+ name VARCHAR,
+ ts TIMESTAMP
+ )
+ STORED AS CSV
+ LOCATION '{}'
+ ",
+ file_path.to_str().expect("path is utf8")
+ );
+
+ context.sql(sql.as_str()).await.unwrap();
+
+ let df = context.sql("show columns from csv_with_timestamps;").await;
+
+ assert!(df.is_err());
+ }
+
+ #[tokio::test]
+ #[cfg(feature = "standalone")]
+ async fn test_show_tables_not_with_information_schema() {
+ use super::*;
+ use ballista_core::config::{
+ BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
+ };
+ use std::fs::File;
+ use std::io::Write;
+ use tempfile::TempDir;
+ let config = BallistaConfigBuilder::default()
+ .set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
+ .build()
+ .unwrap();
+ let context = BallistaContext::standalone(&config, 1).await.unwrap();
+
+ let data = "Jorge,2018-12-13T12:12:10.011Z\n\
+ Andrew,2018-11-13T17:11:10.011Z";
+
+ let tmp_dir = TempDir::new().unwrap();
+ let file_path = tmp_dir.path().join("timestamps.csv");
+
+ // scope to ensure the file is closed and written
+ {
+ File::create(&file_path)
+ .expect("creating temp file")
+ .write_all(data.as_bytes())
+ .expect("writing data");
+ }
+
+ let sql = format!(
+ "CREATE EXTERNAL TABLE csv_with_timestamps (
+ name VARCHAR,
+ ts TIMESTAMP
+ )
+ STORED AS CSV
+ LOCATION '{}'
+ ",
+ file_path.to_str().expect("path is utf8")
+ );
+
+ context.sql(sql.as_str()).await.unwrap();
+ let df = context.sql("show tables;").await;
+ assert!(df.is_ok());
+ }
}
diff --git a/ballista/rust/core/src/config.rs b/ballista/rust/core/src/config.rs
index 5d7b3c5..2256808 100644
--- a/ballista/rust/core/src/config.rs
+++ b/ballista/rust/core/src/config.rs
@@ -18,7 +18,10 @@
//! Ballista configuration
+use core::fmt;
use std::collections::HashMap;
+use std::result;
+use std::string::ParseError;
use crate::error::{BallistaError, Result};
@@ -26,6 +29,9 @@ use datafusion::arrow::datatypes::DataType;
use log::warn;
pub const BALLISTA_DEFAULT_SHUFFLE_PARTITIONS: &str =
"ballista.shuffle.partitions";
+pub const BALLISTA_WITH_INFORMATION_SCHEMA: &str =
"ballista.with_information_schema";
+
+pub type ParseResult<T> = result::Result<T, String>;
/// Configuration option meta-data
#[derive(Debug, Clone)]
@@ -103,9 +109,9 @@ impl BallistaConfig {
for (name, entry) in &supported_entries {
if let Some(v) = settings.get(name) {
// validate that we can parse the user-supplied value
- let _ = v.parse::<usize>().map_err(|e|
BallistaError::General(format!("Failed to parse user-supplied value '{}' for
configuration setting '{}': {:?}", name, v, e)))?;
+ let _ = Self::parse_value(v.as_str(),
entry._data_type.clone()).map_err(|e| BallistaError::General(format!("Failed to
parse user-supplied value '{}' for configuration setting '{}': {}", name, v,
e)))?;
} else if let Some(v) = entry.default_value.clone() {
- let _ = v.parse::<usize>().map_err(|e|
BallistaError::General(format!("Failed to parse default value '{}' for
configuration setting '{}': {:?}", name, v, e)))?;
+ let _ = Self::parse_value(v.as_str(),
entry._data_type.clone()).map_err(|e| BallistaError::General(format!("Failed to
parse default value '{}' for configuration setting '{}': {}", name, v, e)))?;
} else {
return Err(BallistaError::General(format!(
"No value specified for mandatory configuration setting
'{}'",
@@ -117,12 +123,35 @@ impl BallistaConfig {
Ok(Self { settings })
}
+ pub fn parse_value(val: &str, data_type: DataType) -> ParseResult<()> {
+ match data_type {
+ DataType::UInt16 => {
+ val.to_string()
+ .parse::<usize>()
+ .map_err(|e| format!("{:?}", e))?;
+ }
+ DataType::Boolean => {
+ val.to_string()
+ .parse::<bool>()
+ .map_err(|e| format!("{:?}", e))?;
+ }
+ _ => {
+ return Err(format!("not support data type: {}", data_type));
+ }
+ }
+
+ Ok(())
+ }
+
/// All available configuration options
pub fn valid_entries() -> HashMap<String, ConfigEntry> {
let entries = vec![
ConfigEntry::new(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS.to_string(),
"Sets the default number of partitions to create when
repartitioning query stages".to_string(),
DataType::UInt16, Some("2".to_string())),
+ ConfigEntry::new(BALLISTA_WITH_INFORMATION_SCHEMA.to_string(),
+ "Sets whether enable information_schema".to_string(),
+ DataType::Boolean,Some("false".to_string())),
];
entries
.iter()
@@ -138,6 +167,10 @@ impl BallistaConfig {
self.get_usize_setting(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS)
}
+ pub fn default_with_information_schema(&self) -> bool {
+ self.get_bool_setting(BALLISTA_WITH_INFORMATION_SCHEMA)
+ }
+
fn get_usize_setting(&self, key: &str) -> usize {
if let Some(v) = self.settings.get(key) {
// infallible because we validate all configs in the constructor
@@ -149,6 +182,18 @@ impl BallistaConfig {
v.parse().unwrap()
}
}
+
+ fn get_bool_setting(&self, key: &str) -> bool {
+ if let Some(v) = self.settings.get(key) {
+ // infallible because we validate all configs in the constructor
+ v.parse::<bool>().unwrap()
+ } else {
+ let entries = Self::valid_entries();
+ // infallible because we validate all configs in the constructor
+ let v = entries.get(key).unwrap().default_value.as_ref().unwrap();
+ v.parse::<bool>().unwrap()
+ }
+ }
}
#[cfg(test)]
@@ -159,6 +204,7 @@ mod tests {
fn default_config() -> Result<()> {
let config = BallistaConfig::new()?;
assert_eq!(2, config.default_shuffle_partitions());
+ assert!(!config.default_with_information_schema());
Ok(())
}
@@ -166,8 +212,10 @@ mod tests {
fn custom_config() -> Result<()> {
let config = BallistaConfig::builder()
.set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "123")
+ .set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
.build()?;
assert_eq!(123, config.default_shuffle_partitions());
+ assert!(config.default_with_information_schema());
Ok(())
}
@@ -178,6 +226,13 @@ mod tests {
.build();
assert!(config.is_err());
assert_eq!("General(\"Failed to parse user-supplied value
'ballista.shuffle.partitions' for configuration setting 'true': ParseIntError {
kind: InvalidDigit }\")", format!("{:?}", config.unwrap_err()));
+
+ let config = BallistaConfig::builder()
+ .set(BALLISTA_WITH_INFORMATION_SCHEMA, "123")
+ .build();
+ assert!(config.is_err());
+ assert_eq!("General(\"Failed to parse user-supplied value
'ballista.with_information_schema' for configuration setting '123':
ParseBoolError\")", format!("{:?}", config.unwrap_err()));
+
Ok(())
}
}
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index bdb92c5..e208461 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -46,7 +46,7 @@ use datafusion::error::DataFusionError;
use datafusion::execution::context::{
ExecutionConfig, ExecutionContext, ExecutionContextState, QueryPlanner,
};
-use datafusion::logical_plan::{LogicalPlan, Operator};
+use datafusion::logical_plan::{LogicalPlan, Operator, TableScan};
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
@@ -248,7 +248,8 @@ pub fn create_df_ctx_with_ballista_query_planner(
scheduler_url,
config.clone(),
)))
- .with_target_partitions(config.default_shuffle_partitions());
+ .with_target_partitions(config.default_shuffle_partitions())
+ .with_information_schema(true);
ExecutionContext::with_config(config)
}