This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e9979c65f Deprecate `RuntimeConfig`, update code to use new builder style (#13635)
5e9979c65f is described below

commit 5e9979c65f6945fbb5808df517a07dc7cd9619ec
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Dec 4 17:08:46 2024 -0500

    Deprecate `RuntimeConfig`, update code to use new builder style (#13635)
    
    * Deprecate `RuntimeConfig`, update code to use new builder style
    
    * Update datafusion/execution/src/runtime_env.rs
    
    ---------
    
    Co-authored-by: Oleks V <[email protected]>
---
 benchmarks/src/bin/external_aggr.rs               | 12 +++-
 benchmarks/src/sort_tpch.rs                       | 10 ++--
 datafusion-cli/src/main.rs                        | 37 +++++-------
 datafusion/core/src/datasource/file_format/csv.rs |  3 -
 datafusion/core/src/execution/context/mod.rs      |  3 -
 datafusion/execution/src/disk_manager.rs          |  3 +-
 datafusion/execution/src/memory_pool/pool.rs      |  2 +-
 datafusion/execution/src/runtime_env.rs           | 70 ++++++++++++++---------
 datafusion/execution/src/task.rs                  | 10 +---
 datafusion/physical-plan/src/aggregates/mod.rs    |  4 +-
 datafusion/sql/src/planner.rs                     |  2 +-
 11 files changed, 80 insertions(+), 76 deletions(-)

diff --git a/benchmarks/src/bin/external_aggr.rs b/benchmarks/src/bin/external_aggr.rs
index 950c3048c1..f1d8337585 100644
--- a/benchmarks/src/bin/external_aggr.rs
+++ b/benchmarks/src/bin/external_aggr.rs
@@ -33,7 +33,8 @@ use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::Result;
 use datafusion::execution::memory_pool::FairSpillPool;
 use datafusion::execution::memory_pool::{human_readable_size, units};
-use datafusion::execution::runtime_env::RuntimeConfig;
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::execution::SessionStateBuilder;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::{collect, displayable};
 use datafusion::prelude::*;
@@ -195,10 +196,15 @@ impl ExternalAggrConfig {
         let query_name =
             format!("Q{query_id}({})", human_readable_size(mem_limit as usize));
         let config = self.common.config();
-        let runtime_config = RuntimeConfig::new()
+        let runtime_env = RuntimeEnvBuilder::new()
             .with_memory_pool(Arc::new(FairSpillPool::new(mem_limit as usize)))
             .build_arc()?;
-        let ctx = SessionContext::new_with_config_rt(config, runtime_config);
+        let state = SessionStateBuilder::new()
+            .with_config(config)
+            .with_runtime_env(runtime_env)
+            .with_default_features()
+            .build();
+        let ctx = SessionContext::from(state);
 
         // register tables
         self.register_tables(&ctx).await?;
diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs
index 137f4e20b7..566a5ea62c 100644
--- a/benchmarks/src/sort_tpch.rs
+++ b/benchmarks/src/sort_tpch.rs
@@ -32,7 +32,7 @@ use datafusion::datasource::listing::{
 };
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::Result;
-use datafusion::execution::runtime_env::RuntimeConfig;
+use datafusion::execution::SessionStateBuilder;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::{displayable, execute_stream};
 use datafusion::prelude::*;
@@ -188,9 +188,11 @@ impl RunOpt {
     /// Benchmark query `query_id` in `SORT_QUERIES`
     async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
         let config = self.common.config();
-
-        let runtime_config = RuntimeConfig::new().build_arc()?;
-        let ctx = SessionContext::new_with_config_rt(config, runtime_config);
+        let state = SessionStateBuilder::new()
+            .with_config(config)
+            .with_default_features()
+            .build();
+        let ctx = SessionContext::from(state);
 
         // register tables
         self.register_tables(&ctx).await?;
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 4c6c352ff3..d5dd2c4b33 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -23,8 +23,8 @@ use std::sync::{Arc, OnceLock};
 
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionConfig;
-use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
 use datafusion::prelude::SessionContext;
 use datafusion_cli::catalog::DynamicObjectStoreCatalog;
 use datafusion_cli::functions::ParquetMetadataFunc;
@@ -156,27 +156,22 @@ async fn main_inner() -> Result<()> {
         session_config = session_config.with_batch_size(batch_size);
     };
 
-    let rt_config = RuntimeConfig::new();
-    let rt_config =
-        // set memory pool size
-        if let Some(memory_limit) = args.memory_limit {
-            // set memory pool type
-            match args.mem_pool_type {
-                PoolType::Fair => rt_config
-                    .with_memory_pool(Arc::new(FairSpillPool::new(memory_limit))),
-                PoolType::Greedy => rt_config
-                    .with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit)))
-            }
-        } else {
-            rt_config
+    let mut rt_builder = RuntimeEnvBuilder::new();
+    // set memory pool size
+    if let Some(memory_limit) = args.memory_limit {
+        // set memory pool type
+        let pool: Arc<dyn MemoryPool> = match args.mem_pool_type {
+            PoolType::Fair => Arc::new(FairSpillPool::new(memory_limit)),
+            PoolType::Greedy => Arc::new(GreedyMemoryPool::new(memory_limit)),
         };
+        rt_builder = rt_builder.with_memory_pool(pool)
+    }
 
-    let runtime_env = create_runtime_env(rt_config.clone())?;
+    let runtime_env = rt_builder.build_arc()?;
 
     // enable dynamic file query
-    let ctx =
-        SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env))
-            .enable_url_table();
+    let ctx = SessionContext::new_with_config_rt(session_config, runtime_env)
+        .enable_url_table();
     ctx.refresh_catalogs().await?;
     // install dynamic catalog provider that can register required object stores
     ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new(
@@ -231,10 +226,6 @@ async fn main_inner() -> Result<()> {
     Ok(())
 }
 
-fn create_runtime_env(rn_config: RuntimeConfig) -> Result<RuntimeEnv> {
-    RuntimeEnv::try_new(rn_config)
-}
-
 fn parse_valid_file(dir: &str) -> Result<String, String> {
     if Path::new(dir).is_file() {
         Ok(dir.to_string())
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 9f979ddf01..9c96c68286 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -753,7 +753,6 @@ mod tests {
     use datafusion_common::cast::as_string_array;
     use datafusion_common::internal_err;
     use datafusion_common::stats::Precision;
-    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
     use datafusion_expr::{col, lit};
 
     use chrono::DateTime;
@@ -984,12 +983,10 @@ mod tests {
     async fn query_compress_data(
         file_compression_type: FileCompressionType,
     ) -> Result<()> {
-        let runtime = Arc::new(RuntimeEnvBuilder::new().build()?);
         let mut cfg = SessionConfig::new();
         cfg.options_mut().catalog.has_header = true;
         let session_state = SessionStateBuilder::new()
             .with_config(cfg)
-            .with_runtime_env(runtime)
             .with_default_features()
             .build();
         let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 4cc3200df1..67236c9a6b 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -1792,7 +1792,6 @@ mod tests {
     use super::{super::options::CsvReadOptions, *};
     use crate::assert_batches_eq;
     use crate::execution::memory_pool::MemoryConsumer;
-    use crate::execution::runtime_env::RuntimeEnvBuilder;
     use crate::test;
     use crate::test_util::{plan_and_collect, populate_csv_partitions};
     use arrow_schema::{DataType, TimeUnit};
@@ -1932,14 +1931,12 @@ mod tests {
         let path = path.join("tests/tpch-csv");
         let url = format!("file://{}", path.display());
 
-        let runtime = RuntimeEnvBuilder::new().build_arc()?;
         let cfg = SessionConfig::new()
             .set_str("datafusion.catalog.location", url.as_str())
             .set_str("datafusion.catalog.format", "CSV")
             .set_str("datafusion.catalog.has_header", "true");
         let session_state = SessionStateBuilder::new()
             .with_config(cfg)
-            .with_runtime_env(runtime)
             .with_default_features()
             .build();
         let ctx = SessionContext::new_with_state(session_state);
diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs
index c71071b809..756da7ed5b 100644
--- a/datafusion/execution/src/disk_manager.rs
+++ b/datafusion/execution/src/disk_manager.rs
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Manages files generated during query execution, files are
-//! hashed among the directories listed in RuntimeConfig::local_dirs.
+//! [`DiskManager`]: Manages files generated during query execution
 
 use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
 use log::debug;
diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs
index c2ec42d0df..261332180e 100644
--- a/datafusion/execution/src/memory_pool/pool.rs
+++ b/datafusion/execution/src/memory_pool/pool.rs
@@ -62,7 +62,7 @@ pub struct GreedyMemoryPool {
 }
 
 impl GreedyMemoryPool {
-    /// Allocate up to `limit` bytes
+    /// Create a new pool that can allocate up to `pool_size` bytes
     pub fn new(pool_size: usize) -> Self {
         debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
         Self {
diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs
index d302452f75..5420080efd 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -41,13 +41,32 @@ use url::Url;
 /// Execution runtime environment that manages system resources such
 /// as memory, disk, cache and storage.
 ///
-/// A [`RuntimeEnv`] is created from a [`RuntimeEnvBuilder`] and has the
+/// A [`RuntimeEnv`] can be created using [`RuntimeEnvBuilder`] and has the
 /// following resource management functionality:
 ///
 /// * [`MemoryPool`]: Manage memory
 /// * [`DiskManager`]: Manage temporary files on local disk
 /// * [`CacheManager`]: Manage temporary cache data during the session lifetime
 /// * [`ObjectStoreRegistry`]: Manage mapping URLs to object store instances
+///
+/// # Example: Create default `RuntimeEnv`
+/// ```
+/// # use datafusion_execution::runtime_env::RuntimeEnv;
+/// let runtime_env = RuntimeEnv::default();
+/// ```
+///
+/// # Example: Create a `RuntimeEnv` from [`RuntimeEnvBuilder`] with a new memory pool
+/// ```
+/// # use std::sync::Arc;
+/// # use datafusion_execution::memory_pool::GreedyMemoryPool;
+/// # use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
+/// // restrict to using at most 100MB of memory
+/// let pool_size = 100 * 1024 * 1024;
+/// let runtime_env = RuntimeEnvBuilder::new()
+///   .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
+///   .build()
+///   .unwrap();
+/// ```
 pub struct RuntimeEnv {
     /// Runtime memory management
     pub memory_pool: Arc<dyn MemoryPool>,
@@ -66,28 +85,16 @@ impl Debug for RuntimeEnv {
 }
 
 impl RuntimeEnv {
-    #[deprecated(since = "43.0.0", note = "please use `try_new` instead")]
+    #[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")]
+    #[allow(deprecated)]
     pub fn new(config: RuntimeConfig) -> Result<Self> {
         Self::try_new(config)
     }
     /// Create env based on configuration
+    #[deprecated(since = "44.0.0", note = "please use `RuntimeEnvBuilder` instead")]
+    #[allow(deprecated)]
     pub fn try_new(config: RuntimeConfig) -> Result<Self> {
-        let RuntimeConfig {
-            memory_pool,
-            disk_manager,
-            cache_manager,
-            object_store_registry,
-        } = config;
-
-        let memory_pool =
-            memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
-
-        Ok(Self {
-            memory_pool,
-            disk_manager: DiskManager::try_new(disk_manager)?,
-            cache_manager: CacheManager::try_new(&cache_manager)?,
-            object_store_registry,
-        })
+        config.build()
     }
 
     /// Registers a custom `ObjectStore` to be used with a specific url.
@@ -104,7 +111,7 @@ impl RuntimeEnv {
     /// # use std::sync::Arc;
     /// # use url::Url;
     /// # use datafusion_execution::runtime_env::RuntimeEnv;
-    /// # let runtime_env = RuntimeEnv::try_new(Default::default()).unwrap();
+    /// # let runtime_env = RuntimeEnv::default();
     /// let url = Url::try_from("file://").unwrap();
     /// let object_store = object_store::local::LocalFileSystem::new();
     /// // register the object store with the runtime environment
@@ -119,11 +126,12 @@ impl RuntimeEnv {
     /// # use std::sync::Arc;
     /// # use url::Url;
     /// # use datafusion_execution::runtime_env::RuntimeEnv;
-    /// # let runtime_env = RuntimeEnv::try_new(Default::default()).unwrap();
+    /// # let runtime_env = RuntimeEnv::default();
     /// # // use local store for example as http feature is not enabled
     /// # let http_store = object_store::local::LocalFileSystem::new();
     /// // create a new object store via object_store::http::HttpBuilder;
     /// let base_url = Url::parse("https://github.com").unwrap();
+    /// // (note this example can't depend on the http feature)
     /// // let http_store = HttpBuilder::new()
     /// //    .with_url(base_url.clone())
     /// //    .build()
@@ -157,10 +165,13 @@ impl Default for RuntimeEnv {
 
 /// Please see: <https://github.com/apache/datafusion/issues/12156>
 /// This a type alias for backwards compatibility.
+#[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")]
 pub type RuntimeConfig = RuntimeEnvBuilder;
 
 #[derive(Clone)]
-/// Execution runtime configuration
+/// Execution runtime configuration builder.
+///
+/// See example on [`RuntimeEnv`]
 pub struct RuntimeEnvBuilder {
     /// DiskManager to manage temporary disk file usage
     pub disk_manager: DiskManagerConfig,
@@ -239,15 +250,20 @@ impl RuntimeEnvBuilder {
 
     /// Build a RuntimeEnv
     pub fn build(self) -> Result<RuntimeEnv> {
-        let memory_pool = self
-            .memory_pool
-            .unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
+        let Self {
+            disk_manager,
+            memory_pool,
+            cache_manager,
+            object_store_registry,
+        } = self;
+        let memory_pool =
+            memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
 
         Ok(RuntimeEnv {
             memory_pool,
-            disk_manager: DiskManager::try_new(self.disk_manager)?,
-            cache_manager: CacheManager::try_new(&self.cache_manager)?,
-            object_store_registry: self.object_store_registry,
+            disk_manager: DiskManager::try_new(disk_manager)?,
+            cache_manager: CacheManager::try_new(&cache_manager)?,
+            object_store_registry,
         })
     }
 
diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs
index 35494443b4..7cdb53c90d 100644
--- a/datafusion/execution/src/task.rs
+++ b/datafusion/execution/src/task.rs
@@ -16,10 +16,8 @@
 // under the License.
 
 use crate::{
-    config::SessionConfig,
-    memory_pool::MemoryPool,
-    registry::FunctionRegistry,
-    runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
+    config::SessionConfig, memory_pool::MemoryPool, registry::FunctionRegistry,
+    runtime_env::RuntimeEnv,
 };
 use datafusion_common::{plan_datafusion_err, DataFusionError, Result};
 use datafusion_expr::planner::ExprPlanner;
@@ -54,9 +52,7 @@ pub struct TaskContext {
 
 impl Default for TaskContext {
     fn default() -> Self {
-        let runtime = RuntimeEnvBuilder::new()
-            .build_arc()
-            .expect("default runtime created successfully");
+        let runtime = Arc::new(RuntimeEnv::default());
 
         // Create a default task context, mostly useful for testing
         Self {
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 260c3a1c48..feca5eb1db 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1429,7 +1429,7 @@ mod tests {
 
     fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc<TaskContext> {
         let session_config = SessionConfig::new().with_batch_size(batch_size);
-        let runtime = RuntimeEnvBuilder::default()
+        let runtime = RuntimeEnvBuilder::new()
             .with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
             .build_arc()
             .unwrap();
@@ -1914,7 +1914,7 @@ mod tests {
         let input: Arc<dyn ExecutionPlan> = Arc::new(TestYieldingExec::new(true));
         let input_schema = input.schema();
 
-        let runtime = RuntimeEnvBuilder::default()
+        let runtime = RuntimeEnvBuilder::new()
             .with_memory_limit(1, 1.0)
             .build_arc()?;
         let task_ctx = TaskContext::default().with_runtime(runtime);
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 496088b1e1..59fa4ca5f1 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -468,7 +468,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     || matches!(tz_info, TimezoneInfo::WithTimeZone)
                 {
                     // Timestamp With Time Zone
-                    // INPUT : [SQLDataType]   TimestampTz + [RuntimeConfig] Time Zone
+                    // INPUT : [SQLDataType]   TimestampTz + [Config] Time Zone
                     // OUTPUT: [ArrowDataType] Timestamp<TimeUnit, Some(Time Zone)>
                     self.context_provider.options().execution.time_zone.clone()
                 } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to