This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5e9979c65f Deprecate `RuntimeConfig`, update code to use new builder style (#13635)
5e9979c65f is described below
commit 5e9979c65f6945fbb5808df517a07dc7cd9619ec
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Dec 4 17:08:46 2024 -0500
Deprecate `RuntimeConfig`, update code to use new builder style (#13635)
* Deprecate `RuntimeConfig`, update code to use new builder style
* Update datafusion/execution/src/runtime_env.rs
---------
Co-authored-by: Oleks V <[email protected]>
---
benchmarks/src/bin/external_aggr.rs | 12 +++-
benchmarks/src/sort_tpch.rs | 10 ++--
datafusion-cli/src/main.rs | 37 +++++-------
datafusion/core/src/datasource/file_format/csv.rs | 3 -
datafusion/core/src/execution/context/mod.rs | 3 -
datafusion/execution/src/disk_manager.rs | 3 +-
datafusion/execution/src/memory_pool/pool.rs | 2 +-
datafusion/execution/src/runtime_env.rs | 70 ++++++++++++++---------
datafusion/execution/src/task.rs | 10 +---
datafusion/physical-plan/src/aggregates/mod.rs | 4 +-
datafusion/sql/src/planner.rs | 2 +-
11 files changed, 80 insertions(+), 76 deletions(-)
diff --git a/benchmarks/src/bin/external_aggr.rs b/benchmarks/src/bin/external_aggr.rs
index 950c3048c1..f1d8337585 100644
--- a/benchmarks/src/bin/external_aggr.rs
+++ b/benchmarks/src/bin/external_aggr.rs
@@ -33,7 +33,8 @@ use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::memory_pool::{human_readable_size, units};
-use datafusion::execution::runtime_env::RuntimeConfig;
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
@@ -195,10 +196,15 @@ impl ExternalAggrConfig {
let query_name =
format!("Q{query_id}({})", human_readable_size(mem_limit as
usize));
let config = self.common.config();
- let runtime_config = RuntimeConfig::new()
+ let runtime_env = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(mem_limit as usize)))
.build_arc()?;
- let ctx = SessionContext::new_with_config_rt(config, runtime_config);
+ let state = SessionStateBuilder::new()
+ .with_config(config)
+ .with_runtime_env(runtime_env)
+ .with_default_features()
+ .build();
+ let ctx = SessionContext::from(state);
// register tables
self.register_tables(&ctx).await?;
diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs
index 137f4e20b7..566a5ea62c 100644
--- a/benchmarks/src/sort_tpch.rs
+++ b/benchmarks/src/sort_tpch.rs
@@ -32,7 +32,7 @@ use datafusion::datasource::listing::{
};
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
-use datafusion::execution::runtime_env::RuntimeConfig;
+use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{displayable, execute_stream};
use datafusion::prelude::*;
@@ -188,9 +188,11 @@ impl RunOpt {
/// Benchmark query `query_id` in `SORT_QUERIES`
async fn benchmark_query(&self, query_id: usize) ->
Result<Vec<QueryResult>> {
let config = self.common.config();
-
- let runtime_config = RuntimeConfig::new().build_arc()?;
- let ctx = SessionContext::new_with_config_rt(config, runtime_config);
+ let state = SessionStateBuilder::new()
+ .with_config(config)
+ .with_default_features()
+ .build();
+ let ctx = SessionContext::from(state);
// register tables
self.register_tables(&ctx).await?;
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 4c6c352ff3..d5dd2c4b33 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -23,8 +23,8 @@ use std::sync::{Arc, OnceLock};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
-use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool,
MemoryPool};
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::ParquetMetadataFunc;
@@ -156,27 +156,22 @@ async fn main_inner() -> Result<()> {
session_config = session_config.with_batch_size(batch_size);
};
- let rt_config = RuntimeConfig::new();
- let rt_config =
- // set memory pool size
- if let Some(memory_limit) = args.memory_limit {
- // set memory pool type
- match args.mem_pool_type {
- PoolType::Fair => rt_config
-
.with_memory_pool(Arc::new(FairSpillPool::new(memory_limit))),
- PoolType::Greedy => rt_config
-
.with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit)))
- }
- } else {
- rt_config
+ let mut rt_builder = RuntimeEnvBuilder::new();
+ // set memory pool size
+ if let Some(memory_limit) = args.memory_limit {
+ // set memory pool type
+ let pool: Arc<dyn MemoryPool> = match args.mem_pool_type {
+ PoolType::Fair => Arc::new(FairSpillPool::new(memory_limit)),
+ PoolType::Greedy => Arc::new(GreedyMemoryPool::new(memory_limit)),
};
+ rt_builder = rt_builder.with_memory_pool(pool)
+ }
- let runtime_env = create_runtime_env(rt_config.clone())?;
+ let runtime_env = rt_builder.build_arc()?;
// enable dynamic file query
- let ctx =
- SessionContext::new_with_config_rt(session_config.clone(),
Arc::new(runtime_env))
- .enable_url_table();
+ let ctx = SessionContext::new_with_config_rt(session_config, runtime_env)
+ .enable_url_table();
ctx.refresh_catalogs().await?;
// install dynamic catalog provider that can register required object
stores
ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new(
@@ -231,10 +226,6 @@ async fn main_inner() -> Result<()> {
Ok(())
}
-fn create_runtime_env(rn_config: RuntimeConfig) -> Result<RuntimeEnv> {
- RuntimeEnv::try_new(rn_config)
-}
-
fn parse_valid_file(dir: &str) -> Result<String, String> {
if Path::new(dir).is_file() {
Ok(dir.to_string())
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 9f979ddf01..9c96c68286 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -753,7 +753,6 @@ mod tests {
use datafusion_common::cast::as_string_array;
use datafusion_common::internal_err;
use datafusion_common::stats::Precision;
- use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_expr::{col, lit};
use chrono::DateTime;
@@ -984,12 +983,10 @@ mod tests {
async fn query_compress_data(
file_compression_type: FileCompressionType,
) -> Result<()> {
- let runtime = Arc::new(RuntimeEnvBuilder::new().build()?);
let mut cfg = SessionConfig::new();
cfg.options_mut().catalog.has_header = true;
let session_state = SessionStateBuilder::new()
.with_config(cfg)
- .with_runtime_env(runtime)
.with_default_features()
.build();
let integration =
LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 4cc3200df1..67236c9a6b 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -1792,7 +1792,6 @@ mod tests {
use super::{super::options::CsvReadOptions, *};
use crate::assert_batches_eq;
use crate::execution::memory_pool::MemoryConsumer;
- use crate::execution::runtime_env::RuntimeEnvBuilder;
use crate::test;
use crate::test_util::{plan_and_collect, populate_csv_partitions};
use arrow_schema::{DataType, TimeUnit};
@@ -1932,14 +1931,12 @@ mod tests {
let path = path.join("tests/tpch-csv");
let url = format!("file://{}", path.display());
- let runtime = RuntimeEnvBuilder::new().build_arc()?;
let cfg = SessionConfig::new()
.set_str("datafusion.catalog.location", url.as_str())
.set_str("datafusion.catalog.format", "CSV")
.set_str("datafusion.catalog.has_header", "true");
let session_state = SessionStateBuilder::new()
.with_config(cfg)
- .with_runtime_env(runtime)
.with_default_features()
.build();
let ctx = SessionContext::new_with_state(session_state);
diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs
index c71071b809..756da7ed5b 100644
--- a/datafusion/execution/src/disk_manager.rs
+++ b/datafusion/execution/src/disk_manager.rs
@@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Manages files generated during query execution, files are
-//! hashed among the directories listed in RuntimeConfig::local_dirs.
+//! [`DiskManager`]: Manages files generated during query execution
use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
use log::debug;
diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs
index c2ec42d0df..261332180e 100644
--- a/datafusion/execution/src/memory_pool/pool.rs
+++ b/datafusion/execution/src/memory_pool/pool.rs
@@ -62,7 +62,7 @@ pub struct GreedyMemoryPool {
}
impl GreedyMemoryPool {
- /// Allocate up to `limit` bytes
+ /// Create a new pool that can allocate up to `pool_size` bytes
pub fn new(pool_size: usize) -> Self {
debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
Self {
diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs
index d302452f75..5420080efd 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -41,13 +41,32 @@ use url::Url;
/// Execution runtime environment that manages system resources such
/// as memory, disk, cache and storage.
///
-/// A [`RuntimeEnv`] is created from a [`RuntimeEnvBuilder`] and has the
+/// A [`RuntimeEnv`] can be created using [`RuntimeEnvBuilder`] and has the
/// following resource management functionality:
///
/// * [`MemoryPool`]: Manage memory
/// * [`DiskManager`]: Manage temporary files on local disk
/// * [`CacheManager`]: Manage temporary cache data during the session lifetime
/// * [`ObjectStoreRegistry`]: Manage mapping URLs to object store instances
+///
+/// # Example: Create default `RuntimeEnv`
+/// ```
+/// # use datafusion_execution::runtime_env::RuntimeEnv;
+/// let runtime_env = RuntimeEnv::default();
+/// ```
+///
+/// # Example: Create a `RuntimeEnv` from [`RuntimeEnvBuilder`] with a new
memory pool
+/// ```
+/// # use std::sync::Arc;
+/// # use datafusion_execution::memory_pool::GreedyMemoryPool;
+/// # use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
+/// // restrict to using at most 100MB of memory
+/// let pool_size = 100 * 1024 * 1024;
+/// let runtime_env = RuntimeEnvBuilder::new()
+/// .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
+/// .build()
+/// .unwrap();
+/// ```
pub struct RuntimeEnv {
/// Runtime memory management
pub memory_pool: Arc<dyn MemoryPool>,
@@ -66,28 +85,16 @@ impl Debug for RuntimeEnv {
}
impl RuntimeEnv {
- #[deprecated(since = "43.0.0", note = "please use `try_new` instead")]
+ #[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder`
instead")]
+ #[allow(deprecated)]
pub fn new(config: RuntimeConfig) -> Result<Self> {
Self::try_new(config)
}
/// Create env based on configuration
+ #[deprecated(since = "44.0.0", note = "please use `RuntimeEnvBuilder`
instead")]
+ #[allow(deprecated)]
pub fn try_new(config: RuntimeConfig) -> Result<Self> {
- let RuntimeConfig {
- memory_pool,
- disk_manager,
- cache_manager,
- object_store_registry,
- } = config;
-
- let memory_pool =
- memory_pool.unwrap_or_else(||
Arc::new(UnboundedMemoryPool::default()));
-
- Ok(Self {
- memory_pool,
- disk_manager: DiskManager::try_new(disk_manager)?,
- cache_manager: CacheManager::try_new(&cache_manager)?,
- object_store_registry,
- })
+ config.build()
}
/// Registers a custom `ObjectStore` to be used with a specific url.
@@ -104,7 +111,7 @@ impl RuntimeEnv {
/// # use std::sync::Arc;
/// # use url::Url;
/// # use datafusion_execution::runtime_env::RuntimeEnv;
- /// # let runtime_env = RuntimeEnv::try_new(Default::default()).unwrap();
+ /// # let runtime_env = RuntimeEnv::default();
/// let url = Url::try_from("file://").unwrap();
/// let object_store = object_store::local::LocalFileSystem::new();
/// // register the object store with the runtime environment
@@ -119,11 +126,12 @@ impl RuntimeEnv {
/// # use std::sync::Arc;
/// # use url::Url;
/// # use datafusion_execution::runtime_env::RuntimeEnv;
- /// # let runtime_env = RuntimeEnv::try_new(Default::default()).unwrap();
+ /// # let runtime_env = RuntimeEnv::default();
/// # // use local store for example as http feature is not enabled
/// # let http_store = object_store::local::LocalFileSystem::new();
/// // create a new object store via object_store::http::HttpBuilder;
/// let base_url = Url::parse("https://github.com").unwrap();
+ /// // (note this example can't depend on the http feature)
/// // let http_store = HttpBuilder::new()
/// // .with_url(base_url.clone())
/// // .build()
@@ -157,10 +165,13 @@ impl Default for RuntimeEnv {
/// Please see: <https://github.com/apache/datafusion/issues/12156>
/// This a type alias for backwards compatibility.
+#[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder`
instead")]
pub type RuntimeConfig = RuntimeEnvBuilder;
#[derive(Clone)]
-/// Execution runtime configuration
+/// Execution runtime configuration builder.
+///
+/// See example on [`RuntimeEnv`]
pub struct RuntimeEnvBuilder {
/// DiskManager to manage temporary disk file usage
pub disk_manager: DiskManagerConfig,
@@ -239,15 +250,20 @@ impl RuntimeEnvBuilder {
/// Build a RuntimeEnv
pub fn build(self) -> Result<RuntimeEnv> {
- let memory_pool = self
- .memory_pool
- .unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
+ let Self {
+ disk_manager,
+ memory_pool,
+ cache_manager,
+ object_store_registry,
+ } = self;
+ let memory_pool =
+ memory_pool.unwrap_or_else(||
Arc::new(UnboundedMemoryPool::default()));
Ok(RuntimeEnv {
memory_pool,
- disk_manager: DiskManager::try_new(self.disk_manager)?,
- cache_manager: CacheManager::try_new(&self.cache_manager)?,
- object_store_registry: self.object_store_registry,
+ disk_manager: DiskManager::try_new(disk_manager)?,
+ cache_manager: CacheManager::try_new(&cache_manager)?,
+ object_store_registry,
})
}
diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs
index 35494443b4..7cdb53c90d 100644
--- a/datafusion/execution/src/task.rs
+++ b/datafusion/execution/src/task.rs
@@ -16,10 +16,8 @@
// under the License.
use crate::{
- config::SessionConfig,
- memory_pool::MemoryPool,
- registry::FunctionRegistry,
- runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
+ config::SessionConfig, memory_pool::MemoryPool, registry::FunctionRegistry,
+ runtime_env::RuntimeEnv,
};
use datafusion_common::{plan_datafusion_err, DataFusionError, Result};
use datafusion_expr::planner::ExprPlanner;
@@ -54,9 +52,7 @@ pub struct TaskContext {
impl Default for TaskContext {
fn default() -> Self {
- let runtime = RuntimeEnvBuilder::new()
- .build_arc()
- .expect("default runtime created successfully");
+ let runtime = Arc::new(RuntimeEnv::default());
// Create a default task context, mostly useful for testing
Self {
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 260c3a1c48..feca5eb1db 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1429,7 +1429,7 @@ mod tests {
fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc<TaskContext>
{
let session_config = SessionConfig::new().with_batch_size(batch_size);
- let runtime = RuntimeEnvBuilder::default()
+ let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
.build_arc()
.unwrap();
@@ -1914,7 +1914,7 @@ mod tests {
let input: Arc<dyn ExecutionPlan> =
Arc::new(TestYieldingExec::new(true));
let input_schema = input.schema();
- let runtime = RuntimeEnvBuilder::default()
+ let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(1, 1.0)
.build_arc()?;
let task_ctx = TaskContext::default().with_runtime(runtime);
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 496088b1e1..59fa4ca5f1 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -468,7 +468,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
|| matches!(tz_info, TimezoneInfo::WithTimeZone)
{
// Timestamp With Time Zone
- // INPUT : [SQLDataType] TimestampTz + [RuntimeConfig]
Time Zone
+ // INPUT : [SQLDataType] TimestampTz + [Config] Time Zone
// OUTPUT: [ArrowDataType] Timestamp<TimeUnit, Some(Time
Zone)>
self.context_provider.options().execution.time_zone.clone()
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]