Repository: mesos
Updated Branches:
  refs/heads/master 1d12aeb39 -> d9dd07960


Allowed slave to checkpoint resources.

Review: https://reviews.apache.org/r/26990


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b0efdfe7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b0efdfe7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b0efdfe7

Branch: refs/heads/master
Commit: b0efdfe74d21d2d37a598a5b06654898288abf21
Parents: 1d12aeb
Author: Jie Yu <[email protected]>
Authored: Mon Oct 20 17:34:22 2014 -0700
Committer: Jie Yu <[email protected]>
Committed: Fri Nov 21 16:46:03 2014 -0800

----------------------------------------------------------------------
 src/slave/paths.hpp                |  12 +++
 src/slave/slave.cpp                |  41 +++++-----
 src/slave/slave.hpp                |   2 +-
 src/slave/state.cpp                | 135 +++++++++++++++++++++++++++-----
 src/slave/state.hpp                |  40 ++++++++--
 src/tests/slave_recovery_tests.cpp |   5 +-
 6 files changed, 186 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b0efdfe7/src/slave/paths.hpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.hpp b/src/slave/paths.hpp
index 41bb73d..ee8687d 100644
--- a/src/slave/paths.hpp
+++ b/src/slave/paths.hpp
@@ -54,6 +54,7 @@ const std::string EXECUTOR_SENTINEL_FILE = "executor.sentinel";
 const std::string FORKED_PID_FILE = "forked.pid";
 const std::string TASK_INFO_FILE = "task.info";
 const std::string TASK_UPDATES_FILE = "task.updates";
+const std::string RESOURCES_INFO_FILE = "resources.info";
 
 // Path layout templates.
 const std::string ROOT_PATH = "%s";
@@ -93,6 +94,8 @@ const std::string TASK_INFO_PATH =
   path::join(TASK_PATH, TASK_INFO_FILE);
 const std::string TASK_UPDATES_PATH =
   path::join(TASK_PATH, TASK_UPDATES_FILE);
+const std::string RESOURCES_INFO_PATH =
+  path::join(ROOT_PATH, "resources", RESOURCES_INFO_FILE);
 
 
 inline std::string getMetaRootDir(const std::string rootDir)
@@ -327,6 +330,17 @@ inline std::string getTaskUpdatesPath(
 }
 
 
+// Returns the path of the checkpointed resources file, namely
+// '<rootDir>/resources/resources.info'.
+inline std::string getResourcesInfoPath(
+    const std::string& rootDir)
+{
+  return strings::format(
+      RESOURCES_INFO_PATH,
+      rootDir).get();
+}
+
+
 inline std::string createExecutorDirectory(
     const std::string& rootDir,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/b0efdfe7/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 373c8b4..ed63ded 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3348,53 +3348,67 @@ void Slave::_checkDiskUsage(const Future<double>& usage)
 
 
 
-Future<Nothing> Slave::recover(const Result<SlaveState>& _state)
+Future<Nothing> Slave::recover(const Result<state::State>& state)
 {
-  if (_state.isError()) {
-    return Failure(_state.error());
+  if (state.isError()) {
+    return Failure(state.error());
   }
 
-  // Convert Result<SlaveState> to Option<SlaveState> for convenience.
-  Option<SlaveState> state;
-  if (_state.isSome()) {
-    state = _state.get();
+  // Extract the recovered slave state (if any) for convenience.
+  Option<SlaveState> slaveState;
+  if (state.isSome()) {
+    slaveState = state.get().slave;
   }
 
+  // The checkpointed resources are recovered even when the slave
+  // state is not (e.g., after a host reboot), so report any errors
+  // encountered while recovering them here.
+  if (state.isSome() &&
+      state.get().resources.isSome() &&
+      state.get().resources.get().errors > 0) {
+    LOG(WARNING) << "Errors encountered during resources recovery: "
+                 << state.get().resources.get().errors;
+
+    metrics.recovery_errors += state.get().resources.get().errors;
+  }
+
-  if (state.isSome() && state.get().info.isSome()) {
+  if (slaveState.isSome() && slaveState.get().info.isSome()) {
     // Check for SlaveInfo compatibility.
     // TODO(vinod): Also check for version compatibility.
-    // NOTE: We set the 'id' field in 'info' from the recovered state,
+    // NOTE: We set the 'id' field in 'info' from the recovered slave,
     // as a hack to compare the info created from options/flags with
     // the recovered info.
-    info.mutable_id()->CopyFrom(state.get().id);
-    if (flags.recover == "reconnect" && !(info == state.get().info.get())) {
+    info.mutable_id()->CopyFrom(slaveState.get().id);
+    if (flags.recover == "reconnect" &&
+        !(info == slaveState.get().info.get())) {
       return Failure(strings::join(
           "\n",
           "Incompatible slave info detected.",
           "------------------------------------------------------------",
-          "Old slave info:\n" + stringify(state.get().info.get()),
+          "Old slave info:\n" + stringify(slaveState.get().info.get()),
           "------------------------------------------------------------",
           "New slave info:\n" + stringify(info),
           "------------------------------------------------------------"));
     }
 
-    info = state.get().info.get(); // Recover the slave info.
+    info = slaveState.get().info.get(); // Recover the slave info.
 
-    recoveryErrors = state.get().errors;
-    metrics.recovery_errors += state.get().errors;
-    if (recoveryErrors > 0) {
-      LOG(WARNING) << "Errors encountered during recovery: " << recoveryErrors;
+    if (slaveState.get().errors > 0) {
+      LOG(WARNING) << "Errors encountered during slave recovery: "
+                   << slaveState.get().errors;
+
+      metrics.recovery_errors += slaveState.get().errors;
     }
 
     // Recover the frameworks.
     foreachvalue (const FrameworkState& frameworkState,
-                  state.get().frameworks) {
+                  slaveState.get().frameworks) {
       recoverFramework(frameworkState);
     }
   }
 
-  return statusUpdateManager->recover(metaDir, state)
-    .then(defer(self(), &Slave::_recoverContainerizer, state));
+  return statusUpdateManager->recover(metaDir, slaveState)
+    .then(defer(self(), &Slave::_recoverContainerizer, slaveState));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b0efdfe7/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index c6d11ef..70bd8c1 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -301,7 +301,9 @@ public:
   void checkDiskUsage();
 
-  // Recovers the slave, status update manager and isolator.
-  process::Future<Nothing> recover(const Result<state::SlaveState>& state);
+  // Recovers the slave, status update manager and isolator from the
+  // checkpointed 'state' ('state::State' also carries the recovered
+  // resources).
+  process::Future<Nothing> recover(const Result<state::State>& state);
 
   // This is called after 'recover()'. If 'flags.reconnect' is
   // 'reconnect', the slave attempts to reconnect to any old live

http://git-wip-us.apache.org/repos/asf/mesos/blob/b0efdfe7/src/slave/state.cpp
----------------------------------------------------------------------
diff --git a/src/slave/state.cpp b/src/slave/state.cpp
index 48695da..5e73dbc 100644
--- a/src/slave/state.cpp
+++ b/src/slave/state.cpp
@@ -28,7 +28,7 @@ using std::string;
 using std::max;
 
 
-Result<SlaveState> recover(const string& rootDir, bool strict)
+Result<State> recover(const string& rootDir, bool strict)
 {
   LOG(INFO) << "Recovering state from '" << rootDir << "'";
 
@@ -39,7 +39,20 @@ Result<SlaveState> recover(const string& rootDir, bool strict)
     return None();
   }
 
-  // Did the machine reboot?
+  // Now, start to recover state from 'rootDir'.
+  State state;
+
+  // Recover resources regardless of whether the host has rebooted.
+  Try<ResourcesState> resources = ResourcesState::recover(rootDir, strict);
+  if (resources.isError()) {
+    return Error(resources.error());
+  }
+
+  state.resources = resources.get();
+  state.errors += resources.get().errors;
+
+  // Did the machine reboot? No need to recover slave state if the
+  // machine has rebooted.
   if (os::exists(paths::getBootIdPath(rootDir))) {
     Try<string> read = os::read(paths::getBootIdPath(rootDir));
     if (read.isSome()) {
@@ -48,7 +61,7 @@ Result<SlaveState> recover(const string& rootDir, bool strict)
 
       if (id.get() != strings::trim(read.get())) {
         LOG(INFO) << "Slave host rebooted";
-        return None();
+        return state;
       }
     }
   }
@@ -60,7 +73,7 @@ Result<SlaveState> recover(const string& rootDir, bool strict)
     // The slave was asked to shutdown or died before it registered
     // and had a chance to create the "latest" symlink.
     LOG(INFO) << "Failed to find the latest slave from '" << rootDir << "'";
-    return None();
+    return state;
   }
 
   // Get the latest slave id.
@@ -75,12 +88,15 @@ Result<SlaveState> recover(const string& rootDir, bool strict)
   SlaveID slaveId;
   slaveId.set_value(os::basename(directory.get()).get());
 
-  Try<SlaveState> state = SlaveState::recover(rootDir, slaveId, strict);
-  if (state.isError()) {
-    return Error(state.error());
+  Try<SlaveState> slave = SlaveState::recover(rootDir, slaveId, strict);
+  if (slave.isError()) {
+    return Error(slave.error());
   }
 
-  return state.get();
+  state.slave = slave.get();
+  state.errors += slave.get().errors;
+
+  return state;
 }
 
 
@@ -95,8 +111,8 @@ Try<SlaveState> SlaveState::recover(
   // Read the slave info.
   const string& path = paths::getSlaveInfoPath(rootDir, slaveId);
   if (!os::exists(path)) {
-    // This could happen if the slave died before it registered
-    // with the master.
+    // This could happen if the slave died before it registered with
+    // the master.
     LOG(WARNING) << "Failed to find slave info file '" << path << "'";
     return state;
   }
@@ -168,8 +184,8 @@ Try<FrameworkState> FrameworkState::recover(
   string path = paths::getFrameworkInfoPath(rootDir, slaveId, frameworkId);
   if (!os::exists(path)) {
     // This could happen if the slave died after creating the
-    // framework directory but before it checkpointed the
-    // framework info.
+    // framework directory but before it checkpointed the framework
+    // info.
     LOG(WARNING) << "Failed to find framework info file '" << path << "'";
     return state;
   }
@@ -556,14 +572,15 @@ Try<TaskState> TaskState::recover(
   path = paths::getTaskUpdatesPath(
       rootDir, slaveId, frameworkId, executorId, containerId, taskId);
   if (!os::exists(path)) {
-    // This could happen if the slave died before it checkpointed
-    // any status updates for this task.
+    // This could happen if the slave died before it checkpointed any
+    // status updates for this task.
     LOG(WARNING) << "Failed to find status updates file '" << path << "'";
     return state;
   }
 
-  // Open the status updates file for reading and writing (for truncating).
-  Try<int> fd = os::open(path, O_RDWR);
+  // Open the status updates file for reading and writing (for
+  // truncating).
+  Try<int> fd = os::open(path, O_RDWR | O_CLOEXEC);
 
   if (fd.isError()) {
     message = "Failed to open status updates file '" + path +
@@ -597,15 +614,16 @@ Try<TaskState> TaskState::recover(
   }
 
   // Always truncate the file to contain only valid updates.
-  // NOTE: This is safe even though we ignore partial protobuf
-  // read errors above, because the 'fd' is properly set to the
-  // end of the last valid update by 'protobuf::read()'.
+  // NOTE: This is safe even though we ignore partial protobuf read
+  // errors above, because the 'fd' is properly set to the end of the
+  // last valid update by 'protobuf::read()'.
   if (ftruncate(fd.get(), lseek(fd.get(), 0, SEEK_CUR)) != 0) {
     return ErrnoError(
         "Failed to truncate status updates file '" + path + "'");
   }
 
-  // After reading a non-corrupted updates file, 'record' should be 'none'.
+  // After reading a non-corrupted updates file, 'record' should be
+  // 'none'.
   if (record.isError()) {
     message = "Failed to read status updates file  '" + path +
               "': " + record.error();
@@ -639,6 +657,94 @@ Try<TaskState> TaskState::recover(
 }
 
 
+// Recovers the resources checkpointed in the file at
+// 'paths::getResourcesInfoPath(rootDir)'. A missing file is not an
+// error. In non-strict mode, failures are logged and counted in
+// 'errors' instead of failing the recovery.
+Try<ResourcesState> ResourcesState::recover(
+    const string& rootDir,
+    bool strict)
+{
+  ResourcesState state;
+
+  const string path = paths::getResourcesInfoPath(rootDir);
+  if (!os::exists(path)) {
+    LOG(INFO) << "Failed to find resources file '" << path << "'";
+    return state;
+  }
+
+  Try<int> fd = os::open(path, O_RDWR | O_CLOEXEC);
+  if (fd.isError()) {
+    string message =
+      "Failed to open resources file '" + path + "': " + fd.error();
+
+    if (strict) {
+      return Error(message);
+    } else {
+      LOG(WARNING) << message;
+      state.errors++;
+      return state;
+    }
+  }
+
+  Result<Resource> resource = None();
+  while (true) {
+    // Ignore errors due to partial protobuf read and enable undoing
+    // failed reads by reverting to the previous seek position.
+    resource = ::protobuf::read<Resource>(fd.get(), true, true);
+    if (!resource.isSome()) {
+      break;
+    }
+
+    state.resources += resource.get();
+  }
+
+  // Always truncate the file to contain only valid resources.
+  // NOTE: This is safe even though we ignore partial protobuf read
+  // errors above, because the 'fd' is properly set to the end of the
+  // last valid resource by 'protobuf::read()'.
+  if (ftruncate(fd.get(), lseek(fd.get(), 0, SEEK_CUR)) != 0) {
+    // Construct the error first so that it captures 'errno' before
+    // 'os::close()' can overwrite it; the close is best-effort.
+    ErrnoError error("Failed to truncate resources file '" + path + "'");
+    os::close(fd.get());
+    return error;
+  }
+
+  // Close the file before inspecting the read result so that the
+  // descriptor is released on every path below.
+  Try<Nothing> close = os::close(fd.get());
+
+  // After reading a non-corrupted resources file, 'resource' should
+  // be 'none'.
+  if (resource.isError()) {
+    string message =
+      "Failed to read resources file '" + path + "': " + resource.error();
+
+    if (strict) {
+      return Error(message);
+    } else {
+      LOG(WARNING) << message;
+      state.errors++;
+    }
+  }
+
+  if (close.isError()) {
+    string message =
+      "Failed to close resources file '" + path + "': " + close.error();
+
+    if (strict) {
+      return Error(message);
+    } else {
+      LOG(WARNING) << message;
+      state.errors++;
+    }
+  }
+
+  return state;
+}
+
+
 // Helpers to checkpoint string/protobuf to disk, with necessary error checking.
 
 Try<Nothing> checkpoint(

http://git-wip-us.apache.org/repos/asf/mesos/blob/b0efdfe7/src/slave/state.hpp
----------------------------------------------------------------------
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index 22f569d..592182b 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -23,6 +23,8 @@
 
 #include <vector>
 
+#include <mesos/resources.hpp>
+
 #include <process/pid.hpp>
 
 #include <stout/foreach.hpp>
@@ -45,11 +47,13 @@ namespace slave {
 namespace state {
 
 // Forward declarations.
+struct State;
 struct SlaveState;
 struct FrameworkState;
 struct ExecutorState;
 struct RunState;
 struct TaskState;
+struct ResourcesState;
 
 // This function performs recovery from the state stored at 'rootDir'.
 // If the 'strict' flag is set, any errors encountered while
@@ -60,10 +64,12 @@ struct TaskState;
 // recovery continues by recovering as much of the state as possible,
 // while increasing the 'errors' count. Note that 'errors' on a struct
 // includes the 'errors' encountered recursively. In other words,
-// 'SlaveState.errors' is the sum total of all recovery errors.
-// If the machine has rebooted since the last slave run,
-// None is returned.
-Result<SlaveState> recover(const std::string& rootDir, bool strict);
+// 'State.errors' is the sum total of all recovery errors. The
+// checkpointed resources are recovered even if the machine has
+// rebooted since the last slave run or the latest slave cannot be
+// found; in those cases 'State.slave' is None.
+Result<State> recover(const std::string& rootDir, bool strict);
+
 
 // Thin wrappers to checkpoint data to disk and perform the
 // necessary error checking.
@@ -77,8 +83,39 @@ Try<Nothing> checkpoint(
 // Checkpoints a string at the given path.
 Try<Nothing> checkpoint(const std::string& path, const std::string& message);
 
-// Each of the structs below (recursively) recover the checkpointed
-// state.
+
+// The top level state. Each of the structs below (recursively)
+// recovers the checkpointed state.
+struct State
+{
+  State() : errors(0) {}
+
+  // Resources recovered from the checkpoint (see 'ResourcesState').
+  Option<ResourcesState> resources;
+
+  // The state of the latest slave. None if it was not recovered,
+  // e.g., because the machine has rebooted since the last slave run.
+  Option<SlaveState> slave;
+
+  unsigned int errors;
+};
+
+
+// The resources checkpointed by the slave, recovered from the file
+// at 'paths::getResourcesInfoPath(rootDir)'.
+struct ResourcesState
+{
+  ResourcesState() : errors(0) {}
+
+  static Try<ResourcesState> recover(
+      const std::string& rootDir,
+      bool strict);
+
+  Resources resources;
+  unsigned int errors;
+};
+
+
 struct SlaveState
 {
   SlaveState () : errors(0) {}

http://git-wip-us.apache.org/repos/asf/mesos/blob/b0efdfe7/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 782f57a..2b6c76a 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -219,12 +219,14 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
   AWAIT_READY(_ack);
 
   // Recover the state.
-  Result<slave::state::SlaveState> recover = slave::state::recover(
+  Result<slave::state::State> recover = slave::state::recover(
       paths::getMetaRootDir(flags.work_dir), true);
 
   ASSERT_SOME(recover);
+  ASSERT_SOME(recover.get().resources);
+  ASSERT_SOME(recover.get().slave);
 
-  slave::state::SlaveState state = recover.get();
+  slave::state::SlaveState state = recover.get().slave.get();
 
   // Check slave id.
   ASSERT_EQ(slaveId, state.id);

Reply via email to