This is an automated email from the ASF dual-hosted git repository. alexr pushed a commit to branch 1.7.x in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 38a914398b6f1aaf08db4f62f4e42cdb80127eb5 Author: Andrei Budnik <[email protected]> AuthorDate: Fri Sep 21 14:51:59 2018 +0200 Fixed broken pipe error in IOSwitchboard. Previous attempt to fix `HTTP 500` "broken pipe" in review /r/62187/ was not correct: after IOSwitchboard sends a response to the agent for the `ATTACH_CONTAINER_INPUT` call, the socket is closed immediately, thus causing the error on the agent. This patch adds a delay after IO redirects are finished and before IOSwitchboard forcibly send a response. Review: https://reviews.apache.org/r/68784/ (cherry picked from commit c3c77cbef818d497d8bd5e67fa72e55a7190e27a) --- src/slave/containerizer/mesos/io/switchboard.cpp | 42 +++++++++++++----------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp index 5bb21e7..498c008 100644 --- a/src/slave/containerizer/mesos/io/switchboard.cpp +++ b/src/slave/containerizer/mesos/io/switchboard.cpp @@ -1615,16 +1615,9 @@ IOSwitchboardServerProcess::acknowledgeContainerInputResponse() if (--numPendingAcknowledgments == 0) { // If IO redirects are finished or writing to `stdin` failed we want to // terminate ourselves (after flushing any outstanding messages from our - // message queue). Since IOSwitchboard might receive an acknowledgment for - // the `ATTACH_CONTAINER_INPUT` request before reading a final message from - // the corresponding connection, we need to delay our termination to give - // IOSwitchboard a chance to read the final message. Otherwise, the agent - // might get `HTTP 500` "broken pipe" while attempting to write the final - // message. + // message queue). if (!redirectFinished.future().isPending() || failure.isSome()) { - after(Seconds(1)).onAny(defer(self(), [=](const Future<Nothing>&) { - terminate(self(), false); - })); + terminate(self(), false); } } return http::OK(); @@ -1746,20 +1739,29 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerInput( // the read loop finishes or IO redirects finish. Once this promise is set, // we return a final response to the client. // - // TODO(abudnik): Ideally, we would have used `process::select()` to capture a - // transition into a terminal state for any of `{readLoop, redirectFinished}`. - // However, `select()` currently does not capture a future that has failed. - // Another alternative would be to allow `promise::associate()` to accept - // multiple source futures. + // We use `defer(self(), ...)` to use this process as a synchronization point + // when changing state of the promise. Owned<Promise<http::Response>> promise(new Promise<http::Response>()); - auto setPromise = [promise](const Future<http::Response>& response) { - promise->set(response); - }; - - readLoop.onAny(defer(self(), setPromise)); + readLoop.onAny( + defer(self(), [promise](const Future<http::Response>& response) { + promise->set(response); + })); - redirectFinished.future().onAny(defer(self(), setPromise)); + // Since IOSwitchboard might receive an acknowledgment for the + // `ATTACH_CONTAINER_INPUT` request before reading a final message from + // the corresponding connection, we need to give IOSwitchboard a chance to + // read the final message. Otherwise, the agent might get `HTTP 500` + // "broken pipe" while attempting to write the final message. + redirectFinished.future().onAny( + defer(self(), [=](const Future<http::Response>& response) { + // TODO(abudnik): Ideally, we would have used `process::delay()` to + // delay a dispatch of the lambda to this process. + after(Seconds(1)) + .onAny(defer(self(), [promise, response](const Future<Nothing>&) { + promise->set(response); + })); + })); // We explicitly specify the return type to avoid a type deduction // issue in some versions of clang. See MESOS-2943.
