Added support for COMMAND checks to the default executor. Review: https://reviews.apache.org/r/58030/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3f81c6f6 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3f81c6f6 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3f81c6f6 Branch: refs/heads/master Commit: 3f81c6f6052768e326e84e2eab93c20572b490ad Parents: 3a689ab Author: Gastón Kleiman <[email protected]> Authored: Thu Mar 30 17:14:14 2017 +0200 Committer: Alexander Rukletsov <[email protected]> Committed: Thu Mar 30 19:34:24 2017 +0200 ---------------------------------------------------------------------- src/checks/checker.cpp | 454 ++++++++++++++++++-- src/checks/checker.hpp | 32 +- src/launcher/default_executor.cpp | 8 +- src/tests/check_tests.cpp | 733 ++++++++++++++++++++++++++++++++- 4 files changed, 1181 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3f81c6f6/src/checks/checker.cpp ---------------------------------------------------------------------- diff --git a/src/checks/checker.cpp b/src/checks/checker.cpp index d1e9083..e48e037 100644 --- a/src/checks/checker.cpp +++ b/src/checks/checker.cpp @@ -19,6 +19,7 @@ #include <cstdint> #include <iterator> #include <map> +#include <memory> #include <string> #include <tuple> #include <vector> @@ -28,6 +29,8 @@ #include <mesos/mesos.hpp> #include <mesos/type_utils.hpp> +#include <mesos/agent/agent.hpp> + #include <process/collect.hpp> #include <process/defer.hpp> #include <process/delay.hpp> @@ -49,14 +52,18 @@ #include <stout/strings.hpp> #include <stout/try.hpp> #include <stout/unreachable.hpp> +#include <stout/uuid.hpp> #include <stout/os/environment.hpp> #include <stout/os/killtree.hpp> +#include "common/http.hpp" #include "common/protobuf_utils.hpp" #include "common/status_utils.hpp" #include "common/validation.hpp" +#include "internal/evolve.hpp" + #ifdef __linux__ #include "linux/ns.hpp" #endif @@ -64,9 +71,14 @@ 
using process::Failure; using process::Future; using process::Owned; +using process::Promise; using process::Subprocess; +using process::http::Connection; +using process::http::Response; + using std::map; +using std::shared_ptr; using std::string; using std::tuple; using std::vector; @@ -125,7 +137,10 @@ public: const lambda::function<void(const CheckStatusInfo&)>& _callback, const TaskID& _taskId, const Option<pid_t>& _taskPid, - const std::vector<std::string>& _namespaces); + const vector<string>& _namespaces, + const Option<ContainerID>& _taskContainerId, + const Option<process::http::URL>& _agentURL, + bool _commandCheckViaAgent); void pause(); void resume(); @@ -141,9 +156,33 @@ private: void scheduleNext(const Duration& duration); void processCheckResult( const Stopwatch& stopwatch, - const CheckStatusInfo& result); + const Option<CheckStatusInfo>& result); process::Future<int> commandCheck(); + + process::Future<int> nestedCommandCheck(); + void _nestedCommandCheck(std::shared_ptr<process::Promise<int>> promise); + void __nestedCommandCheck( + std::shared_ptr<process::Promise<int>> promise, + process::http::Connection connection); + void ___nestedCommandCheck( + std::shared_ptr<process::Promise<int>> promise, + const ContainerID& checkContainerId, + const process::http::Response& launchResponse); + + void nestedCommandCheckFailure( + std::shared_ptr<process::Promise<int>> promise, + process::http::Connection connection, + ContainerID checkContainerId, + std::shared_ptr<bool> checkTimedOut, + const std::string& failure); + + process::Future<Option<int>> waitNestedContainer( + const ContainerID& containerId); + process::Future<Option<int>> _waitNestedContainer( + const ContainerID& containerId, + const process::http::Response& httpResponse); + void processCommandCheckResult( const Stopwatch& stopwatch, const process::Future<int>& result); @@ -167,10 +206,18 @@ private: const TaskID taskId; const Option<pid_t> taskPid; const std::vector<std::string> namespaces; 
+ const Option<ContainerID> taskContainerId; + const Option<process::http::URL> agentURL; + const bool commandCheckViaAgent; + Option<lambda::function<pid_t(const lambda::function<int()>&)>> clone; CheckStatusInfo previousCheckStatus; bool paused; + + // Contains the ID of the most recently terminated nested container + // that was used to perform a COMMAND check. + Option<ContainerID> previousCheckContainerId; }; @@ -192,7 +239,37 @@ Try<Owned<Checker>> Checker::create( callback, taskId, taskPid, - namespaces)); + namespaces, + None(), + None(), + false)); + + return Owned<Checker>(new Checker(process)); +} + + +Try<process::Owned<Checker>> Checker::create( + const CheckInfo& check, + const lambda::function<void(const CheckStatusInfo&)>& callback, + const TaskID& taskId, + const ContainerID& taskContainerId, + const process::http::URL& agentURL) +{ + // Validate the `CheckInfo` protobuf. + Option<Error> error = validation::checkInfo(check); + if (error.isSome()) { + return error.get(); + } + + Owned<CheckerProcess> process(new CheckerProcess( + check, + callback, + taskId, + None(), + {}, + taskContainerId, + agentURL, + true)); return Owned<Checker>(new Checker(process)); } @@ -229,13 +306,19 @@ CheckerProcess::CheckerProcess( const lambda::function<void(const CheckStatusInfo&)>& _callback, const TaskID& _taskId, const Option<pid_t>& _taskPid, - const vector<string>& _namespaces) + const vector<string>& _namespaces, + const Option<ContainerID>& _taskContainerId, + const Option<process::http::URL>& _agentURL, + bool _commandCheckViaAgent) : ProcessBase(process::ID::generate("checker")), check(_check), updateCallback(_callback), taskId(_taskId), taskPid(_taskPid), namespaces(_namespaces), + taskContainerId(_taskContainerId), + agentURL(_agentURL), + commandCheckViaAgent(_commandCheckViaAgent), paused(false) { Try<Duration> create = Duration::create(check.delay_seconds()); @@ -306,7 +389,9 @@ void CheckerProcess::performCheck() switch (check.type()) { case 
CheckInfo::COMMAND: { - commandCheck().onAny(defer( + Future<int> future = commandCheckViaAgent ? nestedCommandCheck() + : commandCheck(); + future.onAny(defer( self(), &Self::processCommandCheckResult, stopwatch, lambda::_1)); break; @@ -361,7 +446,7 @@ void CheckerProcess::resume() void CheckerProcess::processCheckResult( const Stopwatch& stopwatch, - const CheckStatusInfo& result) + const Option<CheckStatusInfo>& result) { // `Checker` might have been paused while performing the check. if (paused) { @@ -370,16 +455,20 @@ void CheckerProcess::processCheckResult( return; } - VLOG(1) << "Performed " << check.type() << " check" - << " for task '" << taskId << "' in " << stopwatch.elapsed(); - - // Trigger the callback if check info changes. - if (result != previousCheckStatus) { - // We assume this is a local send, i.e., the checker library is not used - // in a binary external to the executor and hence can not exit before - // the data is sent to the executor. - updateCallback(result); - previousCheckStatus = result; + // `result` should be some if it was possible to perform the check, + // and empty if there was a transient error. + if (result.isSome()) { + VLOG(1) << "Performed " << check.type() << " check" + << " for task '" << taskId << "' in " << stopwatch.elapsed(); + + // Trigger the callback if check info changes. + if (result.get() != previousCheckStatus) { + // We assume this is a local send, i.e., the checker library is not used + // in a binary external to the executor and hence can not exit before + // the data is sent to the executor. 
+ updateCallback(result.get()); + previousCheckStatus = result.get(); + } } scheduleNext(checkInterval); @@ -470,37 +559,346 @@ Future<int> CheckerProcess::commandCheck() } +Future<int> CheckerProcess::nestedCommandCheck() +{ + CHECK_EQ(CheckInfo::COMMAND, check.type()); + CHECK(check.has_command()); + CHECK_SOME(taskContainerId); + CHECK_SOME(agentURL); + + VLOG(1) << "Launching COMMAND check for task '" << taskId << "'"; + + // We don't want recoverable errors, e.g., the agent responding with + // HTTP status code 503, to trigger a check failure. + // + // The future returned by this method represents the result of a + // check. It will be set to the exit status of the check command if it + // succeeded, to a `Failure` if there was a non-transient error, and + // discarded if there was a transient error. + auto promise = std::make_shared<Promise<int>>(); + + if (previousCheckContainerId.isSome()) { + agent::Call call; + call.set_type(agent::Call::REMOVE_NESTED_CONTAINER); + + agent::Call::RemoveNestedContainer* removeContainer = + call.mutable_remove_nested_container(); + + removeContainer->mutable_container_id()->CopyFrom( + previousCheckContainerId.get()); + + process::http::Request request; + request.method = "POST"; + request.url = agentURL.get(); + request.body = serialize(ContentType::PROTOBUF, evolve(call)); + request.headers = {{"Accept", stringify(ContentType::PROTOBUF)}, + {"Content-Type", stringify(ContentType::PROTOBUF)}}; + + process::http::request(request, false) + .onFailed(defer(self(), + [this, promise](const string& failure) { + LOG(WARNING) << "Connection to remove the nested container '" + << previousCheckContainerId.get() + << "' used for the COMMAND check for task '" + << taskId << "' failed: " << failure; + + // Something went wrong while sending the request, we treat this + // as a transient failure and discard the promise. 
+ promise->discard(); + })) + .onReady(defer(self(), [this, promise](const Response& response) { + if (response.code != process::http::Status::OK) { + // The agent was unable to remove the check container, we + // treat this as a transient failure and discard the promise. + LOG(WARNING) << "Received '" << response.status << "' (" + << response.body << ") while removing the nested" + << " container '" << previousCheckContainerId.get() + << "' used for the COMMAND check for task '" + << taskId << "'"; + + promise->discard(); + } + + previousCheckContainerId = None(); + _nestedCommandCheck(promise); + })); + } else { + _nestedCommandCheck(promise); + } + + return promise->future(); +} + + +void CheckerProcess::_nestedCommandCheck( + shared_ptr<process::Promise<int>> promise) +{ + // TODO(alexr): Use a lambda named capture for + // this cached value once it is available. + const TaskID _taskId = taskId; + + process::http::connect(agentURL.get()) + .onFailed(defer(self(), [_taskId, promise](const string& failure) { + LOG(WARNING) << "Unable to establish connection with the agent to launch" + << " COMMAND check for task '" << _taskId << "'" + << ": " << failure; + + // We treat this as a transient failure. 
+ promise->discard(); + })) + .onReady(defer(self(), &Self::__nestedCommandCheck, promise, lambda::_1)); +} + + +void CheckerProcess::__nestedCommandCheck( + shared_ptr<process::Promise<int>> promise, + Connection connection) +{ + ContainerID checkContainerId; + checkContainerId.set_value("check-" + UUID::random().toString()); + checkContainerId.mutable_parent()->CopyFrom(taskContainerId.get()); + + previousCheckContainerId = checkContainerId; + + CommandInfo command(check.command().command()); + + agent::Call call; + call.set_type(agent::Call::LAUNCH_NESTED_CONTAINER_SESSION); + + agent::Call::LaunchNestedContainerSession* launch = + call.mutable_launch_nested_container_session(); + + launch->mutable_container_id()->CopyFrom(checkContainerId); + launch->mutable_command()->CopyFrom(command); + + process::http::Request request; + request.method = "POST"; + request.url = agentURL.get(); + request.body = serialize(ContentType::PROTOBUF, evolve(call)); + request.headers = {{"Accept", stringify(ContentType::RECORDIO)}, + {"Message-Accept", stringify(ContentType::PROTOBUF)}, + {"Content-Type", stringify(ContentType::PROTOBUF)}}; + + // TODO(alexr): Use a lambda named capture for + // this cached value once it is available. + const Duration timeout = checkTimeout; + + auto checkTimedOut = std::make_shared<bool>(false); + + // `LAUNCH_NESTED_CONTAINER_SESSION` returns a streamed response with + // the output of the container. The agent will close the stream once + // the container has exited, or kill the container if the client + // closes the connection. + // + // We're calling `Connection::send` with `streamed = false`, so that + // it returns an HTTP response of type 'BODY' once the entire response + // is received. + // + // This means that this future will not be completed until after the + // check command has finished or the connection has been closed. 
+ connection.send(request, false) + .after(checkTimeout, + defer(self(), [timeout, checkTimedOut](Future<Response> future) { + future.discard(); + + *checkTimedOut = true; + + return Failure("Command timed out after " + stringify(timeout)); + })) + .onFailed(defer(self(), + &Self::nestedCommandCheckFailure, + promise, + connection, + checkContainerId, + checkTimedOut, + lambda::_1)) + .onReady(defer(self(), + &Self::___nestedCommandCheck, + promise, + checkContainerId, + lambda::_1)); +} + + +void CheckerProcess::___nestedCommandCheck( + shared_ptr<process::Promise<int>> promise, + const ContainerID& checkContainerId, + const Response& launchResponse) +{ + if (launchResponse.code != process::http::Status::OK) { + // The agent was unable to launch the check container, + // we treat this as a transient failure. + LOG(WARNING) << "Received '" << launchResponse.status << "' (" + << launchResponse.body << ") while launching COMMAND check" + << " for task '" << taskId << "'"; + + promise->discard(); + return; + } + + waitNestedContainer(checkContainerId) + .onFailed([promise](const string& failure) { + promise->fail( + "Unable to get the exit code: " + failure); + }) + .onReady([promise](const Option<int>& status) -> void { + if (status.isNone()) { + promise->fail("Unable to get the exit code"); + // TODO(gkleiman): Make sure that the following block works on Windows. + } else if (WIFSIGNALED(status.get()) && + WTERMSIG(status.get()) == SIGKILL) { + // The check container was signaled, probably because the task + // finished while the check was still in-flight, so we discard + // the result. 
+ promise->discard(); + } else { + promise->set(status.get()); + } + }); +} + + +void CheckerProcess::nestedCommandCheckFailure( + shared_ptr<Promise<int>> promise, + Connection connection, + ContainerID checkContainerId, + shared_ptr<bool> checkTimedOut, + const string& failure) +{ + if (*checkTimedOut) { + // The check timed out, closing the connection will make the agent + // kill the container. + connection.disconnect(); + + // If the check delay interval is zero, we'll try to perform another + // check right after we finish processing the current timeout. + // + // We'll try to remove the container created for the check at the + // beginning of the next check. In order to prevent a failure, the + // promise should only be completed once we're sure that the + // container has terminated. + waitNestedContainer(checkContainerId) + .onAny([failure, promise](const Future<Option<int>>&) { + // We assume that once `WaitNestedContainer` returns, + // irrespective of whether the response contains a failure, the + // container will be in a terminal state, and that it will be + // possible to remove it. + // + // This means that we don't need to retry the + // `WaitNestedContainer` call. + promise->fail(failure); + }); + } else { + // The agent was not able to complete the request, discarding the + // promise signals the checker that it should retry the check. + // + // This will allow us to recover from a blip. The executor will + // pause the checker when it detects that the agent is not + // available. 
+ LOG(WARNING) << "Connection to the agent to launch COMMAND check" + << " for task '" << taskId << "' failed: " << failure; + + promise->discard(); + } +} + + +Future<Option<int>> CheckerProcess::waitNestedContainer( + const ContainerID& containerId) +{ + agent::Call call; + call.set_type(agent::Call::WAIT_NESTED_CONTAINER); + + agent::Call::WaitNestedContainer* containerWait = + call.mutable_wait_nested_container(); + + containerWait->mutable_container_id()->CopyFrom(containerId); + + process::http::Request request; + request.method = "POST"; + request.url = agentURL.get(); + request.body = serialize(ContentType::PROTOBUF, evolve(call)); + request.headers = {{"Accept", stringify(ContentType::PROTOBUF)}, + {"Content-Type", stringify(ContentType::PROTOBUF)}}; + + return process::http::request(request, false) + .repair([containerId](const Future<Response>& future) { + return Failure( + "Connection to wait for check container '" + + stringify(containerId) + "' failed: " + future.failure()); + }) + .then(defer(self(), + &Self::_waitNestedContainer, containerId, lambda::_1)); +} + + +Future<Option<int>> CheckerProcess::_waitNestedContainer( + const ContainerID& containerId, + const Response& httpResponse) +{ + if (httpResponse.code != process::http::Status::OK) { + return Failure( + "Received '" + httpResponse.status + "' (" + httpResponse.body + + ") while waiting on check container '" + stringify(containerId) + "'"); + } + + Try<agent::Response> response = + deserialize<agent::Response>(ContentType::PROTOBUF, httpResponse.body); + CHECK_SOME(response); + + CHECK(response->has_wait_nested_container()); + + return ( + response->wait_nested_container().has_exit_status() + ? 
Option<int>(response->wait_nested_container().exit_status()) + : Option<int>::none()); +} + + void CheckerProcess::processCommandCheckResult( const Stopwatch& stopwatch, - const Future<int>& result) + const Future<int>& future) { - CheckStatusInfo checkStatusInfo; - checkStatusInfo.set_type(check.type()); + Option<CheckStatusInfo> result; - // On Posix, `result` corresponds to termination information in the + // On Posix, `future` corresponds to termination information in the // `stat_loc` area. On Windows, `status` is obtained via calling the // `GetExitCodeProcess()` function. // // TODO(alexr): Ensure `WEXITSTATUS` family macros are no-op on Windows, // see MESOS-7242. - if (result.isReady() && WIFEXITED(result.get())) { - const int exitCode = WEXITSTATUS(result.get()); - VLOG(1) << check.type() << " check for task '" - << taskId << "' returned: " << exitCode; + if (future.isReady() && WIFEXITED(future.get())) { + const int exitCode = WEXITSTATUS(future.get()); + VLOG(1) << check.type() << " check for task '" << taskId << "'" + << " returned: " << exitCode; + CheckStatusInfo checkStatusInfo; + checkStatusInfo.set_type(check.type()); checkStatusInfo.mutable_command()->set_exit_code( static_cast<int32_t>(exitCode)); + + result = checkStatusInfo; + } else if (future.isDiscarded()) { + // Check's status is currently not available due to a transient error, + // e.g., due to the agent failover, no `CheckStatusInfo` message should + // be sent to the callback. + LOG(INFO) << check.type() << " check for task '" << taskId << "' discarded"; + + result = None(); } else { // Check's status is currently not available, which may indicate a change // that should be reported as an empty `CheckStatusInfo.Command` message. - LOG(WARNING) << check.type() << " check for task '" << taskId - << "' failed: " - << (result.isFailed() ? 
result.failure() : "discarded"); + LOG(WARNING) << check.type() << " check for task '" << taskId << "'" + << " failed: " << future.failure(); + CheckStatusInfo checkStatusInfo; + checkStatusInfo.set_type(check.type()); checkStatusInfo.mutable_command(); + + result = checkStatusInfo; } - processCheckResult(stopwatch, checkStatusInfo); + processCheckResult(stopwatch, result); } http://git-wip-us.apache.org/repos/asf/mesos/blob/3f81c6f6/src/checks/checker.hpp ---------------------------------------------------------------------- diff --git a/src/checks/checker.hpp b/src/checks/checker.hpp index 1521b9c..fb939d8 100644 --- a/src/checks/checker.hpp +++ b/src/checks/checker.hpp @@ -22,6 +22,7 @@ #include <mesos/mesos.hpp> +#include <process/http.hpp> #include <process/owned.hpp> #include <stout/error.hpp> @@ -43,6 +44,9 @@ public: * Attempts to create a `Checker` object. In case of success, checking * starts immediately after initialization. * + * If the check is a COMMAND check, the checker will fork a process, enter + * the task's namespaces, and execute the commmand. + * * @param check The protobuf message definition of a check. * @param callback A callback `Checker` uses to send check status updates * to its owner (usually an executor). @@ -56,12 +60,38 @@ public: * `process::Stream<CheckStatusInfo>` rather than invoking a callback. */ static Try<process::Owned<Checker>> create( - const CheckInfo& checkInfo, + const CheckInfo& check, const lambda::function<void(const CheckStatusInfo&)>& callback, const TaskID& taskId, const Option<pid_t>& taskPid, const std::vector<std::string>& namespaces); + /** + * Attempts to create a `Checker` object. In case of success, checking + * starts immediately after initialization. + * + * If the check is a COMMAND check, the checker will delegate the execution + * of the check to the Mesos agent via the `LaunchNestedContainerSession` + * API call. + * + * @param check The protobuf message definition of a check. 
+ * @param callback A callback `Checker` uses to send check status updates + * to its owner (usually an executor). + * @param taskId The TaskID of the target task. + * @param taskContainerId The ContainerID of the target task. + * @param agentURL The URL of the agent. + * @return A `Checker` object or an error if `create` fails. + * + * @todo A better approach would be to return a stream of updates, e.g., + * `process::Stream<CheckStatusInfo>` rather than invoking a callback. + */ + static Try<process::Owned<Checker>> create( + const CheckInfo& check, + const lambda::function<void(const CheckStatusInfo&)>& callback, + const TaskID& taskId, + const ContainerID& taskContainerId, + const process::http::URL& agentURL); + ~Checker(); // Not copyable, not assignable. http://git-wip-us.apache.org/repos/asf/mesos/blob/3f81c6f6/src/launcher/default_executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp index 79785fc..9cc40c6 100644 --- a/src/launcher/default_executor.cpp +++ b/src/launcher/default_executor.cpp @@ -503,17 +503,13 @@ protected: false}); if (task.has_check()) { - // TODO(alexr): Add support for command checks. - CHECK_NE(CheckInfo::COMMAND, task.check().type()) - << "Command checks are not supported yet"; - Try<Owned<checks::Checker>> checker = checks::Checker::create( task.check(), defer(self(), &Self::taskCheckUpdated, taskId, lambda::_1), taskId, - None(), - vector<string>()); + containerId, + agent); if (checker.isError()) { // TODO(anand): Should we send a TASK_FAILED instead? 
http://git-wip-us.apache.org/repos/asf/mesos/blob/3f81c6f6/src/tests/check_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/check_tests.cpp b/src/tests/check_tests.cpp index 78ac498..d7fcbf9 100644 --- a/src/tests/check_tests.cpp +++ b/src/tests/check_tests.cpp @@ -33,11 +33,16 @@ #include "checks/checker.hpp" +#include "slave/containerizer/fetcher.hpp" + #include "tests/flags.hpp" #include "tests/health_check_test_helper.hpp" #include "tests/mesos.hpp" #include "tests/utils.hpp" +using mesos::internal::slave::Fetcher; +using mesos::internal::slave::MesosContainerizer; + using mesos::master::detector::MasterDetector; using mesos::v1::scheduler::Call; @@ -211,6 +216,17 @@ public: mesos->send(call); } + + virtual void teardown( + Mesos* mesos, + const v1::FrameworkID& frameworkId) + { + Call call; + call.set_type(Call::TEARDOWN); + call.mutable_framework_id()->CopyFrom(frameworkId); + + mesos->send(call); + } }; @@ -661,10 +677,14 @@ TEST_F(CommandExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing) v1::TaskInfo taskInfo = v1::createTask(agentId, resources, SLEEP_COMMAND(10000)); + // Set both check and health check interval to an increased value to + // prevent a second update coming before reconciliation response. + int interval = 10; + v1::CheckInfo* checkInfo = taskInfo.mutable_check(); checkInfo->set_type(v1::CheckInfo::COMMAND); checkInfo->set_delay_seconds(0); - checkInfo->set_interval_seconds(0); + checkInfo->set_interval_seconds(interval); checkInfo->mutable_command()->mutable_command()->set_value("exit 1"); // Delay health check for 1s to ensure health update comes after check update. 
@@ -676,7 +696,7 @@ TEST_F(CommandExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing) v1::HealthCheck* healthCheckInfo = taskInfo.mutable_health_check(); healthCheckInfo->set_type(v1::HealthCheck::COMMAND); healthCheckInfo->set_delay_seconds(1); - healthCheckInfo->set_interval_seconds(0); + healthCheckInfo->set_interval_seconds(interval); healthCheckInfo->mutable_command()->set_value("exit 0"); launchTask(&mesos, offer, taskInfo); @@ -860,16 +880,707 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(CommandExecutorCheckTest, HTTPCheckDelivered) // These are check tests with the default executor. class DefaultExecutorCheckTest : public CheckTest {}; -// TODO(alexr): Implement following tests once the default executor supports -// command checks. + +// Verifies that a command check is supported by the default executor, +// its status is delivered in a task status update, and the last known +// status can be obtained during explicit and implicit reconciliation. +// Additionally ensures that the specified environment of the command +// check is honored. // -// 1. COMMAND check with env var works, is delivered, and is reconciled -// properly. -// 2. COMMAND check's status change is delivered. TODO(alexr): When check -// mocking is available, ensure only status changes are delivered. -// 3. COMMAND check times out. -// 4. COMMAND check and health check do not shadow each other; upon -// reconciliation both statuses are available. +// TODO(gkleiman): Check if this test works on Windows. +TEST_F_TEMP_DISABLED_ON_WINDOWS( + DefaultExecutorCheckTest, + CommandCheckDeliveredAndReconciled) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + // Disable AuthN on the agent. 
+ slave::Flags flags = CreateSlaveFlags(); + flags.authenticate_http_readwrite = false; + + Fetcher fetcher; + + // We have to explicitly create a `Containerizer` in non-local mode, + // because `LaunchNestedContainerSession` (used by command checks) + // tries to start a IO switchboard, which doesn't work in local mode yet. + Try<MesosContainerizer*> _containerizer = + MesosContainerizer::create(flags, false, &fetcher); + + ASSERT_SOME(_containerizer); + + Owned<slave::Containerizer> containerizer(_containerizer.get()); + Owned<MasterDetector> detector = master.get()->createDetector(); + + Try<Owned<cluster::Slave>> agent = + StartSlave(detector.get(), containerizer.get(), flags); + ASSERT_SOME(agent); + + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + + const v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo; + executorInfo.set_type(v1::ExecutorInfo::DEFAULT); + executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); + executorInfo.mutable_resources()->CopyFrom(resources); + executorInfo.mutable_shutdown_grace_period()->set_nanoseconds( + Seconds(10).ns()); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + Future<Nothing> connected; + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(FutureSatisfy(&connected)) + .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033. + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(connected); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. 
+ + subscribe(&mesos, frameworkInfo); + + AWAIT_READY(subscribed); + + v1::FrameworkID frameworkId(subscribed->framework_id()); + + // Update `executorInfo` with the subscribed `frameworkId`. + executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + + AWAIT_READY(offers); + EXPECT_NE(0, offers->offers().size()); + const v1::Offer& offer = offers->offers(0); + const v1::AgentID agentId = offer.agent_id(); + + Future<Event::Update> updateTaskRunning; + Future<Event::Update> updateCheckResult; + Future<Event::Update> updateExplicitReconciliation; + Future<Event::Update> updateImplicitReconciliation; + + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&updateTaskRunning)) + .WillOnce(FutureArg<1>(&updateCheckResult)) + .WillOnce(FutureArg<1>(&updateExplicitReconciliation)) + .WillOnce(FutureArg<1>(&updateImplicitReconciliation)) + .WillRepeatedly(Return()); // Ignore subsequent updates. + + v1::TaskInfo taskInfo = + v1::createTask(agentId, resources, SLEEP_COMMAND(10000)); + + v1::CheckInfo* checkInfo = taskInfo.mutable_check(); + checkInfo->set_type(v1::CheckInfo::COMMAND); + checkInfo->set_delay_seconds(0); + checkInfo->set_interval_seconds(0); + + v1::CommandInfo* checkCommand = + checkInfo->mutable_command()->mutable_command(); + checkCommand->set_value("exit $STATUS"); + + v1::Environment::Variable* variable = + checkCommand->mutable_environment()->add_variables(); + variable->set_name("STATUS"); + variable->set_value("1"); + + v1::TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(taskInfo); + + launchTaskGroup(&mesos, offer, executorInfo, taskGroup); + + AWAIT_READY(updateTaskRunning); + const v1::TaskStatus& taskRunning = updateTaskRunning->status(); + + ASSERT_EQ(TASK_RUNNING, taskRunning.state()); + EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id()); + EXPECT_TRUE(taskRunning.has_check_status()); + EXPECT_TRUE(taskRunning.check_status().has_command()); + EXPECT_FALSE(taskRunning.check_status().command().has_exit_code()); + + 
acknowledge(&mesos, frameworkId, taskRunning); + + AWAIT_READY(updateCheckResult); + const v1::TaskStatus& checkResult = updateCheckResult->status(); + + ASSERT_EQ(TASK_RUNNING, checkResult.state()); + ASSERT_EQ( + v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED, + checkResult.reason()); + EXPECT_EQ(taskInfo.task_id(), checkResult.task_id()); + EXPECT_TRUE(checkResult.has_check_status()); + EXPECT_TRUE(checkResult.check_status().command().has_exit_code()); + EXPECT_EQ(1, checkResult.check_status().command().exit_code()); + + acknowledge(&mesos, frameworkId, checkResult); + + // Trigger explicit reconciliation. + reconcile( + &mesos, + frameworkId, + {std::make_pair(checkResult.task_id(), checkResult.agent_id())}); + + AWAIT_READY(updateExplicitReconciliation); + const v1::TaskStatus& explicitReconciliation = + updateExplicitReconciliation->status(); + + ASSERT_EQ(TASK_RUNNING, explicitReconciliation.state()); + ASSERT_EQ( + v1::TaskStatus::REASON_RECONCILIATION, + explicitReconciliation.reason()); + EXPECT_EQ(taskInfo.task_id(), explicitReconciliation.task_id()); + EXPECT_TRUE(explicitReconciliation.has_check_status()); + EXPECT_TRUE(explicitReconciliation.check_status().command().has_exit_code()); + EXPECT_EQ(1, explicitReconciliation.check_status().command().exit_code()); + + acknowledge(&mesos, frameworkId, explicitReconciliation); + + // Trigger implicit reconciliation. 
+ reconcile(&mesos, frameworkId, {}); + + AWAIT_READY(updateImplicitReconciliation); + const v1::TaskStatus& implicitReconciliation = + updateImplicitReconciliation->status(); + + ASSERT_EQ(TASK_RUNNING, implicitReconciliation.state()); + ASSERT_EQ( + v1::TaskStatus::REASON_RECONCILIATION, + implicitReconciliation.reason()); + EXPECT_EQ(taskInfo.task_id(), implicitReconciliation.task_id()); + EXPECT_TRUE(implicitReconciliation.has_check_status()); + EXPECT_TRUE(implicitReconciliation.check_status().command().has_exit_code()); + EXPECT_EQ(1, implicitReconciliation.check_status().command().exit_code()); + + // Cleanup all mesos launched containers. + Future<hashset<ContainerID>> containerIds = containerizer->containers(); + AWAIT_READY(containerIds); + + EXPECT_CALL(*scheduler, disconnected(_)); + + teardown(&mesos, frameworkId); + + foreach (const ContainerID& containerId, containerIds.get()) { + AWAIT_READY(containerizer->wait(containerId)); + } +} + + +// Verifies that a command check's status changes are delivered. +// +// TODO(alexr): When check mocking is available, ensure that *only* +// status changes are delivered. +// +// TODO(gkleiman): Check if this test works on Windows. +TEST_F_TEMP_DISABLED_ON_WINDOWS( + DefaultExecutorCheckTest, + CommandCheckStatusChange) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + // Disable AuthN on the agent. + slave::Flags flags = CreateSlaveFlags(); + flags.authenticate_http_readwrite = false; + + Fetcher fetcher; + + // We have to explicitly create a `Containerizer` in non-local mode, + // because `LaunchNestedContainerSession` (used by command checks) + // tries to start a IO switchboard, which doesn't work in local mode yet. 
+ Try<MesosContainerizer*> _containerizer = + MesosContainerizer::create(flags, false, &fetcher); + + ASSERT_SOME(_containerizer); + + Owned<slave::Containerizer> containerizer(_containerizer.get()); + Owned<MasterDetector> detector = master.get()->createDetector(); + + Try<Owned<cluster::Slave>> agent = + StartSlave(detector.get(), containerizer.get(), flags); + ASSERT_SOME(agent); + + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + + const v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo; + executorInfo.set_type(v1::ExecutorInfo::DEFAULT); + executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); + executorInfo.mutable_resources()->CopyFrom(resources); + executorInfo.mutable_shutdown_grace_period()->set_nanoseconds( + Seconds(10).ns()); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + Future<Nothing> connected; + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(FutureSatisfy(&connected)) + .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033. + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(connected); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + subscribe(&mesos, frameworkInfo); + + AWAIT_READY(subscribed); + + v1::FrameworkID frameworkId(subscribed->framework_id()); + + // Update `executorInfo` with the subscribed `frameworkId`. 
+ executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + + AWAIT_READY(offers); + EXPECT_NE(0, offers->offers().size()); + const v1::Offer& offer = offers->offers(0); + const v1::AgentID agentId = offer.agent_id(); + + Future<Event::Update> updateTaskRunning; + Future<Event::Update> updateCheckResult; + Future<Event::Update> updateCheckResultChanged; + Future<Event::Update> updateCheckResultBack; + + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&updateTaskRunning)) + .WillOnce(FutureArg<1>(&updateCheckResult)) + .WillOnce(FutureArg<1>(&updateCheckResultChanged)) + .WillOnce(FutureArg<1>(&updateCheckResultBack)) + .WillRepeatedly(Return()); // Ignore subsequent updates. + + v1::TaskInfo taskInfo = + v1::createTask(agentId, resources, SLEEP_COMMAND(10000)); + + v1::CheckInfo* checkInfo = taskInfo.mutable_check(); + checkInfo->set_type(v1::CheckInfo::COMMAND); + checkInfo->set_delay_seconds(0); + checkInfo->set_interval_seconds(0); + checkInfo->mutable_command()->mutable_command()->set_value( + FLAPPING_CHECK_COMMAND(path::join(os::getcwd(), "XXXXXX"))); + + v1::TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(taskInfo); + + launchTaskGroup(&mesos, offer, executorInfo, taskGroup); + + AWAIT_READY(updateTaskRunning); + ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state()); + EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id()); + + acknowledge(&mesos, frameworkId, updateTaskRunning->status()); + + AWAIT_READY(updateCheckResult); + const v1::TaskStatus& checkResult = updateCheckResult->status(); + + ASSERT_EQ(TASK_RUNNING, checkResult.state()); + ASSERT_EQ( + v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED, + checkResult.reason()); + EXPECT_TRUE(checkResult.check_status().command().has_exit_code()); + EXPECT_EQ(1, checkResult.check_status().command().exit_code()); + + acknowledge(&mesos, frameworkId, checkResult); + + AWAIT_READY(updateCheckResultChanged); + const v1::TaskStatus& checkResultChanged = 
updateCheckResultChanged->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResultChanged.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResultChanged.reason());
+  EXPECT_TRUE(checkResultChanged.check_status().command().has_exit_code());
+  EXPECT_EQ(0, checkResultChanged.check_status().command().exit_code());
+
+  acknowledge(&mesos, frameworkId, checkResultChanged);
+
+  AWAIT_READY(updateCheckResultBack);
+  const v1::TaskStatus& checkResultBack = updateCheckResultBack->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResultBack.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResultBack.reason());
+  EXPECT_TRUE(checkResultBack.check_status().command().has_exit_code());
+  EXPECT_EQ(1, checkResultBack.check_status().command().exit_code());
+
+  // Clean up all Mesos-launched containers.
+  Future<hashset<ContainerID>> containerIds = containerizer->containers();
+  AWAIT_READY(containerIds);
+
+  EXPECT_CALL(*scheduler, disconnected(_));
+
+  teardown(&mesos, frameworkId);
+
+  foreach (const ContainerID& containerId, containerIds.get()) {
+    AWAIT_READY(containerizer->wait(containerId));
+  }
+}
+
+
+// Verifies that when a command check times out after a previously
+// completed check, an empty check status update is delivered.
+//
+// TODO(gkleiman): Check if this test works on Windows.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, CommandCheckTimeout)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Fetcher fetcher;
+
+  // We have to explicitly create a `Containerizer` in non-local mode,
+  // because `LaunchNestedContainerSession` (used by command checks)
+  // tries to start an IO switchboard, which doesn't work in local mode yet.
+ Try<MesosContainerizer*> _containerizer = + MesosContainerizer::create(flags, false, &fetcher); + + ASSERT_SOME(_containerizer); + + Owned<slave::Containerizer> containerizer(_containerizer.get()); + Owned<MasterDetector> detector = master.get()->createDetector(); + + Try<Owned<cluster::Slave>> agent = + StartSlave(detector.get(), containerizer.get(), flags); + ASSERT_SOME(agent); + + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + + const v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo; + executorInfo.set_type(v1::ExecutorInfo::DEFAULT); + executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); + executorInfo.mutable_resources()->CopyFrom(resources); + executorInfo.mutable_shutdown_grace_period()->set_nanoseconds( + Seconds(10).ns()); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + Future<Nothing> connected; + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(FutureSatisfy(&connected)) + .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033. + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(connected); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + subscribe(&mesos, frameworkInfo); + + AWAIT_READY(subscribed); + + v1::FrameworkID frameworkId(subscribed->framework_id()); + + // Update `executorInfo` with the subscribed `frameworkId`. 
+ executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + + AWAIT_READY(offers); + EXPECT_NE(0, offers->offers().size()); + const v1::Offer& offer = offers->offers(0); + const v1::AgentID agentId = offer.agent_id(); + + Future<Event::Update> updateTaskRunning; + Future<Event::Update> updateCheckResult; + Future<Event::Update> updateCheckResultTimeout; + + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&updateTaskRunning)) + .WillOnce(FutureArg<1>(&updateCheckResult)) + .WillOnce(FutureArg<1>(&updateCheckResultTimeout)) + .WillRepeatedly(Return()); // Ignore subsequent updates. + + v1::TaskInfo taskInfo = + v1::createTask(agentId, resources, SLEEP_COMMAND(10000)); + + v1::CheckInfo* checkInfo = taskInfo.mutable_check(); + checkInfo->set_type(v1::CheckInfo::COMMAND); + checkInfo->set_delay_seconds(0); + checkInfo->set_interval_seconds(0); + checkInfo->set_timeout_seconds(1); + checkInfo->mutable_command()->mutable_command()->set_value( + STALLING_CHECK_COMMAND(path::join(os::getcwd(), "XXXXXX"))); + + v1::TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(taskInfo); + + launchTaskGroup(&mesos, offer, executorInfo, taskGroup); + + AWAIT_READY(updateTaskRunning); + ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state()); + EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id()); + + acknowledge(&mesos, frameworkId, updateTaskRunning->status()); + + AWAIT_READY(updateCheckResult); + const v1::TaskStatus& checkResult = updateCheckResult->status(); + + ASSERT_EQ(TASK_RUNNING, checkResult.state()); + ASSERT_EQ( + v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED, + checkResult.reason()); + EXPECT_TRUE(checkResult.check_status().command().has_exit_code()); + EXPECT_EQ(1, checkResult.check_status().command().exit_code()); + + acknowledge(&mesos, frameworkId, checkResult); + + AWAIT_READY(updateCheckResultTimeout); + const v1::TaskStatus& checkResultTimeout = updateCheckResultTimeout->status(); + + ASSERT_EQ(TASK_RUNNING, 
checkResultTimeout.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResultTimeout.reason());
+  EXPECT_FALSE(checkResultTimeout.check_status().command().has_exit_code());
+
+  // Clean up all Mesos-launched containers.
+  Future<hashset<ContainerID>> containerIds = containerizer->containers();
+  AWAIT_READY(containerIds);
+
+  EXPECT_CALL(*scheduler, disconnected(_));
+
+  teardown(&mesos, frameworkId);
+
+  foreach (const ContainerID& containerId, containerIds.get()) {
+    AWAIT_READY(containerizer->wait(containerId));
+  }
+}
+
+
+// Verifies that when both command check and health check are specified,
+// health and check updates include both statuses. Also verifies that
+// both statuses are included upon reconciliation.
+//
+// TODO(gkleiman): Check if this test works on Windows.
+TEST_F(DefaultExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Fetcher fetcher;
+
+  // We have to explicitly create a `Containerizer` in non-local mode,
+  // because `LaunchNestedContainerSession` (used by command checks)
+  // tries to start an IO switchboard, which doesn't work in local mode yet.
+ Try<MesosContainerizer*> _containerizer = + MesosContainerizer::create(flags, false, &fetcher); + + ASSERT_SOME(_containerizer); + + Owned<slave::Containerizer> containerizer(_containerizer.get()); + Owned<MasterDetector> detector = master.get()->createDetector(); + + Try<Owned<cluster::Slave>> agent = + StartSlave(detector.get(), containerizer.get(), flags); + ASSERT_SOME(agent); + + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + + const v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo; + executorInfo.set_type(v1::ExecutorInfo::DEFAULT); + executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); + executorInfo.mutable_resources()->CopyFrom(resources); + executorInfo.mutable_shutdown_grace_period()->set_nanoseconds( + Seconds(10).ns()); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + Future<Nothing> connected; + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(FutureSatisfy(&connected)) + .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033. + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(connected); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + subscribe(&mesos, frameworkInfo); + + AWAIT_READY(subscribed); + + v1::FrameworkID frameworkId(subscribed->framework_id()); + + // Update `executorInfo` with the subscribed `frameworkId`. 
+ executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + + AWAIT_READY(offers); + EXPECT_NE(0, offers->offers().size()); + const v1::Offer& offer = offers->offers(0); + const v1::AgentID agentId = offer.agent_id(); + + Future<Event::Update> updateTaskRunning; + Future<Event::Update> updateCheckResult; + Future<Event::Update> updateHealthResult; + Future<Event::Update> updateImplicitReconciliation; + + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&updateTaskRunning)) + .WillOnce(FutureArg<1>(&updateCheckResult)) + .WillOnce(FutureArg<1>(&updateHealthResult)) + .WillOnce(FutureArg<1>(&updateImplicitReconciliation)) + .WillRepeatedly(Return()); // Ignore subsequent updates. + + v1::TaskInfo taskInfo = + v1::createTask(agentId, resources, SLEEP_COMMAND(10000)); + + // Set both check and health check interval to an increased value to + // prevent a second update coming before reconciliation response. + int interval = 10; + + v1::CheckInfo* checkInfo = taskInfo.mutable_check(); + checkInfo->set_type(v1::CheckInfo::COMMAND); + checkInfo->set_delay_seconds(0); + checkInfo->set_interval_seconds(interval); + checkInfo->mutable_command()->mutable_command()->set_value("exit 1"); + + // Delay health check for 1s to ensure health update comes after check update. + // + // TODO(alexr): This can lead to flakiness on busy agents. A more robust + // approach could be setting the grace period to MAX_INT, and make the + // health check pass iff a file created by the check exists. Alternatively, + // we can relax the expectation that the check update is delivered first. 
+ v1::HealthCheck* healthCheckInfo = taskInfo.mutable_health_check(); + healthCheckInfo->set_type(v1::HealthCheck::COMMAND); + healthCheckInfo->set_delay_seconds(1); + healthCheckInfo->set_interval_seconds(interval); + healthCheckInfo->mutable_command()->set_value("exit 0"); + + launchTask(&mesos, offer, taskInfo); + + AWAIT_READY(updateTaskRunning); + ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state()); + EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id()); + + acknowledge(&mesos, frameworkId, updateTaskRunning->status()); + + AWAIT_READY(updateCheckResult); + const v1::TaskStatus& checkResult = updateCheckResult->status(); + + ASSERT_EQ(TASK_RUNNING, checkResult.state()); + ASSERT_EQ( + v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED, + checkResult.reason()); + EXPECT_EQ(taskInfo.task_id(), checkResult.task_id()); + EXPECT_FALSE(checkResult.has_healthy()); + EXPECT_TRUE(checkResult.has_check_status()); + EXPECT_TRUE(checkResult.check_status().command().has_exit_code()); + EXPECT_EQ(1, checkResult.check_status().command().exit_code()); + + acknowledge(&mesos, frameworkId, checkResult); + + AWAIT_READY(updateHealthResult); + const v1::TaskStatus& healthResult = updateHealthResult->status(); + + ASSERT_EQ(TASK_RUNNING, healthResult.state()); + EXPECT_EQ(taskInfo.task_id(), healthResult.task_id()); + EXPECT_TRUE(healthResult.has_healthy()); + EXPECT_TRUE(healthResult.healthy()); + EXPECT_TRUE(healthResult.has_check_status()); + EXPECT_TRUE(healthResult.check_status().command().has_exit_code()); + EXPECT_EQ(1, healthResult.check_status().command().exit_code()); + + acknowledge(&mesos, frameworkId, healthResult); + + // Trigger implicit reconciliation. 
+ reconcile(&mesos, frameworkId, {}); + + AWAIT_READY(updateImplicitReconciliation); + const v1::TaskStatus& implicitReconciliation = + updateImplicitReconciliation->status(); + + ASSERT_EQ(TASK_RUNNING, implicitReconciliation.state()); + ASSERT_EQ( + v1::TaskStatus::REASON_RECONCILIATION, + implicitReconciliation.reason()); + EXPECT_EQ(taskInfo.task_id(), implicitReconciliation.task_id()); + EXPECT_TRUE(implicitReconciliation.has_healthy()); + EXPECT_TRUE(implicitReconciliation.healthy()); + EXPECT_TRUE(implicitReconciliation.has_check_status()); + EXPECT_TRUE(implicitReconciliation.check_status().command().has_exit_code()); + EXPECT_EQ(1, implicitReconciliation.check_status().command().exit_code()); + + // Cleanup all mesos launched containers. + Future<hashset<ContainerID>> containerIds = containerizer->containers(); + AWAIT_READY(containerIds); + + EXPECT_CALL(*scheduler, disconnected(_)); + + teardown(&mesos, frameworkId); + + foreach (const ContainerID& containerId, containerIds.get()) { + AWAIT_READY(containerizer->wait(containerId)); + } +} + // Verifies that an HTTP check is supported by the default executor and // its status is delivered in a task status update.
