This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 0388682e3 Support ShowVariable Statement (#3455)
0388682e3 is described below
commit 0388682e3d9f049cc7802da2f47aee73233810db
Author: Wei-Ting Kuo <[email protected]>
AuthorDate: Wed Sep 14 20:47:07 2022 +0800
Support ShowVariable Statement (#3455)
* support "SHOW VARIABLE;"
* fix test case
* fix comment
* fix clippy
* rename settings -> df_settings
---
datafusion/core/src/catalog/information_schema.rs | 82 ++++++++-
datafusion/core/src/execution/context.rs | 30 ++--
datafusion/core/src/physical_plan/planner.rs | 2 +
datafusion/core/tests/sql/information_schema.rs | 197 +++++++++++++++-------
datafusion/sql/src/planner.rs | 25 ++-
5 files changed, 253 insertions(+), 83 deletions(-)
diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs
index 00b7d7d35..eca399e27 100644
--- a/datafusion/core/src/catalog/information_schema.rs
+++ b/datafusion/core/src/catalog/information_schema.rs
@@ -24,6 +24,8 @@ use std::{
sync::{Arc, Weak},
};
+use parking_lot::RwLock;
+
use arrow::{
array::{StringBuilder, UInt64Builder},
datatypes::{DataType, Field, Schema},
@@ -39,15 +41,19 @@ use super::{
schema::SchemaProvider,
};
+use crate::config::ConfigOptions;
+
const INFORMATION_SCHEMA: &str = "information_schema";
const TABLES: &str = "tables";
const VIEWS: &str = "views";
const COLUMNS: &str = "columns";
+const DF_SETTINGS: &str = "df_settings";
/// Wraps another [`CatalogProvider`] and adds a "information_schema"
/// schema that can introspect on tables in the catalog_list
pub(crate) struct CatalogWithInformationSchema {
catalog_list: Weak<dyn CatalogList>,
+ config_options: Weak<RwLock<ConfigOptions>>,
/// wrapped provider
inner: Arc<dyn CatalogProvider>,
}
@@ -55,10 +61,12 @@ pub(crate) struct CatalogWithInformationSchema {
impl CatalogWithInformationSchema {
pub(crate) fn new(
catalog_list: Weak<dyn CatalogList>,
+ config_options: Weak<RwLock<ConfigOptions>>,
inner: Arc<dyn CatalogProvider>,
) -> Self {
Self {
catalog_list,
+ config_options,
inner,
}
}
@@ -79,9 +87,13 @@ impl CatalogProvider for CatalogWithInformationSchema {
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
- Weak::upgrade(&self.catalog_list).map(|catalog_list| {
- Arc::new(InformationSchemaProvider { catalog_list })
- as Arc<dyn SchemaProvider>
+ Weak::upgrade(&self.catalog_list).and_then(|catalog_list| {
+ Weak::upgrade(&self.config_options).map(|config_options| {
+ Arc::new(InformationSchemaProvider {
+ catalog_list,
+ config_options,
+ }) as Arc<dyn SchemaProvider>
+ })
})
} else {
self.inner.schema(name)
@@ -106,6 +118,7 @@ impl CatalogProvider for CatalogWithInformationSchema {
/// table is queried.
struct InformationSchemaProvider {
catalog_list: Arc<dyn CatalogList>,
+ config_options: Arc<RwLock<ConfigOptions>>,
}
impl InformationSchemaProvider {
@@ -141,6 +154,12 @@ impl InformationSchemaProvider {
COLUMNS,
TableType::View,
);
+ builder.add_table(
+ &catalog_name,
+ INFORMATION_SCHEMA,
+ DF_SETTINGS,
+ TableType::View,
+ );
}
let mem_table: MemTable = builder.into();
@@ -206,6 +225,19 @@ impl InformationSchemaProvider {
Arc::new(mem_table)
}
+
+ /// Construct the `information_schema.df_settings` virtual table
+ fn make_df_settings(&self) -> Arc<dyn TableProvider> {
+ let mut builder = InformationSchemaDfSettingsBuilder::new();
+
+ for (name, setting) in self.config_options.read().options() {
+ builder.add_setting(name, setting.to_string());
+ }
+
+ let mem_table: MemTable = builder.into();
+
+ Arc::new(mem_table)
+ }
}
impl SchemaProvider for InformationSchemaProvider {
@@ -224,6 +256,8 @@ impl SchemaProvider for InformationSchemaProvider {
Some(self.make_columns())
} else if name.eq_ignore_ascii_case("views") {
Some(self.make_views())
+ } else if name.eq_ignore_ascii_case("df_settings") {
+ Some(self.make_df_settings())
} else {
None
}
@@ -579,3 +613,45 @@ impl From<InformationSchemaColumnsBuilder> for MemTable {
MemTable::try_new(schema, vec![vec![batch]]).unwrap()
}
}
+
+struct InformationSchemaDfSettingsBuilder {
+ names: StringBuilder,
+ settings: StringBuilder,
+}
+
+impl InformationSchemaDfSettingsBuilder {
+ fn new() -> Self {
+ Self {
+ names: StringBuilder::new(),
+ settings: StringBuilder::new(),
+ }
+ }
+
+ fn add_setting(&mut self, name: impl AsRef<str>, setting: impl AsRef<str>) {
+ self.names.append_value(name.as_ref());
+ self.settings.append_value(setting.as_ref());
+ }
+}
+
+impl From<InformationSchemaDfSettingsBuilder> for MemTable {
+ fn from(value: InformationSchemaDfSettingsBuilder) -> MemTable {
+ let schema = Schema::new(vec![
+ Field::new("name", DataType::Utf8, false),
+ Field::new("setting", DataType::Utf8, false),
+ ]);
+
+ let InformationSchemaDfSettingsBuilder {
+ mut names,
+ mut settings,
+ } = value;
+
+ let schema = Arc::new(schema);
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(names.finish()), Arc::new(settings.finish())],
+ )
+ .unwrap();
+
+ MemTable::try_new(schema, vec![vec![batch]]).unwrap()
+ }
+}
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 9540ca7ad..cb0e16f54 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -841,6 +841,7 @@ impl SessionContext {
let catalog = if information_schema {
Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(&state.catalog_list),
+ Arc::downgrade(&state.config.config_options),
catalog,
))
} else {
@@ -1130,7 +1131,7 @@ pub struct SessionConfig {
/// Should DataFusion parquet reader using the predicate to prune data
pub parquet_pruning: bool,
/// Configuration options
- pub config_options: ConfigOptions,
+ pub config_options: Arc<RwLock<ConfigOptions>>,
/// Opaque extensions.
extensions: AnyMap,
}
@@ -1147,7 +1148,7 @@ impl Default for SessionConfig {
repartition_aggregations: true,
repartition_windows: true,
parquet_pruning: true,
- config_options: ConfigOptions::new(),
+ config_options: Arc::new(RwLock::new(ConfigOptions::new())),
// Assume no extensions by default.
extensions: HashMap::with_capacity_and_hasher(
0,
@@ -1166,14 +1167,14 @@ impl SessionConfig {
/// Create an execution config with config options read from the environment
pub fn from_env() -> Self {
Self {
- config_options: ConfigOptions::from_env(),
+ config_options: Arc::new(RwLock::new(ConfigOptions::from_env())),
..Default::default()
}
}
/// Set a configuration option
- pub fn set(mut self, key: &str, value: ScalarValue) -> Self {
- self.config_options.set(key, value);
+ pub fn set(self, key: &str, value: ScalarValue) -> Self {
+ self.config_options.write().set(key, value);
self
}
@@ -1252,22 +1253,18 @@ impl SessionConfig {
/// Get the currently configured batch size
pub fn batch_size(&self) -> usize {
self.config_options
+ .read()
.get_u64(OPT_BATCH_SIZE)
.try_into()
.unwrap()
}
- /// Get the current configuration options
- pub fn config_options(&self) -> &ConfigOptions {
- &self.config_options
- }
-
/// Convert configuration options to name-value pairs with values converted to strings. Note
/// that this method will eventually be deprecated and replaced by [config_options].
pub fn to_props(&self) -> HashMap<String, String> {
let mut map = HashMap::new();
// copy configs from config_options
- for (k, v) in self.config_options.options() {
+ for (k, v) in self.config_options.read().options() {
map.insert(k.to_string(), format!("{}", v));
}
map.insert(
@@ -1420,6 +1417,7 @@ impl SessionState {
let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema {
Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(&catalog_list),
+ Arc::downgrade(&config.config_options),
Arc::new(default_catalog),
))
} else {
@@ -1444,7 +1442,11 @@ impl SessionState {
Arc::new(ProjectionPushDown::new()),
Arc::new(RewriteDisjunctivePredicate::new()),
];
- if config.config_options.get_bool(OPT_FILTER_NULL_JOIN_KEYS) {
+ if config
+ .config_options
+ .read()
+ .get_bool(OPT_FILTER_NULL_JOIN_KEYS)
+ {
rules.push(Arc::new(FilterNullJoinKeys::default()));
}
rules.push(Arc::new(ReduceOuterJoin::new()));
@@ -1457,10 +1459,11 @@ impl SessionState {
Arc::new(AggregateStatistics::new()),
Arc::new(HashBuildProbeOrder::new()),
];
- if config.config_options.get_bool(OPT_COALESCE_BATCHES) {
+ if config.config_options.read().get_bool(OPT_COALESCE_BATCHES) {
physical_optimizers.push(Arc::new(CoalesceBatches::new(
config
.config_options
+ .read()
.get_u64(OPT_COALESCE_TARGET_BATCH_SIZE)
.try_into()
.unwrap(),
@@ -1564,6 +1567,7 @@ impl SessionState {
let mut optimizer_config = OptimizerConfig::new().with_skip_failing_rules(
self.config
.config_options
+ .read()
.get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES),
);
optimizer_config.query_execution_start_time =
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 9f1e488ef..40109eda2 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1573,6 +1573,7 @@ impl DefaultPhysicalPlanner {
if !session_state
.config
.config_options
+ .read()
.get_bool(OPT_EXPLAIN_PHYSICAL_PLAN_ONLY)
{
stringified_plans = e.stringified_plans.clone();
@@ -1583,6 +1584,7 @@ impl DefaultPhysicalPlanner {
if !session_state
.config
.config_options
+ .read()
.get_bool(OPT_EXPLAIN_LOGICAL_PLAN_ONLY)
{
let input = self
diff --git a/datafusion/core/tests/sql/information_schema.rs b/datafusion/core/tests/sql/information_schema.rs
index 017330e44..c9a5c9aa9 100644
--- a/datafusion/core/tests/sql/information_schema.rs
+++ b/datafusion/core/tests/sql/information_schema.rs
@@ -52,13 +52,14 @@ async fn information_schema_tables_no_tables() {
.unwrap();
let expected = vec![
- "+---------------+--------------------+------------+------------+",
- "| table_catalog | table_schema | table_name | table_type |",
- "+---------------+--------------------+------------+------------+",
- "| datafusion | information_schema | columns | VIEW |",
- "| datafusion | information_schema | tables | VIEW |",
- "| datafusion | information_schema | views | VIEW |",
- "+---------------+--------------------+------------+------------+",
+ "+---------------+--------------------+-------------+------------+",
+ "| table_catalog | table_schema | table_name | table_type |",
+ "+---------------+--------------------+-------------+------------+",
+ "| datafusion | information_schema | columns | VIEW |",
+ "| datafusion | information_schema | df_settings | VIEW |",
+ "| datafusion | information_schema | tables | VIEW |",
+ "| datafusion | information_schema | views | VIEW |",
+ "+---------------+--------------------+-------------+------------+",
];
assert_batches_sorted_eq!(expected, &result);
}
@@ -77,14 +78,15 @@ async fn information_schema_tables_tables_default_catalog() {
.unwrap();
let expected = vec![
- "+---------------+--------------------+------------+------------+",
- "| table_catalog | table_schema | table_name | table_type |",
- "+---------------+--------------------+------------+------------+",
- "| datafusion | information_schema | columns | VIEW |",
- "| datafusion | information_schema | tables | VIEW |",
- "| datafusion | information_schema | views | VIEW |",
- "| datafusion | public | t | BASE TABLE |",
- "+---------------+--------------------+------------+------------+",
+ "+---------------+--------------------+-------------+------------+",
+ "| table_catalog | table_schema | table_name | table_type |",
+ "+---------------+--------------------+-------------+------------+",
+ "| datafusion | information_schema | columns | VIEW |",
+ "| datafusion | information_schema | df_settings | VIEW |",
+ "| datafusion | information_schema | tables | VIEW |",
+ "| datafusion | information_schema | views | VIEW |",
+ "| datafusion | public | t | BASE TABLE |",
+ "+---------------+--------------------+-------------+------------+",
];
assert_batches_sorted_eq!(expected, &result);
@@ -97,15 +99,16 @@ async fn information_schema_tables_tables_default_catalog() {
.unwrap();
let expected = vec![
- "+---------------+--------------------+------------+------------+",
- "| table_catalog | table_schema | table_name | table_type |",
- "+---------------+--------------------+------------+------------+",
- "| datafusion | information_schema | columns | VIEW |",
- "| datafusion | information_schema | tables | VIEW |",
- "| datafusion | information_schema | views | VIEW |",
- "| datafusion | public | t | BASE TABLE |",
- "| datafusion | public | t2 | BASE TABLE |",
- "+---------------+--------------------+------------+------------+",
+ "+---------------+--------------------+-------------+------------+",
+ "| table_catalog | table_schema | table_name | table_type |",
+ "+---------------+--------------------+-------------+------------+",
+ "| datafusion | information_schema | columns | VIEW |",
+ "| datafusion | information_schema | df_settings | VIEW |",
+ "| datafusion | information_schema | tables | VIEW |",
+ "| datafusion | information_schema | views | VIEW |",
+ "| datafusion | public | t | BASE TABLE |",
+ "| datafusion | public | t2 | BASE TABLE |",
+ "+---------------+--------------------+-------------+------------+",
];
assert_batches_sorted_eq!(expected, &result);
}
@@ -142,22 +145,25 @@ async fn information_schema_tables_tables_with_multiple_catalogs() {
.unwrap();
let expected = vec![
- "+------------------+--------------------+------------+------------+",
- "| table_catalog | table_schema | table_name | table_type |",
- "+------------------+--------------------+------------+------------+",
- "| datafusion | information_schema | columns | VIEW |",
- "| datafusion | information_schema | tables | VIEW |",
- "| datafusion | information_schema | views | VIEW |",
- "| my_catalog | information_schema | columns | VIEW |",
- "| my_catalog | information_schema | tables | VIEW |",
- "| my_catalog | information_schema | views | VIEW |",
- "| my_catalog | my_schema | t1 | BASE TABLE |",
- "| my_catalog | my_schema | t2 | BASE TABLE |",
- "| my_other_catalog | information_schema | columns | VIEW |",
- "| my_other_catalog | information_schema | tables | VIEW |",
- "| my_other_catalog | information_schema | views | VIEW |",
- "| my_other_catalog | my_other_schema | t3 | BASE TABLE |",
- "+------------------+--------------------+------------+------------+",
+ "+------------------+--------------------+-------------+------------+",
+ "| table_catalog | table_schema | table_name | table_type |",
+ "+------------------+--------------------+-------------+------------+",
+ "| datafusion | information_schema | columns | VIEW |",
+ "| datafusion | information_schema | df_settings | VIEW |",
+ "| datafusion | information_schema | tables | VIEW |",
+ "| datafusion | information_schema | views | VIEW |",
+ "| my_catalog | information_schema | columns | VIEW |",
+ "| my_catalog | information_schema | df_settings | VIEW |",
+ "| my_catalog | information_schema | tables | VIEW |",
+ "| my_catalog | information_schema | views | VIEW |",
+ "| my_catalog | my_schema | t1 | BASE TABLE |",
+ "| my_catalog | my_schema | t2 | BASE TABLE |",
+ "| my_other_catalog | information_schema | columns | VIEW |",
+ "| my_other_catalog | information_schema | df_settings | VIEW |",
+ "| my_other_catalog | information_schema | tables | VIEW |",
+ "| my_other_catalog | information_schema | views | VIEW |",
+ "| my_other_catalog | my_other_schema | t3 | BASE TABLE |",
+ "+------------------+--------------------+-------------+------------+",
];
assert_batches_sorted_eq!(expected, &result);
}
@@ -206,16 +212,17 @@ async fn information_schema_tables_table_types() {
.unwrap();
let expected = vec![
- "+---------------+--------------------+------------+-----------------+",
- "| table_catalog | table_schema | table_name | table_type |",
- "+---------------+--------------------+------------+-----------------+",
- "| datafusion | information_schema | columns | VIEW |",
- "| datafusion | information_schema | tables | VIEW |",
- "| datafusion | information_schema | views | VIEW |",
- "| datafusion | public | physical | BASE TABLE |",
- "| datafusion | public | query | VIEW |",
- "| datafusion | public | temp | LOCAL TEMPORARY |",
- "+---------------+--------------------+------------+-----------------+",
+ "+---------------+--------------------+-------------+-----------------+",
+ "| table_catalog | table_schema | table_name | table_type |",
+ "+---------------+--------------------+-------------+-----------------+",
+ "| datafusion | information_schema | columns | VIEW |",
+ "| datafusion | information_schema | df_settings | VIEW |",
+ "| datafusion | information_schema | tables | VIEW |",
+ "| datafusion | information_schema | views | VIEW |",
+ "| datafusion | public | physical | BASE TABLE |",
+ "| datafusion | public | query | VIEW |",
+ "| datafusion | public | temp | LOCAL TEMPORARY |",
+ "+---------------+--------------------+-------------+-----------------+",
];
assert_batches_sorted_eq!(expected, &result);
}
@@ -280,14 +287,15 @@ async fn information_schema_show_tables() {
let result = plan_and_collect(&ctx, "SHOW TABLES").await.unwrap();
let expected = vec![
- "+---------------+--------------------+------------+------------+",
- "| table_catalog | table_schema | table_name | table_type |",
- "+---------------+--------------------+------------+------------+",
- "| datafusion | information_schema | columns | VIEW |",
- "| datafusion | information_schema | tables | VIEW |",
- "| datafusion | information_schema | views | VIEW |",
- "| datafusion | public | t | BASE TABLE |",
- "+---------------+--------------------+------------+------------+",
+ "+---------------+--------------------+-------------+------------+",
+ "| table_catalog | table_schema | table_name | table_type |",
+ "+---------------+--------------------+-------------+------------+",
+ "| datafusion | information_schema | columns | VIEW |",
+ "| datafusion | information_schema | df_settings | VIEW |",
+ "| datafusion | information_schema | tables | VIEW |",
+ "| datafusion | information_schema | views | VIEW |",
+ "| datafusion | public | t | BASE TABLE |",
+ "+---------------+--------------------+-------------+------------+",
];
assert_batches_sorted_eq!(expected, &result);
@@ -436,15 +444,41 @@ async fn information_schema_show_table_table_names() {
);
}
+// FIXME once we support raise error while show non-existing variable, we could add this back
+//#[tokio::test]
+//async fn show_unsupported() {
+// let ctx = SessionContext::with_config(SessionConfig::new());
+//
+// let err = plan_and_collect(&ctx, "SHOW SOMETHING_UNKNOWN")
+// .await
+// .unwrap_err();
+//
+// assert_eq!(err.to_string(), "This feature is not implemented: SHOW SOMETHING_UNKNOWN not implemented. Supported syntax: SHOW <TABLES>");
+//}
+
+// FIXME
+// currently we cannot know whether a variable exists, this will output 0 row instead
+// one way to fix this is to generate a ConfigOptions and get options' key to compare
+// however config.rs is currently in core lib, could not be used by datafusion_sql due to the dependency cycle
#[tokio::test]
-async fn show_unsupported() {
- let ctx = SessionContext::with_config(SessionConfig::new());
+async fn show_non_existing_variable() {
+ let ctx =
+ SessionContext::with_config(SessionConfig::new().with_information_schema(true));
- let err = plan_and_collect(&ctx, "SHOW SOMETHING_UNKNOWN")
+ let result = plan_and_collect(&ctx, "SHOW SOMETHING_UNKNOWN")
.await
- .unwrap_err();
+ .unwrap();
- assert_eq!(err.to_string(), "This feature is not implemented: SHOW SOMETHING_UNKNOWN not implemented. Supported syntax: SHOW <TABLES>");
+ assert_eq!(result.len(), 0);
+}
+
+#[tokio::test]
+async fn show_unsupported_when_information_schema_off() {
+ let ctx = SessionContext::with_config(SessionConfig::new());
+
+ let err = plan_and_collect(&ctx, "SHOW SOMETHING").await.unwrap_err();
+
+ assert_eq!(err.to_string(), "Error during planning: SHOW [VARIABLE] is not supported unless information_schema is enabled");
}
#[tokio::test]
@@ -632,3 +666,40 @@ async fn show_external_create_table() {
async fn plan_and_collect(ctx: &SessionContext, sql: &str) -> Result<Vec<RecordBatch>> {
ctx.sql(sql).await?.collect().await
}
+
+#[tokio::test]
+async fn show_variable_in_config_options() {
+ let ctx =
+ SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+ let sql = "SHOW datafusion.execution.batch_size";
+ let results = plan_and_collect(&ctx, sql).await.unwrap();
+
+ let expected = vec![
+ "+---------------------------------+---------+",
+ "| name | setting |",
+ "+---------------------------------+---------+",
+ "| datafusion.execution.batch_size | 8192 |",
+ "+---------------------------------+---------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+}
+
+#[tokio::test]
+async fn show_all() {
+ let ctx =
+ SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+ let sql = "SHOW ALL";
+
+ let results = plan_and_collect(&ctx, sql).await.unwrap();
+
+ let expected_length = ctx
+ .state
+ .read()
+ .config
+ .config_options
+ .read()
+ .options()
+ .len();
+ assert_eq!(expected_length, results[0].num_rows());
+}
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 5d30b670f..0338d713a 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -2378,10 +2378,27 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
let variable = ObjectName(variable.to_vec()).to_string();
- Err(DataFusionError::NotImplemented(format!(
- "SHOW {} not implemented. Supported syntax: SHOW <TABLES>",
- variable
- )))
+
+ if !self.has_table("information_schema", "df_settings") {
+ return Err(DataFusionError::Plan(
+ "SHOW [VARIABLE] is not supported unless information_schema is enabled"
+ .to_string(),
+ ));
+ }
+
+ let query = if variable.to_lowercase() == "all" {
+ String::from("SELECT name, setting FROM information_schema.df_settings")
+ } else {
+ format!(
+ "SELECT name, setting FROM information_schema.df_settings WHERE name = '{}'",
+ variable
+ )
+ };
+
+ let mut rewrite = DFParser::parse_sql(&query)?;
+ assert_eq!(rewrite.len(), 1);
+
+ self.statement_to_plan(rewrite.pop_front().unwrap())
}
fn show_columns_to_plan(