Repository: mesos Updated Branches: refs/heads/master 9457dce1d -> 72878f8c2
Moved Framework implementation into separate file. This change aims to reduce compile time and cognitive load when browsing the "master.hpp" header. Review: https://reviews.apache.org/r/68141/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/42cbba52 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/42cbba52 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/42cbba52 Branch: refs/heads/master Commit: 42cbba524f8cb4cea63919cf4eea8d2445f03a6e Parents: 9457dce Author: Benno Evers <[email protected]> Authored: Thu Aug 2 15:24:39 2018 +0200 Committer: Alexander Rukletsov <[email protected]> Committed: Thu Aug 2 15:28:07 2018 +0200 ---------------------------------------------------------------------- src/CMakeLists.txt | 1 + src/Makefile.am | 5 +- src/master/framework.cpp | 773 ++++++++++++++++++++++++++++++++++++++++++ src/master/master.cpp | 57 ---- src/master/master.hpp | 621 ++------------------------------- 5 files changed, 808 insertions(+), 649 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/42cbba52/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 398ffdd..0e0913b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -371,6 +371,7 @@ set(LOGGING_SRC set(MASTER_SRC master/constants.cpp master/flags.cpp + master/framework.cpp master/http.cpp master/maintenance.cpp master/master.cpp http://git-wip-us.apache.org/repos/asf/mesos/blob/42cbba52/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index 2ad719e..10ef977 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1016,8 +1016,9 @@ libmesos_no_3rdparty_la_SOURCES += \ local/local.cpp \ logging/flags.cpp \ logging/logging.cpp \ - master/constants.cpp \ + master/constants.cpp \ master/flags.cpp \ + master/framework.cpp \ master/http.cpp \ master/maintenance.cpp \ master/master.cpp \ @@ -1103,7 +1104,7 @@ libmesos_no_3rdparty_la_SOURCES += \ slave/containerizer/mesos/provisioner/appc/paths.cpp \ slave/containerizer/mesos/provisioner/appc/store.cpp \ slave/containerizer/mesos/provisioner/backends/copy.cpp \ - slave/containerizer/mesos/provisioner/docker/image_tar_puller.cpp \ + slave/containerizer/mesos/provisioner/docker/image_tar_puller.cpp \ slave/containerizer/mesos/provisioner/docker/metadata_manager.cpp \ slave/containerizer/mesos/provisioner/docker/paths.cpp \ slave/containerizer/mesos/provisioner/docker/puller.cpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/42cbba52/src/master/framework.cpp ---------------------------------------------------------------------- diff --git a/src/master/framework.cpp b/src/master/framework.cpp new file mode 100644 index 0000000..d17d7dd --- /dev/null +++ b/src/master/framework.cpp @@ -0,0 +1,773 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "master/master.hpp" + +#include "common/protobuf_utils.hpp" + +namespace mesos { +namespace internal { +namespace master { + +Framework::Framework( + Master* const master, + const Flags& masterFlags, + const FrameworkInfo& info, + const process::UPID& _pid, + const process::Time& time) + : Framework(master, masterFlags, info, ACTIVE, time) +{ + pid = _pid; +} + + +Framework::Framework( + Master* const master, + const Flags& masterFlags, + const FrameworkInfo& info, + const HttpConnection& _http, + const process::Time& time) + : Framework(master, masterFlags, info, ACTIVE, time) +{ + http = _http; +} + + +Framework::Framework( + Master* const master, + const Flags& masterFlags, + const FrameworkInfo& info) + : Framework(master, masterFlags, info, RECOVERED, process::Time()) +{} + + +Framework::~Framework() +{ + if (http.isSome()) { + closeHttpConnection(); + } +} + + +Task* Framework::getTask(const TaskID& taskId) +{ + if (tasks.count(taskId) > 0) { + return tasks[taskId]; + } + + return nullptr; +} + + +void Framework::addTask(Task* task) +{ + CHECK(!tasks.contains(task->task_id())) + << "Duplicate task " << task->task_id() + << " of framework " << task->framework_id(); + + // Verify that Resource.AllocationInfo is set, + // this should be guaranteed by the master. + foreach (const Resource& resource, task->resources()) { + CHECK(resource.has_allocation_info()); + } + + tasks[task->task_id()] = task; + + // Unreachable tasks should be added via `addUnreachableTask`. + CHECK(task->state() != TASK_UNREACHABLE) + << "Task '" << task->task_id() << "' of framework " << id() + << " added in TASK_UNREACHABLE state"; + + // Since we track terminal but unacknowledged tasks within + // `tasks` rather than `completedTasks`, we need to handle + // them here: don't count them as consuming resources. + // + // TODO(bmahler): Users currently get confused because + // terminal tasks can show up as "active" tasks in the UI and + // endpoints. Ideally, we show the terminal unacknowledged + // tasks as "completed" as well. + if (!protobuf::isTerminalState(task->state())) { + // Note that we explicitly convert from protobuf to `Resources` once + // and then use the result for calculations to avoid performance penalty + // for multiple conversions and validations implied by `+=` with protobuf + // arguments. + // Conversion is safe, as resources have already passed validation. + const Resources resources = task->resources(); + totalUsedResources += resources; + usedResources[task->slave_id()] += resources; + + // It's possible that we're not tracking the task's role for + // this framework if the role is absent from the framework's + // set of roles. In this case, we track the role's allocation + // for this framework. + CHECK(!task->resources().empty()); + const std::string& role = + task->resources().begin()->allocation_info().role(); + + if (!isTrackedUnderRole(role)) { + trackUnderRole(role); + } + } + + metrics.incrementTaskState(task->state()); + + if (!master->subscribers.subscribed.empty()) { + master->subscribers.send( + protobuf::master::event::createTaskAdded(*task), + info); + } +} + + +void Framework::recoverResources(Task* task) +{ + CHECK(tasks.contains(task->task_id())) + << "Unknown task " << task->task_id() + << " of framework " << task->framework_id(); + + totalUsedResources -= task->resources(); + usedResources[task->slave_id()] -= task->resources(); + if (usedResources[task->slave_id()].empty()) { + usedResources.erase(task->slave_id()); + } + + // If we are no longer subscribed to the role to which these resources are + // being returned to, and we have no more resources allocated to us for that + // role, stop tracking the framework under the role. + CHECK(!task->resources().empty()); + const std::string& role = + task->resources().begin()->allocation_info().role(); + + auto allocatedToRole = [&role](const Resource& resource) { + return resource.allocation_info().role() == role; + }; + + if (roles.count(role) == 0 && + totalUsedResources.filter(allocatedToRole).empty()) { + CHECK(totalOfferedResources.filter(allocatedToRole).empty()); + untrackUnderRole(role); + } +} + + +void Framework::addCompletedTask(Task&& task) +{ + // TODO(neilc): We currently allow frameworks to reuse the task + // IDs of completed tasks (although this is discouraged). This + // means that there might be multiple completed tasks with the + // same task ID. We should consider rejecting attempts to reuse + // task IDs (MESOS-6779). + completedTasks.push_back(process::Owned<Task>(new Task(std::move(task)))); +} + + +void Framework::addUnreachableTask(const Task& task) +{ + // TODO(adam-mesos): Check if unreachable task already exists. + unreachableTasks.set(task.task_id(), process::Owned<Task>(new Task(task))); +} + + +void Framework::removeTask(Task* task, bool unreachable) +{ + CHECK(tasks.contains(task->task_id())) + << "Unknown task " << task->task_id() + << " of framework " << task->framework_id(); + + // The invariant here is that the master will have already called + // `recoverResources()` prior to removing terminal or unreachable tasks. + if (!protobuf::isTerminalState(task->state()) && + task->state() != TASK_UNREACHABLE) { + recoverResources(task); + } + + if (unreachable) { + addUnreachableTask(*task); + } else { + CHECK(task->state() != TASK_UNREACHABLE); + + // TODO(bmahler): This moves a potentially non-terminal task into + // the completed list! + addCompletedTask(Task(*task)); + } + + tasks.erase(task->task_id()); +} + + +void Framework::addOffer(Offer* offer) +{ + CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id(); + offers.insert(offer); + totalOfferedResources += offer->resources(); + offeredResources[offer->slave_id()] += offer->resources(); +} + + +void Framework::removeOffer(Offer* offer) +{ + CHECK(offers.find(offer) != offers.end()) + << "Unknown offer " << offer->id(); + + totalOfferedResources -= offer->resources(); + offeredResources[offer->slave_id()] -= offer->resources(); + if (offeredResources[offer->slave_id()].empty()) { + offeredResources.erase(offer->slave_id()); + } + + offers.erase(offer); +} + + +void Framework::addInverseOffer(InverseOffer* inverseOffer) +{ + CHECK(!inverseOffers.contains(inverseOffer)) + << "Duplicate inverse offer " << inverseOffer->id(); + inverseOffers.insert(inverseOffer); +} + + +void Framework::removeInverseOffer(InverseOffer* inverseOffer) +{ + CHECK(inverseOffers.contains(inverseOffer)) + << "Unknown inverse offer " << inverseOffer->id(); + + inverseOffers.erase(inverseOffer); +} + + +bool Framework::hasExecutor( + const SlaveID& slaveId, + const ExecutorID& executorId) +{ + return executors.contains(slaveId) && + executors[slaveId].contains(executorId); +} + + +void Framework::addExecutor( + const SlaveID& slaveId, + const ExecutorInfo& executorInfo) +{ + CHECK(!hasExecutor(slaveId, executorInfo.executor_id())) + << "Duplicate executor '" << executorInfo.executor_id() + << "' on agent " << slaveId; + + // Verify that Resource.AllocationInfo is set, + // this should be guaranteed by the master. + foreach (const Resource& resource, executorInfo.resources()) { + CHECK(resource.has_allocation_info()); + } + + executors[slaveId][executorInfo.executor_id()] = executorInfo; + totalUsedResources += executorInfo.resources(); + usedResources[slaveId] += executorInfo.resources(); + + // It's possible that we're not tracking the task's role for + // this framework if the role is absent from the framework's + // set of roles. In this case, we track the role's allocation + // for this framework. + if (!executorInfo.resources().empty()) { + const std::string& role = + executorInfo.resources().begin()->allocation_info().role(); + + if (!isTrackedUnderRole(role)) { + trackUnderRole(role); + } + } +} + + +void Framework::removeExecutor( + const SlaveID& slaveId, + const ExecutorID& executorId) +{ + CHECK(hasExecutor(slaveId, executorId)) + << "Unknown executor '" << executorId + << "' of framework " << id() + << " of agent " << slaveId; + + const ExecutorInfo& executorInfo = executors[slaveId][executorId]; + + totalUsedResources -= executorInfo.resources(); + usedResources[slaveId] -= executorInfo.resources(); + if (usedResources[slaveId].empty()) { + usedResources.erase(slaveId); + } + + // If we are no longer subscribed to the role to which these resources are + // being returned to, and we have no more resources allocated to us for that + // role, stop tracking the framework under the role. + if (!executorInfo.resources().empty()) { + const std::string& role = + executorInfo.resources().begin()->allocation_info().role(); + + auto allocatedToRole = [&role](const Resource& resource) { + return resource.allocation_info().role() == role; + }; + + if (roles.count(role) == 0 && + totalUsedResources.filter(allocatedToRole).empty()) { + CHECK(totalOfferedResources.filter(allocatedToRole).empty()); + untrackUnderRole(role); + } + } + + executors[slaveId].erase(executorId); + if (executors[slaveId].empty()) { + executors.erase(slaveId); + } +} + + +void Framework::addOperation(Operation* operation) +{ + CHECK(operation->has_framework_id()); + + const FrameworkID& frameworkId = operation->framework_id(); + + const UUID& uuid = operation->uuid(); + + CHECK(!operations.contains(uuid)) + << "Duplicate operation '" << operation->info().id() + << "' (uuid: " << uuid << ") " + << "of framework " << frameworkId; + + operations.put(uuid, operation); + + if (operation->info().has_id()) { + operationUUIDs.put(operation->info().id(), uuid); + } + + if (!protobuf::isSpeculativeOperation(operation->info()) && + !protobuf::isTerminalState(operation->latest_status().state())) { + Try<Resources> consumed = + protobuf::getConsumedResources(operation->info()); + CHECK_SOME(consumed); + + CHECK(operation->has_slave_id()) + << "External resource provider is not supported yet"; + + const SlaveID& slaveId = operation->slave_id(); + + totalUsedResources += consumed.get(); + usedResources[slaveId] += consumed.get(); + + // It's possible that we're not tracking the role from the + // resources in the operation for this framework if the role is + // absent from the framework's set of roles. In this case, we + // track the role's allocation for this framework. + foreachkey (const std::string& role, consumed->allocations()) { + if (!isTrackedUnderRole(role)) { + trackUnderRole(role); + } + } + } +} + + +Option<Operation*> Framework::getOperation(const OperationID& id) +{ + Option<UUID> uuid = operationUUIDs.get(id); + + if (uuid.isNone()) { + return None(); + } + + Option<Operation*> operation = operations.get(uuid.get()); + + CHECK_SOME(operation); + + return operation; +} + + +void Framework::recoverResources(Operation* operation) +{ + CHECK(operation->has_slave_id()) + << "External resource provider is not supported yet"; + + const SlaveID& slaveId = operation->slave_id(); + + if (protobuf::isSpeculativeOperation(operation->info())) { + return; + } + + Try<Resources> consumed = protobuf::getConsumedResources(operation->info()); + CHECK_SOME(consumed); + + CHECK(totalUsedResources.contains(consumed.get())) + << "Tried to recover resources " << consumed.get() + << " which do not seem used"; + + CHECK(usedResources[slaveId].contains(consumed.get())) + << "Tried to recover resources " << consumed.get() << " of agent " + << slaveId << " which do not seem used"; + + totalUsedResources -= consumed.get(); + usedResources[slaveId] -= consumed.get(); + if (usedResources[slaveId].empty()) { + usedResources.erase(slaveId); + } + + // If we are no longer subscribed to the role to which these + // resources are being returned to, and we have no more resources + // allocated to us for that role, stop tracking the framework + // under the role. + foreachkey (const std::string& role, consumed->allocations()) { + auto allocatedToRole = [&role](const Resource& resource) { + return resource.allocation_info().role() == role; + }; + + if (roles.count(role) == 0 && + totalUsedResources.filter(allocatedToRole).empty()) { + CHECK(totalOfferedResources.filter(allocatedToRole).empty()); + untrackUnderRole(role); + } + } +} + + +void Framework::removeOperation(Operation* operation) +{ + const UUID& uuid = operation->uuid(); + + CHECK(operations.contains(uuid)) + << "Unknown operation '" << operation->info().id() + << "' (uuid: " << uuid << ") " + << "of framework " << operation->framework_id(); + + if (!protobuf::isSpeculativeOperation(operation->info()) && + !protobuf::isTerminalState(operation->latest_status().state())) { + recoverResources(operation); + } + + if (operation->info().has_id()) { + operationUUIDs.erase(operation->info().id()); + } + + operations.erase(uuid); +} + + +const FrameworkID Framework::id() const +{ + return info.id(); +} + + +void Framework::update(const FrameworkInfo& newInfo) +{ + // We only merge 'info' from the same framework 'id'. + CHECK_EQ(info.id(), newInfo.id()); + + // Save the old list of roles for later. + std::set<std::string> oldRoles = roles; + + // TODO(jmlvanre): Merge other fields as per design doc in + // MESOS-703. + + info.clear_role(); + info.clear_roles(); + + if (newInfo.has_role()) { + info.set_role(newInfo.role()); + } + + if (newInfo.roles_size() > 0) { + info.mutable_roles()->CopyFrom(newInfo.roles()); + } + + roles = protobuf::framework::getRoles(newInfo); + + if (newInfo.user() != info.user()) { + LOG(WARNING) << "Cannot update FrameworkInfo.user to '" << newInfo.user() + << "' for framework " << id() << ". Check MESOS-703"; + } + + info.set_name(newInfo.name()); + + if (newInfo.has_failover_timeout()) { + info.set_failover_timeout(newInfo.failover_timeout()); + } else { + info.clear_failover_timeout(); + } + + if (newInfo.checkpoint() != info.checkpoint()) { + LOG(WARNING) << "Cannot update FrameworkInfo.checkpoint to '" + << stringify(newInfo.checkpoint()) << "' for framework " + << id() << ". Check MESOS-703"; + } + + if (newInfo.has_hostname()) { + info.set_hostname(newInfo.hostname()); + } else { + info.clear_hostname(); + } + + if (newInfo.principal() != info.principal()) { + LOG(WARNING) << "Cannot update FrameworkInfo.principal to '" + << newInfo.principal() << "' for framework " << id() + << ". Check MESOS-703"; + } + + if (newInfo.has_webui_url()) { + info.set_webui_url(newInfo.webui_url()); + } else { + info.clear_webui_url(); + } + + if (newInfo.capabilities_size() > 0) { + info.mutable_capabilities()->CopyFrom(newInfo.capabilities()); + } else { + info.clear_capabilities(); + } + capabilities = protobuf::framework::Capabilities(info.capabilities()); + + if (newInfo.has_labels()) { + info.mutable_labels()->CopyFrom(newInfo.labels()); + } else { + info.clear_labels(); + } + + const std::set<std::string>& newRoles = roles; + + const std::set<std::string> removedRoles = [&]() { + std::set<std::string> result = oldRoles; + foreach (const std::string& role, newRoles) { + result.erase(role); + } + return result; + }(); + + foreach (const std::string& role, removedRoles) { + auto allocatedToRole = [&role](const Resource& resource) { + return resource.allocation_info().role() == role; + }; + + // Stop tracking the framework under this role if there are + // no longer any resources allocated to it. + if (totalUsedResources.filter(allocatedToRole).empty()) { + CHECK(totalOfferedResources.filter(allocatedToRole).empty()); + untrackUnderRole(role); + } + } + + const std::set<std::string> addedRoles = [&]() { + std::set<std::string> result = newRoles; + foreach (const std::string& role, oldRoles) { + result.erase(role); + } + return result; + }(); + + foreach (const std::string& role, addedRoles) { + // NOTE: It's possible that we're already tracking this framework + // under the role because a framework can unsubscribe from a role + // while it still has resources allocated to the role. + if (!isTrackedUnderRole(role)) { + trackUnderRole(role); + } + } +} + + +void Framework::updateConnection(const process::UPID& newPid) +{ + // Cleanup the HTTP connnection if this is a downgrade from HTTP + // to PID. Note that the connection may already be closed. + if (http.isSome()) { + closeHttpConnection(); + } + + // TODO(benh): unlink(oldPid); + pid = newPid; +} + + +void Framework::updateConnection(const HttpConnection& newHttp) +{ + if (pid.isSome()) { + // Wipe the PID if this is an upgrade from PID to HTTP. + // TODO(benh): unlink(oldPid); + pid = None(); + } else if (http.isSome()) { + // Cleanup the old HTTP connection. + // Note that master creates a new HTTP connection for every + // subscribe request, so 'newHttp' should always be different + // from 'http'. + closeHttpConnection(); + } + + CHECK_NONE(http); + + http = newHttp; +} + + +void Framework::closeHttpConnection() +{ + CHECK_SOME(http); + + if (connected() && !http->close()) { + LOG(WARNING) << "Failed to close HTTP pipe for " << *this; + } + + http = None(); + + CHECK_SOME(heartbeater); + + terminate(heartbeater->get()); + wait(heartbeater->get()); + + heartbeater = None(); +} + + +void Framework::heartbeat() +{ + CHECK_NONE(heartbeater); + CHECK_SOME(http); + + // TODO(vinod): Make heartbeat interval configurable and include + // this information in the SUBSCRIBED response. + scheduler::Event event; + event.set_type(scheduler::Event::HEARTBEAT); + + heartbeater = + new Heartbeater<scheduler::Event, v1::scheduler::Event>( + "framework " + stringify(info.id()), + event, + http.get(), + DEFAULT_HEARTBEAT_INTERVAL, + None(), + [this](const scheduler::Event& event) { + this->metrics.incrementEvent(event); + }); + + process::spawn(heartbeater->get()); +} + + +bool Framework::active() const +{ + return state == ACTIVE; +} + + +bool Framework::connected() const +{ + return state == ACTIVE || state == INACTIVE; +} + + +bool Framework::recovered() const +{ + return state == RECOVERED; +} + + +Framework::Framework( + Master* const _master, + const Flags& masterFlags, + const FrameworkInfo& _info, + State state, + const process::Time& time) + : master(_master), + info(_info), + roles(protobuf::framework::getRoles(_info)), + capabilities(_info.capabilities()), + state(state), + registeredTime(time), + reregisteredTime(time), + completedTasks(masterFlags.max_completed_tasks_per_framework), + unreachableTasks(masterFlags.max_unreachable_tasks_per_framework), + metrics(_info) +{ + CHECK(_info.has_id()); + + setFrameworkState(state); + + foreach (const std::string& role, roles) { + // NOTE: It's possible that we're already being tracked under the role + // because a framework can unsubscribe from a role while it still has + // resources allocated to the role. + if (!isTrackedUnderRole(role)) { + trackUnderRole(role); + } + } +} + + +bool Framework::isTrackedUnderRole(const std::string& role) const +{ + CHECK(master->isWhitelistedRole(role)) + << "Unknown role '" << role << "'" << " of framework " << *this; + + return master->roles.contains(role) && + master->roles.at(role)->frameworks.contains(id()); +} + + +void Framework::trackUnderRole(const std::string& role) +{ + CHECK(master->isWhitelistedRole(role)) + << "Unknown role '" << role << "'" << " of framework " << *this; + + CHECK(!isTrackedUnderRole(role)); + + if (!master->roles.contains(role)) { + master->roles[role] = new Role(role); + } + master->roles.at(role)->addFramework(this); +} + + +void Framework::untrackUnderRole(const std::string& role) +{ + CHECK(master->isWhitelistedRole(role)) + << "Unknown role '" << role << "'" << " of framework " << *this; + + CHECK(isTrackedUnderRole(role)); + + // NOTE: Ideally we would also `CHECK` that we're not currently subscribed + // to the role. We don't do this currently because this function is used in + // `Master::removeFramework` where we're still subscribed to `roles`. + + auto allocatedToRole = [&role](const Resource& resource) { + return resource.allocation_info().role() == role; + }; + + CHECK(totalUsedResources.filter(allocatedToRole).empty()); + CHECK(totalOfferedResources.filter(allocatedToRole).empty()); + + master->roles.at(role)->removeFramework(this); + if (master->roles.at(role)->frameworks.empty()) { + delete master->roles.at(role); + master->roles.erase(role); + } +} + + +void Framework::setFrameworkState(const Framework::State& _state) +{ + state = _state; + metrics.subscribed = state == Framework::State::ACTIVE ? 1 : 0; +} + +} // namespace master { +} // namespace internal { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/42cbba52/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 192fe82..21de973 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -408,63 +408,6 @@ struct BoundedRateLimiter }; -bool Framework::isTrackedUnderRole(const string& role) const -{ - CHECK(master->isWhitelistedRole(role)) - << "Unknown role '" << role << "'" << " of framework " << *this; - - return master->roles.contains(role) && - master->roles.at(role)->frameworks.contains(id()); -} - - -void Framework::trackUnderRole(const string& role) -{ - CHECK(master->isWhitelistedRole(role)) - << "Unknown role '" << role << "'" << " of framework " << *this; - - CHECK(!isTrackedUnderRole(role)); - - if (!master->roles.contains(role)) { - master->roles[role] = new Role(role); - } - master->roles.at(role)->addFramework(this); -} - - -void Framework::untrackUnderRole(const string& role) -{ - CHECK(master->isWhitelistedRole(role)) - << "Unknown role '" << role << "'" << " of framework " << *this; - - CHECK(isTrackedUnderRole(role)); - - // NOTE: Ideally we would also `CHECK` that we're not currently subscribed - // to the role. We don't do this currently because this function is used in - // `Master::removeFramework` where we're still subscribed to `roles`. - - auto allocatedToRole = [&role](const Resource& resource) { - return resource.allocation_info().role() == role; - }; - - CHECK(totalUsedResources.filter(allocatedToRole).empty()); - CHECK(totalOfferedResources.filter(allocatedToRole).empty()); - - master->roles.at(role)->removeFramework(this); - if (master->roles.at(role)->frameworks.empty()) { - delete master->roles.at(role); - master->roles.erase(role); - } -} - - -void Framework::setFrameworkState(const Framework::State& _state) -{ - state = _state; - metrics.subscribed = state == Framework::State::ACTIVE ? 1 : 0; -} - - void Master::initialize() { LOG(INFO) << "Master " << info_.id() << " (" << info_.hostname() << ")" http://git-wip-us.apache.org/repos/asf/mesos/blob/42cbba52/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 45ffedb..ce1a7e1 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -67,7 +67,6 @@ #include <stout/uuid.hpp> #include "common/http.hpp" -#include "common/protobuf_utils.hpp" #include "common/resources_utils.hpp" #include "files/files.hpp" @@ -2289,101 +2288,23 @@ struct Framework const Flags& masterFlags, const FrameworkInfo& info, const process::UPID& _pid, - const process::Time& time = process::Clock::now()) - : Framework(master, masterFlags, info, ACTIVE, time) - { - pid = _pid; - } + const process::Time& time = process::Clock::now()); Framework(Master* const master, const Flags& masterFlags, const FrameworkInfo& info, const HttpConnection& _http, - const process::Time& time = process::Clock::now()) - : Framework(master, masterFlags, info, ACTIVE, time) - { - http = _http; - } + const process::Time& time = process::Clock::now()); Framework(Master* const master, const Flags& masterFlags, - const FrameworkInfo& info) - : Framework(master, masterFlags, info, RECOVERED, process::Time()) {} - - ~Framework() - { - if (http.isSome()) { - closeHttpConnection(); - } - } - - Task* getTask(const TaskID& taskId) - { - if (tasks.count(taskId) > 0) { - return tasks[taskId]; - } - - return nullptr; - } - - void addTask(Task* task) - { - CHECK(!tasks.contains(task->task_id())) - << "Duplicate task " << task->task_id() - << " of framework " << task->framework_id(); - - // Verify that Resource.AllocationInfo is set, - // this should be guaranteed by the master. - foreach (const Resource& resource, task->resources()) { - CHECK(resource.has_allocation_info()); - } + const FrameworkInfo& info); - tasks[task->task_id()] = task; + ~Framework(); - // Unreachable tasks should be added via `addUnreachableTask`. - CHECK(task->state() != TASK_UNREACHABLE) - << "Task '" << task->task_id() << "' of framework " << id() - << " added in TASK_UNREACHABLE state"; + Task* getTask(const TaskID& taskId); - // Since we track terminal but unacknowledged tasks within - // `tasks` rather than `completedTasks`, we need to handle - // them here: don't count them as consuming resources. - // - // TODO(bmahler): Users currently get confused because - // terminal tasks can show up as "active" tasks in the UI and - // endpoints. Ideally, we show the terminal unacknowledged - // tasks as "completed" as well. - if (!protobuf::isTerminalState(task->state())) { - // Note that we explicitly convert from protobuf to `Resources` once - // and then use the result for calculations to avoid performance penalty - // for multiple conversions and validations implied by `+=` with protobuf - // arguments. - // Conversion is safe, as resources have already passed validation. - const Resources resources = task->resources(); - totalUsedResources += resources; - usedResources[task->slave_id()] += resources; - - // It's possible that we're not tracking the task's role for - // this framework if the role is absent from the framework's - // set of roles. In this case, we track the role's allocation - // for this framework. - CHECK(!task->resources().empty()); - const std::string& role = - task->resources().begin()->allocation_info().role(); - - if (!isTrackedUnderRole(role)) { - trackUnderRole(role); - } - } - - metrics.incrementTaskState(task->state()); - - if (!master->subscribers.subscribed.empty()) { - master->subscribers.send( - protobuf::master::event::createTaskAdded(*task), - info); - } - } + void addTask(Task* task); // Update framework to recover the resources that were previously // being used by `task`. @@ -2391,35 +2312,7 @@ struct Framework // TODO(bmahler): This is a hack for performance. We need to // maintain resource counters because computing task resources // functionally for all tasks is expensive, for now. - void recoverResources(Task* task) - { - CHECK(tasks.contains(task->task_id())) - << "Unknown task " << task->task_id() - << " of framework " << task->framework_id(); - - totalUsedResources -= task->resources(); - usedResources[task->slave_id()] -= task->resources(); - if (usedResources[task->slave_id()].empty()) { - usedResources.erase(task->slave_id()); - } - - // If we are no longer subscribed to the role to which these resources are - // being returned to, and we have no more resources allocated to us for that - // role, stop tracking the framework under the role. - CHECK(!task->resources().empty()); - const std::string& role = - task->resources().begin()->allocation_info().role(); - - auto allocatedToRole = [&role](const Resource& resource) { - return resource.allocation_info().role() == role; - }; - - if (roles.count(role) == 0 && - totalUsedResources.filter(allocatedToRole).empty()) { - CHECK(totalOfferedResources.filter(allocatedToRole).empty()); - untrackUnderRole(role); - } - } + void recoverResources(Task* task); // Sends a message to the connected framework. template <typename Message> @@ -2445,492 +2338,63 @@ struct Framework } } - void addCompletedTask(Task&& task) - { - // TODO(neilc): We currently allow frameworks to reuse the task - // IDs of completed tasks (although this is discouraged). This - // means that there might be multiple completed tasks with the - // same task ID. We should consider rejecting attempts to reuse - // task IDs (MESOS-6779). - completedTasks.push_back(process::Owned<Task>(new Task(std::move(task)))); - } + void addCompletedTask(Task&& task); - void addUnreachableTask(const Task& task) - { - // TODO(adam-mesos): Check if unreachable task already exists. - unreachableTasks.set(task.task_id(), process::Owned<Task>(new Task(task))); - } + void addUnreachableTask(const Task& task); // Removes the task. `unreachable` indicates whether the task is removed due // to being unreachable. Note that we cannot rely on the task state because // it may not reflect unreachability due to being set to TASK_LOST for // backwards compatibility. - void removeTask(Task* task, bool unreachable) - { - CHECK(tasks.contains(task->task_id())) - << "Unknown task " << task->task_id() - << " of framework " << task->framework_id(); - - // The invariant here is that the master will have already called - // `recoverResources()` prior to removing terminal or unreachable tasks. - if (!protobuf::isTerminalState(task->state()) && - task->state() != TASK_UNREACHABLE) { - recoverResources(task); - } - - if (unreachable) { - addUnreachableTask(*task); - } else { - CHECK(task->state() != TASK_UNREACHABLE); - - // TODO(bmahler): This moves a potentially non-terminal task into - // the completed list! - addCompletedTask(Task(*task)); - } - - tasks.erase(task->task_id()); - } + void removeTask(Task* task, bool unreachable); - void addOffer(Offer* offer) - { - CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id(); - offers.insert(offer); - totalOfferedResources += offer->resources(); - offeredResources[offer->slave_id()] += offer->resources(); - } - - void removeOffer(Offer* offer) - { - CHECK(offers.find(offer) != offers.end()) - << "Unknown offer " << offer->id(); - - totalOfferedResources -= offer->resources(); - offeredResources[offer->slave_id()] -= offer->resources(); - if (offeredResources[offer->slave_id()].empty()) { - offeredResources.erase(offer->slave_id()); - } - - offers.erase(offer); - } + void addOffer(Offer* offer); - void addInverseOffer(InverseOffer* inverseOffer) - { - CHECK(!inverseOffers.contains(inverseOffer)) - << "Duplicate inverse offer " << inverseOffer->id(); - inverseOffers.insert(inverseOffer); - } + void removeOffer(Offer* offer); - void removeInverseOffer(InverseOffer* inverseOffer) - { - CHECK(inverseOffers.contains(inverseOffer)) - << "Unknown inverse offer " << inverseOffer->id(); + void addInverseOffer(InverseOffer* inverseOffer); - inverseOffers.erase(inverseOffer); - } + void removeInverseOffer(InverseOffer* inverseOffer); bool hasExecutor(const SlaveID& slaveId, - const ExecutorID& executorId) - { - return executors.contains(slaveId) && - executors[slaveId].contains(executorId); - } + const ExecutorID& executorId); void addExecutor(const SlaveID& slaveId, - const ExecutorInfo& executorInfo) - { - CHECK(!hasExecutor(slaveId, executorInfo.executor_id())) - << "Duplicate executor '" << executorInfo.executor_id() - << "' on agent " << slaveId; - - // Verify that Resource.AllocationInfo is set, - // this should be guaranteed by the master. - foreach (const Resource& resource, executorInfo.resources()) { - CHECK(resource.has_allocation_info()); - } - - executors[slaveId][executorInfo.executor_id()] = executorInfo; - totalUsedResources += executorInfo.resources(); - usedResources[slaveId] += executorInfo.resources(); - - // It's possible that we're not tracking the task's role for - // this framework if the role is absent from the framework's - // set of roles. In this case, we track the role's allocation - // for this framework. - if (!executorInfo.resources().empty()) { - const std::string& role = - executorInfo.resources().begin()->allocation_info().role(); - - if (!isTrackedUnderRole(role)) { - trackUnderRole(role); - } - } - } + const ExecutorInfo& executorInfo); void removeExecutor(const SlaveID& slaveId, - const ExecutorID& executorId) - { - CHECK(hasExecutor(slaveId, executorId)) - << "Unknown executor '" << executorId - << "' of framework " << id() - << " of agent " << slaveId; - - const ExecutorInfo& executorInfo = executors[slaveId][executorId]; - - totalUsedResources -= executorInfo.resources(); - usedResources[slaveId] -= executorInfo.resources(); - if (usedResources[slaveId].empty()) { - usedResources.erase(slaveId); - } - - // If we are no longer subscribed to the role to which these resources are - // being returned to, and we have no more resources allocated to us for that - // role, stop tracking the framework under the role. - if (!executorInfo.resources().empty()) { - const std::string& role = - executorInfo.resources().begin()->allocation_info().role(); - - auto allocatedToRole = [&role](const Resource& resource) { - return resource.allocation_info().role() == role; - }; - - if (roles.count(role) == 0 && - totalUsedResources.filter(allocatedToRole).empty()) { - CHECK(totalOfferedResources.filter(allocatedToRole).empty()); - untrackUnderRole(role); - } - } - - executors[slaveId].erase(executorId); - if (executors[slaveId].empty()) { - executors.erase(slaveId); - } - } - - void addOperation(Operation* operation) - { - CHECK(operation->has_framework_id()); - - const FrameworkID& frameworkId = operation->framework_id(); - - const UUID& uuid = operation->uuid(); - - CHECK(!operations.contains(uuid)) - << "Duplicate operation '" << operation->info().id() - << "' (uuid: " << uuid << ") " - << "of framework " << frameworkId; - - operations.put(uuid, operation); - - if (operation->info().has_id()) { - operationUUIDs.put(operation->info().id(), uuid); - } - - if (!protobuf::isSpeculativeOperation(operation->info()) && - !protobuf::isTerminalState(operation->latest_status().state())) { - Try<Resources> consumed = - protobuf::getConsumedResources(operation->info()); - CHECK_SOME(consumed); - - CHECK(operation->has_slave_id()) - << "External resource provider is not supported yet"; - - const SlaveID& slaveId = operation->slave_id(); - - totalUsedResources += consumed.get(); - usedResources[slaveId] += consumed.get(); - - // It's possible that we're not tracking the role from the - // resources in the operation for this framework if the role is - // absent from the framework's set of roles. In this case, we - // track the role's allocation for this framework. - foreachkey (const std::string& role, consumed->allocations()) { - if (!isTrackedUnderRole(role)) { - trackUnderRole(role); - } - } - } - } - - Option<Operation*> getOperation(const OperationID& id) { - Option<UUID> uuid = operationUUIDs.get(id); + const ExecutorID& executorId); - if (uuid.isNone()) { - return None(); - } - - Option<Operation*> operation = operations.get(uuid.get()); - - CHECK_SOME(operation); - - return operation; - } - - void recoverResources(Operation* operation) - { - CHECK(operation->has_slave_id()) - << "External resource provider is not supported yet"; - - const SlaveID& slaveId = operation->slave_id(); - - if (protobuf::isSpeculativeOperation(operation->info())) { - return; - } - - Try<Resources> consumed = protobuf::getConsumedResources(operation->info()); - CHECK_SOME(consumed); - - CHECK(totalUsedResources.contains(consumed.get())) - << "Tried to recover resources " << consumed.get() - << " which do not seem used"; - - CHECK(usedResources[slaveId].contains(consumed.get())) - << "Tried to recover resources " << consumed.get() << " of agent " - << slaveId << " which do not seem used"; - - totalUsedResources -= consumed.get(); - usedResources[slaveId] -= consumed.get(); - if (usedResources[slaveId].empty()) { - usedResources.erase(slaveId); - } - - // If we are no longer subscribed to the role to which these - // resources are being returned to, and we have no more resources - // allocated to us for that role, stop tracking the framework - // under the role. - foreachkey (const std::string& role, consumed->allocations()) { - auto allocatedToRole = [&role](const Resource& resource) { - return resource.allocation_info().role() == role; - }; - - if (roles.count(role) == 0 && - totalUsedResources.filter(allocatedToRole).empty()) { - CHECK(totalOfferedResources.filter(allocatedToRole).empty()); - untrackUnderRole(role); - } - } - } - - void removeOperation(Operation* operation) - { - const UUID& uuid = operation->uuid(); - - CHECK(operations.contains(uuid)) - << "Unknown operation '" << operation->info().id() - << "' (uuid: " << uuid << ") " - << "of framework " << operation->framework_id(); + void addOperation(Operation* operation); - if (!protobuf::isSpeculativeOperation(operation->info()) && - !protobuf::isTerminalState(operation->latest_status().state())) { - recoverResources(operation); - } + Option<Operation*> getOperation(const OperationID& id); - if (operation->info().has_id()) { - operationUUIDs.erase(operation->info().id()); - } + void recoverResources(Operation* operation); - operations.erase(uuid); - } + void removeOperation(Operation* operation); - const FrameworkID id() const { return info.id(); } + const FrameworkID id() const; // Update fields in 'info' using those in 'newInfo'. Currently this // only updates `role`/`roles`, 'name', 'failover_timeout', 'hostname', // 'webui_url', 'capabilities', and 'labels'. - void update(const FrameworkInfo& newInfo) - { - // We only merge 'info' from the same framework 'id'. - CHECK_EQ(info.id(), newInfo.id()); - - // Save the old list of roles for later. - std::set<std::string> oldRoles = roles; - - // TODO(jmlvanre): Merge other fields as per design doc in - // MESOS-703. - - info.clear_role(); - info.clear_roles(); - - if (newInfo.has_role()) { - info.set_role(newInfo.role()); - } - - if (newInfo.roles_size() > 0) { - info.mutable_roles()->CopyFrom(newInfo.roles()); - } + void update(const FrameworkInfo& newInfo); - roles = protobuf::framework::getRoles(newInfo); + void updateConnection(const process::UPID& newPid); - if (newInfo.user() != info.user()) { - LOG(WARNING) << "Cannot update FrameworkInfo.user to '" << newInfo.user() - << "' for framework " << id() << ". Check MESOS-703"; - } - - info.set_name(newInfo.name()); - - if (newInfo.has_failover_timeout()) { - info.set_failover_timeout(newInfo.failover_timeout()); - } else { - info.clear_failover_timeout(); - } - - if (newInfo.checkpoint() != info.checkpoint()) { - LOG(WARNING) << "Cannot update FrameworkInfo.checkpoint to '" - << stringify(newInfo.checkpoint()) << "' for framework " - << id() << ". Check MESOS-703"; - } - - if (newInfo.has_hostname()) { - info.set_hostname(newInfo.hostname()); - } else { - info.clear_hostname(); - } - - if (newInfo.principal() != info.principal()) { - LOG(WARNING) << "Cannot update FrameworkInfo.principal to '" - << newInfo.principal() << "' for framework " << id() - << ". Check MESOS-703"; - } - - if (newInfo.has_webui_url()) { - info.set_webui_url(newInfo.webui_url()); - } else { - info.clear_webui_url(); - } - - if (newInfo.capabilities_size() > 0) { - info.mutable_capabilities()->CopyFrom(newInfo.capabilities()); - } else { - info.clear_capabilities(); - } - capabilities = protobuf::framework::Capabilities(info.capabilities()); - - if (newInfo.has_labels()) { - info.mutable_labels()->CopyFrom(newInfo.labels()); - } else { - info.clear_labels(); - } - - const std::set<std::string>& newRoles = roles; - - const std::set<std::string> removedRoles = [&]() { - std::set<std::string> result = oldRoles; - foreach (const std::string& role, newRoles) { - result.erase(role); - } - return result; - }(); - - foreach (const std::string& role, removedRoles) { - auto allocatedToRole = [&role](const Resource& resource) { - return resource.allocation_info().role() == role; - }; - - // Stop tracking the framework under this role if there are - // no longer any resources allocated to it. - if (totalUsedResources.filter(allocatedToRole).empty()) { - CHECK(totalOfferedResources.filter(allocatedToRole).empty()); - untrackUnderRole(role); - } - } - - const std::set<std::string> addedRoles = [&]() { - std::set<std::string> result = newRoles; - foreach (const std::string& role, oldRoles) { - result.erase(role); - } - return result; - }(); - - foreach (const std::string& role, addedRoles) { - // NOTE: It's possible that we're already tracking this framework - // under the role because a framework can unsubscribe from a role - // while it still has resources allocated to the role. - if (!isTrackedUnderRole(role)) { - trackUnderRole(role); - } - } - } - - void updateConnection(const process::UPID& newPid) - { - // Cleanup the HTTP connnection if this is a downgrade from HTTP - // to PID. Note that the connection may already be closed. - if (http.isSome()) { - closeHttpConnection(); - } - - // TODO(benh): unlink(oldPid); - pid = newPid; - } - - void updateConnection(const HttpConnection& newHttp) - { - if (pid.isSome()) { - // Wipe the PID if this is an upgrade from PID to HTTP. - // TODO(benh): unlink(oldPid); - pid = None(); - } else if (http.isSome()) { - // Cleanup the old HTTP connection. - // Note that master creates a new HTTP connection for every - // subscribe request, so 'newHttp' should always be different - // from 'http'. - closeHttpConnection(); - } - - CHECK_NONE(http); - - http = newHttp; - } + void updateConnection(const HttpConnection& newHttp); // Closes the HTTP connection and stops the heartbeat. // // TODO(vinod): Currently `state` variable is set separately // from this method. We need to make sure these are in sync. - void closeHttpConnection() - { - CHECK_SOME(http); - - if (connected() && !http->close()) { - LOG(WARNING) << "Failed to close HTTP pipe for " << *this; - } - - http = None(); + void closeHttpConnection(); - CHECK_SOME(heartbeater); + void heartbeat(); - terminate(heartbeater->get()); - wait(heartbeater->get()); - - heartbeater = None(); - } - - void heartbeat() - { - CHECK_NONE(heartbeater); - CHECK_SOME(http); - - // TODO(vinod): Make heartbeat interval configurable and include - // this information in the SUBSCRIBED response. - scheduler::Event event; - event.set_type(scheduler::Event::HEARTBEAT); - - heartbeater = - new Heartbeater<scheduler::Event, v1::scheduler::Event>( - "framework " + stringify(info.id()), - event, - http.get(), - DEFAULT_HEARTBEAT_INTERVAL, - None(), - [this](const scheduler::Event& event) { - this->metrics.incrementEvent(event); - }); - - process::spawn(heartbeater->get()); - } - - bool active() const { return state == ACTIVE; } - bool connected() const { return state == ACTIVE || state == INACTIVE; } - bool recovered() const { return state == RECOVERED; } + bool active() const; + bool connected() const; + bool recovered() const; bool isTrackedUnderRole(const std::string& role) const; void trackUnderRole(const std::string& role); @@ -3042,30 +2506,7 @@ private: const Flags& masterFlags, const FrameworkInfo& _info, State state, - const process::Time& time) - : master(_master), - info(_info), - roles(protobuf::framework::getRoles(_info)), - capabilities(_info.capabilities()), - registeredTime(time), - reregisteredTime(time), - completedTasks(masterFlags.max_completed_tasks_per_framework), - unreachableTasks(masterFlags.max_unreachable_tasks_per_framework), - metrics(_info) - { - CHECK(_info.has_id()); - - setFrameworkState(state); - - foreach (const std::string& role, roles) { - // NOTE: It's possible that we're already being tracked under the role - // because a framework can unsubscribe from a role while it still has - // resources allocated to the role. - if (!isTrackedUnderRole(role)) { - trackUnderRole(role); - } - } - } + const process::Time& time); Framework(const Framework&); // No copying. Framework& operator=(const Framework&); // No assigning.
