This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit f81427b743789f1e01e9babeb5080f037319af50
Author: Greg Mann <[email protected]>
AuthorDate: Tue Jul 16 12:21:11 2019 -0700

    Added end-to-end tests for agent draining.
    
    This adds a minimal test for the DRAIN_AGENT scheduler call,
    including verification of master API results once an agent
    has been drained. A second test verifies the case where an
    agent is marked gone when draining is complete.
    
    Review: https://reviews.apache.org/r/71069/
---
 include/mesos/v1/mesos.hpp |   1 +
 src/tests/api_tests.cpp    | 336 +++++++++++++++++++++++++++++++++++++++++++++
 src/v1/mesos.cpp           |   6 +
 3 files changed, 343 insertions(+)

diff --git a/include/mesos/v1/mesos.hpp b/include/mesos/v1/mesos.hpp
index df67f64..d8304f1 100644
--- a/include/mesos/v1/mesos.hpp
+++ b/include/mesos/v1/mesos.hpp
@@ -52,6 +52,7 @@ bool operator==(
     const CSIPluginContainerInfo& right);
 
 bool operator==(const DiscoveryInfo& left, const DiscoveryInfo& right);
+bool operator==(const DrainInfo& left, const DrainInfo& right);
 bool operator==(const Environment& left, const Environment& right);
 bool operator==(const ExecutorInfo& left, const ExecutorInfo& right);
 bool operator==(const FileInfo& left, const FileInfo& right);
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index af1d215..561ff20 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -20,6 +20,7 @@
 
 #include <mesos/http.hpp>
 
+#include <mesos/v1/mesos.hpp>
 #include <mesos/v1/resources.hpp>
 #include <mesos/v1/resource_provider.hpp>
 
@@ -5448,6 +5449,341 @@ TEST_P(MasterAPITest, OperationUpdatesUponUnreachable)
 }
 
 
+// When an operator submits a DRAIN_AGENT call, the agent should kill all
+// running tasks.
+TEST_P(MasterAPITest, DrainAgent)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(agentFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Try<v1::Resources> resources =
+    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
+
+  ASSERT_SOME(resources);
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
+
+  testing::Sequence updateSequence;
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(Return());
+
+  mesos->send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH({taskInfo})}));
+
+  AWAIT_READY(startingUpdate);
+  AWAIT_READY(runningUpdate);
+
+  Future<v1::scheduler::Event::Update> killedUpdate;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .WillOnce(DoAll(
+        FutureArg<1>(&killedUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  Future<Nothing> registrarApplyDrained;
+  EXPECT_CALL(*master.get()->registrar, apply(_))
+    .WillOnce(DoDefault())
+    .WillOnce(DoAll(
+        FutureSatisfy(&registrarApplyDrained),
+        Invoke(master.get()->registrar.get(), &MockRegistrar::unmocked_apply)));
+
+  ContentType contentType = GetParam();
+
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    post(master.get()->pid, call, contentType);
+  }
+
+  AWAIT_READY(killedUpdate);
+  AWAIT_READY(registrarApplyDrained);
+
+  // Ensure that the update acknowledgement has been processed.
+  Clock::settle();
+
+  mesos::v1::DrainInfo drainInfo;
+  drainInfo.set_state(mesos::v1::DRAINED);
+  drainInfo.mutable_config()->set_mark_gone(false);
+
+  // Ensure that the agent's drain info is reflected in the master's
+  // GET_AGENTS response.
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_AGENTS);
+
+    Future<v1::master::Response> response =
+      post(master.get()->pid, call, contentType);
+
+    AWAIT_READY(response);
+    ASSERT_TRUE(response->IsInitialized());
+    ASSERT_EQ(v1::master::Response::GET_AGENTS, response->type());
+    ASSERT_EQ(response->get_agents().agents_size(), 1);
+
+    const v1::master::Response::GetAgents::Agent& agent =
+        response->get_agents().agents(0);
+
+    EXPECT_EQ(agent.deactivated(), true);
+
+    EXPECT_EQ(agent.drain_info(), drainInfo);
+  }
+
+  // Ensure that the agent's drain info is reflected in the master's
+  // '/state' response.
+  {
+    Future<process::http::Response> response = process::http::get(
+        master.get()->pid,
+        "state",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
+    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+    ASSERT_SOME(parse);
+
+    Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>(
+        "slaves[0].drain_info");
+
+    ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo);
+  }
+
+  // Ensure that the agent's drain info is reflected in the master's
+  // '/state-summary' response.
+  {
+    Future<process::http::Response> response = process::http::get(
+        master.get()->pid,
+        "state-summary",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
+    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+    ASSERT_SOME(parse);
+
+    Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>(
+        "slaves[0].drain_info");
+
+    ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo);
+  }
+}
+
+
+// When an operator submits a DRAIN_AGENT call with 'mark_gone == true', the
+// agent should kill all running tasks and the master should mark the agent gone
+// once terminal ACKs have been received.
+TEST_P(MasterAPITest, DrainAgentMarkGone)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(agentFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Try<v1::Resources> resources =
+    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
+
+  ASSERT_SOME(resources);
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
+
+  testing::Sequence updateSequence;
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(Return());
+
+  mesos->send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH({taskInfo})}));
+
+  AWAIT_READY(startingUpdate);
+  AWAIT_READY(runningUpdate);
+
+  Future<v1::scheduler::Event::Update> goneUpdate;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_GONE_BY_OPERATOR))))
+    .WillOnce(DoAll(
+        FutureArg<1>(&goneUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  // When the terminal ACK is received by the master, the agent should be marked
+  // gone, which entails sending a `ShutdownMessage`.
+  Future<ShutdownMessage> shutdownMessage =
+    FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+
+  ContentType contentType = GetParam();
+
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+    drainAgent.set_mark_gone(true);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    post(master.get()->pid, call, contentType);
+  }
+
+  AWAIT_READY(goneUpdate);
+  AWAIT_READY(shutdownMessage);
+}
+
+
 class AgentAPITest
   : public MesosTest,
     public WithParamInterface<ContentType>
diff --git a/src/v1/mesos.cpp b/src/v1/mesos.cpp
index be479e3..c2fb528 100644
--- a/src/v1/mesos.cpp
+++ b/src/v1/mesos.cpp
@@ -146,6 +146,12 @@ bool operator==(
 }
 
 
+bool operator==(const DrainInfo& left, const DrainInfo& right)
+{
+  return google::protobuf::util::MessageDifferencer::Equals(left, right);
+}
+
+
 bool operator==(
     const Environment::Variable& left,
     const Environment::Variable& right)

Reply via email to