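Updated the agent to generate executor secrets. This patch updates the agent code to generate an authentication token for each executor it launches when executor authentication is enabled (i.e., when a secret generator is configured). For now, the generated `Secret` objects must be of `VALUE` type, and the secret's value is passed directly to the executor through its environment, in the `MESOS_EXECUTOR_AUTHENTICATION_TOKEN` variable.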
Review: https://reviews.apache.org/r/57743/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8b6ddb5f Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8b6ddb5f Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8b6ddb5f Branch: refs/heads/master Commit: 8b6ddb5fcae38dcfad27cb5dae26b4054773134f Parents: 2fc8033 Author: Greg Mann <[email protected]> Authored: Sat Mar 25 12:04:49 2017 -0700 Committer: Anand Mazumdar <[email protected]> Committed: Sat Mar 25 12:04:49 2017 -0700 ---------------------------------------------------------------------- src/slave/slave.cpp | 339 +++++++++++++++++++++++++++++++++-------------- src/slave/slave.hpp | 19 ++- 2 files changed, 255 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/8b6ddb5f/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index d68d6b9..5729849 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -73,6 +73,7 @@ #include "common/protobuf_utils.hpp" #include "common/resources_utils.hpp" #include "common/status_utils.hpp" +#include "common/validation.hpp" #include "credentials/credentials.hpp" @@ -2182,9 +2183,24 @@ void Slave::__run( Executor* executor = framework->getExecutor(executorId); if (executor == nullptr) { - executor = framework->launchExecutor( - executorInfo, - taskGroup.isNone() ? task.get() : Option<TaskInfo>::none()); + executor = framework->addExecutor(executorInfo); + + if (secretGenerator.get()) { + generateSecret(framework->id(), executor->id, executor->containerId) + .onAny(defer( + self(), + &Self::launchExecutor, + lambda::_1, + frameworkId, + executorId, + taskGroup.isNone() ? task.get() : Option<TaskInfo>::none())); + } else { + launchExecutor( + None(), + frameworkId, + executorId, + taskGroup.isNone() ? 
task.get() : Option<TaskInfo>::none()); + } } CHECK_NOTNULL(executor); @@ -2302,7 +2318,7 @@ void Slave::__run( } // We don't perform the checks for 'removeFramework' here since - // we're guaranteed by 'launchExecutor' that 'framework->executors' + // we're guaranteed by 'addExecutor' that 'framework->executors' // will be non-empty. CHECK(!framework->executors.empty()); } @@ -2512,6 +2528,213 @@ void Slave::___run( } +// Generates a secret for executor authentication. +Future<Secret> Slave::generateSecret( + const FrameworkID& frameworkId, + const ExecutorID& executorId, + const ContainerID& containerId) +{ + Principal principal( + Option<string>::none(), + { + {"fid", frameworkId.value()}, + {"eid", executorId.value()}, + {"cid", containerId.value()} + }); + + return secretGenerator->generate(principal) + .then([](const Secret& secret) -> Future<Secret> { + Option<Error> error = common::validation::validateSecret(secret); + + if (error.isSome()) { + return Failure( + "Failed to validate generated secret: " + error->message); + } else if (secret.type() != Secret::VALUE) { + return Failure( + "Expecting generated secret to be of VALUE type instead of " + + stringify(secret.type()) + " type; " + + "only VALUE type secrets are supported at this time"); + } + + return secret; + }); +} + + +// Launches an executor which was previously created. 
+void Slave::launchExecutor( + const Option<Future<Secret>>& future, + const FrameworkID& frameworkId, + const ExecutorID& executorId, + const Option<TaskInfo>& taskInfo) +{ + Framework* framework = getFramework(frameworkId); + if (framework == nullptr) { + LOG(WARNING) << "Ignoring launching executor '" << executorId + << "' because the framework " << frameworkId + << " does not exist"; + return; + } + + if (framework->state == Framework::TERMINATING) { + LOG(WARNING) << "Ignoring launching executor '" << executorId + << "' of framework " << frameworkId + << " because the framework is terminating"; + return; + } + + Executor* executor = framework->getExecutor(executorId); + if (executor == nullptr) { + LOG(WARNING) << "Ignoring launching executor '" << executorId + << "' of framework " << frameworkId + << " because the executor does not exist"; + return; + } + + if (executor->state == Executor::TERMINATING || + executor->state == Executor::TERMINATED) { + string executorState; + if (executor->state == Executor::TERMINATING) { + executorState = "terminating"; + } else { + executorState = "terminated"; + } + + LOG(WARNING) << "Ignoring launching executor " << *executor + << " in container " << executor->containerId + << " because the executor is " << executorState; + + // The framework may have shutdown this executor already, transitioning it + // to the TERMINATING/TERMINATED state. However, the executor still exists + // in the agent's map, so we must send status updates for any queued tasks + // and perform cleanup via `executorTerminated`. 
+ ContainerTermination termination; + termination.set_state(TASK_FAILED); + termination.add_reasons(TaskStatus::REASON_CONTAINER_LAUNCH_FAILED); + termination.set_message("Executor " + executorState); + + executorTerminated(frameworkId, executorId, termination); + + return; + } + + CHECK_EQ(Executor::REGISTERING, executor->state); + + Option<Secret> authenticationToken; + + if (future.isSome()) { + if (!future->isReady()) { + LOG(ERROR) << "Failed to launch executor " << *executor + << " in container " << executor->containerId + << " because secret generation failed: " + << (future->isFailed() ? future->failure() : "discarded"); + + ContainerTermination termination; + termination.set_state(TASK_FAILED); + termination.add_reasons(TaskStatus::REASON_CONTAINER_LAUNCH_FAILED); + termination.set_message( + "Secret generation failed: " + + (future->isFailed() ? future->failure() : "discarded")); + + executorTerminated(frameworkId, executorId, termination); + + return; + } + + authenticationToken = future->get(); + } + + // Tell the containerizer to launch the executor. + ExecutorInfo executorInfo_ = executor->info; + + // Populate the command info for default executor. We modify the ExecutorInfo + // to avoid resetting command info upon re-registering with the master since + // the master doesn't store them; they are generated by the slave. + if (executorInfo_.has_type() && + executorInfo_.type() == ExecutorInfo::DEFAULT) { + CHECK(!executorInfo_.has_command()); + + executorInfo_.mutable_command()->CopyFrom( + defaultExecutorCommandInfo(flags.launcher_dir, executor->user)); + } + + Resources resources = executorInfo_.resources(); + + // NOTE: We modify the ExecutorInfo to include the task's + // resources when launching the executor so that the containerizer + // has non-zero resources to work with when the executor has + // no resources. This should be revisited after MESOS-600. 
+ if (taskInfo.isSome()) { + resources += taskInfo->resources(); + } + + executorInfo_.mutable_resources()->CopyFrom(resources); + + // Prepare environment variables for the executor. + map<string, string> environment = executorEnvironment( + flags, + executorInfo_, + executor->directory, + info.id(), + self(), + authenticationToken, + framework->info.checkpoint()); + + // Launch the container. + Future<bool> launch; + if (!executor->isCommandExecutor()) { + // If the executor is _not_ a command executor, this means that + // the task will include the executor to run. The actual task to + // run will be enqueued and subsequently handled by the executor + // when it has registered to the slave. + launch = containerizer->launch( + executor->containerId, + None(), + executorInfo_, + executor->directory, + executor->user, + info.id(), + environment, + framework->info.checkpoint()); + } else { + // An executor has _not_ been provided by the task and will + // instead define a command and/or container to run. Right now, + // these tasks will require an executor anyway and the slave + // creates a command executor. However, it is up to the + // containerizer how to execute those tasks and the generated + // executor info works as a placeholder. + // TODO(nnielsen): Obsolete the requirement for executors to run + // one-off tasks. + launch = containerizer->launch( + executor->containerId, + taskInfo, + executorInfo_, + executor->directory, + executor->user, + info.id(), + environment, + framework->info.checkpoint()); + } + + launch.onAny(defer(self(), + &Self::executorLaunched, + frameworkId, + executor->id, + executor->containerId, + lambda::_1)); + + // Make sure the executor registers within the given timeout. 
+ delay(flags.executor_registration_timeout, + self(), + &Self::registerExecutorTimeout, + frameworkId, + executor->id, + executor->containerId); + + return; +} + + void Slave::runTaskGroup( const UPID& from, const FrameworkInfo& frameworkInfo, @@ -4855,7 +5078,8 @@ void Slave::executorLaunched( } -// Called by the isolator when an executor process terminates. +// Called by the isolator when an executor process terminates, and by +// `Slave::launchExecutor` when executor secret generation fails. void Slave::executorTerminated( const FrameworkID& frameworkId, const ExecutorID& executorId, @@ -6592,10 +6816,7 @@ Framework::~Framework() } -// Create and launch an executor. -Executor* Framework::launchExecutor( - const ExecutorInfo& executorInfo, - const Option<TaskInfo>& taskInfo) +Executor* Framework::addExecutor(const ExecutorInfo& executorInfo) { // Verify that Resource.AllocationInfo is set, if coming // from a MULTI_ROLE master this will be set, otherwise @@ -6605,9 +6826,9 @@ Executor* Framework::launchExecutor( } // Generate an ID for the executor's container. - // TODO(idownes) This should be done by the containerizer but we - // need the ContainerID to create the executor's directory. Fix - // this when 'launchExecutor()' is handled asynchronously. + // TODO(idownes) This should be done by the containerizer but we need the + // ContainerID to create the executor's directory and generate the secret. + // Consider fixing this since 'launchExecutor()' is handled asynchronously. ContainerID containerId; containerId.set_value(UUID::random().toString()); @@ -6680,92 +6901,6 @@ Executor* Framework::launchExecutor( slave->files->attach(executor->directory, executor->directory, authorize) .onAny(defer(slave, &Slave::fileAttached, lambda::_1, executor->directory)); - // Tell the containerizer to launch the executor. - ExecutorInfo executorInfo_ = executor->info; - - // Populate the command info for default executor. 
We modify the ExecutorInfo - // to avoid resetting command info upon re-registering with the master since - // the master doesn't store them; they are generated by the slave. - if (executorInfo_.has_type() && - executorInfo_.type() == ExecutorInfo::DEFAULT) { - CHECK(!executorInfo_.has_command()); - - executorInfo_.mutable_command()->CopyFrom( - defaultExecutorCommandInfo(slave->flags.launcher_dir, user)); - } - - Resources resources = executorInfo_.resources(); - - // NOTE: We modify the ExecutorInfo to include the task's - // resources when launching the executor so that the containerizer - // has non-zero resources to work with when the executor has - // no resources. This should be revisited after MESOS-600. - if (taskInfo.isSome()) { - resources += taskInfo->resources(); - } - - executorInfo_.mutable_resources()->CopyFrom(resources); - - // Prepare environment variables for the executor. - map<string, string> environment = executorEnvironment( - slave->flags, - executorInfo_, - executor->directory, - slave->info.id(), - slave->self(), - info.checkpoint()); - - // Launch the container. - Future<bool> launch; - if (!executor->isCommandExecutor()) { - // If the executor is _not_ a command executor, this means that - // the task will include the executor to run. The actual task to - // run will be enqueued and subsequently handled by the executor - // when it has registered to the slave. - launch = slave->containerizer->launch( - containerId, - None(), - executorInfo_, - executor->directory, - user, - slave->info.id(), - environment, - info.checkpoint()); - } else { - // An executor has _not_ been provided by the task and will - // instead define a command and/or container to run. Right now, - // these tasks will require an executor anyway and the slave - // creates a command executor. However, it is up to the - // containerizer how to execute those tasks and the generated - // executor info works as a placeholder. 
- // TODO(nnielsen): Obsolete the requirement for executors to run - // one-off tasks. - launch = slave->containerizer->launch( - containerId, - taskInfo, - executorInfo_, - executor->directory, - user, - slave->info.id(), - environment, - info.checkpoint()); - } - - launch.onAny(defer(slave, - &Slave::executorLaunched, - id(), - executor->id, - containerId, - lambda::_1)); - - // Make sure the executor registers within the given timeout. - delay(slave->flags.executor_registration_timeout, - slave, - &Slave::registerExecutorTimeout, - id(), - executor->id, - containerId); - return executor; } @@ -7342,6 +7477,7 @@ map<string, string> executorEnvironment( const string& directory, const SlaveID& slaveId, const PID<Slave>& slavePid, + const Option<Secret>& authenticationToken, bool checkpoint) { map<string, string> environment; @@ -7434,6 +7570,13 @@ map<string, string> executorEnvironment( stringify(EXECUTOR_REREGISTER_TIMEOUT); } + if (authenticationToken.isSome()) { + CHECK(authenticationToken->has_value()); + + environment["MESOS_EXECUTOR_AUTHENTICATION_TOKEN"] = + authenticationToken->value().data(); + } + if (HookManager::hooksAvailable()) { // Include any environment variables from Hooks. // TODO(karya): Call environment decorator hook _after_ putting all http://git-wip-us.apache.org/repos/asf/mesos/blob/8b6ddb5f/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index e06525b..59efa4e 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -357,6 +357,18 @@ public: const std::list<TaskInfo>& tasks, const std::list<TaskGroupInfo>& taskGroups); + process::Future<Secret> generateSecret( + const FrameworkID& frameworkId, + const ExecutorID& executorId, + const ContainerID& containerId); + + // If an executor is launched for a task group, `taskInfo` would not be set. 
+ void launchExecutor( + const Option<process::Future<Secret>>& future, + const FrameworkID& frameworkId, + const ExecutorID& executorId, + const Option<TaskInfo>& taskInfo); + void fileAttached(const process::Future<Nothing>& result, const std::string& path); @@ -1103,11 +1115,7 @@ struct Framework ~Framework(); - // If an executor is launched for a task group, `taskInfo` would - // not be set. - Executor* launchExecutor( - const ExecutorInfo& executorInfo, - const Option<TaskInfo>& taskInfo); + Executor* addExecutor(const ExecutorInfo& executorInfo); void destroyExecutor(const ExecutorID& executorId); Executor* getExecutor(const ExecutorID& executorId) const; Executor* getExecutor(const TaskID& taskId) const; @@ -1206,6 +1214,7 @@ std::map<std::string, std::string> executorEnvironment( const std::string& directory, const SlaveID& slaveId, const process::PID<Slave>& slavePid, + const Option<Secret>& authenticationToken, bool checkpoint);
