Repository: mesos Updated Branches: refs/heads/master 1d12aeb39 -> d9dd07960
Allowed slave to checkpoint resources. Review: https://reviews.apache.org/r/26990 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b0efdfe7 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b0efdfe7 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b0efdfe7 Branch: refs/heads/master Commit: b0efdfe74d21d2d37a598a5b06654898288abf21 Parents: 1d12aeb Author: Jie Yu <[email protected]> Authored: Mon Oct 20 17:34:22 2014 -0700 Committer: Jie Yu <[email protected]> Committed: Fri Nov 21 16:46:03 2014 -0800 ---------------------------------------------------------------------- src/slave/paths.hpp | 12 +++ src/slave/slave.cpp | 41 +++++----- src/slave/slave.hpp | 2 +- src/slave/state.cpp | 135 +++++++++++++++++++++++++++----- src/slave/state.hpp | 40 ++++++++-- src/tests/slave_recovery_tests.cpp | 5 +- 6 files changed, 186 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b0efdfe7/src/slave/paths.hpp ---------------------------------------------------------------------- diff --git a/src/slave/paths.hpp b/src/slave/paths.hpp index 41bb73d..ee8687d 100644 --- a/src/slave/paths.hpp +++ b/src/slave/paths.hpp @@ -54,6 +54,7 @@ const std::string EXECUTOR_SENTINEL_FILE = "executor.sentinel"; const std::string FORKED_PID_FILE = "forked.pid"; const std::string TASK_INFO_FILE = "task.info"; const std::string TASK_UPDATES_FILE = "task.updates"; +const std::string RESOURCES_INFO_FILE = "resources.info"; // Path layout templates. 
const std::string ROOT_PATH = "%s"; @@ -93,6 +94,8 @@ const std::string TASK_INFO_PATH = path::join(TASK_PATH, TASK_INFO_FILE); const std::string TASK_UPDATES_PATH = path::join(TASK_PATH, TASK_UPDATES_FILE); +const std::string RESOURCES_INFO_PATH = + path::join(ROOT_PATH, "resources", RESOURCES_INFO_FILE); inline std::string getMetaRootDir(const std::string rootDir) @@ -327,6 +330,15 @@ inline std::string getTaskUpdatesPath( } +inline std::string getResourcesInfoPath( + const std::string& rootDir) +{ + return strings::format( + RESOURCES_INFO_PATH, + rootDir).get(); +} + + inline std::string createExecutorDirectory( const std::string& rootDir, const SlaveID& slaveId, http://git-wip-us.apache.org/repos/asf/mesos/blob/b0efdfe7/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 373c8b4..ed63ded 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -3348,53 +3348,54 @@ void Slave::_checkDiskUsage(const Future<double>& usage) -Future<Nothing> Slave::recover(const Result<SlaveState>& _state) +Future<Nothing> Slave::recover(const Result<state::State>& state) { - if (_state.isError()) { - return Failure(_state.error()); + if (state.isError()) { + return Failure(state.error()); } - // Convert Result<SlaveState> to Option<SlaveState> for convenience. - Option<SlaveState> state; - if (_state.isSome()) { - state = _state.get(); + Option<SlaveState> slaveState; + if (state.isSome()) { + slaveState = state.get().slave; } - if (state.isSome() && state.get().info.isSome()) { + if (slaveState.isSome() && slaveState.get().info.isSome()) { // Check for SlaveInfo compatibility. // TODO(vinod): Also check for version compatibility. - // NOTE: We set the 'id' field in 'info' from the recovered state, + // NOTE: We set the 'id' field in 'info' from the recovered slave, // as a hack to compare the info created from options/flags with // the recovered info. 
- info.mutable_id()->CopyFrom(state.get().id); - if (flags.recover == "reconnect" && !(info == state.get().info.get())) { + info.mutable_id()->CopyFrom(slaveState.get().id); + if (flags.recover == "reconnect" && + !(info == slaveState.get().info.get())) { return Failure(strings::join( "\n", "Incompatible slave info detected.", "------------------------------------------------------------", - "Old slave info:\n" + stringify(state.get().info.get()), + "Old slave info:\n" + stringify(slaveState.get().info.get()), "------------------------------------------------------------", "New slave info:\n" + stringify(info), "------------------------------------------------------------")); } - info = state.get().info.get(); // Recover the slave info. + info = slaveState.get().info.get(); // Recover the slave info. - recoveryErrors = state.get().errors; - metrics.recovery_errors += state.get().errors; - if (recoveryErrors > 0) { - LOG(WARNING) << "Errors encountered during recovery: " << recoveryErrors; + if (slaveState.get().errors > 0) { + LOG(WARNING) << "Errors encountered during slave recovery: " + << slaveState.get().errors; + + metrics.recovery_errors += slaveState.get().errors; } // Recover the frameworks. 
foreachvalue (const FrameworkState& frameworkState, - state.get().frameworks) { + slaveState.get().frameworks) { recoverFramework(frameworkState); } } - return statusUpdateManager->recover(metaDir, state) - .then(defer(self(), &Slave::_recoverContainerizer, state)); + return statusUpdateManager->recover(metaDir, slaveState) + .then(defer(self(), &Slave::_recoverContainerizer, slaveState)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/b0efdfe7/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index c6d11ef..70bd8c1 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -301,7 +301,7 @@ public: void checkDiskUsage(); // Recovers the slave, status update manager and isolator. - process::Future<Nothing> recover(const Result<state::SlaveState>& state); + process::Future<Nothing> recover(const Result<state::State>& state); // This is called after 'recover()'. If 'flags.reconnect' is // 'reconnect', the slave attempts to reconnect to any old live http://git-wip-us.apache.org/repos/asf/mesos/blob/b0efdfe7/src/slave/state.cpp ---------------------------------------------------------------------- diff --git a/src/slave/state.cpp b/src/slave/state.cpp index 48695da..5e73dbc 100644 --- a/src/slave/state.cpp +++ b/src/slave/state.cpp @@ -28,7 +28,7 @@ using std::string; using std::max; -Result<SlaveState> recover(const string& rootDir, bool strict) +Result<State> recover(const string& rootDir, bool strict) { LOG(INFO) << "Recovering state from '" << rootDir << "'"; @@ -39,7 +39,19 @@ Result<SlaveState> recover(const string& rootDir, bool strict) return None(); } - // Did the machine reboot? + // Now, start to recover state from 'rootDir'. + State state; + + // Recover resources regardless whether the host has rebooted. 
+ Try<ResourcesState> resources = ResourcesState::recover(rootDir, strict); + if (resources.isError()) { + return Error(resources.error()); + } + + state.resources = resources.get(); + + // Did the machine reboot? No need to recover slave state if the + // machine has rebooted. if (os::exists(paths::getBootIdPath(rootDir))) { Try<string> read = os::read(paths::getBootIdPath(rootDir)); if (read.isSome()) { @@ -48,7 +60,7 @@ Result<SlaveState> recover(const string& rootDir, bool strict) if (id.get() != strings::trim(read.get())) { LOG(INFO) << "Slave host rebooted"; - return None(); + return state; } } } @@ -60,7 +72,7 @@ Result<SlaveState> recover(const string& rootDir, bool strict) // The slave was asked to shutdown or died before it registered // and had a chance to create the "latest" symlink. LOG(INFO) << "Failed to find the latest slave from '" << rootDir << "'"; - return None(); + return state; } // Get the latest slave id. @@ -75,12 +87,14 @@ Result<SlaveState> recover(const string& rootDir, bool strict) SlaveID slaveId; slaveId.set_value(os::basename(directory.get()).get()); - Try<SlaveState> state = SlaveState::recover(rootDir, slaveId, strict); - if (state.isError()) { - return Error(state.error()); + Try<SlaveState> slave = SlaveState::recover(rootDir, slaveId, strict); + if (slave.isError()) { + return Error(slave.error()); } - return state.get(); + state.slave = slave.get(); + + return state; } @@ -95,8 +109,8 @@ Try<SlaveState> SlaveState::recover( // Read the slave info. const string& path = paths::getSlaveInfoPath(rootDir, slaveId); if (!os::exists(path)) { - // This could happen if the slave died before it registered - // with the master. + // This could happen if the slave died before it registered with + // the master. 
LOG(WARNING) << "Failed to find slave info file '" << path << "'"; return state; } @@ -168,8 +182,8 @@ Try<FrameworkState> FrameworkState::recover( string path = paths::getFrameworkInfoPath(rootDir, slaveId, frameworkId); if (!os::exists(path)) { // This could happen if the slave died after creating the - // framework directory but before it checkpointed the - // framework info. + // framework directory but before it checkpointed the framework + // info. LOG(WARNING) << "Failed to find framework info file '" << path << "'"; return state; } @@ -556,14 +570,15 @@ Try<TaskState> TaskState::recover( path = paths::getTaskUpdatesPath( rootDir, slaveId, frameworkId, executorId, containerId, taskId); if (!os::exists(path)) { - // This could happen if the slave died before it checkpointed - // any status updates for this task. + // This could happen if the slave died before it checkpointed any + // status updates for this task. LOG(WARNING) << "Failed to find status updates file '" << path << "'"; return state; } - // Open the status updates file for reading and writing (for truncating). - Try<int> fd = os::open(path, O_RDWR); + // Open the status updates file for reading and writing (for + // truncating). + Try<int> fd = os::open(path, O_RDWR | O_CLOEXEC); if (fd.isError()) { message = "Failed to open status updates file '" + path + @@ -597,15 +612,16 @@ Try<TaskState> TaskState::recover( } // Always truncate the file to contain only valid updates. - // NOTE: This is safe even though we ignore partial protobuf - // read errors above, because the 'fd' is properly set to the - // end of the last valid update by 'protobuf::read()'. + // NOTE: This is safe even though we ignore partial protobuf read + // errors above, because the 'fd' is properly set to the end of the + // last valid update by 'protobuf::read()'. 
if (ftruncate(fd.get(), lseek(fd.get(), 0, SEEK_CUR)) != 0) { return ErrnoError( "Failed to truncate status updates file '" + path + "'"); } - // After reading a non-corrupted updates file, 'record' should be 'none'. + // After reading a non-corrupted updates file, 'record' should be + // 'none'. if (record.isError()) { message = "Failed to read status updates file '" + path + "': " + record.error(); @@ -639,6 +655,85 @@ Try<TaskState> TaskState::recover( } +Try<ResourcesState> ResourcesState::recover( + const std::string& rootDir, + bool strict) +{ + ResourcesState state; + + const string& path = paths::getResourcesInfoPath(rootDir); + if (!os::exists(path)) { + LOG(INFO) << "Failed to find resources file '" << path << "'"; + return state; + } + + Try<int> fd = os::open(path, O_RDWR | O_CLOEXEC); + if (fd.isError()) { + string message = + "Failed to open resources file '" + path + "': " + fd.error(); + + if (strict) { + return Error(message); + } else { + LOG(WARNING) << message; + state.errors++; + return state; + } + } + + Result<Resource> resource = None(); + while (true) { + // Ignore errors due to partial protobuf read and enable undoing + // failed reads by reverting to the previous seek position. + resource = ::protobuf::read<Resource>(fd.get(), true, true); + if (!resource.isSome()) { + break; + } + + state.resources += resource.get(); + } + + // Always truncate the file to contain only valid resources. + // NOTE: This is safe even though we ignore partial protobuf read + // errors above, because the 'fd' is properly set to the end of the + // last valid resource by 'protobuf::read()'. + if (ftruncate(fd.get(), lseek(fd.get(), 0, SEEK_CUR)) != 0) { + return ErrnoError("Failed to truncate resources file '" + path + "'"); + } + + // After reading a non-corrupted resources file, 'resource' should be + // 'none'.
+ if (resource.isError()) { + string message = + "Failed to read resources file '" + path + "': " + resource.error(); + + if (strict) { + return Error(message); + } else { + LOG(WARNING) << message; + state.errors++; + return state; + } + } + + Try<Nothing> close = os::close(fd.get()); + if (close.isError()) { + string message = + "Failed to close resources file '" + path + "': " + close.error(); + + if (strict) { + return Error(message); + } else { + LOG(WARNING) << message; + state.errors++; + return state; + } + } + + return state; +} + + // Helpers to checkpoint string/protobuf to disk, with necessary error checking. Try<Nothing> checkpoint( http://git-wip-us.apache.org/repos/asf/mesos/blob/b0efdfe7/src/slave/state.hpp ---------------------------------------------------------------------- diff --git a/src/slave/state.hpp b/src/slave/state.hpp index 22f569d..592182b 100644 --- a/src/slave/state.hpp +++ b/src/slave/state.hpp @@ -23,6 +23,8 @@ #include <vector> +#include <mesos/resources.hpp> + #include <process/pid.hpp> #include <stout/foreach.hpp> @@ -45,11 +47,13 @@ namespace slave { namespace state { // Forward declarations. +struct State; struct SlaveState; struct FrameworkState; struct ExecutorState; struct RunState; struct TaskState; +struct ResourcesState; // This function performs recovery from the state stored at 'rootDir'. // If the 'strict' flag is set, any errors encountered while @@ -60,10 +64,10 @@ struct TaskState; // recovery continues by recovering as much of the state as possible, // while increasing the 'errors' count. Note that 'errors' on a struct // includes the 'errors' encountered recursively. In other words, -// 'SlaveState.errors' is the sum total of all recovery errors. -// If the machine has rebooted since the last slave run, -// None is returned. -Result<SlaveState> recover(const std::string& rootDir, bool strict); +// 'State.errors' is the sum total of all recovery errors. 
If the +// machine has rebooted since the last slave run, 'State.slave' is None. +Result<State> recover(const std::string& rootDir, bool strict); + // Thin wrappers to checkpoint data to disk and perform the // necessary error checking. @@ -77,8 +81,32 @@ Try<Nothing> checkpoint( // Checkpoints a string at the given path. Try<Nothing> checkpoint(const std::string& path, const std::string& message); -// Each of the structs below (recursively) recover the checkpointed -// state. + +// The top level state. Each of the structs below (recursively) +// recover the checkpointed state. +struct State +{ + State() : errors(0) {} + + Option<ResourcesState> resources; + Option<SlaveState> slave; + unsigned int errors; +}; + + +struct ResourcesState +{ + ResourcesState() : errors(0) {} + + static Try<ResourcesState> recover( + const std::string& rootDir, + bool strict); + + Resources resources; + unsigned int errors; +}; + + struct SlaveState { SlaveState () : errors(0) {} http://git-wip-us.apache.org/repos/asf/mesos/blob/b0efdfe7/src/tests/slave_recovery_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp index 782f57a..2b6c76a 100644 --- a/src/tests/slave_recovery_tests.cpp +++ b/src/tests/slave_recovery_tests.cpp @@ -219,12 +219,13 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState) AWAIT_READY(_ack); // Recover the state. - Result<slave::state::SlaveState> recover = slave::state::recover( + Result<slave::state::State> recover = slave::state::recover( paths::getMetaRootDir(flags.work_dir), true); ASSERT_SOME(recover); + ASSERT_SOME(recover.get().slave); - slave::state::SlaveState state = recover.get(); + slave::state::SlaveState state = recover.get().slave.get(); // Check slave id. ASSERT_EQ(slaveId, state.id);
