Made `kill` not use pipelining in the default executor. Previously, all the `KILL_NESTED_CONTAINER` calls were pipelined on the same connection. This change removes the pipelining and instead invokes `post` separately for each child container, which simplifies the code and makes it more readable.
Review: https://reviews.apache.org/r/56267/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2c26d577 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2c26d577 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2c26d577 Branch: refs/heads/master Commit: 2c26d577203ecc0afc706d3484de80099512e1cb Parents: 08f4cdd Author: Anand Mazumdar <an...@apache.org> Authored: Tue Feb 7 09:42:08 2017 -0800 Committer: Anand Mazumdar <an...@apache.org> Committed: Tue Feb 7 09:42:52 2017 -0800 ---------------------------------------------------------------------- src/launcher/default_executor.cpp | 48 +++++++++++----------------------- 1 file changed, 15 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/2c26d577/src/launcher/default_executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp index 97eee05..78a4b6e 100644 --- a/src/launcher/default_executor.cpp +++ b/src/launcher/default_executor.cpp @@ -717,42 +717,17 @@ protected: CHECK_EQ(SUBSCRIBED, state); - process::http::connect(agent) - .onAny(defer(self(), &Self::_shutdown, lambda::_1)); - } - - void _shutdown(const Future<Connection>& connection) - { - if (!connection.isReady()) { - LOG(ERROR) - << "Unable to establish connection with the agent: " - << (connection.isFailed() ? connection.failure() : "discarded"); - __shutdown(); - return; - } - - // It is possible that the agent process failed before we could - // kill the child containers. 
- if (state == DISCONNECTED || state == CONNECTED) { - LOG(ERROR) << "Unable to kill child containers as the " - << "executor is in state " << state; - __shutdown(); - return; - } - list<Future<Nothing>> killing; foreachkey (const ContainerID& containerId, containers) { - killing.push_back(kill(connection.get(), containerId)); + killing.push_back(kill(containerId)); } // It is possible that the agent process can fail while we are - // killing child containers. We fail fast if this happens. We - // capture `connection` to ensure that the connection is not - // disconnected before the responses are complete. + // killing child containers. We fail fast if this happens. collect(killing) .onAny(defer( self(), - [this, connection](const Future<list<Nothing>>& future) { + [this](const Future<list<Nothing>>& future) { if (future.isReady()) { return; } @@ -779,7 +754,7 @@ protected: terminate(self()); } - Future<Nothing> kill(Connection connection, const ContainerID& containerId) + Future<Nothing> kill(const ContainerID& containerId) { CHECK_EQ(SUBSCRIBED, state); CHECK(containers.contains(containerId)); @@ -794,7 +769,7 @@ protected: kill->mutable_container_id()->CopyFrom(containerId); - return post(connection, call) + return post(None(), call) .then([](const Response& /* response */) { return Nothing(); }); @@ -906,17 +881,24 @@ private: mesos->send(evolve(call)); } - Future<Response> post(Connection connection, const agent::Call& call) + Future<Response> post( + Option<Connection> connection, + const agent::Call& call) { ::Request request; request.method = "POST"; request.url = agent; request.body = serialize(contentType, evolve(call)); - request.keepAlive = true; request.headers = {{"Accept", stringify(contentType)}, {"Content-Type", stringify(contentType)}}; - return connection.send(request); + // Only pipeline requests when there is an active connection. + if (connection.isSome()) { + request.keepAlive = true; + } + + return connection.isSome() ? 
connection->send(request) + : process::http::request(request); } void retry(