This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new cc2ddcb8 make session optional (#1146)
cc2ddcb8 is described below
commit cc2ddcb8bac7bb15c4f8a5d300fcec2f5b3eb1b3
Author: Marko Milenković <[email protected]>
AuthorDate: Thu Dec 5 16:00:30 2024 +0000
make session optional (#1146)
---
ballista/core/proto/ballista.proto | 11 +---
.../core/src/execution_plans/distributed_query.rs | 7 +-
ballista/core/src/extension.rs | 77 ++++++++++------------
ballista/core/src/serde/generated/ballista.rs | 20 ++----
ballista/scheduler/src/scheduler_server/grpc.rs | 19 +++---
5 files changed, 53 insertions(+), 81 deletions(-)
diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto
index a40e6f2d..cb3c148b 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -172,7 +172,7 @@ message TaskInputPartitions {
message KeyValuePair {
string key = 1;
- string value = 2;
+ optional string value = 2;
}
message Action {
@@ -458,10 +458,6 @@ message MultiTaskDefinition {
repeated KeyValuePair props = 9;
}
-message SessionSettings {
- repeated KeyValuePair configs = 1;
-}
-
message JobSessionConfig {
string session_id = 1;
repeated KeyValuePair configs = 2;
@@ -526,9 +522,8 @@ message ExecuteQueryParams {
bytes logical_plan = 1;
string sql = 2 [deprecated=true]; // I'd suggest to remove this, if SQL needed use `flight-sql`
}
- oneof optional_session_id {
- string session_id = 3;
- }
+
+ optional string session_id = 3;
repeated KeyValuePair settings = 4;
}
diff --git a/ballista/core/src/execution_plans/distributed_query.rs b/ballista/core/src/execution_plans/distributed_query.rs
index dae4bb8e..785d3b0c 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -17,7 +17,6 @@
use crate::client::BallistaClient;
use crate::config::BallistaConfig;
-use crate::serde::protobuf::execute_query_params::OptionalSessionId;
use crate::serde::protobuf::{
execute_query_params::Query, execute_query_result, job_status,
scheduler_grpc_client::SchedulerGrpcClient, ExecuteQueryParams,
GetJobStatusParams,
@@ -218,7 +217,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
.map(
|datafusion::config::ConfigEntry { key, value, .. }|
KeyValuePair {
key: key.to_owned(),
- value: value.clone().unwrap_or_else(|| String::from("")),
+ value: value.clone(),
},
)
.collect();
@@ -226,9 +225,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
let query = ExecuteQueryParams {
query: Some(Query::LogicalPlan(buf)),
settings,
- optional_session_id: Some(OptionalSessionId::SessionId(
- self.session_id.clone(),
- )),
+ session_id: Some(self.session_id.clone()),
};
let stream = futures::stream::once(
diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs
index 13892c1a..bb43e93b 100644
--- a/ballista/core/src/extension.rs
+++ b/ballista/core/src/extension.rs
@@ -287,60 +287,49 @@ impl SessionConfigHelperExt for SessionConfig {
self.options()
.entries()
.iter()
- .filter(|v| v.value.is_some())
- .map(
- // TODO MM make `value` optional value
- |datafusion::config::ConfigEntry { key, value, .. }| {
- log::trace!(
- "sending configuration key: `{}`, value`{:?}`",
- key,
- value
- );
- KeyValuePair {
- key: key.to_owned(),
- value: value.clone().unwrap(),
- }
- },
- )
+ .map(|datafusion::config::ConfigEntry { key, value, .. }| {
+ log::trace!("sending configuration key: `{}`, value`{:?}`",
key, value);
+ KeyValuePair {
+ key: key.to_owned(),
+ value: value.clone(),
+ }
+ })
.collect()
}
fn update_from_key_value_pair(self, key_value_pairs: &[KeyValuePair]) ->
Self {
let mut s = self;
- for KeyValuePair { key, value } in key_value_pairs {
- log::trace!(
- "setting up configuration key: `{}`, value: `{}`",
- key,
- value
- );
- if let Err(e) = s.options_mut().set(key, value) {
- // there is not much we can do about this error at the moment
- log::debug!(
- "could not set configuration key: `{}`, value: `{}`,
reason: {}",
- key,
- value,
- e.to_string()
- )
- }
- }
+ s.update_from_key_value_pair_mut(key_value_pairs);
s
}
fn update_from_key_value_pair_mut(&mut self, key_value_pairs:
&[KeyValuePair]) {
for KeyValuePair { key, value } in key_value_pairs {
- log::trace!(
- "setting up configuration key : `{}`, value: `{}`",
- key,
- value
- );
- if let Err(e) = self.options_mut().set(key, value) {
- // there is not much we can do about this error at the moment
- log::debug!(
- "could not set configuration key: `{}`, value: `{}`,
reason: {}",
- key,
- value,
- e.to_string()
- )
+ match value {
+ Some(value) => {
+ log::trace!(
+ "setting up configuration key: `{}`, value: `{:?}`",
+ key,
+ value
+ );
+ if let Err(e) = self.options_mut().set(key, value) {
+ // there is not much we can do about this error at the
moment.
+ // it used to be warning but it gets very verbose
+ // as even datafusion properties can't be parsed
+ log::debug!(
+ "could not set configuration key: `{}`, value:
`{:?}`, reason: {}",
+ key,
+ value,
+ e.to_string()
+ )
+ }
+ }
+ None => {
+ log::trace!(
+ "can't set up configuration key: `{}`, as value is
None",
+ key,
+ )
+ }
}
}
}
diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs
index d61ef331..d4faef82 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -249,8 +249,8 @@ pub struct TaskInputPartitions {
pub struct KeyValuePair {
#[prost(string, tag = "1")]
pub key: ::prost::alloc::string::String,
- #[prost(string, tag = "2")]
- pub value: ::prost::alloc::string::String,
+ #[prost(string, optional, tag = "2")]
+ pub value: ::core::option::Option<::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Action {
@@ -708,11 +708,6 @@ pub struct MultiTaskDefinition {
pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct SessionSettings {
- #[prost(message, repeated, tag = "1")]
- pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
-}
-#[derive(Clone, PartialEq, ::prost::Message)]
pub struct JobSessionConfig {
#[prost(string, tag = "1")]
pub session_id: ::prost::alloc::string::String,
@@ -789,14 +784,12 @@ pub struct UpdateTaskStatusResult {
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecuteQueryParams {
+ #[prost(string, optional, tag = "3")]
+ pub session_id: ::core::option::Option<::prost::alloc::string::String>,
#[prost(message, repeated, tag = "4")]
pub settings: ::prost::alloc::vec::Vec<KeyValuePair>,
#[prost(oneof = "execute_query_params::Query", tags = "1, 2")]
pub query: ::core::option::Option<execute_query_params::Query>,
- #[prost(oneof = "execute_query_params::OptionalSessionId", tags = "3")]
- pub optional_session_id: ::core::option::Option<
- execute_query_params::OptionalSessionId,
- >,
}
/// Nested message and enum types in `ExecuteQueryParams`.
pub mod execute_query_params {
@@ -808,11 +801,6 @@ pub mod execute_query_params {
#[prost(string, tag = "2")]
Sql(::prost::alloc::string::String),
}
- #[derive(Clone, PartialEq, ::prost::Oneof)]
- pub enum OptionalSessionId {
- #[prost(string, tag = "3")]
- SessionId(::prost::alloc::string::String),
- }
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CreateSessionParams {
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 52cdc985..02c21a88 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -18,7 +18,7 @@
use axum::extract::ConnectInfo;
use ballista_core::config::BALLISTA_JOB_NAME;
use ballista_core::extension::SessionConfigHelperExt;
-use ballista_core::serde::protobuf::execute_query_params::{OptionalSessionId, Query};
+use ballista_core::serde::protobuf::execute_query_params::Query;
use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
use ballista_core::serde::protobuf::{
execute_query_failure_result, execute_query_result, AvailableTaskSlots,
@@ -337,25 +337,28 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
let query_params = request.into_inner();
if let ExecuteQueryParams {
query: Some(query),
- optional_session_id,
+ session_id,
settings,
} = query_params
{
let job_name = settings
.iter()
.find(|s| s.key == BALLISTA_JOB_NAME)
- .map(|s| s.value.clone())
- .unwrap_or_else(|| "None".to_string());
+ .and_then(|s| s.value.clone())
+ .unwrap_or_default();
- let (session_id, session_ctx) = match optional_session_id {
- Some(OptionalSessionId::SessionId(session_id)) => {
+ let (session_id, session_ctx) = match session_id {
+ Some(session_id) => {
match
self.state.session_manager.get_session(&session_id).await {
Ok(ctx) => {
- // [SessionConfig] will be updated from received
properties
+ // Update [SessionConfig] using received properties
// TODO MM can we do something better here?
// move this to update session and use
.update_session(&session_params.session_id, &session_config)
- // instead of get_session
+ // instead of get_session.
+ //
+ // also we should consider sending properties
if/when changed rather than
+ // all properties every time
let state = ctx.state_ref();
let mut state = state.write();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]