This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 82b4d1bcc4 Split `SessionState` into its own module (#10794)
82b4d1bcc4 is described below
commit 82b4d1bcc40c0136c0b9d416e5bb8445834ad67c
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Jun 6 06:48:52 2024 -0400
Split `SessionState` into its own module (#10794)
* Split `SessionState` into its own module
* fix docs
* fix test
* Rename state.rs to session_state.rs
---
datafusion/core/src/execution/context/mod.rs | 1111 +----------------------
datafusion/core/src/execution/mod.rs | 2 +
datafusion/core/src/execution/session_state.rs | 1148 ++++++++++++++++++++++++
3 files changed, 1195 insertions(+), 1066 deletions(-)
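For downstream code the split is meant to be source-compatible: the type now lives in `execution::session_state`, and the old path in `execution::context` keeps working through a `pub use` re-export (visible in the first diff below). A minimal sketch of the two equivalent imports, assuming the usual `datafusion` facade crate:

```rust
// Hypothetical downstream snippet, not part of this commit.
// The old path keeps compiling via the backwards-compatibility re-export;
// the new path is the type's actual home after the split.
use datafusion::execution::context::SessionState as ViaOldPath;
use datafusion::execution::session_state::SessionState as ViaNewPath;

// Both aliases name the same type, so this identity function type-checks.
fn same_type(state: ViaOldPath) -> ViaNewPath {
    state
}
```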
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 745eff550f..e247263964 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -17,30 +17,20 @@
//! [`SessionContext`] API for registering data sources and executing queries
-use std::collections::{hash_map::Entry, HashMap, HashSet};
+use std::collections::HashSet;
use std::fmt::Debug;
-use std::ops::ControlFlow;
use std::sync::{Arc, Weak};
use super::options::ReadOptions;
-#[cfg(feature = "array_expressions")]
-use crate::functions_array;
use crate::{
- catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA},
catalog::listing_schema::ListingSchemaProvider,
- catalog::schema::{MemorySchemaProvider, SchemaProvider},
- catalog::{
- CatalogProvider, CatalogProviderList, MemoryCatalogProvider,
- MemoryCatalogProviderList,
- },
- config::ConfigOptions,
+ catalog::schema::MemorySchemaProvider,
+ catalog::{CatalogProvider, CatalogProviderList, MemoryCatalogProvider},
dataframe::DataFrame,
datasource::{
- cte_worktable::CteWorkTable,
- function::{TableFunction, TableFunctionImpl},
+ function::TableFunctionImpl,
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
- object_store::ObjectStoreUrl,
- provider::{DefaultTableFactory, TableProviderFactory},
+ provider::TableProviderFactory,
},
datasource::{provider_as_source, MemTable, TableProvider, ViewTable},
error::{DataFusionError, Result},
@@ -50,51 +40,37 @@ use crate::{
logical_expr::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable,
- DropView, Explain, LogicalPlan, LogicalPlanBuilder, PlanType, SetVariable,
- TableSource, TableType, ToStringifiedPlan, UNNAMED_TABLE,
+ DropView, LogicalPlan, LogicalPlanBuilder, SetVariable, TableType, UNNAMED_TABLE,
},
- optimizer::analyzer::{Analyzer, AnalyzerRule},
- optimizer::optimizer::{Optimizer, OptimizerConfig, OptimizerRule},
- physical_expr::{create_physical_expr, PhysicalExpr},
- physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule},
+ physical_expr::PhysicalExpr,
physical_plan::ExecutionPlan,
- physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
variable::{VarProvider, VarType},
};
-use crate::{functions, functions_aggregate};
-use arrow::datatypes::{DataType, SchemaRef};
+use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use datafusion_common::{
- alias::AliasGenerator,
config::{ConfigExtension, TableOptions},
- exec_err, not_impl_err, plan_datafusion_err, plan_err,
- tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor},
+ exec_err, not_impl_err, plan_err,
+ tree_node::{TreeNodeRecursion, TreeNodeVisitor},
DFSchema, SchemaReference, TableReference,
};
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::{
expr_rewriter::FunctionRewrite,
logical_plan::{DdlStatement, Statement},
- simplify::SimplifyInfo,
- var_provider::is_system_variables,
- Expr, ExprSchemable, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
-};
-use datafusion_optimizer::simplify_expressions::ExprSimplifier;
-use datafusion_sql::{
- parser::{CopyToSource, CopyToStatement, DFParser},
- planner::{object_name_to_table_reference, ContextProvider, ParserOptions, SqlToRel},
- ResolvedTableReference,
+ Expr, UserDefinedLogicalNode, WindowUDF,
};
-use sqlparser::dialect::dialect_from_str;
+
+// backwards compatibility
+pub use crate::execution::session_state::SessionState;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use object_store::ObjectStore;
use parking_lot::RwLock;
use url::Url;
-use uuid::Uuid;
pub use datafusion_execution::config::SessionConfig;
pub use datafusion_execution::TaskContext;
@@ -330,7 +306,7 @@ impl SessionContext {
/// Creates a new `SessionContext` using the provided [`SessionState`]
pub fn new_with_state(state: SessionState) -> Self {
Self {
- session_id: state.session_id.clone(),
+ session_id: state.session_id().to_string(),
session_start_time: Utc::now(),
state: Arc::new(RwLock::new(state)),
}
@@ -395,7 +371,7 @@ impl SessionContext {
/// Return the [RuntimeEnv] used to run queries with this `SessionContext`
pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
- self.state.read().runtime_env.clone()
+ self.state.read().runtime_env().clone()
}
/// Returns an id that uniquely identifies this `SessionContext`.
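The recurring change in these hunks is mechanical: the `SessionState` fields are private to the new module, so `context/mod.rs` now reads them through accessors (`session_id()`, `runtime_env()`, `config()`, and so on). The same accessor style is what user code sees on a state snapshot; a hedged sketch:

```rust
use datafusion::prelude::SessionContext;

fn inspect_session(ctx: &SessionContext) {
    // `state()` returns a cloned snapshot of the underlying SessionState.
    let state = ctx.state();
    // Fields are reached through accessor methods, not direct field access.
    println!("session id: {}", state.session_id());
    println!("batch size: {}", state.config().options().execution.batch_size);
    let _runtime = state.runtime_env().clone();
}
```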
@@ -416,7 +392,7 @@ impl SessionContext {
pub fn enable_ident_normalization(&self) -> bool {
self.state
.read()
- .config
+ .config()
.options()
.sql_parser
.enable_ident_normalization
@@ -424,7 +400,7 @@ impl SessionContext {
/// Return a copied version of config for this Session
pub fn copied_config(&self) -> SessionConfig {
- self.state.read().config.clone()
+ self.state.read().config().clone()
}
/// Return a copied version of table options for this Session
@@ -705,8 +681,8 @@ impl SessionContext {
let (catalog, schema_name) = match tokens.len() {
1 => {
let state = self.state.read();
- let name = &state.config.options().catalog.default_catalog;
- let catalog = state.catalog_list.catalog(name).ok_or_else(|| {
+ let name = &state.config().options().catalog.default_catalog;
+ let catalog = state.catalog_list().catalog(name).ok_or_else(|| {
DataFusionError::Execution(format!(
"Missing default catalog '{name}'"
))
@@ -749,7 +725,7 @@ impl SessionContext {
let new_catalog = Arc::new(MemoryCatalogProvider::new());
self.state
.write()
- .catalog_list
+ .catalog_list()
.register_catalog(catalog_name, new_catalog);
self.return_empty_dataframe()
}
@@ -800,7 +776,7 @@ impl SessionContext {
state.config_options().catalog.default_catalog.to_string()
}
};
- if let Some(catalog) = state.catalog_list.catalog(&catalog_name) {
+ if let Some(catalog) = state.catalog_list().catalog(&catalog_name) {
catalog
} else if allow_missing {
return self.return_empty_dataframe();
@@ -826,7 +802,7 @@ impl SessionContext {
} = stmt;
let mut state = self.state.write();
- state.config.options_mut().set(&variable, &value)?;
+ state.config_mut().options_mut().set(&variable, &value)?;
drop(state);
self.return_empty_dataframe()
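`SET` statements now flow through `config_mut()` instead of poking the `config` field directly. The same option can be set programmatically before the context is built; a sketch using the standard batch-size key:

```rust
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

fn context_with_small_batches() -> Result<SessionContext> {
    let mut config = SessionConfig::new();
    // Equivalent to running `SET datafusion.execution.batch_size = 1024`,
    // which the hunk above routes through config_mut().options_mut().set(..).
    config.options_mut().set("datafusion.execution.batch_size", "1024")?;
    Ok(SessionContext::new_with_config(config))
}
```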
@@ -839,8 +815,8 @@ impl SessionContext {
let state = self.state.read().clone();
let file_type = cmd.file_type.to_uppercase();
let factory =
- &state
- .table_factories
+ state
+ .table_factories()
.get(file_type.as_str())
.ok_or_else(|| {
DataFusionError::Execution(format!(
@@ -863,7 +839,7 @@ impl SessionContext {
let state = self.state.read();
let resolved = state.resolve_table_ref(table_ref);
state
- .catalog_list
+ .catalog_list()
.catalog(&resolved.catalog)
.and_then(|c| c.schema(&resolved.schema))
};
@@ -883,7 +859,7 @@ impl SessionContext {
async fn create_function(&self, stmt: CreateFunction) -> Result<DataFrame> {
let function = {
let state = self.state.read().clone();
- let function_factory = &state.function_factory;
+ let function_factory = state.function_factory();
match function_factory {
Some(f) => f.create(&state, stmt).await?,
@@ -937,16 +913,13 @@ impl SessionContext {
) {
self.state
.write()
- .execution_props
+ .execution_props_mut()
.add_var_provider(variable_type, provider);
}
/// Register a table UDF with this context
pub fn register_udtf(&self, name: &str, fun: Arc<dyn TableFunctionImpl>) {
- self.state.write().table_functions.insert(
- name.to_owned(),
- Arc::new(TableFunction::new(name.to_owned(), fun)),
- );
+ self.state.write().register_udtf(name, fun)
}
/// Registers a scalar UDF within this context.
@@ -1176,18 +1149,18 @@ impl SessionContext {
let name = name.into();
self.state
.read()
- .catalog_list
+ .catalog_list()
.register_catalog(name, catalog)
}
/// Retrieves the list of available catalog names.
pub fn catalog_names(&self) -> Vec<String> {
- self.state.read().catalog_list.catalog_names()
+ self.state.read().catalog_list().catalog_names()
}
/// Retrieves a [`CatalogProvider`] instance by name
pub fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
- self.state.read().catalog_list.catalog(name)
+ self.state.read().catalog_list().catalog(name)
}
/// Registers a [`TableProvider`] as a table that can be
@@ -1280,7 +1253,7 @@ impl SessionContext {
/// `query_execution_start_time` to the current time
pub fn state(&self) -> SessionState {
let mut state = self.state.read().clone();
- state.execution_props.start_execution();
+ state.execution_props_mut().start_execution();
state
}
@@ -1291,7 +1264,7 @@ impl SessionContext {
/// Register [`CatalogProviderList`] in [`SessionState`]
pub fn register_catalog_list(&mut self, catalog_list: Arc<dyn CatalogProviderList>) {
- self.state.write().catalog_list = catalog_list;
+ self.state.write().register_catalog_list(catalog_list)
}
/// Registers a [`ConfigExtension`] as a table option extension that can be
@@ -1299,9 +1272,7 @@ impl SessionContext {
pub fn register_table_options_extension<T: ConfigExtension>(&self, extension: T) {
self.state
.write()
- .table_option_namespace
- .extensions
- .insert(extension)
+ .register_table_options_extension(extension)
}
}
@@ -1342,6 +1313,13 @@ impl FunctionRegistry for SessionContext {
}
}
+/// Create a new task context instance from SessionContext
+impl From<&SessionContext> for TaskContext {
+ fn from(session: &SessionContext) -> Self {
+ TaskContext::from(&*session.state.read())
+ }
+}
+
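This `From<&SessionContext>` conversion stays in `context/mod.rs`, while its `From<&SessionState>` twin moves to the new module (its old copy is deleted further down). Either entry point yields an equivalent task context; a sketch:

```rust
use datafusion::execution::context::TaskContext;
use datafusion::prelude::SessionContext;
use std::sync::Arc;

fn make_task_contexts(ctx: &SessionContext) -> (TaskContext, Arc<TaskContext>) {
    // Via the conversion kept in this file...
    let direct = TaskContext::from(ctx);
    // ...or via the helper on a SessionState snapshot; both carry the same
    // session id, config, registered functions, and runtime environment.
    let via_state = ctx.state().task_ctx();
    (direct, via_state)
}
```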
/// A planner used to add extensions to DataFusion logical and physical plans.
#[async_trait]
pub trait QueryPlanner {
@@ -1353,23 +1331,6 @@ pub trait QueryPlanner {
) -> Result<Arc<dyn ExecutionPlan>>;
}
-/// The query planner used if no user defined planner is provided
-struct DefaultQueryPlanner {}
-
-#[async_trait]
-impl QueryPlanner for DefaultQueryPlanner {
- /// Given a `LogicalPlan`, create an [`ExecutionPlan`] suitable for execution
- async fn create_physical_plan(
- &self,
- logical_plan: &LogicalPlan,
- session_state: &SessionState,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- let planner = DefaultPhysicalPlanner::default();
- planner
- .create_physical_plan(logical_plan, session_state)
- .await
- }
-}
/// A pluggable interface to handle `CREATE FUNCTION` statements
/// and interact with [SessionState] to register new udf, udaf or udwf.
@@ -1395,990 +1356,6 @@ pub enum RegisterFunction {
Table(String, Arc<dyn TableFunctionImpl>),
}
-/// Execution context for registering data sources and executing queries.
-/// See [`SessionContext`] for a higher level API.
-///
-/// Note that there is no `Default` or `new()` for SessionState,
-/// to avoid accidentally running queries or other operations without passing through
-/// the [`SessionConfig`] or [`RuntimeEnv`]. See [`SessionContext`].
-#[derive(Clone)]
-pub struct SessionState {
- /// A unique UUID that identifies the session
- session_id: String,
- /// Responsible for analyzing and rewrite a logical plan before optimization
- analyzer: Analyzer,
- /// Responsible for optimizing a logical plan
- optimizer: Optimizer,
- /// Responsible for optimizing a physical execution plan
- physical_optimizers: PhysicalOptimizer,
- /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
- query_planner: Arc<dyn QueryPlanner + Send + Sync>,
- /// Collection of catalogs containing schemas and ultimately TableProviders
- catalog_list: Arc<dyn CatalogProviderList>,
- /// Table Functions
- table_functions: HashMap<String, Arc<TableFunction>>,
- /// Scalar functions that are registered with the context
- scalar_functions: HashMap<String, Arc<ScalarUDF>>,
- /// Aggregate functions registered in the context
- aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
- /// Window functions registered in the context
- window_functions: HashMap<String, Arc<WindowUDF>>,
- /// Deserializer registry for extensions.
- serializer_registry: Arc<dyn SerializerRegistry>,
- /// Session configuration
- config: SessionConfig,
- /// Table options
- table_option_namespace: TableOptions,
- /// Execution properties
- execution_props: ExecutionProps,
- /// TableProviderFactories for different file formats.
- ///
- /// Maps strings like "JSON" to an instance of [`TableProviderFactory`]
- ///
- /// This is used to create [`TableProvider`] instances for the
- /// `CREATE EXTERNAL TABLE ... STORED AS <FORMAT>` for custom file
- /// formats other than those built into DataFusion
- table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
- /// Runtime environment
- runtime_env: Arc<RuntimeEnv>,
-
- /// [FunctionFactory] to support pluggable user defined function handler.
- ///
- /// It will be invoked on `CREATE FUNCTION` statements.
- /// thus, changing dialect o PostgreSql is required
- function_factory: Option<Arc<dyn FunctionFactory>>,
-}
-
-impl Debug for SessionState {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("SessionState")
- .field("session_id", &self.session_id)
- // TODO should we print out more?
- .finish()
- }
-}
-
-impl SessionState {
- /// Returns new [`SessionState`] using the provided
- /// [`SessionConfig`] and [`RuntimeEnv`].
- pub fn new_with_config_rt(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> Self {
- let catalog_list =
- Arc::new(MemoryCatalogProviderList::new()) as Arc<dyn CatalogProviderList>;
- Self::new_with_config_rt_and_catalog_list(config, runtime, catalog_list)
- }
-
- /// Returns new [`SessionState`] using the provided
- /// [`SessionConfig`] and [`RuntimeEnv`].
- #[deprecated(since = "32.0.0", note = "Use SessionState::new_with_config_rt")]
- pub fn with_config_rt(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> Self {
- Self::new_with_config_rt(config, runtime)
- }
-
- /// Returns new [`SessionState`] using the provided
- /// [`SessionConfig`], [`RuntimeEnv`], and [`CatalogProviderList`]
- pub fn new_with_config_rt_and_catalog_list(
- config: SessionConfig,
- runtime: Arc<RuntimeEnv>,
- catalog_list: Arc<dyn CatalogProviderList>,
- ) -> Self {
- let session_id = Uuid::new_v4().to_string();
-
- // Create table_factories for all default formats
- let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
- HashMap::new();
- #[cfg(feature = "parquet")]
- table_factories.insert("PARQUET".into(),
Arc::new(DefaultTableFactory::new()));
- table_factories.insert("CSV".into(),
Arc::new(DefaultTableFactory::new()));
- table_factories.insert("JSON".into(),
Arc::new(DefaultTableFactory::new()));
- table_factories.insert("NDJSON".into(),
Arc::new(DefaultTableFactory::new()));
- table_factories.insert("AVRO".into(),
Arc::new(DefaultTableFactory::new()));
- table_factories.insert("ARROW".into(),
Arc::new(DefaultTableFactory::new()));
-
- if config.create_default_catalog_and_schema() {
- let default_catalog = MemoryCatalogProvider::new();
-
- default_catalog
- .register_schema(
- &config.options().catalog.default_schema,
- Arc::new(MemorySchemaProvider::new()),
- )
- .expect("memory catalog provider can register schema");
-
- Self::register_default_schema(
- &config,
- &table_factories,
- &runtime,
- &default_catalog,
- );
-
- catalog_list.register_catalog(
- config.options().catalog.default_catalog.clone(),
- Arc::new(default_catalog),
- );
- }
-
- let mut new_self = SessionState {
- session_id,
- analyzer: Analyzer::new(),
- optimizer: Optimizer::new(),
- physical_optimizers: PhysicalOptimizer::new(),
- query_planner: Arc::new(DefaultQueryPlanner {}),
- catalog_list,
- table_functions: HashMap::new(),
- scalar_functions: HashMap::new(),
- aggregate_functions: HashMap::new(),
- window_functions: HashMap::new(),
- serializer_registry: Arc::new(EmptySerializerRegistry),
- table_option_namespace: TableOptions::default_from_session_config(
- config.options(),
- ),
- config,
- execution_props: ExecutionProps::new(),
- runtime_env: runtime,
- table_factories,
- function_factory: None,
- };
-
- // register built in functions
- functions::register_all(&mut new_self)
- .expect("can not register built in functions");
-
- // register crate of array expressions (if enabled)
- #[cfg(feature = "array_expressions")]
- functions_array::register_all(&mut new_self)
- .expect("can not register array expressions");
-
- functions_aggregate::register_all(&mut new_self)
- .expect("can not register aggregate functions");
-
- new_self
- }
- /// Returns new [`SessionState`] using the provided
- /// [`SessionConfig`] and [`RuntimeEnv`].
- #[deprecated(
- since = "32.0.0",
- note = "Use SessionState::new_with_config_rt_and_catalog_list"
- )]
- pub fn with_config_rt_and_catalog_list(
- config: SessionConfig,
- runtime: Arc<RuntimeEnv>,
- catalog_list: Arc<dyn CatalogProviderList>,
- ) -> Self {
- Self::new_with_config_rt_and_catalog_list(config, runtime, catalog_list)
- }
- fn register_default_schema(
- config: &SessionConfig,
- table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
- runtime: &Arc<RuntimeEnv>,
- default_catalog: &MemoryCatalogProvider,
- ) {
- let url = config.options().catalog.location.as_ref();
- let format = config.options().catalog.format.as_ref();
- let (url, format) = match (url, format) {
- (Some(url), Some(format)) => (url, format),
- _ => return,
- };
- let url = url.to_string();
- let format = format.to_string();
-
- let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
- let authority = match url.host_str() {
- Some(host) => format!("{}://{}", url.scheme(), host),
- None => format!("{}://", url.scheme()),
- };
- let path = &url.as_str()[authority.len()..];
- let path = object_store::path::Path::parse(path).expect("Can't parse path");
- let store = ObjectStoreUrl::parse(authority.as_str())
- .expect("Invalid default catalog url");
- let store = match runtime.object_store(store) {
- Ok(store) => store,
- _ => return,
- };
- let factory = match table_factories.get(format.as_str()) {
- Some(factory) => factory,
- _ => return,
- };
- let schema =
- ListingSchemaProvider::new(authority, path, factory.clone(), store, format);
- let _ = default_catalog
- .register_schema("default", Arc::new(schema))
- .expect("Failed to register default schema");
- }
-
- fn resolve_table_ref(
- &self,
- table_ref: impl Into<TableReference>,
- ) -> ResolvedTableReference {
- let catalog = &self.config_options().catalog;
- table_ref
- .into()
- .resolve(&catalog.default_catalog, &catalog.default_schema)
- }
-
- pub(crate) fn schema_for_ref(
- &self,
- table_ref: impl Into<TableReference>,
- ) -> Result<Arc<dyn SchemaProvider>> {
- let resolved_ref = self.resolve_table_ref(table_ref);
- if self.config.information_schema() && *resolved_ref.schema == *INFORMATION_SCHEMA
- {
- return Ok(Arc::new(InformationSchemaProvider::new(
- self.catalog_list.clone(),
- )));
- }
-
- self.catalog_list
- .catalog(&resolved_ref.catalog)
- .ok_or_else(|| {
- plan_datafusion_err!(
- "failed to resolve catalog: {}",
- resolved_ref.catalog
- )
- })?
- .schema(&resolved_ref.schema)
- .ok_or_else(|| {
- plan_datafusion_err!("failed to resolve schema: {}",
resolved_ref.schema)
- })
- }
-
- /// Replace the random session id.
- pub fn with_session_id(mut self, session_id: String) -> Self {
- self.session_id = session_id;
- self
- }
-
- /// override default query planner with `query_planner`
- pub fn with_query_planner(
- mut self,
- query_planner: Arc<dyn QueryPlanner + Send + Sync>,
- ) -> Self {
- self.query_planner = query_planner;
- self
- }
-
- /// Override the [`AnalyzerRule`]s optimizer plan rules.
- pub fn with_analyzer_rules(
- mut self,
- rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
- ) -> Self {
- self.analyzer = Analyzer::with_rules(rules);
- self
- }
-
- /// Replace the entire list of [`OptimizerRule`]s used to optimize plans
- pub fn with_optimizer_rules(
- mut self,
- rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
- ) -> Self {
- self.optimizer = Optimizer::with_rules(rules);
- self
- }
-
- /// Replace the entire list of [`PhysicalOptimizerRule`]s used to optimize plans
- pub fn with_physical_optimizer_rules(
- mut self,
- physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
- ) -> Self {
- self.physical_optimizers = PhysicalOptimizer::with_rules(physical_optimizers);
- self
- }
-
- /// Add `analyzer_rule` to the end of the list of
- /// [`AnalyzerRule`]s used to rewrite queries.
- pub fn add_analyzer_rule(
- mut self,
- analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>,
- ) -> Self {
- self.analyzer.rules.push(analyzer_rule);
- self
- }
-
- /// Add `optimizer_rule` to the end of the list of
- /// [`OptimizerRule`]s used to rewrite queries.
- pub fn add_optimizer_rule(
- mut self,
- optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
- ) -> Self {
- self.optimizer.rules.push(optimizer_rule);
- self
- }
-
- /// Add `physical_optimizer_rule` to the end of the list of
- /// [`PhysicalOptimizerRule`]s used to rewrite queries.
- pub fn add_physical_optimizer_rule(
- mut self,
- physical_optimizer_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
- ) -> Self {
- self.physical_optimizers.rules.push(physical_optimizer_rule);
- self
- }
-
- /// Adds a new [`ConfigExtension`] to TableOptions
- pub fn add_table_options_extension<T: ConfigExtension>(
- mut self,
- extension: T,
- ) -> Self {
- self.table_option_namespace.extensions.insert(extension);
- self
- }
-
- /// Registers a [`FunctionFactory`] to handle `CREATE FUNCTION` statements
- pub fn with_function_factory(
- mut self,
- function_factory: Arc<dyn FunctionFactory>,
- ) -> Self {
- self.function_factory = Some(function_factory);
- self
- }
-
- /// Registers a [`FunctionFactory`] to handle `CREATE FUNCTION` statements
- pub fn set_function_factory(&mut self, function_factory: Arc<dyn FunctionFactory>) {
- self.function_factory = Some(function_factory);
- }
-
- /// Replace the extension [`SerializerRegistry`]
- pub fn with_serializer_registry(
- mut self,
- registry: Arc<dyn SerializerRegistry>,
- ) -> Self {
- self.serializer_registry = registry;
- self
- }
-
- /// Get the table factories
- pub fn table_factories(&self) -> &HashMap<String, Arc<dyn TableProviderFactory>> {
- &self.table_factories
- }
-
- /// Get the table factories
- pub fn table_factories_mut(
- &mut self,
- ) -> &mut HashMap<String, Arc<dyn TableProviderFactory>> {
- &mut self.table_factories
- }
-
- /// Parse an SQL string into an DataFusion specific AST
- /// [`Statement`]. See [`SessionContext::sql`] for running queries.
- pub fn sql_to_statement(
- &self,
- sql: &str,
- dialect: &str,
- ) -> Result<datafusion_sql::parser::Statement> {
- let dialect = dialect_from_str(dialect).ok_or_else(|| {
- plan_datafusion_err!(
- "Unsupported SQL dialect: {dialect}. Available dialects: \
- Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
- MsSQL, ClickHouse, BigQuery, Ansi."
- )
- })?;
- let mut statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;
- if statements.len() > 1 {
- return not_impl_err!(
- "The context currently only supports a single SQL statement"
- );
- }
- let statement = statements.pop_front().ok_or_else(|| {
- DataFusionError::NotImplemented(
- "The context requires a statement!".to_string(),
- )
- })?;
- Ok(statement)
- }
-
- /// Resolve all table references in the SQL statement.
- pub fn resolve_table_references(
- &self,
- statement: &datafusion_sql::parser::Statement,
- ) -> Result<Vec<TableReference>> {
- use crate::catalog::information_schema::INFORMATION_SCHEMA_TABLES;
- use datafusion_sql::parser::Statement as DFStatement;
- use sqlparser::ast::*;
-
- // Getting `TableProviders` is async but planing is not -- thus pre-fetch
- // table providers for all relations referenced in this query
- let mut relations = hashbrown::HashSet::with_capacity(10);
-
- struct RelationVisitor<'a>(&'a mut hashbrown::HashSet<ObjectName>);
-
- impl<'a> RelationVisitor<'a> {
- /// Record that `relation` was used in this statement
- fn insert(&mut self, relation: &ObjectName) {
- self.0.get_or_insert_with(relation, |_| relation.clone());
- }
- }
-
- impl<'a> Visitor for RelationVisitor<'a> {
- type Break = ();
-
- fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> {
- self.insert(relation);
- ControlFlow::Continue(())
- }
-
- fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> {
- if let Statement::ShowCreate {
- obj_type: ShowCreateObject::Table | ShowCreateObject::View,
- obj_name,
- } = statement
- {
- self.insert(obj_name)
- }
- ControlFlow::Continue(())
- }
- }
-
- let mut visitor = RelationVisitor(&mut relations);
- fn visit_statement(statement: &DFStatement, visitor: &mut RelationVisitor<'_>) {
- match statement {
- DFStatement::Statement(s) => {
- let _ = s.as_ref().visit(visitor);
- }
- DFStatement::CreateExternalTable(table) => {
- visitor
- .0
- .insert(ObjectName(vec![Ident::from(table.name.as_str())]));
- }
- DFStatement::CopyTo(CopyToStatement { source, .. }) => match source {
- CopyToSource::Relation(table_name) => {
- visitor.insert(table_name);
- }
- CopyToSource::Query(query) => {
- query.visit(visitor);
- }
- },
- DFStatement::Explain(explain) => {
- visit_statement(&explain.statement, visitor)
- }
- }
- }
-
- visit_statement(statement, &mut visitor);
-
- // Always include information_schema if available
- if self.config.information_schema() {
- for s in INFORMATION_SCHEMA_TABLES {
- relations.insert(ObjectName(vec![
- Ident::new(INFORMATION_SCHEMA),
- Ident::new(*s),
- ]));
- }
- }
-
- let enable_ident_normalization =
- self.config.options().sql_parser.enable_ident_normalization;
- relations
- .into_iter()
- .map(|x| object_name_to_table_reference(x, enable_ident_normalization))
- .collect::<Result<_>>()
- }
-
- /// Convert an AST Statement into a LogicalPlan
- pub async fn statement_to_plan(
- &self,
- statement: datafusion_sql::parser::Statement,
- ) -> Result<LogicalPlan> {
- let references = self.resolve_table_references(&statement)?;
-
- let mut provider = SessionContextProvider {
- state: self,
- tables: HashMap::with_capacity(references.len()),
- };
-
- let enable_ident_normalization =
- self.config.options().sql_parser.enable_ident_normalization;
- let parse_float_as_decimal =
- self.config.options().sql_parser.parse_float_as_decimal;
- for reference in references {
- let resolved = &self.resolve_table_ref(reference);
- if let Entry::Vacant(v) = provider.tables.entry(resolved.to_string()) {
- if let Ok(schema) = self.schema_for_ref(resolved.clone()) {
- if let Some(table) = schema.table(&resolved.table).await? {
- v.insert(provider_as_source(table));
- }
- }
- }
- }
-
- let query = SqlToRel::new_with_options(
- &provider,
- ParserOptions {
- parse_float_as_decimal,
- enable_ident_normalization,
- },
- );
- query.statement_to_plan(statement)
- }
-
- /// Creates a [`LogicalPlan`] from the provided SQL string. This
- /// interface will plan any SQL DataFusion supports, including DML
- /// like `CREATE TABLE`, and `COPY` (which can write to local
- /// files.
- ///
- /// See [`SessionContext::sql`] and
- /// [`SessionContext::sql_with_options`] for a higher-level
- /// interface that handles DDL and verification of allowed
- /// statements.
- pub async fn create_logical_plan(&self, sql: &str) -> Result<LogicalPlan> {
- let dialect = self.config.options().sql_parser.dialect.as_str();
- let statement = self.sql_to_statement(sql, dialect)?;
- let plan = self.statement_to_plan(statement).await?;
- Ok(plan)
- }
-
- /// Optimizes the logical plan by applying optimizer rules.
- pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
- if let LogicalPlan::Explain(e) = plan {
- let mut stringified_plans = e.stringified_plans.clone();
-
- // analyze & capture output of each rule
- let analyzer_result = self.analyzer.execute_and_check(
- e.plan.as_ref().clone(),
- self.options(),
- |analyzed_plan, analyzer| {
- let analyzer_name = analyzer.name().to_string();
- let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
- stringified_plans.push(analyzed_plan.to_stringified(plan_type));
- },
- );
- let analyzed_plan = match analyzer_result {
- Ok(plan) => plan,
- Err(DataFusionError::Context(analyzer_name, err)) => {
- let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
- stringified_plans
- .push(StringifiedPlan::new(plan_type, err.to_string()));
-
- return Ok(LogicalPlan::Explain(Explain {
- verbose: e.verbose,
- plan: e.plan.clone(),
- stringified_plans,
- schema: e.schema.clone(),
- logical_optimization_succeeded: false,
- }));
- }
- Err(e) => return Err(e),
- };
-
- // to delineate the analyzer & optimizer phases in explain output
- stringified_plans
- .push(analyzed_plan.to_stringified(PlanType::FinalAnalyzedLogicalPlan));
-
- // optimize the child plan, capturing the output of each optimizer
- let optimized_plan = self.optimizer.optimize(
- analyzed_plan,
- self,
- |optimized_plan, optimizer| {
- let optimizer_name = optimizer.name().to_string();
- let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
- stringified_plans.push(optimized_plan.to_stringified(plan_type));
- },
- );
- let (plan, logical_optimization_succeeded) = match optimized_plan {
- Ok(plan) => (Arc::new(plan), true),
- Err(DataFusionError::Context(optimizer_name, err)) => {
- let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
- stringified_plans
- .push(StringifiedPlan::new(plan_type, err.to_string()));
- (e.plan.clone(), false)
- }
- Err(e) => return Err(e),
- };
-
- Ok(LogicalPlan::Explain(Explain {
- verbose: e.verbose,
- plan,
- stringified_plans,
- schema: e.schema.clone(),
- logical_optimization_succeeded,
- }))
- } else {
- let analyzed_plan = self.analyzer.execute_and_check(
- plan.clone(),
- self.options(),
- |_, _| {},
- )?;
- self.optimizer.optimize(analyzed_plan, self, |_, _| {})
- }
- }
-
- /// Creates a physical [`ExecutionPlan`] plan from a [`LogicalPlan`].
- ///
- /// Note: this first calls [`Self::optimize`] on the provided
- /// plan.
- ///
- /// This function will error for [`LogicalPlan`]s such as catalog DDL like
- /// `CREATE TABLE`, which do not have corresponding physical plans and must
- /// be handled by another layer, typically [`SessionContext`].
- pub async fn create_physical_plan(
- &self,
- logical_plan: &LogicalPlan,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- let logical_plan = self.optimize(logical_plan)?;
- self.query_planner
- .create_physical_plan(&logical_plan, self)
- .await
- }
-
- /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type
- /// coercion, and function rewrites.
- ///
- /// Note: The expression is not [simplified] or otherwise optimized: `a = 1
- /// + 2` will not be simplified to `a = 3` as this is a more involved process.
- /// See the [expr_api] example for how to simplify expressions.
- ///
- /// # See Also:
- /// * [`SessionContext::create_physical_expr`] for a higher-level API
- /// * [`create_physical_expr`] for a lower-level API
- ///
- /// [simplified]: datafusion_optimizer::simplify_expressions
- /// [expr_api]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/expr_api.rs
- pub fn create_physical_expr(
- &self,
- expr: Expr,
- df_schema: &DFSchema,
- ) -> Result<Arc<dyn PhysicalExpr>> {
- let simplifier =
- ExprSimplifier::new(SessionSimplifyProvider::new(self, df_schema));
- // apply type coercion here to ensure types match
- let mut expr = simplifier.coerce(expr, df_schema)?;
-
- // rewrite Exprs to functions if necessary
- let config_options = self.config_options();
- for rewrite in self.analyzer.function_rewrites() {
- expr = expr
- .transform_up(|expr| rewrite.rewrite(expr, df_schema, config_options))?
- .data;
- }
- create_physical_expr(&expr, df_schema, self.execution_props())
- }
-
- /// Return the session ID
- pub fn session_id(&self) -> &str {
- &self.session_id
- }
-
- /// Return the runtime env
- pub fn runtime_env(&self) -> &Arc<RuntimeEnv> {
- &self.runtime_env
- }
-
- /// Return the execution properties
- pub fn execution_props(&self) -> &ExecutionProps {
- &self.execution_props
- }
-
- /// Return the [`SessionConfig`]
- pub fn config(&self) -> &SessionConfig {
- &self.config
- }
-
- /// Return the mutable [`SessionConfig`].
- pub fn config_mut(&mut self) -> &mut SessionConfig {
- &mut self.config
- }
-
- /// Return the physical optimizers
- pub fn physical_optimizers(&self) -> &[Arc<dyn PhysicalOptimizerRule + Send + Sync>] {
- &self.physical_optimizers.rules
- }
-
- /// return the configuration options
- pub fn config_options(&self) -> &ConfigOptions {
- self.config.options()
- }
-
- /// return the TableOptions options with its extensions
- pub fn default_table_options(&self) -> TableOptions {
- self.table_option_namespace
- .combine_with_session_config(self.config_options())
- }
-
- /// Get a new TaskContext to run in this session
- pub fn task_ctx(&self) -> Arc<TaskContext> {
- Arc::new(TaskContext::from(self))
- }
-
- /// Return catalog list
- pub fn catalog_list(&self) -> Arc<dyn CatalogProviderList> {
- self.catalog_list.clone()
- }
-
- /// Return reference to scalar_functions
- pub fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
- &self.scalar_functions
- }
-
- /// Return reference to aggregate_functions
- pub fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
- &self.aggregate_functions
- }
-
- /// Return reference to window functions
- pub fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
- &self.window_functions
- }
-
- /// Return [SerializerRegistry] for extensions
- pub fn serializer_registry(&self) -> Arc<dyn SerializerRegistry> {
- self.serializer_registry.clone()
- }
-
- /// Return version of the cargo package that produced this query
- pub fn version(&self) -> &str {
- env!("CARGO_PKG_VERSION")
- }
-}
-
-struct SessionSimplifyProvider<'a> {
- state: &'a SessionState,
- df_schema: &'a DFSchema,
-}
-
-impl<'a> SessionSimplifyProvider<'a> {
- fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self {
- Self { state, df_schema }
- }
-}
-
-impl<'a> SimplifyInfo for SessionSimplifyProvider<'a> {
- fn is_boolean_type(&self, expr: &Expr) -> Result<bool> {
- Ok(expr.get_type(self.df_schema)? == DataType::Boolean)
- }
-
- fn nullable(&self, expr: &Expr) -> Result<bool> {
- expr.nullable(self.df_schema)
- }
-
- fn execution_props(&self) -> &ExecutionProps {
- self.state.execution_props()
- }
-
- fn get_data_type(&self, expr: &Expr) -> Result<DataType> {
- expr.get_type(self.df_schema)
- }
-}
-
-struct SessionContextProvider<'a> {
- state: &'a SessionState,
- tables: HashMap<String, Arc<dyn TableSource>>,
-}
-
-impl<'a> ContextProvider for SessionContextProvider<'a> {
- fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
- let name = self.state.resolve_table_ref(name).to_string();
- self.tables
- .get(&name)
- .cloned()
- .ok_or_else(|| plan_datafusion_err!("table '{name}' not found"))
- }
-
- fn get_table_function_source(
- &self,
- name: &str,
- args: Vec<Expr>,
- ) -> Result<Arc<dyn TableSource>> {
- let tbl_func = self
- .state
- .table_functions
- .get(name)
- .cloned()
- .ok_or_else(|| plan_datafusion_err!("table function '{name}' not found"))?;
- let provider = tbl_func.create_table_provider(&args)?;
-
- Ok(provider_as_source(provider))
- }
-
- /// Create a new CTE work table for a recursive CTE logical plan
- /// This table will be used in conjunction with a Worktable physical plan
- /// to read and write each iteration of a recursive CTE
- fn create_cte_work_table(
- &self,
- name: &str,
- schema: SchemaRef,
- ) -> Result<Arc<dyn TableSource>> {
- let table = Arc::new(CteWorkTable::new(name, schema));
- Ok(provider_as_source(table))
- }
-
- fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
- self.state.scalar_functions().get(name).cloned()
- }
-
- fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
- self.state.aggregate_functions().get(name).cloned()
- }
-
- fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
- self.state.window_functions().get(name).cloned()
- }
-
- fn get_variable_type(&self, variable_names: &[String]) -> Option<DataType> {
- if variable_names.is_empty() {
- return None;
- }
-
- let provider_type = if is_system_variables(variable_names) {
- VarType::System
- } else {
- VarType::UserDefined
- };
-
- self.state
- .execution_props
- .var_providers
- .as_ref()
- .and_then(|provider| provider.get(&provider_type)?.get_type(variable_names))
- }
-
- fn options(&self) -> &ConfigOptions {
- self.state.config_options()
- }
-
- fn udf_names(&self) -> Vec<String> {
- self.state.scalar_functions().keys().cloned().collect()
- }
-
- fn udaf_names(&self) -> Vec<String> {
- self.state.aggregate_functions().keys().cloned().collect()
- }
-
- fn udwf_names(&self) -> Vec<String> {
- self.state.window_functions().keys().cloned().collect()
- }
-}
-
-impl FunctionRegistry for SessionState {
- fn udfs(&self) -> HashSet<String> {
- self.scalar_functions.keys().cloned().collect()
- }
-
- fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>> {
- let result = self.scalar_functions.get(name);
-
- result.cloned().ok_or_else(|| {
- plan_datafusion_err!("There is no UDF named \"{name}\" in the
registry")
- })
- }
-
- fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
- let result = self.aggregate_functions.get(name);
-
- result.cloned().ok_or_else(|| {
- plan_datafusion_err!("There is no UDAF named \"{name}\" in the
registry")
- })
- }
-
- fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
- let result = self.window_functions.get(name);
-
- result.cloned().ok_or_else(|| {
- plan_datafusion_err!("There is no UDWF named \"{name}\" in the
registry")
- })
- }
-
- fn register_udf(&mut self, udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
- udf.aliases().iter().for_each(|alias| {
- self.scalar_functions.insert(alias.clone(), udf.clone());
- });
- Ok(self.scalar_functions.insert(udf.name().into(), udf))
- }
-
- fn register_udaf(
- &mut self,
- udaf: Arc<AggregateUDF>,
- ) -> Result<Option<Arc<AggregateUDF>>> {
- udaf.aliases().iter().for_each(|alias| {
- self.aggregate_functions.insert(alias.clone(), udaf.clone());
- });
- Ok(self.aggregate_functions.insert(udaf.name().into(), udaf))
- }
-
- fn register_udwf(&mut self, udwf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
- udwf.aliases().iter().for_each(|alias| {
- self.window_functions.insert(alias.clone(), udwf.clone());
- });
- Ok(self.window_functions.insert(udwf.name().into(), udwf))
- }
-
- fn deregister_udf(&mut self, name: &str) -> Result<Option<Arc<ScalarUDF>>> {
- let udf = self.scalar_functions.remove(name);
- if let Some(udf) = &udf {
- for alias in udf.aliases() {
- self.scalar_functions.remove(alias);
- }
- }
- Ok(udf)
- }
-
- fn deregister_udaf(&mut self, name: &str) -> Result<Option<Arc<AggregateUDF>>> {
- let udaf = self.aggregate_functions.remove(name);
- if let Some(udaf) = &udaf {
- for alias in udaf.aliases() {
- self.aggregate_functions.remove(alias);
- }
- }
- Ok(udaf)
- }
-
- fn deregister_udwf(&mut self, name: &str) -> Result<Option<Arc<WindowUDF>>> {
- let udwf = self.window_functions.remove(name);
- if let Some(udwf) = &udwf {
- for alias in udwf.aliases() {
- self.window_functions.remove(alias);
- }
- }
- Ok(udwf)
- }
-
- fn register_function_rewrite(
- &mut self,
- rewrite: Arc<dyn FunctionRewrite + Send + Sync>,
- ) -> Result<()> {
- self.analyzer.add_function_rewrite(rewrite);
- Ok(())
- }
-}
-
-impl OptimizerConfig for SessionState {
- fn query_execution_start_time(&self) -> DateTime<Utc> {
- self.execution_props.query_execution_start_time
- }
-
- fn alias_generator(&self) -> Arc<AliasGenerator> {
- self.execution_props.alias_generator.clone()
- }
-
- fn options(&self) -> &ConfigOptions {
- self.config_options()
- }
-
- fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
- Some(self)
- }
-}
-
-/// Create a new task context instance from SessionContext
-impl From<&SessionContext> for TaskContext {
- fn from(session: &SessionContext) -> Self {
- TaskContext::from(&*session.state.read())
- }
-}
-
-/// Create a new task context instance from SessionState
-impl From<&SessionState> for TaskContext {
- fn from(state: &SessionState) -> Self {
- let task_id = None;
- TaskContext::new(
- task_id,
- state.session_id.clone(),
- state.config.clone(),
- state.scalar_functions.clone(),
- state.aggregate_functions.clone(),
- state.window_functions.clone(),
- state.runtime_env.clone(),
- )
- }
-}
-
/// Default implementation of [SerializerRegistry] that throws unimplemented error
/// for all requests.
pub struct EmptySerializerRegistry;
@@ -2505,6 +1482,8 @@ mod tests {
use datafusion_common_runtime::SpawnedTask;
+ use crate::catalog::schema::SchemaProvider;
+ use crate::physical_planner::PhysicalPlanner;
use async_trait::async_trait;
use tempfile::TempDir;
@@ -2806,7 +1785,7 @@ mod tests {
let catalog_list_weak = {
let state = ctx.state.read();
- Arc::downgrade(&state.catalog_list)
+ Arc::downgrade(&state.catalog_list())
};
drop(ctx);
diff --git a/datafusion/core/src/execution/mod.rs b/datafusion/core/src/execution/mod.rs
index 7e757fabac..ac02c73172 100644
--- a/datafusion/core/src/execution/mod.rs
+++ b/datafusion/core/src/execution/mod.rs
@@ -18,6 +18,8 @@
//! Shared state for query planning and execution.
pub mod context;
+pub mod session_state;
+
// backwards compatibility
pub use crate::datasource::file_format::options;
diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs
new file mode 100644
index 0000000000..3b072ebb73
--- /dev/null
+++ b/datafusion/core/src/execution/session_state.rs
@@ -0,0 +1,1148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`SessionState`]: information required to run queries in a session
+
+use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA};
+use crate::catalog::listing_schema::ListingSchemaProvider;
+use crate::catalog::schema::{MemorySchemaProvider, SchemaProvider};
+use crate::catalog::{
+ CatalogProvider, CatalogProviderList, MemoryCatalogProvider,
+ MemoryCatalogProviderList,
+};
+use crate::datasource::cte_worktable::CteWorkTable;
+use crate::datasource::function::{TableFunction, TableFunctionImpl};
+use crate::datasource::provider::{DefaultTableFactory, TableProviderFactory};
+use crate::datasource::provider_as_source;
+use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
+#[cfg(feature = "array_expressions")]
+use crate::functions_array;
+use crate::physical_optimizer::optimizer::PhysicalOptimizer;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
+use crate::{functions, functions_aggregate};
+use arrow_schema::{DataType, SchemaRef};
+use async_trait::async_trait;
+use chrono::{DateTime, Utc};
+use datafusion_common::alias::AliasGenerator;
+use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions};
+use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
+use datafusion_common::tree_node::TreeNode;
+use datafusion_common::{
+ not_impl_err, plan_datafusion_err, DFSchema, DataFusionError, ResolvedTableReference,
+ TableReference,
+};
+use datafusion_execution::config::SessionConfig;
+use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_execution::runtime_env::RuntimeEnv;
+use datafusion_execution::TaskContext;
+use datafusion_expr::execution_props::ExecutionProps;
+use datafusion_expr::expr_rewriter::FunctionRewrite;
+use datafusion_expr::registry::{FunctionRegistry, SerializerRegistry};
+use datafusion_expr::simplify::SimplifyInfo;
+use datafusion_expr::var_provider::{is_system_variables, VarType};
+use datafusion_expr::{
+ AggregateUDF, Explain, Expr, ExprSchemable, LogicalPlan, ScalarUDF, TableSource,
+ WindowUDF,
+};
+use datafusion_optimizer::simplify_expressions::ExprSimplifier;
+use datafusion_optimizer::{
+ Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule,
+};
+use datafusion_physical_expr::create_physical_expr;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_plan::ExecutionPlan;
+use datafusion_sql::parser::{CopyToSource, CopyToStatement, DFParser, Statement};
+use datafusion_sql::planner::{
+ object_name_to_table_reference, ContextProvider, ParserOptions, SqlToRel,
+};
+use sqlparser::dialect::dialect_from_str;
+use std::collections::hash_map::Entry;
+use std::collections::{HashMap, HashSet};
+use std::fmt::Debug;
+use std::ops::ControlFlow;
+use std::sync::Arc;
+use url::Url;
+use uuid::Uuid;
+
+/// Execution context for registering data sources and executing queries.
+/// See [`SessionContext`] for a higher level API.
+///
+/// Note that there is no `Default` or `new()` for SessionState,
+/// to avoid accidentally running queries or other operations without passing through
+/// the [`SessionConfig`] or [`RuntimeEnv`]. See [`SessionContext`].
+///
+/// [`SessionContext`]: crate::execution::context::SessionContext
+#[derive(Clone)]
+pub struct SessionState {
+ /// A unique UUID that identifies the session
+ session_id: String,
+ /// Responsible for analyzing and rewriting a logical plan before optimization
+ analyzer: Analyzer,
+ /// Responsible for optimizing a logical plan
+ optimizer: Optimizer,
+ /// Responsible for optimizing a physical execution plan
+ physical_optimizers: PhysicalOptimizer,
+ /// Responsible for planning `LogicalPlan`s into `ExecutionPlan`s
+ query_planner: Arc<dyn QueryPlanner + Send + Sync>,
+ /// Collection of catalogs containing schemas and ultimately TableProviders
+ catalog_list: Arc<dyn CatalogProviderList>,
+ /// Table Functions
+ table_functions: HashMap<String, Arc<TableFunction>>,
+ /// Scalar functions that are registered with the context
+ scalar_functions: HashMap<String, Arc<ScalarUDF>>,
+ /// Aggregate functions registered in the context
+ aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
+ /// Window functions registered in the context
+ window_functions: HashMap<String, Arc<WindowUDF>>,
+ /// Deserializer registry for extensions.
+ serializer_registry: Arc<dyn SerializerRegistry>,
+ /// Session configuration
+ config: SessionConfig,
+ /// Table options
+ table_options: TableOptions,
+ /// Execution properties
+ execution_props: ExecutionProps,
+ /// TableProviderFactories for different file formats.
+ ///
+ /// Maps strings like "JSON" to an instance of [`TableProviderFactory`]
+ ///
+ /// This is used to create [`TableProvider`] instances for the
+ /// `CREATE EXTERNAL TABLE ... STORED AS <FORMAT>` for custom file
+ /// formats other than those built into DataFusion
+ ///
+ /// [`TableProvider`]: crate::datasource::provider::TableProvider
+ table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
+ /// Runtime environment
+ runtime_env: Arc<RuntimeEnv>,
+
+ /// [FunctionFactory] to support pluggable user defined function handler.
+ ///
+ /// It will be invoked on `CREATE FUNCTION` statements.
+ /// thus, changing the dialect to PostgreSql is required
+ function_factory: Option<Arc<dyn FunctionFactory>>,
+}
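The moved struct keeps its builder-style surface: construction funnels through `new_with_config_rt` and customization through the `with_*` methods below. A minimal sketch of building a customized state from the new module path, assuming a default runtime environment:

```rust
use datafusion::execution::context::SessionConfig;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::session_state::SessionState;
use std::sync::Arc;

fn custom_state() -> SessionState {
    let config = SessionConfig::new().with_information_schema(true);
    let runtime = Arc::new(RuntimeEnv::default());
    // Builder methods replace direct assignment to the now-private fields.
    SessionState::new_with_config_rt(config, runtime)
        .with_session_id("my-session".to_string())
}
```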
+
+impl Debug for SessionState {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("SessionState")
+ .field("session_id", &self.session_id)
+ // TODO should we print out more?
+ .finish()
+ }
+}
+
+impl SessionState {
+ /// Returns new [`SessionState`] using the provided
+ /// [`SessionConfig`] and [`RuntimeEnv`].
+ pub fn new_with_config_rt(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> Self {
+ let catalog_list =
+ Arc::new(MemoryCatalogProviderList::new()) as Arc<dyn CatalogProviderList>;
+ Self::new_with_config_rt_and_catalog_list(config, runtime, catalog_list)
+ }
+
+ /// Returns new [`SessionState`] using the provided
+ /// [`SessionConfig`] and [`RuntimeEnv`].
+ #[deprecated(since = "32.0.0", note = "Use SessionState::new_with_config_rt")]
+ pub fn with_config_rt(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> Self {
+ Self::new_with_config_rt(config, runtime)
+ }
+
+ /// Returns new [`SessionState`] using the provided
+ /// [`SessionConfig`], [`RuntimeEnv`], and [`CatalogProviderList`]
+ pub fn new_with_config_rt_and_catalog_list(
+ config: SessionConfig,
+ runtime: Arc<RuntimeEnv>,
+ catalog_list: Arc<dyn CatalogProviderList>,
+ ) -> Self {
+ let session_id = Uuid::new_v4().to_string();
+
+ // Create table_factories for all default formats
+ let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
+ HashMap::new();
+ #[cfg(feature = "parquet")]
+ table_factories.insert("PARQUET".into(),
Arc::new(DefaultTableFactory::new()));
+ table_factories.insert("CSV".into(),
Arc::new(DefaultTableFactory::new()));
+ table_factories.insert("JSON".into(),
Arc::new(DefaultTableFactory::new()));
+ table_factories.insert("NDJSON".into(),
Arc::new(DefaultTableFactory::new()));
+ table_factories.insert("AVRO".into(),
Arc::new(DefaultTableFactory::new()));
+ table_factories.insert("ARROW".into(),
Arc::new(DefaultTableFactory::new()));
+
+ if config.create_default_catalog_and_schema() {
+ let default_catalog = MemoryCatalogProvider::new();
+
+ default_catalog
+ .register_schema(
+ &config.options().catalog.default_schema,
+ Arc::new(MemorySchemaProvider::new()),
+ )
+ .expect("memory catalog provider can register schema");
+
+ Self::register_default_schema(
+ &config,
+ &table_factories,
+ &runtime,
+ &default_catalog,
+ );
+
+ catalog_list.register_catalog(
+ config.options().catalog.default_catalog.clone(),
+ Arc::new(default_catalog),
+ );
+ }
+
+ let mut new_self = SessionState {
+ session_id,
+ analyzer: Analyzer::new(),
+ optimizer: Optimizer::new(),
+ physical_optimizers: PhysicalOptimizer::new(),
+ query_planner: Arc::new(DefaultQueryPlanner {}),
+ catalog_list,
+ table_functions: HashMap::new(),
+ scalar_functions: HashMap::new(),
+ aggregate_functions: HashMap::new(),
+ window_functions: HashMap::new(),
+ serializer_registry: Arc::new(EmptySerializerRegistry),
+ table_options: TableOptions::default_from_session_config(config.options()),
+ config,
+ execution_props: ExecutionProps::new(),
+ runtime_env: runtime,
+ table_factories,
+ function_factory: None,
+ };
+
+ // register built in functions
+ functions::register_all(&mut new_self)
+ .expect("can not register built in functions");
+
+ // register crate of array expressions (if enabled)
+ #[cfg(feature = "array_expressions")]
+ functions_array::register_all(&mut new_self)
+ .expect("can not register array expressions");
+
+ functions_aggregate::register_all(&mut new_self)
+ .expect("can not register aggregate functions");
+
+ new_self
+ }
+ /// Returns new [`SessionState`] using the provided
+ /// [`SessionConfig`] and [`RuntimeEnv`].
+ #[deprecated(
+ since = "32.0.0",
+ note = "Use SessionState::new_with_config_rt_and_catalog_list"
+ )]
+ pub fn with_config_rt_and_catalog_list(
+ config: SessionConfig,
+ runtime: Arc<RuntimeEnv>,
+ catalog_list: Arc<dyn CatalogProviderList>,
+ ) -> Self {
+ Self::new_with_config_rt_and_catalog_list(config, runtime, catalog_list)
+ }
+ fn register_default_schema(
+ config: &SessionConfig,
+ table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
+ runtime: &Arc<RuntimeEnv>,
+ default_catalog: &MemoryCatalogProvider,
+ ) {
+ let url = config.options().catalog.location.as_ref();
+ let format = config.options().catalog.format.as_ref();
+ let (url, format) = match (url, format) {
+ (Some(url), Some(format)) => (url, format),
+ _ => return,
+ };
+ let url = url.to_string();
+ let format = format.to_string();
+
+ let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
+ let authority = match url.host_str() {
+ Some(host) => format!("{}://{}", url.scheme(), host),
+ None => format!("{}://", url.scheme()),
+ };
+ let path = &url.as_str()[authority.len()..];
+ let path = object_store::path::Path::parse(path).expect("Can't parse path");
+ let store = ObjectStoreUrl::parse(authority.as_str())
+ .expect("Invalid default catalog url");
+ let store = match runtime.object_store(store) {
+ Ok(store) => store,
+ _ => return,
+ };
+ let factory = match table_factories.get(format.as_str()) {
+ Some(factory) => factory,
+ _ => return,
+ };
+ let schema =
+ ListingSchemaProvider::new(authority, path, factory.clone(), store, format);
+ let _ = default_catalog
+ .register_schema("default", Arc::new(schema))
+ .expect("Failed to register default schema");
+ }
+
+ pub(crate) fn resolve_table_ref(
+ &self,
+ table_ref: impl Into<TableReference>,
+ ) -> ResolvedTableReference {
+ let catalog = &self.config_options().catalog;
+ table_ref
+ .into()
+ .resolve(&catalog.default_catalog, &catalog.default_schema)
+ }
+
+ pub(crate) fn schema_for_ref(
+ &self,
+ table_ref: impl Into<TableReference>,
+ ) -> datafusion_common::Result<Arc<dyn SchemaProvider>> {
+ let resolved_ref = self.resolve_table_ref(table_ref);
+ if self.config.information_schema() && *resolved_ref.schema == *INFORMATION_SCHEMA
+ {
+ return Ok(Arc::new(InformationSchemaProvider::new(
+ self.catalog_list.clone(),
+ )));
+ }
+
+ self.catalog_list
+ .catalog(&resolved_ref.catalog)
+ .ok_or_else(|| {
+ plan_datafusion_err!(
+ "failed to resolve catalog: {}",
+ resolved_ref.catalog
+ )
+ })?
+ .schema(&resolved_ref.schema)
+ .ok_or_else(|| {
+ plan_datafusion_err!("failed to resolve schema: {}",
resolved_ref.schema)
+ })
+ }
+
+ /// Replace the random session id.
+ pub fn with_session_id(mut self, session_id: String) -> Self {
+ self.session_id = session_id;
+ self
+ }
+
+ /// override default query planner with `query_planner`
+ pub fn with_query_planner(
+ mut self,
+ query_planner: Arc<dyn QueryPlanner + Send + Sync>,
+ ) -> Self {
+ self.query_planner = query_planner;
+ self
+ }
+
+ /// Override the [`AnalyzerRule`]s optimizer plan rules.
+ pub fn with_analyzer_rules(
+ mut self,
+ rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
+ ) -> Self {
+ self.analyzer = Analyzer::with_rules(rules);
+ self
+ }
+
+ /// Replace the entire list of [`OptimizerRule`]s used to optimize plans
+ pub fn with_optimizer_rules(
+ mut self,
+ rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
+ ) -> Self {
+ self.optimizer = Optimizer::with_rules(rules);
+ self
+ }
+
+ /// Replace the entire list of [`PhysicalOptimizerRule`]s used to optimize plans
+ pub fn with_physical_optimizer_rules(
+ mut self,
+ physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
+ ) -> Self {
+ self.physical_optimizers = PhysicalOptimizer::with_rules(physical_optimizers);
+ self
+ }
+
+ /// Add `analyzer_rule` to the end of the list of
+ /// [`AnalyzerRule`]s used to rewrite queries.
+ pub fn add_analyzer_rule(
+ mut self,
+ analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>,
+ ) -> Self {
+ self.analyzer.rules.push(analyzer_rule);
+ self
+ }
+
+ /// Add `optimizer_rule` to the end of the list of
+ /// [`OptimizerRule`]s used to rewrite queries.
+ pub fn add_optimizer_rule(
+ mut self,
+ optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
+ ) -> Self {
+ self.optimizer.rules.push(optimizer_rule);
+ self
+ }
+
+ /// Add `physical_optimizer_rule` to the end of the list of
+ /// [`PhysicalOptimizerRule`]s used to rewrite queries.
+ pub fn add_physical_optimizer_rule(
+ mut self,
+ physical_optimizer_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
+ ) -> Self {
+ self.physical_optimizers.rules.push(physical_optimizer_rule);
+ self
+ }
+
+ /// Adds a new [`ConfigExtension`] to TableOptions
+ pub fn add_table_options_extension<T: ConfigExtension>(
+ mut self,
+ extension: T,
+ ) -> Self {
+ self.table_options.extensions.insert(extension);
+ self
+ }
+
+ /// Registers a [`FunctionFactory`] to handle `CREATE FUNCTION` statements
+ pub fn with_function_factory(
+ mut self,
+ function_factory: Arc<dyn FunctionFactory>,
+ ) -> Self {
+ self.function_factory = Some(function_factory);
+ self
+ }
+
+ /// Registers a [`FunctionFactory`] to handle `CREATE FUNCTION` statements
+ pub fn set_function_factory(&mut self, function_factory: Arc<dyn FunctionFactory>) {
+ self.function_factory = Some(function_factory);
+ }
+
+ /// Replace the extension [`SerializerRegistry`]
+ pub fn with_serializer_registry(
+ mut self,
+ registry: Arc<dyn SerializerRegistry>,
+ ) -> Self {
+ self.serializer_registry = registry;
+ self
+ }
+
+ /// Get the function factory
+ pub fn function_factory(&self) -> Option<&Arc<dyn FunctionFactory>> {
+ self.function_factory.as_ref()
+ }
+
+ /// Get the table factories
+ pub fn table_factories(&self) -> &HashMap<String, Arc<dyn TableProviderFactory>> {
+ &self.table_factories
+ }
+
+ /// Get mutable access to the table factories
+ pub fn table_factories_mut(
+ &mut self,
+ ) -> &mut HashMap<String, Arc<dyn TableProviderFactory>> {
+ &mut self.table_factories
+ }
+
+ /// Parse a SQL string into a DataFusion-specific AST
+ /// [`Statement`]. See [`SessionContext::sql`] for running queries.
+ ///
+ /// [`SessionContext::sql`]: crate::execution::context::SessionContext::sql
+ pub fn sql_to_statement(
+ &self,
+ sql: &str,
+ dialect: &str,
+ ) -> datafusion_common::Result<Statement> {
+ let dialect = dialect_from_str(dialect).ok_or_else(|| {
+ plan_datafusion_err!(
+ "Unsupported SQL dialect: {dialect}. Available dialects: \
+ Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
+ MsSQL, ClickHouse, BigQuery, Ansi."
+ )
+ })?;
+ let mut statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;
+ if statements.len() > 1 {
+ return not_impl_err!(
+ "The context currently only supports a single SQL statement"
+ );
+ }
+ let statement = statements.pop_front().ok_or_else(|| {
+ DataFusionError::NotImplemented(
+ "The context requires a statement!".to_string(),
+ )
+ })?;
+ Ok(statement)
+ }
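
A usage sketch (assuming a `SessionState` value named `state`; dialect names are matched case-insensitively):

```rust
// Parses without planning; multi-statement input is rejected.
let statement = state.sql_to_statement("SELECT name FROM users WHERE id = 1", "postgresql")?;
println!("{statement:?}");
```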
+
+ /// Resolve all table references in the SQL statement.
+ pub fn resolve_table_references(
+ &self,
+ statement: &datafusion_sql::parser::Statement,
+ ) -> datafusion_common::Result<Vec<TableReference>> {
+ use crate::catalog::information_schema::INFORMATION_SCHEMA_TABLES;
+ use datafusion_sql::parser::Statement as DFStatement;
+ use sqlparser::ast::*;
+
+ // Getting `TableProviders` is async but planning is not -- thus pre-fetch
+ // table providers for all relations referenced in this query
+ let mut relations = hashbrown::HashSet::with_capacity(10);
+
+ struct RelationVisitor<'a>(&'a mut hashbrown::HashSet<ObjectName>);
+
+ impl<'a> RelationVisitor<'a> {
+ /// Record that `relation` was used in this statement
+ fn insert(&mut self, relation: &ObjectName) {
+ self.0.get_or_insert_with(relation, |_| relation.clone());
+ }
+ }
+
+ impl<'a> Visitor for RelationVisitor<'a> {
+ type Break = ();
+
+ fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> {
+ self.insert(relation);
+ ControlFlow::Continue(())
+ }
+
+ fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> {
+ if let Statement::ShowCreate {
+ obj_type: ShowCreateObject::Table | ShowCreateObject::View,
+ obj_name,
+ } = statement
+ {
+ self.insert(obj_name)
+ }
+ ControlFlow::Continue(())
+ }
+ }
+
+ let mut visitor = RelationVisitor(&mut relations);
+ fn visit_statement(statement: &DFStatement, visitor: &mut RelationVisitor<'_>) {
+ match statement {
+ DFStatement::Statement(s) => {
+ let _ = s.as_ref().visit(visitor);
+ }
+ DFStatement::CreateExternalTable(table) => {
+ visitor
+ .0
+ .insert(ObjectName(vec![Ident::from(table.name.as_str())]));
+ }
+ DFStatement::CopyTo(CopyToStatement { source, .. }) => match source {
+ CopyToSource::Relation(table_name) => {
+ visitor.insert(table_name);
+ }
+ CopyToSource::Query(query) => {
+ query.visit(visitor);
+ }
+ },
+ DFStatement::Explain(explain) => {
+ visit_statement(&explain.statement, visitor)
+ }
+ }
+ }
+
+ visit_statement(statement, &mut visitor);
+
+ // Always include information_schema if available
+ if self.config.information_schema() {
+ for s in INFORMATION_SCHEMA_TABLES {
+ relations.insert(ObjectName(vec![
+ Ident::new(INFORMATION_SCHEMA),
+ Ident::new(*s),
+ ]));
+ }
+ }
+
+ let enable_ident_normalization =
+ self.config.options().sql_parser.enable_ident_normalization;
+ relations
+ .into_iter()
+ .map(|x| object_name_to_table_reference(x, enable_ident_normalization))
+ .collect::<datafusion_common::Result<_>>()
+ }
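
Together with `sql_to_statement`, this yields the set of relations a query touches before any planning happens; a sketch, again assuming `state` is a `SessionState`:

```rust
// Each distinct relation appears once; with information_schema enabled,
// the information_schema tables are appended as well.
let statement = state.sql_to_statement("SELECT * FROM t1 JOIN t2 USING (id)", "generic")?;
for table_ref in state.resolve_table_references(&statement)? {
    println!("referenced: {table_ref}");
}
```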
+
+ /// Convert an AST Statement into a LogicalPlan
+ pub async fn statement_to_plan(
+ &self,
+ statement: datafusion_sql::parser::Statement,
+ ) -> datafusion_common::Result<LogicalPlan> {
+ let references = self.resolve_table_references(&statement)?;
+
+ let mut provider = SessionContextProvider {
+ state: self,
+ tables: HashMap::with_capacity(references.len()),
+ };
+
+ let enable_ident_normalization =
+ self.config.options().sql_parser.enable_ident_normalization;
+ let parse_float_as_decimal =
+ self.config.options().sql_parser.parse_float_as_decimal;
+ for reference in references {
+ let resolved = &self.resolve_table_ref(reference);
+ if let Entry::Vacant(v) = provider.tables.entry(resolved.to_string()) {
+ if let Ok(schema) = self.schema_for_ref(resolved.clone()) {
+ if let Some(table) = schema.table(&resolved.table).await? {
+ v.insert(provider_as_source(table));
+ }
+ }
+ }
+ }
+
+ let query = SqlToRel::new_with_options(
+ &provider,
+ ParserOptions {
+ parse_float_as_decimal,
+ enable_ident_normalization,
+ },
+ );
+ query.statement_to_plan(statement)
+ }
+
+ /// Creates a [`LogicalPlan`] from the provided SQL string. This
+ /// interface will plan any SQL DataFusion supports, including DDL
+ /// like `CREATE TABLE` and DML like `COPY` (which can write to
+ /// local files).
+ ///
+ /// See [`SessionContext::sql`] and
+ /// [`SessionContext::sql_with_options`] for a higher-level
+ /// interface that handles DDL and verification of allowed
+ /// statements.
+ ///
+ /// [`SessionContext::sql`]: crate::execution::context::SessionContext::sql
+ /// [`SessionContext::sql_with_options`]: crate::execution::context::SessionContext::sql_with_options
+ pub async fn create_logical_plan(
+ &self,
+ sql: &str,
+ ) -> datafusion_common::Result<LogicalPlan> {
+ let dialect = self.config.options().sql_parser.dialect.as_str();
+ let statement = self.sql_to_statement(sql, dialect)?;
+ let plan = self.statement_to_plan(statement).await?;
+ Ok(plan)
+ }
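
A sketch (inside an async context); this plans but does not execute, using the dialect from `sql_parser.dialect`:

```rust
// DDL such as `CREATE TABLE` also plans successfully here, even though
// it has no corresponding physical plan (see `create_physical_plan`).
let plan = state.create_logical_plan("SELECT 1 AS one").await?;
println!("{}", plan.display_indent());
```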
+
+ /// Optimizes the logical plan by applying optimizer rules.
+ pub fn optimize(&self, plan: &LogicalPlan) -> datafusion_common::Result<LogicalPlan> {
+ if let LogicalPlan::Explain(e) = plan {
+ let mut stringified_plans = e.stringified_plans.clone();
+
+ // analyze & capture output of each rule
+ let analyzer_result = self.analyzer.execute_and_check(
+ e.plan.as_ref().clone(),
+ self.options(),
+ |analyzed_plan, analyzer| {
+ let analyzer_name = analyzer.name().to_string();
+ let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
+ stringified_plans.push(analyzed_plan.to_stringified(plan_type));
+ },
+ );
+ let analyzed_plan = match analyzer_result {
+ Ok(plan) => plan,
+ Err(DataFusionError::Context(analyzer_name, err)) => {
+ let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
+ stringified_plans.push(StringifiedPlan::new(plan_type, err.to_string()));
+
+ return Ok(LogicalPlan::Explain(Explain {
+ verbose: e.verbose,
+ plan: e.plan.clone(),
+ stringified_plans,
+ schema: e.schema.clone(),
+ logical_optimization_succeeded: false,
+ }));
+ }
+ Err(e) => return Err(e),
+ };
+
+ // to delineate the analyzer & optimizer phases in explain output
+ stringified_plans.push(analyzed_plan.to_stringified(PlanType::FinalAnalyzedLogicalPlan));
+
+ // optimize the child plan, capturing the output of each optimizer
+ let optimized_plan = self.optimizer.optimize(
+ analyzed_plan,
+ self,
+ |optimized_plan, optimizer| {
+ let optimizer_name = optimizer.name().to_string();
+ let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
+ stringified_plans.push(optimized_plan.to_stringified(plan_type));
+ },
+ );
+ let (plan, logical_optimization_succeeded) = match optimized_plan {
+ Ok(plan) => (Arc::new(plan), true),
+ Err(DataFusionError::Context(optimizer_name, err)) => {
+ let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
+ stringified_plans.push(StringifiedPlan::new(plan_type, err.to_string()));
+ (e.plan.clone(), false)
+ }
+ Err(e) => return Err(e),
+ };
+
+ Ok(LogicalPlan::Explain(Explain {
+ verbose: e.verbose,
+ plan,
+ stringified_plans,
+ schema: e.schema.clone(),
+ logical_optimization_succeeded,
+ }))
+ } else {
+ let analyzed_plan = self.analyzer.execute_and_check(
+ plan.clone(),
+ self.options(),
+ |_, _| {},
+ )?;
+ self.optimizer.optimize(analyzed_plan, self, |_, _| {})
+ }
+ }
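
A sketch of the non-`EXPLAIN` path; for `EXPLAIN` inputs the rule-by-rule output is captured in the stringified plans rather than failing the query:

```rust
// `plan` is assumed to come from `create_logical_plan` above; logical
// optimization is expected to preserve the plan schema.
let optimized = state.optimize(&plan)?;
assert_eq!(optimized.schema(), plan.schema());
```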
+
+ /// Creates a physical [`ExecutionPlan`] plan from a [`LogicalPlan`].
+ ///
+ /// Note: this first calls [`Self::optimize`] on the provided
+ /// plan.
+ ///
+ /// This function will error for [`LogicalPlan`]s such as catalog DDL like
+ /// `CREATE TABLE`, which do not have corresponding physical plans and must
+ /// be handled by another layer, typically [`SessionContext`].
+ ///
+ /// [`SessionContext`]: crate::execution::context::SessionContext
+ pub async fn create_physical_plan(
+ &self,
+ logical_plan: &LogicalPlan,
+ ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+ let logical_plan = self.optimize(logical_plan)?;
+ self.query_planner
+ .create_physical_plan(&logical_plan, self)
+ .await
+ }
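
End to end, a sketch going from SQL text to an executable plan (names assumed as in the earlier sketches):

```rust
let logical_plan = state.create_logical_plan("SELECT 1 AS one").await?;
// Optimizes first, then delegates to the configured `QueryPlanner`
// (the `DefaultQueryPlanner` below unless overridden).
let physical_plan = state.create_physical_plan(&logical_plan).await?;
```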
+
+ /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type
+ /// coercion, and function rewrites.
+ ///
+ /// Note: The expression is not [simplified] or otherwise optimized: `a = 1
+ /// + 2` will not be simplified to `a = 3` as this is a more involved process.
+ /// See the [expr_api] example for how to simplify expressions.
+ ///
+ /// # See Also:
+ /// * [`SessionContext::create_physical_expr`] for a higher-level API
+ /// * [`create_physical_expr`] for a lower-level API
+ ///
+ /// [simplified]: datafusion_optimizer::simplify_expressions
+ /// [expr_api]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/expr_api.rs
+ /// [`SessionContext::create_physical_expr`]: crate::execution::context::SessionContext::create_physical_expr
+ pub fn create_physical_expr(
+ &self,
+ expr: Expr,
+ df_schema: &DFSchema,
+ ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
+ let simplifier =
+ ExprSimplifier::new(SessionSimplifyProvider::new(self, df_schema));
+ // apply type coercion here to ensure types match
+ let mut expr = simplifier.coerce(expr, df_schema)?;
+
+ // rewrite Exprs to functions if necessary
+ let config_options = self.config_options();
+ for rewrite in self.analyzer.function_rewrites() {
+ expr = expr
+ .transform_up(|expr| rewrite.rewrite(expr, df_schema, config_options))?
+ .data;
+ }
+ create_physical_expr(&expr, df_schema, self.execution_props())
+ }
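
A sketch of coercing and lowering a simple predicate, assuming `state` is a `SessionState`:

```rust
use arrow_schema::{DataType, Field, Schema};
use datafusion::prelude::{col, lit};
use datafusion_common::DFSchema;

// `a < 5` is type-coerced and rewritten but, per the note above, not
// simplified.
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let df_schema = DFSchema::try_from(schema)?;
let predicate = state.create_physical_expr(col("a").lt(lit(5i32)), &df_schema)?;
```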
+
+ /// Return the session ID
+ pub fn session_id(&self) -> &str {
+ &self.session_id
+ }
+
+ /// Return the runtime env
+ pub fn runtime_env(&self) -> &Arc<RuntimeEnv> {
+ &self.runtime_env
+ }
+
+ /// Return the execution properties
+ pub fn execution_props(&self) -> &ExecutionProps {
+ &self.execution_props
+ }
+
+ /// Return mutable execution properties
+ pub fn execution_props_mut(&mut self) -> &mut ExecutionProps {
+ &mut self.execution_props
+ }
+
+ /// Return the [`SessionConfig`]
+ pub fn config(&self) -> &SessionConfig {
+ &self.config
+ }
+
+ /// Return the mutable [`SessionConfig`].
+ pub fn config_mut(&mut self) -> &mut SessionConfig {
+ &mut self.config
+ }
+
+ /// Return the physical optimizers
+ pub fn physical_optimizers(&self) -> &[Arc<dyn PhysicalOptimizerRule + Send + Sync>] {
+ &self.physical_optimizers.rules
+ }
+
+ /// Return the configuration options
+ pub fn config_options(&self) -> &ConfigOptions {
+ self.config.options()
+ }
+
+ /// Return the [`TableOptions`] with its extensions, combined with the session config options
+ pub fn default_table_options(&self) -> TableOptions {
+ self.table_options
+ .combine_with_session_config(self.config_options())
+ }
+
+ /// Return the table options
+ pub fn table_options(&self) -> &TableOptions {
+ &self.table_options
+ }
+
+ /// Return mutable table options
+ pub fn table_options_mut(&mut self) -> &mut TableOptions {
+ &mut self.table_options
+ }
+
+ /// Registers a [`ConfigExtension`] as a table option extension that can be
+ /// referenced from SQL statements executed against this context.
+ pub fn register_table_options_extension<T: ConfigExtension>(&mut self, extension: T) {
+ self.table_options.extensions.insert(extension)
+ }
+
+ /// Get a new TaskContext to run in this session
+ pub fn task_ctx(&self) -> Arc<TaskContext> {
+ Arc::new(TaskContext::from(self))
+ }
+
+ /// Return catalog list
+ pub fn catalog_list(&self) -> Arc<dyn CatalogProviderList> {
+ self.catalog_list.clone()
+ }
+
+ /// Set the catalog list
+ pub(crate) fn register_catalog_list(
+ &mut self,
+ catalog_list: Arc<dyn CatalogProviderList>,
+ ) {
+ self.catalog_list = catalog_list;
+ }
+
+ /// Return reference to scalar_functions
+ pub fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
+ &self.scalar_functions
+ }
+
+ /// Return reference to aggregate_functions
+ pub fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
+ &self.aggregate_functions
+ }
+
+ /// Return reference to window functions
+ pub fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
+ &self.window_functions
+ }
+
+ /// Return [SerializerRegistry] for extensions
+ pub fn serializer_registry(&self) -> Arc<dyn SerializerRegistry> {
+ self.serializer_registry.clone()
+ }
+
+ /// Return the version of the cargo package that produced this query
+ pub fn version(&self) -> &str {
+ env!("CARGO_PKG_VERSION")
+ }
+
+ /// Register a user defined table function
+ pub fn register_udtf(&mut self, name: &str, fun: Arc<dyn TableFunctionImpl>) {
+ self.table_functions.insert(
+ name.to_owned(),
+ Arc::new(TableFunction::new(name.to_owned(), fun)),
+ );
+ }
+}
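
A registration sketch; `MyRangeFunc` is a hypothetical type implementing `TableFunctionImpl`:

```rust
use std::sync::Arc;

// Hypothetical: `MyRangeFunc` turns its SQL arguments into a
// `TableProvider`. After registration the function is callable as e.g.
// `SELECT * FROM my_range(10)`.
state.register_udtf("my_range", Arc::new(MyRangeFunc {}));
```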
+
+struct SessionContextProvider<'a> {
+ state: &'a SessionState,
+ tables: HashMap<String, Arc<dyn TableSource>>,
+}
+
+impl<'a> ContextProvider for SessionContextProvider<'a> {
+ fn get_table_source(
+ &self,
+ name: TableReference,
+ ) -> datafusion_common::Result<Arc<dyn TableSource>> {
+ let name = self.state.resolve_table_ref(name).to_string();
+ self.tables
+ .get(&name)
+ .cloned()
+ .ok_or_else(|| plan_datafusion_err!("table '{name}' not found"))
+ }
+
+ fn get_table_function_source(
+ &self,
+ name: &str,
+ args: Vec<Expr>,
+ ) -> datafusion_common::Result<Arc<dyn TableSource>> {
+ let tbl_func = self
+ .state
+ .table_functions
+ .get(name)
+ .cloned()
+ .ok_or_else(|| plan_datafusion_err!("table function '{name}' not found"))?;
+ let provider = tbl_func.create_table_provider(&args)?;
+
+ Ok(provider_as_source(provider))
+ }
+
+ /// Create a new CTE work table for a recursive CTE logical plan
+ /// This table will be used in conjunction with a Worktable physical plan
+ /// to read and write each iteration of a recursive CTE
+ fn create_cte_work_table(
+ &self,
+ name: &str,
+ schema: SchemaRef,
+ ) -> datafusion_common::Result<Arc<dyn TableSource>> {
+ let table = Arc::new(CteWorkTable::new(name, schema));
+ Ok(provider_as_source(table))
+ }
+
+ fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
+ self.state.scalar_functions().get(name).cloned()
+ }
+
+ fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
+ self.state.aggregate_functions().get(name).cloned()
+ }
+
+ fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
+ self.state.window_functions().get(name).cloned()
+ }
+
+ fn get_variable_type(&self, variable_names: &[String]) -> Option<DataType> {
+ if variable_names.is_empty() {
+ return None;
+ }
+
+ let provider_type = if is_system_variables(variable_names) {
+ VarType::System
+ } else {
+ VarType::UserDefined
+ };
+
+ self.state
+ .execution_props
+ .var_providers
+ .as_ref()
+ .and_then(|provider| provider.get(&provider_type)?.get_type(variable_names))
+ }
+
+ fn options(&self) -> &ConfigOptions {
+ self.state.config_options()
+ }
+
+ fn udf_names(&self) -> Vec<String> {
+ self.state.scalar_functions().keys().cloned().collect()
+ }
+
+ fn udaf_names(&self) -> Vec<String> {
+ self.state.aggregate_functions().keys().cloned().collect()
+ }
+
+ fn udwf_names(&self) -> Vec<String> {
+ self.state.window_functions().keys().cloned().collect()
+ }
+}
+
+impl FunctionRegistry for SessionState {
+ fn udfs(&self) -> HashSet<String> {
+ self.scalar_functions.keys().cloned().collect()
+ }
+
+ fn udf(&self, name: &str) -> datafusion_common::Result<Arc<ScalarUDF>> {
+ let result = self.scalar_functions.get(name);
+
+ result.cloned().ok_or_else(|| {
+ plan_datafusion_err!("There is no UDF named \"{name}\" in the
registry")
+ })
+ }
+
+ fn udaf(&self, name: &str) -> datafusion_common::Result<Arc<AggregateUDF>> {
+ let result = self.aggregate_functions.get(name);
+
+ result.cloned().ok_or_else(|| {
+ plan_datafusion_err!("There is no UDAF named \"{name}\" in the
registry")
+ })
+ }
+
+ fn udwf(&self, name: &str) -> datafusion_common::Result<Arc<WindowUDF>> {
+ let result = self.window_functions.get(name);
+
+ result.cloned().ok_or_else(|| {
+ plan_datafusion_err!("There is no UDWF named \"{name}\" in the
registry")
+ })
+ }
+
+ fn register_udf(
+ &mut self,
+ udf: Arc<ScalarUDF>,
+ ) -> datafusion_common::Result<Option<Arc<ScalarUDF>>> {
+ udf.aliases().iter().for_each(|alias| {
+ self.scalar_functions.insert(alias.clone(), udf.clone());
+ });
+ Ok(self.scalar_functions.insert(udf.name().into(), udf))
+ }
+
+ fn register_udaf(
+ &mut self,
+ udaf: Arc<AggregateUDF>,
+ ) -> datafusion_common::Result<Option<Arc<AggregateUDF>>> {
+ udaf.aliases().iter().for_each(|alias| {
+ self.aggregate_functions.insert(alias.clone(), udaf.clone());
+ });
+ Ok(self.aggregate_functions.insert(udaf.name().into(), udaf))
+ }
+
+ fn register_udwf(
+ &mut self,
+ udwf: Arc<WindowUDF>,
+ ) -> datafusion_common::Result<Option<Arc<WindowUDF>>> {
+ udwf.aliases().iter().for_each(|alias| {
+ self.window_functions.insert(alias.clone(), udwf.clone());
+ });
+ Ok(self.window_functions.insert(udwf.name().into(), udwf))
+ }
+
+ fn deregister_udf(
+ &mut self,
+ name: &str,
+ ) -> datafusion_common::Result<Option<Arc<ScalarUDF>>> {
+ let udf = self.scalar_functions.remove(name);
+ if let Some(udf) = &udf {
+ for alias in udf.aliases() {
+ self.scalar_functions.remove(alias);
+ }
+ }
+ Ok(udf)
+ }
+
+ fn deregister_udaf(
+ &mut self,
+ name: &str,
+ ) -> datafusion_common::Result<Option<Arc<AggregateUDF>>> {
+ let udaf = self.aggregate_functions.remove(name);
+ if let Some(udaf) = &udaf {
+ for alias in udaf.aliases() {
+ self.aggregate_functions.remove(alias);
+ }
+ }
+ Ok(udaf)
+ }
+
+ fn deregister_udwf(
+ &mut self,
+ name: &str,
+ ) -> datafusion_common::Result<Option<Arc<WindowUDF>>> {
+ let udwf = self.window_functions.remove(name);
+ if let Some(udwf) = &udwf {
+ for alias in udwf.aliases() {
+ self.window_functions.remove(alias);
+ }
+ }
+ Ok(udwf)
+ }
+
+ fn register_function_rewrite(
+ &mut self,
+ rewrite: Arc<dyn FunctionRewrite + Send + Sync>,
+ ) -> datafusion_common::Result<()> {
+ self.analyzer.add_function_rewrite(rewrite);
+ Ok(())
+ }
+}
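
Because aliases are registered alongside the primary name, a registered UDF is reachable under every alias, and deregistration removes them all. A sketch using the `create_udf` helper (signature as of this version), assuming a mutable `state`:

```rust
use std::sync::Arc;
use arrow_schema::DataType;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::{create_udf, ColumnarValue, Volatility};

// A trivial UDF that returns its argument unchanged.
let udf = create_udf(
    "my_identity",
    vec![DataType::Int32],
    Arc::new(DataType::Int32),
    Volatility::Immutable,
    Arc::new(|args: &[ColumnarValue]| Ok(args[0].clone())),
);
state.register_udf(Arc::new(udf))?;
assert!(state.udfs().contains("my_identity"));
```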
+
+impl OptimizerConfig for SessionState {
+ fn query_execution_start_time(&self) -> DateTime<Utc> {
+ self.execution_props.query_execution_start_time
+ }
+
+ fn alias_generator(&self) -> Arc<AliasGenerator> {
+ self.execution_props.alias_generator.clone()
+ }
+
+ fn options(&self) -> &ConfigOptions {
+ self.config_options()
+ }
+
+ fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
+ Some(self)
+ }
+}
+
+/// Create a new task context instance from SessionState
+impl From<&SessionState> for TaskContext {
+ fn from(state: &SessionState) -> Self {
+ let task_id = None;
+ TaskContext::new(
+ task_id,
+ state.session_id.clone(),
+ state.config.clone(),
+ state.scalar_functions.clone(),
+ state.aggregate_functions.clone(),
+ state.window_functions.clone(),
+ state.runtime_env.clone(),
+ )
+ }
+}
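
The typical execution flow snapshots the state into a `TaskContext` and then runs one partition; a sketch assuming `physical_plan` from the earlier `create_physical_plan` sketch:

```rust
// Each call to `execute` streams one partition; the TaskContext carries
// the session id, config, registered functions, and runtime.
let task_ctx = state.task_ctx();
let stream = physical_plan.execute(0, task_ctx)?;
```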
+
+/// The query planner used if no user defined planner is provided
+struct DefaultQueryPlanner {}
+
+#[async_trait]
+impl QueryPlanner for DefaultQueryPlanner {
+ /// Given a `LogicalPlan`, create an [`ExecutionPlan`] suitable for execution
+ async fn create_physical_plan(
+ &self,
+ logical_plan: &LogicalPlan,
+ session_state: &SessionState,
+ ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+ let planner = DefaultPhysicalPlanner::default();
+ planner
+ .create_physical_plan(logical_plan, session_state)
+ .await
+ }
+}
+
+struct SessionSimplifyProvider<'a> {
+ state: &'a SessionState,
+ df_schema: &'a DFSchema,
+}
+
+impl<'a> SessionSimplifyProvider<'a> {
+ fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self {
+ Self { state, df_schema }
+ }
+}
+
+impl<'a> SimplifyInfo for SessionSimplifyProvider<'a> {
+ fn is_boolean_type(&self, expr: &Expr) -> datafusion_common::Result<bool> {
+ Ok(expr.get_type(self.df_schema)? == DataType::Boolean)
+ }
+
+ fn nullable(&self, expr: &Expr) -> datafusion_common::Result<bool> {
+ expr.nullable(self.df_schema)
+ }
+
+ fn execution_props(&self) -> &ExecutionProps {
+ self.state.execution_props()
+ }
+
+ fn get_data_type(&self, expr: &Expr) -> datafusion_common::Result<DataType> {
+ expr.get_type(self.df_schema)
+ }
+}