This is an automated email from the ASF dual-hosted git repository. grag pushed a commit to branch 1.9.x in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/1.9.x by this push: new 35912a2 Fixed a bug in the agent's draining handler. 35912a2 is described below commit 35912a22081e88ba243d2b690667dff6a90c51d0 Author: Greg Mann <g...@mesosphere.io> AuthorDate: Wed May 6 16:35:19 2020 -0700 Fixed a bug in the agent's draining handler. Previously, when the agent had no tasks or operations and received a `DrainSlaveMessage`, it would checkpoint the `DrainConfig` to disk, implicitly placing it into a "draining" state indefinitely. This patch updates the agent's handler to avoid checkpointing anything to disk in this case. The `SlaveTest.DrainInfoInAPIOutputs` test is also removed and its functionality is moved into the test `SlaveTest.DrainAgentKillsRunningTask`. The running task in the latter test allows us to verify agent API outputs both before and after the task's terminal update is acknowledged. Review: https://reviews.apache.org/r/72368/ --- src/slave/slave.cpp | 12 +++ src/tests/slave_tests.cpp | 215 +++++++++++++++++++++++++--------------------- 2 files changed, 127 insertions(+), 100 deletions(-) diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 23d2ddd..7110ff4 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -998,6 +998,18 @@ void Slave::drain( const UPID& from, DrainSlaveMessage&& drainSlaveMessage) { + if (operations.empty() && frameworks.empty()) { + LOG(INFO) + << "Received DrainConfig " << drainSlaveMessage.config() + << (drainConfig.isSome() + ? 
"; previously stored DrainConfig " + stringify(*drainConfig) + : "") + << "; agent has no stored frameworks, tasks, or operations," + " so draining is already complete"; + + return; + } + hashmap<FrameworkID, hashset<TaskID>> pendingTaskIds; foreachvalue (Framework* framework, frameworks) { foreachvalue (const auto& taskMap, framework->pendingTasks) { diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index c147bfc..335a1c4 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -11928,97 +11928,8 @@ TEST_F( } -// When the agent receives a `DrainSlaveMessage` from the master, the agent's -// drain info should be visible in the agent's API output. -TEST_F(SlaveTest, DrainInfoInAPIOutputs) -{ - Clock::pause(); - - const int GRACE_PERIOD_NANOS = 1000000; - - Try<Owned<cluster::Master>> master = StartMaster(); - ASSERT_SOME(master); - - Future<SlaveRegisteredMessage> slaveRegisteredMessage = - FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); - - StandaloneMasterDetector detector(master.get()->pid); - - slave::Flags slaveFlags = CreateSlaveFlags(); - - Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags); - ASSERT_SOME(slave); - - Clock::advance(slaveFlags.registration_backoff_factor); - - AWAIT_READY(slaveRegisteredMessage); - - // Simulate the master sending a `DrainSlaveMessage` to the agent. 
- DurationInfo maxGracePeriod; - maxGracePeriod.set_nanoseconds(GRACE_PERIOD_NANOS); - - DrainConfig drainConfig; - drainConfig.set_mark_gone(true); - drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod); - - DrainSlaveMessage drainSlaveMessage; - drainSlaveMessage.mutable_config()->CopyFrom(drainConfig); - - process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage); - - Clock::settle(); - - { - v1::agent::Call call; - call.set_type(v1::agent::Call::GET_AGENT); - - const ContentType contentType = ContentType::PROTOBUF; - - process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); - headers["Accept"] = stringify(contentType); - - Future<process::http::Response> httpResponse = - process::http::post( - slave.get()->pid, - "api/v1", - headers, - serialize(contentType, call), - stringify(contentType)); - - AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, httpResponse); - - Future<v1::agent::Response> responseMessage = - deserialize<v1::agent::Response>(contentType, httpResponse->body); - - AWAIT_READY(responseMessage); - ASSERT_TRUE(responseMessage->IsInitialized()); - ASSERT_EQ(v1::agent::Response::GET_AGENT, responseMessage->type()); - ASSERT_TRUE(responseMessage->get_agent().has_drain_config()); - EXPECT_EQ( - drainConfig, - devolve(responseMessage->get_agent().drain_config())); - } - - { - Future<Response> response = process::http::get( - slave.get()->pid, - "state", - None(), - createBasicAuthHeaders(DEFAULT_CREDENTIAL)); - - AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); - AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); - - Try<JSON::Object> state = JSON::parse<JSON::Object>(response->body); - - ASSERT_SOME(state); - - EXPECT_EQ(JSON::protobuf(drainConfig), state->values["drain_config"]); - } -} - - // When an agent receives a `DrainSlaveMessage`, it should kill running tasks. +// Agent API outputs related to draining are also verified. 
TEST_F(SlaveTest, DrainAgentKillsRunningTask) { Clock::pause(); @@ -12033,23 +11944,19 @@ TEST_F(SlaveTest, DrainAgentKillsRunningTask) slave::Flags slaveFlags = CreateSlaveFlags(); - Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags); + Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags, true); ASSERT_SOME(slave); + slave.get()->start(); + Clock::advance(slaveFlags.registration_backoff_factor); AWAIT_READY(updateSlaveMessage); - // Set the partition-aware capability to ensure that the terminal update state - // is TASK_GONE_BY_OPERATOR, since we will set `mark_gone = true`. - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - frameworkInfo.add_capabilities()->set_type( - v1::FrameworkInfo::Capability::PARTITION_AWARE); - auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); Future<v1::scheduler::Event::Subscribed> subscribed; EXPECT_CALL(*scheduler, subscribed(_, _)) @@ -12119,7 +12026,6 @@ TEST_F(SlaveTest, DrainAgentKillsRunningTask) maxGracePeriod.set_nanoseconds(0); DrainConfig drainConfig; - drainConfig.set_mark_gone(true); drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod); DrainSlaveMessage drainSlaveMessage; @@ -12129,9 +12035,118 @@ TEST_F(SlaveTest, DrainAgentKillsRunningTask) AWAIT_READY(killedUpdate); - EXPECT_EQ(v1::TASK_GONE_BY_OPERATOR, killedUpdate->status().state()); + EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state()); EXPECT_EQ( v1::TaskStatus::REASON_AGENT_DRAINING, killedUpdate->status().reason()); + + // Since the scheduler has not acknowledged the terminal task status update, + // the agent should still be in the draining state. Confirm that its drain + // info appears in API outputs. 
+ { + v1::agent::Call call; + call.set_type(v1::agent::Call::GET_AGENT); + + const ContentType contentType = ContentType::PROTOBUF; + + process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); + headers["Accept"] = stringify(contentType); + + Future<process::http::Response> httpResponse = + process::http::post( + slave.get()->pid, + "api/v1", + headers, + serialize(contentType, call), + stringify(contentType)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, httpResponse); + + Future<v1::agent::Response> responseMessage = + deserialize<v1::agent::Response>(contentType, httpResponse->body); + + AWAIT_READY(responseMessage); + ASSERT_TRUE(responseMessage->get_agent().has_drain_config()); + EXPECT_EQ( + drainConfig, + devolve(responseMessage->get_agent().drain_config())); + } + + { + Future<Response> response = process::http::get( + slave.get()->pid, + "state", + None(), + createBasicAuthHeaders(DEFAULT_CREDENTIAL)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); + + Try<JSON::Object> state = JSON::parse<JSON::Object>(response->body); + + ASSERT_SOME(state); + + EXPECT_EQ(JSON::protobuf(drainConfig), state->values["drain_config"]); + } + + // Now acknowledge the terminal update and confirm that the agent's drain info + // is gone. + + Future<StatusUpdateAcknowledgementMessage> terminalAcknowledgement = + FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _); + + // The agent won't complete draining until the framework has been removed. + // Set up an expectation to await on this event. 
+ Future<Nothing> removeFramework; + EXPECT_CALL(*slave.get()->mock(), removeFramework(_)) + .WillOnce(DoAll(Invoke(slave.get()->mock(), + &MockSlave::unmocked_removeFramework), + FutureSatisfy(&removeFramework))); + + { + Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.set_type(Call::ACKNOWLEDGE); + + Call::Acknowledge* acknowledge = call.mutable_acknowledge(); + acknowledge->mutable_task_id()->CopyFrom(killedUpdate->status().task_id()); + acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id()); + acknowledge->set_uuid(killedUpdate->status().uuid()); + + mesos.send(call); + } + + // Resume the clock so that the timer used by `delay()` in `os::reap()` can + // elapse and allow the executor process to be reaped. + Clock::resume(); + + AWAIT_READY(terminalAcknowledgement); + AWAIT_READY(removeFramework); + + { + v1::agent::Call call; + call.set_type(v1::agent::Call::GET_AGENT); + + const ContentType contentType = ContentType::PROTOBUF; + + process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); + headers["Accept"] = stringify(contentType); + + Future<process::http::Response> httpResponse = + process::http::post( + slave.get()->pid, + "api/v1", + headers, + serialize(contentType, call), + stringify(contentType)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, httpResponse); + + Future<v1::agent::Response> responseMessage = + deserialize<v1::agent::Response>(contentType, httpResponse->body); + + AWAIT_READY(responseMessage); + ASSERT_FALSE(responseMessage->get_agent().has_drain_config()); + } }