This is an automated email from the ASF dual-hosted git repository. alexr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit e941d206f651bde861675a6517a89e44d1f61a34 Author: Andrei Budnik <[email protected]> AuthorDate: Tue Sep 18 19:10:01 2018 +0200 Added `AgentAPITest.LaunchNestedContainerSessionKillTask` test. This test verifies that IOSwitchboard, which holds an open HTTP input connection, terminates once IO redirects finish for the corresponding nested container. Review: https://reviews.apache.org/r/68230/ --- src/tests/api_tests.cpp | 248 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 248 insertions(+) diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp index 6be0dfa..e82b93c 100644 --- a/src/tests/api_tests.cpp +++ b/src/tests/api_tests.cpp @@ -6646,6 +6646,254 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( } +// This test verifies that IOSwitchboard, which holds an open HTTP input +// connection, terminates once IO redirects finish for the corresponding +// nested container: +// 1. Launches a parent container `sleep 1000` via the default executor. +// 2. Launches "sh" as a nested container session. +// 3. Attaches to nested container's input via `ATTACH_CONTAINER_INPUT` +// call to send "kill `pgrep sleep`" command into "sh" which kills +// the parent container. +// 4. Check that all containers have been terminated. +TEST_P_TEMP_DISABLED_ON_WINDOWS( + AgentAPITest, + ROOT_CGROUPS_LaunchNestedContainerSessionKillTask) +{ + const ContentType contentType = GetParam(); + + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + slave::Flags flags = CreateSlaveFlags(); + flags.isolation = "filesystem/linux,namespaces/pid"; + + Fetcher fetcher(flags); + + Try<MesosContainerizer*> _containerizer = + MesosContainerizer::create(flags, false, &fetcher); + + ASSERT_SOME(_containerizer); + Owned<slave::Containerizer> containerizer(_containerizer.get()); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + Try<Owned<cluster::Slave>> slave = + StartSlave(detector.get(), containerizer.get(), flags); + + ASSERT_SOME(slave); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)); + + v1::scheduler::TestMesos mesos( + master.get()->pid, + contentType, + scheduler); + + AWAIT_READY(subscribed); + const v1::FrameworkID& frameworkId = subscribed->framework_id(); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + const v1::Offer offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + Future<v1::scheduler::Event::Update> updateStarting; + Future<v1::scheduler::Event::Update> updateRunning; + Future<v1::scheduler::Event::Update> updateFailed; + + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce( + DoAll( + FutureArg<1>(&updateStarting), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillOnce( + DoAll( + FutureArg<1>(&updateRunning), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillOnce( + DoAll( + FutureArg<1>(&updateFailed), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(Return()); // Ignore subsequent updates. + + v1::Resources resources = + v1::Resources::parse(defaultTaskResourcesString).get(); + + v1::TaskInfo taskInfo = + v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); + + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID, + None(), + resources, + v1::ExecutorInfo::DEFAULT, + frameworkId); + + mesos.send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH_GROUP( + executorInfo, v1::createTaskGroupInfo({taskInfo}))})); + + AWAIT_READY(updateStarting); + ASSERT_EQ(v1::TASK_STARTING, updateStarting->status().state()); + ASSERT_EQ(taskInfo.task_id(), updateStarting->status().task_id()); + + AWAIT_READY(updateRunning); + ASSERT_EQ(v1::TASK_RUNNING, updateRunning->status().state()); + ASSERT_EQ(taskInfo.task_id(), updateRunning->status().task_id()); + ASSERT_TRUE(updateRunning->status().has_container_status()); + + v1::ContainerStatus status = updateRunning->status().container_status(); + + ASSERT_TRUE(status.has_container_id()); + EXPECT_TRUE(status.container_id().has_parent()); + + // Launch "sh" command via `LAUNCH_NESTED_CONTAINER_SESSION` call. + v1::ContainerID containerId; + containerId.mutable_parent()->CopyFrom(status.container_id()); + containerId.set_value(id::UUID::random().toString()); + + Future<http::Response> sessionResponse; + + { + v1::agent::Call call; + call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION); + + call.mutable_launch_nested_container_session()->mutable_container_id() + ->CopyFrom(containerId); + + call.mutable_launch_nested_container_session()->mutable_command() + ->CopyFrom(v1::createCommandInfo("sh")); + + http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); + headers["Accept"] = stringify(ContentType::RECORDIO); + headers[MESSAGE_ACCEPT] = stringify(contentType); + + sessionResponse = http::streaming::post( + slave.get()->pid, + "api/v1", + headers, + serialize(contentType, call), + stringify(contentType)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, sessionResponse); + } + + // Send "pkill sleep" to "sh" via `ATTACH_CONTAINER_INPUT` call. + // Note, that we do not close input connection because we want to emulate + // user's interactive debug session. + http::Pipe pipe; + http::Pipe::Writer writer = pipe.writer(); + http::Pipe::Reader reader = pipe.reader(); + + ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind( + serialize, contentType, lambda::_1)); + + { + v1::agent::Call call; + call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT); + + v1::agent::Call::AttachContainerInput* attach = + call.mutable_attach_container_input(); + + attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID); + attach->mutable_container_id()->CopyFrom(containerId); + + writer.write(encoder.encode(call)); + } + + const std::string command = "pkill sleep\n"; + + { + v1::agent::Call call; + call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT); + + v1::agent::Call::AttachContainerInput* attach = + call.mutable_attach_container_input(); + + attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO); + + v1::agent::ProcessIO* processIO = attach->mutable_process_io(); + processIO->set_type(v1::agent::ProcessIO::DATA); + processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN); + processIO->mutable_data()->set_data(command); + + writer.write(encoder.encode(call)); + } + + { + // TODO(anand): Add a `post()` overload that handles request streaming. + http::URL agent = http::URL( + "http", + slave.get()->pid.address.ip, + slave.get()->pid.address.port, + slave.get()->pid.id + + "/api/v1"); + + Future<http::Connection> _connection = http::connect(agent); + AWAIT_READY(_connection); + + http::Connection connection = _connection.get(); // Remove const. + + http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); + headers["Content-Type"] = stringify(ContentType::RECORDIO); + headers[MESSAGE_CONTENT_TYPE] = stringify(contentType); + + http::Request request; + request.url = agent; + request.method = "POST"; + request.type = http::Request::PIPE; + request.reader = reader; + request.headers = headers; + + Future<http::Response> response = connection.send(request); + AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response); + } + + // Read the output from the LAUNCH_NESTED_CONTAINER_SESSION. + ASSERT_SOME(sessionResponse->reader); + + Option<http::Pipe::Reader> output = sessionResponse->reader.get(); + ASSERT_SOME(output); + + Future<tuple<string, string>> received = + getProcessIOData(contentType, output.get()); + + AWAIT_READY(received); + + string stdoutReceived; + string stderrReceived; + + tie(stdoutReceived, stderrReceived) = received.get(); + + ASSERT_TRUE(stdoutReceived.empty()); + ASSERT_TRUE(stderrReceived.empty()); + + // Check terminal status update of the task. + AWAIT_READY(updateFailed); + + ASSERT_EQ(v1::TASK_FAILED, updateFailed->status().state()); + ASSERT_EQ(taskInfo.task_id(), updateFailed->status().task_id()); +} + + // This test verifies that attaching to the output of a container fails if the // containerizer doesn't support the operation. TEST_P(AgentAPITest, AttachContainerOutputFailure)
