Updated the agent to generate executor secrets.

This patch updates the agent code to generate executor
authentication tokens when executor authentication is
enabled. For now, the generated `Secret` objects must
be of `VALUE` type, and they're passed directly into the
executor environment.

Review: https://reviews.apache.org/r/57743/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8b6ddb5f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8b6ddb5f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8b6ddb5f

Branch: refs/heads/master
Commit: 8b6ddb5fcae38dcfad27cb5dae26b4054773134f
Parents: 2fc8033
Author: Greg Mann <[email protected]>
Authored: Sat Mar 25 12:04:49 2017 -0700
Committer: Anand Mazumdar <[email protected]>
Committed: Sat Mar 25 12:04:49 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 339 +++++++++++++++++++++++++++++++++--------------
 src/slave/slave.hpp |  19 ++-
 2 files changed, 255 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8b6ddb5f/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index d68d6b9..5729849 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -73,6 +73,7 @@
 #include "common/protobuf_utils.hpp"
 #include "common/resources_utils.hpp"
 #include "common/status_utils.hpp"
+#include "common/validation.hpp"
 
 #include "credentials/credentials.hpp"
 
@@ -2182,9 +2183,24 @@ void Slave::__run(
   Executor* executor = framework->getExecutor(executorId);
 
   if (executor == nullptr) {
-    executor = framework->launchExecutor(
-        executorInfo,
-        taskGroup.isNone() ? task.get() : Option<TaskInfo>::none());
+    executor = framework->addExecutor(executorInfo);
+
+    if (secretGenerator.get()) {
+      generateSecret(framework->id(), executor->id, executor->containerId)
+        .onAny(defer(
+            self(),
+            &Self::launchExecutor,
+            lambda::_1,
+            frameworkId,
+            executorId,
+            taskGroup.isNone() ? task.get() : Option<TaskInfo>::none()));
+    } else {
+      launchExecutor(
+          None(),
+          frameworkId,
+          executorId,
+          taskGroup.isNone() ? task.get() : Option<TaskInfo>::none());
+    }
   }
 
   CHECK_NOTNULL(executor);
@@ -2302,7 +2318,7 @@ void Slave::__run(
   }
 
   // We don't perform the checks for 'removeFramework' here since
-  // we're guaranteed by 'launchExecutor' that 'framework->executors'
+  // we're guaranteed by 'addExecutor' that 'framework->executors'
   // will be non-empty.
   CHECK(!framework->executors.empty());
 }
@@ -2512,6 +2528,213 @@ void Slave::___run(
 }
 
 
+// Generates a secret for executor authentication.
+Future<Secret> Slave::generateSecret(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const ContainerID& containerId)
+{
+  Principal principal(
+      Option<string>::none(),
+      {
+        {"fid", frameworkId.value()},
+        {"eid", executorId.value()},
+        {"cid", containerId.value()}
+      });
+
+  return secretGenerator->generate(principal)
+    .then([](const Secret& secret) -> Future<Secret> {
+      Option<Error> error = common::validation::validateSecret(secret);
+
+      if (error.isSome()) {
+        return Failure(
+            "Failed to validate generated secret: " + error->message);
+      } else if (secret.type() != Secret::VALUE) {
+        return Failure(
+            "Expecting generated secret to be of VALUE type instead of " +
+            stringify(secret.type()) + " type; " +
+            "only VALUE type secrets are supported at this time");
+      }
+
+      return secret;
+    });
+}
+
+
+// Launches an executor which was previously created.
+void Slave::launchExecutor(
+    const Option<Future<Secret>>& future,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const Option<TaskInfo>& taskInfo)
+{
+  Framework* framework = getFramework(frameworkId);
+  if (framework == nullptr) {
+    LOG(WARNING) << "Ignoring launching executor '" << executorId
+                 << "' because the framework " << frameworkId
+                 << " does not exist";
+    return;
+  }
+
+  if (framework->state == Framework::TERMINATING) {
+    LOG(WARNING) << "Ignoring launching executor '" << executorId
+                 << "' of framework " << frameworkId
+                 << " because the framework is terminating";
+    return;
+  }
+
+  Executor* executor = framework->getExecutor(executorId);
+  if (executor == nullptr) {
+    LOG(WARNING) << "Ignoring launching executor '" << executorId
+                 << "' of framework " << frameworkId
+                 << " because the executor does not exist";
+    return;
+  }
+
+  if (executor->state == Executor::TERMINATING ||
+      executor->state == Executor::TERMINATED) {
+    string executorState;
+    if (executor->state == Executor::TERMINATING) {
+      executorState = "terminating";
+    } else {
+      executorState = "terminated";
+    }
+
+    LOG(WARNING) << "Ignoring launching executor " << *executor
+                 << " in container " << executor->containerId
+                 << " because the executor is " << executorState;
+
+    // The framework may have shutdown this executor already, transitioning it
+    // to the TERMINATING/TERMINATED state. However, the executor still exists
+    // in the agent's map, so we must send status updates for any queued tasks
+    // and perform cleanup via `executorTerminated`.
+    ContainerTermination termination;
+    termination.set_state(TASK_FAILED);
+    termination.add_reasons(TaskStatus::REASON_CONTAINER_LAUNCH_FAILED);
+    termination.set_message("Executor " + executorState);
+
+    executorTerminated(frameworkId, executorId, termination);
+
+    return;
+  }
+
+  CHECK_EQ(Executor::REGISTERING, executor->state);
+
+  Option<Secret> authenticationToken;
+
+  if (future.isSome()) {
+    if (!future->isReady()) {
+      LOG(ERROR) << "Failed to launch executor " << *executor
+                 << " in container " << executor->containerId
+                 << " because secret generation failed: "
+                 << (future->isFailed() ? future->failure() : "discarded");
+
+      ContainerTermination termination;
+      termination.set_state(TASK_FAILED);
+      termination.add_reasons(TaskStatus::REASON_CONTAINER_LAUNCH_FAILED);
+      termination.set_message(
+          "Secret generation failed: " +
+          (future->isFailed() ? future->failure() : "discarded"));
+
+      executorTerminated(frameworkId, executorId, termination);
+
+      return;
+    }
+
+    authenticationToken = future->get();
+  }
+
+  // Tell the containerizer to launch the executor.
+  ExecutorInfo executorInfo_ = executor->info;
+
+  // Populate the command info for default executor. We modify the ExecutorInfo
+  // to avoid resetting command info upon re-registering with the master since
+  // the master doesn't store them; they are generated by the slave.
+  if (executorInfo_.has_type() &&
+      executorInfo_.type() == ExecutorInfo::DEFAULT) {
+    CHECK(!executorInfo_.has_command());
+
+    executorInfo_.mutable_command()->CopyFrom(
+        defaultExecutorCommandInfo(flags.launcher_dir, executor->user));
+  }
+
+  Resources resources = executorInfo_.resources();
+
+  // NOTE: We modify the ExecutorInfo to include the task's
+  // resources when launching the executor so that the containerizer
+  // has non-zero resources to work with when the executor has
+  // no resources. This should be revisited after MESOS-600.
+  if (taskInfo.isSome()) {
+    resources += taskInfo->resources();
+  }
+
+  executorInfo_.mutable_resources()->CopyFrom(resources);
+
+  // Prepare environment variables for the executor.
+  map<string, string> environment = executorEnvironment(
+      flags,
+      executorInfo_,
+      executor->directory,
+      info.id(),
+      self(),
+      authenticationToken,
+      framework->info.checkpoint());
+
+  // Launch the container.
+  Future<bool> launch;
+  if (!executor->isCommandExecutor()) {
+    // If the executor is _not_ a command executor, this means that
+    // the task will include the executor to run. The actual task to
+    // run will be enqueued and subsequently handled by the executor
+    // when it has registered to the slave.
+    launch = containerizer->launch(
+        executor->containerId,
+        None(),
+        executorInfo_,
+        executor->directory,
+        executor->user,
+        info.id(),
+        environment,
+        framework->info.checkpoint());
+  } else {
+    // An executor has _not_ been provided by the task and will
+    // instead define a command and/or container to run. Right now,
+    // these tasks will require an executor anyway and the slave
+    // creates a command executor. However, it is up to the
+    // containerizer how to execute those tasks and the generated
+    // executor info works as a placeholder.
+    // TODO(nnielsen): Obsolete the requirement for executors to run
+    // one-off tasks.
+    launch = containerizer->launch(
+        executor->containerId,
+        taskInfo,
+        executorInfo_,
+        executor->directory,
+        executor->user,
+        info.id(),
+        environment,
+        framework->info.checkpoint());
+  }
+
+  launch.onAny(defer(self(),
+                     &Self::executorLaunched,
+                     frameworkId,
+                     executor->id,
+                     executor->containerId,
+                     lambda::_1));
+
+  // Make sure the executor registers within the given timeout.
+  delay(flags.executor_registration_timeout,
+        self(),
+        &Self::registerExecutorTimeout,
+        frameworkId,
+        executor->id,
+        executor->containerId);
+
+  return;
+}
+
+
 void Slave::runTaskGroup(
     const UPID& from,
     const FrameworkInfo& frameworkInfo,
@@ -4855,7 +5078,8 @@ void Slave::executorLaunched(
 }
 
 
-// Called by the isolator when an executor process terminates.
+// Called by the isolator when an executor process terminates, and by
+// `Slave::launchExecutor` when executor secret generation fails.
 void Slave::executorTerminated(
     const FrameworkID& frameworkId,
     const ExecutorID& executorId,
@@ -6592,10 +6816,7 @@ Framework::~Framework()
 }
 
 
-// Create and launch an executor.
-Executor* Framework::launchExecutor(
-    const ExecutorInfo& executorInfo,
-    const Option<TaskInfo>& taskInfo)
+Executor* Framework::addExecutor(const ExecutorInfo& executorInfo)
 {
   // Verify that Resource.AllocationInfo is set, if coming
   // from a MULTI_ROLE master this will be set, otherwise
@@ -6605,9 +6826,9 @@ Executor* Framework::launchExecutor(
   }
 
   // Generate an ID for the executor's container.
-  // TODO(idownes) This should be done by the containerizer but we
-  // need the ContainerID to create the executor's directory. Fix
-  // this when 'launchExecutor()' is handled asynchronously.
+  // TODO(idownes) This should be done by the containerizer but we need the
+  // ContainerID to create the executor's directory and generate the secret.
+  // Consider fixing this since 'launchExecutor()' is handled asynchronously.
   ContainerID containerId;
   containerId.set_value(UUID::random().toString());
 
@@ -6680,92 +6901,6 @@ Executor* Framework::launchExecutor(
   slave->files->attach(executor->directory, executor->directory, authorize)
     .onAny(defer(slave, &Slave::fileAttached, lambda::_1, executor->directory));
 
-  // Tell the containerizer to launch the executor.
-  ExecutorInfo executorInfo_ = executor->info;
-
-  // Populate the command info for default executor. We modify the ExecutorInfo
-  // to avoid resetting command info upon re-registering with the master since
-  // the master doesn't store them; they are generated by the slave.
-  if (executorInfo_.has_type() &&
-      executorInfo_.type() == ExecutorInfo::DEFAULT) {
-    CHECK(!executorInfo_.has_command());
-
-    executorInfo_.mutable_command()->CopyFrom(
-        defaultExecutorCommandInfo(slave->flags.launcher_dir, user));
-  }
-
-  Resources resources = executorInfo_.resources();
-
-  // NOTE: We modify the ExecutorInfo to include the task's
-  // resources when launching the executor so that the containerizer
-  // has non-zero resources to work with when the executor has
-  // no resources. This should be revisited after MESOS-600.
-  if (taskInfo.isSome()) {
-    resources += taskInfo->resources();
-  }
-
-  executorInfo_.mutable_resources()->CopyFrom(resources);
-
-  // Prepare environment variables for the executor.
-  map<string, string> environment = executorEnvironment(
-      slave->flags,
-      executorInfo_,
-      executor->directory,
-      slave->info.id(),
-      slave->self(),
-      info.checkpoint());
-
-  // Launch the container.
-  Future<bool> launch;
-  if (!executor->isCommandExecutor()) {
-    // If the executor is _not_ a command executor, this means that
-    // the task will include the executor to run. The actual task to
-    // run will be enqueued and subsequently handled by the executor
-    // when it has registered to the slave.
-    launch = slave->containerizer->launch(
-        containerId,
-        None(),
-        executorInfo_,
-        executor->directory,
-        user,
-        slave->info.id(),
-        environment,
-        info.checkpoint());
-  } else {
-    // An executor has _not_ been provided by the task and will
-    // instead define a command and/or container to run. Right now,
-    // these tasks will require an executor anyway and the slave
-    // creates a command executor. However, it is up to the
-    // containerizer how to execute those tasks and the generated
-    // executor info works as a placeholder.
-    // TODO(nnielsen): Obsolete the requirement for executors to run
-    // one-off tasks.
-    launch = slave->containerizer->launch(
-        containerId,
-        taskInfo,
-        executorInfo_,
-        executor->directory,
-        user,
-        slave->info.id(),
-        environment,
-        info.checkpoint());
-  }
-
-  launch.onAny(defer(slave,
-                     &Slave::executorLaunched,
-                     id(),
-                     executor->id,
-                     containerId,
-                     lambda::_1));
-
-  // Make sure the executor registers within the given timeout.
-  delay(slave->flags.executor_registration_timeout,
-        slave,
-        &Slave::registerExecutorTimeout,
-        id(),
-        executor->id,
-        containerId);
-
   return executor;
 }
 
@@ -7342,6 +7477,7 @@ map<string, string> executorEnvironment(
     const string& directory,
     const SlaveID& slaveId,
     const PID<Slave>& slavePid,
+    const Option<Secret>& authenticationToken,
     bool checkpoint)
 {
   map<string, string> environment;
@@ -7434,6 +7570,13 @@ map<string, string> executorEnvironment(
       stringify(EXECUTOR_REREGISTER_TIMEOUT);
   }
 
+  if (authenticationToken.isSome()) {
+    CHECK(authenticationToken->has_value());
+
+    environment["MESOS_EXECUTOR_AUTHENTICATION_TOKEN"] =
+      authenticationToken->value().data();
+  }
+
   if (HookManager::hooksAvailable()) {
     // Include any environment variables from Hooks.
     // TODO(karya): Call environment decorator hook _after_ putting all

http://git-wip-us.apache.org/repos/asf/mesos/blob/8b6ddb5f/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index e06525b..59efa4e 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -357,6 +357,18 @@ public:
       const std::list<TaskInfo>& tasks,
       const std::list<TaskGroupInfo>& taskGroups);
 
+  process::Future<Secret> generateSecret(
+      const FrameworkID& frameworkId,
+      const ExecutorID& executorId,
+      const ContainerID& containerId);
+
+  // If an executor is launched for a task group, `taskInfo` would not be set.
+  void launchExecutor(
+      const Option<process::Future<Secret>>& future,
+      const FrameworkID& frameworkId,
+      const ExecutorID& executorId,
+      const Option<TaskInfo>& taskInfo);
+
   void fileAttached(const process::Future<Nothing>& result,
                     const std::string& path);
 
@@ -1103,11 +1115,7 @@ struct Framework
 
   ~Framework();
 
-  // If an executor is launched for a task group, `taskInfo` would
-  // not be set.
-  Executor* launchExecutor(
-      const ExecutorInfo& executorInfo,
-      const Option<TaskInfo>& taskInfo);
+  Executor* addExecutor(const ExecutorInfo& executorInfo);
   void destroyExecutor(const ExecutorID& executorId);
   Executor* getExecutor(const ExecutorID& executorId) const;
   Executor* getExecutor(const TaskID& taskId) const;
@@ -1206,6 +1214,7 @@ std::map<std::string, std::string> executorEnvironment(
     const std::string& directory,
     const SlaveID& slaveId,
     const process::PID<Slave>& slavePid,
+    const Option<Secret>& authenticationToken,
     bool checkpoint);
 
 

Reply via email to