Implemented TCP check support in command and default executors. Review: https://reviews.apache.org/r/58196
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/26e135d7 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/26e135d7 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/26e135d7 Branch: refs/heads/master Commit: 26e135d7abe457a858deb9762ee5ff735f6048cc Parents: bbed0e8 Author: Alexander Rukletsov <[email protected]> Authored: Wed Apr 5 00:09:01 2017 +0200 Committer: Alexander Rukletsov <[email protected]> Committed: Mon Apr 24 12:06:22 2017 +0200 ---------------------------------------------------------------------- src/checks/checker.cpp | 155 ++++++++++++++++++ src/checks/checker.hpp | 6 + src/launcher/default_executor.cpp | 1 + src/launcher/executor.cpp | 1 + src/tests/check_tests.cpp | 281 +++++++++++++++++++++++++++++++++ 5 files changed, 444 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/26e135d7/src/checks/checker.cpp ---------------------------------------------------------------------- diff --git a/src/checks/checker.cpp b/src/checks/checker.cpp index 2fcf527..dcc3164 100644 --- a/src/checks/checker.cpp +++ b/src/checks/checker.cpp @@ -88,8 +88,10 @@ namespace checks { #ifndef __WINDOWS__ constexpr char HTTP_CHECK_COMMAND[] = "curl"; +constexpr char TCP_CHECK_COMMAND[] = "mesos-tcp-connect"; #else constexpr char HTTP_CHECK_COMMAND[] = "curl.exe"; +constexpr char TCP_CHECK_COMMAND[] = "mesos-tcp-connect.exe"; #endif // __WINDOWS__ constexpr char DEFAULT_HTTP_SCHEME[] = "http"; @@ -133,6 +135,7 @@ class CheckerProcess : public ProtobufProcess<CheckerProcess> public: CheckerProcess( const CheckInfo& _check, + const string& _launcherDir, const lambda::function<void(const CheckStatusInfo&)>& _callback, const TaskID& _taskId, const Option<pid_t>& _taskPid, @@ -193,11 +196,21 @@ private: const Stopwatch& stopwatch, const Future<int>& future); + Future<bool> tcpCheck(); + Future<bool> _tcpCheck( + const tuple<Future<Option<int>>, Future<string>, Future<string>>& t); + void processTcpCheckResult( + const Stopwatch& stopwatch, + const Future<bool>& future); + const CheckInfo check; Duration checkDelay; Duration checkInterval; Duration checkTimeout; + // Contains the binary for TCP checks. + const string launcherDir; + const lambda::function<void(const CheckStatusInfo&)> updateCallback; const TaskID taskId; const Option<pid_t> taskPid; @@ -220,6 +233,7 @@ private: Try<Owned<Checker>> Checker::create( const CheckInfo& check, + const string& launcherDir, const lambda::function<void(const CheckStatusInfo&)>& callback, const TaskID& taskId, const Option<pid_t>& taskPid, @@ -233,6 +247,7 @@ Try<Owned<Checker>> Checker::create( Owned<CheckerProcess> process(new CheckerProcess( check, + launcherDir, callback, taskId, taskPid, @@ -248,6 +263,7 @@ Try<Owned<Checker>> Checker::create( Try<Owned<Checker>> Checker::create( const CheckInfo& check, + const string& launcherDir, const lambda::function<void(const CheckStatusInfo&)>& callback, const TaskID& taskId, const ContainerID& taskContainerId, @@ -262,6 +278,7 @@ Try<Owned<Checker>> Checker::create( Owned<CheckerProcess> process(new CheckerProcess( check, + launcherDir, callback, taskId, None(), @@ -303,6 +320,7 @@ void Checker::resume() CheckerProcess::CheckerProcess( const CheckInfo& _check, + const string& _launcherDir, const lambda::function<void(const CheckStatusInfo&)>& _callback, const TaskID& _taskId, const Option<pid_t>& _taskPid, @@ -313,6 +331,7 @@ CheckerProcess::CheckerProcess( bool _commandCheckViaAgent) : ProcessBase(process::ID::generate("checker")), check(_check), + launcherDir(_launcherDir), updateCallback(_callback), taskId(_taskId), taskPid(_taskPid), @@ -407,6 +426,9 @@ void CheckerProcess::performCheck() break; } case CheckInfo::TCP: { + tcpCheck().onAny(defer( + self(), + &Self::processTcpCheckResult, stopwatch, lambda::_1)); break; } case CheckInfo::UNKNOWN: { @@ -1068,6 +1090,139 @@ void CheckerProcess::processHttpCheckResult( processCheckResult(stopwatch, result); } + +Future<bool> CheckerProcess::tcpCheck() +{ + CHECK_EQ(CheckInfo::TCP, check.type()); + CHECK(check.has_tcp()); + + // TCP_CHECK_COMMAND should be reachable. + CHECK(os::exists(launcherDir)); + + const CheckInfo::Tcp& tcp = check.tcp(); + + VLOG(1) << "Launching TCP check for task '" << taskId << "' at port " + << tcp.port(); + + const string command = path::join(launcherDir, TCP_CHECK_COMMAND); + + const vector<string> argv = { + command, + "--ip=" + stringify(DEFAULT_DOMAIN), + "--port=" + stringify(tcp.port()) + }; + + // TODO(alexr): Consider launching the helper binary once per task lifetime, + // see MESOS-6766. + Try<Subprocess> s = subprocess( + command, + argv, + Subprocess::PATH(os::DEV_NULL), + Subprocess::PIPE(), + Subprocess::PIPE(), + nullptr, + None(), + clone); + + if (s.isError()) { + return Failure( + "Failed to create the " + command + " subprocess: " + s.error()); + } + + // TODO(alexr): Use lambda named captures for + // these cached values once they are available. + pid_t commandPid = s->pid(); + const Duration timeout = checkTimeout; + const TaskID _taskId = taskId; + + return await( + s->status(), + process::io::read(s->out().get()), + process::io::read(s->err().get())) + .after( + timeout, + [timeout, commandPid, _taskId](Future<tuple<Future<Option<int>>, + Future<string>, + Future<string>>> future) + { + future.discard(); + + if (commandPid != -1) { + // Cleanup the TCP_CHECK_COMMAND process. + VLOG(1) << "Killing the TCP check process " << commandPid + << " for task '" << _taskId << "'"; + + os::killtree(commandPid, SIGKILL); + } + + return Failure( + string(TCP_CHECK_COMMAND) + " timed out after " + stringify(timeout)); + }) + .then(defer(self(), &Self::_tcpCheck, lambda::_1)); +} + + +Future<bool> CheckerProcess::_tcpCheck( + const tuple<Future<Option<int>>, Future<string>, Future<string>>& t) +{ + const Future<Option<int>>& status = std::get<0>(t); + if (!status.isReady()) { + return Failure( + "Failed to get the exit status of the " + string(TCP_CHECK_COMMAND) + + " process: " + (status.isFailed() ? status.failure() : "discarded")); + } + + if (status->isNone()) { + return Failure( + "Failed to reap the " + string(TCP_CHECK_COMMAND) + " process"); + } + + int exitCode = status->get(); + + const Future<string>& commandOutput = std::get<1>(t); + if (commandOutput.isReady()) { + VLOG(1) << string(TCP_CHECK_COMMAND) << ": " << commandOutput.get(); + } + + if (exitCode != 0) { + const Future<string>& commandError = std::get<2>(t); + if (commandError.isReady()) { + VLOG(1) << string(TCP_CHECK_COMMAND) << ": " << commandError.get(); + } + } + + // Non-zero exit code of TCP_CHECK_COMMAND can mean configuration problem + // (e.g., bad command flag), system error (e.g., a socket cannot be + // created), or actually a failed connection. We cannot distinguish between + // these cases, hence treat all of them as connection failure. + return (exitCode == 0 ? true : false); +} + + +void CheckerProcess::processTcpCheckResult( + const Stopwatch& stopwatch, + const Future<bool>& future) +{ + CheckStatusInfo result; + result.set_type(check.type()); + + if (future.isReady()) { + VLOG(1) << check.type() << " check for task '" + << taskId << "' returned: " << stringify(future.get()); + + result.mutable_tcp()->set_succeeded(future.get()); + } else { + // Check's status is currently not available, which may indicate a change + // that should be reported as an empty `CheckStatusInfo.Tcp` message. + LOG(WARNING) << check.type() << " check for task '" << taskId << "' failed:" + << " " << (future.isFailed() ? future.failure() : "discarded"); + + result.mutable_tcp(); + } + + processCheckResult(stopwatch, result); +} + namespace validation { Option<Error> checkInfo(const CheckInfo& checkInfo) http://git-wip-us.apache.org/repos/asf/mesos/blob/26e135d7/src/checks/checker.hpp ---------------------------------------------------------------------- diff --git a/src/checks/checker.hpp b/src/checks/checker.hpp index fec30a2..bbe147f 100644 --- a/src/checks/checker.hpp +++ b/src/checks/checker.hpp @@ -48,6 +48,8 @@ public: * the task's namespaces, and execute the commmand. * * @param check The protobuf message definition of a check. + * @param launcherDir A directory where Mesos helper binaries are located. + * Executor must have access to this directory for TCP checks. * @param callback A callback `Checker` uses to send check status updates * to its owner (usually an executor). * @param taskId The TaskID of the target task. @@ -61,6 +63,7 @@ public: */ static Try<process::Owned<Checker>> create( const CheckInfo& check, + const std::string& launcherDir, const lambda::function<void(const CheckStatusInfo&)>& callback, const TaskID& taskId, const Option<pid_t>& taskPid, @@ -75,6 +78,8 @@ public: * API call. * * @param check The protobuf message definition of a check. + * @param launcherDir A directory where Mesos helper binaries are located. + * Executor must have access to this directory for TCP checks. * @param callback A callback `Checker` uses to send check status updates * to its owner (usually an executor). * @param taskId The TaskID of the target task. @@ -89,6 +94,7 @@ public: */ static Try<process::Owned<Checker>> create( const CheckInfo& check, + const std::string& launcherDir, const lambda::function<void(const CheckStatusInfo&)>& callback, const TaskID& taskId, const ContainerID& taskContainerId, http://git-wip-us.apache.org/repos/asf/mesos/blob/26e135d7/src/launcher/default_executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp index 95505fc..5c31f94 100644 --- a/src/launcher/default_executor.cpp +++ b/src/launcher/default_executor.cpp @@ -506,6 +506,7 @@ protected: Try<Owned<checks::Checker>> checker = checks::Checker::create( task.check(), + launcherDirectory, defer(self(), &Self::taskCheckUpdated, taskId, lambda::_1), taskId, containerId, http://git-wip-us.apache.org/repos/asf/mesos/blob/26e135d7/src/launcher/executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp index c9cecb5..b05f73e 100644 --- a/src/launcher/executor.cpp +++ b/src/launcher/executor.cpp @@ -643,6 +643,7 @@ protected: Try<Owned<checks::Checker>> _checker = checks::Checker::create( task.check(), + launcherDir, defer(self(), &Self::taskCheckUpdated, taskId.get(), lambda::_1), taskId.get(), pid, http://git-wip-us.apache.org/repos/asf/mesos/blob/26e135d7/src/tests/check_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/check_tests.cpp b/src/tests/check_tests.cpp index 67124c9..67ca6fb 100644 --- a/src/tests/check_tests.cpp +++ b/src/tests/check_tests.cpp @@ -864,6 +864,138 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(CommandExecutorCheckTest, HTTPCheckDelivered) } +// Verifies that a TCP check is supported by the command executor and +// its status is delivered in a task status update. +// +// TODO(alexr): Check if this test works on Windows. +TEST_F_TEMP_DISABLED_ON_WINDOWS(CommandExecutorCheckTest, TCPCheckDelivered) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> agent = StartSlave(detector.get()); + ASSERT_SOME(agent); + + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + Future<Nothing> connected; + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(FutureSatisfy(&connected)); + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(connected); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + subscribe(&mesos, frameworkInfo); + + AWAIT_READY(subscribed); + + v1::FrameworkID frameworkId(subscribed->framework_id()); + + AWAIT_READY(offers); + EXPECT_NE(0, offers->offers().size()); + const v1::Offer& offer = offers->offers(0); + const v1::AgentID agentId = offer.agent_id(); + + Future<v1::scheduler::Event::Update> updateTaskRunning; + Future<v1::scheduler::Event::Update> updateCheckResult; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&updateTaskRunning)) + .WillOnce(FutureArg<1>(&updateCheckResult)) + .WillRepeatedly(Return()); // Ignore subsequent updates. + + const uint16_t testPort = getFreePort().get(); + + // Use `test-helper` to launch a simple HTTP + // server to respond to TCP checks. + const string command = strings::format( + "%s %s --ip=127.0.0.1 --port=%u", + getTestHelperPath("test-helper"), + HttpServerTestHelper::NAME, + testPort).get(); + + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::TaskInfo taskInfo = v1::createTask(agentId, resources, command); + + v1::CheckInfo* checkInfo = taskInfo.mutable_check(); + checkInfo->set_type(v1::CheckInfo::TCP); + checkInfo->mutable_tcp()->set_port(testPort); + checkInfo->set_delay_seconds(0); + checkInfo->set_interval_seconds(0); + + launchTask(&mesos, offer, taskInfo); + + AWAIT_READY(updateTaskRunning); + const v1::TaskStatus& taskRunning = updateTaskRunning->status(); + + ASSERT_EQ(TASK_RUNNING, taskRunning.state()); + EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id()); + EXPECT_TRUE(taskRunning.has_check_status()); + EXPECT_TRUE(taskRunning.check_status().has_tcp()); + EXPECT_FALSE(taskRunning.check_status().tcp().has_succeeded()); + + acknowledge(&mesos, frameworkId, taskRunning); + + AWAIT_READY(updateCheckResult); + const v1::TaskStatus& checkResult = updateCheckResult->status(); + + ASSERT_EQ(TASK_RUNNING, checkResult.state()); + ASSERT_EQ( + v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED, + checkResult.reason()); + EXPECT_EQ(taskInfo.task_id(), checkResult.task_id()); + EXPECT_TRUE(checkResult.has_check_status()); + EXPECT_TRUE(checkResult.check_status().tcp().has_succeeded()); + + // Since it takes some time for the HTTP server to start serving requests, + // the first several TCP checks may fail. However we still expect a + // successful TCP check and hence an extra status update. + if (checkResult.check_status().tcp().succeeded() == false) + { + // Inject an expectation for the extra status update we expect. + Future<v1::scheduler::Event::Update> updateCheckResult2; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&updateCheckResult2)) + .RetiresOnSaturation(); + + // Acknowledge (to be able to get the next update). + acknowledge(&mesos, frameworkId, checkResult); + + AWAIT_READY(updateCheckResult2); + const v1::TaskStatus& checkResult2 = updateCheckResult2->status(); + + ASSERT_EQ(TASK_RUNNING, checkResult2.state()); + ASSERT_EQ( + v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED, + checkResult2.reason()); + EXPECT_EQ(taskInfo.task_id(), checkResult2.task_id()); + EXPECT_TRUE(checkResult2.has_check_status()); + EXPECT_TRUE(checkResult2.check_status().tcp().has_succeeded()); + EXPECT_EQ(true, checkResult2.check_status().tcp().succeeded()); + } +} + + // TODO(alexr): Implement following tests for the docker executor once // the docker executor supports checks. // @@ -875,6 +1007,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(CommandExecutorCheckTest, HTTPCheckDelivered) // 4. COMMAND check and health check do not shadow each other; upon // reconciliation both statuses are available. // 5. HTTP check works and is delivered. +// 6. TCP check works and is delivered. // These are check tests with the default executor. @@ -1720,6 +1853,154 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, HTTPCheckDelivered) } +// Verifies that a TCP check is supported by the default executor and +// its status is delivered in a task status update. +// +// TODO(alexr): Check if this test works on Windows. +TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, TCPCheckDelivered) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + // Disable AuthN on the agent. + slave::Flags flags = CreateSlaveFlags(); + flags.authenticate_http_readwrite = false; + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), flags); + ASSERT_SOME(agent); + + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + + const v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo; + executorInfo.set_type(v1::ExecutorInfo::DEFAULT); + executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); + executorInfo.mutable_resources()->CopyFrom(resources); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + Future<Nothing> connected; + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(FutureSatisfy(&connected)); + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(connected); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + subscribe(&mesos, frameworkInfo); + + AWAIT_READY(subscribed); + + v1::FrameworkID frameworkId(subscribed->framework_id()); + + // Update `executorInfo` with the subscribed `frameworkId`. + executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + + AWAIT_READY(offers); + EXPECT_NE(0, offers->offers().size()); + const v1::Offer& offer = offers->offers(0); + const v1::AgentID agentId = offer.agent_id(); + + Future<v1::scheduler::Event::Update> updateTaskRunning; + Future<v1::scheduler::Event::Update> updateCheckResult; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&updateTaskRunning)) + .WillOnce(FutureArg<1>(&updateCheckResult)) + .WillRepeatedly(Return()); // Ignore subsequent updates. + + const uint16_t testPort = getFreePort().get(); + + // Use `test-helper` to launch a simple HTTP + // server to respond to TCP checks. + const string command = strings::format( + "%s %s --ip=127.0.0.1 --port=%u", + getTestHelperPath("test-helper"), + HttpServerTestHelper::NAME, + testPort).get(); + + v1::TaskInfo taskInfo = v1::createTask(agentId, resources, command); + + v1::CheckInfo* checkInfo = taskInfo.mutable_check(); + checkInfo->set_type(v1::CheckInfo::TCP); + checkInfo->mutable_tcp()->set_port(testPort); + checkInfo->set_delay_seconds(0); + checkInfo->set_interval_seconds(0); + + v1::TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(taskInfo); + + launchTaskGroup(&mesos, offer, executorInfo, taskGroup); + + AWAIT_READY(updateTaskRunning); + const v1::TaskStatus& taskRunning = updateTaskRunning->status(); + + ASSERT_EQ(TASK_RUNNING, taskRunning.state()); + EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id()); + EXPECT_TRUE(taskRunning.has_check_status()); + EXPECT_TRUE(taskRunning.check_status().has_tcp()); + EXPECT_FALSE(taskRunning.check_status().tcp().has_succeeded()); + + // Acknowledge (to be able to get the next update). + acknowledge(&mesos, frameworkId, taskRunning); + + AWAIT_READY(updateCheckResult); + const v1::TaskStatus& checkResult = updateCheckResult->status(); + + ASSERT_EQ(TASK_RUNNING, checkResult.state()); + ASSERT_EQ( + v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED, + checkResult.reason()); + EXPECT_EQ(taskInfo.task_id(), checkResult.task_id()); + EXPECT_TRUE(checkResult.has_check_status()); + EXPECT_TRUE(checkResult.check_status().tcp().has_succeeded()); + + // Since it takes some time for the HTTP server to start serving requests, + // the first several TCP checks may fail. However we still expect a + // successful TCP check and hence an extra status update. + if (checkResult.check_status().tcp().succeeded() == false) + { + // Inject an expectation for the extra status update we expect. + Future<v1::scheduler::Event::Update> updateCheckResult2; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&updateCheckResult2)) + .RetiresOnSaturation(); + + // Acknowledge (to be able to get the next update). + acknowledge(&mesos, frameworkId, checkResult); + + AWAIT_READY(updateCheckResult2); + const v1::TaskStatus& checkResult2 = updateCheckResult2->status(); + + ASSERT_EQ(TASK_RUNNING, checkResult2.state()); + ASSERT_EQ( + v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED, + checkResult2.reason()); + EXPECT_EQ(taskInfo.task_id(), checkResult2.task_id()); + EXPECT_TRUE(checkResult2.has_check_status()); + EXPECT_TRUE(checkResult2.check_status().tcp().has_succeeded()); + EXPECT_EQ(true, checkResult2.check_status().tcp().succeeded()); + } +} + + // These are protobuf validation tests. // // TODO(alexr): Move these tests once validation code is moved closer to
