This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 5bb69a77 Ballista proto cleanup (#1110)
5bb69a77 is described below
commit 5bb69a77a91c2cf4f76e04933dc98347bef17a92
Author: Marko Milenković <[email protected]>
AuthorDate: Wed Nov 6 17:37:33 2024 +0000
Ballista proto cleanup (#1110)
* remove `GetFileMetadata` from proto ...
... as it was never finished, and there is
no documentation of what it was intended to do.
* `host` is now declared `optional` instead of using a `oneof` ...
... as we have since upgraded to the latest protoc, which supports `optional`.
* propose deprecating `ExecuteQueryParams.sql`
---
ballista/core/proto/ballista.proto | 22 +----
ballista/core/src/serde/generated/ballista.rs | 111 ++----------------------
ballista/executor/src/executor.rs | 2 +-
ballista/executor/src/executor_process.rs | 15 +---
ballista/executor/src/standalone.rs | 5 +-
ballista/scheduler/src/scheduler_server/grpc.rs | 98 +++------------------
docs/developer/architecture.md | 1 -
7 files changed, 30 insertions(+), 224 deletions(-)
diff --git a/ballista/core/proto/ballista.proto
b/ballista/core/proto/ballista.proto
index eab1d801..a40e6f2d 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -289,14 +289,11 @@ message ExecutorMetadata {
}
-// Used by grpc
+// Used for scheduler-executor
+// communication
message ExecutorRegistration {
string id = 1;
- // "optional" keyword is stable in protoc 3.15 but prost is still on 3.14
(see https://github.com/tokio-rs/prost/issues/430 and
https://github.com/tokio-rs/prost/pull/455)
- // this syntax is ugly but is binary compatible with the "optional" keyword
(see
https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3)
- oneof optional_host {
- string host = 2;
- }
+ optional string host = 2;
uint32 port = 3;
uint32 grpc_port = 4;
ExecutorSpecification specification = 5;
@@ -527,7 +524,7 @@ message UpdateTaskStatusResult {
message ExecuteQueryParams {
oneof query {
bytes logical_plan = 1;
- string sql = 2;
+ string sql = 2 [deprecated=true]; // I'd suggest to remove this, if SQL
needed use `flight-sql`
}
oneof optional_session_id {
string session_id = 3;
@@ -629,15 +626,6 @@ message GetJobStatusResult {
JobStatus status = 1;
}
-message GetFileMetadataParams {
- string path = 1;
- string file_type = 2;
-}
-
-message GetFileMetadataResult {
- datafusion_common.Schema schema = 1;
-}
-
message FilePartitionMetadata {
repeated string filename = 1;
}
@@ -713,8 +701,6 @@ service SchedulerGrpc {
rpc UpdateTaskStatus (UpdateTaskStatusParams) returns
(UpdateTaskStatusResult) {}
- rpc GetFileMetadata (GetFileMetadataParams) returns (GetFileMetadataResult)
{}
-
rpc CreateSession (CreateSessionParams) returns (CreateSessionResult) {}
rpc UpdateSession (UpdateSessionParams) returns (UpdateSessionResult) {}
diff --git a/ballista/core/src/serde/generated/ballista.rs
b/ballista/core/src/serde/generated/ballista.rs
index 51a7b80b..d61ef331 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -435,31 +435,20 @@ pub struct ExecutorMetadata {
#[prost(message, optional, tag = "5")]
pub specification: ::core::option::Option<ExecutorSpecification>,
}
-/// Used by grpc
+/// Used for scheduler-executor
+/// communication
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorRegistration {
#[prost(string, tag = "1")]
pub id: ::prost::alloc::string::String,
+ #[prost(string, optional, tag = "2")]
+ pub host: ::core::option::Option<::prost::alloc::string::String>,
#[prost(uint32, tag = "3")]
pub port: u32,
#[prost(uint32, tag = "4")]
pub grpc_port: u32,
#[prost(message, optional, tag = "5")]
pub specification: ::core::option::Option<ExecutorSpecification>,
- /// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14
(see <https://github.com/tokio-rs/prost/issues/430> and
<https://github.com/tokio-rs/prost/pull/455>)
- /// this syntax is ugly but is binary compatible with the "optional"
keyword (see
<https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3>)
- #[prost(oneof = "executor_registration::OptionalHost", tags = "2")]
- pub optional_host:
::core::option::Option<executor_registration::OptionalHost>,
-}
-/// Nested message and enum types in `ExecutorRegistration`.
-pub mod executor_registration {
- /// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14
(see <https://github.com/tokio-rs/prost/issues/430> and
<https://github.com/tokio-rs/prost/pull/455>)
- /// this syntax is ugly but is binary compatible with the "optional"
keyword (see
<https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3>)
- #[derive(Clone, PartialEq, ::prost::Oneof)]
- pub enum OptionalHost {
- #[prost(string, tag = "2")]
- Host(::prost::alloc::string::String),
- }
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorHeartbeat {
@@ -815,6 +804,7 @@ pub mod execute_query_params {
pub enum Query {
#[prost(bytes, tag = "1")]
LogicalPlan(::prost::alloc::vec::Vec<u8>),
+ /// I'd suggest to remove this, if SQL needed use `flight-sql`
#[prost(string, tag = "2")]
Sql(::prost::alloc::string::String),
}
@@ -971,18 +961,6 @@ pub struct GetJobStatusResult {
pub status: ::core::option::Option<JobStatus>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct GetFileMetadataParams {
- #[prost(string, tag = "1")]
- pub path: ::prost::alloc::string::String,
- #[prost(string, tag = "2")]
- pub file_type: ::prost::alloc::string::String,
-}
-#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct GetFileMetadataResult {
- #[prost(message, optional, tag = "1")]
- pub schema: ::core::option::Option<::datafusion_proto_common::Schema>,
-}
-#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FilePartitionMetadata {
#[prost(string, repeated, tag = "1")]
pub filename: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
@@ -1262,32 +1240,6 @@ pub mod scheduler_grpc_client {
);
self.inner.unary(req, path, codec).await
}
- pub async fn get_file_metadata(
- &mut self,
- request: impl tonic::IntoRequest<super::GetFileMetadataParams>,
- ) -> std::result::Result<
- tonic::Response<super::GetFileMetadataResult>,
- tonic::Status,
- > {
- self.inner
- .ready()
- .await
- .map_err(|e| {
- tonic::Status::unknown(
- format!("Service was not ready: {}", e.into()),
- )
- })?;
- let codec = tonic::codec::ProstCodec::default();
- let path = http::uri::PathAndQuery::from_static(
- "/ballista.protobuf.SchedulerGrpc/GetFileMetadata",
- );
- let mut req = request.into_request();
- req.extensions_mut()
- .insert(
- GrpcMethod::new("ballista.protobuf.SchedulerGrpc",
"GetFileMetadata"),
- );
- self.inner.unary(req, path, codec).await
- }
pub async fn create_session(
&mut self,
request: impl tonic::IntoRequest<super::CreateSessionParams>,
@@ -1756,13 +1708,6 @@ pub mod scheduler_grpc_server {
tonic::Response<super::UpdateTaskStatusResult>,
tonic::Status,
>;
- async fn get_file_metadata(
- &self,
- request: tonic::Request<super::GetFileMetadataParams>,
- ) -> std::result::Result<
- tonic::Response<super::GetFileMetadataResult>,
- tonic::Status,
- >;
async fn create_session(
&self,
request: tonic::Request<super::CreateSessionParams>,
@@ -2080,52 +2025,6 @@ pub mod scheduler_grpc_server {
};
Box::pin(fut)
}
- "/ballista.protobuf.SchedulerGrpc/GetFileMetadata" => {
- #[allow(non_camel_case_types)]
- struct GetFileMetadataSvc<T: SchedulerGrpc>(pub Arc<T>);
- impl<
- T: SchedulerGrpc,
- > tonic::server::UnaryService<super::GetFileMetadataParams>
- for GetFileMetadataSvc<T> {
- type Response = super::GetFileMetadataResult;
- type Future = BoxFuture<
- tonic::Response<Self::Response>,
- tonic::Status,
- >;
- fn call(
- &mut self,
- request:
tonic::Request<super::GetFileMetadataParams>,
- ) -> Self::Future {
- let inner = Arc::clone(&self.0);
- let fut = async move {
- <T as
SchedulerGrpc>::get_file_metadata(&inner, request)
- .await
- };
- Box::pin(fut)
- }
- }
- let accept_compression_encodings =
self.accept_compression_encodings;
- let send_compression_encodings =
self.send_compression_encodings;
- let max_decoding_message_size =
self.max_decoding_message_size;
- let max_encoding_message_size =
self.max_encoding_message_size;
- let inner = self.inner.clone();
- let fut = async move {
- let method = GetFileMetadataSvc(inner);
- let codec = tonic::codec::ProstCodec::default();
- let mut grpc = tonic::server::Grpc::new(codec)
- .apply_compression_config(
- accept_compression_encodings,
- send_compression_encodings,
- )
- .apply_max_message_size_config(
- max_decoding_message_size,
- max_encoding_message_size,
- );
- let res = grpc.unary(method, req).await;
- Ok(res)
- };
- Box::pin(fut)
- }
"/ballista.protobuf.SchedulerGrpc/CreateSession" => {
#[allow(non_camel_case_types)]
struct CreateSessionSvc<T: SchedulerGrpc>(pub Arc<T>);
diff --git a/ballista/executor/src/executor.rs
b/ballista/executor/src/executor.rs
index 53a36855..a799b85c 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -359,7 +359,7 @@ mod test {
port: 0,
grpc_port: 0,
specification: None,
- optional_host: None,
+ host: None,
};
let config_producer = Arc::new(|| {
SessionConfig::new().with_option_extension(BallistaConfig::new().unwrap())
diff --git a/ballista/executor/src/executor_process.rs
b/ballista/executor/src/executor_process.rs
index a15bfadb..fe57ec37 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -50,9 +50,8 @@ use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::executor_resource::Resource;
use ballista_core::serde::protobuf::executor_status::Status;
use ballista_core::serde::protobuf::{
- executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
- ExecutorRegistration, ExecutorResource, ExecutorSpecification,
ExecutorStatus,
- ExecutorStoppedParams, HeartBeatParams,
+ scheduler_grpc_client::SchedulerGrpcClient, ExecutorRegistration,
ExecutorResource,
+ ExecutorSpecification, ExecutorStatus, ExecutorStoppedParams,
HeartBeatParams,
};
use ballista_core::serde::{
BallistaCodec, BallistaLogicalExtensionCodec,
BallistaPhysicalExtensionCodec,
@@ -184,10 +183,7 @@ pub async fn start_executor_process(opt:
Arc<ExecutorProcessConfig>) -> Result<(
let executor_id = Uuid::new_v4().to_string();
let executor_meta = ExecutorRegistration {
id: executor_id.clone(),
- optional_host: opt
- .external_host
- .clone()
- .map(executor_registration::OptionalHost::Host),
+ host: opt.external_host.clone(),
port: opt.port as u32,
grpc_port: opt.grpc_port as u32,
specification: Some(ExecutorSpecification {
@@ -392,10 +388,7 @@ pub async fn start_executor_process(opt:
Arc<ExecutorProcessConfig>) -> Result<(
}),
metadata: Some(ExecutorRegistration {
id: executor_id.clone(),
- optional_host: opt
- .external_host
- .clone()
- .map(executor_registration::OptionalHost::Host),
+ host: opt.external_host.clone(),
port: opt.port as u32,
grpc_port: opt.grpc_port as u32,
specification: Some(ExecutorSpecification {
diff --git a/ballista/executor/src/standalone.rs
b/ballista/executor/src/standalone.rs
index 628de96f..28efe70f 100644
--- a/ballista/executor/src/standalone.rs
+++ b/ballista/executor/src/standalone.rs
@@ -23,7 +23,6 @@ use ballista_core::utils::SessionConfigExt;
use ballista_core::{
error::Result,
object_store_registry::with_object_store_registry,
- serde::protobuf::executor_registration::OptionalHost,
serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient,
ExecutorRegistration},
serde::scheduler::ExecutorSpecification,
serde::BallistaCodec,
@@ -73,7 +72,7 @@ pub async fn new_standalone_executor_from_state<
let executor_meta = ExecutorRegistration {
id: Uuid::new_v4().to_string(), // assign this executor a unique ID
- optional_host: Some(OptionalHost::Host("localhost".to_string())),
+ host: Some("localhost".to_string()),
port: addr.port() as u32,
// TODO Make it configurable
grpc_port: 50020,
@@ -145,7 +144,7 @@ pub async fn new_standalone_executor<
let executor_meta = ExecutorRegistration {
id: Uuid::new_v4().to_string(), // assign this executor a unique ID
- optional_host: Some(OptionalHost::Host("localhost".to_string())),
+ host: Some("localhost".to_string()),
port: addr.port() as u32,
// TODO Make it configurable
grpc_port: 50020,
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs
b/ballista/scheduler/src/scheduler_server/grpc.rs
index e475e438..1758dfd8 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -19,40 +19,30 @@ use axum::extract::ConnectInfo;
use ballista_core::config::{BallistaConfig, BALLISTA_JOB_NAME};
use ballista_core::serde::protobuf::execute_query_params::{OptionalSessionId,
Query};
use std::collections::HashMap;
-use std::convert::TryInto;
use std::net::SocketAddr;
-use ballista_core::serde::protobuf::executor_registration::OptionalHost;
use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
use ballista_core::serde::protobuf::{
execute_query_failure_result, execute_query_result, AvailableTaskSlots,
CancelJobParams, CancelJobResult, CleanJobDataParams, CleanJobDataResult,
CreateSessionParams, CreateSessionResult, ExecuteQueryFailureResult,
ExecuteQueryParams, ExecuteQueryResult, ExecuteQuerySuccessResult,
ExecutorHeartbeat,
- ExecutorStoppedParams, ExecutorStoppedResult, GetFileMetadataParams,
- GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult,
HeartBeatParams,
- HeartBeatResult, PollWorkParams, PollWorkResult, RegisterExecutorParams,
- RegisterExecutorResult, RemoveSessionParams, RemoveSessionResult,
- UpdateSessionParams, UpdateSessionResult, UpdateTaskStatusParams,
- UpdateTaskStatusResult,
+ ExecutorStoppedParams, ExecutorStoppedResult, GetJobStatusParams,
GetJobStatusResult,
+ HeartBeatParams, HeartBeatResult, PollWorkParams, PollWorkResult,
+ RegisterExecutorParams, RegisterExecutorResult, RemoveSessionParams,
+ RemoveSessionResult, UpdateSessionParams, UpdateSessionResult,
+ UpdateTaskStatusParams, UpdateTaskStatusResult,
};
use ballista_core::serde::scheduler::ExecutorMetadata;
-
-use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::file_format::FileFormat;
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
-use futures::TryStreamExt;
use log::{debug, error, info, trace, warn};
-use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use std::ops::Deref;
-use std::sync::Arc;
use crate::cluster::{bind_task_bias, bind_task_round_robin};
use crate::config::TaskDistributionPolicy;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
-use datafusion::prelude::SessionContext;
use std::time::{SystemTime, UNIX_EPOCH};
use tonic::{Request, Response, Status};
@@ -88,10 +78,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
let metadata = ExecutorMetadata {
id: metadata.id,
host: metadata
- .optional_host
- .map(|h| match h {
- OptionalHost::Host(host) => host,
- })
+ .host
.unwrap_or_else(||
remote_addr.unwrap().ip().to_string()),
port: metadata.port as u16,
grpc_port: metadata.grpc_port as u16,
@@ -166,10 +153,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
let metadata = ExecutorMetadata {
id: metadata.id,
host: metadata
- .optional_host
- .map(|h| match h {
- OptionalHost::Host(host) => host,
- })
+ .host
.unwrap_or_else(|| remote_addr.unwrap().ip().to_string()),
port: metadata.port as u16,
grpc_port: metadata.grpc_port as u16,
@@ -214,10 +198,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
let metadata = ExecutorMetadata {
id: metadata.id,
host: metadata
- .optional_host
- .map(|h| match h {
- OptionalHost::Host(host) => host,
- })
+ .host
.unwrap_or_else(||
remote_addr.unwrap().ip().to_string()),
port: metadata.port as u16,
grpc_port: metadata.grpc_port as u16,
@@ -286,56 +267,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
Ok(Response::new(UpdateTaskStatusResult { success: true }))
}
- async fn get_file_metadata(
- &self,
- request: Request<GetFileMetadataParams>,
- ) -> Result<Response<GetFileMetadataResult>, Status> {
- // Here, we use the default config, since we don't know the session id
- let session_ctx = SessionContext::new();
- let state = session_ctx.state();
-
- // TODO support multiple object stores
- let obj_store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
- // TODO shouldn't this take a ListingOption object as input?
-
- let GetFileMetadataParams { path, file_type } = request.into_inner();
- let file_format: Arc<dyn FileFormat> = match file_type.as_str() {
- "parquet" => Ok(Arc::new(ParquetFormat::default())),
- // TODO implement for CSV
- _ => Err(tonic::Status::unimplemented(
- "get_file_metadata unsupported file type",
- )),
- }?;
-
- let path = Path::from(path.as_str());
- let file_metas: Vec<_> = obj_store
- .list(Some(&path))
- .try_collect()
- .await
- .map_err(|e| {
- let msg = format!("Error listing files: {e}");
- error!("{}", msg);
- tonic::Status::internal(msg)
- })?;
-
- let schema = file_format
- .infer_schema(&state, &obj_store, &file_metas)
- .await
- .map_err(|e| {
- let msg = format!("Error inferring schema: {e}");
- error!("{}", msg);
- tonic::Status::internal(msg)
- })?;
-
- Ok(Response::new(GetFileMetadataResult {
- schema: Some(schema.as_ref().try_into().map_err(|e| {
- let msg = format!("Error inferring schema: {e}");
- error!("{}", msg);
- tonic::Status::internal(msg)
- })?),
- }))
- }
-
async fn create_session(
&self,
request: Request<CreateSessionParams>,
@@ -661,9 +592,8 @@ mod test {
use crate::metrics::default_metrics_collector;
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::{
- executor_registration::OptionalHost, executor_status,
ExecutorRegistration,
- ExecutorStatus, ExecutorStoppedParams, HeartBeatParams, PollWorkParams,
- RegisterExecutorParams,
+ executor_status, ExecutorRegistration, ExecutorStatus,
ExecutorStoppedParams,
+ HeartBeatParams, PollWorkParams, RegisterExecutorParams,
};
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::BallistaCodec;
@@ -690,7 +620,7 @@ mod test {
scheduler.init().await?;
let exec_meta = ExecutorRegistration {
id: "abc".to_owned(),
- optional_host:
Some(OptionalHost::Host("http://localhost:8080".to_owned())),
+ host: Some("http://localhost:8080".to_owned()),
port: 0,
grpc_port: 0,
specification: Some(ExecutorSpecification { task_slots: 2
}.into()),
@@ -778,7 +708,7 @@ mod test {
let exec_meta = ExecutorRegistration {
id: "abc".to_owned(),
- optional_host:
Some(OptionalHost::Host("http://localhost:8080".to_owned())),
+ host: Some("http://localhost:8080".to_owned()),
port: 0,
grpc_port: 0,
specification: Some(ExecutorSpecification { task_slots: 2
}.into()),
@@ -863,7 +793,7 @@ mod test {
let exec_meta = ExecutorRegistration {
id: "abc".to_owned(),
- optional_host:
Some(OptionalHost::Host("http://localhost:8080".to_owned())),
+ host: Some("http://localhost:8080".to_owned()),
port: 0,
grpc_port: 0,
specification: Some(ExecutorSpecification { task_slots: 2
}.into()),
@@ -916,7 +846,7 @@ mod test {
let exec_meta = ExecutorRegistration {
id: "abc".to_owned(),
- optional_host:
Some(OptionalHost::Host("http://localhost:8080".to_owned())),
+ host: Some("http://localhost:8080".to_owned()),
port: 0,
grpc_port: 0,
specification: Some(ExecutorSpecification { task_slots: 2
}.into()),
diff --git a/docs/developer/architecture.md b/docs/developer/architecture.md
index 9ae20d9d..d49e5013 100644
--- a/docs/developer/architecture.md
+++ b/docs/developer/architecture.md
@@ -57,7 +57,6 @@ The scheduler process implements a gRPC interface (defined in
| -------------------- |
-------------------------------------------------------------------- |
| ExecuteQuery | Submit a logical query plan or SQL query for
execution |
| GetExecutorsMetadata | Retrieves a list of executors that have registered
with a scheduler |
-| GetFileMetadata | Retrieve metadata about files available in the
cluster file system |
| GetJobStatus | Get the status of a submitted query
|
| RegisterExecutor | Executors call this method to register themselves
with the scheduler |
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]