This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 020d29dc refactor: SessionBuilder to return Result<_> (#1138)
020d29dc is described below

commit 020d29dc89591a3314cd01e1a584204256decb94
Author: Marko Milenković <milenkov...@users.noreply.github.com>
AuthorDate: Wed Nov 27 16:24:58 2024 +0000

    refactor: SessionBuilder to return Result<_> (#1138)
    
    * refactor: SessionBuilder to return Result<_>
    
    * Update ballista/core/src/utils.rs
    
    Co-authored-by: Andy Grove <agr...@apache.org>
    
    ---------
    
    Co-authored-by: Andy Grove <agr...@apache.org>
---
 ballista/core/src/utils.rs                      | 10 ++++++----
 ballista/scheduler/src/cluster/memory.rs        |  4 ++--
 ballista/scheduler/src/scheduler_server/mod.rs  |  3 ++-
 ballista/scheduler/src/standalone.rs            |  8 +++++---
 ballista/scheduler/src/state/session_manager.rs |  8 ++++----
 examples/examples/custom-client.rs              |  2 +-
 examples/src/object_store.rs                    |  8 ++++----
 examples/tests/object_store.rs                  | 17 +++++++++--------
 8 files changed, 33 insertions(+), 27 deletions(-)

diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index bf753325..1506c2bb 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -62,12 +62,14 @@ use tonic::codegen::StdError;
 use tonic::transport::{Channel, Error, Server};
 
 /// Default session builder using the provided configuration
-pub fn default_session_builder(config: SessionConfig) -> SessionState {
-    SessionStateBuilder::new()
+pub fn default_session_builder(
+    config: SessionConfig,
+) -> datafusion::common::Result<SessionState> {
+    Ok(SessionStateBuilder::new()
         .with_default_features()
         .with_config(config)
-        
.with_runtime_env(Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()))
-        .build()
+        .with_runtime_env(Arc::new(RuntimeEnv::new(RuntimeConfig::default())?))
+        .build())
 }
 
 pub fn default_config_producer() -> SessionConfig {
diff --git a/ballista/scheduler/src/cluster/memory.rs 
b/ballista/scheduler/src/cluster/memory.rs
index 6df04403..c9eac564 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -408,7 +408,7 @@ impl JobState for InMemoryJobState {
         &self,
         config: &SessionConfig,
     ) -> Result<Arc<SessionContext>> {
-        let session = create_datafusion_context(config, 
self.session_builder.clone());
+        let session = create_datafusion_context(config, 
self.session_builder.clone())?;
         self.sessions.insert(session.session_id(), session.clone());
 
         Ok(session)
@@ -419,7 +419,7 @@ impl JobState for InMemoryJobState {
         session_id: &str,
         config: &SessionConfig,
     ) -> Result<Arc<SessionContext>> {
-        let session = create_datafusion_context(config, 
self.session_builder.clone());
+        let session = create_datafusion_context(config, 
self.session_builder.clone())?;
         self.sessions
             .insert(session_id.to_string(), session.clone());
 
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs 
b/ballista/scheduler/src/scheduler_server/mod.rs
index b6eeafda..653e2d41 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -56,7 +56,8 @@ mod external_scaler;
 mod grpc;
 pub(crate) mod query_stage_scheduler;
 
-pub type SessionBuilder = Arc<dyn Fn(SessionConfig) -> SessionState + Send + 
Sync>;
+pub type SessionBuilder =
+    Arc<dyn Fn(SessionConfig) -> datafusion::common::Result<SessionState> + 
Send + Sync>;
 
 #[derive(Clone)]
 pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> {
diff --git a/ballista/scheduler/src/standalone.rs 
b/ballista/scheduler/src/standalone.rs
index 9ad887c6..e9c48345 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -57,9 +57,11 @@ pub async fn new_standalone_scheduler_from_state(
     let session_config = session_state.config().clone();
     let session_state = session_state.clone();
     let session_builder = Arc::new(move |c: SessionConfig| {
-        SessionStateBuilder::new_from_existing(session_state.clone())
-            .with_config(c)
-            .build()
+        Ok(
+            SessionStateBuilder::new_from_existing(session_state.clone())
+                .with_config(c)
+                .build(),
+        )
     });
 
     let config_producer = Arc::new(move || session_config.clone());
diff --git a/ballista/scheduler/src/state/session_manager.rs 
b/ballista/scheduler/src/state/session_manager.rs
index 8a769edb..59813167 100644
--- a/ballista/scheduler/src/state/session_manager.rs
+++ b/ballista/scheduler/src/state/session_manager.rs
@@ -67,7 +67,7 @@ impl SessionManager {
 pub fn create_datafusion_context(
     session_config: &SessionConfig,
     session_builder: SessionBuilder,
-) -> Arc<SessionContext> {
+) -> datafusion::common::Result<Arc<SessionContext>> {
     let session_state = if session_config.round_robin_repartition() {
         let session_config = session_config
             .clone()
@@ -75,10 +75,10 @@ pub fn create_datafusion_context(
             .with_round_robin_repartition(false);
 
         log::warn!("session manager will override 
`datafusion.optimizer.enable_round_robin_repartition` to `false` ");
-        session_builder(session_config)
+        session_builder(session_config)?
     } else {
-        session_builder(session_config.clone())
+        session_builder(session_config.clone())?
     };
 
-    Arc::new(SessionContext::new_with_state(session_state))
+    Ok(Arc::new(SessionContext::new_with_state(session_state)))
 }
diff --git a/examples/examples/custom-client.rs 
b/examples/examples/custom-client.rs
index 3577621e..9e7ec859 100644
--- a/examples/examples/custom-client.rs
+++ b/examples/examples/custom-client.rs
@@ -62,7 +62,7 @@ async fn main() -> Result<()> {
 
     // new sessions state with required custom session configuration and 
runtime environment
     let state =
-        
custom_session_state_with_s3_support(custom_session_config_with_s3_options());
+        
custom_session_state_with_s3_support(custom_session_config_with_s3_options())?;
 
     let ctx: SessionContext =
         SessionContext::remote_with_state("df://localhost:50050", 
state).await?;
diff --git a/examples/src/object_store.rs b/examples/src/object_store.rs
index 130d4705..3cd22fa6 100644
--- a/examples/src/object_store.rs
+++ b/examples/src/object_store.rs
@@ -88,13 +88,13 @@ pub fn custom_runtime_env_with_s3_support(
 /// and [RuntimeEnv].
 pub fn custom_session_state_with_s3_support(
     session_config: SessionConfig,
-) -> SessionState {
-    let runtime_env = 
custom_runtime_env_with_s3_support(&session_config).unwrap();
+) -> datafusion::common::Result<SessionState> {
+    let runtime_env = custom_runtime_env_with_s3_support(&session_config)?;
 
-    SessionStateBuilder::new()
+    Ok(SessionStateBuilder::new()
         .with_runtime_env(runtime_env)
         .with_config(session_config)
-        .build()
+        .build())
 }
 
 /// Custom [ObjectStoreRegistry] which will create
diff --git a/examples/tests/object_store.rs b/examples/tests/object_store.rs
index cd5c2def..ca47c5cb 100644
--- a/examples/tests/object_store.rs
+++ b/examples/tests/object_store.rs
@@ -298,7 +298,7 @@ mod custom_s3_config {
         // object store registry.
 
         let session_builder = Arc::new(produce_state);
-        let state = session_builder(config_producer());
+        let state = session_builder(config_producer())?;
 
         // setting up ballista cluster with new runtime, configuration, and 
session state producers
         let (host, port) = crate::common::setup_test_cluster_with_builders(
@@ -416,7 +416,7 @@ mod custom_s3_config {
         // object store registry.
 
         let session_builder = Arc::new(produce_state);
-        let state = session_builder(config_producer());
+        let state = session_builder(config_producer())?;
 
         // // establishing cluster connection,
         let ctx: SessionContext = 
SessionContext::standalone_with_state(state).await?;
@@ -480,24 +480,25 @@ mod custom_s3_config {
         Ok(())
     }
 
-    fn produce_state(session_config: SessionConfig) -> SessionState {
+    fn produce_state(
+        session_config: SessionConfig,
+    ) -> datafusion::common::Result<SessionState> {
         let s3options = session_config
             .options()
             .extensions
             .get::<S3Options>()
             .ok_or(DataFusionError::Configuration(
                 "S3 Options not set".to_string(),
-            ))
-            .unwrap();
+            ))?;
 
         let config = RuntimeConfig::new().with_object_store_registry(Arc::new(
             CustomObjectStoreRegistry::new(s3options.clone()),
         ));
-        let runtime_env = RuntimeEnv::new(config).unwrap();
+        let runtime_env = RuntimeEnv::new(config)?;
 
-        SessionStateBuilder::new()
+        Ok(SessionStateBuilder::new()
             .with_runtime_env(runtime_env.into())
             .with_config(session_config)
-            .build()
+            .build())
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to