This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new c58c881d Check executor id consistency when receive stop executor 
request (#335)
c58c881d is described below

commit c58c881de14eef6fb8475a2135768e96a73ac2a0
Author: yahoNanJing <[email protected]>
AuthorDate: Mon Oct 10 21:13:33 2022 +0800

    Check executor id consistency when receive stop executor request (#335)
    
    Co-authored-by: yangzhong <[email protected]>
---
 ballista/rust/core/proto/ballista.proto             | 5 +++--
 ballista/rust/executor/src/executor_server.rs       | 7 +++++++
 ballista/rust/scheduler/src/scheduler_server/mod.rs | 1 +
 3 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto 
b/ballista/rust/core/proto/ballista.proto
index 53f93877..9cb8cca4 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -845,10 +845,11 @@ message HeartBeatResult {
 }
 
 message StopExecutorParams {
+  string executor_id = 1;
   // stop reason
-  string reason = 1;
+  string reason = 2;
   // force to stop the executor immediately
-  bool force = 2;
+  bool force = 3;
 }
 
 message StopExecutorResult {
diff --git a/ballista/rust/executor/src/executor_server.rs 
b/ballista/rust/executor/src/executor_server.rs
index bf036d6f..e850c895 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -665,6 +665,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> ExecutorGrpc
         request: Request<StopExecutorParams>,
     ) -> Result<Response<StopExecutorResult>, Status> {
         let stop_request = request.into_inner();
+        if stop_request.executor_id != self.executor.metadata.id {
+            warn!(
+                "The executor id {} in request is different from {}. The stop 
request will be ignored",
+                stop_request.executor_id, self.executor.metadata.id
+            );
+            return Ok(Response::new(StopExecutorResult {}));
+        }
         let stop_reason = stop_request.reason;
         let force = stop_request.force;
         info!(
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs 
b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index 6cfbf070..6d517bf9 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -229,6 +229,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
                             tokio::task::spawn(async move {
                                 match client
                                     .stop_executor(StopExecutorParams {
+                                        executor_id,
                                         reason: stop_reason,
                                         force: true,
                                     })

Reply via email to