This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new ae1d3de4 Upgrade DataFusion to 23.0.0 (#755)
ae1d3de4 is described below

commit ae1d3de4f96de752ee0122faa1d4417b1ca4c2cb
Author: yahoNanJing <[email protected]>
AuthorDate: Wed Apr 26 04:41:35 2023 +0800

    Upgrade DataFusion to 23.0.0 (#755)
    
    Co-authored-by: yangzhong <[email protected]>
---
 Cargo.toml                                    |  19 +-
 ballista-cli/Cargo.toml                       |   2 +-
 ballista/client/Cargo.toml                    |   2 +-
 ballista/client/src/context.rs                |   2 +-
 ballista/core/Cargo.toml                      |  13 +-
 ballista/core/src/serde/generated/ballista.rs | 454 ++++++++++++++++++++++----
 ballista/core/src/serde/scheduler/mod.rs      |   2 +-
 ballista/executor/Cargo.toml                  |   8 +-
 ballista/scheduler/Cargo.toml                 |  15 +-
 ballista/scheduler/scheduler_config_spec.toml |   1 -
 10 files changed, 421 insertions(+), 97 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index bbab677c..39233942 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,13 +17,22 @@
 
 [workspace]
 members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executor", "ballista/scheduler", "benchmarks", "examples"]
-exclude = ["python"]
 
 [workspace.dependencies]
-arrow = { version = "36.0.0" }
-arrow-flight = { version = "36.0.0", features = ["flight-sql-experimental"] }
-datafusion = "22.0.0"
-datafusion-proto = "22.0.0"
+arrow = { version = "37.0.0" }
+arrow-flight = { version = "37.0.0", features = ["flight-sql-experimental"] }
+configure_me = { version = "0.4.0" }
+configure_me_codegen = { version = "0.4.4" }
+datafusion = "23.0.0"
+datafusion-cli = "23.0.0"
+datafusion-proto = "23.0.0"
+object_store = "0.5.4"
+sqlparser = "0.33.0"
+tonic = { version = "0.9" }
+tonic-build = { version = "0.9", default-features = false, features = [
+    "transport",
+    "prost",
+] }
 
 # cargo build --profile release-lto
 [profile.release-lto]
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 8f94db42..c805f3d1 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -32,7 +32,7 @@ readme = "README.md"
 ballista = { path = "../ballista/client", version = "0.11.0", features = ["standalone"] }
 clap = { version = "3", features = ["derive", "cargo"] }
 datafusion = { workspace = true }
-datafusion-cli = "22.0.0"
+datafusion-cli = { workspace = true }
 dirs = "4.0.0"
 env_logger = "0.10"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index 88b93378..e371a859 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -36,7 +36,7 @@ datafusion-proto = { workspace = true }
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
-sqlparser = "0.32.0"
+sqlparser = { workspace = true }
 tempfile = "3"
 tokio = "1.0"
 
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index 84bbaf02..dea233d7 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -623,7 +623,7 @@ mod tests {
                         .map(|t| ListingTableUrl::parse(t).unwrap())
                         .collect();
                     let config = ListingTableConfig::new_with_multi_paths(table_paths)
-                        .with_schema(Arc::new(Schema::new(vec![])))
+                        .with_schema(Arc::new(Schema::empty()))
                         .with_listing_options(error_options);
 
                     let error_table = ListingTable::try_new(config).unwrap();
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 95282a36..feeb8311 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -24,7 +24,7 @@ homepage = "https://github.com/apache/arrow-ballista"
 repository = "https://github.com/apache/arrow-ballista"
 readme = "README.md"
 authors = ["Apache Arrow <[email protected]>"]
-edition = "2018"
+edition = "2021"
 build = "build.rs"
 
 # Exclude proto files so crates.io consumers don't need protoc
@@ -58,7 +58,7 @@ hashbrown = "0.13"
 itertools = "0.10"
 libloading = "0.7.3"
 log = "0.4"
-object_store = "0.5.2"
+object_store = { workspace = true }
 once_cell = "1.9.0"
 
 parking_lot = "0.12"
@@ -67,11 +67,11 @@ prost = "0.11"
 prost-types = "0.11"
 rand = "0.8"
 serde = { version = "1", features = ["derive"] }
-sqlparser = "0.32.0"
+sqlparser = { workspace = true }
 sys-info = "0.9.0"
 tokio = "1.0"
 tokio-stream = { version = "0.1", features = ["net"] }
-tonic = "0.8"
+tonic = { workspace = true }
 url = "2.2"
 uuid = { version = "1.0", features = ["v4"] }
 walkdir = "2.3.2"
@@ -81,7 +81,4 @@ tempfile = "3"
 
 [build-dependencies]
 rustc_version = "0.4.0"
-tonic-build = { version = "0.8", default-features = false, features = [
-    "transport",
-    "prost",
-] }
+tonic-build = { workspace = true }
diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs
index f511f2a2..439bbf84 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -1135,7 +1135,7 @@ pub mod scheduler_grpc_client {
         /// Attempt to create a new client by connecting to a given endpoint.
         pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
         where
-            D: std::convert::TryInto<tonic::transport::Endpoint>,
+            D: TryInto<tonic::transport::Endpoint>,
             D::Error: Into<StdError>,
         {
             let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
@@ -1191,11 +1191,27 @@ pub mod scheduler_grpc_client {
             self.inner = self.inner.accept_compressed(encoding);
             self
         }
+        /// Limits the maximum size of a decoded message.
+        ///
+        /// Default: `4MB`
+        #[must_use]
+        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
+            self.inner = self.inner.max_decoding_message_size(limit);
+            self
+        }
+        /// Limits the maximum size of an encoded message.
+        ///
+        /// Default: `usize::MAX`
+        #[must_use]
+        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
+            self.inner = self.inner.max_encoding_message_size(limit);
+            self
+        }
         /// Executors must poll the scheduler for heartbeat and to receive tasks
         pub async fn poll_work(
             &mut self,
             request: impl tonic::IntoRequest<super::PollWorkParams>,
-        ) -> Result<tonic::Response<super::PollWorkResult>, tonic::Status> {
+        ) -> std::result::Result<tonic::Response<super::PollWorkResult>, 
tonic::Status> {
             self.inner
                 .ready()
                 .await
@@ -1209,12 +1225,18 @@ pub mod scheduler_grpc_client {
             let path = http::uri::PathAndQuery::from_static(
                 "/ballista.protobuf.SchedulerGrpc/PollWork",
             );
-            self.inner.unary(request.into_request(), path, codec).await
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(GrpcMethod::new("ballista.protobuf.SchedulerGrpc", 
"PollWork"));
+            self.inner.unary(req, path, codec).await
         }
         pub async fn register_executor(
             &mut self,
             request: impl tonic::IntoRequest<super::RegisterExecutorParams>,
-        ) -> Result<tonic::Response<super::RegisterExecutorResult>, 
tonic::Status> {
+        ) -> std::result::Result<
+            tonic::Response<super::RegisterExecutorResult>,
+            tonic::Status,
+        > {
             self.inner
                 .ready()
                 .await
@@ -1228,14 +1250,25 @@ pub mod scheduler_grpc_client {
             let path = http::uri::PathAndQuery::from_static(
                 "/ballista.protobuf.SchedulerGrpc/RegisterExecutor",
             );
-            self.inner.unary(request.into_request(), path, codec).await
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(
+                    GrpcMethod::new(
+                        "ballista.protobuf.SchedulerGrpc",
+                        "RegisterExecutor",
+                    ),
+                );
+            self.inner.unary(req, path, codec).await
         }
         /// Push-based task scheduler will only leverage this interface
         /// rather than the PollWork interface to report executor states
         pub async fn heart_beat_from_executor(
             &mut self,
             request: impl tonic::IntoRequest<super::HeartBeatParams>,
-        ) -> Result<tonic::Response<super::HeartBeatResult>, tonic::Status> {
+        ) -> std::result::Result<
+            tonic::Response<super::HeartBeatResult>,
+            tonic::Status,
+        > {
             self.inner
                 .ready()
                 .await
@@ -1249,12 +1282,23 @@ pub mod scheduler_grpc_client {
             let path = http::uri::PathAndQuery::from_static(
                 "/ballista.protobuf.SchedulerGrpc/HeartBeatFromExecutor",
             );
-            self.inner.unary(request.into_request(), path, codec).await
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(
+                    GrpcMethod::new(
+                        "ballista.protobuf.SchedulerGrpc",
+                        "HeartBeatFromExecutor",
+                    ),
+                );
+            self.inner.unary(req, path, codec).await
         }
         pub async fn update_task_status(
             &mut self,
             request: impl tonic::IntoRequest<super::UpdateTaskStatusParams>,
-        ) -> Result<tonic::Response<super::UpdateTaskStatusResult>, 
tonic::Status> {
+        ) -> std::result::Result<
+            tonic::Response<super::UpdateTaskStatusResult>,
+            tonic::Status,
+        > {
             self.inner
                 .ready()
                 .await
@@ -1268,12 +1312,23 @@ pub mod scheduler_grpc_client {
             let path = http::uri::PathAndQuery::from_static(
                 "/ballista.protobuf.SchedulerGrpc/UpdateTaskStatus",
             );
-            self.inner.unary(request.into_request(), path, codec).await
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(
+                    GrpcMethod::new(
+                        "ballista.protobuf.SchedulerGrpc",
+                        "UpdateTaskStatus",
+                    ),
+                );
+            self.inner.unary(req, path, codec).await
         }
         pub async fn get_file_metadata(
             &mut self,
             request: impl tonic::IntoRequest<super::GetFileMetadataParams>,
-        ) -> Result<tonic::Response<super::GetFileMetadataResult>, 
tonic::Status> {
+        ) -> std::result::Result<
+            tonic::Response<super::GetFileMetadataResult>,
+            tonic::Status,
+        > {
             self.inner
                 .ready()
                 .await
@@ -1287,12 +1342,20 @@ pub mod scheduler_grpc_client {
             let path = http::uri::PathAndQuery::from_static(
                 "/ballista.protobuf.SchedulerGrpc/GetFileMetadata",
             );
-            self.inner.unary(request.into_request(), path, codec).await
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(
+                    GrpcMethod::new("ballista.protobuf.SchedulerGrpc", 
"GetFileMetadata"),
+                );
+            self.inner.unary(req, path, codec).await
         }
         pub async fn execute_query(
             &mut self,
             request: impl tonic::IntoRequest<super::ExecuteQueryParams>,
-        ) -> Result<tonic::Response<super::ExecuteQueryResult>, tonic::Status> 
{
+        ) -> std::result::Result<
+            tonic::Response<super::ExecuteQueryResult>,
+            tonic::Status,
+        > {
             self.inner
                 .ready()
                 .await
@@ -1306,12 +1369,20 @@ pub mod scheduler_grpc_client {
             let path = http::uri::PathAndQuery::from_static(
                 "/ballista.protobuf.SchedulerGrpc/ExecuteQuery",
             );
-            self.inner.unary(request.into_request(), path, codec).await
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(
+                    GrpcMethod::new("ballista.protobuf.SchedulerGrpc", 
"ExecuteQuery"),
+                );
+            self.inner.unary(req, path, codec).await
         }
         pub async fn get_job_status(
             &mut self,
             request: impl tonic::IntoRequest<super::GetJobStatusParams>,
-        ) -> Result<tonic::Response<super::GetJobStatusResult>, tonic::Status> 
{
+        ) -> std::result::Result<
+            tonic::Response<super::GetJobStatusResult>,
+            tonic::Status,
+        > {
             self.inner
                 .ready()
                 .await
@@ -1325,13 +1396,21 @@ pub mod scheduler_grpc_client {
             let path = http::uri::PathAndQuery::from_static(
                 "/ballista.protobuf.SchedulerGrpc/GetJobStatus",
             );
-            self.inner.unary(request.into_request(), path, codec).await
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(
+                    GrpcMethod::new("ballista.protobuf.SchedulerGrpc", 
"GetJobStatus"),
+                );
+            self.inner.unary(req, path, codec).await
         }
         /// Used by Executor to tell Scheduler it is stopped.
         pub async fn executor_stopped(
             &mut self,
             request: impl tonic::IntoRequest<super::ExecutorStoppedParams>,
-        ) -> Result<tonic::Response<super::ExecutorStoppedResult>, 
tonic::Status> {
+        ) -> std::result::Result<
+            tonic::Response<super::ExecutorStoppedResult>,
+            tonic::Status,
+        > {
             self.inner
                 .ready()
                 .await
@@ -1345,12 +1424,20 @@ pub mod scheduler_grpc_client {
             let path = http::uri::PathAndQuery::from_static(
                 "/ballista.protobuf.SchedulerGrpc/ExecutorStopped",
             );
-            self.inner.unary(request.into_request(), path, codec).await
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(
+                    GrpcMethod::new("ballista.protobuf.SchedulerGrpc", 
"ExecutorStopped"),
+                );
+            self.inner.unary(req, path, codec).await
         }
         pub async fn cancel_job(
             &mut self,
             request: impl tonic::IntoRequest<super::CancelJobParams>,
-        ) -> Result<tonic::Response<super::CancelJobResult>, tonic::Status> {
+        ) -> std::result::Result<
+            tonic::Response<super::CancelJobResult>,
+            tonic::Status,
+        > {
             self.inner
                 .ready()
                 .await
@@ -1364,12 +1451,18 @@ pub mod scheduler_grpc_client {
             let path = http::uri::PathAndQuery::from_static(
                 "/ballista.protobuf.SchedulerGrpc/CancelJob",
             );
-            self.inner.unary(request.into_request(), path, codec).await
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(GrpcMethod::new("ballista.protobuf.SchedulerGrpc", 
"CancelJob"));
+            self.inner.unary(req, path, codec).await
         }
         pub async fn clean_job_data(
             &mut self,
             request: impl tonic::IntoRequest<super::CleanJobDataParams>,
-        ) -> Result<tonic::Response<super::CleanJobDataResult>, tonic::Status> 
{
+        ) -> std::result::Result<
+            tonic::Response<super::CleanJobDataResult>,
+            tonic::Status,
+        > {
             self.inner
                 .ready()
                 .await
@@ -1383,7 +1476,12 @@ pub mod scheduler_grpc_client {
             let path = http::uri::PathAndQuery::from_static(
                 "/ballista.protobuf.SchedulerGrpc/CleanJobData",
             );
-            self.inner.unary(request.into_request(), path, codec).await
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(
+                    GrpcMethod::new("ballista.protobuf.SchedulerGrpc", 
"CleanJobData"),
+                );
+            self.inner.unary(req, path, codec).await
         }
     }
 }
@@ -1400,7 +1498,7 @@ pub mod executor_grpc_client {
         /// Attempt to create a new client by connecting to a given endpoint.
         pub async fn connect<D>(dst: D) -> Result<Self, 
tonic::transport::Error>
         where
-            D: std::convert::TryInto<tonic::transport::Endpoint>,
+            D: TryInto<tonic::transport::Endpoint>,
             D::Error: Into<StdError>,
         {
             let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
@@ -1456,10 +1554,29 @@ pub mod executor_grpc_client {
             self.inner = self.inner.accept_compressed(encoding);
             self
         }
+        /// Limits the maximum size of a decoded message.
+        ///
+        /// Default: `4MB`
+        #[must_use]
+        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
+            self.inner = self.inner.max_decoding_message_size(limit);
+            self
+        }
+        /// Limits the maximum size of an encoded message.
+        ///
+        /// Default: `usize::MAX`
+        #[must_use]
+        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
+            self.inner = self.inner.max_encoding_message_size(limit);
+            self
+        }
         pub async fn launch_task(
             &mut self,
             request: impl tonic::IntoRequest<super::LaunchTaskParams>,
-        ) -> Result<tonic::Response<super::LaunchTaskResult>, tonic::Status> {
+        ) -> std::result::Result<
+            tonic::Response<super::LaunchTaskResult>,
+            tonic::Status,
+        > {
             self.inner
                 .ready()
                 .await
@@ -1473,12 +1590,18 @@ pub mod executor_grpc_client {
             let path = http::uri::PathAndQuery::from_static(
                 "/ballista.protobuf.ExecutorGrpc/LaunchTask",
             );
-            self.inner.unary(request.into_request(), path, codec).await
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(GrpcMethod::new("ballista.protobuf.ExecutorGrpc", 
"LaunchTask"));
+            self.inner.unary(req, path, codec).await
         }
         pub async fn launch_multi_task(
             &mut self,
             request: impl tonic::IntoRequest<super::LaunchMultiTaskParams>,
-        ) -> Result<tonic::Response<super::LaunchMultiTaskResult>, 
tonic::Status> {
+        ) -> std::result::Result<
+            tonic::Response<super::LaunchMultiTaskResult>,
+            tonic::Status,
+        > {
             self.inner
                 .ready()
                 .await
@@ -1492,12 +1615,20 @@ pub mod executor_grpc_client {
             let path = http::uri::PathAndQuery::from_static(
                 "/ballista.protobuf.ExecutorGrpc/LaunchMultiTask",
             );
-            self.inner.unary(request.into_request(), path, codec).await
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(
+                    GrpcMethod::new("ballista.protobuf.ExecutorGrpc", 
"LaunchMultiTask"),
+                );
+            self.inner.unary(req, path, codec).await
         }
         pub async fn stop_executor(
             &mut self,
             request: impl tonic::IntoRequest<super::StopExecutorParams>,
-        ) -> Result<tonic::Response<super::StopExecutorResult>, tonic::Status> 
{
+        ) -> std::result::Result<
+            tonic::Response<super::StopExecutorResult>,
+            tonic::Status,
+        > {
             self.inner
                 .ready()
                 .await
@@ -1511,12 +1642,20 @@ pub mod executor_grpc_client {
             let path = http::uri::PathAndQuery::from_static(
                 "/ballista.protobuf.ExecutorGrpc/StopExecutor",
             );
-            self.inner.unary(request.into_request(), path, codec).await
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(
+                    GrpcMethod::new("ballista.protobuf.ExecutorGrpc", 
"StopExecutor"),
+                );
+            self.inner.unary(req, path, codec).await
         }
         pub async fn cancel_tasks(
             &mut self,
             request: impl tonic::IntoRequest<super::CancelTasksParams>,
-        ) -> Result<tonic::Response<super::CancelTasksResult>, tonic::Status> {
+        ) -> std::result::Result<
+            tonic::Response<super::CancelTasksResult>,
+            tonic::Status,
+        > {
             self.inner
                 .ready()
                 .await
@@ -1530,12 +1669,20 @@ pub mod executor_grpc_client {
             let path = http::uri::PathAndQuery::from_static(
                 "/ballista.protobuf.ExecutorGrpc/CancelTasks",
             );
-            self.inner.unary(request.into_request(), path, codec).await
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(
+                    GrpcMethod::new("ballista.protobuf.ExecutorGrpc", 
"CancelTasks"),
+                );
+            self.inner.unary(req, path, codec).await
         }
         pub async fn remove_job_data(
             &mut self,
             request: impl tonic::IntoRequest<super::RemoveJobDataParams>,
-        ) -> Result<tonic::Response<super::RemoveJobDataResult>, 
tonic::Status> {
+        ) -> std::result::Result<
+            tonic::Response<super::RemoveJobDataResult>,
+            tonic::Status,
+        > {
             self.inner
                 .ready()
                 .await
@@ -1549,7 +1696,12 @@ pub mod executor_grpc_client {
             let path = http::uri::PathAndQuery::from_static(
                 "/ballista.protobuf.ExecutorGrpc/RemoveJobData",
             );
-            self.inner.unary(request.into_request(), path, codec).await
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(
+                    GrpcMethod::new("ballista.protobuf.ExecutorGrpc", 
"RemoveJobData"),
+                );
+            self.inner.unary(req, path, codec).await
         }
     }
 }
@@ -1564,52 +1716,75 @@ pub mod scheduler_grpc_server {
         async fn poll_work(
             &self,
             request: tonic::Request<super::PollWorkParams>,
-        ) -> Result<tonic::Response<super::PollWorkResult>, tonic::Status>;
+        ) -> std::result::Result<tonic::Response<super::PollWorkResult>, 
tonic::Status>;
         async fn register_executor(
             &self,
             request: tonic::Request<super::RegisterExecutorParams>,
-        ) -> Result<tonic::Response<super::RegisterExecutorResult>, 
tonic::Status>;
+        ) -> std::result::Result<
+            tonic::Response<super::RegisterExecutorResult>,
+            tonic::Status,
+        >;
         /// Push-based task scheduler will only leverage this interface
         /// rather than the PollWork interface to report executor states
         async fn heart_beat_from_executor(
             &self,
             request: tonic::Request<super::HeartBeatParams>,
-        ) -> Result<tonic::Response<super::HeartBeatResult>, tonic::Status>;
+        ) -> std::result::Result<tonic::Response<super::HeartBeatResult>, 
tonic::Status>;
         async fn update_task_status(
             &self,
             request: tonic::Request<super::UpdateTaskStatusParams>,
-        ) -> Result<tonic::Response<super::UpdateTaskStatusResult>, 
tonic::Status>;
+        ) -> std::result::Result<
+            tonic::Response<super::UpdateTaskStatusResult>,
+            tonic::Status,
+        >;
         async fn get_file_metadata(
             &self,
             request: tonic::Request<super::GetFileMetadataParams>,
-        ) -> Result<tonic::Response<super::GetFileMetadataResult>, 
tonic::Status>;
+        ) -> std::result::Result<
+            tonic::Response<super::GetFileMetadataResult>,
+            tonic::Status,
+        >;
         async fn execute_query(
             &self,
             request: tonic::Request<super::ExecuteQueryParams>,
-        ) -> Result<tonic::Response<super::ExecuteQueryResult>, tonic::Status>;
+        ) -> std::result::Result<
+            tonic::Response<super::ExecuteQueryResult>,
+            tonic::Status,
+        >;
         async fn get_job_status(
             &self,
             request: tonic::Request<super::GetJobStatusParams>,
-        ) -> Result<tonic::Response<super::GetJobStatusResult>, tonic::Status>;
+        ) -> std::result::Result<
+            tonic::Response<super::GetJobStatusResult>,
+            tonic::Status,
+        >;
         /// Used by Executor to tell Scheduler it is stopped.
         async fn executor_stopped(
             &self,
             request: tonic::Request<super::ExecutorStoppedParams>,
-        ) -> Result<tonic::Response<super::ExecutorStoppedResult>, 
tonic::Status>;
+        ) -> std::result::Result<
+            tonic::Response<super::ExecutorStoppedResult>,
+            tonic::Status,
+        >;
         async fn cancel_job(
             &self,
             request: tonic::Request<super::CancelJobParams>,
-        ) -> Result<tonic::Response<super::CancelJobResult>, tonic::Status>;
+        ) -> std::result::Result<tonic::Response<super::CancelJobResult>, 
tonic::Status>;
         async fn clean_job_data(
             &self,
             request: tonic::Request<super::CleanJobDataParams>,
-        ) -> Result<tonic::Response<super::CleanJobDataResult>, tonic::Status>;
+        ) -> std::result::Result<
+            tonic::Response<super::CleanJobDataResult>,
+            tonic::Status,
+        >;
     }
     #[derive(Debug)]
     pub struct SchedulerGrpcServer<T: SchedulerGrpc> {
         inner: _Inner<T>,
         accept_compression_encodings: EnabledCompressionEncodings,
         send_compression_encodings: EnabledCompressionEncodings,
+        max_decoding_message_size: Option<usize>,
+        max_encoding_message_size: Option<usize>,
     }
     struct _Inner<T>(Arc<T>);
     impl<T: SchedulerGrpc> SchedulerGrpcServer<T> {
@@ -1622,6 +1797,8 @@ pub mod scheduler_grpc_server {
                 inner,
                 accept_compression_encodings: Default::default(),
                 send_compression_encodings: Default::default(),
+                max_decoding_message_size: None,
+                max_encoding_message_size: None,
             }
         }
         pub fn with_interceptor<F>(
@@ -1645,6 +1822,22 @@ pub mod scheduler_grpc_server {
             self.send_compression_encodings.enable(encoding);
             self
         }
+        /// Limits the maximum size of a decoded message.
+        ///
+        /// Default: `4MB`
+        #[must_use]
+        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
+            self.max_decoding_message_size = Some(limit);
+            self
+        }
+        /// Limits the maximum size of an encoded message.
+        ///
+        /// Default: `usize::MAX`
+        #[must_use]
+        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
+            self.max_encoding_message_size = Some(limit);
+            self
+        }
     }
     impl<T, B> tonic::codegen::Service<http::Request<B>> for 
SchedulerGrpcServer<T>
     where
@@ -1658,7 +1851,7 @@ pub mod scheduler_grpc_server {
         fn poll_ready(
             &mut self,
             _cx: &mut Context<'_>,
-        ) -> Poll<Result<(), Self::Error>> {
+        ) -> Poll<std::result::Result<(), Self::Error>> {
             Poll::Ready(Ok(()))
         }
         fn call(&mut self, req: http::Request<B>) -> Self::Future {
@@ -1680,13 +1873,15 @@ pub mod scheduler_grpc_server {
                             &mut self,
                             request: tonic::Request<super::PollWorkParams>,
                         ) -> Self::Future {
-                            let inner = self.0.clone();
+                            let inner = Arc::clone(&self.0);
                             let fut = async move { 
(*inner).poll_work(request).await };
                             Box::pin(fut)
                         }
                     }
                     let accept_compression_encodings = 
self.accept_compression_encodings;
                     let send_compression_encodings = 
self.send_compression_encodings;
+                    let max_decoding_message_size = 
self.max_decoding_message_size;
+                    let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
                         let inner = inner.0;
@@ -1696,6 +1891,10 @@ pub mod scheduler_grpc_server {
                             .apply_compression_config(
                                 accept_compression_encodings,
                                 send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
                             );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
@@ -1718,7 +1917,7 @@ pub mod scheduler_grpc_server {
                             &mut self,
                             request: 
tonic::Request<super::RegisterExecutorParams>,
                         ) -> Self::Future {
-                            let inner = self.0.clone();
+                            let inner = Arc::clone(&self.0);
                             let fut = async move {
                                 (*inner).register_executor(request).await
                             };
@@ -1727,6 +1926,8 @@ pub mod scheduler_grpc_server {
                     }
                     let accept_compression_encodings = 
self.accept_compression_encodings;
                     let send_compression_encodings = 
self.send_compression_encodings;
+                    let max_decoding_message_size = 
self.max_decoding_message_size;
+                    let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
                         let inner = inner.0;
@@ -1736,6 +1937,10 @@ pub mod scheduler_grpc_server {
                             .apply_compression_config(
                                 accept_compression_encodings,
                                 send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
                             );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
@@ -1758,7 +1963,7 @@ pub mod scheduler_grpc_server {
                             &mut self,
                             request: tonic::Request<super::HeartBeatParams>,
                         ) -> Self::Future {
-                            let inner = self.0.clone();
+                            let inner = Arc::clone(&self.0);
                             let fut = async move {
                                 
(*inner).heart_beat_from_executor(request).await
                             };
@@ -1767,6 +1972,8 @@ pub mod scheduler_grpc_server {
                     }
                     let accept_compression_encodings = 
self.accept_compression_encodings;
                     let send_compression_encodings = 
self.send_compression_encodings;
+                    let max_decoding_message_size = 
self.max_decoding_message_size;
+                    let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
                         let inner = inner.0;
@@ -1776,6 +1983,10 @@ pub mod scheduler_grpc_server {
                             .apply_compression_config(
                                 accept_compression_encodings,
                                 send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
                             );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
@@ -1798,7 +2009,7 @@ pub mod scheduler_grpc_server {
                             &mut self,
                             request: 
tonic::Request<super::UpdateTaskStatusParams>,
                         ) -> Self::Future {
-                            let inner = self.0.clone();
+                            let inner = Arc::clone(&self.0);
                             let fut = async move {
                                 (*inner).update_task_status(request).await
                             };
@@ -1807,6 +2018,8 @@ pub mod scheduler_grpc_server {
                     }
                     let accept_compression_encodings = 
self.accept_compression_encodings;
                     let send_compression_encodings = 
self.send_compression_encodings;
+                    let max_decoding_message_size = 
self.max_decoding_message_size;
+                    let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
                         let inner = inner.0;
@@ -1816,6 +2029,10 @@ pub mod scheduler_grpc_server {
                             .apply_compression_config(
                                 accept_compression_encodings,
                                 send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
                             );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
@@ -1838,7 +2055,7 @@ pub mod scheduler_grpc_server {
                             &mut self,
                             request: 
tonic::Request<super::GetFileMetadataParams>,
                         ) -> Self::Future {
-                            let inner = self.0.clone();
+                            let inner = Arc::clone(&self.0);
                             let fut = async move {
                                 (*inner).get_file_metadata(request).await
                             };
@@ -1847,6 +2064,8 @@ pub mod scheduler_grpc_server {
                     }
                     let accept_compression_encodings = 
self.accept_compression_encodings;
                     let send_compression_encodings = 
self.send_compression_encodings;
+                    let max_decoding_message_size = 
self.max_decoding_message_size;
+                    let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
                         let inner = inner.0;
@@ -1856,6 +2075,10 @@ pub mod scheduler_grpc_server {
                             .apply_compression_config(
                                 accept_compression_encodings,
                                 send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
                             );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
@@ -1878,7 +2101,7 @@ pub mod scheduler_grpc_server {
                             &mut self,
                             request: tonic::Request<super::ExecuteQueryParams>,
                         ) -> Self::Future {
-                            let inner = self.0.clone();
+                            let inner = Arc::clone(&self.0);
                             let fut = async move {
                                 (*inner).execute_query(request).await
                             };
@@ -1887,6 +2110,8 @@ pub mod scheduler_grpc_server {
                     }
                     let accept_compression_encodings = 
self.accept_compression_encodings;
                     let send_compression_encodings = 
self.send_compression_encodings;
+                    let max_decoding_message_size = 
self.max_decoding_message_size;
+                    let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
                         let inner = inner.0;
@@ -1896,6 +2121,10 @@ pub mod scheduler_grpc_server {
                             .apply_compression_config(
                                 accept_compression_encodings,
                                 send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
                             );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
@@ -1918,7 +2147,7 @@ pub mod scheduler_grpc_server {
                             &mut self,
                             request: tonic::Request<super::GetJobStatusParams>,
                         ) -> Self::Future {
-                            let inner = self.0.clone();
+                            let inner = Arc::clone(&self.0);
                             let fut = async move {
                                 (*inner).get_job_status(request).await
                             };
@@ -1927,6 +2156,8 @@ pub mod scheduler_grpc_server {
                     }
                     let accept_compression_encodings = 
self.accept_compression_encodings;
                     let send_compression_encodings = 
self.send_compression_encodings;
+                    let max_decoding_message_size = 
self.max_decoding_message_size;
+                    let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
                         let inner = inner.0;
@@ -1936,6 +2167,10 @@ pub mod scheduler_grpc_server {
                             .apply_compression_config(
                                 accept_compression_encodings,
                                 send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
                             );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
@@ -1958,7 +2193,7 @@ pub mod scheduler_grpc_server {
                             &mut self,
                             request: 
tonic::Request<super::ExecutorStoppedParams>,
                         ) -> Self::Future {
-                            let inner = self.0.clone();
+                            let inner = Arc::clone(&self.0);
                             let fut = async move {
                                 (*inner).executor_stopped(request).await
                             };
@@ -1967,6 +2202,8 @@ pub mod scheduler_grpc_server {
                     }
                     let accept_compression_encodings = 
self.accept_compression_encodings;
                     let send_compression_encodings = 
self.send_compression_encodings;
+                    let max_decoding_message_size = 
self.max_decoding_message_size;
+                    let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
                         let inner = inner.0;
@@ -1976,6 +2213,10 @@ pub mod scheduler_grpc_server {
                             .apply_compression_config(
                                 accept_compression_encodings,
                                 send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
                             );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
@@ -1998,13 +2239,15 @@ pub mod scheduler_grpc_server {
                             &mut self,
                             request: tonic::Request<super::CancelJobParams>,
                         ) -> Self::Future {
-                            let inner = self.0.clone();
+                            let inner = Arc::clone(&self.0);
                             let fut = async move { 
(*inner).cancel_job(request).await };
                             Box::pin(fut)
                         }
                     }
                     let accept_compression_encodings = 
self.accept_compression_encodings;
                     let send_compression_encodings = 
self.send_compression_encodings;
+                    let max_decoding_message_size = 
self.max_decoding_message_size;
+                    let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
                         let inner = inner.0;
@@ -2014,6 +2257,10 @@ pub mod scheduler_grpc_server {
                             .apply_compression_config(
                                 accept_compression_encodings,
                                 send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
                             );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
@@ -2036,7 +2283,7 @@ pub mod scheduler_grpc_server {
                             &mut self,
                             request: tonic::Request<super::CleanJobDataParams>,
                         ) -> Self::Future {
-                            let inner = self.0.clone();
+                            let inner = Arc::clone(&self.0);
                             let fut = async move {
                                 (*inner).clean_job_data(request).await
                             };
@@ -2045,6 +2292,8 @@ pub mod scheduler_grpc_server {
                     }
                     let accept_compression_encodings = 
self.accept_compression_encodings;
                     let send_compression_encodings = 
self.send_compression_encodings;
+                    let max_decoding_message_size = 
self.max_decoding_message_size;
+                    let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
                         let inner = inner.0;
@@ -2054,6 +2303,10 @@ pub mod scheduler_grpc_server {
                             .apply_compression_config(
                                 accept_compression_encodings,
                                 send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
                             );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
@@ -2082,12 +2335,14 @@ pub mod scheduler_grpc_server {
                 inner,
                 accept_compression_encodings: 
self.accept_compression_encodings,
                 send_compression_encodings: self.send_compression_encodings,
+                max_decoding_message_size: self.max_decoding_message_size,
+                max_encoding_message_size: self.max_encoding_message_size,
             }
         }
     }
     impl<T: SchedulerGrpc> Clone for _Inner<T> {
         fn clone(&self) -> Self {
-            Self(self.0.clone())
+            Self(Arc::clone(&self.0))
         }
     }
     impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
@@ -2109,29 +2364,46 @@ pub mod executor_grpc_server {
         async fn launch_task(
             &self,
             request: tonic::Request<super::LaunchTaskParams>,
-        ) -> Result<tonic::Response<super::LaunchTaskResult>, tonic::Status>;
+        ) -> std::result::Result<
+            tonic::Response<super::LaunchTaskResult>,
+            tonic::Status,
+        >;
         async fn launch_multi_task(
             &self,
             request: tonic::Request<super::LaunchMultiTaskParams>,
-        ) -> Result<tonic::Response<super::LaunchMultiTaskResult>, 
tonic::Status>;
+        ) -> std::result::Result<
+            tonic::Response<super::LaunchMultiTaskResult>,
+            tonic::Status,
+        >;
         async fn stop_executor(
             &self,
             request: tonic::Request<super::StopExecutorParams>,
-        ) -> Result<tonic::Response<super::StopExecutorResult>, tonic::Status>;
+        ) -> std::result::Result<
+            tonic::Response<super::StopExecutorResult>,
+            tonic::Status,
+        >;
         async fn cancel_tasks(
             &self,
             request: tonic::Request<super::CancelTasksParams>,
-        ) -> Result<tonic::Response<super::CancelTasksResult>, tonic::Status>;
+        ) -> std::result::Result<
+            tonic::Response<super::CancelTasksResult>,
+            tonic::Status,
+        >;
         async fn remove_job_data(
             &self,
             request: tonic::Request<super::RemoveJobDataParams>,
-        ) -> Result<tonic::Response<super::RemoveJobDataResult>, 
tonic::Status>;
+        ) -> std::result::Result<
+            tonic::Response<super::RemoveJobDataResult>,
+            tonic::Status,
+        >;
     }
     #[derive(Debug)]
     pub struct ExecutorGrpcServer<T: ExecutorGrpc> {
         inner: _Inner<T>,
         accept_compression_encodings: EnabledCompressionEncodings,
         send_compression_encodings: EnabledCompressionEncodings,
+        max_decoding_message_size: Option<usize>,
+        max_encoding_message_size: Option<usize>,
     }
     struct _Inner<T>(Arc<T>);
     impl<T: ExecutorGrpc> ExecutorGrpcServer<T> {
@@ -2144,6 +2416,8 @@ pub mod executor_grpc_server {
                 inner,
                 accept_compression_encodings: Default::default(),
                 send_compression_encodings: Default::default(),
+                max_decoding_message_size: None,
+                max_encoding_message_size: None,
             }
         }
         pub fn with_interceptor<F>(
@@ -2167,6 +2441,22 @@ pub mod executor_grpc_server {
             self.send_compression_encodings.enable(encoding);
             self
         }
+        /// Limits the maximum size of a decoded message.
+        ///
+        /// Default: `4MB`
+        #[must_use]
+        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
+            self.max_decoding_message_size = Some(limit);
+            self
+        }
+        /// Limits the maximum size of an encoded message.
+        ///
+        /// Default: `usize::MAX`
+        #[must_use]
+        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
+            self.max_encoding_message_size = Some(limit);
+            self
+        }
     }
     impl<T, B> tonic::codegen::Service<http::Request<B>> for 
ExecutorGrpcServer<T>
     where
@@ -2180,7 +2470,7 @@ pub mod executor_grpc_server {
         fn poll_ready(
             &mut self,
             _cx: &mut Context<'_>,
-        ) -> Poll<Result<(), Self::Error>> {
+        ) -> Poll<std::result::Result<(), Self::Error>> {
             Poll::Ready(Ok(()))
         }
         fn call(&mut self, req: http::Request<B>) -> Self::Future {
@@ -2202,13 +2492,15 @@ pub mod executor_grpc_server {
                             &mut self,
                             request: tonic::Request<super::LaunchTaskParams>,
                         ) -> Self::Future {
-                            let inner = self.0.clone();
+                            let inner = Arc::clone(&self.0);
                             let fut = async move { 
(*inner).launch_task(request).await };
                             Box::pin(fut)
                         }
                     }
                     let accept_compression_encodings = 
self.accept_compression_encodings;
                     let send_compression_encodings = 
self.send_compression_encodings;
+                    let max_decoding_message_size = 
self.max_decoding_message_size;
+                    let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
                         let inner = inner.0;
@@ -2218,6 +2510,10 @@ pub mod executor_grpc_server {
                             .apply_compression_config(
                                 accept_compression_encodings,
                                 send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
                             );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
@@ -2240,7 +2536,7 @@ pub mod executor_grpc_server {
                             &mut self,
                             request: 
tonic::Request<super::LaunchMultiTaskParams>,
                         ) -> Self::Future {
-                            let inner = self.0.clone();
+                            let inner = Arc::clone(&self.0);
                             let fut = async move {
                                 (*inner).launch_multi_task(request).await
                             };
@@ -2249,6 +2545,8 @@ pub mod executor_grpc_server {
                     }
                     let accept_compression_encodings = 
self.accept_compression_encodings;
                     let send_compression_encodings = 
self.send_compression_encodings;
+                    let max_decoding_message_size = 
self.max_decoding_message_size;
+                    let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
                         let inner = inner.0;
@@ -2258,6 +2556,10 @@ pub mod executor_grpc_server {
                             .apply_compression_config(
                                 accept_compression_encodings,
                                 send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
                             );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
@@ -2280,7 +2582,7 @@ pub mod executor_grpc_server {
                             &mut self,
                             request: tonic::Request<super::StopExecutorParams>,
                         ) -> Self::Future {
-                            let inner = self.0.clone();
+                            let inner = Arc::clone(&self.0);
                             let fut = async move {
                                 (*inner).stop_executor(request).await
                             };
@@ -2289,6 +2591,8 @@ pub mod executor_grpc_server {
                     }
                     let accept_compression_encodings = 
self.accept_compression_encodings;
                     let send_compression_encodings = 
self.send_compression_encodings;
+                    let max_decoding_message_size = 
self.max_decoding_message_size;
+                    let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
                         let inner = inner.0;
@@ -2298,6 +2602,10 @@ pub mod executor_grpc_server {
                             .apply_compression_config(
                                 accept_compression_encodings,
                                 send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
                             );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
@@ -2320,7 +2628,7 @@ pub mod executor_grpc_server {
                             &mut self,
                             request: tonic::Request<super::CancelTasksParams>,
                         ) -> Self::Future {
-                            let inner = self.0.clone();
+                            let inner = Arc::clone(&self.0);
                             let fut = async move {
                                 (*inner).cancel_tasks(request).await
                             };
@@ -2329,6 +2637,8 @@ pub mod executor_grpc_server {
                     }
                     let accept_compression_encodings = 
self.accept_compression_encodings;
                     let send_compression_encodings = 
self.send_compression_encodings;
+                    let max_decoding_message_size = 
self.max_decoding_message_size;
+                    let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
                         let inner = inner.0;
@@ -2338,6 +2648,10 @@ pub mod executor_grpc_server {
                             .apply_compression_config(
                                 accept_compression_encodings,
                                 send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
                             );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
@@ -2360,7 +2674,7 @@ pub mod executor_grpc_server {
                             &mut self,
                             request: 
tonic::Request<super::RemoveJobDataParams>,
                         ) -> Self::Future {
-                            let inner = self.0.clone();
+                            let inner = Arc::clone(&self.0);
                             let fut = async move {
                                 (*inner).remove_job_data(request).await
                             };
@@ -2369,6 +2683,8 @@ pub mod executor_grpc_server {
                     }
                     let accept_compression_encodings = 
self.accept_compression_encodings;
                     let send_compression_encodings = 
self.send_compression_encodings;
+                    let max_decoding_message_size = 
self.max_decoding_message_size;
+                    let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
                         let inner = inner.0;
@@ -2378,6 +2694,10 @@ pub mod executor_grpc_server {
                             .apply_compression_config(
                                 accept_compression_encodings,
                                 send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
                             );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
@@ -2406,12 +2726,14 @@ pub mod executor_grpc_server {
                 inner,
                 accept_compression_encodings: 
self.accept_compression_encodings,
                 send_compression_encodings: self.send_compression_encodings,
+                max_decoding_message_size: self.max_decoding_message_size,
+                max_encoding_message_size: self.max_encoding_message_size,
             }
         }
     }
     impl<T: ExecutorGrpc> Clone for _Inner<T> {
         fn clone(&self) -> Self {
-            Self(self.0.clone())
+            Self(Arc::clone(&self.0))
         }
     }
     impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
diff --git a/ballista/core/src/serde/scheduler/mod.rs b/ballista/core/src/serde/scheduler/mod.rs
index d8ff0906..83a6d173 100644
--- a/ballista/core/src/serde/scheduler/mod.rs
+++ b/ballista/core/src/serde/scheduler/mod.rs
@@ -135,7 +135,7 @@ impl PartitionStats {
     pub fn arrow_struct_repr(self) -> Field {
         Field::new(
             "partition_stats",
-            DataType::Struct(self.arrow_struct_fields()),
+            DataType::Struct(self.arrow_struct_fields().into()),
             false,
         )
     }
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index 1167e40a..65568d82 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -24,7 +24,7 @@ homepage = "https://github.com/apache/arrow-ballista";
 repository = "https://github.com/apache/arrow-ballista";
 readme = "README.md"
 authors = ["Apache Arrow <[email protected]>"]
-edition = "2018"
+edition = "2021"
 
 [package.metadata.configure_me.bin]
 executor = "executor_config_spec.toml"
@@ -43,7 +43,7 @@ arrow-flight = { workspace = true }
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] }
 chrono = { version = "0.4", default-features = false }
-configure_me = "0.4.0"
+configure_me = { workspace = true }
 dashmap = "5.4.0"
 datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
@@ -62,7 +62,7 @@ tokio = { version = "1.0", features = [
     "signal",
 ] }
 tokio-stream = { version = "0.1", features = ["net"] }
-tonic = "0.8"
+tonic = { workspace = true }
 tracing = "0.1.36"
 tracing-appender = "0.2.2"
 tracing-subscriber = { version = "0.3.15", features = [
@@ -75,7 +75,7 @@ uuid = { version = "1.0", features = ["v4"] }
 [dev-dependencies]
 
 [build-dependencies]
-configure_me_codegen = "=0.4.0"
+configure_me_codegen = { workspace = true }
 
 # use libc on unix like platforms to set worker priority in DedicatedExecutor
 [target."cfg(unix)".dependencies.libc]
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index ee36dcbc..30709989 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -24,7 +24,7 @@ homepage = "https://github.com/apache/arrow-ballista";
 repository = "https://github.com/apache/arrow-ballista";
 readme = "README.md"
 authors = ["Apache Arrow <[email protected]>"]
-edition = "2018"
+edition = "2021"
 
 [package.metadata.configure_me.bin]
 scheduler = "scheduler_config_spec.toml"
@@ -49,7 +49,7 @@ async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] }
 base64 = { version = "0.13", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-configure_me = "0.4.0"
+configure_me = { workspace = true }
 dashmap = "5.4.0"
 datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
@@ -62,7 +62,7 @@ http-body = "0.4"
 hyper = "0.14.4"
 itertools = "0.10.3"
 log = "0.4"
-object_store = "0.5.0"
+object_store = { workspace = true }
 once_cell = { version = "1.16.0", optional = true }
 parking_lot = "0.12"
 parse_arg = "0.1.3"
@@ -74,7 +74,7 @@ serde = { version = "1", features = ["derive"] }
 sled_package = { package = "sled", version = "0.34", optional = true }
 tokio = { version = "1.0", features = ["full"] }
 tokio-stream = { version = "0.1", features = ["net"], optional = true }
-tonic = "0.8"
+tonic = { workspace = true }
 tower = { version = "0.4" }
 tracing = "0.1.36"
 tracing-appender = "0.2.2"
@@ -90,8 +90,5 @@ warp = "0.3"
 ballista-core = { path = "../core", version = "0.11.0" }
 
 [build-dependencies]
-configure_me_codegen = "=0.4.0"
-tonic-build = { version = "0.8", default-features = false, features = [
-    "transport",
-    "prost",
-] }
+configure_me_codegen = { workspace = true }
+tonic-build = { workspace = true }
diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml
index e1eb561d..f9157950 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -51,7 +51,6 @@ doc = "etcd urls for use when discovery mode is `etcd`. Default: localhost:2379"
 default = "std::string::String::from(\"localhost:2379\")"
 
 [[param]]
-abbr = "h"
 name = "bind_host"
 type = "String"
 default = "std::string::String::from(\"0.0.0.0\")"

Reply via email to