Repository: mesos
Updated Branches:
  refs/heads/master ab6f0d2c8 -> 32b85a2b0


Fixed detaching task volume directories of destroyed frameworks.

Review: https://reviews.apache.org/r/65231/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/32b85a2b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/32b85a2b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/32b85a2b

Branch: refs/heads/master
Commit: 32b85a2b06f676b68a16deaa8359ae64a1e8ead9
Parents: ab6f0d2
Author: Chun-Hung Hsiao <chhs...@mesosphere.io>
Authored: Fri Jan 19 11:08:49 2018 +0800
Committer: Qian Zhang <zhq527...@gmail.com>
Committed: Fri Jan 19 11:08:49 2018 +0800

----------------------------------------------------------------------
 src/slave/slave.cpp | 258 +++++++++++++++++++++++++++--------------------
 src/slave/slave.hpp |  52 +++++-----
 2 files changed, 176 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/32b85a2b/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 1672c06..43c7955 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1021,6 +1021,97 @@ Nothing Slave::detachFile(const string& path)
 }
 
 
+void Slave::attachTaskVolumeDirectory(
+    const ExecutorInfo& executorInfo,
+    const ContainerID& executorContainerId,
+    const Task& task)
+{
+  CHECK(executorInfo.has_type() &&
+        executorInfo.type() == ExecutorInfo::DEFAULT);
+  // The task must belong to the given default executor.
+  CHECK_EQ(task.executor_id(), executorInfo.executor_id());
+  // Expose each task volume by attaching the executor's volume directory.
+  foreach (const Resource& resource, task.resources()) {
+    // Skip resources that are not disk resources, or disk resources
+    // that do not specify a volume mapping.
+    if (!resource.has_disk() || !resource.disk().has_volume()) {
+      continue;
+    }
+
+    const Volume& volume = resource.disk().volume();
+    // The volume's path inside the executor's sandbox (attach source).
+    const string executorDirectory = paths::getExecutorRunPath(
+        flags.work_dir,
+        info.id(),
+        task.framework_id(),
+        task.executor_id(),
+        executorContainerId);
+
+    const string executorVolumePath =
+      path::join(executorDirectory, volume.container_path());
+    // The volume's path inside the task's sandbox (the virtual path).
+    const string taskPath = paths::getTaskPath(
+        flags.work_dir,
+        info.id(),
+        task.framework_id(),
+        task.executor_id(),
+        executorContainerId,
+        task.task_id());
+
+    const string taskVolumePath =
+      path::join(taskPath, volume.container_path());
+    // Browsing `taskVolumePath` shows the executor's volume directory.
+    files->attach(executorVolumePath, taskVolumePath)
+      .onAny(defer(
+          self(),
+          &Self::fileAttached,
+          lambda::_1,
+          executorVolumePath,
+          taskVolumePath));
+  }
+}
+
+
+void Slave::detachTaskVolumeDirectories(
+    const ExecutorInfo& executorInfo,
+    const ContainerID& executorContainerId,
+    const vector<Task>& tasks)
+{
+  // NOTE: This is also called for non-default executors, but then
+  // always with an empty list of tasks (so it does nothing).
+  CHECK(tasks.empty() ||
+        (executorInfo.has_type() &&
+         executorInfo.type() == ExecutorInfo::DEFAULT));
+  // Undo `attachTaskVolumeDirectory()` for every volume of every task.
+  foreach (const Task& task, tasks) {
+    CHECK_EQ(task.executor_id(), executorInfo.executor_id());
+
+    foreach (const Resource& resource, task.resources()) {
+      // Skip resources that are not disk resources, or disk resources
+      // that do not specify a volume mapping.
+      if (!resource.has_disk() || !resource.disk().has_volume()) {
+        continue;
+      }
+
+      const Volume& volume = resource.disk().volume();
+
+      const string taskPath = paths::getTaskPath(
+          flags.work_dir,
+          info.id(),
+          task.framework_id(),
+          task.executor_id(),
+          executorContainerId,
+          task.task_id());
+      // Same virtual path that `attachTaskVolumeDirectory()` attached.
+      const string taskVolumePath =
+        path::join(taskPath, volume.container_path());
+
+      files->detach(taskVolumePath);
+    }
+  }
+}
+
+
 void Slave::detected(const Future<Option<MasterInfo>>& _master)
 {
   CHECK(state == DISCONNECTED ||
@@ -5870,26 +5961,34 @@ void Slave::removeExecutor(Framework* framework, Executor* executor)
       executor->id,
       executor->containerId);
 
-  os::utime(path); // Update the modification time.
-  garbageCollect(path)
-    .onAny(defer(self(), [=](const Future<Nothing>& future) {
-      detachFile(path);
+  // NOTE: We keep a copy of the default executor's tasks here for
+  // detaching task volume directories, since the executor may have
+  // already been destroyed when the GC completes (MESOS-8460).
+  vector<Task> defaultExecutorTasks;
+  if (executor->info.has_type() &&
+      executor->info.type() == ExecutorInfo::DEFAULT) {
+    foreachvalue (const Task* task, executor->launchedTasks) {
+      defaultExecutorTasks.push_back(*task);
+    }
 
-      if (executor->info.has_type() &&
-          executor->info.type() == ExecutorInfo::DEFAULT) {
-        foreachvalue (const Task* task, executor->launchedTasks) {
-          executor->detachTaskVolumeDirectory(*task);
-        }
+    foreachvalue (const Task* task, executor->terminatedTasks) {
+      defaultExecutorTasks.push_back(*task);
+    }
 
-        foreachvalue (const Task* task, executor->terminatedTasks) {
-          executor->detachTaskVolumeDirectory(*task);
-        }
+    foreach (const shared_ptr<Task>& task, executor->completedTasks) {
+      defaultExecutorTasks.push_back(*task);
+    }
+  }
 
-        foreach (const shared_ptr<Task>& task, executor->completedTasks) {
-          executor->detachTaskVolumeDirectory(*task);
-        }
-      }
-    }));
+  os::utime(path); // Update the modification time.
+  garbageCollect(path)
+    .onAny(defer(self(), &Self::detachFile, path))
+    .onAny(defer(
+        self(),
+        &Self::detachTaskVolumeDirectories,
+        executor->info,
+        executor->containerId,
+        defaultExecutorTasks));
 
   // Schedule the top level executor work directory, only if the
   // framework doesn't have any 'pending' tasks for this executor.
@@ -5913,10 +6012,8 @@ void Slave::removeExecutor(Framework* framework, Executor* executor)
 
     os::utime(path); // Update the modification time.
     garbageCollect(path)
-      .onAny(defer(self(), [=](const Future<Nothing>& future) {
-        detachFile(latestPath);
-        detachFile(virtualLatestPath);
-      }));
+      .onAny(defer(self(), &Self::detachFile, latestPath))
+      .onAny(defer(self(), &Self::detachFile, virtualLatestPath));
   }
 
   if (executor->checkpoint) {
@@ -8629,25 +8726,33 @@ void Framework::recoverExecutor(
     const string path = paths::getExecutorRunPath(
         slave->flags.work_dir, slave->info.id(), id(), state.id, runId);
 
-    slave->garbageCollect(path)
-      .onAny(defer(slave->self(), [=](const Future<Nothing>& future) {
-        slave->detachFile(path);
+    // NOTE: We keep a copy of the default executor's tasks here for
+    // detaching task volume directories, since the executor may have
+    // already been destroyed when the GC completes (MESOS-8460).
+    vector<Task> defaultExecutorTasks;
+    if (executor->info.has_type() &&
+        executor->info.type() == ExecutorInfo::DEFAULT) {
+      foreachvalue (const Task* task, executor->launchedTasks) {
+        defaultExecutorTasks.push_back(*task);
+      }
 
-        if (executor->info.has_type() &&
-            executor->info.type() == ExecutorInfo::DEFAULT) {
-          foreachvalue (const Task* task, executor->launchedTasks) {
-            executor->detachTaskVolumeDirectory(*task);
-          }
+      foreachvalue (const Task* task, executor->terminatedTasks) {
+        defaultExecutorTasks.push_back(*task);
+      }
 
-          foreachvalue (const Task* task, executor->terminatedTasks) {
-            executor->detachTaskVolumeDirectory(*task);
-          }
+      foreach (const shared_ptr<Task>& task, executor->completedTasks) {
+        defaultExecutorTasks.push_back(*task);
+      }
+    }
 
-          foreach (const shared_ptr<Task>& task, executor->completedTasks) {
-            executor->detachTaskVolumeDirectory(*task);
-          }
-        }
-      }));
+    slave->garbageCollect(path)
+      .onAny(defer(slave, &Slave::detachFile, path))
+      .onAny(defer(
+          slave,
+          &Slave::detachTaskVolumeDirectories,
+          executor->info,
+          executor->containerId,
+          defaultExecutorTasks));
 
     // GC the executor run's meta directory.
     slave->garbageCollect(paths::getExecutorRunPath(
@@ -8656,10 +8761,8 @@ void Framework::recoverExecutor(
     // GC the top level executor work directory.
     slave->garbageCollect(paths::getExecutorPath(
         slave->flags.work_dir, slave->info.id(), id(), state.id))
-        .onAny(defer(slave->self(), [=](const Future<Nothing>& future) {
-          slave->detachFile(latestPath);
-          slave->detachFile(virtualLatestPath);
-        }));
+        .onAny(defer(slave, &Slave::detachFile, latestPath))
+        .onAny(defer(slave, &Slave::detachFile, virtualLatestPath));
 
     // GC the top level executor meta directory.
     slave->garbageCollect(paths::getExecutorPath(
@@ -8954,7 +9057,7 @@ Task* Executor::addLaunchedTask(const TaskInfo& task)
   launchedTasks[task.task_id()] = t;
 
   if (info.has_type() && info.type() == ExecutorInfo::DEFAULT) {
-    attachTaskVolumeDirectory(*t);
+    slave->attachTaskVolumeDirectory(info, containerId, *t);
   }
 
   return t;
@@ -8976,7 +9079,7 @@ void Executor::completeTask(const TaskID& taskId)
       info.type() == ExecutorInfo::DEFAULT &&
       completedTasks.full()) {
     const shared_ptr<Task>& firstTask = completedTasks.front();
-    detachTaskVolumeDirectory(*firstTask);
+    slave->detachTaskVolumeDirectories(info, containerId, {*firstTask});
   }
 
   Task* task = terminatedTasks[taskId];
@@ -9050,7 +9153,7 @@ void Executor::recoverTask(const TaskState& state, bool recheckpointTask)
   launchedTasks[state.id] = task;
 
   if (info.has_type() && info.type() == ExecutorInfo::DEFAULT) {
-    attachTaskVolumeDirectory(*task);
+    slave->attachTaskVolumeDirectory(info, containerId, *task);
   }
 
   // Read updates to get the latest state of the task.
@@ -9158,73 +9261,6 @@ bool Executor::incompleteTasks()
 }
 
 
-void Executor::attachTaskVolumeDirectory(const Task& task)
-{
-  CHECK(info.has_type() && info.type() == ExecutorInfo::DEFAULT);
-
-  foreach (const Resource& resource, task.resources()) {
-    // Ignore if there are no disk resources or if the
-    // disk resources did not specify a volume mapping.
-    if (!resource.has_disk() || !resource.disk().has_volume()) {
-      continue;
-    }
-
-    const Volume& volume = resource.disk().volume();
-
-    const string executorVolumePath =
-      path::join(directory, volume.container_path());
-
-    const string taskPath = paths::getTaskPath(
-        slave->flags.work_dir,
-        slave->info.id(),
-        frameworkId,
-        id,
-        containerId,
-        task.task_id());
-
-    const string taskVolumePath =
-      path::join(taskPath, volume.container_path());
-
-    slave->files->attach(executorVolumePath, taskVolumePath)
-      .onAny(defer(
-          slave,
-          &Slave::fileAttached,
-          lambda::_1,
-          executorVolumePath,
-          taskVolumePath));
-  }
-}
-
-
-void Executor::detachTaskVolumeDirectory(const Task& task)
-{
-  CHECK(info.has_type() && info.type() == ExecutorInfo::DEFAULT);
-
-  foreach (const Resource& resource, task.resources()) {
-    // Ignore if there are no disk resources or if the
-    // disk resources did not specify a volume mapping.
-    if (!resource.has_disk() || !resource.disk().has_volume()) {
-      continue;
-    }
-
-    const Volume& volume = resource.disk().volume();
-
-    const string taskPath = paths::getTaskPath(
-        slave->flags.work_dir,
-        slave->info.id(),
-        frameworkId,
-        id,
-        containerId,
-        task.task_id());
-
-    const string taskVolumePath =
-      path::join(taskPath, volume.container_path());
-
-    slave->files->detach(taskVolumePath);
-  }
-}
-
-
 bool Executor::isGeneratedForCommandTask() const
 {
   return isGeneratedForCommandTask_;

http://git-wip-us.apache.org/repos/asf/mesos/blob/32b85a2b/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index a07f046..09c01eb 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -409,6 +409,35 @@ public:
 
   Nothing detachFile(const std::string& path);
 
+  // TODO(qianzhang): This is a workaround to make the default executor
+  // task's volume directory visible in Mesos UI. In MESOS-7225, we made
+  // sure a task can access any volumes specified in its disk resources
+  // from its sandbox by introducing a workaround to the default executor,
+  // i.e., adding a `SANDBOX_PATH` volume with type `PARENT` to the
+  // corresponding nested container. This volume gets translated into a
+  // bind mount in the nested container's mount namespace, which is not
+  // visible in Mesos UI because it operates in the host namespace. See
+  // MESOS-8279 for details.
+  //
+  // To make the task's volume directory visible in Mesos UI, here we
+  // attach the executor's volume directory to it, so when users browse
+  // task's volume directory in Mesos UI, what they actually browse is the
+  // executor's volume directory. Note when calling `Files::attach()`, the
+  // third argument `authorized` is not specified because it is already
+  // specified when we do the attach for the executor's sandbox and it also
+  // applies to the executor's tasks.
+  void attachTaskVolumeDirectory(
+      const ExecutorInfo& executorInfo,
+      const ContainerID& executorContainerId,
+      const Task& task);
+
+  // TODO(qianzhang): Remove the given tasks' volume directories from the
+  // /files endpoint. This is a workaround for MESOS-8279.
+  void detachTaskVolumeDirectories(
+      const ExecutorInfo& executorInfo,
+      const ContainerID& executorContainerId,
+      const std::vector<Task>& tasks);
+
   // Triggers a re-detection of the master when the slave does
   // not receive a ping.
   void pingTimeout(process::Future<Option<MasterInfo>> future);
@@ -833,29 +862,6 @@ public:
   // Returns true if there are any queued/launched/terminated tasks.
   bool incompleteTasks();
 
-  // TODO(qianzhang): This is a workaround to make the default executor
-  // task's volume directory visible in MESOS UI. In MESOS-7225, we made
-  // sure a task can access any volumes specified in its disk resources
-  // from its sandbox by introducing a workaround to the default executor,
-  // i.e., adding a `SANDBOX_PATH` volume with type `PARENT` to the
-  // corresponding nested container. This volume gets translated into a
-  // bind mount in the nested container's mount namespace, which is is not
-  // visible in Mesos UI because it operates in the host namespace. See
-  // Mesos-8279 for details.
-  //
-  // To make the task's volume directory visible in Mesos UI, here we
-  // attach the executor's volume directory to it, so when users browse
-  // task's volume directory in Mesos UI, what they actually browse is the
-  // executor's volume directory. Note when calling `Files::attach()`, the
-  // third argument `authorized` is not specified because it is already
-  // specified when we do the attach for the executor's sandbox and it also
-  // applies to the executor's tasks.
-  void attachTaskVolumeDirectory(const Task& task);
-
-  // TODO(qianzhang): Remove the task's volume directory from the /files
-  // endpoint. This is a workaround for MESOS-8279.
-  void detachTaskVolumeDirectory(const Task& task);
-
   // Sends a message to the connected executor.
   template <typename Message>
   void send(const Message& message)

Reply via email to