This is an automated email from the ASF dual-hosted git repository.
grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new a8059a7 Added tests for 'volume/csi' isolator recovery.
a8059a7 is described below
commit a8059a78473774e3d95e8e908f360ee5e9aadd0d
Author: Greg Mann <[email protected]>
AuthorDate: Fri Sep 4 10:39:10 2020 -0700
Added tests for 'volume/csi' isolator recovery.
Review: https://reviews.apache.org/r/72806/
---
.../containerizer/volume_csi_isolator_tests.cpp | 360 +++++++++++++++++++++
1 file changed, 360 insertions(+)
diff --git a/src/tests/containerizer/volume_csi_isolator_tests.cpp b/src/tests/containerizer/volume_csi_isolator_tests.cpp
index dafb0b7..d51d3c9 100644
--- a/src/tests/containerizer/volume_csi_isolator_tests.cpp
+++ b/src/tests/containerizer/volume_csi_isolator_tests.cpp
@@ -1117,6 +1117,366 @@ TEST_P(VolumeCSIIsolatorTest, ROOT_UnmanagedPlugin)
AWAIT_READY(finishedUpdate);
}
+
+// When the agent fails over while a CSI volume is mounted to a container, the
+// agent should recover the volume state so that the volume can be successfully
+// unpublished after agent recovery is complete.
+TEST_P(VolumeCSIIsolatorTest, ROOT_INTERNET_CURL_UnpublishAfterAgentFailover)
+{
+  createCsiPluginConfig(Bytes(0), TEST_VOLUME_ID + ":1MB");
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+
+  Fetcher fetcher(agentFlags);
+
+  // Use a consistent ID across agent restart so that the executor can register.
+  string processId = process::ID::generate("slave");
+
+  SlaveOptions agentOptions = SlaveOptions(detector.get())
+    .withId(processId)
+    .withFlags(agentFlags);
+
+  Try<Owned<cluster::Slave>> agent = StartSlave(agentOptions);
+  ASSERT_SOME(agent);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  v1::Offer offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  // Run a command which will loop until a file disappears. This allows us to
+  // terminate the task after agent failover.
+  Try<string> taskCommand = strings::format(
+      "touch %s && while [ -e %s ]; do : sleep 0.01 ; done",
+      TEST_CONTAINER_PATH + TEST_OUTPUT_FILE,
+      TEST_CONTAINER_PATH + TEST_OUTPUT_FILE);
+
+  v1::TaskInfo taskInfo = v1::createTask(agentId, resources, taskCommand.get());
+
+  taskInfo.mutable_container()->CopyFrom(v1::createContainerInfo(
+      "alpine",
+      {v1::createVolumeCsi(
+          TEST_CSI_PLUGIN_TYPE,
+          TEST_VOLUME_ID,
+          TEST_CONTAINER_PATH,
+          mesos::v1::Volume::Source::CSIVolume::VolumeCapability
+            ::AccessMode::SINGLE_NODE_WRITER,
+          false)}));
+
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+  Future<v1::scheduler::Event::Update> finishedUpdate;
+
+  testing::Sequence taskSequence;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, TaskStatusUpdateStateEq(v1::TASK_STARTING)))
+    .InSequence(taskSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, TaskStatusUpdateStateEq(v1::TASK_RUNNING)))
+    .InSequence(taskSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, TaskStatusUpdateStateEq(v1::TASK_FINISHED)))
+    .InSequence(taskSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&finishedUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId));
+
+  Future<StatusUpdateAcknowledgementMessage> acknowledgement =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH_GROUP(
+              executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
+
+  AWAIT_READY(startingUpdate);
+  AWAIT_READY(acknowledgement);
+
+  // We wait for this acknowledgement to ensure that the agent will not re-send
+  // the TASK_RUNNING update after recovery.
+  acknowledgement = FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
+  AWAIT_READY(runningUpdate);
+  AWAIT_READY(acknowledgement);
+
+  const string targetPath = csi::paths::getMountTargetPath(
+      csi::paths::getMountRootDir(
+          slave::paths::getCsiRootDir(agentFlags.work_dir),
+          TEST_CSI_PLUGIN_TYPE,
+          "default"),
+      TEST_VOLUME_ID);
+
+  ASSERT_TRUE(os::exists(targetPath));
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Fail over the agent and restart with a new containerizer.
+  agent.get()->terminate();
+  agent->reset();
+
+  agent = StartSlave(agentOptions);
+  ASSERT_SOME(agent);
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // Signal the task to complete by removing the file it is waiting on.
+  ASSERT_SOME(os::rm(path::join(targetPath, TEST_OUTPUT_FILE)));
+
+  AWAIT_READY(finishedUpdate);
+
+  ASSERT_FALSE(os::exists(targetPath));
+}
+
+
+// When a task with a CSI volume finishes while the Mesos agent is down, the CSI
+// volume should be correctly unpublished when the agent recovers.
+TEST_P(VolumeCSIIsolatorTest, ROOT_INTERNET_CURL_FinishedWhileAgentDown)
+{
+  createCsiPluginConfig(Bytes(0), TEST_VOLUME_ID + ":1MB");
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+
+  Fetcher fetcher(agentFlags);
+
+  // Use a consistent ID across agent restart so that the executor can register.
+  string processId = process::ID::generate("slave");
+
+  SlaveOptions agentOptions = SlaveOptions(detector.get())
+    .withId(processId)
+    .withFlags(agentFlags);
+
+  Try<Owned<cluster::Slave>> agent = StartSlave(agentOptions);
+  ASSERT_SOME(agent);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  v1::Offer offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  // Run a command which will loop until a file disappears. This allows us to
+  // terminate the task while the agent is down.
+  Try<string> taskCommand = strings::format(
+      "touch %s && while [ -e %s ]; do : sleep 0.1 ; done",
+      TEST_CONTAINER_PATH + TEST_OUTPUT_FILE,
+      TEST_CONTAINER_PATH + TEST_OUTPUT_FILE);
+
+  v1::TaskInfo taskInfo = v1::createTask(agentId, resources, taskCommand.get());
+
+  taskInfo.mutable_container()->CopyFrom(v1::createContainerInfo(
+      "alpine",
+      {v1::createVolumeCsi(
+          TEST_CSI_PLUGIN_TYPE,
+          TEST_VOLUME_ID,
+          TEST_CONTAINER_PATH,
+          mesos::v1::Volume::Source::CSIVolume::VolumeCapability
+            ::AccessMode::SINGLE_NODE_WRITER,
+          false)}));
+
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
+
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+  Future<v1::scheduler::Event::Update> finishedUpdate;
+
+  testing::Sequence taskSequence;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, TaskStatusUpdateStateEq(v1::TASK_STARTING)))
+    .InSequence(taskSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, TaskStatusUpdateStateEq(v1::TASK_RUNNING)))
+    .InSequence(taskSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, TaskStatusUpdateStateEq(v1::TASK_FINISHED)))
+    .InSequence(taskSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&finishedUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId));
+
+  Future<StatusUpdateAcknowledgementMessage> acknowledgement =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
+  mesos.send(v1::createCallAccept(
+      frameworkId,
+      offer,
+      {v1::LAUNCH_GROUP(executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
+
+  AWAIT_READY(startingUpdate);
+  AWAIT_READY(acknowledgement);
+
+  // We wait for this acknowledgement to ensure that the agent will not re-send
+  // the TASK_RUNNING update after recovery.
+  acknowledgement = FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
+  AWAIT_READY(runningUpdate);
+  AWAIT_READY(acknowledgement);
+
+  const string targetPath = csi::paths::getMountTargetPath(
+      csi::paths::getMountRootDir(
+          slave::paths::getCsiRootDir(agentFlags.work_dir),
+          TEST_CSI_PLUGIN_TYPE,
+          "default"),
+      TEST_VOLUME_ID);
+
+  ASSERT_TRUE(os::exists(targetPath));
+
+  v1::ContainerStatus status = runningUpdate->status().container_status();
+
+  ASSERT_TRUE(status.has_container_id());
+
+  Result<pid_t> containerPid = getContainerPid(
+      agentFlags.runtime_dir,
+      devolve(status.container_id()));
+
+  ASSERT_SOME(containerPid);
+
+  Future<Option<int>> reaped = process::reap(containerPid.get());
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  agent.get()->terminate();
+  agent->reset();
+
+  // Signal the task to complete while the agent is down.
+  ASSERT_SOME(os::rm(path::join(targetPath, TEST_OUTPUT_FILE)));
+
+  AWAIT_READY(reaped);
+
+  agent = StartSlave(agentOptions);
+  ASSERT_SOME(agent);
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  AWAIT_READY(finishedUpdate);
+
+  ASSERT_FALSE(os::exists(targetPath));
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {