This is an automated email from the ASF dual-hosted git repository.

alexr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 7ad390b3aa261f4a39ff7f2c0842f2aae39005f4
Author: Andrei Budnik <[email protected]>
AuthorDate: Tue Sep 18 19:10:07 2018 +0200

    Added `AgentAPITest.AttachContainerInputRepeat` test.
    
    This test verifies that we can call `ATTACH_CONTAINER_INPUT` more
    than once. We send a short message first then we send a long message
    in chunks.
    
    Review: https://reviews.apache.org/r/68231/
---
 src/tests/api_tests.cpp | 218 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 218 insertions(+)

diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index e82b93c..b3fec4e 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -6894,6 +6894,224 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
 }
 
 
+// This test verifies that we can call `ATTACH_CONTAINER_INPUT` more than once.
+// We send a short message first, then we send a long message by chunks.
+TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, AttachContainerInputRepeat)
+{
+  const ContentType contentType = GetParam();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+  Fetcher fetcher(flags);
+
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), containerizer.get(), flags);
+
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return());
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_STARTING, status->state());
+
+  Future<hashset<ContainerID>> containerIds = containerizer->containers();
+  AWAIT_READY(containerIds);
+  ASSERT_EQ(1u, containerIds->size());
+
+  // Launch a nested container session that runs `cat`.
+  v1::ContainerID containerId;
+  containerId.set_value(id::UUID::random().toString());
+  containerId.mutable_parent()->set_value(containerIds->begin()->value());
+
+  v1::agent::Call call;
+  call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);
+
+  call.mutable_launch_nested_container_session()->mutable_container_id()
+    ->CopyFrom(containerId);
+
+  call.mutable_launch_nested_container_session()->mutable_command()->set_value(
+      "cat");
+
+  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  headers["Accept"] = stringify(contentType);
+
+  Future<http::Response> response = http::streaming::post(
+    slave.get()->pid,
+    "api/v1",
+    headers,
+    serialize(contentType, call),
+    stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  ASSERT_EQ(stringify(contentType), response->headers.at("Content-Type"));
+  ASSERT_NONE(response->headers.get(MESSAGE_CONTENT_TYPE));
+  ASSERT_EQ(http::Response::PIPE, response->type);
+
+  auto attachContainerInput = [&](const std::string& data, bool sendEOF) {
+    http::Pipe pipe;
+    http::Pipe::Writer writer = pipe.writer();
+    http::Pipe::Reader reader = pipe.reader();
+
+    ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
+        serialize, contentType, lambda::_1));
+
+    {
+      v1::agent::Call call;
+      call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+      v1::agent::Call::AttachContainerInput* attach =
+        call.mutable_attach_container_input();
+
+      attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
+      attach->mutable_container_id()->CopyFrom(containerId);
+
+      writer.write(encoder.encode(call));
+    }
+
+    size_t offset = 0;
+    size_t chunkSize = 4096;
+    while (offset < data.length()) {
+      string dataChunk = data.substr(offset, chunkSize);
+      offset += chunkSize;
+
+      v1::agent::Call call;
+      call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+      v1::agent::Call::AttachContainerInput* attach =
+        call.mutable_attach_container_input();
+
+      attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO);
+
+      v1::agent::ProcessIO* processIO = attach->mutable_process_io();
+      processIO->set_type(v1::agent::ProcessIO::DATA);
+      processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
+      processIO->mutable_data()->set_data(dataChunk);
+
+      writer.write(encoder.encode(call));
+    }
+
+    // Signal `EOF` to the 'cat' command.
+    if (sendEOF) {
+      v1::agent::Call call;
+      call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+      v1::agent::Call::AttachContainerInput* attach =
+        call.mutable_attach_container_input();
+
+      attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO);
+
+      v1::agent::ProcessIO* processIO = attach->mutable_process_io();
+      processIO->set_type(v1::agent::ProcessIO::DATA);
+      processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
+      processIO->mutable_data()->set_data("");
+
+      writer.write(encoder.encode(call));
+    }
+
+    writer.close();
+
+    // TODO(anand): Add a `post()` overload that handles request streaming.
+    {
+      http::URL agent = http::URL(
+          "http",
+          slave.get()->pid.address.ip,
+          slave.get()->pid.address.port,
+          slave.get()->pid.id +
+          "/api/v1");
+
+      Future<http::Connection> _connection = http::connect(agent);
+      AWAIT_READY(_connection);
+
+      http::Connection connection = _connection.get(); // Remove const.
+
+      http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+      headers["Content-Type"] = stringify(ContentType::RECORDIO);
+      headers[MESSAGE_CONTENT_TYPE] = stringify(contentType);
+
+      http::Request request;
+      request.url = agent;
+      request.method = "POST";
+      request.type = http::Request::PIPE;
+      request.reader = reader;
+      request.headers = headers;
+
+      Future<http::Response> response = connection.send(request);
+      AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+    }
+  };
+
+  // Prepare the data to send to `cat` and send it over an
+  // `ATTACH_CONTAINER_INPUT` stream.
+  string data1 = "Hello, World!";
+  string data2 =
+    "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+    "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
+    "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
+    "aliquip ex ea commodo consequat. Duis aute irure dolor in "
+    "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
+    "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
+    "culpa qui officia deserunt mollit anim id est laborum.";
+
+  while (Bytes(data2.size()) < Megabytes(1)) {
+    data2.append(data2);
+  }
+
+  attachContainerInput(data1, false);
+  attachContainerInput(data2, true);
+
+  ASSERT_SOME(response->reader);
+  Future<tuple<string, string>> received =
+    getProcessIOData(contentType, response->reader.get());
+
+  AWAIT_READY(received);
+
+  string stdoutReceived;
+  string stderrReceived;
+
+  tie(stdoutReceived, stderrReceived) = received.get();
+
+  EXPECT_EQ(stdoutReceived, data1 + data2);
+
+  ASSERT_TRUE(stderrReceived.empty());
+
+  driver.stop();
+  driver.join();
+}
+
+
 // This test verifies that attaching to the output of a container fails if the
 // containerizer doesn't support the operation.
 TEST_P(AgentAPITest, AttachContainerOutputFailure)

Reply via email to