Added the "task" prefix to the names of the status update manager files.
Review: https://reviews.apache.org/r/63853/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/62d11733 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/62d11733 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/62d11733 Branch: refs/heads/master Commit: 62d11733446cf54beb02295e7cd94d419541f152 Parents: 2377cd8 Author: Gaston Kleiman <gas...@mesosphere.io> Authored: Mon Nov 20 16:43:12 2017 -0800 Committer: Greg Mann <gregorywm...@gmail.com> Committed: Mon Nov 20 16:53:05 2017 -0800 ---------------------------------------------------------------------- src/CMakeLists.txt | 2 +- src/Makefile.am | 6 +- src/local/local.cpp | 2 +- src/slave/main.cpp | 2 +- src/slave/slave.cpp | 2 +- src/slave/status_update_manager.cpp | 897 ------------------- src/slave/status_update_manager.hpp | 210 ----- src/slave/task_status_update_manager.cpp | 898 ++++++++++++++++++++ src/slave/task_status_update_manager.hpp | 210 +++++ src/tests/CMakeLists.txt | 2 +- src/tests/cluster.cpp | 2 +- src/tests/cluster.hpp | 2 +- src/tests/mock_slave.cpp | 2 +- src/tests/status_update_manager_tests.cpp | 852 ------------------- src/tests/task_status_update_manager_tests.cpp | 852 +++++++++++++++++++ 15 files changed, 1971 insertions(+), 1970 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4f11418..be212d9 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -257,7 +257,7 @@ set(AGENT_SRC slave/resource_estimator.cpp slave/slave.cpp slave/state.cpp - slave/status_update_manager.cpp + slave/task_status_update_manager.cpp slave/validation.cpp slave/container_loggers/sandbox.cpp slave/containerizer/composing.cpp 
http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index 49dec55..9641ad4 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1022,7 +1022,7 @@ libmesos_no_3rdparty_la_SOURCES += \ slave/resource_estimator.cpp \ slave/slave.cpp \ slave/state.cpp \ - slave/status_update_manager.cpp \ + slave/task_status_update_manager.cpp \ slave/validation.cpp \ slave/container_loggers/sandbox.cpp \ slave/containerizer/composing.cpp \ @@ -1167,7 +1167,7 @@ libmesos_no_3rdparty_la_SOURCES += \ slave/posix_signalhandler.hpp \ slave/slave.hpp \ slave/state.hpp \ - slave/status_update_manager.hpp \ + slave/task_status_update_manager.hpp \ slave/validation.hpp \ slave/windows_ctrlhandler.hpp \ slave/container_loggers/sandbox.hpp \ @@ -2469,7 +2469,7 @@ mesos_tests_SOURCES = \ tests/slave_tests.cpp \ tests/sorter_tests.cpp \ tests/state_tests.cpp \ - tests/status_update_manager_tests.cpp \ + tests/task_status_update_manager_tests.cpp \ tests/teardown_tests.cpp \ tests/upgrade_tests.cpp \ tests/uri_tests.cpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/local/local.cpp ---------------------------------------------------------------------- diff --git a/src/local/local.cpp b/src/local/local.cpp index 63d9822..2e141c6 100644 --- a/src/local/local.cpp +++ b/src/local/local.cpp @@ -75,7 +75,7 @@ #include "slave/gc.hpp" #include "slave/slave.hpp" -#include "slave/status_update_manager.hpp" +#include "slave/task_status_update_manager.hpp" #include "slave/containerizer/containerizer.hpp" #include "slave/containerizer/fetcher.hpp" http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/main.cpp ---------------------------------------------------------------------- diff --git a/src/slave/main.cpp b/src/slave/main.cpp index de87553..f0716fb 100644 --- a/src/slave/main.cpp +++ b/src/slave/main.cpp @@ -69,7 +69,7 @@ #include 
"slave/gc.hpp" #include "slave/slave.hpp" -#include "slave/status_update_manager.hpp" +#include "slave/task_status_update_manager.hpp" #include "version/version.hpp" http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index c3a4088..6e9adc6 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -98,7 +98,7 @@ #include "slave/flags.hpp" #include "slave/paths.hpp" #include "slave/slave.hpp" -#include "slave/status_update_manager.hpp" +#include "slave/task_status_update_manager.hpp" #ifdef __WINDOWS__ // Used to install a Windows console ctrl handler. http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/status_update_manager.cpp ---------------------------------------------------------------------- diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp deleted file mode 100644 index fed0903..0000000 --- a/src/slave/status_update_manager.cpp +++ /dev/null @@ -1,897 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -#include <process/delay.hpp> -#include <process/id.hpp> -#include <process/process.hpp> -#include <process/timer.hpp> - -#include <stout/check.hpp> -#include <stout/foreach.hpp> -#include <stout/hashmap.hpp> -#include <stout/hashset.hpp> -#include <stout/lambda.hpp> -#include <stout/option.hpp> -#include <stout/os.hpp> -#include <stout/path.hpp> -#include <stout/protobuf.hpp> -#include <stout/stringify.hpp> -#include <stout/utils.hpp> -#include <stout/uuid.hpp> - -#include "common/protobuf_utils.hpp" - -#include "logging/logging.hpp" - -#include "slave/constants.hpp" -#include "slave/flags.hpp" -#include "slave/slave.hpp" -#include "slave/state.hpp" -#include "slave/status_update_manager.hpp" - -using lambda::function; - -using std::string; - -using process::wait; // Necessary on some OS's to disambiguate. -using process::Failure; -using process::Future; -using process::PID; -using process::Timeout; -using process::UPID; - -namespace mesos { -namespace internal { -namespace slave { - -using state::SlaveState; -using state::FrameworkState; -using state::ExecutorState; -using state::RunState; -using state::TaskState; - - -class TaskStatusUpdateManagerProcess - : public ProtobufProcess<TaskStatusUpdateManagerProcess> -{ -public: - TaskStatusUpdateManagerProcess(const Flags& flags); - virtual ~TaskStatusUpdateManagerProcess(); - - // Explicitly use 'initialize' since we're overloading below. - using process::ProcessBase::initialize; - - // TaskStatusUpdateManager implementation. 
- void initialize(const function<void(StatusUpdate)>& forward); - - Future<Nothing> update( - const StatusUpdate& update, - const SlaveID& slaveId, - const ExecutorID& executorId, - const ContainerID& containerId); - - Future<Nothing> update( - const StatusUpdate& update, - const SlaveID& slaveId); - - Future<bool> acknowledgement( - const TaskID& taskId, - const FrameworkID& frameworkId, - const UUID& uuid); - - Future<Nothing> recover( - const string& rootDir, - const Option<SlaveState>& state); - - void pause(); - void resume(); - - void cleanup(const FrameworkID& frameworkId); - -private: - // Helper function to handle update. - Future<Nothing> _update( - const StatusUpdate& update, - const SlaveID& slaveId, - bool checkpoint, - const Option<ExecutorID>& executorId, - const Option<ContainerID>& containerId); - - // Status update timeout. - void timeout(const Duration& duration); - - // Forwards the status update to the master and starts a timer based - // on the 'duration' to check for ACK from the scheduler. - // NOTE: This should only be used for those messages that expect an - // ACK (e.g updates from the executor). - Timeout forward(const StatusUpdate& update, const Duration& duration); - - // Helper functions. - - // Creates a new status update stream (opening the updates file, if path is - // present) and adds it to streams. 
- TaskStatusUpdateStream* createStatusUpdateStream( - const TaskID& taskId, - const FrameworkID& frameworkId, - const SlaveID& slaveId, - bool checkpoint, - const Option<ExecutorID>& executorId, - const Option<ContainerID>& containerId); - - TaskStatusUpdateStream* getStatusUpdateStream( - const TaskID& taskId, - const FrameworkID& frameworkId); - - void cleanupStatusUpdateStream( - const TaskID& taskId, - const FrameworkID& frameworkId); - - const Flags flags; - bool paused; - - function<void(StatusUpdate)> forward_; - - hashmap<FrameworkID, hashmap<TaskID, TaskStatusUpdateStream*>> streams; -}; - - -TaskStatusUpdateManagerProcess::TaskStatusUpdateManagerProcess( - const Flags& _flags) - : ProcessBase(process::ID::generate("task-status-update-manager")), - flags(_flags), - paused(false) -{ -} - - -TaskStatusUpdateManagerProcess::~TaskStatusUpdateManagerProcess() -{ - foreachkey (const FrameworkID& frameworkId, streams) { - foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) { - delete stream; - } - } - streams.clear(); -} - - -void TaskStatusUpdateManagerProcess::initialize( - const function<void(StatusUpdate)>& forward) -{ - forward_ = forward; -} - - -void TaskStatusUpdateManagerProcess::pause() -{ - LOG(INFO) << "Pausing sending task status updates"; - paused = true; -} - - -void TaskStatusUpdateManagerProcess::resume() -{ - LOG(INFO) << "Resuming sending task status updates"; - paused = false; - - foreachkey (const FrameworkID& frameworkId, streams) { - foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) { - if (!stream->pending.empty()) { - const StatusUpdate& update = stream->pending.front(); - LOG(WARNING) << "Resending task status update " << update; - stream->timeout = forward(update, STATUS_UPDATE_RETRY_INTERVAL_MIN); - } - } - } -} - - -Future<Nothing> TaskStatusUpdateManagerProcess::recover( - const string& rootDir, - const Option<SlaveState>& state) -{ - LOG(INFO) << "Recovering task status update manager"; - - if 
(state.isNone()) { - return Nothing(); - } - - foreachvalue (const FrameworkState& framework, state->frameworks) { - foreachvalue (const ExecutorState& executor, framework.executors) { - LOG(INFO) << "Recovering executor '" << executor.id - << "' of framework " << framework.id; - - if (executor.info.isNone()) { - LOG(WARNING) << "Skipping recovering task status updates of" - << " executor '" << executor.id - << "' of framework " << framework.id - << " because its info cannot be recovered"; - continue; - } - - if (executor.latest.isNone()) { - LOG(WARNING) << "Skipping recovering task status updates of" - << " executor '" << executor.id - << "' of framework " << framework.id - << " because its latest run cannot be recovered"; - continue; - } - - // We are only interested in the latest run of the executor! - const ContainerID& latest = executor.latest.get(); - Option<RunState> run = executor.runs.get(latest); - CHECK_SOME(run); - - if (run->completed) { - VLOG(1) << "Skipping recovering task status updates of" - << " executor '" << executor.id - << "' of framework " << framework.id - << " because its latest run " << latest.value() - << " is completed"; - continue; - } - - foreachvalue (const TaskState& task, run->tasks) { - // No updates were ever received for this task! - // This means either: - // 1) the executor never received this task or - // 2) executor launched it but the slave died before it got any updates. - if (task.updates.empty()) { - LOG(WARNING) << "No status updates found for task " << task.id - << " of framework " << framework.id; - continue; - } - - // Create a new status update stream. - TaskStatusUpdateStream* stream = createStatusUpdateStream( - task.id, framework.id, state->id, true, executor.id, latest); - - // Replay the stream. 
- Try<Nothing> replay = stream->replay(task.updates, task.acks); - if (replay.isError()) { - return Failure( - "Failed to replay status updates for task " + stringify(task.id) + - " of framework " + stringify(framework.id) + - ": " + replay.error()); - } - - // At the end of the replay, the stream is either terminated or - // contains only unacknowledged, if any, pending updates. The - // pending updates will be flushed after the slave - // re-registers with the master. - if (stream->terminated) { - cleanupStatusUpdateStream(task.id, framework.id); - } - } - } - } - - return Nothing(); -} - - -void TaskStatusUpdateManagerProcess::cleanup(const FrameworkID& frameworkId) -{ - LOG(INFO) << "Closing task status update streams for framework " - << frameworkId; - - if (streams.contains(frameworkId)) { - foreachkey (const TaskID& taskId, utils::copy(streams[frameworkId])) { - cleanupStatusUpdateStream(taskId, frameworkId); - } - } -} - - -Future<Nothing> TaskStatusUpdateManagerProcess::update( - const StatusUpdate& update, - const SlaveID& slaveId, - const ExecutorID& executorId, - const ContainerID& containerId) -{ - return _update(update, slaveId, true, executorId, containerId); -} - - -Future<Nothing> TaskStatusUpdateManagerProcess::update( - const StatusUpdate& update, - const SlaveID& slaveId) -{ - return _update(update, slaveId, false, None(), None()); -} - - -Future<Nothing> TaskStatusUpdateManagerProcess::_update( - const StatusUpdate& update, - const SlaveID& slaveId, - bool checkpoint, - const Option<ExecutorID>& executorId, - const Option<ContainerID>& containerId) -{ - const TaskID& taskId = update.status().task_id(); - const FrameworkID& frameworkId = update.framework_id(); - - LOG(INFO) << "Received task status update " << update; - - // Write the status update to disk and enqueue it to send it to the master. - // Create/Get the status update stream for this task. 
- TaskStatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId); - if (stream == nullptr) { - stream = createStatusUpdateStream( - taskId, frameworkId, slaveId, checkpoint, executorId, containerId); - } - - // Verify that we didn't get a non-checkpointable update for a - // stream that is checkpointable, and vice-versa. - if (stream->checkpoint != checkpoint) { - return Failure( - "Mismatched checkpoint value for task status update " + - stringify(update) + " (expected checkpoint=" + - stringify(stream->checkpoint) + " actual checkpoint=" + - stringify(checkpoint) + ")"); - } - - // Handle the status update. - Try<bool> result = stream->update(update); - if (result.isError()) { - return Failure(result.error()); - } - - // We don't return a failed future here so that the slave can re-ack - // the duplicate update. - if (!result.get()) { - return Nothing(); - } - - // Forward the status update to the master if this is the first in the stream. - // Subsequent status updates will get sent in 'acknowledgement()'. - if (!paused && stream->pending.size() == 1) { - CHECK_NONE(stream->timeout); - const Result<StatusUpdate>& next = stream->next(); - if (next.isError()) { - return Failure(next.error()); - } - - CHECK_SOME(next); - stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN); - } - - return Nothing(); -} - - -Timeout TaskStatusUpdateManagerProcess::forward( - const StatusUpdate& update, - const Duration& duration) -{ - CHECK(!paused); - - VLOG(1) << "Forwarding task status update " << update << " to the agent"; - - // Forward the update. - forward_(update); - - // Send a message to self to resend after some delay if no ACK is received. 
- return delay(duration, - self(), - &TaskStatusUpdateManagerProcess::timeout, - duration).timeout(); -} - - -Future<bool> TaskStatusUpdateManagerProcess::acknowledgement( - const TaskID& taskId, - const FrameworkID& frameworkId, - const UUID& uuid) -{ - LOG(INFO) << "Received task status update acknowledgement (UUID: " << uuid - << ") for task " << taskId - << " of framework " << frameworkId; - - TaskStatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId); - - // This might happen if we haven't completed recovery yet or if the - // acknowledgement is for a stream that has been cleaned up. - if (stream == nullptr) { - return Failure( - "Cannot find the task status update stream for task " + - stringify(taskId) + " of framework " + stringify(frameworkId)); - } - - // Get the corresponding update for this ACK. - const Result<StatusUpdate>& update = stream->next(); - if (update.isError()) { - return Failure(update.error()); - } - - // This might happen if we retried a status update and got back - // acknowledgments for both the original and the retried update. - if (update.isNone()) { - return Failure( - "Unexpected task status update acknowledgment (UUID: " + - uuid.toString() + ") for task " + stringify(taskId) + " of framework " + - stringify(frameworkId)); - } - - // Handle the acknowledgement. - Try<bool> result = - stream->acknowledgement(taskId, frameworkId, uuid, update.get()); - - if (result.isError()) { - return Failure(result.error()); - } - - if (!result.get()) { - return Failure("Duplicate task status acknowledgement"); - } - - // Reset the timeout. - stream->timeout = None(); - - // Get the next update in the queue. 
- const Result<StatusUpdate>& next = stream->next(); - if (next.isError()) { - return Failure(next.error()); - } - - bool terminated = stream->terminated; - - if (terminated) { - if (next.isSome()) { - LOG(WARNING) << "Acknowledged a terminal" - << " task status update " << update.get() - << " but updates are still pending"; - } - cleanupStatusUpdateStream(taskId, frameworkId); - } else if (!paused && next.isSome()) { - // Forward the next queued status update. - stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN); - } - - return !terminated; -} - - -// TODO(vinod): There should be a limit on the retries. -void TaskStatusUpdateManagerProcess::timeout(const Duration& duration) -{ - if (paused) { - return; - } - - // Check and see if we should resend any status updates. - foreachkey (const FrameworkID& frameworkId, streams) { - foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) { - CHECK_NOTNULL(stream); - if (!stream->pending.empty()) { - CHECK_SOME(stream->timeout); - if (stream->timeout->expired()) { - const StatusUpdate& update = stream->pending.front(); - LOG(WARNING) << "Resending task status update " << update; - - // Bounded exponential backoff. 
- Duration duration_ = - std::min(duration * 2, STATUS_UPDATE_RETRY_INTERVAL_MAX); - - stream->timeout = forward(update, duration_); - } - } - } - } -} - - -TaskStatusUpdateStream* -TaskStatusUpdateManagerProcess::createStatusUpdateStream( - const TaskID& taskId, - const FrameworkID& frameworkId, - const SlaveID& slaveId, - bool checkpoint, - const Option<ExecutorID>& executorId, - const Option<ContainerID>& containerId) -{ - VLOG(1) << "Creating StatusUpdate stream for task " << taskId - << " of framework " << frameworkId; - - TaskStatusUpdateStream* stream = new TaskStatusUpdateStream( - taskId, frameworkId, slaveId, flags, checkpoint, executorId, containerId); - - streams[frameworkId][taskId] = stream; - return stream; -} - - -TaskStatusUpdateStream* TaskStatusUpdateManagerProcess::getStatusUpdateStream( - const TaskID& taskId, - const FrameworkID& frameworkId) -{ - if (!streams.contains(frameworkId)) { - return nullptr; - } - - if (!streams[frameworkId].contains(taskId)) { - return nullptr; - } - - return streams[frameworkId][taskId]; -} - - -void TaskStatusUpdateManagerProcess::cleanupStatusUpdateStream( - const TaskID& taskId, - const FrameworkID& frameworkId) -{ - VLOG(1) << "Cleaning up status update stream" - << " for task " << taskId - << " of framework " << frameworkId; - - CHECK(streams.contains(frameworkId)) - << "Cannot find the task status update streams for framework " - << frameworkId; - - CHECK(streams[frameworkId].contains(taskId)) - << "Cannot find the status update streams for task " << taskId; - - TaskStatusUpdateStream* stream = streams[frameworkId][taskId]; - - streams[frameworkId].erase(taskId); - if (streams[frameworkId].empty()) { - streams.erase(frameworkId); - } - - delete stream; -} - - -TaskStatusUpdateManager::TaskStatusUpdateManager(const Flags& flags) -{ - process = new TaskStatusUpdateManagerProcess(flags); - spawn(process); -} - - -TaskStatusUpdateManager::~TaskStatusUpdateManager() -{ - terminate(process); - wait(process); - 
delete process; -} - - -void TaskStatusUpdateManager::initialize( - const function<void(StatusUpdate)>& forward) -{ - dispatch(process, &TaskStatusUpdateManagerProcess::initialize, forward); -} - - -Future<Nothing> TaskStatusUpdateManager::update( - const StatusUpdate& update, - const SlaveID& slaveId, - const ExecutorID& executorId, - const ContainerID& containerId) -{ - return dispatch( - process, - &TaskStatusUpdateManagerProcess::update, - update, - slaveId, - executorId, - containerId); -} - - -Future<Nothing> TaskStatusUpdateManager::update( - const StatusUpdate& update, - const SlaveID& slaveId) -{ - return dispatch( - process, - &TaskStatusUpdateManagerProcess::update, - update, - slaveId); -} - - -Future<bool> TaskStatusUpdateManager::acknowledgement( - const TaskID& taskId, - const FrameworkID& frameworkId, - const UUID& uuid) -{ - return dispatch( - process, - &TaskStatusUpdateManagerProcess::acknowledgement, - taskId, - frameworkId, - uuid); -} - - -Future<Nothing> TaskStatusUpdateManager::recover( - const string& rootDir, - const Option<SlaveState>& state) -{ - return dispatch( - process, &TaskStatusUpdateManagerProcess::recover, rootDir, state); -} - - -void TaskStatusUpdateManager::pause() -{ - dispatch(process, &TaskStatusUpdateManagerProcess::pause); -} - - -void TaskStatusUpdateManager::resume() -{ - dispatch(process, &TaskStatusUpdateManagerProcess::resume); -} - - -void TaskStatusUpdateManager::cleanup(const FrameworkID& frameworkId) -{ - dispatch(process, &TaskStatusUpdateManagerProcess::cleanup, frameworkId); -} - - -TaskStatusUpdateStream::TaskStatusUpdateStream( - const TaskID& _taskId, - const FrameworkID& _frameworkId, - const SlaveID& _slaveId, - const Flags& _flags, - bool _checkpoint, - const Option<ExecutorID>& executorId, - const Option<ContainerID>& containerId) - : checkpoint(_checkpoint), - terminated(false), - taskId(_taskId), - frameworkId(_frameworkId), - slaveId(_slaveId), - flags(_flags), - error(None()) -{ - if (checkpoint) { 
- CHECK_SOME(executorId); - CHECK_SOME(containerId); - - path = paths::getTaskUpdatesPath( - paths::getMetaRootDir(flags.work_dir), - slaveId, - frameworkId, - executorId.get(), - containerId.get(), - taskId); - - // Create the base updates directory, if it doesn't exist. - const string& dirName = Path(path.get()).dirname(); - Try<Nothing> directory = os::mkdir(dirName); - if (directory.isError()) { - error = "Failed to create '" + dirName + "': " + directory.error(); - return; - } - - // Open the updates file. - // NOTE: We don't use `O_SYNC` here because we only read this file - // if the host did not crash. `os::write` success implies the kernel - // will have flushed our data to the page cache. This is sufficient - // for the recovery scenarios we use this data for. - Try<int_fd> result = os::open( - path.get(), - O_CREAT | O_WRONLY | O_APPEND | O_CLOEXEC, - S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); - - if (result.isError()) { - error = "Failed to open '" + path.get() + - "' for status updates: " + result.error(); - return; - } - - // We keep the file open through the lifetime of the task, because it - // makes it easy to append status update records to the file. - fd = result.get(); - } -} - - -TaskStatusUpdateStream::~TaskStatusUpdateStream() -{ - if (fd.isSome()) { - Try<Nothing> close = os::close(fd.get()); - if (close.isError()) { - CHECK_SOME(path); - LOG(ERROR) << "Failed to close file '" << path.get() << "': " - << close.error(); - } - } -} - - -Try<bool> TaskStatusUpdateStream::update(const StatusUpdate& update) -{ - if (error.isSome()) { - return Error(error.get()); - } - - if (!update.has_uuid()) { - return Error("Task status update is missing 'uuid'"); - } - - // Check that this status update has not already been acknowledged. - // This could happen in the rare case when the slave received the ACK - // from the framework, died, but slave's ACK to the executor never made it! 
- if (acknowledged.contains(UUID::fromBytes(update.uuid()).get())) { - LOG(WARNING) << "Ignoring task status update " << update - << " that has already been acknowledged by the framework!"; - return false; - } - - // Check that this update hasn't already been received. - // This could happen if the slave receives a status update from an executor, - // then crashes after it writes it to disk but before it sends an ack. - if (received.contains(UUID::fromBytes(update.uuid()).get())) { - LOG(WARNING) << "Ignoring duplicate task status update " << update; - return false; - } - - // Handle the update, checkpointing if necessary. - Try<Nothing> result = handle(update, StatusUpdateRecord::UPDATE); - if (result.isError()) { - return Error(result.error()); - } - - return true; -} - - -Try<bool> TaskStatusUpdateStream::acknowledgement( - const TaskID& taskId, - const FrameworkID& frameworkId, - const UUID& uuid, - const StatusUpdate& update) -{ - if (error.isSome()) { - return Error(error.get()); - } - - if (acknowledged.contains(uuid)) { - LOG(WARNING) << "Duplicate task status update acknowledgment (UUID: " - << uuid << ") for update " << update; - return false; - } - - // This might happen if we retried a status update and got back - // acknowledgments for both the original and the retried update. - if (uuid != UUID::fromBytes(update.uuid()).get()) { - LOG(WARNING) << "Unexpected task status update acknowledgement (received " - << uuid << ", expecting " - << UUID::fromBytes(update.uuid()).get() - << ") for update " << update; - return false; - } - - // Handle the ACK, checkpointing if necessary. 
- Try<Nothing> result = handle(update, StatusUpdateRecord::ACK); - if (result.isError()) { - return Error(result.error()); - } - - return true; -} - - -Result<StatusUpdate> TaskStatusUpdateStream::next() -{ - if (error.isSome()) { - return Error(error.get()); - } - - if (!pending.empty()) { - return pending.front(); - } - - return None(); -} - - -Try<Nothing> TaskStatusUpdateStream::replay( - const std::vector<StatusUpdate>& updates, - const hashset<UUID>& acks) -{ - if (error.isSome()) { - return Error(error.get()); - } - - VLOG(1) << "Replaying task status update stream for task " << taskId; - - foreach (const StatusUpdate& update, updates) { - // Handle the update. - _handle(update, StatusUpdateRecord::UPDATE); - - // Check if the update has an ACK too. - if (acks.contains(UUID::fromBytes(update.uuid()).get())) { - _handle(update, StatusUpdateRecord::ACK); - } - } - - return Nothing(); -} - - -Try<Nothing> TaskStatusUpdateStream::handle( - const StatusUpdate& update, - const StatusUpdateRecord::Type& type) -{ - CHECK_NONE(error); - - // Checkpoint the update if necessary. - if (checkpoint) { - LOG(INFO) << "Checkpointing " << type << " for task status update " - << update; - - CHECK_SOME(fd); - - StatusUpdateRecord record; - record.set_type(type); - - if (type == StatusUpdateRecord::UPDATE) { - record.mutable_update()->CopyFrom(update); - } else { - record.set_uuid(update.uuid()); - } - - Try<Nothing> write = ::protobuf::write(fd.get(), record); - if (write.isError()) { - error = "Failed to write task status update " + stringify(update) + - " to '" + path.get() + "': " + write.error(); - return Error(error.get()); - } - } - - // Now actually handle the update. - _handle(update, type); - - return Nothing(); -} - - -void TaskStatusUpdateStream::_handle( - const StatusUpdate& update, - const StatusUpdateRecord::Type& type) -{ - CHECK_NONE(error); - - if (type == StatusUpdateRecord::UPDATE) { - // Record this update. 
- received.insert(UUID::fromBytes(update.uuid()).get()); - - // Add it to the pending updates queue. - pending.push(update); - } else { - // Record this ACK. - acknowledged.insert(UUID::fromBytes(update.uuid()).get()); - - // Remove the corresponding update from the pending queue. - pending.pop(); - - if (!terminated) { - terminated = protobuf::isTerminalState(update.status().state()); - } - } -} - -} // namespace slave { -} // namespace internal { -} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/status_update_manager.hpp ---------------------------------------------------------------------- diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp deleted file mode 100644 index 4f7d45d..0000000 --- a/src/slave/status_update_manager.hpp +++ /dev/null @@ -1,210 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -#ifndef __STATUS_UPDATE_MANAGER_HPP__ -#define __STATUS_UPDATE_MANAGER_HPP__ - -#include <queue> -#include <string> - -#include <mesos/mesos.hpp> - -#include <process/future.hpp> -#include <process/pid.hpp> -#include <process/timeout.hpp> - -#include <stout/hashset.hpp> -#include <stout/lambda.hpp> -#include <stout/option.hpp> -#include <stout/try.hpp> -#include <stout/uuid.hpp> - -#include "messages/messages.hpp" - -#include "slave/flags.hpp" - -namespace mesos { -namespace internal { -namespace slave { - -// Forward declarations. - -namespace state { -struct SlaveState; -} - -class TaskStatusUpdateManagerProcess; -struct TaskStatusUpdateStream; - - -// TaskStatusUpdateManager is responsible for -// 1) Reliably sending status updates to the master. -// 2) Checkpointing the update to disk (optional). -// 3) Sending ACKs to the executor (optional). -// 4) Receiving ACKs from the scheduler. -class TaskStatusUpdateManager -{ -public: - TaskStatusUpdateManager(const Flags& flags); - virtual ~TaskStatusUpdateManager(); - - // Expects a callback 'forward' which gets called whenever there is - // a new status update that needs to be forwarded to the master. - void initialize(const lambda::function<void(StatusUpdate)>& forward); - - // TODO(vinod): Come up with better names/signatures for the - // checkpointing and non-checkpointing 'update()' functions. - // Currently, it is not obvious that one version of 'update()' - // does checkpointing while the other doesn't. - - // Checkpoints the status update and reliably sends the - // update to the master (and hence the scheduler). - // @return Whether the update is handled successfully - // (e.g. checkpointed). - process::Future<Nothing> update( - const StatusUpdate& update, - const SlaveID& slaveId, - const ExecutorID& executorId, - const ContainerID& containerId); - - // Retries the update to the master (as long as the slave is - // alive), but does not checkpoint the update. 
- // @return Whether the update is handled successfully. - process::Future<Nothing> update( - const StatusUpdate& update, - const SlaveID& slaveId); - - // Checkpoints the status update to disk if necessary. - // Also, sends the next pending status update, if any. - // @return True if the ACK is handled successfully (e.g., checkpointed) - // and the task's status update stream is not terminated. - // False same as above except the status update stream is terminated. - // Failed if there are any errors (e.g., duplicate, checkpointing). - process::Future<bool> acknowledgement( - const TaskID& taskId, - const FrameworkID& frameworkId, - const UUID& uuid); - - // Recover status updates. - process::Future<Nothing> recover( - const std::string& rootDir, - const Option<state::SlaveState>& state); - - - // Pause sending updates. - // This is useful when the slave is disconnected because a - // disconnected slave will drop the updates. - void pause(); - - // Unpause and resend all the pending updates right away. - // This is useful when the updates were pending because there was - // no master elected (e.g., during recovery) or framework failed over. - void resume(); - - // Closes all the status update streams corresponding to this framework. - // NOTE: This stops retrying any pending status updates for this framework. - void cleanup(const FrameworkID& frameworkId); - -private: - TaskStatusUpdateManagerProcess* process; -}; - - -// TaskStatusUpdateStream handles the status updates and acknowledgements -// of a task, checkpointing them if necessary. It also holds the information -// about received, acknowledged and pending status updates. -// NOTE: A task is expected to have a globally unique ID across the lifetime -// of a framework. In other words the tuple (taskId, frameworkId) should be -// always unique. 
-struct TaskStatusUpdateStream -{ - TaskStatusUpdateStream(const TaskID& _taskId, - const FrameworkID& _frameworkId, - const SlaveID& _slaveId, - const Flags& _flags, - bool _checkpoint, - const Option<ExecutorID>& executorId, - const Option<ContainerID>& containerId); - - ~TaskStatusUpdateStream(); - - // This function handles the update, checkpointing if necessary. - // @return True if the update is successfully handled. - // False if the update is a duplicate. - // Error Any errors (e.g., checkpointing). - Try<bool> update(const StatusUpdate& update); - - // This function handles the ACK, checkpointing if necessary. - // @return True if the acknowledgement is successfully handled. - // False if the acknowledgement is a duplicate. - // Error Any errors (e.g., checkpointing). - Try<bool> acknowledgement( - const TaskID& taskId, - const FrameworkID& frameworkId, - const UUID& uuid, - const StatusUpdate& update); - - // Returns the next update (or none, if empty) in the queue. - Result<StatusUpdate> next(); - - // Replays the stream by sequentially handling an update and its - // corresponding ACK, if present. - Try<Nothing> replay( - const std::vector<StatusUpdate>& updates, - const hashset<UUID>& acks); - - // TODO(vinod): Explore semantics to make these private. - const bool checkpoint; - bool terminated; - Option<process::Timeout> timeout; // Timeout for resending status update. - std::queue<StatusUpdate> pending; - -private: - // Handles the status update and writes it to disk, if necessary. - // TODO(vinod): The write has to be asynchronous to avoid status updates that - // are being checkpointed, blocking the processing of other updates. - // One solution is to wrap the protobuf::write inside async, but its probably - // too much of an overhead to spin up a new libprocess per status update? - // A better solution might be to be have async write capability for file io. 
- Try<Nothing> handle( - const StatusUpdate& update, - const StatusUpdateRecord::Type& type); - - void _handle( - const StatusUpdate& update, - const StatusUpdateRecord::Type& type); - - const TaskID taskId; - const FrameworkID frameworkId; - const SlaveID slaveId; - - const Flags flags; - - hashset<UUID> received; - hashset<UUID> acknowledged; - - Option<std::string> path; // File path of the update stream. - Option<int_fd> fd; // File descriptor to the update stream. - - Option<std::string> error; // Potential non-retryable error. -}; - -} // namespace slave { -} // namespace internal { -} // namespace mesos { - - -#endif // __STATUS_UPDATE_MANAGER_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/task_status_update_manager.cpp ---------------------------------------------------------------------- diff --git a/src/slave/task_status_update_manager.cpp b/src/slave/task_status_update_manager.cpp new file mode 100644 index 0000000..1ec6be7 --- /dev/null +++ b/src/slave/task_status_update_manager.cpp @@ -0,0 +1,898 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "slave/task_status_update_manager.hpp" + +#include <process/delay.hpp> +#include <process/id.hpp> +#include <process/process.hpp> +#include <process/timer.hpp> + +#include <stout/check.hpp> +#include <stout/foreach.hpp> +#include <stout/hashmap.hpp> +#include <stout/hashset.hpp> +#include <stout/lambda.hpp> +#include <stout/option.hpp> +#include <stout/os.hpp> +#include <stout/path.hpp> +#include <stout/protobuf.hpp> +#include <stout/stringify.hpp> +#include <stout/utils.hpp> +#include <stout/uuid.hpp> + +#include "common/protobuf_utils.hpp" + +#include "logging/logging.hpp" + +#include "slave/constants.hpp" +#include "slave/flags.hpp" +#include "slave/slave.hpp" +#include "slave/state.hpp" + +using lambda::function; + +using std::string; + +using process::wait; // Necessary on some OS's to disambiguate. +using process::Failure; +using process::Future; +using process::PID; +using process::Timeout; +using process::UPID; + +namespace mesos { +namespace internal { +namespace slave { + +using state::SlaveState; +using state::FrameworkState; +using state::ExecutorState; +using state::RunState; +using state::TaskState; + + +class TaskStatusUpdateManagerProcess + : public ProtobufProcess<TaskStatusUpdateManagerProcess> +{ +public: + TaskStatusUpdateManagerProcess(const Flags& flags); + virtual ~TaskStatusUpdateManagerProcess(); + + // Explicitly use 'initialize' since we're overloading below. + using process::ProcessBase::initialize; + + // TaskStatusUpdateManager implementation. 
+ void initialize(const function<void(StatusUpdate)>& forward); + + Future<Nothing> update( + const StatusUpdate& update, + const SlaveID& slaveId, + const ExecutorID& executorId, + const ContainerID& containerId); + + Future<Nothing> update( + const StatusUpdate& update, + const SlaveID& slaveId); + + Future<bool> acknowledgement( + const TaskID& taskId, + const FrameworkID& frameworkId, + const UUID& uuid); + + Future<Nothing> recover( + const string& rootDir, + const Option<SlaveState>& state); + + void pause(); + void resume(); + + void cleanup(const FrameworkID& frameworkId); + +private: + // Helper function to handle update. + Future<Nothing> _update( + const StatusUpdate& update, + const SlaveID& slaveId, + bool checkpoint, + const Option<ExecutorID>& executorId, + const Option<ContainerID>& containerId); + + // Status update timeout. + void timeout(const Duration& duration); + + // Forwards the status update to the master and starts a timer based + // on the 'duration' to check for ACK from the scheduler. + // NOTE: This should only be used for those messages that expect an + // ACK (e.g updates from the executor). + Timeout forward(const StatusUpdate& update, const Duration& duration); + + // Helper functions. + + // Creates a new status update stream (opening the updates file, if path is + // present) and adds it to streams. 
+ TaskStatusUpdateStream* createStatusUpdateStream( + const TaskID& taskId, + const FrameworkID& frameworkId, + const SlaveID& slaveId, + bool checkpoint, + const Option<ExecutorID>& executorId, + const Option<ContainerID>& containerId); + + TaskStatusUpdateStream* getStatusUpdateStream( + const TaskID& taskId, + const FrameworkID& frameworkId); + + void cleanupStatusUpdateStream( + const TaskID& taskId, + const FrameworkID& frameworkId); + + const Flags flags; + bool paused; + + function<void(StatusUpdate)> forward_; + + hashmap<FrameworkID, hashmap<TaskID, TaskStatusUpdateStream*>> streams; +}; + + +TaskStatusUpdateManagerProcess::TaskStatusUpdateManagerProcess( + const Flags& _flags) + : ProcessBase(process::ID::generate("task-status-update-manager")), + flags(_flags), + paused(false) +{ +} + + +TaskStatusUpdateManagerProcess::~TaskStatusUpdateManagerProcess() +{ + foreachkey (const FrameworkID& frameworkId, streams) { + foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) { + delete stream; + } + } + streams.clear(); +} + + +void TaskStatusUpdateManagerProcess::initialize( + const function<void(StatusUpdate)>& forward) +{ + forward_ = forward; +} + + +void TaskStatusUpdateManagerProcess::pause() +{ + LOG(INFO) << "Pausing sending task status updates"; + paused = true; +} + + +void TaskStatusUpdateManagerProcess::resume() +{ + LOG(INFO) << "Resuming sending task status updates"; + paused = false; + + foreachkey (const FrameworkID& frameworkId, streams) { + foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) { + if (!stream->pending.empty()) { + const StatusUpdate& update = stream->pending.front(); + LOG(WARNING) << "Resending task status update " << update; + stream->timeout = forward(update, STATUS_UPDATE_RETRY_INTERVAL_MIN); + } + } + } +} + + +Future<Nothing> TaskStatusUpdateManagerProcess::recover( + const string& rootDir, + const Option<SlaveState>& state) +{ + LOG(INFO) << "Recovering task status update manager"; + + if 
(state.isNone()) { + return Nothing(); + } + + foreachvalue (const FrameworkState& framework, state->frameworks) { + foreachvalue (const ExecutorState& executor, framework.executors) { + LOG(INFO) << "Recovering executor '" << executor.id + << "' of framework " << framework.id; + + if (executor.info.isNone()) { + LOG(WARNING) << "Skipping recovering task status updates of" + << " executor '" << executor.id + << "' of framework " << framework.id + << " because its info cannot be recovered"; + continue; + } + + if (executor.latest.isNone()) { + LOG(WARNING) << "Skipping recovering task status updates of" + << " executor '" << executor.id + << "' of framework " << framework.id + << " because its latest run cannot be recovered"; + continue; + } + + // We are only interested in the latest run of the executor! + const ContainerID& latest = executor.latest.get(); + Option<RunState> run = executor.runs.get(latest); + CHECK_SOME(run); + + if (run->completed) { + VLOG(1) << "Skipping recovering task status updates of" + << " executor '" << executor.id + << "' of framework " << framework.id + << " because its latest run " << latest.value() + << " is completed"; + continue; + } + + foreachvalue (const TaskState& task, run->tasks) { + // No updates were ever received for this task! + // This means either: + // 1) the executor never received this task or + // 2) executor launched it but the slave died before it got any updates. + if (task.updates.empty()) { + LOG(WARNING) << "No status updates found for task " << task.id + << " of framework " << framework.id; + continue; + } + + // Create a new status update stream. + TaskStatusUpdateStream* stream = createStatusUpdateStream( + task.id, framework.id, state->id, true, executor.id, latest); + + // Replay the stream. 
+ Try<Nothing> replay = stream->replay(task.updates, task.acks); + if (replay.isError()) { + return Failure( + "Failed to replay status updates for task " + stringify(task.id) + + " of framework " + stringify(framework.id) + + ": " + replay.error()); + } + + // At the end of the replay, the stream is either terminated or + // contains only unacknowledged, if any, pending updates. The + // pending updates will be flushed after the slave + // re-registers with the master. + if (stream->terminated) { + cleanupStatusUpdateStream(task.id, framework.id); + } + } + } + } + + return Nothing(); +} + + +void TaskStatusUpdateManagerProcess::cleanup(const FrameworkID& frameworkId) +{ + LOG(INFO) << "Closing task status update streams for framework " + << frameworkId; + + if (streams.contains(frameworkId)) { + foreachkey (const TaskID& taskId, utils::copy(streams[frameworkId])) { + cleanupStatusUpdateStream(taskId, frameworkId); + } + } +} + + +Future<Nothing> TaskStatusUpdateManagerProcess::update( + const StatusUpdate& update, + const SlaveID& slaveId, + const ExecutorID& executorId, + const ContainerID& containerId) +{ + return _update(update, slaveId, true, executorId, containerId); +} + + +Future<Nothing> TaskStatusUpdateManagerProcess::update( + const StatusUpdate& update, + const SlaveID& slaveId) +{ + return _update(update, slaveId, false, None(), None()); +} + + +Future<Nothing> TaskStatusUpdateManagerProcess::_update( + const StatusUpdate& update, + const SlaveID& slaveId, + bool checkpoint, + const Option<ExecutorID>& executorId, + const Option<ContainerID>& containerId) +{ + const TaskID& taskId = update.status().task_id(); + const FrameworkID& frameworkId = update.framework_id(); + + LOG(INFO) << "Received task status update " << update; + + // Write the status update to disk and enqueue it to send it to the master. + // Create/Get the status update stream for this task. 
+ TaskStatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId); + if (stream == nullptr) { + stream = createStatusUpdateStream( + taskId, frameworkId, slaveId, checkpoint, executorId, containerId); + } + + // Verify that we didn't get a non-checkpointable update for a + // stream that is checkpointable, and vice-versa. + if (stream->checkpoint != checkpoint) { + return Failure( + "Mismatched checkpoint value for task status update " + + stringify(update) + " (expected checkpoint=" + + stringify(stream->checkpoint) + " actual checkpoint=" + + stringify(checkpoint) + ")"); + } + + // Handle the status update. + Try<bool> result = stream->update(update); + if (result.isError()) { + return Failure(result.error()); + } + + // We don't return a failed future here so that the slave can re-ack + // the duplicate update. + if (!result.get()) { + return Nothing(); + } + + // Forward the status update to the master if this is the first in the stream. + // Subsequent status updates will get sent in 'acknowledgement()'. + if (!paused && stream->pending.size() == 1) { + CHECK_NONE(stream->timeout); + const Result<StatusUpdate>& next = stream->next(); + if (next.isError()) { + return Failure(next.error()); + } + + CHECK_SOME(next); + stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN); + } + + return Nothing(); +} + + +Timeout TaskStatusUpdateManagerProcess::forward( + const StatusUpdate& update, + const Duration& duration) +{ + CHECK(!paused); + + VLOG(1) << "Forwarding task status update " << update << " to the agent"; + + // Forward the update. + forward_(update); + + // Send a message to self to resend after some delay if no ACK is received. 
+ return delay(duration, + self(), + &TaskStatusUpdateManagerProcess::timeout, + duration).timeout(); +} + + +Future<bool> TaskStatusUpdateManagerProcess::acknowledgement( + const TaskID& taskId, + const FrameworkID& frameworkId, + const UUID& uuid) +{ + LOG(INFO) << "Received task status update acknowledgement (UUID: " << uuid + << ") for task " << taskId + << " of framework " << frameworkId; + + TaskStatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId); + + // This might happen if we haven't completed recovery yet or if the + // acknowledgement is for a stream that has been cleaned up. + if (stream == nullptr) { + return Failure( + "Cannot find the task status update stream for task " + + stringify(taskId) + " of framework " + stringify(frameworkId)); + } + + // Get the corresponding update for this ACK. + const Result<StatusUpdate>& update = stream->next(); + if (update.isError()) { + return Failure(update.error()); + } + + // This might happen if we retried a status update and got back + // acknowledgments for both the original and the retried update. + if (update.isNone()) { + return Failure( + "Unexpected task status update acknowledgment (UUID: " + + uuid.toString() + ") for task " + stringify(taskId) + " of framework " + + stringify(frameworkId)); + } + + // Handle the acknowledgement. + Try<bool> result = + stream->acknowledgement(taskId, frameworkId, uuid, update.get()); + + if (result.isError()) { + return Failure(result.error()); + } + + if (!result.get()) { + return Failure("Duplicate task status acknowledgement"); + } + + // Reset the timeout. + stream->timeout = None(); + + // Get the next update in the queue. 
+ const Result<StatusUpdate>& next = stream->next(); + if (next.isError()) { + return Failure(next.error()); + } + + bool terminated = stream->terminated; + + if (terminated) { + if (next.isSome()) { + LOG(WARNING) << "Acknowledged a terminal" + << " task status update " << update.get() + << " but updates are still pending"; + } + cleanupStatusUpdateStream(taskId, frameworkId); + } else if (!paused && next.isSome()) { + // Forward the next queued status update. + stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN); + } + + return !terminated; +} + + +// TODO(vinod): There should be a limit on the retries. +void TaskStatusUpdateManagerProcess::timeout(const Duration& duration) +{ + if (paused) { + return; + } + + // Check and see if we should resend any status updates. + foreachkey (const FrameworkID& frameworkId, streams) { + foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) { + CHECK_NOTNULL(stream); + if (!stream->pending.empty()) { + CHECK_SOME(stream->timeout); + if (stream->timeout->expired()) { + const StatusUpdate& update = stream->pending.front(); + LOG(WARNING) << "Resending task status update " << update; + + // Bounded exponential backoff. 
+ Duration duration_ = + std::min(duration * 2, STATUS_UPDATE_RETRY_INTERVAL_MAX); + + stream->timeout = forward(update, duration_); + } + } + } + } +} + + +TaskStatusUpdateStream* +TaskStatusUpdateManagerProcess::createStatusUpdateStream( + const TaskID& taskId, + const FrameworkID& frameworkId, + const SlaveID& slaveId, + bool checkpoint, + const Option<ExecutorID>& executorId, + const Option<ContainerID>& containerId) +{ + VLOG(1) << "Creating StatusUpdate stream for task " << taskId + << " of framework " << frameworkId; + + TaskStatusUpdateStream* stream = new TaskStatusUpdateStream( + taskId, frameworkId, slaveId, flags, checkpoint, executorId, containerId); + + streams[frameworkId][taskId] = stream; + return stream; +} + + +TaskStatusUpdateStream* TaskStatusUpdateManagerProcess::getStatusUpdateStream( + const TaskID& taskId, + const FrameworkID& frameworkId) +{ + if (!streams.contains(frameworkId)) { + return nullptr; + } + + if (!streams[frameworkId].contains(taskId)) { + return nullptr; + } + + return streams[frameworkId][taskId]; +} + + +void TaskStatusUpdateManagerProcess::cleanupStatusUpdateStream( + const TaskID& taskId, + const FrameworkID& frameworkId) +{ + VLOG(1) << "Cleaning up status update stream" + << " for task " << taskId + << " of framework " << frameworkId; + + CHECK(streams.contains(frameworkId)) + << "Cannot find the task status update streams for framework " + << frameworkId; + + CHECK(streams[frameworkId].contains(taskId)) + << "Cannot find the status update streams for task " << taskId; + + TaskStatusUpdateStream* stream = streams[frameworkId][taskId]; + + streams[frameworkId].erase(taskId); + if (streams[frameworkId].empty()) { + streams.erase(frameworkId); + } + + delete stream; +} + + +TaskStatusUpdateManager::TaskStatusUpdateManager(const Flags& flags) +{ + process = new TaskStatusUpdateManagerProcess(flags); + spawn(process); +} + + +TaskStatusUpdateManager::~TaskStatusUpdateManager() +{ + terminate(process); + wait(process); + 
delete process; +} + + +void TaskStatusUpdateManager::initialize( + const function<void(StatusUpdate)>& forward) +{ + dispatch(process, &TaskStatusUpdateManagerProcess::initialize, forward); +} + + +Future<Nothing> TaskStatusUpdateManager::update( + const StatusUpdate& update, + const SlaveID& slaveId, + const ExecutorID& executorId, + const ContainerID& containerId) +{ + return dispatch( + process, + &TaskStatusUpdateManagerProcess::update, + update, + slaveId, + executorId, + containerId); +} + + +Future<Nothing> TaskStatusUpdateManager::update( + const StatusUpdate& update, + const SlaveID& slaveId) +{ + return dispatch( + process, + &TaskStatusUpdateManagerProcess::update, + update, + slaveId); +} + + +Future<bool> TaskStatusUpdateManager::acknowledgement( + const TaskID& taskId, + const FrameworkID& frameworkId, + const UUID& uuid) +{ + return dispatch( + process, + &TaskStatusUpdateManagerProcess::acknowledgement, + taskId, + frameworkId, + uuid); +} + + +Future<Nothing> TaskStatusUpdateManager::recover( + const string& rootDir, + const Option<SlaveState>& state) +{ + return dispatch( + process, &TaskStatusUpdateManagerProcess::recover, rootDir, state); +} + + +void TaskStatusUpdateManager::pause() +{ + dispatch(process, &TaskStatusUpdateManagerProcess::pause); +} + + +void TaskStatusUpdateManager::resume() +{ + dispatch(process, &TaskStatusUpdateManagerProcess::resume); +} + + +void TaskStatusUpdateManager::cleanup(const FrameworkID& frameworkId) +{ + dispatch(process, &TaskStatusUpdateManagerProcess::cleanup, frameworkId); +} + + +TaskStatusUpdateStream::TaskStatusUpdateStream( + const TaskID& _taskId, + const FrameworkID& _frameworkId, + const SlaveID& _slaveId, + const Flags& _flags, + bool _checkpoint, + const Option<ExecutorID>& executorId, + const Option<ContainerID>& containerId) + : checkpoint(_checkpoint), + terminated(false), + taskId(_taskId), + frameworkId(_frameworkId), + slaveId(_slaveId), + flags(_flags), + error(None()) +{ + if (checkpoint) { 
+ CHECK_SOME(executorId); + CHECK_SOME(containerId); + + path = paths::getTaskUpdatesPath( + paths::getMetaRootDir(flags.work_dir), + slaveId, + frameworkId, + executorId.get(), + containerId.get(), + taskId); + + // Create the base updates directory, if it doesn't exist. + const string& dirName = Path(path.get()).dirname(); + Try<Nothing> directory = os::mkdir(dirName); + if (directory.isError()) { + error = "Failed to create '" + dirName + "': " + directory.error(); + return; + } + + // Open the updates file. + // NOTE: We don't use `O_SYNC` here because we only read this file + // if the host did not crash. `os::write` success implies the kernel + // will have flushed our data to the page cache. This is sufficient + // for the recovery scenarios we use this data for. + Try<int_fd> result = os::open( + path.get(), + O_CREAT | O_WRONLY | O_APPEND | O_CLOEXEC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + + if (result.isError()) { + error = "Failed to open '" + path.get() + + "' for status updates: " + result.error(); + return; + } + + // We keep the file open through the lifetime of the task, because it + // makes it easy to append status update records to the file. + fd = result.get(); + } +} + + +TaskStatusUpdateStream::~TaskStatusUpdateStream() +{ + if (fd.isSome()) { + Try<Nothing> close = os::close(fd.get()); + if (close.isError()) { + CHECK_SOME(path); + LOG(ERROR) << "Failed to close file '" << path.get() << "': " + << close.error(); + } + } +} + + +Try<bool> TaskStatusUpdateStream::update(const StatusUpdate& update) +{ + if (error.isSome()) { + return Error(error.get()); + } + + if (!update.has_uuid()) { + return Error("Task status update is missing 'uuid'"); + } + + // Check that this status update has not already been acknowledged. + // This could happen in the rare case when the slave received the ACK + // from the framework, died, but slave's ACK to the executor never made it! 
+ if (acknowledged.contains(UUID::fromBytes(update.uuid()).get())) { + LOG(WARNING) << "Ignoring task status update " << update + << " that has already been acknowledged by the framework!"; + return false; + } + + // Check that this update hasn't already been received. + // This could happen if the slave receives a status update from an executor, + // then crashes after it writes it to disk but before it sends an ack. + if (received.contains(UUID::fromBytes(update.uuid()).get())) { + LOG(WARNING) << "Ignoring duplicate task status update " << update; + return false; + } + + // Handle the update, checkpointing if necessary. + Try<Nothing> result = handle(update, StatusUpdateRecord::UPDATE); + if (result.isError()) { + return Error(result.error()); + } + + return true; +} + + +Try<bool> TaskStatusUpdateStream::acknowledgement( + const TaskID& taskId, + const FrameworkID& frameworkId, + const UUID& uuid, + const StatusUpdate& update) +{ + if (error.isSome()) { + return Error(error.get()); + } + + if (acknowledged.contains(uuid)) { + LOG(WARNING) << "Duplicate task status update acknowledgment (UUID: " + << uuid << ") for update " << update; + return false; + } + + // This might happen if we retried a status update and got back + // acknowledgments for both the original and the retried update. + if (uuid != UUID::fromBytes(update.uuid()).get()) { + LOG(WARNING) << "Unexpected task status update acknowledgement (received " + << uuid << ", expecting " + << UUID::fromBytes(update.uuid()).get() + << ") for update " << update; + return false; + } + + // Handle the ACK, checkpointing if necessary. 
+ Try<Nothing> result = handle(update, StatusUpdateRecord::ACK); + if (result.isError()) { + return Error(result.error()); + } + + return true; +} + + +Result<StatusUpdate> TaskStatusUpdateStream::next() +{ + if (error.isSome()) { + return Error(error.get()); + } + + if (!pending.empty()) { + return pending.front(); + } + + return None(); +} + + +Try<Nothing> TaskStatusUpdateStream::replay( + const std::vector<StatusUpdate>& updates, + const hashset<UUID>& acks) +{ + if (error.isSome()) { + return Error(error.get()); + } + + VLOG(1) << "Replaying task status update stream for task " << taskId; + + foreach (const StatusUpdate& update, updates) { + // Handle the update. + _handle(update, StatusUpdateRecord::UPDATE); + + // Check if the update has an ACK too. + if (acks.contains(UUID::fromBytes(update.uuid()).get())) { + _handle(update, StatusUpdateRecord::ACK); + } + } + + return Nothing(); +} + + +Try<Nothing> TaskStatusUpdateStream::handle( + const StatusUpdate& update, + const StatusUpdateRecord::Type& type) +{ + CHECK_NONE(error); + + // Checkpoint the update if necessary. + if (checkpoint) { + LOG(INFO) << "Checkpointing " << type << " for task status update " + << update; + + CHECK_SOME(fd); + + StatusUpdateRecord record; + record.set_type(type); + + if (type == StatusUpdateRecord::UPDATE) { + record.mutable_update()->CopyFrom(update); + } else { + record.set_uuid(update.uuid()); + } + + Try<Nothing> write = ::protobuf::write(fd.get(), record); + if (write.isError()) { + error = "Failed to write task status update " + stringify(update) + + " to '" + path.get() + "': " + write.error(); + return Error(error.get()); + } + } + + // Now actually handle the update. + _handle(update, type); + + return Nothing(); +} + + +void TaskStatusUpdateStream::_handle( + const StatusUpdate& update, + const StatusUpdateRecord::Type& type) +{ + CHECK_NONE(error); + + if (type == StatusUpdateRecord::UPDATE) { + // Record this update. 
+ received.insert(UUID::fromBytes(update.uuid()).get()); + + // Add it to the pending updates queue. + pending.push(update); + } else { + // Record this ACK. + acknowledged.insert(UUID::fromBytes(update.uuid()).get()); + + // Remove the corresponding update from the pending queue. + pending.pop(); + + if (!terminated) { + terminated = protobuf::isTerminalState(update.status().state()); + } + } +} + +} // namespace slave { +} // namespace internal { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/task_status_update_manager.hpp ---------------------------------------------------------------------- diff --git a/src/slave/task_status_update_manager.hpp b/src/slave/task_status_update_manager.hpp new file mode 100644 index 0000000..6bdb468 --- /dev/null +++ b/src/slave/task_status_update_manager.hpp @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#ifndef __TASK_STATUS_UPDATE_MANAGER_HPP__ +#define __TASK_STATUS_UPDATE_MANAGER_HPP__ + +#include <queue> +#include <string> + +#include <mesos/mesos.hpp> + +#include <process/future.hpp> +#include <process/pid.hpp> +#include <process/timeout.hpp> + +#include <stout/hashset.hpp> +#include <stout/lambda.hpp> +#include <stout/option.hpp> +#include <stout/try.hpp> +#include <stout/uuid.hpp> + +#include "messages/messages.hpp" + +#include "slave/flags.hpp" + +namespace mesos { +namespace internal { +namespace slave { + +// Forward declarations. + +namespace state { +struct SlaveState; +} + +class TaskStatusUpdateManagerProcess; +struct TaskStatusUpdateStream; + + +// TaskStatusUpdateManager is responsible for +// 1) Reliably sending status updates to the master. +// 2) Checkpointing the update to disk (optional). +// 3) Sending ACKs to the executor (optional). +// 4) Receiving ACKs from the scheduler. +class TaskStatusUpdateManager +{ +public: + TaskStatusUpdateManager(const Flags& flags); + virtual ~TaskStatusUpdateManager(); + + // Expects a callback 'forward' which gets called whenever there is + // a new status update that needs to be forwarded to the master. + void initialize(const lambda::function<void(StatusUpdate)>& forward); + + // TODO(vinod): Come up with better names/signatures for the + // checkpointing and non-checkpointing 'update()' functions. + // Currently, it is not obvious that one version of 'update()' + // does checkpointing while the other doesn't. + + // Checkpoints the status update and reliably sends the + // update to the master (and hence the scheduler). + // @return Whether the update is handled successfully + // (e.g. checkpointed). + process::Future<Nothing> update( + const StatusUpdate& update, + const SlaveID& slaveId, + const ExecutorID& executorId, + const ContainerID& containerId); + + // Retries the update to the master (as long as the slave is + // alive), but does not checkpoint the update. 
+ // @return Whether the update is handled successfully. + process::Future<Nothing> update( + const StatusUpdate& update, + const SlaveID& slaveId); + + // Checkpoints the status update to disk if necessary. + // Also, sends the next pending status update, if any. + // @return True if the ACK is handled successfully (e.g., checkpointed) + // and the task's status update stream is not terminated. + // False same as above except the status update stream is terminated. + // Failed if there are any errors (e.g., duplicate, checkpointing). + process::Future<bool> acknowledgement( + const TaskID& taskId, + const FrameworkID& frameworkId, + const UUID& uuid); + + // Recover status updates. + process::Future<Nothing> recover( + const std::string& rootDir, + const Option<state::SlaveState>& state); + + + // Pause sending updates. + // This is useful when the slave is disconnected because a + // disconnected slave will drop the updates. + void pause(); + + // Unpause and resend all the pending updates right away. + // This is useful when the updates were pending because there was + // no master elected (e.g., during recovery) or framework failed over. + void resume(); + + // Closes all the status update streams corresponding to this framework. + // NOTE: This stops retrying any pending status updates for this framework. + void cleanup(const FrameworkID& frameworkId); + +private: + TaskStatusUpdateManagerProcess* process; +}; + + +// TaskStatusUpdateStream handles the status updates and acknowledgements +// of a task, checkpointing them if necessary. It also holds the information +// about received, acknowledged and pending status updates. +// NOTE: A task is expected to have a globally unique ID across the lifetime +// of a framework. In other words the tuple (taskId, frameworkId) should be +// always unique. 
+struct TaskStatusUpdateStream +{ + TaskStatusUpdateStream(const TaskID& _taskId, + const FrameworkID& _frameworkId, + const SlaveID& _slaveId, + const Flags& _flags, + bool _checkpoint, + const Option<ExecutorID>& executorId, + const Option<ContainerID>& containerId); + + ~TaskStatusUpdateStream(); + + // This function handles the update, checkpointing if necessary. + // @return True if the update is successfully handled. + // False if the update is a duplicate. + // Error Any errors (e.g., checkpointing). + Try<bool> update(const StatusUpdate& update); + + // This function handles the ACK, checkpointing if necessary. + // @return True if the acknowledgement is successfully handled. + // False if the acknowledgement is a duplicate. + // Error Any errors (e.g., checkpointing). + Try<bool> acknowledgement( + const TaskID& taskId, + const FrameworkID& frameworkId, + const UUID& uuid, + const StatusUpdate& update); + + // Returns the next update (or none, if empty) in the queue. + Result<StatusUpdate> next(); + + // Replays the stream by sequentially handling an update and its + // corresponding ACK, if present. + Try<Nothing> replay( + const std::vector<StatusUpdate>& updates, + const hashset<UUID>& acks); + + // TODO(vinod): Explore semantics to make these private. + const bool checkpoint; + bool terminated; + Option<process::Timeout> timeout; // Timeout for resending status update. + std::queue<StatusUpdate> pending; + +private: + // Handles the status update and writes it to disk, if necessary. + // TODO(vinod): The write has to be asynchronous to avoid status updates that + // are being checkpointed, blocking the processing of other updates. + // One solution is to wrap the protobuf::write inside async, but it's probably + // too much of an overhead to spin up a new libprocess per status update? + // A better solution might be to have an async write capability for file I/O.
+ Try<Nothing> handle( + const StatusUpdate& update, + const StatusUpdateRecord::Type& type); + + void _handle( + const StatusUpdate& update, + const StatusUpdateRecord::Type& type); + + const TaskID taskId; + const FrameworkID frameworkId; + const SlaveID slaveId; + + const Flags flags; + + hashset<UUID> received; + hashset<UUID> acknowledged; + + Option<std::string> path; // File path of the update stream. + Option<int_fd> fd; // File descriptor to the update stream. + // NOTE(review): This struct declares a destructor and holds 'fd', but its + // copy operations are not deleted; if the destructor closes 'fd' (not + // visible here -- confirm), copying a stream would close it twice. + + Option<std::string> error; // Potential non-retryable error. +}; + +} // namespace slave { +} // namespace internal { +} // namespace mesos { + + +#endif // __TASK_STATUS_UPDATE_MANAGER_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index db5e531..8997cc0 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -117,7 +117,7 @@ set(MESOS_TESTS_SRC slave_tests.cpp slave_validation_tests.cpp sorter_tests.cpp - status_update_manager_tests.cpp + task_status_update_manager_tests.cpp uri_tests.cpp uri_fetcher_tests.cpp values_tests.cpp http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/tests/cluster.cpp ---------------------------------------------------------------------- diff --git a/src/tests/cluster.cpp b/src/tests/cluster.cpp index 6111be4..b854904 100644 --- a/src/tests/cluster.cpp +++ b/src/tests/cluster.cpp @@ -84,7 +84,7 @@ #include "slave/flags.hpp" #include "slave/gc.hpp" #include "slave/slave.hpp" -#include "slave/status_update_manager.hpp" +#include "slave/task_status_update_manager.hpp" #include "slave/containerizer/containerizer.hpp" #include 
"slave/containerizer/fetcher.hpp" http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/tests/cluster.hpp ---------------------------------------------------------------------- diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index b3b1348..d572a09 100644 --- a/src/tests/cluster.hpp +++ b/src/tests/cluster.hpp @@ -58,7 +58,7 @@ #include "slave/flags.hpp" #include "slave/gc.hpp" #include "slave/slave.hpp" -#include "slave/status_update_manager.hpp" +#include "slave/task_status_update_manager.hpp" #include "slave/containerizer/containerizer.hpp" #include "slave/containerizer/fetcher.hpp" http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/tests/mock_slave.cpp ---------------------------------------------------------------------- diff --git a/src/tests/mock_slave.cpp b/src/tests/mock_slave.cpp index 5dd2188..90c4369 100644 --- a/src/tests/mock_slave.cpp +++ b/src/tests/mock_slave.cpp @@ -29,7 +29,7 @@ #include <stout/option.hpp> #include "slave/slave.hpp" -#include "slave/status_update_manager.hpp" +#include "slave/task_status_update_manager.hpp" #include "tests/mock_slave.hpp"