This is an automated email from the ASF dual-hosted git repository. alexr pushed a commit to branch 1.7.x in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 1672941630960cccf66ed81b11811d84e8a4e3f0 Author: Andrei Budnik <[email protected]> AuthorDate: Tue Sep 18 19:10:14 2018 +0200 Fixed HTTP errors caused by dropped HTTP responses by IOSwitchboard. Previously, the IOSwitchboard process could terminate before all HTTP responses had been sent to the agent. In the case of an `ATTACH_CONTAINER_INPUT` call, we could drop a final HTTP `200 OK` response, so the agent got a broken HTTP connection for the call. This patch introduces an acknowledgment for the received response for the `ATTACH_CONTAINER_INPUT` call. This acknowledgment is a new type of control message for the `ATTACH_CONTAINER_INPUT` call. When IOSwitchboard receives an acknowledgment and IO redirects are finished, it terminates itself. That guarantees that the agent always receives a response for the `ATTACH_CONTAINER_INPUT` call. Review: https://reviews.apache.org/r/65168/ (cherry picked from commit 5b95bb0f21852058d22703385f2c8e139881bf1a) --- src/slave/containerizer/mesos/io/switchboard.cpp | 52 ++++++++++++++++++------ src/slave/http.cpp | 28 ++++++++++++- src/slave/http.hpp | 3 ++ src/tests/containerizer/io_switchboard_tests.cpp | 17 ++++++++ 4 files changed, 85 insertions(+), 15 deletions(-) diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp index 0e4edb3..1982d9b 100644 --- a/src/slave/containerizer/mesos/io/switchboard.cpp +++ b/src/slave/containerizer/mesos/io/switchboard.cpp @@ -1004,6 +1004,9 @@ private: // switchboard. Option<Error> validate(const agent::Call::AttachContainerInput& call); + // Handle acknowledgment for `ATTACH_CONTAINER_INPUT` call. + Future<http::Response> acknowledgeContainerInputResponse(); + // Handle `ATTACH_CONTAINER_INPUT` calls. 
Future<http::Response> attachContainerInput( const Owned<recordio::Reader<agent::Call>>& reader); @@ -1029,6 +1032,10 @@ private: bool waitForConnection; Option<Duration> heartbeatInterval; bool inputConnected; + // Each time the agent receives a response for `ATTACH_CONTAINER_INPUT` + // request it sends an acknowledgment. This counter is used to delay + // IOSwitchboard termination until all acknowledgments are received. + size_t numPendingAcknowledgments; Future<unix::Socket> accept; Promise<Nothing> promise; Promise<Nothing> startRedirect; @@ -1164,7 +1171,8 @@ IOSwitchboardServerProcess::IOSwitchboardServerProcess( socket(_socket), waitForConnection(_waitForConnection), heartbeatInterval(_heartbeatInterval), - inputConnected(false) {} + inputConnected(false), + numPendingAcknowledgments(0) {} Future<Nothing> IOSwitchboardServerProcess::run() @@ -1221,12 +1229,12 @@ Future<Nothing> IOSwitchboardServerProcess::run() // containers with this behavior and we will exit out of the // switchboard process early. // - // If our IO redirects are finished and there is an input connected, - // then we set `redirectFinished` promise which triggers a callback for + // If our IO redirects are finished and there are pending + // acknowledgments for `ATTACH_CONTAINER_INPUT` requests, then + // we set `redirectFinished` promise which triggers a callback for // `attachContainerInput()`. This callback returns a final `HTTP 200` // response to the client, even if the client has not yet sent the EOF - // message. So we postpone our termination until we send a final - // response to the client. + // message. // // NOTE: We always call `terminate()` with `false` to ensure // that our event queue is drained before actually terminating. 
@@ -1257,7 +1265,7 @@ Future<Nothing> IOSwitchboardServerProcess::run() collect(stdoutRedirect, stderrRedirect) .then(defer(self(), [this]() { - if (inputConnected) { + if (numPendingAcknowledgments > 0) { redirectFinished.set(http::OK()); } else { terminate(self(), false); @@ -1367,6 +1375,10 @@ Future<http::Response> IOSwitchboardServerProcess::handler( { CHECK_EQ("POST", request.method); + if (request.url.path == "/acknowledge_container_input_response") { + return acknowledgeContainerInputResponse(); + } + Option<string> contentType_ = request.headers.get("Content-Type"); CHECK_SOME(contentType_); @@ -1593,9 +1605,30 @@ Option<Error> IOSwitchboardServerProcess::validate( } +Future<http::Response> +IOSwitchboardServerProcess::acknowledgeContainerInputResponse() +{ + // Check if this is an acknowledgment sent by the agent. This acknowledgment + // means that response for `ATTACH_CONTAINER_INPUT` call has been received by + // the agent. + CHECK_GT(numPendingAcknowledgments, 0u); + if (--numPendingAcknowledgments == 0) { + // If IO redirects are finished or writing to `stdin` failed we want to + // terminate ourselves (after flushing any outstanding messages from our + // message queue). + if (!redirectFinished.future().isPending() || failure.isSome()) { + terminate(self(), false); + } + } + return http::OK(); +} + + Future<http::Response> IOSwitchboardServerProcess::attachContainerInput( const Owned<recordio::Reader<agent::Call>>& reader) { + ++numPendingAcknowledgments; + // Only allow a single input connection at a time. if (inputConnected) { return http::Conflict("Multiple input connections are not allowed"); @@ -1728,13 +1761,6 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerInput( // Reset `inputConnected` to allow future input connections. inputConnected = false; - // If IO redirects are finished or writing to `stdin` failed we want - // to terminate ourselves (after flushing any outstanding messages - // from our message queue). 
- if (!redirectFinished.future().isPending() || failure.isSome()) { - terminate(self(), false); - } - return response; })); } diff --git a/src/slave/http.cpp b/src/slave/http.cpp index 000c067..c6f7493 100644 --- a/src/slave/http.cpp +++ b/src/slave/http.cpp @@ -3103,8 +3103,7 @@ Future<Response> Http::_attachContainerInput( std::move(decoder), encoder, writer); return slave->containerizer->attach(containerId) - .then([mediaTypes, reader, writer, transform]( - Connection connection) mutable { + .then([=](Connection connection) mutable { Request request; request.method = "POST"; request.type = Request::PIPE; @@ -3139,6 +3138,31 @@ Future<Response> Http::_attachContainerInput( connection.disconnected() .onAny([connection]() {}); + return connection.send(request) + .onAny(defer( + slave->self(), + [=](const Future<Response>&) { + // After we have received a response for `ATTACH_CONTAINER_INPUT` + // call, we need to send an acknowledgment to the IOSwitchboard, + // so that the IOSwitchboard process can terminate itself. This is + // a workaround for the problem with dropping outstanding HTTP + // responses due to a lack of graceful shutdown in libprocess. 
+ acknowledgeContainerInputResponse(containerId); + })); + }); +} + + +Future<Response> Http::acknowledgeContainerInputResponse( + const ContainerID& containerId) const { + return slave->containerizer->attach(containerId) + .then([](Connection connection) { + Request request; + request.method = "POST"; + request.type = Request::BODY; + request.url.domain = ""; + request.url.path = "/acknowledge_container_input_response"; + return connection.send(request); }); } diff --git a/src/slave/http.hpp b/src/slave/http.hpp index 7820087..5b113fa 100644 --- a/src/slave/http.hpp +++ b/src/slave/http.hpp @@ -326,6 +326,9 @@ private: process::Owned<recordio::Reader<agent::Call>>&& decoder, const RequestMediaTypes& mediaTypes) const; + process::Future<process::http::Response> acknowledgeContainerInputResponse( + const ContainerID& containerId) const; + process::Future<process::http::Response> attachContainerOutput( const mesos::agent::Call& call, const RequestMediaTypes& mediaTypes, diff --git a/src/tests/containerizer/io_switchboard_tests.cpp b/src/tests/containerizer/io_switchboard_tests.cpp index c00f6a9..e443145 100644 --- a/src/tests/containerizer/io_switchboard_tests.cpp +++ b/src/tests/containerizer/io_switchboard_tests.cpp @@ -121,6 +121,19 @@ protected: return connection.send(request, true); } + // Helper that sends an acknowledgment for the `ATTACH_CONTAINER_INPUT` + // request. + Future<http::Response> acknowledgeContainerInputResponse( + http::Connection connection) const { + http::Request request; + request.method = "POST"; + request.type = http::Request::BODY; + request.url.domain = ""; + request.url.path = "/acknowledge_container_input_response"; + + return connection.send(request); + } + // Reads `ProcessIO::Data` records from the pipe `reader` until EOF is reached // and returns the merged stdout and stderr. // NOTE: It ignores any `ProcessIO::Control` records. 
@@ -578,6 +591,8 @@ TEST_F(IOSwitchboardServerTest, AttachInput) AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response); + acknowledgeContainerInputResponse(connection); + AWAIT_READY(connection.disconnect()); AWAIT_READY(connection.disconnected()); @@ -689,6 +704,8 @@ TEST_F(IOSwitchboardServerTest, ReceiveHeartbeat) // result of receiving the heartbeats. AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response); + acknowledgeContainerInputResponse(connection); + AWAIT_READY(connection.disconnect()); AWAIT_READY(connection.disconnected());
