Ensured executor adapter propagates error and shutdown messages. Prior to this patch, when the v0_v1 executor adapter was used, an error, kill, or shutdown that occurred during subscription / registration with the agent was not propagated back to the executor. This happened because the adapter did not call the `connected` callback until after successful registration; hence the executor never even tried to send the `SUBSCRIBE` call, and without that call the adapter did not send any events to the executor.
A fix is to call the `connected` callback if an error occurred or shutdown / kill event arrived before the executor had subscribed. Review: https://reviews.apache.org/r/64070/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/09aaf339 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/09aaf339 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/09aaf339 Branch: refs/heads/1.4.x Commit: 09aaf3390e0eb2fa7e96f92605943057774ac624 Parents: 1bc8f5d Author: Alexander Rukletsov <ruklet...@gmail.com> Authored: Fri Dec 22 12:10:28 2017 +0100 Committer: Alexander Rukletsov <al...@apache.org> Committed: Fri Dec 22 12:29:44 2017 +0100 ---------------------------------------------------------------------- src/executor/v0_v1executor.cpp | 41 ++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/09aaf339/src/executor/v0_v1executor.cpp ---------------------------------------------------------------------- diff --git a/src/executor/v0_v1executor.cpp b/src/executor/v0_v1executor.cpp index 61d5919..086cfc7 100644 --- a/src/executor/v0_v1executor.cpp +++ b/src/executor/v0_v1executor.cpp @@ -52,6 +52,7 @@ public: const function<void(const queue<Event>&)>& received) : ProcessBase(process::ID::generate("v0-to-v1-adapter")), callbacks {connected, disconnected, received}, + connected(false), subscribeCall(false) {} virtual ~V0ToV1AdapterProcess() = default; @@ -61,7 +62,10 @@ public: const mesos::FrameworkInfo& _frameworkInfo, const mesos::SlaveInfo& slaveInfo) { - callbacks.connected(); + if (!connected) { + callbacks.connected(); + connected = true; + } // We need these copies to populate the fields in `Event::Subscribed` upon // receiving a `reregistered()` callback later. @@ -92,6 +96,7 @@ public: // disconnection from the agent. 
callbacks.disconnected(); callbacks.connected(); + connected = true; Event event; event.set_type(Event::SUBSCRIBED); @@ -111,6 +116,17 @@ public: void killTask(const mesos::TaskID& taskId) { + // Logically an executor cannot receive any response from an agent if it + // is not connected. Since we have received `killTask`, we assume we are + // connected and trigger the `connected` callback to enable event delivery. + // This satisfies the invariant of the v1 interface that an executor can + // receive an event only after successfully connecting with the agent. + if (!connected) { + LOG(INFO) << "Implicitly connecting the executor to kill a task"; + callbacks.connected(); + connected = true; + } + Event event; event.set_type(Event::KILL); @@ -147,6 +163,17 @@ public: void shutdown() { + // Logically an executor cannot receive any response from an agent if it + // is not connected. Since we have received `shutdown`, we assume we are + // connected and trigger the `connected` callback to enable event delivery. + // This satisfies the invariant of the v1 interface that an executor can + // receive an event only after successfully connecting with the agent. + if (!connected) { + LOG(INFO) << "Implicitly connecting the executor to shut it down"; + callbacks.connected(); + connected = true; + } + Event event; event.set_type(Event::SHUTDOWN); @@ -155,6 +182,17 @@ public: void error(const string& message) { + // Logically an executor cannot receive any response from an agent if it + // is not connected. Since we have received `error`, we assume we are + // connected and trigger the `connected` callback to enable event delivery. + // This satisfies the invariant of the v1 interface that an executor can + // receive an event only after successfully connecting with the agent. 
+ if (!connected) { + LOG(INFO) << "Implicitly connecting the executor to send an error"; + callbacks.connected(); + connected = true; + } + Event event; event.set_type(Event::ERROR); @@ -232,6 +270,7 @@ private: }; Callbacks callbacks; + bool connected; bool subscribeCall; queue<Event> pending; Option<mesos::ExecutorInfo> executorInfo;