This is an automated email from the ASF dual-hosted git repository.

bennoe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 62829583bccbd4ed8c0360b70e2ae084acaef061
Author: Benno Evers <[email protected]>
AuthorDate: Mon Nov 25 16:02:52 2019 +0100

    Made the default executors connect via domain sockets if available.
    
    Added support for the `MESOS_DOMAIN_SOCKET` environment variable
    to the default executor and to the executor library in
    `src/executor/executor.cpp`. If this variable is present, the
    executor will use the specified domain socket to connect to the
    agent, as opposed to the IP address encoded in the agent PID.
    
    Review: https://reviews.apache.org/r/71815
---
 src/executor/executor.cpp            | 52 +++++++++++++++++++++---
 src/launcher/default_executor.cpp    | 53 ++++++++++++++++++++----
 src/tests/command_executor_tests.cpp | 78 ++++++++++++++++++++++++++++++++++++
 src/tests/default_executor_tests.cpp | 74 ++++++++++++++++++++++++++++++++++
 4 files changed, 244 insertions(+), 13 deletions(-)

diff --git a/src/executor/executor.cpp b/src/executor/executor.cpp
index dfa5820..7cff258 100644
--- a/src/executor/executor.cpp
+++ b/src/executor/executor.cpp
@@ -43,6 +43,7 @@
 #include <stout/unreachable.hpp>
 #include <stout/uuid.hpp>
 
+#include "common/domain_sockets.hpp"
 #include "common/http.hpp"
 #include "common/recordio.hpp"
 #include "common/validation.hpp"
@@ -232,12 +233,51 @@ public:
     }
 #endif // USE_SSL_SOCKET
 
-    agent = ::URL(
-        scheme,
-        upid.address.ip,
-        upid.address.port,
-        upid.id +
-        "/api/v1/executor");
+    value = env.get("MESOS_DOMAIN_SOCKET");
+    if (value.isSome()) {
+      string scheme = "http+unix";
+      std::string path = value.get();
+
+      // Currently this check should not trigger because the agent already
+      // checks the path length on startup. We still do the involved checking
+      // procedure below, on the one hand because the agent might have gotten
+      // a relative path but mainly so we are able to seamlessly start
+      // supporting custom executors with their own rootfs in the future.
+      if (path.size() >= common::DOMAIN_SOCKET_MAX_PATH_LENGTH) {
+        std::string cwd = os::getcwd();
+        VLOG(1) << "Path " << path << " too long, shortening it by using"
+                << " the relative path to " << cwd;
+
+        Try<std::string> relative = path::relative(path, cwd);
+        if (relative.isError()) {
+          EXIT(EXIT_FAILURE)
+            << "Couldnt compute path of " << path
+            << " relative to " << cwd << ": "
+            << relative.error();
+        }
+
+        path = "./" + *relative;
+      }
+
+      if (path.size() >= common::DOMAIN_SOCKET_MAX_PATH_LENGTH) {
+        EXIT(EXIT_FAILURE)
+          << "Cannot use domain sockets for communication as requested: "
+          << "Path " << path << " is longer than 108 characters";
+      }
+
+      agent = ::URL(
+          scheme,
+          path,
+          upid.id + "/api/v1/executor");
+    } else {
+      agent = ::URL(
+          scheme,
+          upid.address.ip,
+          upid.address.port,
+          upid.id + "/api/v1/executor");
+    }
+
+    LOG(INFO) << "Using URL " << agent << " for the executor API endpoint";
 
     value = env.get("MESOS_EXECUTOR_AUTHENTICATION_TOKEN");
     if (value.isSome()) {
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 521494a..4369fd0 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -1727,16 +1727,55 @@ int main(int argc, char** argv)
       << "Expecting 'MESOS_SLAVE_PID' to be set in the environment";
   }
 
-  process::initialize();
-
   UPID upid(value.get());
   CHECK(upid) << "Failed to parse MESOS_SLAVE_PID '" << value.get() << "'";
 
-  agent = ::URL(
-      scheme,
-      upid.address.ip,
-      upid.address.port,
-      upid.id + "/api/v1");
+  value = os::getenv("MESOS_DOMAIN_SOCKET");
+  if (value.isSome()) {
+    // The previous value of `scheme` can be ignored here, since we do not
+    // use https over unix domain sockets anyways.
+    scheme = "http+unix";
+    std::string path = value.get();
+
+    if (path.size() >= 108) {
+      std::string cwd = os::getcwd();
+      VLOG(1) << "Path " << path << " too long, shortening it by using"
+              << " the relative path to " << cwd;
+
+      Try<std::string> relative = path::relative(path, cwd);
+      if (relative.isError()) {
+        EXIT(EXIT_FAILURE)
+          << "Couldnt compute relative path of socket location: "
+          << relative.error();
+      }
+      path = "./" + *relative;
+    }
+
+    // This should not happen, because the socket is supposed to
+    // be in `$MESOS_SANDBOX/agent.sock`, so the relative path should
+    // always be `./agent.sock`.
+    if (path.size() >= 108) {
+      EXIT(EXIT_FAILURE)
+        << "Cannot use domain sockets for communication as requested: "
+        << "Path " << path << " is longer than 108 characters";
+    }
+
+    agent = ::URL(
+        scheme,
+        path,
+        upid.id + "/api/v1");
+
+  } else {
+    agent = ::URL(
+        scheme,
+        upid.address.ip,
+        upid.address.port,
+        upid.id + "/api/v1");
+  }
+
+  LOG(INFO) << "Using URL " << agent << " for the streaming endpoint";
+
+  process::initialize();
 
   value = os::getenv("MESOS_SANDBOX");
   if (value.isNone()) {
diff --git a/src/tests/command_executor_tests.cpp b/src/tests/command_executor_tests.cpp
index c7f7711..73f8006 100644
--- a/src/tests/command_executor_tests.cpp
+++ b/src/tests/command_executor_tests.cpp
@@ -496,6 +496,84 @@ TEST_P(CommandExecutorTest, AllocationRoleEnvironmentVariable)
 }
 
 
+// This test runs a command executor task on an agent with executor domain
+// sockets enabled; the task only succeeds if `$MESOS_DOMAIN_SOCKET` names
+// a socket. It does not prove which transport the executor itself used.
+TEST_P(CommandExecutorTest, ExecutorDomainSockets)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.http_command_executor = GetParam();
+  flags.http_executor_domain_sockets = true;
+  flags.domain_socket_location = *sandbox + "/agent.sock";
+
+  // Forward `GLOG_v` environment variable to the executor;
+  // useful when debugging tests.
+  JSON::Object executorEnvironment;
+  executorEnvironment.values["GLOG_v"] =
+    JSON::String(os::getenv("GLOG_v").getOrElse("0"));
+
+  flags.executor_environment_variables = executorEnvironment;
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Start a framework using the default framework info and credential.
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_EQ(1u, offers->size());
+
+  // Command executor task: succeeds iff `$MESOS_DOMAIN_SOCKET` is a socket.
+  TaskInfo task = createTask(
+      offers->front().slave_id(),
+      offers->front().resources(),
+      "test -S $MESOS_DOMAIN_SOCKET");
+
+  Future<TaskStatus> statusStarting;
+  Future<TaskStatus> statusRunning;
+  Future<TaskStatus> statusFinished;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&statusStarting))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusFinished));
+
+  driver.launchTasks(offers->front().id(), {task});
+
+  AWAIT_READY(statusStarting);
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+
+  // The task will be successful even if the executor is
+  // not using domain sockets, because the container is
+  // not mounting a separate root fs, so `$MESOS_DOMAIN_SOCKET`
+  // will point to the agent domain socket on the host
+  // filesystem.
+  AWAIT_READY(statusFinished);
+  EXPECT_EQ(TASK_FINISHED, statusFinished->state());
+
+  driver.stop();
+  driver.join();
+}
+
+
 class HTTPCommandExecutorTest
   : public MesosTest {};
 
diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp
index 49c4e3b..6c71b3c 100644
--- a/src/tests/default_executor_tests.cpp
+++ b/src/tests/default_executor_tests.cpp
@@ -4417,6 +4417,80 @@ TEST_P(DefaultExecutorTest, AllocationRoleEnvironmentVariable)
   ASSERT_EQ(taskInfo.task_id(), finishedUpdate->status().task_id());
 }
 
+
+// This test verifies that the default executor connects to the domain socket
+// named by `MESOS_DOMAIN_SOCKET` when that environment variable is set, and
+// that the first five bytes it sends on that connection are `POST `.
+TEST_P(DefaultExecutorTest, DomainSockets)
+{
+  Try<string> socketDir = os::mkdtemp();  // Not removed if an ASSERT fails.
+  ASSERT_SOME(socketDir);
+
+  std::string socketPath = socketDir.get() + "/agent.sock";
+  Try<process::network::unix::Address> address =
+    process::network::unix::Address::create(socketPath);
+
+  ASSERT_SOME(address);
+
+  Try<process::network::unix::Socket> socket =
+    process::network::unix::Socket::create();
+  ASSERT_SOME(socket);
+
+  Try<process::network::unix::Address> bound = socket->bind(address.get());
+  ASSERT_SOME(bound);
+
+  socket->listen(1);
+  Future<process::network::unix::Socket> client = socket->accept();
+
+  std::vector<std::string> argv {
+    "mesos-default-executor"
+  };
+
+  std::map<string, string> environment = {
+    {"MESOS_DOMAIN_SOCKET", socketPath},
+    {"MESOS_EXECUTOR_ID", "some_executor"},
+    {"MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD", "1ms"},
+    {"MESOS_FRAMEWORK_ID", "some_framework"},
+    {"MESOS_SANDBOX", socketDir.get()},
+    {"MESOS_SLAVE_PID", "mesos-agent@localhost:5051"},
+    {"GLOG_v", os::getenv("GLOG_v").getOrElse("0") },
+  };
+
+  Result<std::string> path = os::realpath(BUILD_DIR);
+  ASSERT_SOME(path);
+
+  Try<process::Subprocess> executor = process::subprocess(
+        path::join(path.get(), "src/mesos-default-executor"),
+        argv,
+        process::Subprocess::FD(STDIN_FILENO),
+        process::Subprocess::FD(STDOUT_FILENO),
+        process::Subprocess::FD(STDERR_FILENO),
+        nullptr, // Don't pass flags.
+        environment,
+        None(),  // Use default clone.
+        {},      // No parent hooks.
+        { process::Subprocess::ChildHook::SETSID() });
+
+  ASSERT_SOME(executor);
+  AWAIT_ASSERT_READY(client);  // A client (the executor) connected.
+
+  std::string data(5, '\0');  // Sized to hold exactly "POST ".
+  Future<size_t> n = client->recv(&data[0], data.size());
+  AWAIT_READY(n);
+  EXPECT_GE(n.get(), 5u);  // NOTE(review): a short first read would fail this.
+  EXPECT_EQ(data, "POST ");
+
+  os::kill(executor->pid(), SIGTERM);
+  auto status = executor->status();
+  AWAIT_READY(status);
+
+  Try<Nothing, SocketError> shutdown = socket->shutdown();
+  ASSERT_SOME(shutdown);
+
+  Try<Nothing> rmdir = os::rmdir(socketDir.get());
+  ASSERT_SOME(rmdir);
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to