This is an automated email from the ASF dual-hosted git repository. alexr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 7ad390b3aa261f4a39ff7f2c0842f2aae39005f4 Author: Andrei Budnik <[email protected]> AuthorDate: Tue Sep 18 19:10:07 2018 +0200 Added `AgentAPITest.AttachContainerInputRepeat` test. This test verifies that we can call `ATTACH_CONTAINER_INPUT` more than once. We send a short message first then we send a long message in chunks. Review: https://reviews.apache.org/r/68231/ --- src/tests/api_tests.cpp | 218 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 218 insertions(+) diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp index e82b93c..b3fec4e 100644 --- a/src/tests/api_tests.cpp +++ b/src/tests/api_tests.cpp @@ -6894,6 +6894,224 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( } +// This test verifies that we can call `ATTACH_CONTAINER_INPUT` more than once. +// We send a short message first, then we send a long message by chunks. +TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, AttachContainerInputRepeat) +{ + const ContentType contentType = GetParam(); + + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + slave::Flags flags = CreateSlaveFlags(); + Fetcher fetcher(flags); + + Try<MesosContainerizer*> _containerizer = + MesosContainerizer::create(flags, false, &fetcher); + + ASSERT_SOME(_containerizer); + + Owned<slave::Containerizer> containerizer(_containerizer.get()); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + Try<Owned<cluster::Slave>> slave = + StartSlave(detector.get(), containerizer.get(), flags); + + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(_, _)) + .WillOnce(FutureArg<1>(&offers)); + + driver.start(); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->empty()); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(_, _)) + .WillOnce(FutureArg<1>(&status)) + .WillRepeatedly(Return()); + + TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + + driver.launchTasks(offers.get()[0].id(), {task}); + + AWAIT_READY(status); + ASSERT_EQ(TASK_STARTING, status->state()); + + Future<hashset<ContainerID>> containerIds = containerizer->containers(); + AWAIT_READY(containerIds); + ASSERT_EQ(1u, containerIds->size()); + + // Launch a nested container session that runs `cat`. + v1::ContainerID containerId; + containerId.set_value(id::UUID::random().toString()); + containerId.mutable_parent()->set_value(containerIds->begin()->value()); + + v1::agent::Call call; + call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION); + + call.mutable_launch_nested_container_session()->mutable_container_id() + ->CopyFrom(containerId); + + call.mutable_launch_nested_container_session()->mutable_command()->set_value( + "cat"); + + http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); + headers["Accept"] = stringify(contentType); + + Future<http::Response> response = http::streaming::post( + slave.get()->pid, + "api/v1", + headers, + serialize(contentType, call), + stringify(contentType)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response); + ASSERT_EQ(stringify(contentType), response->headers.at("Content-Type")); + ASSERT_NONE(response->headers.get(MESSAGE_CONTENT_TYPE)); + ASSERT_EQ(http::Response::PIPE, response->type); + + auto attachContainerInput = [&](const std::string& data, bool sendEOF) { + http::Pipe pipe; + http::Pipe::Writer writer = pipe.writer(); + http::Pipe::Reader reader = pipe.reader(); + + ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind( + serialize, contentType, lambda::_1)); + + { + v1::agent::Call call; + call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT); + + v1::agent::Call::AttachContainerInput* attach = + call.mutable_attach_container_input(); + + attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID); + attach->mutable_container_id()->CopyFrom(containerId); + + writer.write(encoder.encode(call)); + } + + size_t offset = 0; + size_t chunkSize = 4096; + while (offset < data.length()) { + string dataChunk = data.substr(offset, chunkSize); + offset += chunkSize; + + v1::agent::Call call; + call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT); + + v1::agent::Call::AttachContainerInput* attach = + call.mutable_attach_container_input(); + + attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO); + + v1::agent::ProcessIO* processIO = attach->mutable_process_io(); + processIO->set_type(v1::agent::ProcessIO::DATA); + processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN); + processIO->mutable_data()->set_data(dataChunk); + + writer.write(encoder.encode(call)); + } + + // Signal `EOF` to the 'cat' command. + if (sendEOF) { + v1::agent::Call call; + call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT); + + v1::agent::Call::AttachContainerInput* attach = + call.mutable_attach_container_input(); + + attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO); + + v1::agent::ProcessIO* processIO = attach->mutable_process_io(); + processIO->set_type(v1::agent::ProcessIO::DATA); + processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN); + processIO->mutable_data()->set_data(""); + + writer.write(encoder.encode(call)); + } + + writer.close(); + + // TODO(anand): Add a `post()` overload that handles request streaming. + { + http::URL agent = http::URL( + "http", + slave.get()->pid.address.ip, + slave.get()->pid.address.port, + slave.get()->pid.id + + "/api/v1"); + + Future<http::Connection> _connection = http::connect(agent); + AWAIT_READY(_connection); + + http::Connection connection = _connection.get(); // Remove const. + + http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); + headers["Content-Type"] = stringify(ContentType::RECORDIO); + headers[MESSAGE_CONTENT_TYPE] = stringify(contentType); + + http::Request request; + request.url = agent; + request.method = "POST"; + request.type = http::Request::PIPE; + request.reader = reader; + request.headers = headers; + + Future<http::Response> response = connection.send(request); + AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response); + } + }; + + // Prepare the data to send to `cat` and send it over an + // `ATTACH_CONTAINER_INPUT` stream. + string data1 = "Hello, World!"; + string data2 = + "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do " + "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim " + "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut " + "aliquip ex ea commodo consequat. Duis aute irure dolor in " + "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla " + "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in " + "culpa qui officia deserunt mollit anim id est laborum."; + + while (Bytes(data2.size()) < Megabytes(1)) { + data2.append(data2); + } + + attachContainerInput(data1, false); + attachContainerInput(data2, true); + + ASSERT_SOME(response->reader); + Future<tuple<string, string>> received = + getProcessIOData(contentType, response->reader.get()); + + AWAIT_READY(received); + + string stdoutReceived; + string stderrReceived; + + tie(stdoutReceived, stderrReceived) = received.get(); + + EXPECT_EQ(stdoutReceived, data1 + data2); + + ASSERT_TRUE(stderrReceived.empty()); + + driver.stop(); + driver.join(); +} + + // This test verifies that attaching to the output of a container fails if the // containerizer doesn't support the operation. TEST_P(AgentAPITest, AttachContainerOutputFailure)
