This is an automated email from the ASF dual-hosted git repository. alexr pushed a commit to branch 1.7.x in repository https://gitbox.apache.org/repos/asf/mesos.git
commit e9605a6243db41c1bbc85ec9ade112f2ef806c15 Author: Andrei Budnik <[email protected]> AuthorDate: Tue Sep 18 19:09:31 2018 +0200 Fixed IOSwitchboard waiting for EOF from attach container input request. Previously, when a corresponding nested container terminated while the user was attached to the container's stdin via `ATTACH_CONTAINER_INPUT`, IOSwitchboard didn't terminate immediately. IOSwitchboard was waiting for an EOF message from the input HTTP connection. Since the IOSwitchboard was stuck, the corresponding nested container was also stuck in the `DESTROYING` state. This patch fixes the aforementioned issue by sending a 200 `OK` response for the `ATTACH_CONTAINER_INPUT` call in the case when the IO redirect is finished while reading from the HTTP input connection is not. Review: https://reviews.apache.org/r/68232/ (cherry picked from commit 2fdc8f3cffc5eac91e5f2b0c6aef2254acfc2bd0) --- src/slave/containerizer/mesos/io/switchboard.cpp | 74 +++++++++++++++--------- 1 file changed, 48 insertions(+), 26 deletions(-) diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp index 52b0e52..0e4edb3 100644 --- a/src/slave/containerizer/mesos/io/switchboard.cpp +++ b/src/slave/containerizer/mesos/io/switchboard.cpp @@ -1029,10 +1029,11 @@ private: bool waitForConnection; Option<Duration> heartbeatInterval; bool inputConnected; - bool redirectFinished; // Set when both stdout and stderr redirects finish. Future<unix::Socket> accept; Promise<Nothing> promise; Promise<Nothing> startRedirect; + // Set when both stdout and stderr redirects finish. + Promise<http::Response> redirectFinished; // The following must be a `std::list` // for proper erase semantics later on. 
list<HttpConnection> outputConnections; @@ -1163,8 +1164,7 @@ IOSwitchboardServerProcess::IOSwitchboardServerProcess( socket(_socket), waitForConnection(_waitForConnection), heartbeatInterval(_heartbeatInterval), - inputConnected(false), - redirectFinished(false) {} + inputConnected(false) {} Future<Nothing> IOSwitchboardServerProcess::run() @@ -1222,10 +1222,11 @@ Future<Nothing> IOSwitchboardServerProcess::run() // switchboard process early. // // If our IO redirects are finished and there is an input connected, - // then we postpone our termination until either a container closes - // its `stdin` or a client closes the input connection so that we can - // guarantee returning a http response for `ATTACH_CONTAINER_INPUT` - // request before terminating ourselves. + // then we set `redirectFinished` promise which triggers a callback for + // `attachContainerInput()`. This callback returns a final `HTTP 200` + // response to the client, even if the client has not yet sent the EOF + // message. So we postpone our termination until we send a final + // response to the client. // // NOTE: We always call `terminate()` with `false` to ensure // that our event queue is drained before actually terminating. @@ -1256,9 +1257,9 @@ Future<Nothing> IOSwitchboardServerProcess::run() collect(stdoutRedirect, stderrRedirect) .then(defer(self(), [this]() { - redirectFinished = true; - - if (!inputConnected) { + if (inputConnected) { + redirectFinished.set(http::OK()); + } else { terminate(self(), false); } return Nothing(); @@ -1608,7 +1609,7 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerInput( // Loop through each record and process it. Return a proper // response once the last record has been fully processed. 
- return loop( + auto readLoop = loop( self(), [=]() { return reader->read(); @@ -1699,22 +1700,43 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerInput( UNREACHABLE(); } } - }) - // We explicitly specify the return type to avoid a type deduction - // issue in some versions of clang. See MESOS-2943. - .then(defer(self(), [=](const http::Response& response) -> http::Response { - // Reset `inputConnected` to allow future input connections. - inputConnected = false; - - // If IO redirects are finished or writing to `stdin` failed we want - // to terminate ourselves (after flushing any outstanding messages - // from our message queue). - if (redirectFinished || failure.isSome()) { - terminate(self(), false); - } + }); - return response; - })); + // We create a new promise, which is transitioned to `READY` when either + // the read loop finishes or IO redirects finish. Once this promise is set, + // we return a final response to the client. + // + // TODO(abudnik): Ideally, we would have used `process::select()` to capture a + // transition into a terminal state for any of `{readLoop, redirectFinished}`. + // However, `select()` currently does not capture a future that has failed. + // Another alternative would be to allow `promise::associate()` to accept + // multiple source futures. + Owned<Promise<http::Response>> promise(new Promise<http::Response>()); + + auto setPromise = [promise](const Future<http::Response>& response) { + promise->set(response); + }; + + readLoop.onAny(defer(self(), setPromise)); + + redirectFinished.future().onAny(defer(self(), setPromise)); + + // We explicitly specify the return type to avoid a type deduction + // issue in some versions of clang. See MESOS-2943. + return promise->future().then( + defer(self(), [=](const http::Response& response) -> http::Response { + // Reset `inputConnected` to allow future input connections. 
+ inputConnected = false; + + // If IO redirects are finished or writing to `stdin` failed we want + // to terminate ourselves (after flushing any outstanding messages + // from our message queue). + if (!redirectFinished.future().isPending() || failure.isSome()) { + terminate(self(), false); + } + + return response; + })); }
