This is an automated email from the ASF dual-hosted git repository. grag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 4cbda17f3667e2b0713a1f1663e50819a076680b Author: Greg Mann <[email protected]> AuthorDate: Thu Jul 25 12:17:43 2019 -0700 Enabled the Docker executor to accept kill policy overrides. This adds a new `killTask()` overload to the Docker executor and updates the Mesos executor driver to call into that overload when the loaded executor is the Docker executor. This allows the executor driver to pass the kill policy override, when present, into the Docker executor. Review: https://reviews.apache.org/r/71034/ --- src/docker/executor.cpp | 48 ++++++++++++++++++++++++++++++++++++++++++++---- src/docker/executor.hpp | 5 +++++ src/exec/exec.cpp | 22 ++++++++++++++++++---- 3 files changed, 67 insertions(+), 8 deletions(-) diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp index de8216f..132f42b 100644 --- a/src/docker/executor.cpp +++ b/src/docker/executor.cpp @@ -396,15 +396,31 @@ public: defer(self(), &Self::launchHealthCheck, containerName, task)); } - void killTask(ExecutorDriver* driver, const TaskID& taskId) + void killTask( + ExecutorDriver* driver, + const TaskID& taskId, + const Option<KillPolicy>& killPolicyOverride = None()) { - LOG(INFO) << "Received killTask for task " << taskId.value(); + string overrideMessage = ""; + if (killPolicyOverride.isSome() && killPolicyOverride->has_grace_period()) { + Duration gracePeriodDuration = + Nanoseconds(killPolicyOverride->grace_period().nanoseconds()); + + overrideMessage = + " with grace period override of " + stringify(gracePeriodDuration); + } + + LOG(INFO) << "Received killTask" << overrideMessage + << " for task " << taskId.value(); // Using shutdown grace period as a default is backwards compatible // with the `stop_timeout` flag, deprecated in 1.0. Duration gracePeriod = shutdownGracePeriod; - if (killPolicy.isSome() && killPolicy->has_grace_period()) { + if (killPolicyOverride.isSome() && killPolicyOverride->has_grace_period()) { + gracePeriod = + Nanoseconds(killPolicyOverride->grace_period().nanoseconds()); + } else if (killPolicy.isSome() && killPolicy->has_grace_period()) { gracePeriod = Nanoseconds(killPolicy->grace_period().nanoseconds()); } @@ -929,7 +945,12 @@ void DockerExecutor::launchTask(ExecutorDriver* driver, const TaskInfo& task) void DockerExecutor::killTask(ExecutorDriver* driver, const TaskID& taskId) { - dispatch(process.get(), &DockerExecutorProcess::killTask, driver, taskId); + // Need to disambiguate overloaded function. + void (DockerExecutorProcess::*killTaskMethod)( + ExecutorDriver*, const TaskID&, const Option<KillPolicy>&) + = &DockerExecutorProcess::killTask; + + process::dispatch(process.get(), killTaskMethod, driver, taskId, None()); } @@ -955,6 +976,25 @@ void DockerExecutor::error(ExecutorDriver* driver, const string& data) dispatch(process.get(), &DockerExecutorProcess::error, driver, data); } + +void DockerExecutor::killTask( + ExecutorDriver* driver, + const TaskID& taskId, + const Option<KillPolicy>& killPolicyOverride) +{ + // Need to disambiguate overloaded function. + void (DockerExecutorProcess::*killTaskMethod)( + ExecutorDriver*, const TaskID&, const Option<KillPolicy>&) + = &DockerExecutorProcess::killTask; + + process::dispatch( + process.get(), + killTaskMethod, + driver, + taskId, + killPolicyOverride); +} + } // namespace docker { } // namespace internal { } // namespace mesos { diff --git a/src/docker/executor.hpp b/src/docker/executor.hpp index dfb8ad0..768c2e1 100644 --- a/src/docker/executor.hpp +++ b/src/docker/executor.hpp @@ -151,6 +151,11 @@ public: void error(ExecutorDriver* driver, const std::string& data) override; + void killTask( + ExecutorDriver* driver, + const TaskID& taskId, + const Option<KillPolicy>& killPolicyOverride); + private: process::Owned<DockerExecutorProcess> process; }; diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp index c0fa3b6..67e082e 100644 --- a/src/exec/exec.cpp +++ b/src/exec/exec.cpp @@ -47,6 +47,8 @@ #include "common/protobuf_utils.hpp" +#include "docker/executor.hpp" + #include "logging/flags.hpp" #include "logging/logging.hpp" @@ -183,8 +185,7 @@ public: &RunTaskMessage::task); install<KillTaskMessage>( - &ExecutorProcess::killTask, - &KillTaskMessage::task_id); + &ExecutorProcess::killTask); install<StatusUpdateAcknowledgementMessage>( &ExecutorProcess::statusUpdateAcknowledgement, @@ -339,8 +340,10 @@ protected: VLOG(1) << "Executor::launchTask took " << stopwatch.elapsed(); } - void killTask(const TaskID& taskId) + void killTask(KillTaskMessage&& killTaskMessage) { + const TaskID taskId = killTaskMessage.task_id(); + if (aborted.load()) { VLOG(1) << "Ignoring kill task message for task " << taskId << " because the driver is aborted!"; @@ -365,7 +368,18 @@ protected: stopwatch.start(); } - executor->killTask(driver, taskId); + // If this is a Docker executor, call the `killTask()` overload which + // allows the kill policy to be overridden. + auto* dockerExecutor = dynamic_cast<docker::DockerExecutor*>(executor); + if (dockerExecutor) { + Option<KillPolicy> killPolicy = killTaskMessage.has_kill_policy() + ? killTaskMessage.kill_policy() + : Option<KillPolicy>::none(); + + dockerExecutor->killTask(driver, taskId, killPolicy); + } else { + executor->killTask(driver, taskId); + } VLOG(1) << "Executor::killTask took " << stopwatch.elapsed(); }
