Updated command executor to support rootfs.

Review: https://reviews.apache.org/r/38900/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/782292df
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/782292df
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/782292df

Branch: refs/heads/master
Commit: 782292dfa29fd00e14069b747c20c7d010459abf
Parents: 6314fec
Author: Timothy Chen <[email protected]>
Authored: Mon Nov 2 18:56:04 2015 +0000
Committer: Timothy Chen <[email protected]>
Committed: Thu Nov 5 18:16:04 2015 -0800

----------------------------------------------------------------------
 src/launcher/executor.cpp | 147 +++++++++++++++++++++++++++++++++++++++--
 src/slave/constants.cpp   |   1 +
 src/slave/constants.hpp   |   3 +
 src/slave/slave.cpp       |  51 ++++++++++++--
 src/slave/slave.hpp       |   2 +-
 src/tests/slave_tests.cpp |   5 +-
 6 files changed, 196 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/782292df/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 50b3c6e..ee38387 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -52,6 +52,10 @@
 #include "common/http.hpp"
 #include "common/status_utils.hpp"
 
+#ifdef __linux__
+#include "linux/fs.hpp"
+#endif
+
 #include "logging/logging.hpp"
 
 #include "messages/messages.hpp"
@@ -76,26 +80,34 @@ using namespace process;
 class CommandExecutorProcess : public ProtobufProcess<CommandExecutorProcess>
 {
 public:
-  CommandExecutorProcess(Option<char**> override, const string& _healthCheckDir)
+  CommandExecutorProcess(
+      const Option<char**>& override,
+      const string& _healthCheckDir,
+      const Option<string>& _sandboxDirectory,
+      const Option<string>& _user)
     : launched(false),
       killed(false),
       killedByHealthCheck(false),
       pid(-1),
       healthPid(-1),
       escalationTimeout(slave::EXECUTOR_SIGNAL_ESCALATION_TIMEOUT),
+      executorInfo(None()),
       driver(None()),
       healthCheckDir(_healthCheckDir),
-      override(override) {}
+      override(override),
+      sandboxDirectory(_sandboxDirectory),
+      user(_user) {}
 
   virtual ~CommandExecutorProcess() {}
 
   void registered(
       ExecutorDriver* _driver,
-      const ExecutorInfo& executorInfo,
+      const ExecutorInfo& _executorInfo,
       const FrameworkInfo& frameworkInfo,
       const SlaveInfo& slaveInfo)
   {
     cout << "Registered executor on " << slaveInfo.hostname() << endl;
+    executorInfo = _executorInfo;
     driver = _driver;
   }
 
@@ -169,6 +181,65 @@ public:
       abort();
     }
 
+    CHECK_SOME(executorInfo);
+
+    Option<string> rootfs;
+    if (sandboxDirectory.isSome()) {
+      // If 'sandbox_directory' is specified, that means the user
+      // task specifies a root filesystem, and that root filesystem has
+      // already been prepared at COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH.
+      // The command executor is responsible for mounting the sandbox
+      // into the root filesystem, chrooting into it and changing the
+      // user before exec-ing the user process.
+#ifdef __linux__
+      Result<string> user = os::user();
+      if (user.isError()) {
+        cerr << "Failed to get current user: " << user.error() << endl;
+        abort();
+      } else if (user.isNone()) {
+        cerr << "Current username is not found" << endl;
+        abort();
+      } else if (user.get() != "root") {
+        cerr << "The command executor requires root with rootfs" << endl;
+        abort();
+      }
+
+      CHECK(executorInfo.get().has_container());
+
+      rootfs = path::join(
+          os::getcwd(), COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH);
+
+      string sandbox = path::join(rootfs.get(), sandboxDirectory.get());
+      if (!os::exists(sandbox)) {
+        Try<Nothing> mkdir = os::mkdir(sandbox);
+        if (mkdir.isError()) {
+          cerr << "Failed to create sandbox mount point  at '"
+               << sandbox << "': " << mkdir.error() << endl;
+          abort();
+        }
+      }
+
+      // Mount the sandbox into the container rootfs.
+      // NOTE: We don't use MS_REC here because the rootfs is already
+      // under the sandbox.
+      Try<Nothing> mount = fs::mount(
+          os::getcwd(),
+          sandbox,
+          None(),
+          MS_BIND,
+          NULL);
+
+      if (mount.isError()) {
+        cerr << "Unable to mount the work directory into container "
+             << "rootfs: " << mount.error() << endl;;
+        abort();
+      }
+#else
+      cerr << "Not expecting root volume with non-linux platform." << endl;
+      abort();
+#endif // __linux__
+    }
+
     // Prepare the argv before fork as it's not async signal safe.
     char **argv = new char*[task.command().arguments().size() + 1];
     for (int i = 0; i < task.command().arguments().size(); i++) {
@@ -231,6 +302,48 @@ public:
 
       os::close(pipes[1]);
 
+      if (rootfs.isSome()) {
+#ifdef __linux__
+        if (user.isSome()) {
+          // This is a workaround for the problem that, after we chroot,
+          // a subsequent os::su call may fail because the linker may not
+          // be able to find the necessary libraries in the rootfs.
+          // We call os::su before chroot here to force the linker to load
+          // them into memory.
+          // We also assume it's safe to su to "root" user since
+          // filesystem/linux.cpp checks for root already.
+          os::su("root");
+        }
+
+        Try<Nothing> chroot = fs::chroot::enter(rootfs.get());
+        if (chroot.isError()) {
+          cerr << "Failed to enter chroot '" << rootfs.get()
+               << "': " << chroot.error() << endl;;
+          abort();
+        }
+
+        Try<Nothing> chdir = os::chdir(sandboxDirectory.get());
+        if (chdir.isError()) {
+          cerr << "Failed to change directory to sandbox dir '"
+               << sandboxDirectory.get() << "': " << chdir.error();
+          abort();
+        }
+
+        if (user.isSome()) {
+          Try<Nothing> su = os::su(user.get());
+          if (su.isError()) {
+            cerr << "Failed to change user to '" << user.get() << "': "
+                 << su.error() << endl;
+            abort();
+          }
+        }
+#else
+        cerr << "Rootfs is only supported on Linux" << endl;
+        abort();
+#endif // __linux__
+      }
+
+
       cout << command << endl;
 
       // The child has successfully setsid, now run the command.
@@ -498,18 +611,26 @@ private:
   pid_t healthPid;
   Duration escalationTimeout;
   Timer escalationTimer;
+  Option<ExecutorInfo> executorInfo;
   Option<ExecutorDriver*> driver;
   string healthCheckDir;
   Option<char**> override;
+  Option<string> sandboxDirectory;
+  Option<string> user;
 };
 
 
 class CommandExecutor: public Executor
 {
 public:
-  CommandExecutor(Option<char**> override, string healthCheckDir)
+  CommandExecutor(
+      const Option<char**>& override,
+      const string& healthCheckDir,
+      const Option<string>& sandboxDirectory,
+      const Option<string>& user)
   {
-    process = new CommandExecutorProcess(override, healthCheckDir);
+    process = new CommandExecutorProcess(
+        override, healthCheckDir, sandboxDirectory, user);
     spawn(process);
   }
 
@@ -595,11 +716,24 @@ public:
         "subsequent 'argv' to be used with 'execvp'",
         false);
 
+    // The following flags are only applicable when a rootfs is provisioned
+    // for this command.
+    add(&sandbox_directory,
+        "sandbox_directory",
+        "The absolute path for the directory in the container where the\n"
+        "sandbox is mapped to");
+
+    add(&user,
+        "user",
+        "The user that the task should be running as.");
+
     // TODO(nnielsen): Add 'prefix' option to enable replacing
     // 'sh -c' with user specified wrapper.
   }
 
   bool override;
+  Option<string> sandbox_directory;
+  Option<string> user;
 };
 
 
@@ -636,7 +770,8 @@ int main(int argc, char** argv)
   string path =
     envPath.isSome() ? envPath.get()
                      : os::realpath(Path(argv[0]).dirname()).get();
-  mesos::internal::CommandExecutor executor(override, path);
+  mesos::internal::CommandExecutor executor(
+      override, path, flags.sandbox_directory, flags.user);
   mesos::MesosExecutorDriver driver(&executor);
   return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/782292df/src/slave/constants.cpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp
index b69471b..9ef5d5d 100644
--- a/src/slave/constants.cpp
+++ b/src/slave/constants.cpp
@@ -53,6 +53,7 @@ const Duration DOCKER_INSPECT_DELAY = Seconds(1);
 // TODO(tnachen): Make this a flag.
 const Duration DOCKER_VERSION_WAIT_TIMEOUT = Seconds(5);
 const std::string DEFAULT_AUTHENTICATEE = "crammd5";
+const std::string COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH = ".rootfs";
 
 Duration DEFAULT_MASTER_PING_TIMEOUT()
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/782292df/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index de6b58a..99026f7 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -118,6 +118,9 @@ const int DOCKER_PS_MAX_INSPECT_CALLS = 100;
 // trigger a re-detection of the master to cause a re-registration.
 Duration DEFAULT_MASTER_PING_TIMEOUT();
 
+// Container path that the slave sets to mount the command executor rootfs to.
+extern const std::string COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH;
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/782292df/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index ddeece4..9739189 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1351,7 +1351,7 @@ void Slave::runTask(
     }
   }
 
-  const ExecutorInfo executorInfo = getExecutorInfo(frameworkId, task);
+  const ExecutorInfo executorInfo = getExecutorInfo(frameworkInfo, task);
   const ExecutorID& executorId = executorInfo.executor_id();
 
   if (HookManager::hooksAvailable()) {
@@ -1410,7 +1410,7 @@ void Slave::_runTask(
     return;
   }
 
-  const ExecutorInfo executorInfo = getExecutorInfo(frameworkId, task);
+  const ExecutorInfo executorInfo = getExecutorInfo(frameworkInfo, task);
   const ExecutorID& executorId = executorInfo.executor_id();
 
   if (framework->pending.contains(executorId) &&
@@ -3232,7 +3232,7 @@ Executor* Slave::getExecutor(
 
 
 ExecutorInfo Slave::getExecutorInfo(
-    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo,
     const TaskInfo& task)
 {
   CHECK_NE(task.has_executor(), task.has_command())
@@ -3244,7 +3244,7 @@ ExecutorInfo Slave::getExecutorInfo(
 
     // Command executors share the same id as the task.
     executor.mutable_executor_id()->set_value(task.task_id().value());
-    executor.mutable_framework_id()->CopyFrom(frameworkId);
+    executor.mutable_framework_id()->CopyFrom(frameworkInfo.id());
 
     if (task.has_container() &&
         task.container().type() != ContainerInfo::MESOS) {
@@ -3254,6 +3254,22 @@ ExecutorInfo Slave::getExecutorInfo(
       executor.mutable_container()->CopyFrom(task.container());
     }
 
+    bool hasRootfs = task.has_container() &&
+                     task.container().type() == ContainerInfo::MESOS &&
+                     task.container().mesos().has_image();
+
+    if (hasRootfs) {
+      ContainerInfo* container = executor.mutable_container();
+      container->set_type(ContainerInfo::MESOS);
+      Volume* volume = container->add_volumes();
+      volume->mutable_image()->CopyFrom(task.container().mesos().image());
+      volume->set_container_path(COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH);
+      volume->set_mode(Volume::RW);
+      // We need to set the executor user as root as it needs to
+      // perform chroot (even when switch_user is set to false).
+      executor.mutable_command()->set_user("root");
+    }
+
     // Prepare an executor name which includes information on the
     // command being launched.
     string name = "(Task: " + task.task_id().value() + ") ";
@@ -3307,7 +3323,11 @@ ExecutorInfo Slave::getExecutorInfo(
           task.command().container());
     }
 
-    if (task.command().has_user()) {
+    // We skip setting the user for the command executor that has
+    // a rootfs image since we need root permissions to chroot.
+    // We assume command executor will change to the correct user
+    // later on.
+    if (!hasRootfs && task.command().has_user()) {
       executor.mutable_command()->set_user(task.command().user());
     }
 
@@ -3320,6 +3340,27 @@ ExecutorInfo Slave::getExecutorInfo(
     executor.mutable_command()->set_shell(true);
 
     if (path.isSome()) {
+      if (hasRootfs) {
+        executor.mutable_command()->set_shell(false);
+        executor.mutable_command()->add_arguments("mesos-executor");
+        executor.mutable_command()->add_arguments(
+            "--sandbox_directory=" + flags.sandbox_directory);
+
+        if (flags.switch_user) {
+          Option<string> user;
+          if (task.command().has_user()) {
+            user = task.command().user();
+          } else if (frameworkInfo.has_user()) {
+            user = frameworkInfo.user();
+          }
+
+          if (user.isSome()) {
+            executor.mutable_command()->add_arguments(
+                "--user=" + user.get());
+          }
+        }
+      }
+
       executor.mutable_command()->set_value(path.get());
     } else {
       executor.mutable_command()->set_value(

http://git-wip-us.apache.org/repos/asf/mesos/blob/782292df/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index e6fa66b..ec2dfa9 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -312,7 +312,7 @@ public:
   // Returns an ExecutorInfo for a TaskInfo (possibly
   // constructing one if the task has a CommandInfo).
   ExecutorInfo getExecutorInfo(
-      const FrameworkID& frameworkId,
+      const FrameworkInfo& frameworkInfo,
       const TaskInfo& task);
 
   // Shuts down the executor if it did not register yet.

http://git-wip-us.apache.org/repos/asf/mesos/blob/782292df/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 91dbdba..6b89eaf 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -543,6 +543,9 @@ TEST_F(SlaveTest, GetExecutorInfo)
   FrameworkID frameworkId;
   frameworkId.set_value("20141010-221431-251662764-60288-32120-0000");
 
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.mutable_id()->CopyFrom(frameworkId);
+
   // Launch a task with the command executor.
   TaskInfo task;
   task.set_name("task");
@@ -560,7 +563,7 @@ TEST_F(SlaveTest, GetExecutorInfo)
 
   task.mutable_command()->MergeFrom(command);
 
-  const ExecutorInfo& executor = slave.getExecutorInfo(frameworkId, task);
+  const ExecutorInfo& executor = slave.getExecutorInfo(frameworkInfo, task);
 
   // Now assert that it actually is running mesos-executor without any
   // bleedover from the command we intend on running.

Reply via email to