This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new cc2ddcb8 make session optional (#1146)
cc2ddcb8 is described below

commit cc2ddcb8bac7bb15c4f8a5d300fcec2f5b3eb1b3
Author: Marko Milenković <[email protected]>
AuthorDate: Thu Dec 5 16:00:30 2024 +0000

    make session optional (#1146)
---
 ballista/core/proto/ballista.proto                 | 11 +---
 .../core/src/execution_plans/distributed_query.rs  |  7 +-
 ballista/core/src/extension.rs                     | 77 ++++++++++------------
 ballista/core/src/serde/generated/ballista.rs      | 20 ++----
 ballista/scheduler/src/scheduler_server/grpc.rs    | 19 +++---
 5 files changed, 53 insertions(+), 81 deletions(-)

diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto
index a40e6f2d..cb3c148b 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -172,7 +172,7 @@ message TaskInputPartitions {
 
 message KeyValuePair {
   string key = 1;
-  string value = 2;
+  optional string value = 2;
 }
 
 message Action {
@@ -458,10 +458,6 @@ message MultiTaskDefinition {
   repeated KeyValuePair props = 9;
 }
 
-message SessionSettings {
-  repeated KeyValuePair configs = 1;
-}
-
 message JobSessionConfig {
   string session_id = 1;
   repeated KeyValuePair configs = 2;
@@ -526,9 +522,8 @@ message ExecuteQueryParams {
     bytes logical_plan = 1;
     string sql = 2 [deprecated=true]; // I'd suggest to remove this, if SQL 
needed use `flight-sql`
   }
-  oneof optional_session_id {
-    string session_id = 3;
-  }
+  
+  optional string session_id = 3;
   repeated KeyValuePair settings = 4;
 }
 
diff --git a/ballista/core/src/execution_plans/distributed_query.rs b/ballista/core/src/execution_plans/distributed_query.rs
index dae4bb8e..785d3b0c 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -17,7 +17,6 @@
 
 use crate::client::BallistaClient;
 use crate::config::BallistaConfig;
-use crate::serde::protobuf::execute_query_params::OptionalSessionId;
 use crate::serde::protobuf::{
     execute_query_params::Query, execute_query_result, job_status,
     scheduler_grpc_client::SchedulerGrpcClient, ExecuteQueryParams, 
GetJobStatusParams,
@@ -218,7 +217,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
             .map(
                 |datafusion::config::ConfigEntry { key, value, .. }| 
KeyValuePair {
                     key: key.to_owned(),
-                    value: value.clone().unwrap_or_else(|| String::from("")),
+                    value: value.clone(),
                 },
             )
             .collect();
@@ -226,9 +225,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
         let query = ExecuteQueryParams {
             query: Some(Query::LogicalPlan(buf)),
             settings,
-            optional_session_id: Some(OptionalSessionId::SessionId(
-                self.session_id.clone(),
-            )),
+            session_id: Some(self.session_id.clone()),
         };
 
         let stream = futures::stream::once(
diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs
index 13892c1a..bb43e93b 100644
--- a/ballista/core/src/extension.rs
+++ b/ballista/core/src/extension.rs
@@ -287,60 +287,49 @@ impl SessionConfigHelperExt for SessionConfig {
         self.options()
             .entries()
             .iter()
-            .filter(|v| v.value.is_some())
-            .map(
-                // TODO MM make `value` optional value
-                |datafusion::config::ConfigEntry { key, value, .. }| {
-                    log::trace!(
-                        "sending configuration key: `{}`, value`{:?}`",
-                        key,
-                        value
-                    );
-                    KeyValuePair {
-                        key: key.to_owned(),
-                        value: value.clone().unwrap(),
-                    }
-                },
-            )
+            .map(|datafusion::config::ConfigEntry { key, value, .. }| {
+                log::trace!("sending configuration key: `{}`, value`{:?}`", 
key, value);
+                KeyValuePair {
+                    key: key.to_owned(),
+                    value: value.clone(),
+                }
+            })
             .collect()
     }
 
     fn update_from_key_value_pair(self, key_value_pairs: &[KeyValuePair]) -> 
Self {
         let mut s = self;
-        for KeyValuePair { key, value } in key_value_pairs {
-            log::trace!(
-                "setting up configuration key: `{}`, value: `{}`",
-                key,
-                value
-            );
-            if let Err(e) = s.options_mut().set(key, value) {
-                // there is not much we can do about this error at the moment
-                log::debug!(
-                    "could not set configuration key: `{}`, value: `{}`, 
reason: {}",
-                    key,
-                    value,
-                    e.to_string()
-                )
-            }
-        }
+        s.update_from_key_value_pair_mut(key_value_pairs);
         s
     }
 
     fn update_from_key_value_pair_mut(&mut self, key_value_pairs: 
&[KeyValuePair]) {
         for KeyValuePair { key, value } in key_value_pairs {
-            log::trace!(
-                "setting up configuration key : `{}`, value: `{}`",
-                key,
-                value
-            );
-            if let Err(e) = self.options_mut().set(key, value) {
-                // there is not much we can do about this error at the moment
-                log::debug!(
-                    "could not set configuration key: `{}`, value: `{}`, 
reason: {}",
-                    key,
-                    value,
-                    e.to_string()
-                )
+            match value {
+                Some(value) => {
+                    log::trace!(
+                        "setting up configuration key: `{}`, value: `{:?}`",
+                        key,
+                        value
+                    );
+                    if let Err(e) = self.options_mut().set(key, value) {
+                        // there is not much we can do about this error at the 
moment.
+                        // it used to be warning but it gets very verbose
+                        // as even datafusion properties can't be parsed
+                        log::debug!(
+                            "could not set configuration key: `{}`, value: 
`{:?}`, reason: {}",
+                            key,
+                            value,
+                            e.to_string()
+                        )
+                    }
+                }
+                None => {
+                    log::trace!(
+                        "can't set up configuration key: `{}`, as value is 
None",
+                        key,
+                    )
+                }
             }
         }
     }
diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs
index d61ef331..d4faef82 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -249,8 +249,8 @@ pub struct TaskInputPartitions {
 pub struct KeyValuePair {
     #[prost(string, tag = "1")]
     pub key: ::prost::alloc::string::String,
-    #[prost(string, tag = "2")]
-    pub value: ::prost::alloc::string::String,
+    #[prost(string, optional, tag = "2")]
+    pub value: ::core::option::Option<::prost::alloc::string::String>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct Action {
@@ -708,11 +708,6 @@ pub struct MultiTaskDefinition {
     pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct SessionSettings {
-    #[prost(message, repeated, tag = "1")]
-    pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
-}
-#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct JobSessionConfig {
     #[prost(string, tag = "1")]
     pub session_id: ::prost::alloc::string::String,
@@ -789,14 +784,12 @@ pub struct UpdateTaskStatusResult {
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecuteQueryParams {
+    #[prost(string, optional, tag = "3")]
+    pub session_id: ::core::option::Option<::prost::alloc::string::String>,
     #[prost(message, repeated, tag = "4")]
     pub settings: ::prost::alloc::vec::Vec<KeyValuePair>,
     #[prost(oneof = "execute_query_params::Query", tags = "1, 2")]
     pub query: ::core::option::Option<execute_query_params::Query>,
-    #[prost(oneof = "execute_query_params::OptionalSessionId", tags = "3")]
-    pub optional_session_id: ::core::option::Option<
-        execute_query_params::OptionalSessionId,
-    >,
 }
 /// Nested message and enum types in `ExecuteQueryParams`.
 pub mod execute_query_params {
@@ -808,11 +801,6 @@ pub mod execute_query_params {
         #[prost(string, tag = "2")]
         Sql(::prost::alloc::string::String),
     }
-    #[derive(Clone, PartialEq, ::prost::Oneof)]
-    pub enum OptionalSessionId {
-        #[prost(string, tag = "3")]
-        SessionId(::prost::alloc::string::String),
-    }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CreateSessionParams {
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 52cdc985..02c21a88 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -18,7 +18,7 @@
 use axum::extract::ConnectInfo;
 use ballista_core::config::BALLISTA_JOB_NAME;
 use ballista_core::extension::SessionConfigHelperExt;
-use ballista_core::serde::protobuf::execute_query_params::{OptionalSessionId, 
Query};
+use ballista_core::serde::protobuf::execute_query_params::Query;
 use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
 use ballista_core::serde::protobuf::{
     execute_query_failure_result, execute_query_result, AvailableTaskSlots,
@@ -337,25 +337,28 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
         let query_params = request.into_inner();
         if let ExecuteQueryParams {
             query: Some(query),
-            optional_session_id,
+            session_id,
             settings,
         } = query_params
         {
             let job_name = settings
                 .iter()
                 .find(|s| s.key == BALLISTA_JOB_NAME)
-                .map(|s| s.value.clone())
-                .unwrap_or_else(|| "None".to_string());
+                .and_then(|s| s.value.clone())
+                .unwrap_or_default();
 
-            let (session_id, session_ctx) = match optional_session_id {
-                Some(OptionalSessionId::SessionId(session_id)) => {
+            let (session_id, session_ctx) = match session_id {
+                Some(session_id) => {
                     match 
self.state.session_manager.get_session(&session_id).await {
                         Ok(ctx) => {
-                            // [SessionConfig] will be updated from received 
properties
+                            // Update [SessionConfig] using received properties
 
                             // TODO MM can we do something better here?
                             // move this to update session and use 
.update_session(&session_params.session_id, &session_config)
-                            // instead of get_session
+                            // instead of get_session.
+                            //
+                            // also we should consider sending properties 
if/when changed rather than
+                            // all properties every time
 
                             let state = ctx.state_ref();
                             let mut state = state.write();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to