Added some slave recovery DockerContainerizer tests. Review: https://reviews.apache.org/r/24765
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fd553815 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fd553815 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fd553815 Branch: refs/heads/master Commit: fd55381510c96ae7bb48b36cae2bcc329d535382 Parents: 6d50275 Author: Benjamin Hindman <[email protected]> Authored: Fri Aug 15 20:56:17 2014 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Sat Aug 16 08:25:39 2014 -0700 ---------------------------------------------------------------------- src/tests/docker_containerizer_tests.cpp | 364 +++++++++++++++++++++++++- 1 file changed, 354 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/fd553815/src/tests/docker_containerizer_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp index 0d7c3b1..3a55f5e 100644 --- a/src/tests/docker_containerizer_tests.cpp +++ b/src/tests/docker_containerizer_tests.cpp @@ -20,14 +20,18 @@ #include <gtest/gtest.h> #include <process/future.hpp> +#include <process/gmock.hpp> #include <process/subprocess.hpp> #include "linux/cgroups.hpp" +#include "messages/messages.hpp" + #include "tests/flags.hpp" #include "tests/mesos.hpp" #include "slave/containerizer/docker.hpp" +#include "slave/paths.hpp" #include "slave/slave.hpp" #include "slave/state.hpp" @@ -43,7 +47,9 @@ using mesos::internal::slave::Slave; using mesos::internal::slave::DockerContainerizer; using process::Future; +using process::Message; using process::PID; +using process::UPID; using std::vector; using std::list; @@ -51,6 +57,7 @@ using std::string; using testing::_; using testing::DoDefault; +using testing::Eq; using testing::Invoke; using testing::Return; @@ -189,7 +196,7 @@ TEST_F(DockerContainerizerTest, 
ROOT_DOCKER_Launch_Executor) MockDockerContainerizer dockerContainerizer(flags, docker); - Try<PID<Slave> > slave = StartSlave(&dockerContainerizer); + Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags); ASSERT_SOME(slave); MockScheduler sched; @@ -299,7 +306,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch) MockDockerContainerizer dockerContainerizer(flags, docker); - Try<PID<Slave> > slave = StartSlave(&dockerContainerizer); + Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags); ASSERT_SOME(slave); MockScheduler sched; @@ -390,7 +397,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill) MockDockerContainerizer dockerContainerizer(flags, docker); - Try<PID<Slave> > slave = StartSlave(&dockerContainerizer); + Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags); ASSERT_SOME(slave); MockScheduler sched; @@ -618,7 +625,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update) MockDockerContainerizer dockerContainerizer(flags, docker); - Try<PID<Slave> > slave = StartSlave(&dockerContainerizer); + Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags); ASSERT_SOME(slave); MockScheduler sched; @@ -855,7 +862,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs) MockDockerContainerizer dockerContainerizer(flags, docker); - Try<PID<Slave> > slave = StartSlave(&dockerContainerizer); + Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags); ASSERT_SOME(slave); MockScheduler sched; @@ -962,7 +969,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD) MockDockerContainerizer dockerContainerizer(flags, docker); - Try<PID<Slave> > slave = StartSlave(&dockerContainerizer); + Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags); ASSERT_SOME(slave); MockScheduler sched; @@ -1057,7 +1064,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD) } -// The following test uses a docker image (mesosphere/inky) that has +// The following test uses a Docker image (mesosphere/inky) that has // an 
entrypoint "echo" and a default command "inky". TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override) { @@ -1070,7 +1077,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override) MockDockerContainerizer dockerContainerizer(flags, docker); - Try<PID<Slave> > slave = StartSlave(&dockerContainerizer); + Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags); ASSERT_SOME(slave); MockScheduler sched; @@ -1170,7 +1177,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override) } -// The following test uses a docker image (mesosphere/inky) that has +// The following test uses a Docker image (mesosphere/inky) that has // an entrypoint "echo" and a default command "inky". TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args) { @@ -1183,7 +1190,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args) MockDockerContainerizer dockerContainerizer(flags, docker); - Try<PID<Slave> > slave = StartSlave(&dockerContainerizer); + Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags); ASSERT_SOME(slave); MockScheduler sched; @@ -1282,3 +1289,340 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args) Shutdown(); } + + +// The slave is stopped before the first update for a task is received +// from the executor. When it comes back up we make sure the executor +// re-registers and the slave properly sends the update. +TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer) +{ + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + slave::Flags flags = CreateSlaveFlags(); + + // Setup recovery slave flags. + flags.checkpoint = true; + flags.recover = "reconnect"; + flags.strict = true; + + Docker docker = Docker::create(tests::flags.docker, false).get(); + + // We put the containerizer on the heap so we can more easily + // control its lifetime, i.e., when we invoke the destructor. 
+ MockDockerContainerizer* dockerContainerizer1 = + new MockDockerContainerizer(flags, docker); + + Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags); + ASSERT_SOME(slave1); + + // Enable checkpointing for the framework. + FrameworkInfo frameworkInfo; + frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); + frameworkInfo.set_checkpoint(true); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + Future<vector<Offer> > offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(frameworkId); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + const Offer& offer = offers.get()[0]; + + TaskInfo task; + task.set_name(""); + task.mutable_task_id()->set_value("1"); + task.mutable_slave_id()->CopyFrom(offer.slave_id()); + task.mutable_resources()->CopyFrom(offer.resources()); + + CommandInfo command; + command.set_value("sleep 1000"); + + ContainerInfo containerInfo; + containerInfo.set_type(ContainerInfo::DOCKER); + + ContainerInfo::DockerInfo dockerInfo; + dockerInfo.set_image("busybox"); + containerInfo.mutable_docker()->CopyFrom(dockerInfo); + + task.mutable_command()->CopyFrom(command); + task.mutable_container()->CopyFrom(containerInfo); + + vector<TaskInfo> tasks; + tasks.push_back(task); + + Future<ContainerID> containerId; + EXPECT_CALL(*dockerContainerizer1, launch(_, _, _, _, _, _, _, _)) + .WillOnce(DoAll(FutureArg<0>(&containerId), + Invoke(dockerContainerizer1, + &MockDockerContainerizer::_launch))); + + // Drop the first update from the executor. 
+ Future<StatusUpdateMessage> statusUpdateMessage = + DROP_PROTOBUF(StatusUpdateMessage(), _, _); + + driver.launchTasks(offers.get()[0].id(), tasks); + + AWAIT_READY(containerId); + + // Stop the slave before the status update is received. + AWAIT_READY(statusUpdateMessage); + + Stop(slave1.get()); + + delete dockerContainerizer1; + + Future<Message> reregisterExecutorMessage = + FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(_, _)) + .WillOnce(FutureArg<1>(&status)) + .WillRepeatedly(Return()); // Ignore subsequent updates. + + MockDockerContainerizer* dockerContainerizer2 = + new MockDockerContainerizer(flags, docker); + + Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags); + ASSERT_SOME(slave2); + + // Ensure the executor re-registers. + AWAIT_READY(reregisterExecutorMessage); + UPID executorPid = reregisterExecutorMessage.get().from; + + ReregisterExecutorMessage reregister; + reregister.ParseFromString(reregisterExecutorMessage.get().body); + + // Executor should inform about the unacknowledged update. + ASSERT_EQ(1, reregister.updates_size()); + const StatusUpdate& update = reregister.updates(0); + ASSERT_EQ(task.task_id(), update.status().task_id()); + ASSERT_EQ(TASK_RUNNING, update.status().state()); + + // Scheduler should receive the recovered update. + AWAIT_READY(status); + ASSERT_EQ(TASK_RUNNING, status.get().state()); + + // Make sure the container is still running. + Future<list<Docker::Container> > containers = + docker.ps(true, slave::DOCKER_NAME_PREFIX); + + AWAIT_READY(containers); + + ASSERT_TRUE(exists(containers.get(), containerId.get())); + + driver.stop(); + driver.join(); + + Shutdown(); + + delete dockerContainerizer2; +} + + +// The slave is stopped before the first update for a task is received +// from the executor. When it comes back up we make sure the executor +// re-registers and the slave properly sends the update. 
+// +// TODO(benh): This test is currently disabled because the executor +// inside the image mesosphere/test-executor does not properly set the +// executor PID that it uses during registration, so when the new +// slave recovers it can't reconnect and instead destroys that +// container. In particular, it uses '0' for its IP which we properly +// parse and can even properly use for sending other messages, but the +// current implementation of 'UPID::operator bool ()' fails if the IP +// component of a PID is '0'. +TEST_F(DockerContainerizerTest, DISABLED_ROOT_DOCKER_SlaveRecoveryExecutorContainer) +{ + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + slave::Flags flags = CreateSlaveFlags(); + + // Setup recovery slave flags. + flags.checkpoint = true; + flags.recover = "reconnect"; + flags.strict = true; + + Docker docker = Docker::create(tests::flags.docker, false).get(); + + MockDockerContainerizer* dockerContainerizer1 = + new MockDockerContainerizer(flags, docker); + + Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags); + ASSERT_SOME(slave1); + + // Enable checkpointing for the framework. + FrameworkInfo frameworkInfo; + frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); + frameworkInfo.set_checkpoint(true); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + Future<vector<Offer> > offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. 
+ + driver.start(); + + AWAIT_READY(frameworkId); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + const Offer& offer = offers.get()[0]; + + TaskInfo task; + task.set_name(""); + task.mutable_task_id()->set_value("1"); + task.mutable_slave_id()->CopyFrom(offer.slave_id()); + task.mutable_resources()->CopyFrom(offer.resources()); + + ExecutorInfo executorInfo; + ExecutorID executorId; + executorId.set_value("e1"); + executorInfo.mutable_executor_id()->CopyFrom(executorId); + + CommandInfo command; + command.set_value("test-executor"); + executorInfo.mutable_command()->CopyFrom(command); + + ContainerInfo containerInfo; + containerInfo.set_type(ContainerInfo::DOCKER); + + ContainerInfo::DockerInfo dockerInfo; + dockerInfo.set_image("mesosphere/test-executor"); + + containerInfo.mutable_docker()->CopyFrom(dockerInfo); + executorInfo.mutable_container()->CopyFrom(containerInfo); + + task.mutable_executor()->CopyFrom(executorInfo); + + vector<TaskInfo> tasks; + tasks.push_back(task); + + Future<ContainerID> containerId; + Future<SlaveID> slaveId; + EXPECT_CALL(*dockerContainerizer1, launch(_, _, _, _, _, _, _)) + .WillOnce(DoAll(FutureArg<0>(&containerId), + FutureArg<4>(&slaveId), + Invoke(dockerContainerizer1, + &MockDockerContainerizer::_launchExecutor))); + + // The test-executor in the image immediately sends a TASK_RUNNING + // followed by TASK_FINISHED (no sleep/delay in between) so we need + // to drop the first TWO updates that come from the executor rather + // than only the first update like above where we can control + // the length of the task. + Future<StatusUpdateMessage> statusUpdateMessage1 = + DROP_PROTOBUF(StatusUpdateMessage(), _, _); + + // Drop the second update from the executor. 
+ Future<StatusUpdateMessage> statusUpdateMessage2 = + DROP_PROTOBUF(StatusUpdateMessage(), _, _); + + driver.launchTasks(offers.get()[0].id(), tasks); + + AWAIT_READY(containerId); + AWAIT_READY(slaveId); + + // We also need to wait until the container's pid has been + // checkpointed so that when the next slave recovers it won't treat + // the executor as having gone lost! + string path = slave::paths::getForkedPidPath( + slave::paths::getMetaRootDir(flags.work_dir), + slaveId.get(), + frameworkId.get(), + executorId, + containerId.get()); + + Duration waited = Duration::zero(); + do { + if (os::exists(path)) { + Try<string> read = os::read(path); + if (read.isSome() && read.get() != "") { + break; + } + } + os::sleep(Milliseconds(100)); + waited += Milliseconds(100); + } while (waited < Seconds(3)); + + ASSERT_TRUE(os::exists(path)); + ASSERT_SOME_NE("", os::read(path)); + + // Stop the slave before the status update is received. + AWAIT_READY(statusUpdateMessage1); + AWAIT_READY(statusUpdateMessage2); + + Stop(slave1.get()); + + delete dockerContainerizer1; + + Future<Message> reregisterExecutorMessage = + FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(_, _)) + .WillOnce(FutureArg<1>(&status)) + .WillRepeatedly(Return()); // Ignore subsequent updates. + + MockDockerContainerizer* dockerContainerizer2 = + new MockDockerContainerizer(flags, docker); + + Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags); + ASSERT_SOME(slave2); + + // Ensure the executor re-registers. + AWAIT_READY(reregisterExecutorMessage); + UPID executorPid = reregisterExecutorMessage.get().from; + + ReregisterExecutorMessage reregister; + reregister.ParseFromString(reregisterExecutorMessage.get().body); + + // Executor should inform about the unacknowledged update. 
+ ASSERT_EQ(1, reregister.updates_size()); + const StatusUpdate& update = reregister.updates(0); + ASSERT_EQ(task.task_id(), update.status().task_id()); + ASSERT_EQ(TASK_RUNNING, update.status().state()); + + // Scheduler should receive the recovered update. + AWAIT_READY(status); + ASSERT_EQ(TASK_RUNNING, status.get().state()); + + // Make sure the container is still running. + Future<list<Docker::Container> > containers = + docker.ps(true, slave::DOCKER_NAME_PREFIX); + + AWAIT_READY(containers); + + ASSERT_TRUE(exists(containers.get(), containerId.get())); + + driver.stop(); + driver.join(); + + Shutdown(); + + delete dockerContainerizer2; +}
