This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new ff9f5cc  Fixed a bug in the agent's draining handler.
ff9f5cc is described below

commit ff9f5cc796f6a99302d94de121726cd7b5988f11
Author: Greg Mann <g...@mesosphere.io>
AuthorDate: Wed May 6 16:35:19 2020 -0700

    Fixed a bug in the agent's draining handler.
    
    Previously, when the agent had no tasks or operations and
    received a `DrainSlaveMessage`, it would checkpoint the
    `DrainConfig` to disk, implicitly placing it into a "draining"
    state indefinitely. This patch updates the agent's handler to
    avoid checkpointing anything to disk in this case.
    
    The `SlaveTest.DrainInfoInAPIOutputs` test is also removed
    and its functionality is moved into the test
    `SlaveTest.DrainAgentKillsRunningTask`. The running task in
    the latter test allows us to verify agent API outputs both
    before and after the task's terminal update is acknowleged.
    
    Review: https://reviews.apache.org/r/72368/
---
 src/slave/slave.cpp       |  12 +++
 src/tests/slave_tests.cpp | 215 +++++++++++++++++++++++++---------------------
 2 files changed, 127 insertions(+), 100 deletions(-)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 1a32c81..c828d99 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1074,6 +1074,18 @@ void Slave::drain(
     const UPID& from,
     DrainSlaveMessage&& drainSlaveMessage)
 {
+  if (operations.empty() && frameworks.empty()) {
+    LOG(INFO)
+      << "Received DrainConfig " << drainSlaveMessage.config()
+      << (drainConfig.isSome()
+          ? "; previously stored DrainConfig " + stringify(*drainConfig)
+          : "")
+      << "; agent has no stored frameworks, tasks, or operations,"
+         " so draining is already complete";
+
+    return;
+  }
+
   hashmap<FrameworkID, hashset<TaskID>> pendingTaskIds;
   foreachvalue (Framework* framework, frameworks) {
     foreachvalue (const auto& taskMap, framework->pendingTasks) {
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 5ad04b2..b46e561 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -12089,97 +12089,8 @@ TEST_F(
 }
 
 
-// When the agent receives a `DrainSlaveMessage` from the master, the agent's
-// drain info should be visible in the agent's API output.
-TEST_F(SlaveTest, DrainInfoInAPIOutputs)
-{
-  Clock::pause();
-
-  const int GRACE_PERIOD_NANOS = 1000000;
-
-  Try<Owned<cluster::Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
-  StandaloneMasterDetector detector(master.get()->pid);
-
-  slave::Flags slaveFlags = CreateSlaveFlags();
-
-  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
-  ASSERT_SOME(slave);
-
-  Clock::advance(slaveFlags.registration_backoff_factor);
-
-  AWAIT_READY(slaveRegisteredMessage);
-
-  // Simulate the master sending a `DrainSlaveMessage` to the agent.
-  DurationInfo maxGracePeriod;
-  maxGracePeriod.set_nanoseconds(GRACE_PERIOD_NANOS);
-
-  DrainConfig drainConfig;
-  drainConfig.set_mark_gone(true);
-  drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
-
-  DrainSlaveMessage drainSlaveMessage;
-  drainSlaveMessage.mutable_config()->CopyFrom(drainConfig);
-
-  process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage);
-
-  Clock::settle();
-
-  {
-    v1::agent::Call call;
-    call.set_type(v1::agent::Call::GET_AGENT);
-
-    const ContentType contentType = ContentType::PROTOBUF;
-
-    process::http::Headers headers = 
createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-    headers["Accept"] = stringify(contentType);
-
-    Future<process::http::Response> httpResponse =
-      process::http::post(
-          slave.get()->pid,
-          "api/v1",
-          headers,
-          serialize(contentType, call),
-          stringify(contentType));
-
-    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, httpResponse);
-
-    Future<v1::agent::Response> responseMessage =
-      deserialize<v1::agent::Response>(contentType, httpResponse->body);
-
-    AWAIT_READY(responseMessage);
-    ASSERT_TRUE(responseMessage->IsInitialized());
-    ASSERT_EQ(v1::agent::Response::GET_AGENT, responseMessage->type());
-    ASSERT_TRUE(responseMessage->get_agent().has_drain_config());
-    EXPECT_EQ(
-        drainConfig,
-        devolve(responseMessage->get_agent().drain_config()));
-  }
-
-  {
-    Future<Response> response = process::http::get(
-        slave.get()->pid,
-        "state",
-        None(),
-        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
-
-    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
-    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", 
response);
-
-    Try<JSON::Object> state = JSON::parse<JSON::Object>(response->body);
-
-    ASSERT_SOME(state);
-
-    EXPECT_EQ(JSON::protobuf(drainConfig), state->values["drain_config"]);
-  }
-}
-
-
 // When an agent receives a `DrainSlaveMessage`, it should kill running tasks.
+// Agent API outputs related to draining are also verified.
 TEST_F(SlaveTest, DrainAgentKillsRunningTask)
 {
   Clock::pause();
@@ -12194,23 +12105,19 @@ TEST_F(SlaveTest, DrainAgentKillsRunningTask)
 
   slave::Flags slaveFlags = CreateSlaveFlags();
 
-  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags, true);
   ASSERT_SOME(slave);
 
+  slave.get()->start();
+
   Clock::advance(slaveFlags.registration_backoff_factor);
 
   AWAIT_READY(updateSlaveMessage);
 
-  // Set the partition-aware capability to ensure that the terminal update 
state
-  // is TASK_GONE_BY_OPERATOR, since we will set `mark_gone = true`.
-  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.add_capabilities()->set_type(
-      v1::FrameworkInfo::Capability::PARTITION_AWARE);
-
   auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
 
   EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
 
   Future<v1::scheduler::Event::Subscribed> subscribed;
   EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -12280,7 +12187,6 @@ TEST_F(SlaveTest, DrainAgentKillsRunningTask)
   maxGracePeriod.set_nanoseconds(0);
 
   DrainConfig drainConfig;
-  drainConfig.set_mark_gone(true);
   drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
 
   DrainSlaveMessage drainSlaveMessage;
@@ -12290,9 +12196,118 @@ TEST_F(SlaveTest, DrainAgentKillsRunningTask)
 
   AWAIT_READY(killedUpdate);
 
-  EXPECT_EQ(v1::TASK_GONE_BY_OPERATOR, killedUpdate->status().state());
+  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
   EXPECT_EQ(
       v1::TaskStatus::REASON_AGENT_DRAINING, killedUpdate->status().reason());
+
+  // Since the scheduler has not acknowledged the terminal task status update,
+  // the agent should still be in the draining state. Confirm that its drain
+  // info appears in API outputs.
+  {
+    v1::agent::Call call;
+    call.set_type(v1::agent::Call::GET_AGENT);
+
+    const ContentType contentType = ContentType::PROTOBUF;
+
+    process::http::Headers headers = 
createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(contentType);
+
+    Future<process::http::Response> httpResponse =
+      process::http::post(
+          slave.get()->pid,
+          "api/v1",
+          headers,
+          serialize(contentType, call),
+          stringify(contentType));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, httpResponse);
+
+    Future<v1::agent::Response> responseMessage =
+      deserialize<v1::agent::Response>(contentType, httpResponse->body);
+
+    AWAIT_READY(responseMessage);
+    ASSERT_TRUE(responseMessage->get_agent().has_drain_config());
+    EXPECT_EQ(
+        drainConfig,
+        devolve(responseMessage->get_agent().drain_config()));
+  }
+
+  {
+    Future<Response> response = process::http::get(
+        slave.get()->pid,
+        "state",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", 
response);
+
+    Try<JSON::Object> state = JSON::parse<JSON::Object>(response->body);
+
+    ASSERT_SOME(state);
+
+    EXPECT_EQ(JSON::protobuf(drainConfig), state->values["drain_config"]);
+  }
+
+  // Now acknowledge the terminal update and confirm that the agent's drain 
info
+  // is gone.
+
+  Future<StatusUpdateAcknowledgementMessage> terminalAcknowledgement =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
+  // The agent won't complete draining until the framework has been removed.
+  // Set up an expectation to await on this event.
+  Future<Nothing> removeFramework;
+  EXPECT_CALL(*slave.get()->mock(), removeFramework(_))
+    .WillOnce(DoAll(Invoke(slave.get()->mock(),
+                           &MockSlave::unmocked_removeFramework),
+                    FutureSatisfy(&removeFramework)));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_task_id()->CopyFrom(killedUpdate->status().task_id());
+    acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
+    acknowledge->set_uuid(killedUpdate->status().uuid());
+
+    mesos.send(call);
+  }
+
+  // Resume the clock so that the timer used by `delay()` in `os::reap()` can
+  // elapse and allow the executor process to be reaped.
+  Clock::resume();
+
+  AWAIT_READY(terminalAcknowledgement);
+  AWAIT_READY(removeFramework);
+
+  {
+    v1::agent::Call call;
+    call.set_type(v1::agent::Call::GET_AGENT);
+
+    const ContentType contentType = ContentType::PROTOBUF;
+
+    process::http::Headers headers = 
createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(contentType);
+
+    Future<process::http::Response> httpResponse =
+      process::http::post(
+          slave.get()->pid,
+          "api/v1",
+          headers,
+          serialize(contentType, call),
+          stringify(contentType));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, httpResponse);
+
+    Future<v1::agent::Response> responseMessage =
+      deserialize<v1::agent::Response>(contentType, httpResponse->body);
+
+    AWAIT_READY(responseMessage);
+    ASSERT_FALSE(responseMessage->get_agent().has_drain_config());
+  }
 }
 
 

Reply via email to