This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new ae1d3de4 Upgrade DataFusion to 23.0.0 (#755)
ae1d3de4 is described below
commit ae1d3de4f96de752ee0122faa1d4417b1ca4c2cb
Author: yahoNanJing <[email protected]>
AuthorDate: Wed Apr 26 04:41:35 2023 +0800
Upgrade DataFusion to 23.0.0 (#755)
Co-authored-by: yangzhong <[email protected]>
---
Cargo.toml | 19 +-
ballista-cli/Cargo.toml | 2 +-
ballista/client/Cargo.toml | 2 +-
ballista/client/src/context.rs | 2 +-
ballista/core/Cargo.toml | 13 +-
ballista/core/src/serde/generated/ballista.rs | 454 ++++++++++++++++++++++----
ballista/core/src/serde/scheduler/mod.rs | 2 +-
ballista/executor/Cargo.toml | 8 +-
ballista/scheduler/Cargo.toml | 15 +-
ballista/scheduler/scheduler_config_spec.toml | 1 -
10 files changed, 421 insertions(+), 97 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index bbab677c..39233942 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,13 +17,22 @@
[workspace]
members = ["ballista-cli", "ballista/client", "ballista/core",
"ballista/executor", "ballista/scheduler", "benchmarks", "examples"]
-exclude = ["python"]
[workspace.dependencies]
-arrow = { version = "36.0.0" }
-arrow-flight = { version = "36.0.0", features = ["flight-sql-experimental"] }
-datafusion = "22.0.0"
-datafusion-proto = "22.0.0"
+arrow = { version = "37.0.0" }
+arrow-flight = { version = "37.0.0", features = ["flight-sql-experimental"] }
+configure_me = { version = "0.4.0" }
+configure_me_codegen = { version = "0.4.4" }
+datafusion = "23.0.0"
+datafusion-cli = "23.0.0"
+datafusion-proto = "23.0.0"
+object_store = "0.5.4"
+sqlparser = "0.33.0"
+tonic = { version = "0.9" }
+tonic-build = { version = "0.9", default-features = false, features = [
+ "transport",
+ "prost",
+] }
# cargo build --profile release-lto
[profile.release-lto]
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 8f94db42..c805f3d1 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -32,7 +32,7 @@ readme = "README.md"
ballista = { path = "../ballista/client", version = "0.11.0", features = ["standalone"] }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { workspace = true }
-datafusion-cli = "22.0.0"
+datafusion-cli = { workspace = true }
dirs = "4.0.0"
env_logger = "0.10"
mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index 88b93378..e371a859 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -36,7 +36,7 @@ datafusion-proto = { workspace = true }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
-sqlparser = "0.32.0"
+sqlparser = { workspace = true }
tempfile = "3"
tokio = "1.0"
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index 84bbaf02..dea233d7 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -623,7 +623,7 @@ mod tests {
.map(|t| ListingTableUrl::parse(t).unwrap())
.collect();
let config =
ListingTableConfig::new_with_multi_paths(table_paths)
- .with_schema(Arc::new(Schema::new(vec![])))
+ .with_schema(Arc::new(Schema::empty()))
.with_listing_options(error_options);
let error_table = ListingTable::try_new(config).unwrap();
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 95282a36..feeb8311 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -24,7 +24,7 @@ homepage = "https://github.com/apache/arrow-ballista"
repository = "https://github.com/apache/arrow-ballista"
readme = "README.md"
authors = ["Apache Arrow <[email protected]>"]
-edition = "2018"
+edition = "2021"
build = "build.rs"
# Exclude proto files so crates.io consumers don't need protoc
@@ -58,7 +58,7 @@ hashbrown = "0.13"
itertools = "0.10"
libloading = "0.7.3"
log = "0.4"
-object_store = "0.5.2"
+object_store = { workspace = true }
once_cell = "1.9.0"
parking_lot = "0.12"
@@ -67,11 +67,11 @@ prost = "0.11"
prost-types = "0.11"
rand = "0.8"
serde = { version = "1", features = ["derive"] }
-sqlparser = "0.32.0"
+sqlparser = { workspace = true }
sys-info = "0.9.0"
tokio = "1.0"
tokio-stream = { version = "0.1", features = ["net"] }
-tonic = "0.8"
+tonic = { workspace = true }
url = "2.2"
uuid = { version = "1.0", features = ["v4"] }
walkdir = "2.3.2"
@@ -81,7 +81,4 @@ tempfile = "3"
[build-dependencies]
rustc_version = "0.4.0"
-tonic-build = { version = "0.8", default-features = false, features = [
- "transport",
- "prost",
-] }
+tonic-build = { workspace = true }
diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs
index f511f2a2..439bbf84 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -1135,7 +1135,7 @@ pub mod scheduler_grpc_client {
/// Attempt to create a new client by connecting to a given endpoint.
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
- D: std::convert::TryInto<tonic::transport::Endpoint>,
+ D: TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
@@ -1191,11 +1191,27 @@ pub mod scheduler_grpc_client {
self.inner = self.inner.accept_compressed(encoding);
self
}
+ /// Limits the maximum size of a decoded message.
+ ///
+ /// Default: `4MB`
+ #[must_use]
+ pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
+ self.inner = self.inner.max_decoding_message_size(limit);
+ self
+ }
+ /// Limits the maximum size of an encoded message.
+ ///
+ /// Default: `usize::MAX`
+ #[must_use]
+ pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
+ self.inner = self.inner.max_encoding_message_size(limit);
+ self
+ }
/// Executors must poll the scheduler for heartbeat and to receive tasks
pub async fn poll_work(
&mut self,
request: impl tonic::IntoRequest<super::PollWorkParams>,
- ) -> Result<tonic::Response<super::PollWorkResult>, tonic::Status> {
+ ) -> std::result::Result<tonic::Response<super::PollWorkResult>, tonic::Status> {
self.inner
.ready()
.await
@@ -1209,12 +1225,18 @@ pub mod scheduler_grpc_client {
let path = http::uri::PathAndQuery::from_static(
"/ballista.protobuf.SchedulerGrpc/PollWork",
);
- self.inner.unary(request.into_request(), path, codec).await
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(GrpcMethod::new("ballista.protobuf.SchedulerGrpc", "PollWork"));
+ self.inner.unary(req, path, codec).await
}
pub async fn register_executor(
&mut self,
request: impl tonic::IntoRequest<super::RegisterExecutorParams>,
- ) -> Result<tonic::Response<super::RegisterExecutorResult>,
tonic::Status> {
+ ) -> std::result::Result<
+ tonic::Response<super::RegisterExecutorResult>,
+ tonic::Status,
+ > {
self.inner
.ready()
.await
@@ -1228,14 +1250,25 @@ pub mod scheduler_grpc_client {
let path = http::uri::PathAndQuery::from_static(
"/ballista.protobuf.SchedulerGrpc/RegisterExecutor",
);
- self.inner.unary(request.into_request(), path, codec).await
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(
+ GrpcMethod::new(
+ "ballista.protobuf.SchedulerGrpc",
+ "RegisterExecutor",
+ ),
+ );
+ self.inner.unary(req, path, codec).await
}
/// Push-based task scheduler will only leverage this interface
/// rather than the PollWork interface to report executor states
pub async fn heart_beat_from_executor(
&mut self,
request: impl tonic::IntoRequest<super::HeartBeatParams>,
- ) -> Result<tonic::Response<super::HeartBeatResult>, tonic::Status> {
+ ) -> std::result::Result<
+ tonic::Response<super::HeartBeatResult>,
+ tonic::Status,
+ > {
self.inner
.ready()
.await
@@ -1249,12 +1282,23 @@ pub mod scheduler_grpc_client {
let path = http::uri::PathAndQuery::from_static(
"/ballista.protobuf.SchedulerGrpc/HeartBeatFromExecutor",
);
- self.inner.unary(request.into_request(), path, codec).await
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(
+ GrpcMethod::new(
+ "ballista.protobuf.SchedulerGrpc",
+ "HeartBeatFromExecutor",
+ ),
+ );
+ self.inner.unary(req, path, codec).await
}
pub async fn update_task_status(
&mut self,
request: impl tonic::IntoRequest<super::UpdateTaskStatusParams>,
- ) -> Result<tonic::Response<super::UpdateTaskStatusResult>,
tonic::Status> {
+ ) -> std::result::Result<
+ tonic::Response<super::UpdateTaskStatusResult>,
+ tonic::Status,
+ > {
self.inner
.ready()
.await
@@ -1268,12 +1312,23 @@ pub mod scheduler_grpc_client {
let path = http::uri::PathAndQuery::from_static(
"/ballista.protobuf.SchedulerGrpc/UpdateTaskStatus",
);
- self.inner.unary(request.into_request(), path, codec).await
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(
+ GrpcMethod::new(
+ "ballista.protobuf.SchedulerGrpc",
+ "UpdateTaskStatus",
+ ),
+ );
+ self.inner.unary(req, path, codec).await
}
pub async fn get_file_metadata(
&mut self,
request: impl tonic::IntoRequest<super::GetFileMetadataParams>,
- ) -> Result<tonic::Response<super::GetFileMetadataResult>,
tonic::Status> {
+ ) -> std::result::Result<
+ tonic::Response<super::GetFileMetadataResult>,
+ tonic::Status,
+ > {
self.inner
.ready()
.await
@@ -1287,12 +1342,20 @@ pub mod scheduler_grpc_client {
let path = http::uri::PathAndQuery::from_static(
"/ballista.protobuf.SchedulerGrpc/GetFileMetadata",
);
- self.inner.unary(request.into_request(), path, codec).await
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(
+ GrpcMethod::new("ballista.protobuf.SchedulerGrpc",
"GetFileMetadata"),
+ );
+ self.inner.unary(req, path, codec).await
}
pub async fn execute_query(
&mut self,
request: impl tonic::IntoRequest<super::ExecuteQueryParams>,
- ) -> Result<tonic::Response<super::ExecuteQueryResult>, tonic::Status>
{
+ ) -> std::result::Result<
+ tonic::Response<super::ExecuteQueryResult>,
+ tonic::Status,
+ > {
self.inner
.ready()
.await
@@ -1306,12 +1369,20 @@ pub mod scheduler_grpc_client {
let path = http::uri::PathAndQuery::from_static(
"/ballista.protobuf.SchedulerGrpc/ExecuteQuery",
);
- self.inner.unary(request.into_request(), path, codec).await
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(
+ GrpcMethod::new("ballista.protobuf.SchedulerGrpc",
"ExecuteQuery"),
+ );
+ self.inner.unary(req, path, codec).await
}
pub async fn get_job_status(
&mut self,
request: impl tonic::IntoRequest<super::GetJobStatusParams>,
- ) -> Result<tonic::Response<super::GetJobStatusResult>, tonic::Status>
{
+ ) -> std::result::Result<
+ tonic::Response<super::GetJobStatusResult>,
+ tonic::Status,
+ > {
self.inner
.ready()
.await
@@ -1325,13 +1396,21 @@ pub mod scheduler_grpc_client {
let path = http::uri::PathAndQuery::from_static(
"/ballista.protobuf.SchedulerGrpc/GetJobStatus",
);
- self.inner.unary(request.into_request(), path, codec).await
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(
+ GrpcMethod::new("ballista.protobuf.SchedulerGrpc",
"GetJobStatus"),
+ );
+ self.inner.unary(req, path, codec).await
}
/// Used by Executor to tell Scheduler it is stopped.
pub async fn executor_stopped(
&mut self,
request: impl tonic::IntoRequest<super::ExecutorStoppedParams>,
- ) -> Result<tonic::Response<super::ExecutorStoppedResult>,
tonic::Status> {
+ ) -> std::result::Result<
+ tonic::Response<super::ExecutorStoppedResult>,
+ tonic::Status,
+ > {
self.inner
.ready()
.await
@@ -1345,12 +1424,20 @@ pub mod scheduler_grpc_client {
let path = http::uri::PathAndQuery::from_static(
"/ballista.protobuf.SchedulerGrpc/ExecutorStopped",
);
- self.inner.unary(request.into_request(), path, codec).await
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(
+ GrpcMethod::new("ballista.protobuf.SchedulerGrpc",
"ExecutorStopped"),
+ );
+ self.inner.unary(req, path, codec).await
}
pub async fn cancel_job(
&mut self,
request: impl tonic::IntoRequest<super::CancelJobParams>,
- ) -> Result<tonic::Response<super::CancelJobResult>, tonic::Status> {
+ ) -> std::result::Result<
+ tonic::Response<super::CancelJobResult>,
+ tonic::Status,
+ > {
self.inner
.ready()
.await
@@ -1364,12 +1451,18 @@ pub mod scheduler_grpc_client {
let path = http::uri::PathAndQuery::from_static(
"/ballista.protobuf.SchedulerGrpc/CancelJob",
);
- self.inner.unary(request.into_request(), path, codec).await
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(GrpcMethod::new("ballista.protobuf.SchedulerGrpc",
"CancelJob"));
+ self.inner.unary(req, path, codec).await
}
pub async fn clean_job_data(
&mut self,
request: impl tonic::IntoRequest<super::CleanJobDataParams>,
- ) -> Result<tonic::Response<super::CleanJobDataResult>, tonic::Status>
{
+ ) -> std::result::Result<
+ tonic::Response<super::CleanJobDataResult>,
+ tonic::Status,
+ > {
self.inner
.ready()
.await
@@ -1383,7 +1476,12 @@ pub mod scheduler_grpc_client {
let path = http::uri::PathAndQuery::from_static(
"/ballista.protobuf.SchedulerGrpc/CleanJobData",
);
- self.inner.unary(request.into_request(), path, codec).await
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(
+ GrpcMethod::new("ballista.protobuf.SchedulerGrpc",
"CleanJobData"),
+ );
+ self.inner.unary(req, path, codec).await
}
}
}
@@ -1400,7 +1498,7 @@ pub mod executor_grpc_client {
/// Attempt to create a new client by connecting to a given endpoint.
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
- D: std::convert::TryInto<tonic::transport::Endpoint>,
+ D: TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
@@ -1456,10 +1554,29 @@ pub mod executor_grpc_client {
self.inner = self.inner.accept_compressed(encoding);
self
}
+ /// Limits the maximum size of a decoded message.
+ ///
+ /// Default: `4MB`
+ #[must_use]
+ pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
+ self.inner = self.inner.max_decoding_message_size(limit);
+ self
+ }
+ /// Limits the maximum size of an encoded message.
+ ///
+ /// Default: `usize::MAX`
+ #[must_use]
+ pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
+ self.inner = self.inner.max_encoding_message_size(limit);
+ self
+ }
pub async fn launch_task(
&mut self,
request: impl tonic::IntoRequest<super::LaunchTaskParams>,
- ) -> Result<tonic::Response<super::LaunchTaskResult>, tonic::Status> {
+ ) -> std::result::Result<
+ tonic::Response<super::LaunchTaskResult>,
+ tonic::Status,
+ > {
self.inner
.ready()
.await
@@ -1473,12 +1590,18 @@ pub mod executor_grpc_client {
let path = http::uri::PathAndQuery::from_static(
"/ballista.protobuf.ExecutorGrpc/LaunchTask",
);
- self.inner.unary(request.into_request(), path, codec).await
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(GrpcMethod::new("ballista.protobuf.ExecutorGrpc",
"LaunchTask"));
+ self.inner.unary(req, path, codec).await
}
pub async fn launch_multi_task(
&mut self,
request: impl tonic::IntoRequest<super::LaunchMultiTaskParams>,
- ) -> Result<tonic::Response<super::LaunchMultiTaskResult>,
tonic::Status> {
+ ) -> std::result::Result<
+ tonic::Response<super::LaunchMultiTaskResult>,
+ tonic::Status,
+ > {
self.inner
.ready()
.await
@@ -1492,12 +1615,20 @@ pub mod executor_grpc_client {
let path = http::uri::PathAndQuery::from_static(
"/ballista.protobuf.ExecutorGrpc/LaunchMultiTask",
);
- self.inner.unary(request.into_request(), path, codec).await
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(
+ GrpcMethod::new("ballista.protobuf.ExecutorGrpc",
"LaunchMultiTask"),
+ );
+ self.inner.unary(req, path, codec).await
}
pub async fn stop_executor(
&mut self,
request: impl tonic::IntoRequest<super::StopExecutorParams>,
- ) -> Result<tonic::Response<super::StopExecutorResult>, tonic::Status>
{
+ ) -> std::result::Result<
+ tonic::Response<super::StopExecutorResult>,
+ tonic::Status,
+ > {
self.inner
.ready()
.await
@@ -1511,12 +1642,20 @@ pub mod executor_grpc_client {
let path = http::uri::PathAndQuery::from_static(
"/ballista.protobuf.ExecutorGrpc/StopExecutor",
);
- self.inner.unary(request.into_request(), path, codec).await
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(
+ GrpcMethod::new("ballista.protobuf.ExecutorGrpc",
"StopExecutor"),
+ );
+ self.inner.unary(req, path, codec).await
}
pub async fn cancel_tasks(
&mut self,
request: impl tonic::IntoRequest<super::CancelTasksParams>,
- ) -> Result<tonic::Response<super::CancelTasksResult>, tonic::Status> {
+ ) -> std::result::Result<
+ tonic::Response<super::CancelTasksResult>,
+ tonic::Status,
+ > {
self.inner
.ready()
.await
@@ -1530,12 +1669,20 @@ pub mod executor_grpc_client {
let path = http::uri::PathAndQuery::from_static(
"/ballista.protobuf.ExecutorGrpc/CancelTasks",
);
- self.inner.unary(request.into_request(), path, codec).await
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(
+ GrpcMethod::new("ballista.protobuf.ExecutorGrpc",
"CancelTasks"),
+ );
+ self.inner.unary(req, path, codec).await
}
pub async fn remove_job_data(
&mut self,
request: impl tonic::IntoRequest<super::RemoveJobDataParams>,
- ) -> Result<tonic::Response<super::RemoveJobDataResult>,
tonic::Status> {
+ ) -> std::result::Result<
+ tonic::Response<super::RemoveJobDataResult>,
+ tonic::Status,
+ > {
self.inner
.ready()
.await
@@ -1549,7 +1696,12 @@ pub mod executor_grpc_client {
let path = http::uri::PathAndQuery::from_static(
"/ballista.protobuf.ExecutorGrpc/RemoveJobData",
);
- self.inner.unary(request.into_request(), path, codec).await
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(
+ GrpcMethod::new("ballista.protobuf.ExecutorGrpc",
"RemoveJobData"),
+ );
+ self.inner.unary(req, path, codec).await
}
}
}
@@ -1564,52 +1716,75 @@ pub mod scheduler_grpc_server {
async fn poll_work(
&self,
request: tonic::Request<super::PollWorkParams>,
- ) -> Result<tonic::Response<super::PollWorkResult>, tonic::Status>;
+ ) -> std::result::Result<tonic::Response<super::PollWorkResult>,
tonic::Status>;
async fn register_executor(
&self,
request: tonic::Request<super::RegisterExecutorParams>,
- ) -> Result<tonic::Response<super::RegisterExecutorResult>,
tonic::Status>;
+ ) -> std::result::Result<
+ tonic::Response<super::RegisterExecutorResult>,
+ tonic::Status,
+ >;
/// Push-based task scheduler will only leverage this interface
/// rather than the PollWork interface to report executor states
async fn heart_beat_from_executor(
&self,
request: tonic::Request<super::HeartBeatParams>,
- ) -> Result<tonic::Response<super::HeartBeatResult>, tonic::Status>;
+ ) -> std::result::Result<tonic::Response<super::HeartBeatResult>,
tonic::Status>;
async fn update_task_status(
&self,
request: tonic::Request<super::UpdateTaskStatusParams>,
- ) -> Result<tonic::Response<super::UpdateTaskStatusResult>,
tonic::Status>;
+ ) -> std::result::Result<
+ tonic::Response<super::UpdateTaskStatusResult>,
+ tonic::Status,
+ >;
async fn get_file_metadata(
&self,
request: tonic::Request<super::GetFileMetadataParams>,
- ) -> Result<tonic::Response<super::GetFileMetadataResult>,
tonic::Status>;
+ ) -> std::result::Result<
+ tonic::Response<super::GetFileMetadataResult>,
+ tonic::Status,
+ >;
async fn execute_query(
&self,
request: tonic::Request<super::ExecuteQueryParams>,
- ) -> Result<tonic::Response<super::ExecuteQueryResult>, tonic::Status>;
+ ) -> std::result::Result<
+ tonic::Response<super::ExecuteQueryResult>,
+ tonic::Status,
+ >;
async fn get_job_status(
&self,
request: tonic::Request<super::GetJobStatusParams>,
- ) -> Result<tonic::Response<super::GetJobStatusResult>, tonic::Status>;
+ ) -> std::result::Result<
+ tonic::Response<super::GetJobStatusResult>,
+ tonic::Status,
+ >;
/// Used by Executor to tell Scheduler it is stopped.
async fn executor_stopped(
&self,
request: tonic::Request<super::ExecutorStoppedParams>,
- ) -> Result<tonic::Response<super::ExecutorStoppedResult>,
tonic::Status>;
+ ) -> std::result::Result<
+ tonic::Response<super::ExecutorStoppedResult>,
+ tonic::Status,
+ >;
async fn cancel_job(
&self,
request: tonic::Request<super::CancelJobParams>,
- ) -> Result<tonic::Response<super::CancelJobResult>, tonic::Status>;
+ ) -> std::result::Result<tonic::Response<super::CancelJobResult>,
tonic::Status>;
async fn clean_job_data(
&self,
request: tonic::Request<super::CleanJobDataParams>,
- ) -> Result<tonic::Response<super::CleanJobDataResult>, tonic::Status>;
+ ) -> std::result::Result<
+ tonic::Response<super::CleanJobDataResult>,
+ tonic::Status,
+ >;
}
#[derive(Debug)]
pub struct SchedulerGrpcServer<T: SchedulerGrpc> {
inner: _Inner<T>,
accept_compression_encodings: EnabledCompressionEncodings,
send_compression_encodings: EnabledCompressionEncodings,
+ max_decoding_message_size: Option<usize>,
+ max_encoding_message_size: Option<usize>,
}
struct _Inner<T>(Arc<T>);
impl<T: SchedulerGrpc> SchedulerGrpcServer<T> {
@@ -1622,6 +1797,8 @@ pub mod scheduler_grpc_server {
inner,
accept_compression_encodings: Default::default(),
send_compression_encodings: Default::default(),
+ max_decoding_message_size: None,
+ max_encoding_message_size: None,
}
}
pub fn with_interceptor<F>(
@@ -1645,6 +1822,22 @@ pub mod scheduler_grpc_server {
self.send_compression_encodings.enable(encoding);
self
}
+ /// Limits the maximum size of a decoded message.
+ ///
+ /// Default: `4MB`
+ #[must_use]
+ pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
+ self.max_decoding_message_size = Some(limit);
+ self
+ }
+ /// Limits the maximum size of an encoded message.
+ ///
+ /// Default: `usize::MAX`
+ #[must_use]
+ pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
+ self.max_encoding_message_size = Some(limit);
+ self
+ }
}
impl<T, B> tonic::codegen::Service<http::Request<B>> for
SchedulerGrpcServer<T>
where
@@ -1658,7 +1851,7 @@ pub mod scheduler_grpc_server {
fn poll_ready(
&mut self,
_cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ ) -> Poll<std::result::Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
@@ -1680,13 +1873,15 @@ pub mod scheduler_grpc_server {
&mut self,
request: tonic::Request<super::PollWorkParams>,
) -> Self::Future {
- let inner = self.0.clone();
+ let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).poll_work(request).await };
Box::pin(fut)
}
}
let accept_compression_encodings =
self.accept_compression_encodings;
let send_compression_encodings =
self.send_compression_encodings;
+ let max_decoding_message_size =
self.max_decoding_message_size;
+ let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
@@ -1696,6 +1891,10 @@ pub mod scheduler_grpc_server {
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
@@ -1718,7 +1917,7 @@ pub mod scheduler_grpc_server {
&mut self,
request:
tonic::Request<super::RegisterExecutorParams>,
) -> Self::Future {
- let inner = self.0.clone();
+ let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).register_executor(request).await
};
@@ -1727,6 +1926,8 @@ pub mod scheduler_grpc_server {
}
let accept_compression_encodings =
self.accept_compression_encodings;
let send_compression_encodings =
self.send_compression_encodings;
+ let max_decoding_message_size =
self.max_decoding_message_size;
+ let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
@@ -1736,6 +1937,10 @@ pub mod scheduler_grpc_server {
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
@@ -1758,7 +1963,7 @@ pub mod scheduler_grpc_server {
&mut self,
request: tonic::Request<super::HeartBeatParams>,
) -> Self::Future {
- let inner = self.0.clone();
+ let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).heart_beat_from_executor(request).await
};
@@ -1767,6 +1972,8 @@ pub mod scheduler_grpc_server {
}
let accept_compression_encodings =
self.accept_compression_encodings;
let send_compression_encodings =
self.send_compression_encodings;
+ let max_decoding_message_size =
self.max_decoding_message_size;
+ let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
@@ -1776,6 +1983,10 @@ pub mod scheduler_grpc_server {
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
@@ -1798,7 +2009,7 @@ pub mod scheduler_grpc_server {
&mut self,
request:
tonic::Request<super::UpdateTaskStatusParams>,
) -> Self::Future {
- let inner = self.0.clone();
+ let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).update_task_status(request).await
};
@@ -1807,6 +2018,8 @@ pub mod scheduler_grpc_server {
}
let accept_compression_encodings =
self.accept_compression_encodings;
let send_compression_encodings =
self.send_compression_encodings;
+ let max_decoding_message_size =
self.max_decoding_message_size;
+ let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
@@ -1816,6 +2029,10 @@ pub mod scheduler_grpc_server {
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
@@ -1838,7 +2055,7 @@ pub mod scheduler_grpc_server {
&mut self,
request:
tonic::Request<super::GetFileMetadataParams>,
) -> Self::Future {
- let inner = self.0.clone();
+ let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).get_file_metadata(request).await
};
@@ -1847,6 +2064,8 @@ pub mod scheduler_grpc_server {
}
let accept_compression_encodings =
self.accept_compression_encodings;
let send_compression_encodings =
self.send_compression_encodings;
+ let max_decoding_message_size =
self.max_decoding_message_size;
+ let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
@@ -1856,6 +2075,10 @@ pub mod scheduler_grpc_server {
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
@@ -1878,7 +2101,7 @@ pub mod scheduler_grpc_server {
&mut self,
request: tonic::Request<super::ExecuteQueryParams>,
) -> Self::Future {
- let inner = self.0.clone();
+ let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).execute_query(request).await
};
@@ -1887,6 +2110,8 @@ pub mod scheduler_grpc_server {
}
let accept_compression_encodings =
self.accept_compression_encodings;
let send_compression_encodings =
self.send_compression_encodings;
+ let max_decoding_message_size =
self.max_decoding_message_size;
+ let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
@@ -1896,6 +2121,10 @@ pub mod scheduler_grpc_server {
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
@@ -1918,7 +2147,7 @@ pub mod scheduler_grpc_server {
&mut self,
request: tonic::Request<super::GetJobStatusParams>,
) -> Self::Future {
- let inner = self.0.clone();
+ let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).get_job_status(request).await
};
@@ -1927,6 +2156,8 @@ pub mod scheduler_grpc_server {
}
let accept_compression_encodings =
self.accept_compression_encodings;
let send_compression_encodings =
self.send_compression_encodings;
+ let max_decoding_message_size =
self.max_decoding_message_size;
+ let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
@@ -1936,6 +2167,10 @@ pub mod scheduler_grpc_server {
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
@@ -1958,7 +2193,7 @@ pub mod scheduler_grpc_server {
&mut self,
request:
tonic::Request<super::ExecutorStoppedParams>,
) -> Self::Future {
- let inner = self.0.clone();
+ let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).executor_stopped(request).await
};
@@ -1967,6 +2202,8 @@ pub mod scheduler_grpc_server {
}
let accept_compression_encodings =
self.accept_compression_encodings;
let send_compression_encodings =
self.send_compression_encodings;
+ let max_decoding_message_size =
self.max_decoding_message_size;
+ let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
@@ -1976,6 +2213,10 @@ pub mod scheduler_grpc_server {
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
@@ -1998,13 +2239,15 @@ pub mod scheduler_grpc_server {
&mut self,
request: tonic::Request<super::CancelJobParams>,
) -> Self::Future {
- let inner = self.0.clone();
+ let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).cancel_job(request).await };
Box::pin(fut)
}
}
let accept_compression_encodings =
self.accept_compression_encodings;
let send_compression_encodings =
self.send_compression_encodings;
+ let max_decoding_message_size =
self.max_decoding_message_size;
+ let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
@@ -2014,6 +2257,10 @@ pub mod scheduler_grpc_server {
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
@@ -2036,7 +2283,7 @@ pub mod scheduler_grpc_server {
&mut self,
request: tonic::Request<super::CleanJobDataParams>,
) -> Self::Future {
- let inner = self.0.clone();
+ let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).clean_job_data(request).await
};
@@ -2045,6 +2292,8 @@ pub mod scheduler_grpc_server {
}
let accept_compression_encodings =
self.accept_compression_encodings;
let send_compression_encodings =
self.send_compression_encodings;
+ let max_decoding_message_size =
self.max_decoding_message_size;
+ let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
@@ -2054,6 +2303,10 @@ pub mod scheduler_grpc_server {
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
@@ -2082,12 +2335,14 @@ pub mod scheduler_grpc_server {
inner,
accept_compression_encodings:
self.accept_compression_encodings,
send_compression_encodings: self.send_compression_encodings,
+ max_decoding_message_size: self.max_decoding_message_size,
+ max_encoding_message_size: self.max_encoding_message_size,
}
}
}
impl<T: SchedulerGrpc> Clone for _Inner<T> {
fn clone(&self) -> Self {
- Self(self.0.clone())
+ Self(Arc::clone(&self.0))
}
}
impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
@@ -2109,29 +2364,46 @@ pub mod executor_grpc_server {
async fn launch_task(
&self,
request: tonic::Request<super::LaunchTaskParams>,
- ) -> Result<tonic::Response<super::LaunchTaskResult>, tonic::Status>;
+ ) -> std::result::Result<
+ tonic::Response<super::LaunchTaskResult>,
+ tonic::Status,
+ >;
async fn launch_multi_task(
&self,
request: tonic::Request<super::LaunchMultiTaskParams>,
- ) -> Result<tonic::Response<super::LaunchMultiTaskResult>,
tonic::Status>;
+ ) -> std::result::Result<
+ tonic::Response<super::LaunchMultiTaskResult>,
+ tonic::Status,
+ >;
async fn stop_executor(
&self,
request: tonic::Request<super::StopExecutorParams>,
- ) -> Result<tonic::Response<super::StopExecutorResult>, tonic::Status>;
+ ) -> std::result::Result<
+ tonic::Response<super::StopExecutorResult>,
+ tonic::Status,
+ >;
async fn cancel_tasks(
&self,
request: tonic::Request<super::CancelTasksParams>,
- ) -> Result<tonic::Response<super::CancelTasksResult>, tonic::Status>;
+ ) -> std::result::Result<
+ tonic::Response<super::CancelTasksResult>,
+ tonic::Status,
+ >;
async fn remove_job_data(
&self,
request: tonic::Request<super::RemoveJobDataParams>,
- ) -> Result<tonic::Response<super::RemoveJobDataResult>,
tonic::Status>;
+ ) -> std::result::Result<
+ tonic::Response<super::RemoveJobDataResult>,
+ tonic::Status,
+ >;
}
#[derive(Debug)]
pub struct ExecutorGrpcServer<T: ExecutorGrpc> {
inner: _Inner<T>,
accept_compression_encodings: EnabledCompressionEncodings,
send_compression_encodings: EnabledCompressionEncodings,
+ max_decoding_message_size: Option<usize>,
+ max_encoding_message_size: Option<usize>,
}
struct _Inner<T>(Arc<T>);
impl<T: ExecutorGrpc> ExecutorGrpcServer<T> {
@@ -2144,6 +2416,8 @@ pub mod executor_grpc_server {
inner,
accept_compression_encodings: Default::default(),
send_compression_encodings: Default::default(),
+ max_decoding_message_size: None,
+ max_encoding_message_size: None,
}
}
pub fn with_interceptor<F>(
@@ -2167,6 +2441,22 @@ pub mod executor_grpc_server {
self.send_compression_encodings.enable(encoding);
self
}
+ /// Limits the maximum size of a decoded message.
+ ///
+ /// Default: `4MB`
+ #[must_use]
+ pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
+ self.max_decoding_message_size = Some(limit);
+ self
+ }
+ /// Limits the maximum size of an encoded message.
+ ///
+ /// Default: `usize::MAX`
+ #[must_use]
+ pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
+ self.max_encoding_message_size = Some(limit);
+ self
+ }
}
impl<T, B> tonic::codegen::Service<http::Request<B>> for
ExecutorGrpcServer<T>
where
@@ -2180,7 +2470,7 @@ pub mod executor_grpc_server {
fn poll_ready(
&mut self,
_cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ ) -> Poll<std::result::Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
@@ -2202,13 +2492,15 @@ pub mod executor_grpc_server {
&mut self,
request: tonic::Request<super::LaunchTaskParams>,
) -> Self::Future {
- let inner = self.0.clone();
+ let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).launch_task(request).await };
Box::pin(fut)
}
}
let accept_compression_encodings =
self.accept_compression_encodings;
let send_compression_encodings =
self.send_compression_encodings;
+ let max_decoding_message_size =
self.max_decoding_message_size;
+ let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
@@ -2218,6 +2510,10 @@ pub mod executor_grpc_server {
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
@@ -2240,7 +2536,7 @@ pub mod executor_grpc_server {
&mut self,
request:
tonic::Request<super::LaunchMultiTaskParams>,
) -> Self::Future {
- let inner = self.0.clone();
+ let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).launch_multi_task(request).await
};
@@ -2249,6 +2545,8 @@ pub mod executor_grpc_server {
}
let accept_compression_encodings =
self.accept_compression_encodings;
let send_compression_encodings =
self.send_compression_encodings;
+ let max_decoding_message_size =
self.max_decoding_message_size;
+ let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
@@ -2258,6 +2556,10 @@ pub mod executor_grpc_server {
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
@@ -2280,7 +2582,7 @@ pub mod executor_grpc_server {
&mut self,
request: tonic::Request<super::StopExecutorParams>,
) -> Self::Future {
- let inner = self.0.clone();
+ let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).stop_executor(request).await
};
@@ -2289,6 +2591,8 @@ pub mod executor_grpc_server {
}
let accept_compression_encodings =
self.accept_compression_encodings;
let send_compression_encodings =
self.send_compression_encodings;
+ let max_decoding_message_size =
self.max_decoding_message_size;
+ let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
@@ -2298,6 +2602,10 @@ pub mod executor_grpc_server {
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
@@ -2320,7 +2628,7 @@ pub mod executor_grpc_server {
&mut self,
request: tonic::Request<super::CancelTasksParams>,
) -> Self::Future {
- let inner = self.0.clone();
+ let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).cancel_tasks(request).await
};
@@ -2329,6 +2637,8 @@ pub mod executor_grpc_server {
}
let accept_compression_encodings =
self.accept_compression_encodings;
let send_compression_encodings =
self.send_compression_encodings;
+ let max_decoding_message_size =
self.max_decoding_message_size;
+ let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
@@ -2338,6 +2648,10 @@ pub mod executor_grpc_server {
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
@@ -2360,7 +2674,7 @@ pub mod executor_grpc_server {
&mut self,
request:
tonic::Request<super::RemoveJobDataParams>,
) -> Self::Future {
- let inner = self.0.clone();
+ let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).remove_job_data(request).await
};
@@ -2369,6 +2683,8 @@ pub mod executor_grpc_server {
}
let accept_compression_encodings =
self.accept_compression_encodings;
let send_compression_encodings =
self.send_compression_encodings;
+ let max_decoding_message_size =
self.max_decoding_message_size;
+ let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
@@ -2378,6 +2694,10 @@ pub mod executor_grpc_server {
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
@@ -2406,12 +2726,14 @@ pub mod executor_grpc_server {
inner,
accept_compression_encodings:
self.accept_compression_encodings,
send_compression_encodings: self.send_compression_encodings,
+ max_decoding_message_size: self.max_decoding_message_size,
+ max_encoding_message_size: self.max_encoding_message_size,
}
}
}
impl<T: ExecutorGrpc> Clone for _Inner<T> {
fn clone(&self) -> Self {
- Self(self.0.clone())
+ Self(Arc::clone(&self.0))
}
}
impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
diff --git a/ballista/core/src/serde/scheduler/mod.rs
b/ballista/core/src/serde/scheduler/mod.rs
index d8ff0906..83a6d173 100644
--- a/ballista/core/src/serde/scheduler/mod.rs
+++ b/ballista/core/src/serde/scheduler/mod.rs
@@ -135,7 +135,7 @@ impl PartitionStats {
pub fn arrow_struct_repr(self) -> Field {
Field::new(
"partition_stats",
- DataType::Struct(self.arrow_struct_fields()),
+ DataType::Struct(self.arrow_struct_fields().into()),
false,
)
}
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index 1167e40a..65568d82 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -24,7 +24,7 @@ homepage = "https://github.com/apache/arrow-ballista"
repository = "https://github.com/apache/arrow-ballista"
readme = "README.md"
authors = ["Apache Arrow <[email protected]>"]
-edition = "2018"
+edition = "2021"
[package.metadata.configure_me.bin]
executor = "executor_config_spec.toml"
@@ -43,7 +43,7 @@ arrow-flight = { workspace = true }
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] }
chrono = { version = "0.4", default-features = false }
-configure_me = "0.4.0"
+configure_me = { workspace = true }
dashmap = "5.4.0"
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
@@ -62,7 +62,7 @@ tokio = { version = "1.0", features = [
"signal",
] }
tokio-stream = { version = "0.1", features = ["net"] }
-tonic = "0.8"
+tonic = { workspace = true }
tracing = "0.1.36"
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.15", features = [
@@ -75,7 +75,7 @@ uuid = { version = "1.0", features = ["v4"] }
[dev-dependencies]
[build-dependencies]
-configure_me_codegen = "=0.4.0"
+configure_me_codegen = { workspace = true }
# use libc on unix like platforms to set worker priority in DedicatedExecutor
[target."cfg(unix)".dependencies.libc]
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index ee36dcbc..30709989 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -24,7 +24,7 @@ homepage = "https://github.com/apache/arrow-ballista"
repository = "https://github.com/apache/arrow-ballista"
readme = "README.md"
authors = ["Apache Arrow <[email protected]>"]
-edition = "2018"
+edition = "2021"
[package.metadata.configure_me.bin]
scheduler = "scheduler_config_spec.toml"
@@ -49,7 +49,7 @@ async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] }
base64 = { version = "0.13", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
-configure_me = "0.4.0"
+configure_me = { workspace = true }
dashmap = "5.4.0"
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
@@ -62,7 +62,7 @@ http-body = "0.4"
hyper = "0.14.4"
itertools = "0.10.3"
log = "0.4"
-object_store = "0.5.0"
+object_store = { workspace = true }
once_cell = { version = "1.16.0", optional = true }
parking_lot = "0.12"
parse_arg = "0.1.3"
@@ -74,7 +74,7 @@ serde = { version = "1", features = ["derive"] }
sled_package = { package = "sled", version = "0.34", optional = true }
tokio = { version = "1.0", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"], optional = true }
-tonic = "0.8"
+tonic = { workspace = true }
tower = { version = "0.4" }
tracing = "0.1.36"
tracing-appender = "0.2.2"
@@ -90,8 +90,5 @@ warp = "0.3"
ballista-core = { path = "../core", version = "0.11.0" }
[build-dependencies]
-configure_me_codegen = "=0.4.0"
-tonic-build = { version = "0.8", default-features = false, features = [
- "transport",
- "prost",
-] }
+configure_me_codegen = { workspace = true }
+tonic-build = { workspace = true }
diff --git a/ballista/scheduler/scheduler_config_spec.toml
b/ballista/scheduler/scheduler_config_spec.toml
index e1eb561d..f9157950 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -51,7 +51,6 @@ doc = "etcd urls for use when discovery mode is `etcd`.
Default: localhost:2379"
default = "std::string::String::from(\"localhost:2379\")"
[[param]]
-abbr = "h"
name = "bind_host"
type = "String"
default = "std::string::String::from(\"0.0.0.0\")"