This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new a8059a7  Added tests for 'volume/csi' isolator recovery.
a8059a7 is described below

commit a8059a78473774e3d95e8e908f360ee5e9aadd0d
Author: Greg Mann <[email protected]>
AuthorDate: Fri Sep 4 10:39:10 2020 -0700

    Added tests for 'volume/csi' isolator recovery.
    
    Review: https://reviews.apache.org/r/72806/
---
 .../containerizer/volume_csi_isolator_tests.cpp    | 360 +++++++++++++++++++++
 1 file changed, 360 insertions(+)

diff --git a/src/tests/containerizer/volume_csi_isolator_tests.cpp 
b/src/tests/containerizer/volume_csi_isolator_tests.cpp
index dafb0b7..d51d3c9 100644
--- a/src/tests/containerizer/volume_csi_isolator_tests.cpp
+++ b/src/tests/containerizer/volume_csi_isolator_tests.cpp
@@ -1117,6 +1117,366 @@ TEST_P(VolumeCSIIsolatorTest, ROOT_UnmanagedPlugin)
   AWAIT_READY(finishedUpdate);
 }
 
+
+// When the agent fails over while a CSI volume is mounted to a container, the
+// agent should recover the volume state so that the volume can be successfully
+// unpublished after agent recovery is complete.
+TEST_P(VolumeCSIIsolatorTest, ROOT_INTERNET_CURL_UnpublishAfterAgentFailover)
+{
+  createCsiPluginConfig(Bytes(0), TEST_VOLUME_ID + ":1MB");
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+
+  Fetcher fetcher(agentFlags);
+
+  // Use a consistent ID across agent restart so that the executor can 
register.
+  string processId = process::ID::generate("slave");
+
+  SlaveOptions agentOptions = SlaveOptions(detector.get())
+    .withId(processId)
+    .withFlags(agentFlags);
+
+  Try<Owned<cluster::Slave>> agent = StartSlave(agentOptions);
+  ASSERT_SOME(agent);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  v1::Offer offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  // Run a command which will loop until a file disappears. This allows us to
+  // terminate the task after agent failover.
+  Try<string> taskCommand = strings::format(
+      "touch %s && while [ -e %s ]; do : sleep 0.01 ; done",
+      TEST_CONTAINER_PATH + TEST_OUTPUT_FILE,
+      TEST_CONTAINER_PATH + TEST_OUTPUT_FILE);
+
+  v1::TaskInfo taskInfo = v1::createTask(agentId, resources, 
taskCommand.get());
+
+  taskInfo.mutable_container()->CopyFrom(v1::createContainerInfo(
+      "alpine",
+      {v1::createVolumeCsi(
+          TEST_CSI_PLUGIN_TYPE,
+          TEST_VOLUME_ID,
+          TEST_CONTAINER_PATH,
+          mesos::v1::Volume::Source::CSIVolume::VolumeCapability
+            ::AccessMode::SINGLE_NODE_WRITER,
+          false)}));
+
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+  Future<v1::scheduler::Event::Update> finishedUpdate;
+
+  testing::Sequence taskSequence;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, TaskStatusUpdateStateEq(v1::TASK_STARTING)))
+    .InSequence(taskSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, TaskStatusUpdateStateEq(v1::TASK_RUNNING)))
+    .InSequence(taskSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, TaskStatusUpdateStateEq(v1::TASK_FINISHED)))
+    .InSequence(taskSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&finishedUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId));
+
+  Future<StatusUpdateAcknowledgementMessage> acknowledgement =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH_GROUP(
+              executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
+
+  AWAIT_READY(startingUpdate);
+  AWAIT_READY(acknowledgement);
+
+  // We wait for this acknowledgement to ensure that the agent will not re-send
+  // the TASK_RUNNING update after recovery.
+  acknowledgement = FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, 
_);
+
+  AWAIT_READY(runningUpdate);
+  AWAIT_READY(acknowledgement);
+
+  const string targetPath = csi::paths::getMountTargetPath(
+      csi::paths::getMountRootDir(
+          slave::paths::getCsiRootDir(agentFlags.work_dir),
+          TEST_CSI_PLUGIN_TYPE,
+          "default"),
+      TEST_VOLUME_ID);
+
+  ASSERT_TRUE(os::exists(targetPath));
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Fail over the agent and restart with a new containerizer.
+  agent.get()->terminate();
+  agent->reset();
+
+  agent = StartSlave(agentOptions);
+  ASSERT_SOME(agent);
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // Signal the task to complete.
+  ASSERT_SOME(os::rm(path::join(targetPath, TEST_OUTPUT_FILE)));
+
+  AWAIT_READY(finishedUpdate);
+
+  ASSERT_FALSE(os::exists(targetPath));
+}
+
+
+// When a task with a CSI volume finishes while the Mesos agent is down, the 
CSI
+// volume should be correctly unpublished when the agent recovers.
+TEST_P(VolumeCSIIsolatorTest, ROOT_INTERNET_CURL_FinishedWhileAgentDown)
+{
+  createCsiPluginConfig(Bytes(0), TEST_VOLUME_ID + ":1MB");
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+
+  Fetcher fetcher(agentFlags);
+
+  // Use a consistent ID across agent restart so that the executor can 
register.
+  string processId = process::ID::generate("slave");
+
+  SlaveOptions agentOptions = SlaveOptions(detector.get())
+    .withId(processId)
+    .withFlags(agentFlags);
+
+  Try<Owned<cluster::Slave>> agent = StartSlave(agentOptions);
+  ASSERT_SOME(agent);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  v1::Offer offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  // Run a command which will loop until a file disappears. This allows us to
+  // terminate the task after agent failover.
+  Try<string> taskCommand = strings::format(
+      "touch %s && while [ -e %s ]; do : sleep 0.1 ; done",
+      TEST_CONTAINER_PATH + TEST_OUTPUT_FILE,
+      TEST_CONTAINER_PATH + TEST_OUTPUT_FILE);
+
+  v1::TaskInfo taskInfo = v1::createTask(agentId, resources, 
taskCommand.get());
+
+  taskInfo.mutable_container()->CopyFrom(v1::createContainerInfo(
+      "alpine",
+      {v1::createVolumeCsi(
+          TEST_CSI_PLUGIN_TYPE,
+          TEST_VOLUME_ID,
+          TEST_CONTAINER_PATH,
+          mesos::v1::Volume::Source::CSIVolume::VolumeCapability
+            ::AccessMode::SINGLE_NODE_WRITER,
+          false)}));
+
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
+
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+  Future<v1::scheduler::Event::Update> finishedUpdate;
+
+  testing::Sequence taskSequence;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, TaskStatusUpdateStateEq(v1::TASK_STARTING)))
+    .InSequence(taskSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, TaskStatusUpdateStateEq(v1::TASK_RUNNING)))
+    .InSequence(taskSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, TaskStatusUpdateStateEq(v1::TASK_FINISHED)))
+    .InSequence(taskSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&finishedUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId));
+
+  Future<StatusUpdateAcknowledgementMessage> acknowledgement =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
+  mesos.send(v1::createCallAccept(
+      frameworkId,
+      offer,
+      {v1::LAUNCH_GROUP(executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
+
+  AWAIT_READY(startingUpdate);
+  AWAIT_READY(acknowledgement);
+
+  // We wait for this acknowledgement to ensure that the agent will not re-send
+  // the TASK_RUNNING update after recovery.
+  acknowledgement = FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, 
_);
+
+  AWAIT_READY(runningUpdate);
+  AWAIT_READY(acknowledgement);
+
+  const string targetPath = csi::paths::getMountTargetPath(
+      csi::paths::getMountRootDir(
+          slave::paths::getCsiRootDir(agentFlags.work_dir),
+          TEST_CSI_PLUGIN_TYPE,
+          "default"),
+      TEST_VOLUME_ID);
+
+  ASSERT_TRUE(os::exists(targetPath));
+
+  v1::ContainerStatus status = runningUpdate->status().container_status();
+
+  ASSERT_TRUE(status.has_container_id());
+
+  Result<pid_t> containerPid = getContainerPid(
+      agentFlags.runtime_dir,
+      devolve(status.container_id()));
+
+  ASSERT_SOME(containerPid);
+
+  Future<Option<int>> reaped = process::reap(containerPid.get());
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  agent.get()->terminate();
+  agent->reset();
+
+  // Signal the task to complete.
+  ASSERT_SOME(os::rm(path::join(targetPath, TEST_OUTPUT_FILE)));
+
+  AWAIT_READY(reaped);
+
+  agent = StartSlave(agentOptions);
+  ASSERT_SOME(agent);
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  AWAIT_READY(finishedUpdate);
+
+  ASSERT_FALSE(os::exists(targetPath));
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to