Updated command executor to support rootfs. Review: https://reviews.apache.org/r/38900/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/782292df Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/782292df Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/782292df Branch: refs/heads/master Commit: 782292dfa29fd00e14069b747c20c7d010459abf Parents: 6314fec Author: Timothy Chen <[email protected]> Authored: Mon Nov 2 18:56:04 2015 +0000 Committer: Timothy Chen <[email protected]> Committed: Thu Nov 5 18:16:04 2015 -0800 ---------------------------------------------------------------------- src/launcher/executor.cpp | 147 +++++++++++++++++++++++++++++++++++++++-- src/slave/constants.cpp | 1 + src/slave/constants.hpp | 3 + src/slave/slave.cpp | 51 ++++++++++++-- src/slave/slave.hpp | 2 +- src/tests/slave_tests.cpp | 5 +- 6 files changed, 196 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/782292df/src/launcher/executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp index 50b3c6e..ee38387 100644 --- a/src/launcher/executor.cpp +++ b/src/launcher/executor.cpp @@ -52,6 +52,10 @@ #include "common/http.hpp" #include "common/status_utils.hpp" +#ifdef __linux__ +#include "linux/fs.hpp" +#endif + #include "logging/logging.hpp" #include "messages/messages.hpp" @@ -76,26 +80,34 @@ using namespace process; class CommandExecutorProcess : public ProtobufProcess<CommandExecutorProcess> { public: - CommandExecutorProcess(Option<char**> override, const string& _healthCheckDir) + CommandExecutorProcess( + const Option<char**>& override, + const string& _healthCheckDir, + const Option<string>& _sandboxDirectory, + const Option<string>& _user) : launched(false), killed(false), killedByHealthCheck(false), pid(-1), healthPid(-1), escalationTimeout(slave::EXECUTOR_SIGNAL_ESCALATION_TIMEOUT), + 
executorInfo(None()), driver(None()), healthCheckDir(_healthCheckDir), - override(override) {} + override(override), + sandboxDirectory(_sandboxDirectory), + user(_user) {} virtual ~CommandExecutorProcess() {} void registered( ExecutorDriver* _driver, - const ExecutorInfo& executorInfo, + const ExecutorInfo& _executorInfo, const FrameworkInfo& frameworkInfo, const SlaveInfo& slaveInfo) { cout << "Registered executor on " << slaveInfo.hostname() << endl; + executorInfo = _executorInfo; driver = _driver; } @@ -169,6 +181,65 @@ public: abort(); } + CHECK_SOME(executorInfo); + + Option<string> rootfs; + if (sandboxDirectory.isSome()) { + // If 'sandbox_directory' is specified, that means the user + // task specifies a root filesystem, and that root filesystem has + // already been prepared at COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH. + // The command executor is responsible for mounting the sandbox + // into the root filesystem, chrooting into it and changing the + // user before exec-ing the user process. +#ifdef __linux__ + Result<string> user = os::user(); + if (user.isError()) { + cerr << "Failed to get current user: " << user.error() << endl; + abort(); + } else if (user.isNone()) { + cerr << "Current username is not found" << endl; + abort(); + } else if (user.get() != "root") { + cerr << "The command executor requires root with rootfs" << endl; + abort(); + } + + CHECK(executorInfo.get().has_container()); + + rootfs = path::join( + os::getcwd(), COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH); + + string sandbox = path::join(rootfs.get(), sandboxDirectory.get()); + if (!os::exists(sandbox)) { + Try<Nothing> mkdir = os::mkdir(sandbox); + if (mkdir.isError()) { + cerr << "Failed to create sandbox mount point at '" + << sandbox << "': " << mkdir.error() << endl; + abort(); + } + } + + // Mount the sandbox into the container rootfs. + // NOTE: We don't use MS_REC here because the rootfs is already + // under the sandbox. 
+ Try<Nothing> mount = fs::mount( + os::getcwd(), + sandbox, + None(), + MS_BIND, + NULL); + + if (mount.isError()) { + cerr << "Unable to mount the work directory into container " + << "rootfs: " << mount.error() << endl;; + abort(); + } +#else + cerr << "Not expecting root volume with non-linux platform." << endl; + abort(); +#endif // __linux__ + } + // Prepare the argv before fork as it's not async signal safe. char **argv = new char*[task.command().arguments().size() + 1]; for (int i = 0; i < task.command().arguments().size(); i++) { @@ -231,6 +302,48 @@ public: os::close(pipes[1]); + if (rootfs.isSome()) { +#ifdef __linux__ + if (user.isSome()) { + // This is a work around to fix the problem that after we chroot + // os::su call afterwards failed because the linker may not be + // able to find the necessary library in the rootfs. + // We call os::su before chroot here to force the linker to load + // into memory. + // We also assume it's safe to su to "root" user since + // filesystem/linux.cpp checks for root already. + os::su("root"); + } + + Try<Nothing> chroot = fs::chroot::enter(rootfs.get()); + if (chroot.isError()) { + cerr << "Failed to enter chroot '" << rootfs.get() + << "': " << chroot.error() << endl;; + abort(); + } + + Try<Nothing> chdir = os::chdir(sandboxDirectory.get()); + if (chdir.isError()) { + cerr << "Failed to change directory to sandbox dir '" + << sandboxDirectory.get() << "': " << chdir.error(); + abort(); + } + + if (user.isSome()) { + Try<Nothing> su = os::su(user.get()); + if (su.isError()) { + cerr << "Failed to change user to '" << user.get() << "': " + << su.error() << endl; + abort(); + } + } +#else + cerr << "Rootfs is only supported on Linux" << endl; + abort(); +#endif // __linux__ + } + + cout << command << endl; // The child has successfully setsid, now run the command. 
@@ -498,18 +611,26 @@ private: pid_t healthPid; Duration escalationTimeout; Timer escalationTimer; + Option<ExecutorInfo> executorInfo; Option<ExecutorDriver*> driver; string healthCheckDir; Option<char**> override; + Option<string> sandboxDirectory; + Option<string> user; }; class CommandExecutor: public Executor { public: - CommandExecutor(Option<char**> override, string healthCheckDir) + CommandExecutor( + const Option<char**>& override, + const string& healthCheckDir, + const Option<string>& sandboxDirectory, + const Option<string>& user) { - process = new CommandExecutorProcess(override, healthCheckDir); + process = new CommandExecutorProcess( + override, healthCheckDir, sandboxDirectory, user); spawn(process); } @@ -595,11 +716,24 @@ public: "subsequent 'argv' to be used with 'execvp'", false); + // The following flags are only applicable when a rootfs is provisioned + // for this command. + add(&sandbox_directory, + "sandbox_directory", + "The absolute path for the directory in the container where the\n" + "sandbox is mapped to"); + + add(&user, + "user", + "The user that the task should be running as."); + // TODO(nnielsen): Add 'prefix' option to enable replacing // 'sh -c' with user specified wrapper. } bool override; + Option<string> sandbox_directory; + Option<string> user; }; @@ -636,7 +770,8 @@ int main(int argc, char** argv) string path = envPath.isSome() ? envPath.get() : os::realpath(Path(argv[0]).dirname()).get(); - mesos::internal::CommandExecutor executor(override, path); + mesos::internal::CommandExecutor executor( + override, path, flags.sandbox_directory, flags.user); mesos::MesosExecutorDriver driver(&executor); return driver.run() == mesos::DRIVER_STOPPED ? 
EXIT_SUCCESS : EXIT_FAILURE; } http://git-wip-us.apache.org/repos/asf/mesos/blob/782292df/src/slave/constants.cpp ---------------------------------------------------------------------- diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp index b69471b..9ef5d5d 100644 --- a/src/slave/constants.cpp +++ b/src/slave/constants.cpp @@ -53,6 +53,7 @@ const Duration DOCKER_INSPECT_DELAY = Seconds(1); // TODO(tnachen): Make this a flag. const Duration DOCKER_VERSION_WAIT_TIMEOUT = Seconds(5); const std::string DEFAULT_AUTHENTICATEE = "crammd5"; +const std::string COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH = ".rootfs"; Duration DEFAULT_MASTER_PING_TIMEOUT() { http://git-wip-us.apache.org/repos/asf/mesos/blob/782292df/src/slave/constants.hpp ---------------------------------------------------------------------- diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp index de6b58a..99026f7 100644 --- a/src/slave/constants.hpp +++ b/src/slave/constants.hpp @@ -118,6 +118,9 @@ const int DOCKER_PS_MAX_INSPECT_CALLS = 100; // trigger a re-detection of the master to cause a re-registration. Duration DEFAULT_MASTER_PING_TIMEOUT(); +// Container path that the slave sets to mount the command executor rootfs to. 
+extern const std::string COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH; + } // namespace slave { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/782292df/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index ddeece4..9739189 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -1351,7 +1351,7 @@ void Slave::runTask( } } - const ExecutorInfo executorInfo = getExecutorInfo(frameworkId, task); + const ExecutorInfo executorInfo = getExecutorInfo(frameworkInfo, task); const ExecutorID& executorId = executorInfo.executor_id(); if (HookManager::hooksAvailable()) { @@ -1410,7 +1410,7 @@ void Slave::_runTask( return; } - const ExecutorInfo executorInfo = getExecutorInfo(frameworkId, task); + const ExecutorInfo executorInfo = getExecutorInfo(frameworkInfo, task); const ExecutorID& executorId = executorInfo.executor_id(); if (framework->pending.contains(executorId) && @@ -3232,7 +3232,7 @@ Executor* Slave::getExecutor( ExecutorInfo Slave::getExecutorInfo( - const FrameworkID& frameworkId, + const FrameworkInfo& frameworkInfo, const TaskInfo& task) { CHECK_NE(task.has_executor(), task.has_command()) @@ -3244,7 +3244,7 @@ ExecutorInfo Slave::getExecutorInfo( // Command executors share the same id as the task. 
executor.mutable_executor_id()->set_value(task.task_id().value()); - executor.mutable_framework_id()->CopyFrom(frameworkId); + executor.mutable_framework_id()->CopyFrom(frameworkInfo.id()); if (task.has_container() && task.container().type() != ContainerInfo::MESOS) { @@ -3254,6 +3254,22 @@ ExecutorInfo Slave::getExecutorInfo( executor.mutable_container()->CopyFrom(task.container()); } + bool hasRootfs = task.has_container() && + task.container().type() == ContainerInfo::MESOS && + task.container().mesos().has_image(); + + if (hasRootfs) { + ContainerInfo* container = executor.mutable_container(); + container->set_type(ContainerInfo::MESOS); + Volume* volume = container->add_volumes(); + volume->mutable_image()->CopyFrom(task.container().mesos().image()); + volume->set_container_path(COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH); + volume->set_mode(Volume::RW); + // We need to set the executor user as root as it needs to + // perform chroot (even when switch_user is set to false). + executor.mutable_command()->set_user("root"); + } + // Prepare an executor name which includes information on the // command being launched. string name = "(Task: " + task.task_id().value() + ") "; @@ -3307,7 +3323,11 @@ ExecutorInfo Slave::getExecutorInfo( task.command().container()); } - if (task.command().has_user()) { + // We skip setting the user for the command executor that has + // a rootfs image since we need root permissions to chroot. + // We assume command executor will change to the correct user + // later on. 
+ if (!hasRootfs && task.command().has_user()) { executor.mutable_command()->set_user(task.command().user()); } @@ -3320,6 +3340,27 @@ ExecutorInfo Slave::getExecutorInfo( executor.mutable_command()->set_shell(true); if (path.isSome()) { + if (hasRootfs) { + executor.mutable_command()->set_shell(false); + executor.mutable_command()->add_arguments("mesos-executor"); + executor.mutable_command()->add_arguments( + "--sandbox_directory=" + flags.sandbox_directory); + + if (flags.switch_user) { + Option<string> user; + if (task.command().has_user()) { + user = task.command().user(); + } else if (frameworkInfo.has_user()) { + user = frameworkInfo.user(); + } + + if (user.isSome()) { + executor.mutable_command()->add_arguments( + "--user=" + user.get()); + } + } + } + executor.mutable_command()->set_value(path.get()); } else { executor.mutable_command()->set_value( http://git-wip-us.apache.org/repos/asf/mesos/blob/782292df/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index e6fa66b..ec2dfa9 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -312,7 +312,7 @@ public: // Returns an ExecutorInfo for a TaskInfo (possibly // constructing one if the task has a CommandInfo). ExecutorInfo getExecutorInfo( - const FrameworkID& frameworkId, + const FrameworkInfo& frameworkInfo, const TaskInfo& task); // Shuts down the executor if it did not register yet. 
http://git-wip-us.apache.org/repos/asf/mesos/blob/782292df/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index 91dbdba..6b89eaf 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -543,6 +543,9 @@ TEST_F(SlaveTest, GetExecutorInfo) FrameworkID frameworkId; frameworkId.set_value("20141010-221431-251662764-60288-32120-0000"); + FrameworkInfo frameworkInfo; + frameworkInfo.mutable_id()->CopyFrom(frameworkId); + // Launch a task with the command executor. TaskInfo task; task.set_name("task"); @@ -560,7 +563,7 @@ TEST_F(SlaveTest, GetExecutorInfo) task.mutable_command()->MergeFrom(command); - const ExecutorInfo& executor = slave.getExecutorInfo(frameworkId, task); + const ExecutorInfo& executor = slave.getExecutorInfo(frameworkInfo, task); // Now assert that it actually is running mesos-executor without any // bleedover from the command we intend on running.
