This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push: new 020d29dc refactor: SessionBuilder to return Result<_> (#1138) 020d29dc is described below commit 020d29dc89591a3314cd01e1a584204256decb94 Author: Marko Milenković <milenkov...@users.noreply.github.com> AuthorDate: Wed Nov 27 16:24:58 2024 +0000 refactor: SessionBuilder to return Result<_> (#1138) * refactor: SessionBuilder to return Result<_> * Update ballista/core/src/utils.rs Co-authored-by: Andy Grove <agr...@apache.org> --------- Co-authored-by: Andy Grove <agr...@apache.org> --- ballista/core/src/utils.rs | 10 ++++++---- ballista/scheduler/src/cluster/memory.rs | 4 ++-- ballista/scheduler/src/scheduler_server/mod.rs | 3 ++- ballista/scheduler/src/standalone.rs | 8 +++++--- ballista/scheduler/src/state/session_manager.rs | 8 ++++---- examples/examples/custom-client.rs | 2 +- examples/src/object_store.rs | 8 ++++---- examples/tests/object_store.rs | 17 +++++++++-------- 8 files changed, 33 insertions(+), 27 deletions(-) diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs index bf753325..1506c2bb 100644 --- a/ballista/core/src/utils.rs +++ b/ballista/core/src/utils.rs @@ -62,12 +62,14 @@ use tonic::codegen::StdError; use tonic::transport::{Channel, Error, Server}; /// Default session builder using the provided configuration -pub fn default_session_builder(config: SessionConfig) -> SessionState { - SessionStateBuilder::new() +pub fn default_session_builder( + config: SessionConfig, +) -> datafusion::common::Result<SessionState> { + Ok(SessionStateBuilder::new() .with_default_features() .with_config(config) - .with_runtime_env(Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap())) - .build() + .with_runtime_env(Arc::new(RuntimeEnv::new(RuntimeConfig::default())?)) + .build()) } pub fn default_config_producer() -> SessionConfig { diff --git a/ballista/scheduler/src/cluster/memory.rs b/ballista/scheduler/src/cluster/memory.rs index 6df04403..c9eac564 100644 --- a/ballista/scheduler/src/cluster/memory.rs +++ b/ballista/scheduler/src/cluster/memory.rs @@ -408,7 +408,7 @@ impl JobState for InMemoryJobState { &self, config: &SessionConfig, ) -> Result<Arc<SessionContext>> { - let session = create_datafusion_context(config, self.session_builder.clone()); + let session = create_datafusion_context(config, self.session_builder.clone())?; self.sessions.insert(session.session_id(), session.clone()); Ok(session) @@ -419,7 +419,7 @@ impl JobState for InMemoryJobState { session_id: &str, config: &SessionConfig, ) -> Result<Arc<SessionContext>> { - let session = create_datafusion_context(config, self.session_builder.clone()); + let session = create_datafusion_context(config, self.session_builder.clone())?; self.sessions .insert(session_id.to_string(), session.clone()); diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs index b6eeafda..653e2d41 100644 --- a/ballista/scheduler/src/scheduler_server/mod.rs +++ b/ballista/scheduler/src/scheduler_server/mod.rs @@ -56,7 +56,8 @@ mod external_scaler; mod grpc; pub(crate) mod query_stage_scheduler; -pub type SessionBuilder = Arc<dyn Fn(SessionConfig) -> SessionState + Send + Sync>; +pub type SessionBuilder = + Arc<dyn Fn(SessionConfig) -> datafusion::common::Result<SessionState> + Send + Sync>; #[derive(Clone)] pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> { diff --git a/ballista/scheduler/src/standalone.rs b/ballista/scheduler/src/standalone.rs index 9ad887c6..e9c48345 100644 --- a/ballista/scheduler/src/standalone.rs +++ b/ballista/scheduler/src/standalone.rs @@ -57,9 +57,11 @@ pub async fn new_standalone_scheduler_from_state( let session_config = session_state.config().clone(); let session_state = session_state.clone(); let session_builder = Arc::new(move |c: SessionConfig| { - SessionStateBuilder::new_from_existing(session_state.clone()) - .with_config(c) - .build() + Ok( + SessionStateBuilder::new_from_existing(session_state.clone()) + .with_config(c) + .build(), + ) }); let config_producer = Arc::new(move || session_config.clone()); diff --git a/ballista/scheduler/src/state/session_manager.rs b/ballista/scheduler/src/state/session_manager.rs index 8a769edb..59813167 100644 --- a/ballista/scheduler/src/state/session_manager.rs +++ b/ballista/scheduler/src/state/session_manager.rs @@ -67,7 +67,7 @@ impl SessionManager { pub fn create_datafusion_context( session_config: &SessionConfig, session_builder: SessionBuilder, -) -> Arc<SessionContext> { +) -> datafusion::common::Result<Arc<SessionContext>> { let session_state = if session_config.round_robin_repartition() { let session_config = session_config .clone() @@ -75,10 +75,10 @@ pub fn create_datafusion_context( .with_round_robin_repartition(false); log::warn!("session manager will override `datafusion.optimizer.enable_round_robin_repartition` to `false` "); - session_builder(session_config) + session_builder(session_config)? } else { - session_builder(session_config.clone()) + session_builder(session_config.clone())? }; - Arc::new(SessionContext::new_with_state(session_state)) + Ok(Arc::new(SessionContext::new_with_state(session_state))) } diff --git a/examples/examples/custom-client.rs b/examples/examples/custom-client.rs index 3577621e..9e7ec859 100644 --- a/examples/examples/custom-client.rs +++ b/examples/examples/custom-client.rs @@ -62,7 +62,7 @@ async fn main() -> Result<()> { // new sessions state with required custom session configuration and runtime environment let state = - custom_session_state_with_s3_support(custom_session_config_with_s3_options()); + custom_session_state_with_s3_support(custom_session_config_with_s3_options())?; let ctx: SessionContext = SessionContext::remote_with_state("df://localhost:50050", state).await?; diff --git a/examples/src/object_store.rs b/examples/src/object_store.rs index 130d4705..3cd22fa6 100644 --- a/examples/src/object_store.rs +++ b/examples/src/object_store.rs @@ -88,13 +88,13 @@ pub fn custom_runtime_env_with_s3_support( /// and [RuntimeEnv]. pub fn custom_session_state_with_s3_support( session_config: SessionConfig, -) -> SessionState { - let runtime_env = custom_runtime_env_with_s3_support(&session_config).unwrap(); +) -> datafusion::common::Result<SessionState> { + let runtime_env = custom_runtime_env_with_s3_support(&session_config)?; - SessionStateBuilder::new() + Ok(SessionStateBuilder::new() .with_runtime_env(runtime_env) .with_config(session_config) - .build() + .build()) } /// Custom [ObjectStoreRegistry] which will create diff --git a/examples/tests/object_store.rs b/examples/tests/object_store.rs index cd5c2def..ca47c5cb 100644 --- a/examples/tests/object_store.rs +++ b/examples/tests/object_store.rs @@ -298,7 +298,7 @@ mod custom_s3_config { // object store registry. let session_builder = Arc::new(produce_state); - let state = session_builder(config_producer()); + let state = session_builder(config_producer())?; // setting up ballista cluster with new runtime, configuration, and session state producers let (host, port) = crate::common::setup_test_cluster_with_builders( @@ -416,7 +416,7 @@ mod custom_s3_config { // object store registry. let session_builder = Arc::new(produce_state); - let state = session_builder(config_producer()); + let state = session_builder(config_producer())?; // // establishing cluster connection, let ctx: SessionContext = SessionContext::standalone_with_state(state).await?; @@ -480,24 +480,25 @@ mod custom_s3_config { Ok(()) } - fn produce_state(session_config: SessionConfig) -> SessionState { + fn produce_state( + session_config: SessionConfig, + ) -> datafusion::common::Result<SessionState> { let s3options = session_config .options() .extensions .get::<S3Options>() .ok_or(DataFusionError::Configuration( "S3 Options not set".to_string(), - )) - .unwrap(); + ))?; let config = RuntimeConfig::new().with_object_store_registry(Arc::new( CustomObjectStoreRegistry::new(s3options.clone()), )); - let runtime_env = RuntimeEnv::new(config).unwrap(); + let runtime_env = RuntimeEnv::new(config)?; - SessionStateBuilder::new() + Ok(SessionStateBuilder::new() .with_runtime_env(runtime_env.into()) .with_config(session_config) - .build() + .build()) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org