[
https://issues.apache.org/jira/browse/MESOS-2863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14742228#comment-14742228
]
Vaibhav Khanduja commented on MESOS-2863:
-----------------------------------------
Here is the code diff, which is more of a corrective measure .. and not
preventive (which i would prefer).
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 50b3c6e..8d1fcc1 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -80,6 +80,7 @@ public:
: launched(false),
killed(false),
killedByHealthCheck(false),
+ state(TASK_STARTING),
pid(-1),
healthPid(-1),
escalationTimeout(slave::EXECUTOR_SIGNAL_ESCALATION_TIMEOUT),
@@ -111,13 +112,8 @@ public:
void launchTask(ExecutorDriver* driver, const TaskInfo& task)
{
if (launched) {
- TaskStatus status;
- status.mutable_task_id()->MergeFrom(task.task_id());
- status.set_state(TASK_FAILED);
- status.set_message(
- "Attempted to run multiple tasks using a \"command\" executor");
-
- driver->sendStatusUpdate(status);
+ sendStatusUpdate(driver, TASK_FAILED, task.task_id(),
+ string("Attempted to run multiple tasks using a \"command\"
executor"));
return;
}
@@ -281,11 +277,7 @@ public:
pid,
lambda::_1));
- TaskStatus status;
- status.mutable_task_id()->MergeFrom(task.task_id());
- status.set_state(TASK_RUNNING);
- driver->sendStatusUpdate(status);
-
+ sendStatusUpdate(driver, TASK_RUNNING, task.task_id(),string(""));
launched = true;
}
@@ -363,7 +355,7 @@ protected:
status.set_healthy(healthy);
status.set_state(TASK_RUNNING);
driver.get()->sendStatusUpdate(status);
-
+
if (initiateTaskKill) {
killedByHealthCheck = true;
killTask(driver.get(), taskID);
@@ -372,54 +364,74 @@ protected:
private:
+ void sendStatusUpdate(
+ ExecutorDriver* driver,
+ TaskState inState,
+ const TaskID& taskId,
+ const std::string& message)
+ {
+ // If last state has been TASK_FINISHED or TASK_KILLED,
+ // then do not send a different state again. Send out the
+ // last state.
+ // Check is new state is greater than last state, then
+ // no need to send a state. One logic would not to send
+ // a state, but then receiver with time out.
+ if ((inState < state) ||
+ (state == TASK_FINISHED) ||
+ (state == TASK_KILLED)) {
+ inState = state;
+ }
+ TaskStatus taskStatus;
+ taskStatus.mutable_task_id()->MergeFrom(taskId);
+ taskStatus.set_state(inState);
+ taskStatus.set_message(message);
+ if (killed && killedByHealthCheck) {
+ taskStatus.set_healthy(false);
+ }
+ driver->sendStatusUpdate(taskStatus);
+ // Maintain the last state sent.
+ state = inState;
+ }
+
void reaped(
ExecutorDriver* driver,
const TaskID& taskId,
pid_t pid,
const Future<Option<int> >& status_)
{
- TaskState state;
string message;
+ TaskState lstate;
Clock::cancel(escalationTimer);
if (!status_.isReady()) {
- state = TASK_FAILED;
+ lstate = TASK_FAILED;
message =
"Failed to get exit status for Command: " +
(status_.isFailed() ? status_.failure() : "future discarded");
} else if (status_.get().isNone()) {
- state = TASK_FAILED;
+ lstate = TASK_FAILED;
message = "Failed to get exit status for Command";
} else {
int status = status_.get().get();
CHECK(WIFEXITED(status) || WIFSIGNALED(status)) << status;
if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
- state = TASK_FINISHED;
+ lstate = TASK_FINISHED;
} else if (killed) {
// Send TASK_KILLED if the task was killed as a result of
// killTask() or shutdown().
- state = TASK_KILLED;
+ lstate = TASK_KILLED;
} else {
- state = TASK_FAILED;
+ lstate = TASK_FAILED;
}
message = "Command " + WSTRINGIFY(status);
}
cout << message << " (pid: " << pid << ")" << endl;
-
- TaskStatus taskStatus;
- taskStatus.mutable_task_id()->MergeFrom(taskId);
- taskStatus.set_state(state);
- taskStatus.set_message(message);
- if (killed && killedByHealthCheck) {
- taskStatus.set_healthy(false);
- }
-
- driver->sendStatusUpdate(taskStatus);
-
+
+ sendStatusUpdate(driver, lstate,taskId, message);
// A hack for now ... but we need to wait until the status update
// is sent to the slave before we shut ourselves down.
os::sleep(Seconds(1));
@@ -494,6 +506,7 @@ private:
bool launched;
bool killed;
bool killedByHealthCheck;
+ TaskState state;
pid_t pid;
pid_t healthPid;
Duration escalationTimeout;
> Command executor can send TASK_KILLED after TASK_FINISHED
> ---------------------------------------------------------
>
> Key: MESOS-2863
> URL: https://issues.apache.org/jira/browse/MESOS-2863
> Project: Mesos
> Issue Type: Bug
> Reporter: Vinod Kone
> Assignee: Vaibhav Khanduja
> Labels: newbie++
>
> Observed this while doing some tests in our test cluster.
> If the command executor gets a shutdown() (e.g., framework unregistered)
> after sending TASK_FINISHED but before exiting (there is a forced sleep), it
> could send a TASK_KILLED update to the slave.
> Ideally the command executor should not send multiple terminal updates.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)