alamb commented on code in PR #11403:
URL: https://github.com/apache/datafusion/pull/11403#discussion_r1675852702


##########
datafusion/core/src/execution/session_state.rs:
##########
@@ -195,122 +196,10 @@ impl SessionState {
         runtime: Arc<RuntimeEnv>,
         catalog_list: Arc<dyn CatalogProviderList>,
     ) -> Self {
-        let session_id = Uuid::new_v4().to_string();
-
-        // Create table_factories for all default formats
-        let mut table_factories: HashMap<String, Arc<dyn 
TableProviderFactory>> =
-            HashMap::new();
-        #[cfg(feature = "parquet")]
-        table_factories.insert("PARQUET".into(), 
Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("CSV".into(), 
Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("JSON".into(), 
Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("NDJSON".into(), 
Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("AVRO".into(), 
Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("ARROW".into(), 
Arc::new(DefaultTableFactory::new()));
-
-        if config.create_default_catalog_and_schema() {
-            let default_catalog = MemoryCatalogProvider::new();
-
-            default_catalog
-                .register_schema(
-                    &config.options().catalog.default_schema,
-                    Arc::new(MemorySchemaProvider::new()),
-                )
-                .expect("memory catalog provider can register schema");
-
-            Self::register_default_schema(
-                &config,
-                &table_factories,
-                &runtime,
-                &default_catalog,
-            );
-
-            catalog_list.register_catalog(
-                config.options().catalog.default_catalog.clone(),
-                Arc::new(default_catalog),
-            );
-        }
-
-        let expr_planners: Vec<Arc<dyn ExprPlanner>> = vec![
-            Arc::new(functions::core::planner::CoreFunctionPlanner::default()),
-            // register crate of array expressions (if enabled)
-            #[cfg(feature = "array_expressions")]
-            Arc::new(functions_array::planner::ArrayFunctionPlanner),
-            #[cfg(feature = "array_expressions")]
-            Arc::new(functions_array::planner::FieldAccessPlanner),
-            #[cfg(any(
-                feature = "datetime_expressions",
-                feature = "unicode_expressions"
-            ))]
-            Arc::new(functions::planner::UserDefinedFunctionPlanner),
-        ];
-
-        let mut new_self = SessionState {
-            session_id,
-            analyzer: Analyzer::new(),
-            expr_planners,
-            optimizer: Optimizer::new(),
-            physical_optimizers: PhysicalOptimizer::new(),
-            query_planner: Arc::new(DefaultQueryPlanner {}),
-            catalog_list,
-            table_functions: HashMap::new(),
-            scalar_functions: HashMap::new(),
-            aggregate_functions: HashMap::new(),
-            window_functions: HashMap::new(),
-            serializer_registry: Arc::new(EmptySerializerRegistry),
-            file_formats: HashMap::new(),
-            table_options: 
TableOptions::default_from_session_config(config.options()),
-            config,
-            execution_props: ExecutionProps::new(),
-            runtime_env: runtime,
-            table_factories,
-            function_factory: None,
-        };
-
-        #[cfg(feature = "parquet")]
-        if let Err(e) =
-            
new_self.register_file_format(Arc::new(ParquetFormatFactory::new()), false)
-        {
-            log::info!("Unable to register default ParquetFormat: {e}")
-        };
-
-        if let Err(e) =
-            new_self.register_file_format(Arc::new(JsonFormatFactory::new()), 
false)
-        {
-            log::info!("Unable to register default JsonFormat: {e}")
-        };
-
-        if let Err(e) =
-            new_self.register_file_format(Arc::new(CsvFormatFactory::new()), 
false)
-        {
-            log::info!("Unable to register default CsvFormat: {e}")
-        };
-
-        if let Err(e) =
-            new_self.register_file_format(Arc::new(ArrowFormatFactory::new()), 
false)
-        {
-            log::info!("Unable to register default ArrowFormat: {e}")
-        };
-
-        if let Err(e) =
-            new_self.register_file_format(Arc::new(AvroFormatFactory::new()), 
false)
-        {
-            log::info!("Unable to register default AvroFormat: {e}")
-        };
-
-        // register built in functions
-        functions::register_all(&mut new_self)
-            .expect("can not register built in functions");
-
-        // register crate of array expressions (if enabled)
-        #[cfg(feature = "array_expressions")]
-        functions_array::register_all(&mut new_self)
-            .expect("can not register array expressions");
-
-        functions_aggregate::register_all(&mut new_self)
-            .expect("can not register aggregate functions");
-
-        new_self
+        SessionStateBuilder::new_with_config_rt(config, runtime)
+            .with_defaults(true)
+            .with_catalog_list(catalog_list)
+            .build()

Review Comment:
   this is so nice ❤️ 



##########
datafusion/core/src/execution/session_state.rs:
##########
@@ -976,6 +837,482 @@ impl SessionState {
     }
 }
 
+/// A builder to be used for building [`SessionState`]'s. Defaults will be 
used for all values
+/// unless explicitly provided. Note that there is no `Default` or `new()` for 
SessionState,
+/// to avoid accidentally running queries or other operations without passing 
through
+/// the [`SessionConfig`] or [`RuntimeEnv`].
+pub struct SessionStateBuilder {
+    state: SessionState,
+    use_defaults: bool,
+}
+
+impl SessionStateBuilder {
+    /// Returns new [`SessionStateBuilder`] using the provided
+    /// [`SessionConfig`] and [`RuntimeEnv`].
+    pub fn new_with_config_rt(
+        config: SessionConfig,
+        runtime_env: Arc<RuntimeEnv>,
+    ) -> Self {
+        let session_id = Uuid::new_v4().to_string();
+        let catalog_list =
+            Arc::new(MemoryCatalogProviderList::new()) as Arc<dyn 
CatalogProviderList>;
+
+        Self {
+            state: SessionState {
+                session_id,
+                analyzer: Analyzer::new(),
+                expr_planners: vec![],
+                optimizer: Optimizer::new(),
+                physical_optimizers: PhysicalOptimizer::new(),
+                query_planner: Arc::new(DefaultQueryPlanner {}),
+                catalog_list,
+                table_functions: HashMap::new(),
+                scalar_functions: HashMap::new(),
+                aggregate_functions: HashMap::new(),
+                window_functions: HashMap::new(),
+                serializer_registry: Arc::new(EmptySerializerRegistry),
+                file_formats: HashMap::new(),
+                table_options: TableOptions::default_from_session_config(
+                    config.options(),
+                ),
+                config,
+                execution_props: ExecutionProps::new(),
+                table_factories: HashMap::new(),
+                runtime_env,
+                function_factory: None,
+            },
+            use_defaults: true,
+        }
+    }
+
+    /// Returns a new [SessionStateBuilder] based on an existing [SessionState]
+    /// The session id for the new builder will be set to a unique value; all
+    /// other fields will be cloned from what is set in the provided session 
state
+    pub fn new_from_existing(existing: &SessionState) -> Self {
+        let session_id = Uuid::new_v4().to_string();
+
+        Self {
+            state: SessionState {
+                session_id,
+                ..existing.clone()
+            },
+            use_defaults: true,
+        }
+    }
+
+    /// Set to true (default = true) if defaults for table_factories, 
expr_planners, file formats

Review Comment:
   😍  this is perfect
   
   I feel like `defaults` is a somewhat overloaded term. Perhaps we could improve the naming here. Some thoughts 🤔
   * `with_built_ins()`
   * `with_install_defaults()`
   * `with_default_features()`



##########
datafusion/core/src/execution/session_state.rs:
##########
@@ -976,6 +837,482 @@ impl SessionState {
     }
 }
 
+/// A builder to be used for building [`SessionState`]'s. Defaults will be 
used for all values
+/// unless explicitly provided. Note that there is no `Default` or `new()` for 
SessionState,
+/// to avoid accidentally running queries or other operations without passing 
through
+/// the [`SessionConfig`] or [`RuntimeEnv`].
+pub struct SessionStateBuilder {
+    state: SessionState,
+    use_defaults: bool,
+}
+
+impl SessionStateBuilder {
+    /// Returns new [`SessionStateBuilder`] using the provided
+    /// [`SessionConfig`] and [`RuntimeEnv`].
+    pub fn new_with_config_rt(
+        config: SessionConfig,
+        runtime_env: Arc<RuntimeEnv>,
+    ) -> Self {
+        let session_id = Uuid::new_v4().to_string();
+        let catalog_list =
+            Arc::new(MemoryCatalogProviderList::new()) as Arc<dyn 
CatalogProviderList>;
+
+        Self {
+            state: SessionState {
+                session_id,
+                analyzer: Analyzer::new(),
+                expr_planners: vec![],
+                optimizer: Optimizer::new(),
+                physical_optimizers: PhysicalOptimizer::new(),
+                query_planner: Arc::new(DefaultQueryPlanner {}),
+                catalog_list,
+                table_functions: HashMap::new(),
+                scalar_functions: HashMap::new(),
+                aggregate_functions: HashMap::new(),
+                window_functions: HashMap::new(),
+                serializer_registry: Arc::new(EmptySerializerRegistry),
+                file_formats: HashMap::new(),
+                table_options: TableOptions::default_from_session_config(
+                    config.options(),
+                ),
+                config,
+                execution_props: ExecutionProps::new(),
+                table_factories: HashMap::new(),
+                runtime_env,
+                function_factory: None,
+            },
+            use_defaults: true,
+        }
+    }
+
+    /// Returns a new [SessionStateBuilder] based on an existing [SessionState]
+    /// The session id for the new builder will be set to a unique value; all
+    /// other fields will be cloned from what is set in the provided session 
state
+    pub fn new_from_existing(existing: &SessionState) -> Self {

Review Comment:
   In addition to this function, another nice-to-have UX feature would be a `From` impl, so users could do:
   
   ```rust
   let state: SessionState = get_existing_state();
   let state = SessionStateBuilder::from(state)
     .with_optimizer_pass(Arc::new(MyNewOptimizer{}))
     .build()
   ```
   
   



##########
datafusion/core/src/execution/session_state.rs:
##########
@@ -976,6 +837,482 @@ impl SessionState {
     }
 }
 
+/// A builder to be used for building [`SessionState`]'s. Defaults will be 
used for all values
+/// unless explicitly provided. Note that there is no `Default` or `new()` for 
SessionState,
+/// to avoid accidentally running queries or other operations without passing 
through
+/// the [`SessionConfig`] or [`RuntimeEnv`].
+pub struct SessionStateBuilder {
+    state: SessionState,
+    use_defaults: bool,
+}
+
+impl SessionStateBuilder {
+    /// Returns new [`SessionStateBuilder`] using the provided
+    /// [`SessionConfig`] and [`RuntimeEnv`].
+    pub fn new_with_config_rt(
+        config: SessionConfig,
+        runtime_env: Arc<RuntimeEnv>,
+    ) -> Self {
+        let session_id = Uuid::new_v4().to_string();
+        let catalog_list =
+            Arc::new(MemoryCatalogProviderList::new()) as Arc<dyn 
CatalogProviderList>;
+
+        Self {
+            state: SessionState {
+                session_id,
+                analyzer: Analyzer::new(),
+                expr_planners: vec![],
+                optimizer: Optimizer::new(),
+                physical_optimizers: PhysicalOptimizer::new(),
+                query_planner: Arc::new(DefaultQueryPlanner {}),
+                catalog_list,
+                table_functions: HashMap::new(),
+                scalar_functions: HashMap::new(),
+                aggregate_functions: HashMap::new(),
+                window_functions: HashMap::new(),
+                serializer_registry: Arc::new(EmptySerializerRegistry),
+                file_formats: HashMap::new(),
+                table_options: TableOptions::default_from_session_config(
+                    config.options(),
+                ),
+                config,
+                execution_props: ExecutionProps::new(),
+                table_factories: HashMap::new(),
+                runtime_env,
+                function_factory: None,
+            },
+            use_defaults: true,
+        }
+    }
+
+    /// Returns a new [SessionStateBuilder] based on an existing [SessionState]
+    /// The session id for the new builder will be set to a unique value; all
+    /// other fields will be cloned from what is set in the provided session 
state
+    pub fn new_from_existing(existing: &SessionState) -> Self {

Review Comment:
   Since this `clone`s the `SessionState` anyway, I think it would make more sense for this to take the argument by value:
   
   ```suggestion
       pub fn new_from_existing(existing: SessionState) -> Self {
   ```
   
   The rationale is that if the caller already has an owned `SessionState`, they can pass it in and avoid a clone. The current signature forces a clone in every case.



##########
datafusion/core/src/execution/session_state.rs:
##########
@@ -976,6 +837,482 @@ impl SessionState {
     }
 }
 
+/// A builder to be used for building [`SessionState`]'s. Defaults will be 
used for all values
+/// unless explicitly provided. Note that there is no `Default` or `new()` for 
SessionState,
+/// to avoid accidentally running queries or other operations without passing 
through
+/// the [`SessionConfig`] or [`RuntimeEnv`].
+pub struct SessionStateBuilder {
+    state: SessionState,
+    use_defaults: bool,
+}
+
+impl SessionStateBuilder {
+    /// Returns new [`SessionStateBuilder`] using the provided
+    /// [`SessionConfig`] and [`RuntimeEnv`].
+    pub fn new_with_config_rt(
+        config: SessionConfig,
+        runtime_env: Arc<RuntimeEnv>,
+    ) -> Self {
+        let session_id = Uuid::new_v4().to_string();
+        let catalog_list =
+            Arc::new(MemoryCatalogProviderList::new()) as Arc<dyn 
CatalogProviderList>;
+
+        Self {
+            state: SessionState {
+                session_id,
+                analyzer: Analyzer::new(),
+                expr_planners: vec![],
+                optimizer: Optimizer::new(),
+                physical_optimizers: PhysicalOptimizer::new(),
+                query_planner: Arc::new(DefaultQueryPlanner {}),
+                catalog_list,
+                table_functions: HashMap::new(),
+                scalar_functions: HashMap::new(),
+                aggregate_functions: HashMap::new(),
+                window_functions: HashMap::new(),
+                serializer_registry: Arc::new(EmptySerializerRegistry),
+                file_formats: HashMap::new(),
+                table_options: TableOptions::default_from_session_config(
+                    config.options(),
+                ),
+                config,
+                execution_props: ExecutionProps::new(),
+                table_factories: HashMap::new(),
+                runtime_env,
+                function_factory: None,

Review Comment:
   Rather than using an inner `SessionState`, it might be possible to simply replicate the fields on the builder (possibly as `Option`s). While this would involve duplicated code, it would make things like deferred creation of the session config and runtime env easier.
   
   However, since this is an implementation detail, we could also change it in a follow-on PR without much issue.



##########
datafusion/core/src/execution/session_state.rs:
##########
@@ -976,6 +837,482 @@ impl SessionState {
     }
 }
 
+/// A builder to be used for building [`SessionState`]'s. Defaults will be 
used for all values
+/// unless explicitly provided. Note that there is no `Default` or `new()` for 
SessionState,
+/// to avoid accidentally running queries or other operations without passing 
through
+/// the [`SessionConfig`] or [`RuntimeEnv`].
+pub struct SessionStateBuilder {
+    state: SessionState,
+    use_defaults: bool,
+}
+
+impl SessionStateBuilder {
+    /// Returns new [`SessionStateBuilder`] using the provided
+    /// [`SessionConfig`] and [`RuntimeEnv`].
+    pub fn new_with_config_rt(

Review Comment:
   Another way we could do this would be simply
   
   ```rust
   let builder = SessionStateBuilder::new()
     .with_config(config)
     .with_runtime_env(runtime_env);
   ```
   
   I think the original concern with an API like this is that it would likely create a new `SessionConfig::default()` just to replace it with `config`.
   
   However, I think we could optimize this eventually (e.g. by deferring the creation of `SessionState` until `build()` is called), and if we are going to change the API anyway, I think we should consider doing the "better" thing.



##########
datafusion/core/src/execution/session_state.rs:
##########
@@ -195,122 +196,10 @@ impl SessionState {
         runtime: Arc<RuntimeEnv>,
         catalog_list: Arc<dyn CatalogProviderList>,
     ) -> Self {
-        let session_id = Uuid::new_v4().to_string();
-
-        // Create table_factories for all default formats
-        let mut table_factories: HashMap<String, Arc<dyn 
TableProviderFactory>> =
-            HashMap::new();
-        #[cfg(feature = "parquet")]
-        table_factories.insert("PARQUET".into(), 
Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("CSV".into(), 
Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("JSON".into(), 
Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("NDJSON".into(), 
Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("AVRO".into(), 
Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("ARROW".into(), 
Arc::new(DefaultTableFactory::new()));
-
-        if config.create_default_catalog_and_schema() {
-            let default_catalog = MemoryCatalogProvider::new();
-
-            default_catalog
-                .register_schema(
-                    &config.options().catalog.default_schema,
-                    Arc::new(MemorySchemaProvider::new()),
-                )
-                .expect("memory catalog provider can register schema");
-
-            Self::register_default_schema(
-                &config,
-                &table_factories,
-                &runtime,
-                &default_catalog,
-            );
-
-            catalog_list.register_catalog(
-                config.options().catalog.default_catalog.clone(),
-                Arc::new(default_catalog),
-            );
-        }
-
-        let expr_planners: Vec<Arc<dyn ExprPlanner>> = vec![
-            Arc::new(functions::core::planner::CoreFunctionPlanner::default()),
-            // register crate of array expressions (if enabled)
-            #[cfg(feature = "array_expressions")]
-            Arc::new(functions_array::planner::ArrayFunctionPlanner),
-            #[cfg(feature = "array_expressions")]
-            Arc::new(functions_array::planner::FieldAccessPlanner),
-            #[cfg(any(
-                feature = "datetime_expressions",
-                feature = "unicode_expressions"
-            ))]
-            Arc::new(functions::planner::UserDefinedFunctionPlanner),
-        ];
-
-        let mut new_self = SessionState {
-            session_id,
-            analyzer: Analyzer::new(),
-            expr_planners,
-            optimizer: Optimizer::new(),
-            physical_optimizers: PhysicalOptimizer::new(),
-            query_planner: Arc::new(DefaultQueryPlanner {}),
-            catalog_list,
-            table_functions: HashMap::new(),
-            scalar_functions: HashMap::new(),
-            aggregate_functions: HashMap::new(),
-            window_functions: HashMap::new(),
-            serializer_registry: Arc::new(EmptySerializerRegistry),
-            file_formats: HashMap::new(),
-            table_options: 
TableOptions::default_from_session_config(config.options()),
-            config,
-            execution_props: ExecutionProps::new(),
-            runtime_env: runtime,
-            table_factories,
-            function_factory: None,
-        };
-
-        #[cfg(feature = "parquet")]
-        if let Err(e) =
-            
new_self.register_file_format(Arc::new(ParquetFormatFactory::new()), false)
-        {
-            log::info!("Unable to register default ParquetFormat: {e}")
-        };
-
-        if let Err(e) =
-            new_self.register_file_format(Arc::new(JsonFormatFactory::new()), 
false)
-        {
-            log::info!("Unable to register default JsonFormat: {e}")
-        };
-
-        if let Err(e) =
-            new_self.register_file_format(Arc::new(CsvFormatFactory::new()), 
false)
-        {
-            log::info!("Unable to register default CsvFormat: {e}")
-        };
-
-        if let Err(e) =
-            new_self.register_file_format(Arc::new(ArrowFormatFactory::new()), 
false)
-        {
-            log::info!("Unable to register default ArrowFormat: {e}")
-        };
-
-        if let Err(e) =
-            new_self.register_file_format(Arc::new(AvroFormatFactory::new()), 
false)
-        {
-            log::info!("Unable to register default AvroFormat: {e}")
-        };
-
-        // register built in functions
-        functions::register_all(&mut new_self)
-            .expect("can not register built in functions");
-
-        // register crate of array expressions (if enabled)
-        #[cfg(feature = "array_expressions")]
-        functions_array::register_all(&mut new_self)
-            .expect("can not register array expressions");
-
-        functions_aggregate::register_all(&mut new_self)
-            .expect("can not register aggregate functions");
-
-        new_self
+        SessionStateBuilder::new_with_config_rt(config, runtime)
+            .with_defaults(true)

Review Comment:
   I think it would be good to mention / link to `SessionStateBuilder` in the docs here.
   
   As a follow-on PR, it probably makes sense to deprecate `SessionState::new_with_config` and the related `new_with_config...` constructors, and redirect people to use `SessionStateBuilder` instead.



##########
datafusion/core/src/execution/session_state.rs:
##########
@@ -976,6 +837,482 @@ impl SessionState {
     }
 }
 
+/// A builder to be used for building [`SessionState`]'s. Defaults will be 
used for all values
+/// unless explicitly provided. Note that there is no `Default` or `new()` for 
SessionState,
+/// to avoid accidentally running queries or other operations without passing 
through
+/// the [`SessionConfig`] or [`RuntimeEnv`].
+pub struct SessionStateBuilder {
+    state: SessionState,
+    use_defaults: bool,
+}
+
+impl SessionStateBuilder {
+    /// Returns new [`SessionStateBuilder`] using the provided
+    /// [`SessionConfig`] and [`RuntimeEnv`].
+    pub fn new_with_config_rt(
+        config: SessionConfig,
+        runtime_env: Arc<RuntimeEnv>,
+    ) -> Self {
+        let session_id = Uuid::new_v4().to_string();
+        let catalog_list =
+            Arc::new(MemoryCatalogProviderList::new()) as Arc<dyn 
CatalogProviderList>;
+
+        Self {
+            state: SessionState {
+                session_id,
+                analyzer: Analyzer::new(),
+                expr_planners: vec![],
+                optimizer: Optimizer::new(),
+                physical_optimizers: PhysicalOptimizer::new(),
+                query_planner: Arc::new(DefaultQueryPlanner {}),
+                catalog_list,
+                table_functions: HashMap::new(),
+                scalar_functions: HashMap::new(),
+                aggregate_functions: HashMap::new(),
+                window_functions: HashMap::new(),
+                serializer_registry: Arc::new(EmptySerializerRegistry),
+                file_formats: HashMap::new(),
+                table_options: TableOptions::default_from_session_config(
+                    config.options(),
+                ),
+                config,
+                execution_props: ExecutionProps::new(),
+                table_factories: HashMap::new(),
+                runtime_env,
+                function_factory: None,
+            },
+            use_defaults: true,
+        }
+    }
+
+    /// Returns a new [SessionStateBuilder] based on an existing [SessionState]
+    /// The session id for the new builder will be set to a unique value; all
+    /// other fields will be cloned from what is set in the provided session 
state
+    pub fn new_from_existing(existing: &SessionState) -> Self {
+        let session_id = Uuid::new_v4().to_string();
+
+        Self {
+            state: SessionState {
+                session_id,
+                ..existing.clone()
+            },
+            use_defaults: true,
+        }
+    }
+
+    /// Set to true (default = true) if defaults for table_factories, 
expr_planners, file formats
+    /// and builtin functions should be set.
+    /// Note that there is an explicit option for enabling catalog and schema 
default
+    /// via [SessionConfig::create_default_catalog_and_schema] which will only 
be used
+    /// if the use_defaults is enabled here.
+    /// Also note that if a field is explicitly set to a non-empty value -
+    /// for example by using the [SessionStateBuilder::with_file_formats] 
function,
+    /// then defaults for that field will not be set.
+    pub fn with_defaults(mut self, use_defaults: bool) -> Self {
+        self.use_defaults = use_defaults;
+        self
+    }
+
+    /// Replace the random session id.
+    pub fn with_session_id(mut self, session_id: String) -> Self {
+        self.state.session_id = session_id;
+        self
+    }
+
+    /// Override the [`AnalyzerRule`]s optimizer plan rules.
+    pub fn with_analyzer_rules(
+        mut self,
+        rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
+    ) -> Self {
+        self.state.analyzer = Analyzer::with_rules(rules);
+        self
+    }
+
+    /// Add `analyzer_rule` to the end of the list of
+    /// [`AnalyzerRule`]s used to rewrite queries.
+    pub fn add_analyzer_rule(
+        mut self,
+        analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>,
+    ) -> Self {
+        self.state.analyzer.rules.push(analyzer_rule);
+        self
+    }
+
+    /// Replace the entire list of [`OptimizerRule`]s used to optimize plans
+    pub fn with_optimizer_rules(
+        mut self,
+        rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
+    ) -> Self {
+        self.state.optimizer = Optimizer::with_rules(rules);
+        self
+    }
+
+    /// Add `optimizer_rule` to the end of the list of
+    /// [`OptimizerRule`]s used to rewrite queries.
+    pub fn add_optimizer_rule(
+        mut self,
+        optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
+    ) -> Self {
+        self.state.optimizer.rules.push(optimizer_rule);
+        self
+    }
+
+    /// Replace the entire list of [`ExprPlanner`]s used to customize the 
behavior of the SQL planner
+    pub fn with_expr_planners(
+        mut self,
+        expr_planners: Vec<Arc<dyn ExprPlanner>>,
+    ) -> Self {
+        self.state.expr_planners = expr_planners;
+        self
+    }
+
+    /// Replace the entire list of [`PhysicalOptimizerRule`]s used to optimize 
plans
+    pub fn with_physical_optimizer_rules(
+        mut self,
+        physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
+    ) -> Self {
+        self.state.physical_optimizers =
+            PhysicalOptimizer::with_rules(physical_optimizers);
+        self
+    }
+
+    /// Add `physical_optimizer_rule` to the end of the list of
+    /// [`PhysicalOptimizerRule`]s used to rewrite queries.
+    pub fn add_physical_optimizer_rule(
+        mut self,
+        physical_optimizer_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
+    ) -> Self {
+        self.state
+            .physical_optimizers
+            .rules
+            .push(physical_optimizer_rule);
+        self
+    }
+
+    /// override default query planner with `query_planner`
+    pub fn with_query_planner(
+        mut self,
+        query_planner: Arc<dyn QueryPlanner + Send + Sync>,
+    ) -> Self {
+        self.state.query_planner = query_planner;
+        self
+    }
+
+    /// override default catalog list with `catalog_list`
+    pub fn with_catalog_list(
+        mut self,
+        catalog_list: Arc<dyn CatalogProviderList>,
+    ) -> Self {
+        self.state.catalog_list = catalog_list;
+        self
+    }
+
+    /// override default table functions with `table_functions`
+    pub fn with_table_functions(
+        mut self,
+        table_functions: HashMap<String, Arc<TableFunction>>,
+    ) -> Self {
+        self.state.table_functions = table_functions;
+        self
+    }
+
+    /// override default scalar functions with `scalar_functions`
+    pub fn with_scalar_functions(
+        mut self,
+        scalar_functions: HashMap<String, Arc<ScalarUDF>>,
+    ) -> Self {
+        self.state.scalar_functions = scalar_functions;
+        self
+    }
+
+    /// override default aggregate functions with `aggregate_functions`
+    pub fn with_aggregate_functions(
+        mut self,
+        aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
+    ) -> Self {
+        self.state.aggregate_functions = aggregate_functions;
+        self
+    }
+
+    /// override default window functions with `window_functions`
+    pub fn with_window_functions(
+        mut self,
+        window_functions: HashMap<String, Arc<WindowUDF>>,
+    ) -> Self {
+        self.state.window_functions = window_functions;
+        self
+    }
+
+    /// Registers a [`SerializerRegistry`]
+    pub fn with_serializer_registry(
+        mut self,
+        serializer_registry: Arc<dyn SerializerRegistry>,
+    ) -> Self {
+        self.state.serializer_registry = serializer_registry;
+        self
+    }
+
+    /// override default list of file formats with `file_formats`
+    pub fn with_file_formats(
+        mut self,
+        file_formats: HashMap<String, Arc<dyn FileFormatFactory>>,
+    ) -> Self {
+        self.state.file_formats = file_formats;
+        self
+    }
+
+    /// override the session config with `config`
+    pub fn with_config(mut self, config: SessionConfig) -> Self {
+        self.state.config = config;
+        self
+    }
+
+    /// override default table options with `table_options`
+    pub fn with_table_options(mut self, table_options: TableOptions) -> Self {
+        self.state.table_options = table_options;
+        self
+    }
+
+    /// Adds a new [`ConfigExtension`] to TableOptions
+    pub fn with_table_options_extension<T: ConfigExtension>(
+        mut self,
+        extension: T,
+    ) -> Self {
+        self.state.table_options.extensions.insert(extension);
+        self
+    }
+
+    /// override default execution props with `execution_props`
+    pub fn with_execution_props(mut self, execution_props: ExecutionProps) -> 
Self {
+        self.state.execution_props = execution_props;
+        self
+    }
+
+    /// override default table factories with `table_factories`
+    pub fn with_table_factories(
+        mut self,
+        table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
+    ) -> Self {
+        self.state.table_factories = table_factories;
+        self
+    }
+
+    /// override the runtime env with `runtime_env`
+    pub fn with_runtime_env(mut self, runtime_env: Arc<RuntimeEnv>) -> Self {
+        self.state.runtime_env = runtime_env;
+        self
+    }
+
+    /// Registers a [`FunctionFactory`] to handle `CREATE FUNCTION` statements
+    pub fn with_function_factory(
+        mut self,
+        function_factory: Option<Arc<dyn FunctionFactory>>,
+    ) -> Self {
+        self.state.function_factory = function_factory;
+        self
+    }
+
+    /// build a [`SessionState`] with the current configuration
+    pub fn build(mut self) -> SessionState {
+        if self.use_defaults {
+            if self.state.table_factories.is_empty() {
+                self.state.table_factories =
+                    SessionStateDefaults::default_table_factories();
+            }
+            if self.state.expr_planners.is_empty() {

Review Comment:
   The semantics of `use_defaults` might be confusing: the flag is effectively ignored for any field the user has already provided explicitly.
   
   I think it would be easier to reason about if the defaults were installed as part of the `SessionStateBuilder::with_defaults()` call (rather than via a flag that defers the installation until `build()`). That way the user could decide whether to modify the default lists or start from an empty list.
   
   What I am hoping for long term is that we could pass in the defaults as an argument (and implement them as a trait), thus breaking the dependency between `SessionState` and core.
   
   ```rust
   let state = SessionStateBuilder::new()
     .with_defaults(SessionStateDefaults::new())?
     .build()
   ```



##########
datafusion/core/src/execution/session_state.rs:
##########
@@ -976,6 +837,482 @@ impl SessionState {
     }
 }
 
+/// A builder to be used for building [`SessionState`]'s. Defaults will be 
used for all values
+/// unless explicitly provided. Note that there is no `Default` or `new()` for 
SessionState,
+/// to avoid accidentally running queries or other operations without passing 
through
+/// the [`SessionConfig`] or [`RuntimeEnv`].
+pub struct SessionStateBuilder {
+    state: SessionState,
+    use_defaults: bool,
+}
+
+impl SessionStateBuilder {
+    /// Returns new [`SessionStateBuilder`] using the provided
+    /// [`SessionConfig`] and [`RuntimeEnv`].
+    pub fn new_with_config_rt(
+        config: SessionConfig,
+        runtime_env: Arc<RuntimeEnv>,
+    ) -> Self {
+        let session_id = Uuid::new_v4().to_string();
+        let catalog_list =
+            Arc::new(MemoryCatalogProviderList::new()) as Arc<dyn 
CatalogProviderList>;
+
+        Self {
+            state: SessionState {
+                session_id,
+                analyzer: Analyzer::new(),
+                expr_planners: vec![],
+                optimizer: Optimizer::new(),
+                physical_optimizers: PhysicalOptimizer::new(),
+                query_planner: Arc::new(DefaultQueryPlanner {}),
+                catalog_list,
+                table_functions: HashMap::new(),
+                scalar_functions: HashMap::new(),
+                aggregate_functions: HashMap::new(),
+                window_functions: HashMap::new(),
+                serializer_registry: Arc::new(EmptySerializerRegistry),
+                file_formats: HashMap::new(),
+                table_options: TableOptions::default_from_session_config(
+                    config.options(),
+                ),
+                config,
+                execution_props: ExecutionProps::new(),
+                table_factories: HashMap::new(),
+                runtime_env,
+                function_factory: None,
+            },
+            use_defaults: true,
+        }
+    }
+
+    /// Returns a new [SessionStateBuilder] based on an existing [SessionState]
+    /// The session id for the new builder will be set to a unique value; all
+    /// other fields will be cloned from what is set in the provided session 
state
+    pub fn new_from_existing(existing: &SessionState) -> Self {
+        let session_id = Uuid::new_v4().to_string();
+
+        Self {
+            state: SessionState {
+                session_id,
+                ..existing.clone()
+            },
+            use_defaults: true,
+        }
+    }
+
+    /// Set to true (default = true) if defaults for table_factories, 
expr_planners, file formats
+    /// and builtin functions should be set.
+    /// Note that there is an explicit option for enabling catalog and schema 
default
+    /// via [SessionConfig::create_default_catalog_and_schema] which will only 
be used
+    /// if the use_defaults is enabled here.
+    /// Also note that if a field is explicitly set to a non-empty value -
+    /// for example by using the [SessionStateBuilder::with_file_formats] 
function,
+    /// then defaults for that field will not be set.
+    pub fn with_defaults(mut self, use_defaults: bool) -> Self {
+        self.use_defaults = use_defaults;
+        self
+    }
+
+    /// Replace the random session id.
+    pub fn with_session_id(mut self, session_id: String) -> Self {
+        self.state.session_id = session_id;
+        self
+    }
+
+    /// Override the [`AnalyzerRule`]s optimizer plan rules.
+    pub fn with_analyzer_rules(
+        mut self,
+        rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
+    ) -> Self {
+        self.state.analyzer = Analyzer::with_rules(rules);
+        self
+    }
+
+    /// Add `analyzer_rule` to the end of the list of
+    /// [`AnalyzerRule`]s used to rewrite queries.
+    pub fn add_analyzer_rule(
+        mut self,
+        analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>,
+    ) -> Self {
+        self.state.analyzer.rules.push(analyzer_rule);
+        self
+    }
+
+    /// Replace the entire list of [`OptimizerRule`]s used to optimize plans
+    pub fn with_optimizer_rules(
+        mut self,
+        rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
+    ) -> Self {
+        self.state.optimizer = Optimizer::with_rules(rules);
+        self
+    }
+
+    /// Add `optimizer_rule` to the end of the list of
+    /// [`OptimizerRule`]s used to rewrite queries.
+    pub fn add_optimizer_rule(
+        mut self,
+        optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
+    ) -> Self {
+        self.state.optimizer.rules.push(optimizer_rule);
+        self
+    }
+
+    /// Replace the entire list of [`ExprPlanner`]s used to customize the 
behavior of the SQL planner
+    pub fn with_expr_planners(
+        mut self,
+        expr_planners: Vec<Arc<dyn ExprPlanner>>,
+    ) -> Self {
+        self.state.expr_planners = expr_planners;
+        self
+    }
+
+    /// Replace the entire list of [`PhysicalOptimizerRule`]s used to optimize 
plans
+    pub fn with_physical_optimizer_rules(
+        mut self,
+        physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
+    ) -> Self {
+        self.state.physical_optimizers =
+            PhysicalOptimizer::with_rules(physical_optimizers);
+        self
+    }
+
+    /// Add `physical_optimizer_rule` to the end of the list of
+    /// [`PhysicalOptimizerRule`]s used to rewrite queries.
+    pub fn add_physical_optimizer_rule(
+        mut self,
+        physical_optimizer_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
+    ) -> Self {
+        self.state
+            .physical_optimizers
+            .rules
+            .push(physical_optimizer_rule);
+        self
+    }
+
+    /// override default query planner with `query_planner`
+    pub fn with_query_planner(
+        mut self,
+        query_planner: Arc<dyn QueryPlanner + Send + Sync>,
+    ) -> Self {
+        self.state.query_planner = query_planner;
+        self
+    }
+
+    /// override default catalog list with `catalog_list`
+    pub fn with_catalog_list(
+        mut self,
+        catalog_list: Arc<dyn CatalogProviderList>,
+    ) -> Self {
+        self.state.catalog_list = catalog_list;
+        self
+    }
+
+    /// override default table functions with `table_functions`
+    pub fn with_table_functions(
+        mut self,
+        table_functions: HashMap<String, Arc<TableFunction>>,
+    ) -> Self {
+        self.state.table_functions = table_functions;
+        self
+    }
+
+    /// override default scalar functions with `scalar_functions`
+    pub fn with_scalar_functions(
+        mut self,
+        scalar_functions: HashMap<String, Arc<ScalarUDF>>,
+    ) -> Self {
+        self.state.scalar_functions = scalar_functions;
+        self
+    }
+
+    /// override default aggregate functions with `aggregate_functions`
+    pub fn with_aggregate_functions(
+        mut self,
+        aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
+    ) -> Self {
+        self.state.aggregate_functions = aggregate_functions;
+        self
+    }
+
+    /// override default window functions with `window_functions`
+    pub fn with_window_functions(
+        mut self,
+        window_functions: HashMap<String, Arc<WindowUDF>>,
+    ) -> Self {
+        self.state.window_functions = window_functions;
+        self
+    }
+
+    /// Registers a [`SerializerRegistry`]
+    pub fn with_serializer_registry(
+        mut self,
+        serializer_registry: Arc<dyn SerializerRegistry>,
+    ) -> Self {
+        self.state.serializer_registry = serializer_registry;
+        self
+    }
+
+    /// override default list of file formats with `file_formats`
+    pub fn with_file_formats(
+        mut self,
+        file_formats: HashMap<String, Arc<dyn FileFormatFactory>>,
+    ) -> Self {
+        self.state.file_formats = file_formats;
+        self
+    }
+
+    /// override the session config with `config`
+    pub fn with_config(mut self, config: SessionConfig) -> Self {
+        self.state.config = config;
+        self
+    }
+
+    /// override default table options with `table_options`
+    pub fn with_table_options(mut self, table_options: TableOptions) -> Self {
+        self.state.table_options = table_options;
+        self
+    }
+
+    /// Adds a new [`ConfigExtension`] to TableOptions
+    pub fn with_table_options_extension<T: ConfigExtension>(
+        mut self,
+        extension: T,
+    ) -> Self {
+        self.state.table_options.extensions.insert(extension);
+        self
+    }
+
+    /// override default execution props with `execution_props`
+    pub fn with_execution_props(mut self, execution_props: ExecutionProps) -> 
Self {
+        self.state.execution_props = execution_props;
+        self
+    }
+
+    /// override default table factories with `table_factories`
+    pub fn with_table_factories(
+        mut self,
+        table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
+    ) -> Self {
+        self.state.table_factories = table_factories;
+        self
+    }
+
+    /// override the runtime env with `runtime_env`
+    pub fn with_runtime_env(mut self, runtime_env: Arc<RuntimeEnv>) -> Self {
+        self.state.runtime_env = runtime_env;
+        self
+    }
+
+    /// Registers a [`FunctionFactory`] to handle `CREATE FUNCTION` statements
+    pub fn with_function_factory(
+        mut self,
+        function_factory: Option<Arc<dyn FunctionFactory>>,
+    ) -> Self {
+        self.state.function_factory = function_factory;
+        self
+    }
+
+    /// build a [`SessionState`] with the current configuration
+    pub fn build(mut self) -> SessionState {
+        if self.use_defaults {
+            if self.state.table_factories.is_empty() {
+                self.state.table_factories =
+                    SessionStateDefaults::default_table_factories();
+            }
+            if self.state.expr_planners.is_empty() {
+                self.state.expr_planners = 
SessionStateDefaults::default_expr_planners();
+            }
+
+            if self.state.config.create_default_catalog_and_schema() {
+                let default_catalog = SessionStateDefaults::default_catalog(
+                    &self.state.config,
+                    &self.state.table_factories,
+                    &self.state.runtime_env,
+                );
+
+                self.state.catalog_list.register_catalog(
+                    
self.state.config.options().catalog.default_catalog.clone(),
+                    Arc::new(default_catalog),
+                );
+            }
+
+            if self.state.file_formats.is_empty() {
+                SessionStateDefaults::register_default_file_formats(&mut 
self.state);
+            }
+
+            if self.state.scalar_functions.is_empty() {
+                SessionStateDefaults::register_scalar_functions(&mut 
self.state);
+                SessionStateDefaults::register_array_functions(&mut 
self.state);
+            }
+
+            if self.state.aggregate_functions.is_empty() {
+                SessionStateDefaults::register_aggregate_functions(&mut 
self.state);
+            }
+        }
+
+        self.state.clone()
+    }
+}
+
+/// Defaults that are used as part of creating a SessionState such as table 
providers,

Review Comment:
   This is great. Thank you
   
   Eventually I would love to move this structure out of `session_state.rs` and 
into its own module / .rs file to make the distinction between SessionState and 
the default features clearer. However, we can do that as a follow-on task too.
   
   Eventually I am thinking that we could pass `SessionStateDefaults` to the 
builder somehow (and thus finally break the dependency of SessionState on 
datafusion-core), but let's not try to do that in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to