This is an automated email from the ASF dual-hosted git repository.

qianzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit e443026c608f842e6dc29ba3cc78c69ce4272779
Author: Qian Zhang <[email protected]>
AuthorDate: Sat Aug 3 10:44:28 2019 +0800

    Added a test `ROOT_VETH_VerifyNestedContainerIPAfterReboot`.

    Review: https://reviews.apache.org/r/71235
---
 src/tests/containerizer/cni_isolator_tests.cpp | 165 +++++++++++++++++++++++++
 1 file changed, 165 insertions(+)

diff --git a/src/tests/containerizer/cni_isolator_tests.cpp b/src/tests/containerizer/cni_isolator_tests.cpp
index 0fc664b..ad8239b 100644
--- a/src/tests/containerizer/cni_isolator_tests.cpp
+++ b/src/tests/containerizer/cni_isolator_tests.cpp
@@ -2763,6 +2763,171 @@ TEST_F(CniIsolatorTest, ROOT_CleanupAfterReboot)
   ASSERT_TRUE(os::exists(cniDeleteSignalFile));
 }
 
+
+// Regression test for MESOS-9868: verifies that after the agent restarts and
+// recovers, the task status update still contains the correct IP address.
+TEST_F(CniIsolatorTest, ROOT_VETH_VerifyNestedContainerIPAfterReboot)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "network/cni";
+  flags.network_cni_plugins_dir = cniPluginDir;
+  flags.network_cni_config_dir = cniConfigDir;
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  // Start the slave with a static process ID. This allows the executor to
+  // reconnect with the slave upon a process restart.
+  const string id("agent");
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), id, flags);
+  ASSERT_SOME(slave);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  // Start the framework with the task killing capability.
+  v1::FrameworkInfo::Capability capability;
+  capability.set_type(v1::FrameworkInfo::Capability::TASK_KILLING_STATE);
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+  frameworkInfo.add_capabilities()->CopyFrom(capability);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      "test_default_executor",
+      None(),
+      "cpus:0.1;mem:32;disk:32",
+      v1::ExecutorInfo::DEFAULT);
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+
+  mesos::v1::ContainerInfo* container = executorInfo.mutable_container();
+  container->set_type(mesos::v1::ContainerInfo::MESOS);
+
+  // Make sure the container joins the mock CNI network.
+  container->add_network_infos()->set_name("veth");
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  v1::TaskInfo taskInfo = v1::createTask(
+      agentId,
+      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(),
+      "sleep 1000");
+
+  Future<Event::Update> updateStarting;
+  Future<Event::Update> updateRunning;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(DoAll(FutureArg<1>(&updateStarting),
+                    v1::scheduler::SendAcknowledge(
+                        frameworkId,
+                        agentId)))
+    .WillOnce(DoAll(FutureArg<1>(&updateRunning),
+                    v1::scheduler::SendAcknowledge(
+                        frameworkId,
+                        agentId)));
+
+  Future<Nothing> ackStarting =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> ackRunning =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP(
+      executorInfo,
+      v1::createTaskGroupInfo({taskInfo}));
+
+  mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup}));
+
+  AWAIT_READY(updateStarting);
+  ASSERT_EQ(v1::TASK_STARTING, updateStarting->status().state());
+  EXPECT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
+
+  AWAIT_READY(ackStarting);
+
+  AWAIT_READY(updateRunning);
+  ASSERT_EQ(v1::TASK_RUNNING, updateRunning->status().state());
+  EXPECT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
+
+  AWAIT_READY(ackRunning);
+
+  // Stop the slave after TASK_RUNNING has been acknowledged.
+  slave.get()->terminate();
+  slave->reset();
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Restart the slave.
+  slave = StartSlave(detector.get(), id, flags);
+  ASSERT_SOME(slave);
+
+  // Wait for the slave to reregister.
+  AWAIT_READY(slaveReregisteredMessage);
+
+  Future<Event::Update> updateKilling;
+  Future<Event::Update> updateKilled;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(DoAll(FutureArg<1>(&updateKilling),
+                    v1::scheduler::SendAcknowledge(
+                        frameworkId,
+                        agentId)))
+    .WillOnce(DoAll(FutureArg<1>(&updateKilled),
+                    v1::scheduler::SendAcknowledge(
+                        frameworkId,
+                        agentId)));
+
+  // Kill the task to get status updates after the slave has recovered.
+  mesos.send(v1::createCallKill(frameworkId, taskInfo.task_id()));
+
+  AWAIT_READY(updateKilling);
+  ASSERT_EQ(v1::TASK_KILLING, updateKilling->status().state());
+  ASSERT_EQ(taskInfo.task_id(), updateKilling->status().task_id());
+  ASSERT_TRUE(updateKilling->status().has_container_status());
+  ASSERT_EQ(1, updateKilling->status().container_status().network_infos_size());
+
+  const mesos::v1::NetworkInfo& networkInfo =
+    updateKilling->status().container_status().network_infos(0);
+
+  ASSERT_EQ(1, networkInfo.ip_addresses_size());
+
+  // Check the IP address in the task status update is from
+  // the mock CNI network.
+  ASSERT_TRUE(strings::startsWith(
+      networkInfo.ip_addresses(0).ip_address(), "203.0.113"));
+
+  AWAIT_READY(updateKilled);
+  ASSERT_EQ(v1::TASK_KILLED, updateKilled->status().state());
+  ASSERT_EQ(taskInfo.task_id(), updateKilled->status().task_id());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {
