Implemented TCP check support in command and default executors.

Review: https://reviews.apache.org/r/58196


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/26e135d7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/26e135d7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/26e135d7

Branch: refs/heads/master
Commit: 26e135d7abe457a858deb9762ee5ff735f6048cc
Parents: bbed0e8
Author: Alexander Rukletsov <[email protected]>
Authored: Wed Apr 5 00:09:01 2017 +0200
Committer: Alexander Rukletsov <[email protected]>
Committed: Mon Apr 24 12:06:22 2017 +0200

----------------------------------------------------------------------
 src/checks/checker.cpp            | 155 ++++++++++++++++++
 src/checks/checker.hpp            |   6 +
 src/launcher/default_executor.cpp |   1 +
 src/launcher/executor.cpp         |   1 +
 src/tests/check_tests.cpp         | 281 +++++++++++++++++++++++++++++++++
 5 files changed, 444 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/26e135d7/src/checks/checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.cpp b/src/checks/checker.cpp
index 2fcf527..dcc3164 100644
--- a/src/checks/checker.cpp
+++ b/src/checks/checker.cpp
@@ -88,8 +88,10 @@ namespace checks {
 
 #ifndef __WINDOWS__
 constexpr char HTTP_CHECK_COMMAND[] = "curl";
+constexpr char TCP_CHECK_COMMAND[] = "mesos-tcp-connect";
 #else
 constexpr char HTTP_CHECK_COMMAND[] = "curl.exe";
+constexpr char TCP_CHECK_COMMAND[] = "mesos-tcp-connect.exe";
 #endif // __WINDOWS__
 
 constexpr char DEFAULT_HTTP_SCHEME[] = "http";
@@ -133,6 +135,7 @@ class CheckerProcess : public ProtobufProcess<CheckerProcess>
 public:
   CheckerProcess(
       const CheckInfo& _check,
+      const string& _launcherDir,
       const lambda::function<void(const CheckStatusInfo&)>& _callback,
       const TaskID& _taskId,
       const Option<pid_t>& _taskPid,
@@ -193,11 +196,21 @@ private:
       const Stopwatch& stopwatch,
       const Future<int>& future);
 
+  Future<bool> tcpCheck();
+  Future<bool> _tcpCheck(
+      const tuple<Future<Option<int>>, Future<string>, Future<string>>& t);
+  void processTcpCheckResult(
+      const Stopwatch& stopwatch,
+      const Future<bool>& future);
+
   const CheckInfo check;
   Duration checkDelay;
   Duration checkInterval;
   Duration checkTimeout;
 
+  // Contains the binary for TCP checks.
+  const string launcherDir;
+
   const lambda::function<void(const CheckStatusInfo&)> updateCallback;
   const TaskID taskId;
   const Option<pid_t> taskPid;
@@ -220,6 +233,7 @@ private:
 
 Try<Owned<Checker>> Checker::create(
     const CheckInfo& check,
+    const string& launcherDir,
     const lambda::function<void(const CheckStatusInfo&)>& callback,
     const TaskID& taskId,
     const Option<pid_t>& taskPid,
@@ -233,6 +247,7 @@ Try<Owned<Checker>> Checker::create(
 
   Owned<CheckerProcess> process(new CheckerProcess(
       check,
+      launcherDir,
       callback,
       taskId,
       taskPid,
@@ -248,6 +263,7 @@ Try<Owned<Checker>> Checker::create(
 
 Try<Owned<Checker>> Checker::create(
     const CheckInfo& check,
+    const string& launcherDir,
     const lambda::function<void(const CheckStatusInfo&)>& callback,
     const TaskID& taskId,
     const ContainerID& taskContainerId,
@@ -262,6 +278,7 @@ Try<Owned<Checker>> Checker::create(
 
   Owned<CheckerProcess> process(new CheckerProcess(
       check,
+      launcherDir,
       callback,
       taskId,
       None(),
@@ -303,6 +320,7 @@ void Checker::resume()
 
 CheckerProcess::CheckerProcess(
     const CheckInfo& _check,
+    const string& _launcherDir,
     const lambda::function<void(const CheckStatusInfo&)>& _callback,
     const TaskID& _taskId,
     const Option<pid_t>& _taskPid,
@@ -313,6 +331,7 @@ CheckerProcess::CheckerProcess(
     bool _commandCheckViaAgent)
   : ProcessBase(process::ID::generate("checker")),
     check(_check),
+    launcherDir(_launcherDir),
     updateCallback(_callback),
     taskId(_taskId),
     taskPid(_taskPid),
@@ -407,6 +426,9 @@ void CheckerProcess::performCheck()
       break;
     }
     case CheckInfo::TCP: {
+      tcpCheck().onAny(defer(
+          self(),
+          &Self::processTcpCheckResult, stopwatch, lambda::_1));
       break;
     }
     case CheckInfo::UNKNOWN: {
@@ -1068,6 +1090,139 @@ void CheckerProcess::processHttpCheckResult(
   processCheckResult(stopwatch, result);
 }
 
+
+Future<bool> CheckerProcess::tcpCheck()
+{
+  CHECK_EQ(CheckInfo::TCP, check.type());
+  CHECK(check.has_tcp());
+
+  // TCP_CHECK_COMMAND should be reachable.
+  CHECK(os::exists(launcherDir));
+
+  const CheckInfo::Tcp& tcp = check.tcp();
+
+  VLOG(1) << "Launching TCP check for task '" << taskId << "' at port "
+          << tcp.port();
+
+  const string command = path::join(launcherDir, TCP_CHECK_COMMAND);
+
+  const vector<string> argv = {
+    command,
+    "--ip=" + stringify(DEFAULT_DOMAIN),
+    "--port=" + stringify(tcp.port())
+  };
+
+  // TODO(alexr): Consider launching the helper binary once per task lifetime,
+  // see MESOS-6766.
+  Try<Subprocess> s = subprocess(
+      command,
+      argv,
+      Subprocess::PATH(os::DEV_NULL),
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      nullptr,
+      None(),
+      clone);
+
+  if (s.isError()) {
+    return Failure(
+        "Failed to create the " + command + " subprocess: " + s.error());
+  }
+
+  // TODO(alexr): Use lambda named captures for
+  // these cached values once they are available.
+  pid_t commandPid = s->pid();
+  const Duration timeout = checkTimeout;
+  const TaskID _taskId = taskId;
+
+  return await(
+      s->status(),
+      process::io::read(s->out().get()),
+      process::io::read(s->err().get()))
+    .after(
+        timeout,
+        [timeout, commandPid, _taskId](Future<tuple<Future<Option<int>>,
+                                                    Future<string>,
+                                                    Future<string>>> future)
+    {
+      future.discard();
+
+      if (commandPid != -1) {
+        // Cleanup the TCP_CHECK_COMMAND process.
+        VLOG(1) << "Killing the TCP check process " << commandPid
+                << " for task '" << _taskId << "'";
+
+        os::killtree(commandPid, SIGKILL);
+      }
+
+      return Failure(
+          string(TCP_CHECK_COMMAND) + " timed out after " + stringify(timeout));
+    })
+    .then(defer(self(), &Self::_tcpCheck, lambda::_1));
+}
+
+
+Future<bool> CheckerProcess::_tcpCheck(
+    const tuple<Future<Option<int>>, Future<string>, Future<string>>& t)
+{
+  const Future<Option<int>>& status = std::get<0>(t);
+  if (!status.isReady()) {
+    return Failure(
+        "Failed to get the exit status of the " + string(TCP_CHECK_COMMAND) +
+        " process: " + (status.isFailed() ? status.failure() : "discarded"));
+  }
+
+  if (status->isNone()) {
+    return Failure(
+        "Failed to reap the " + string(TCP_CHECK_COMMAND) + " process");
+  }
+
+  int exitCode = status->get();
+
+  const Future<string>& commandOutput = std::get<1>(t);
+  if (commandOutput.isReady()) {
+    VLOG(1) << string(TCP_CHECK_COMMAND) << ": " << commandOutput.get();
+  }
+
+  if (exitCode != 0) {
+    const Future<string>& commandError = std::get<2>(t);
+    if (commandError.isReady()) {
+      VLOG(1) << string(TCP_CHECK_COMMAND) << ": " << commandError.get();
+    }
+  }
+
+  // Non-zero exit code of TCP_CHECK_COMMAND can mean configuration problem
+  // (e.g., bad command flag), system error (e.g., a socket cannot be
+  // created), or actually a failed connection. We cannot distinguish between
+  // these cases, hence treat all of them as connection failure.
+  return (exitCode == 0 ? true : false);
+}
+
+
+void CheckerProcess::processTcpCheckResult(
+    const Stopwatch& stopwatch,
+    const Future<bool>& future)
+{
+  CheckStatusInfo result;
+  result.set_type(check.type());
+
+  if (future.isReady()) {
+    VLOG(1) << check.type() << " check for task '"
+            << taskId << "' returned: " << stringify(future.get());
+
+    result.mutable_tcp()->set_succeeded(future.get());
+  } else {
+    // Check's status is currently not available, which may indicate a change
+    // that should be reported as an empty `CheckStatusInfo.Tcp` message.
+    LOG(WARNING) << check.type() << " check for task '" << taskId << "' failed:"
+                 << " " << (future.isFailed() ? future.failure() : "discarded");
+
+    result.mutable_tcp();
+  }
+
+  processCheckResult(stopwatch, result);
+}
+
 namespace validation {
 
 Option<Error> checkInfo(const CheckInfo& checkInfo)

http://git-wip-us.apache.org/repos/asf/mesos/blob/26e135d7/src/checks/checker.hpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.hpp b/src/checks/checker.hpp
index fec30a2..bbe147f 100644
--- a/src/checks/checker.hpp
+++ b/src/checks/checker.hpp
@@ -48,6 +48,8 @@ public:
    * the task's namespaces, and execute the commmand.
    *
    * @param check The protobuf message definition of a check.
+   * @param launcherDir A directory where Mesos helper binaries are located.
+   *     Executor must have access to this directory for TCP checks.
    * @param callback A callback `Checker` uses to send check status updates
    *     to its owner (usually an executor).
    * @param taskId The TaskID of the target task.
@@ -61,6 +63,7 @@ public:
    */
   static Try<process::Owned<Checker>> create(
       const CheckInfo& check,
+      const std::string& launcherDir,
       const lambda::function<void(const CheckStatusInfo&)>& callback,
       const TaskID& taskId,
       const Option<pid_t>& taskPid,
@@ -75,6 +78,8 @@ public:
    * API call.
    *
    * @param check The protobuf message definition of a check.
+   * @param launcherDir A directory where Mesos helper binaries are located.
+   *     Executor must have access to this directory for TCP checks.
    * @param callback A callback `Checker` uses to send check status updates
    *     to its owner (usually an executor).
    * @param taskId The TaskID of the target task.
@@ -89,6 +94,7 @@ public:
    */
   static Try<process::Owned<Checker>> create(
       const CheckInfo& check,
+      const std::string& launcherDir,
       const lambda::function<void(const CheckStatusInfo&)>& callback,
       const TaskID& taskId,
       const ContainerID& taskContainerId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/26e135d7/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 95505fc..5c31f94 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -506,6 +506,7 @@ protected:
         Try<Owned<checks::Checker>> checker =
           checks::Checker::create(
               task.check(),
+              launcherDirectory,
               defer(self(), &Self::taskCheckUpdated, taskId, lambda::_1),
               taskId,
               containerId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/26e135d7/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index c9cecb5..b05f73e 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -643,6 +643,7 @@ protected:
       Try<Owned<checks::Checker>> _checker =
         checks::Checker::create(
             task.check(),
+            launcherDir,
             defer(self(), &Self::taskCheckUpdated, taskId.get(), lambda::_1),
             taskId.get(),
             pid,

http://git-wip-us.apache.org/repos/asf/mesos/blob/26e135d7/src/tests/check_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/check_tests.cpp b/src/tests/check_tests.cpp
index 67124c9..67ca6fb 100644
--- a/src/tests/check_tests.cpp
+++ b/src/tests/check_tests.cpp
@@ -864,6 +864,138 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(CommandExecutorCheckTest, HTTPCheckDelivered)
 }
 
 
+// Verifies that a TCP check is supported by the command executor and
+// its status is delivered in a task status update.
+//
+// TODO(alexr): Check if this test works on Windows.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(CommandExecutorCheckTest, TCPCheckDelivered)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
+  ASSERT_SOME(agent);
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  subscribe(&mesos, frameworkInfo);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID agentId = offer.agent_id();
+
+  Future<v1::scheduler::Event::Update> updateTaskRunning;
+  Future<v1::scheduler::Event::Update> updateCheckResult;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&updateTaskRunning))
+    .WillOnce(FutureArg<1>(&updateCheckResult))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  const uint16_t testPort = getFreePort().get();
+
+  // Use `test-helper` to launch a simple HTTP
+  // server to respond to TCP checks.
+  const string command = strings::format(
+      "%s %s --ip=127.0.0.1 --port=%u",
+      getTestHelperPath("test-helper"),
+      HttpServerTestHelper::NAME,
+      testPort).get();
+
+  v1::Resources resources =
+      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::TaskInfo taskInfo = v1::createTask(agentId, resources, command);
+
+  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
+  checkInfo->set_type(v1::CheckInfo::TCP);
+  checkInfo->mutable_tcp()->set_port(testPort);
+  checkInfo->set_delay_seconds(0);
+  checkInfo->set_interval_seconds(0);
+
+  launchTask(&mesos, offer, taskInfo);
+
+  AWAIT_READY(updateTaskRunning);
+  const v1::TaskStatus& taskRunning = updateTaskRunning->status();
+
+  ASSERT_EQ(TASK_RUNNING, taskRunning.state());
+  EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id());
+  EXPECT_TRUE(taskRunning.has_check_status());
+  EXPECT_TRUE(taskRunning.check_status().has_tcp());
+  EXPECT_FALSE(taskRunning.check_status().tcp().has_succeeded());
+
+  acknowledge(&mesos, frameworkId, taskRunning);
+
+  AWAIT_READY(updateCheckResult);
+  const v1::TaskStatus& checkResult = updateCheckResult->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResult.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResult.reason());
+  EXPECT_EQ(taskInfo.task_id(), checkResult.task_id());
+  EXPECT_TRUE(checkResult.has_check_status());
+  EXPECT_TRUE(checkResult.check_status().tcp().has_succeeded());
+
+  // Since it takes some time for the HTTP server to start serving requests,
+  // the first several TCP checks may fail. However we still expect a
+  // successful TCP check and hence an extra status update.
+  if (checkResult.check_status().tcp().succeeded() == false)
+  {
+    // Inject an expectation for the extra status update we expect.
+    Future<v1::scheduler::Event::Update> updateCheckResult2;
+    EXPECT_CALL(*scheduler, update(_, _))
+      .WillOnce(FutureArg<1>(&updateCheckResult2))
+      .RetiresOnSaturation();
+
+    // Acknowledge (to be able to get the next update).
+    acknowledge(&mesos, frameworkId, checkResult);
+
+    AWAIT_READY(updateCheckResult2);
+    const v1::TaskStatus& checkResult2 = updateCheckResult2->status();
+
+    ASSERT_EQ(TASK_RUNNING, checkResult2.state());
+    ASSERT_EQ(
+        v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+        checkResult2.reason());
+    EXPECT_EQ(taskInfo.task_id(), checkResult2.task_id());
+    EXPECT_TRUE(checkResult2.has_check_status());
+    EXPECT_TRUE(checkResult2.check_status().tcp().has_succeeded());
+    EXPECT_EQ(true, checkResult2.check_status().tcp().succeeded());
+  }
+}
+
+
 // TODO(alexr): Implement following tests for the docker executor once
 // the docker executor supports checks.
 //
@@ -875,6 +1007,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(CommandExecutorCheckTest, HTTPCheckDelivered)
 // 4. COMMAND check and health check do not shadow each other; upon
 //    reconciliation both statuses are available.
 // 5. HTTP check works and is delivered.
+// 6. TCP check works and is delivered.
 
 
 // These are check tests with the default executor.
@@ -1720,6 +1853,154 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, HTTPCheckDelivered)
 }
 
 
+// Verifies that a TCP check is supported by the default executor and
+// its status is delivered in a task status update.
+//
+// TODO(alexr): Check if this test works on Windows.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, TCPCheckDelivered)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), flags);
+  ASSERT_SOME(agent);
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+  const v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::ExecutorInfo executorInfo;
+  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
+  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_resources()->CopyFrom(resources);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  subscribe(&mesos, frameworkInfo);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID agentId = offer.agent_id();
+
+  Future<v1::scheduler::Event::Update> updateTaskRunning;
+  Future<v1::scheduler::Event::Update> updateCheckResult;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&updateTaskRunning))
+    .WillOnce(FutureArg<1>(&updateCheckResult))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  const uint16_t testPort = getFreePort().get();
+
+  // Use `test-helper` to launch a simple HTTP
+  // server to respond to TCP checks.
+  const string command = strings::format(
+      "%s %s --ip=127.0.0.1 --port=%u",
+      getTestHelperPath("test-helper"),
+      HttpServerTestHelper::NAME,
+      testPort).get();
+
+  v1::TaskInfo taskInfo = v1::createTask(agentId, resources, command);
+
+  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
+  checkInfo->set_type(v1::CheckInfo::TCP);
+  checkInfo->mutable_tcp()->set_port(testPort);
+  checkInfo->set_delay_seconds(0);
+  checkInfo->set_interval_seconds(0);
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(taskInfo);
+
+  launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+
+  AWAIT_READY(updateTaskRunning);
+  const v1::TaskStatus& taskRunning = updateTaskRunning->status();
+
+  ASSERT_EQ(TASK_RUNNING, taskRunning.state());
+  EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id());
+  EXPECT_TRUE(taskRunning.has_check_status());
+  EXPECT_TRUE(taskRunning.check_status().has_tcp());
+  EXPECT_FALSE(taskRunning.check_status().tcp().has_succeeded());
+
+  // Acknowledge (to be able to get the next update).
+  acknowledge(&mesos, frameworkId, taskRunning);
+
+  AWAIT_READY(updateCheckResult);
+  const v1::TaskStatus& checkResult = updateCheckResult->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResult.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResult.reason());
+  EXPECT_EQ(taskInfo.task_id(), checkResult.task_id());
+  EXPECT_TRUE(checkResult.has_check_status());
+  EXPECT_TRUE(checkResult.check_status().tcp().has_succeeded());
+
+  // Since it takes some time for the HTTP server to start serving requests,
+  // the first several TCP checks may fail. However we still expect a
+  // successful TCP check and hence an extra status update.
+  if (checkResult.check_status().tcp().succeeded() == false)
+  {
+    // Inject an expectation for the extra status update we expect.
+    Future<v1::scheduler::Event::Update> updateCheckResult2;
+    EXPECT_CALL(*scheduler, update(_, _))
+      .WillOnce(FutureArg<1>(&updateCheckResult2))
+      .RetiresOnSaturation();
+
+    // Acknowledge (to be able to get the next update).
+    acknowledge(&mesos, frameworkId, checkResult);
+
+    AWAIT_READY(updateCheckResult2);
+    const v1::TaskStatus& checkResult2 = updateCheckResult2->status();
+
+    ASSERT_EQ(TASK_RUNNING, checkResult2.state());
+    ASSERT_EQ(
+        v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+        checkResult2.reason());
+    EXPECT_EQ(taskInfo.task_id(), checkResult2.task_id());
+    EXPECT_TRUE(checkResult2.has_check_status());
+    EXPECT_TRUE(checkResult2.check_status().tcp().has_succeeded());
+    EXPECT_EQ(true, checkResult2.check_status().tcp().succeeded());
+  }
+}
+
+
 // These are protobuf validation tests.
 //
 // TODO(alexr): Move these tests once validation code is moved closer to

Reply via email to