Repository: mesos
Updated Branches:
  refs/heads/master 9457dce1d -> 72878f8c2


Moved Framework implementation into separate file.

This change aims to reduce compile time and cognitive
load when browsing the "master.hpp" header.

Review: https://reviews.apache.org/r/68141/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/42cbba52
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/42cbba52
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/42cbba52

Branch: refs/heads/master
Commit: 42cbba524f8cb4cea63919cf4eea8d2445f03a6e
Parents: 9457dce
Author: Benno Evers <[email protected]>
Authored: Thu Aug 2 15:24:39 2018 +0200
Committer: Alexander Rukletsov <[email protected]>
Committed: Thu Aug 2 15:28:07 2018 +0200

----------------------------------------------------------------------
 src/CMakeLists.txt       |   1 +
 src/Makefile.am          |   5 +-
 src/master/framework.cpp | 773 ++++++++++++++++++++++++++++++++++++++++++
 src/master/master.cpp    |  57 ----
 src/master/master.hpp    | 621 ++-------------------------------
 5 files changed, 808 insertions(+), 649 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/42cbba52/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 398ffdd..0e0913b 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -371,6 +371,7 @@ set(LOGGING_SRC
 set(MASTER_SRC
   master/constants.cpp
   master/flags.cpp
+  master/framework.cpp
   master/http.cpp
   master/maintenance.cpp
   master/master.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/42cbba52/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 2ad719e..10ef977 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1016,8 +1016,9 @@ libmesos_no_3rdparty_la_SOURCES +=                                        \
   local/local.cpp                                                      \
   logging/flags.cpp                                                    \
   logging/logging.cpp                                                  \
-  master/constants.cpp              \
+  master/constants.cpp                                                 \
   master/flags.cpp                                                     \
+  master/framework.cpp                                                 \
   master/http.cpp                                                      \
   master/maintenance.cpp                                               \
   master/master.cpp                                                    \
@@ -1103,7 +1104,7 @@ libmesos_no_3rdparty_la_SOURCES +=                                        \
   slave/containerizer/mesos/provisioner/appc/paths.cpp                 \
   slave/containerizer/mesos/provisioner/appc/store.cpp                 \
   slave/containerizer/mesos/provisioner/backends/copy.cpp              \
-  slave/containerizer/mesos/provisioner/docker/image_tar_puller.cpp            \
+  slave/containerizer/mesos/provisioner/docker/image_tar_puller.cpp    \
   slave/containerizer/mesos/provisioner/docker/metadata_manager.cpp    \
   slave/containerizer/mesos/provisioner/docker/paths.cpp               \
   slave/containerizer/mesos/provisioner/docker/puller.cpp              \

http://git-wip-us.apache.org/repos/asf/mesos/blob/42cbba52/src/master/framework.cpp
----------------------------------------------------------------------
diff --git a/src/master/framework.cpp b/src/master/framework.cpp
new file mode 100644
index 0000000..d17d7dd
--- /dev/null
+++ b/src/master/framework.cpp
@@ -0,0 +1,773 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "master/master.hpp"
+
+#include "common/protobuf_utils.hpp"
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+Framework::Framework(
+    Master* const master,
+    const Flags& masterFlags,
+    const FrameworkInfo& info,
+    const process::UPID& _pid,
+    const process::Time& time)
+  : Framework(master, masterFlags, info, ACTIVE, time)
+{
+  pid = _pid;
+}
+
+
+Framework::Framework(
+    Master* const master,
+    const Flags& masterFlags,
+    const FrameworkInfo& info,
+    const HttpConnection& _http,
+    const process::Time& time)
+  : Framework(master, masterFlags, info, ACTIVE, time)
+{
+  http = _http;
+}
+
+
+Framework::Framework(
+    Master* const master,
+    const Flags& masterFlags,
+    const FrameworkInfo& info)
+  : Framework(master, masterFlags, info, RECOVERED, process::Time())
+{}
+
+
+Framework::~Framework()
+{
+  if (http.isSome()) {
+    closeHttpConnection();
+  }
+}
+
+
+Task* Framework::getTask(const TaskID& taskId)
+{
+  if (tasks.count(taskId) > 0) {
+    return tasks[taskId];
+  }
+
+  return nullptr;
+}
+
+
+void Framework::addTask(Task* task)
+{
+  CHECK(!tasks.contains(task->task_id()))
+    << "Duplicate task " << task->task_id()
+    << " of framework " << task->framework_id();
+
+  // Verify that Resource.AllocationInfo is set;
+  // this should be guaranteed by the master.
+  foreach (const Resource& resource, task->resources()) {
+    CHECK(resource.has_allocation_info());
+  }
+
+  tasks[task->task_id()] = task;
+
+  // Unreachable tasks should be added via `addUnreachableTask`.
+  CHECK(task->state() != TASK_UNREACHABLE)
+    << "Task '" << task->task_id() << "' of framework " << id()
+    << " added in TASK_UNREACHABLE state";
+
+  // Since we track terminal but unacknowledged tasks within
+  // `tasks` rather than `completedTasks`, we need to handle
+  // them here: don't count them as consuming resources.
+  //
+  // TODO(bmahler): Users currently get confused because
+  // terminal tasks can show up as "active" tasks in the UI and
+  // endpoints. Ideally, we show the terminal unacknowledged
+  // tasks as "completed" as well.
+  if (!protobuf::isTerminalState(task->state())) {
+    // Note that we explicitly convert from protobuf to `Resources` once
+    // and then use the result for calculations to avoid the performance
+    // penalty of the multiple conversions and validations implied by `+=`
+    // with protobuf arguments.
+    // Conversion is safe, as resources have already passed validation.
+    const Resources resources = task->resources();
+    totalUsedResources += resources;
+    usedResources[task->slave_id()] += resources;
+
+    // It's possible that we're not tracking the task's role for
+    // this framework if the role is absent from the framework's
+    // set of roles. In this case, we track the role's allocation
+    // for this framework.
+    CHECK(!task->resources().empty());
+    const std::string& role =
+      task->resources().begin()->allocation_info().role();
+
+    if (!isTrackedUnderRole(role)) {
+      trackUnderRole(role);
+    }
+  }
+
+  metrics.incrementTaskState(task->state());
+
+  if (!master->subscribers.subscribed.empty()) {
+    master->subscribers.send(
+        protobuf::master::event::createTaskAdded(*task),
+        info);
+  }
+}
+
+
+void Framework::recoverResources(Task* task)
+{
+  CHECK(tasks.contains(task->task_id()))
+    << "Unknown task " << task->task_id()
+    << " of framework " << task->framework_id();
+
+  totalUsedResources -= task->resources();
+  usedResources[task->slave_id()] -= task->resources();
+  if (usedResources[task->slave_id()].empty()) {
+    usedResources.erase(task->slave_id());
+  }
+
+  // If we are no longer subscribed to the role to which these resources are
+  // being returned, and we have no more resources allocated to us for that
+  // role, stop tracking the framework under the role.
+  CHECK(!task->resources().empty());
+  const std::string& role =
+    task->resources().begin()->allocation_info().role();
+
+  auto allocatedToRole = [&role](const Resource& resource) {
+    return resource.allocation_info().role() == role;
+  };
+
+  if (roles.count(role) == 0 &&
+      totalUsedResources.filter(allocatedToRole).empty()) {
+    CHECK(totalOfferedResources.filter(allocatedToRole).empty());
+    untrackUnderRole(role);
+  }
+}
+
+
+void Framework::addCompletedTask(Task&& task)
+{
+  // TODO(neilc): We currently allow frameworks to reuse the task
+  // IDs of completed tasks (although this is discouraged). This
+  // means that there might be multiple completed tasks with the
+  // same task ID. We should consider rejecting attempts to reuse
+  // task IDs (MESOS-6779).
+  completedTasks.push_back(process::Owned<Task>(new Task(std::move(task))));
+}
+
+
+void Framework::addUnreachableTask(const Task& task)
+{
+  // TODO(adam-mesos): Check if unreachable task already exists.
+  unreachableTasks.set(task.task_id(), process::Owned<Task>(new Task(task)));
+}
+
+
+void Framework::removeTask(Task* task, bool unreachable)
+{
+  CHECK(tasks.contains(task->task_id()))
+    << "Unknown task " << task->task_id()
+    << " of framework " << task->framework_id();
+
+  // The invariant here is that the master will have already called
+  // `recoverResources()` prior to removing terminal or unreachable tasks.
+  if (!protobuf::isTerminalState(task->state()) &&
+      task->state() != TASK_UNREACHABLE) {
+    recoverResources(task);
+  }
+
+  if (unreachable) {
+    addUnreachableTask(*task);
+  } else {
+    CHECK(task->state() != TASK_UNREACHABLE);
+
+    // TODO(bmahler): This moves a potentially non-terminal task into
+    // the completed list!
+    addCompletedTask(Task(*task));
+  }
+
+  tasks.erase(task->task_id());
+}
+
+
+void Framework::addOffer(Offer* offer)
+{
+  CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
+  offers.insert(offer);
+  totalOfferedResources += offer->resources();
+  offeredResources[offer->slave_id()] += offer->resources();
+}
+
+
+void Framework::removeOffer(Offer* offer)
+{
+  CHECK(offers.find(offer) != offers.end())
+    << "Unknown offer " << offer->id();
+
+  totalOfferedResources -= offer->resources();
+  offeredResources[offer->slave_id()] -= offer->resources();
+  if (offeredResources[offer->slave_id()].empty()) {
+    offeredResources.erase(offer->slave_id());
+  }
+
+  offers.erase(offer);
+}
+
+
+void Framework::addInverseOffer(InverseOffer* inverseOffer)
+{
+  CHECK(!inverseOffers.contains(inverseOffer))
+    << "Duplicate inverse offer " << inverseOffer->id();
+  inverseOffers.insert(inverseOffer);
+}
+
+
+void Framework::removeInverseOffer(InverseOffer* inverseOffer)
+{
+  CHECK(inverseOffers.contains(inverseOffer))
+    << "Unknown inverse offer " << inverseOffer->id();
+
+  inverseOffers.erase(inverseOffer);
+}
+
+
+bool Framework::hasExecutor(
+    const SlaveID& slaveId,
+    const ExecutorID& executorId)
+{
+  return executors.contains(slaveId) &&
+    executors[slaveId].contains(executorId);
+}
+
+
+void Framework::addExecutor(
+    const SlaveID& slaveId,
+    const ExecutorInfo& executorInfo)
+{
+  CHECK(!hasExecutor(slaveId, executorInfo.executor_id()))
+    << "Duplicate executor '" << executorInfo.executor_id()
+    << "' on agent " << slaveId;
+
+  // Verify that Resource.AllocationInfo is set;
+  // this should be guaranteed by the master.
+  foreach (const Resource& resource, executorInfo.resources()) {
+    CHECK(resource.has_allocation_info());
+  }
+
+  executors[slaveId][executorInfo.executor_id()] = executorInfo;
+  totalUsedResources += executorInfo.resources();
+  usedResources[slaveId] += executorInfo.resources();
+
+  // It's possible that we're not tracking the executor's role for
+  // this framework if the role is absent from the framework's
+  // set of roles. In this case, we track the role's allocation
+  // for this framework.
+  if (!executorInfo.resources().empty()) {
+    const std::string& role =
+      executorInfo.resources().begin()->allocation_info().role();
+
+    if (!isTrackedUnderRole(role)) {
+      trackUnderRole(role);
+    }
+  }
+}
+
+
+void Framework::removeExecutor(
+    const SlaveID& slaveId,
+    const ExecutorID& executorId)
+{
+  CHECK(hasExecutor(slaveId, executorId))
+    << "Unknown executor '" << executorId
+    << "' of framework " << id()
+    << " of agent " << slaveId;
+
+  const ExecutorInfo& executorInfo = executors[slaveId][executorId];
+
+  totalUsedResources -= executorInfo.resources();
+  usedResources[slaveId] -= executorInfo.resources();
+  if (usedResources[slaveId].empty()) {
+    usedResources.erase(slaveId);
+  }
+
+  // If we are no longer subscribed to the role to which these resources are
+  // being returned, and we have no more resources allocated to us for that
+  // role, stop tracking the framework under the role.
+  if (!executorInfo.resources().empty()) {
+    const std::string& role =
+      executorInfo.resources().begin()->allocation_info().role();
+
+    auto allocatedToRole = [&role](const Resource& resource) {
+      return resource.allocation_info().role() == role;
+    };
+
+    if (roles.count(role) == 0 &&
+        totalUsedResources.filter(allocatedToRole).empty()) {
+      CHECK(totalOfferedResources.filter(allocatedToRole).empty());
+      untrackUnderRole(role);
+    }
+  }
+
+  executors[slaveId].erase(executorId);
+  if (executors[slaveId].empty()) {
+    executors.erase(slaveId);
+  }
+}
+
+
+void Framework::addOperation(Operation* operation)
+{
+  CHECK(operation->has_framework_id());
+
+  const FrameworkID& frameworkId = operation->framework_id();
+
+  const UUID& uuid = operation->uuid();
+
+  CHECK(!operations.contains(uuid))
+    << "Duplicate operation '" << operation->info().id()
+    << "' (uuid: " << uuid << ") "
+    << "of framework " << frameworkId;
+
+  operations.put(uuid, operation);
+
+  if (operation->info().has_id()) {
+    operationUUIDs.put(operation->info().id(), uuid);
+  }
+
+  if (!protobuf::isSpeculativeOperation(operation->info()) &&
+      !protobuf::isTerminalState(operation->latest_status().state())) {
+    Try<Resources> consumed =
+      protobuf::getConsumedResources(operation->info());
+    CHECK_SOME(consumed);
+
+    CHECK(operation->has_slave_id())
+      << "External resource provider is not supported yet";
+
+    const SlaveID& slaveId = operation->slave_id();
+
+    totalUsedResources += consumed.get();
+    usedResources[slaveId] += consumed.get();
+
+    // It's possible that we're not tracking the role from the
+    // resources in the operation for this framework if the role is
+    // absent from the framework's set of roles. In this case, we
+    // track the role's allocation for this framework.
+    foreachkey (const std::string& role, consumed->allocations()) {
+      if (!isTrackedUnderRole(role)) {
+        trackUnderRole(role);
+      }
+    }
+  }
+}
+
+
+Option<Operation*> Framework::getOperation(const OperationID& id)
+{
+  Option<UUID> uuid = operationUUIDs.get(id);
+
+  if (uuid.isNone()) {
+    return None();
+  }
+
+  Option<Operation*> operation = operations.get(uuid.get());
+
+  CHECK_SOME(operation);
+
+  return operation;
+}
+
+
+void Framework::recoverResources(Operation* operation)
+{
+  CHECK(operation->has_slave_id())
+    << "External resource provider is not supported yet";
+
+  const SlaveID& slaveId = operation->slave_id();
+
+  if (protobuf::isSpeculativeOperation(operation->info())) {
+    return;
+  }
+
+  Try<Resources> consumed = protobuf::getConsumedResources(operation->info());
+  CHECK_SOME(consumed);
+
+  CHECK(totalUsedResources.contains(consumed.get()))
+    << "Tried to recover resources " << consumed.get()
+    << " which do not seem used";
+
+  CHECK(usedResources[slaveId].contains(consumed.get()))
+    << "Tried to recover resources " << consumed.get() << " of agent "
+    << slaveId << " which do not seem used";
+
+  totalUsedResources -= consumed.get();
+  usedResources[slaveId] -= consumed.get();
+  if (usedResources[slaveId].empty()) {
+    usedResources.erase(slaveId);
+  }
+
+  // If we are no longer subscribed to the role to which these
+  // resources are being returned, and we have no more resources
+  // allocated to us for that role, stop tracking the framework
+  // under the role.
+  foreachkey (const std::string& role, consumed->allocations()) {
+    auto allocatedToRole = [&role](const Resource& resource) {
+      return resource.allocation_info().role() == role;
+    };
+
+    if (roles.count(role) == 0 &&
+        totalUsedResources.filter(allocatedToRole).empty()) {
+      CHECK(totalOfferedResources.filter(allocatedToRole).empty());
+      untrackUnderRole(role);
+    }
+  }
+}
+
+
+void Framework::removeOperation(Operation* operation)
+{
+  const UUID& uuid = operation->uuid();
+
+  CHECK(operations.contains(uuid))
+    << "Unknown operation '" << operation->info().id()
+    << "' (uuid: " << uuid << ") "
+    << "of framework " << operation->framework_id();
+
+  if (!protobuf::isSpeculativeOperation(operation->info()) &&
+      !protobuf::isTerminalState(operation->latest_status().state())) {
+    recoverResources(operation);
+  }
+
+  if (operation->info().has_id()) {
+    operationUUIDs.erase(operation->info().id());
+  }
+
+  operations.erase(uuid);
+}
+
+
+const FrameworkID Framework::id() const
+{
+  return info.id();
+}
+
+
+void Framework::update(const FrameworkInfo& newInfo)
+{
+  // We only merge 'info' from the same framework 'id'.
+  CHECK_EQ(info.id(), newInfo.id());
+
+  // Save the old list of roles for later.
+  std::set<std::string> oldRoles = roles;
+
+  // TODO(jmlvanre): Merge other fields as per design doc in
+  // MESOS-703.
+
+  info.clear_role();
+  info.clear_roles();
+
+  if (newInfo.has_role()) {
+    info.set_role(newInfo.role());
+  }
+
+  if (newInfo.roles_size() > 0) {
+    info.mutable_roles()->CopyFrom(newInfo.roles());
+  }
+
+  roles = protobuf::framework::getRoles(newInfo);
+
+  if (newInfo.user() != info.user()) {
+    LOG(WARNING) << "Cannot update FrameworkInfo.user to '" << newInfo.user()
+                 << "' for framework " << id() << ". Check MESOS-703";
+  }
+
+  info.set_name(newInfo.name());
+
+  if (newInfo.has_failover_timeout()) {
+    info.set_failover_timeout(newInfo.failover_timeout());
+  } else {
+    info.clear_failover_timeout();
+  }
+
+  if (newInfo.checkpoint() != info.checkpoint()) {
+    LOG(WARNING) << "Cannot update FrameworkInfo.checkpoint to '"
+                 << stringify(newInfo.checkpoint()) << "' for framework "
+                 << id() << ". Check MESOS-703";
+  }
+
+  if (newInfo.has_hostname()) {
+    info.set_hostname(newInfo.hostname());
+  } else {
+    info.clear_hostname();
+  }
+
+  if (newInfo.principal() != info.principal()) {
+    LOG(WARNING) << "Cannot update FrameworkInfo.principal to '"
+                 << newInfo.principal() << "' for framework " << id()
+                 << ". Check MESOS-703";
+  }
+
+  if (newInfo.has_webui_url()) {
+    info.set_webui_url(newInfo.webui_url());
+  } else {
+    info.clear_webui_url();
+  }
+
+  if (newInfo.capabilities_size() > 0) {
+    info.mutable_capabilities()->CopyFrom(newInfo.capabilities());
+  } else {
+    info.clear_capabilities();
+  }
+  capabilities = protobuf::framework::Capabilities(info.capabilities());
+
+  if (newInfo.has_labels()) {
+    info.mutable_labels()->CopyFrom(newInfo.labels());
+  } else {
+    info.clear_labels();
+  }
+
+  const std::set<std::string>& newRoles = roles;
+
+  const std::set<std::string> removedRoles = [&]() {
+    std::set<std::string> result = oldRoles;
+    foreach (const std::string& role, newRoles) {
+      result.erase(role);
+    }
+    return result;
+  }();
+
+  foreach (const std::string& role, removedRoles) {
+    auto allocatedToRole = [&role](const Resource& resource) {
+      return resource.allocation_info().role() == role;
+    };
+
+    // Stop tracking the framework under this role if there are
+    // no longer any resources allocated to it.
+    if (totalUsedResources.filter(allocatedToRole).empty()) {
+      CHECK(totalOfferedResources.filter(allocatedToRole).empty());
+      untrackUnderRole(role);
+    }
+  }
+
+  const std::set<std::string> addedRoles = [&]() {
+    std::set<std::string> result = newRoles;
+    foreach (const std::string& role, oldRoles) {
+      result.erase(role);
+    }
+    return result;
+  }();
+
+  foreach (const std::string& role, addedRoles) {
+    // NOTE: It's possible that we're already tracking this framework
+    // under the role because a framework can unsubscribe from a role
+    // while it still has resources allocated to the role.
+    if (!isTrackedUnderRole(role)) {
+      trackUnderRole(role);
+    }
+  }
+}
+
+
+void Framework::updateConnection(const process::UPID& newPid)
+{
+  // Cleanup the HTTP connection if this is a downgrade from HTTP
+  // to PID. Note that the connection may already be closed.
+  if (http.isSome()) {
+    closeHttpConnection();
+  }
+
+  // TODO(benh): unlink(oldPid);
+  pid = newPid;
+}
+
+
+void Framework::updateConnection(const HttpConnection& newHttp)
+{
+  if (pid.isSome()) {
+    // Wipe the PID if this is an upgrade from PID to HTTP.
+    // TODO(benh): unlink(oldPid);
+    pid = None();
+  } else if (http.isSome()) {
+    // Cleanup the old HTTP connection.
+    // Note that master creates a new HTTP connection for every
+    // subscribe request, so 'newHttp' should always be different
+    // from 'http'.
+    closeHttpConnection();
+  }
+
+  CHECK_NONE(http);
+
+  http = newHttp;
+}
+
+
+void Framework::closeHttpConnection()
+{
+  CHECK_SOME(http);
+
+  if (connected() && !http->close()) {
+    LOG(WARNING) << "Failed to close HTTP pipe for " << *this;
+  }
+
+  http = None();
+
+  CHECK_SOME(heartbeater);
+
+  terminate(heartbeater->get());
+  wait(heartbeater->get());
+
+  heartbeater = None();
+}
+
+
+void Framework::heartbeat()
+{
+  CHECK_NONE(heartbeater);
+  CHECK_SOME(http);
+
+  // TODO(vinod): Make heartbeat interval configurable and include
+  // this information in the SUBSCRIBED response.
+  scheduler::Event event;
+  event.set_type(scheduler::Event::HEARTBEAT);
+
+  heartbeater =
+    new Heartbeater<scheduler::Event, v1::scheduler::Event>(
+        "framework " + stringify(info.id()),
+        event,
+        http.get(),
+        DEFAULT_HEARTBEAT_INTERVAL,
+        None(),
+        [this](const scheduler::Event& event) {
+          this->metrics.incrementEvent(event);
+        });
+
+  process::spawn(heartbeater->get());
+}
+
+
+bool Framework::active() const
+{
+  return state == ACTIVE;
+}
+
+
+bool Framework::connected() const
+{
+  return state == ACTIVE || state == INACTIVE;
+}
+
+
+bool Framework::recovered() const
+{
+  return state == RECOVERED;
+}
+
+
+Framework::Framework(
+    Master* const _master,
+    const Flags& masterFlags,
+    const FrameworkInfo& _info,
+    State state,
+    const process::Time& time)
+  : master(_master),
+    info(_info),
+    roles(protobuf::framework::getRoles(_info)),
+    capabilities(_info.capabilities()),
+    state(state),
+    registeredTime(time),
+    reregisteredTime(time),
+    completedTasks(masterFlags.max_completed_tasks_per_framework),
+    unreachableTasks(masterFlags.max_unreachable_tasks_per_framework),
+    metrics(_info)
+{
+  CHECK(_info.has_id());
+
+  setFrameworkState(state);
+
+  foreach (const std::string& role, roles) {
+    // NOTE: It's possible that we're already being tracked under the role
+    // because a framework can unsubscribe from a role while it still has
+    // resources allocated to the role.
+    if (!isTrackedUnderRole(role)) {
+      trackUnderRole(role);
+    }
+  }
+}
+
+
+bool Framework::isTrackedUnderRole(const std::string& role) const
+{
+  CHECK(master->isWhitelistedRole(role))
+    << "Unknown role '" << role << "'" << " of framework " << *this;
+
+  return master->roles.contains(role) &&
+         master->roles.at(role)->frameworks.contains(id());
+}
+
+
+void Framework::trackUnderRole(const std::string& role)
+{
+  CHECK(master->isWhitelistedRole(role))
+    << "Unknown role '" << role << "'" << " of framework " << *this;
+
+  CHECK(!isTrackedUnderRole(role));
+
+  if (!master->roles.contains(role)) {
+    master->roles[role] = new Role(role);
+  }
+  master->roles.at(role)->addFramework(this);
+}
+
+
+void Framework::untrackUnderRole(const std::string& role)
+{
+  CHECK(master->isWhitelistedRole(role))
+    << "Unknown role '" << role << "'" << " of framework " << *this;
+
+  CHECK(isTrackedUnderRole(role));
+
+  // NOTE: Ideally we would also `CHECK` that we're not currently subscribed
+  // to the role. We don't do this currently because this function is used in
+  // `Master::removeFramework` where we're still subscribed to `roles`.
+
+  auto allocatedToRole = [&role](const Resource& resource) {
+    return resource.allocation_info().role() == role;
+  };
+
+  CHECK(totalUsedResources.filter(allocatedToRole).empty());
+  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
+
+  master->roles.at(role)->removeFramework(this);
+  if (master->roles.at(role)->frameworks.empty()) {
+    delete master->roles.at(role);
+    master->roles.erase(role);
+  }
+}
+
+
+void Framework::setFrameworkState(const Framework::State& _state)
+{
+  state = _state;
+  metrics.subscribed = state == Framework::State::ACTIVE ? 1 : 0;
+}
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/42cbba52/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 192fe82..21de973 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -408,63 +408,6 @@ struct BoundedRateLimiter
 };
 
 
-bool Framework::isTrackedUnderRole(const string& role) const
-{
-  CHECK(master->isWhitelistedRole(role))
-    << "Unknown role '" << role << "'" << " of framework " << *this;
-
-  return master->roles.contains(role) &&
-         master->roles.at(role)->frameworks.contains(id());
-}
-
-
-void Framework::trackUnderRole(const string& role)
-{
-  CHECK(master->isWhitelistedRole(role))
-    << "Unknown role '" << role << "'" << " of framework " << *this;
-
-  CHECK(!isTrackedUnderRole(role));
-
-  if (!master->roles.contains(role)) {
-    master->roles[role] = new Role(role);
-  }
-  master->roles.at(role)->addFramework(this);
-}
-
-
-void Framework::untrackUnderRole(const string& role)
-{
-  CHECK(master->isWhitelistedRole(role))
-    << "Unknown role '" << role << "'" << " of framework " << *this;
-
-  CHECK(isTrackedUnderRole(role));
-
-  // NOTE: Ideally we would also `CHECK` that we're not currently subscribed
-  // to the role. We don't do this currently because this function is used in
-  // `Master::removeFramework` where we're still subscribed to `roles`.
-
-  auto allocatedToRole = [&role](const Resource& resource) {
-    return resource.allocation_info().role() == role;
-  };
-
-  CHECK(totalUsedResources.filter(allocatedToRole).empty());
-  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
-
-  master->roles.at(role)->removeFramework(this);
-  if (master->roles.at(role)->frameworks.empty()) {
-    delete master->roles.at(role);
-    master->roles.erase(role);
-  }
-}
-
-
-void Framework::setFrameworkState(const Framework::State& _state)
-{
-  state = _state;
-  metrics.subscribed = state == Framework::State::ACTIVE ? 1 : 0;
-}
-
-
 void Master::initialize()
 {
   LOG(INFO) << "Master " << info_.id() << " (" << info_.hostname() << ")"

http://git-wip-us.apache.org/repos/asf/mesos/blob/42cbba52/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 45ffedb..ce1a7e1 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -67,7 +67,6 @@
 #include <stout/uuid.hpp>
 
 #include "common/http.hpp"
-#include "common/protobuf_utils.hpp"
 #include "common/resources_utils.hpp"
 
 #include "files/files.hpp"
@@ -2289,101 +2288,23 @@ struct Framework
             const Flags& masterFlags,
             const FrameworkInfo& info,
             const process::UPID& _pid,
-            const process::Time& time = process::Clock::now())
-    : Framework(master, masterFlags, info, ACTIVE, time)
-  {
-    pid = _pid;
-  }
+            const process::Time& time = process::Clock::now());
 
   Framework(Master* const master,
             const Flags& masterFlags,
             const FrameworkInfo& info,
             const HttpConnection& _http,
-            const process::Time& time = process::Clock::now())
-    : Framework(master, masterFlags, info, ACTIVE, time)
-  {
-    http = _http;
-  }
+            const process::Time& time = process::Clock::now());
 
   Framework(Master* const master,
             const Flags& masterFlags,
-            const FrameworkInfo& info)
-    : Framework(master, masterFlags, info, RECOVERED, process::Time()) {}
-
-  ~Framework()
-  {
-    if (http.isSome()) {
-      closeHttpConnection();
-    }
-  }
-
-  Task* getTask(const TaskID& taskId)
-  {
-    if (tasks.count(taskId) > 0) {
-      return tasks[taskId];
-    }
-
-    return nullptr;
-  }
-
-  void addTask(Task* task)
-  {
-    CHECK(!tasks.contains(task->task_id()))
-      << "Duplicate task " << task->task_id()
-      << " of framework " << task->framework_id();
-
-    // Verify that Resource.AllocationInfo is set,
-    // this should be guaranteed by the master.
-    foreach (const Resource& resource, task->resources()) {
-      CHECK(resource.has_allocation_info());
-    }
+            const FrameworkInfo& info);
 
-    tasks[task->task_id()] = task;
+  ~Framework();
 
-    // Unreachable tasks should be added via `addUnreachableTask`.
-    CHECK(task->state() != TASK_UNREACHABLE)
-      << "Task '" << task->task_id() << "' of framework " << id()
-      << " added in TASK_UNREACHABLE state";
+  Task* getTask(const TaskID& taskId);
 
-    // Since we track terminal but unacknowledged tasks within
-    // `tasks` rather than `completedTasks`, we need to handle
-    // them here: don't count them as consuming resources.
-    //
-    // TODO(bmahler): Users currently get confused because
-    // terminal tasks can show up as "active" tasks in the UI and
-    // endpoints. Ideally, we show the terminal unacknowledged
-    // tasks as "completed" as well.
-    if (!protobuf::isTerminalState(task->state())) {
-      // Note that we explicitly convert from protobuf to `Resources` once
-      // and then use the result for calculations to avoid performance penalty
-      // for multiple conversions and validations implied by `+=` with protobuf
-      // arguments.
-      // Conversion is safe, as resources have already passed validation.
-      const Resources resources = task->resources();
-      totalUsedResources += resources;
-      usedResources[task->slave_id()] += resources;
-
-      // It's possible that we're not tracking the task's role for
-      // this framework if the role is absent from the framework's
-      // set of roles. In this case, we track the role's allocation
-      // for this framework.
-      CHECK(!task->resources().empty());
-      const std::string& role =
-        task->resources().begin()->allocation_info().role();
-
-      if (!isTrackedUnderRole(role)) {
-        trackUnderRole(role);
-      }
-    }
-
-    metrics.incrementTaskState(task->state());
-
-    if (!master->subscribers.subscribed.empty()) {
-      master->subscribers.send(
-          protobuf::master::event::createTaskAdded(*task),
-          info);
-    }
-  }
+  void addTask(Task* task);
 
   // Update framework to recover the resources that were previously
   // being used by `task`.
@@ -2391,35 +2312,7 @@ struct Framework
   // TODO(bmahler): This is a hack for performance. We need to
   // maintain resource counters because computing task resources
   // functionally for all tasks is expensive, for now.
-  void recoverResources(Task* task)
-  {
-    CHECK(tasks.contains(task->task_id()))
-      << "Unknown task " << task->task_id()
-      << " of framework " << task->framework_id();
-
-    totalUsedResources -= task->resources();
-    usedResources[task->slave_id()] -= task->resources();
-    if (usedResources[task->slave_id()].empty()) {
-      usedResources.erase(task->slave_id());
-    }
-
-    // If we are no longer subscribed to the role to which these resources are
-    // being returned to, and we have no more resources allocated to us for that
-    // role, stop tracking the framework under the role.
-    CHECK(!task->resources().empty());
-    const std::string& role =
-      task->resources().begin()->allocation_info().role();
-
-    auto allocatedToRole = [&role](const Resource& resource) {
-      return resource.allocation_info().role() == role;
-    };
-
-    if (roles.count(role) == 0 &&
-        totalUsedResources.filter(allocatedToRole).empty()) {
-      CHECK(totalOfferedResources.filter(allocatedToRole).empty());
-      untrackUnderRole(role);
-    }
-  }
+  void recoverResources(Task* task);
 
   // Sends a message to the connected framework.
   template <typename Message>
@@ -2445,492 +2338,63 @@ struct Framework
     }
   }
 
-  void addCompletedTask(Task&& task)
-  {
-    // TODO(neilc): We currently allow frameworks to reuse the task
-    // IDs of completed tasks (although this is discouraged). This
-    // means that there might be multiple completed tasks with the
-    // same task ID. We should consider rejecting attempts to reuse
-    // task IDs (MESOS-6779).
-    completedTasks.push_back(process::Owned<Task>(new Task(std::move(task))));
-  }
+  void addCompletedTask(Task&& task);
 
-  void addUnreachableTask(const Task& task)
-  {
-    // TODO(adam-mesos): Check if unreachable task already exists.
-    unreachableTasks.set(task.task_id(), process::Owned<Task>(new Task(task)));
-  }
+  void addUnreachableTask(const Task& task);
 
   // Removes the task. `unreachable` indicates whether the task is removed due
   // to being unreachable. Note that we cannot rely on the task state because
   // it may not reflect unreachability due to being set to TASK_LOST for
   // backwards compatibility.
-  void removeTask(Task* task, bool unreachable)
-  {
-    CHECK(tasks.contains(task->task_id()))
-      << "Unknown task " << task->task_id()
-      << " of framework " << task->framework_id();
-
-    // The invariant here is that the master will have already called
-    // `recoverResources()` prior to removing terminal or unreachable tasks.
-    if (!protobuf::isTerminalState(task->state()) &&
-        task->state() != TASK_UNREACHABLE) {
-      recoverResources(task);
-    }
-
-    if (unreachable) {
-      addUnreachableTask(*task);
-    } else {
-      CHECK(task->state() != TASK_UNREACHABLE);
-
-      // TODO(bmahler): This moves a potentially non-terminal task into
-      // the completed list!
-      addCompletedTask(Task(*task));
-    }
-
-    tasks.erase(task->task_id());
-  }
+  void removeTask(Task* task, bool unreachable);
 
-  void addOffer(Offer* offer)
-  {
-    CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
-    offers.insert(offer);
-    totalOfferedResources += offer->resources();
-    offeredResources[offer->slave_id()] += offer->resources();
-  }
-
-  void removeOffer(Offer* offer)
-  {
-    CHECK(offers.find(offer) != offers.end())
-      << "Unknown offer " << offer->id();
-
-    totalOfferedResources -= offer->resources();
-    offeredResources[offer->slave_id()] -= offer->resources();
-    if (offeredResources[offer->slave_id()].empty()) {
-      offeredResources.erase(offer->slave_id());
-    }
-
-    offers.erase(offer);
-  }
+  void addOffer(Offer* offer);
 
-  void addInverseOffer(InverseOffer* inverseOffer)
-  {
-    CHECK(!inverseOffers.contains(inverseOffer))
-      << "Duplicate inverse offer " << inverseOffer->id();
-    inverseOffers.insert(inverseOffer);
-  }
+  void removeOffer(Offer* offer);
 
-  void removeInverseOffer(InverseOffer* inverseOffer)
-  {
-    CHECK(inverseOffers.contains(inverseOffer))
-      << "Unknown inverse offer " << inverseOffer->id();
+  void addInverseOffer(InverseOffer* inverseOffer);
 
-    inverseOffers.erase(inverseOffer);
-  }
+  void removeInverseOffer(InverseOffer* inverseOffer);
 
   bool hasExecutor(const SlaveID& slaveId,
-                   const ExecutorID& executorId)
-  {
-    return executors.contains(slaveId) &&
-      executors[slaveId].contains(executorId);
-  }
+                   const ExecutorID& executorId);
 
   void addExecutor(const SlaveID& slaveId,
-                   const ExecutorInfo& executorInfo)
-  {
-    CHECK(!hasExecutor(slaveId, executorInfo.executor_id()))
-      << "Duplicate executor '" << executorInfo.executor_id()
-      << "' on agent " << slaveId;
-
-    // Verify that Resource.AllocationInfo is set,
-    // this should be guaranteed by the master.
-    foreach (const Resource& resource, executorInfo.resources()) {
-      CHECK(resource.has_allocation_info());
-    }
-
-    executors[slaveId][executorInfo.executor_id()] = executorInfo;
-    totalUsedResources += executorInfo.resources();
-    usedResources[slaveId] += executorInfo.resources();
-
-    // It's possible that we're not tracking the task's role for
-    // this framework if the role is absent from the framework's
-    // set of roles. In this case, we track the role's allocation
-    // for this framework.
-    if (!executorInfo.resources().empty()) {
-      const std::string& role =
-        executorInfo.resources().begin()->allocation_info().role();
-
-      if (!isTrackedUnderRole(role)) {
-        trackUnderRole(role);
-      }
-    }
-  }
+                   const ExecutorInfo& executorInfo);
 
   void removeExecutor(const SlaveID& slaveId,
-                      const ExecutorID& executorId)
-  {
-    CHECK(hasExecutor(slaveId, executorId))
-      << "Unknown executor '" << executorId
-      << "' of framework " << id()
-      << " of agent " << slaveId;
-
-    const ExecutorInfo& executorInfo = executors[slaveId][executorId];
-
-    totalUsedResources -= executorInfo.resources();
-    usedResources[slaveId] -= executorInfo.resources();
-    if (usedResources[slaveId].empty()) {
-      usedResources.erase(slaveId);
-    }
-
-    // If we are no longer subscribed to the role to which these resources are
-    // being returned to, and we have no more resources allocated to us for that
-    // role, stop tracking the framework under the role.
-    if (!executorInfo.resources().empty()) {
-      const std::string& role =
-        executorInfo.resources().begin()->allocation_info().role();
-
-      auto allocatedToRole = [&role](const Resource& resource) {
-        return resource.allocation_info().role() == role;
-      };
-
-      if (roles.count(role) == 0 &&
-          totalUsedResources.filter(allocatedToRole).empty()) {
-        CHECK(totalOfferedResources.filter(allocatedToRole).empty());
-        untrackUnderRole(role);
-      }
-    }
-
-    executors[slaveId].erase(executorId);
-    if (executors[slaveId].empty()) {
-      executors.erase(slaveId);
-    }
-  }
-
-  void addOperation(Operation* operation)
-  {
-    CHECK(operation->has_framework_id());
-
-    const FrameworkID& frameworkId = operation->framework_id();
-
-    const UUID& uuid = operation->uuid();
-
-    CHECK(!operations.contains(uuid))
-      << "Duplicate operation '" << operation->info().id()
-      << "' (uuid: " << uuid << ") "
-      << "of framework " << frameworkId;
-
-    operations.put(uuid, operation);
-
-    if (operation->info().has_id()) {
-      operationUUIDs.put(operation->info().id(), uuid);
-    }
-
-    if (!protobuf::isSpeculativeOperation(operation->info()) &&
-        !protobuf::isTerminalState(operation->latest_status().state())) {
-      Try<Resources> consumed =
-        protobuf::getConsumedResources(operation->info());
-      CHECK_SOME(consumed);
-
-      CHECK(operation->has_slave_id())
-        << "External resource provider is not supported yet";
-
-      const SlaveID& slaveId = operation->slave_id();
-
-      totalUsedResources += consumed.get();
-      usedResources[slaveId] += consumed.get();
-
-      // It's possible that we're not tracking the role from the
-      // resources in the operation for this framework if the role is
-      // absent from the framework's set of roles. In this case, we
-      // track the role's allocation for this framework.
-      foreachkey (const std::string& role, consumed->allocations()) {
-        if (!isTrackedUnderRole(role)) {
-          trackUnderRole(role);
-        }
-      }
-    }
-  }
-
-  Option<Operation*> getOperation(const OperationID& id) {
-    Option<UUID> uuid = operationUUIDs.get(id);
+                      const ExecutorID& executorId);
 
-    if (uuid.isNone()) {
-      return None();
-    }
-
-    Option<Operation*> operation = operations.get(uuid.get());
-
-    CHECK_SOME(operation);
-
-    return operation;
-  }
-
-  void recoverResources(Operation* operation)
-  {
-    CHECK(operation->has_slave_id())
-      << "External resource provider is not supported yet";
-
-    const SlaveID& slaveId = operation->slave_id();
-
-    if (protobuf::isSpeculativeOperation(operation->info())) {
-      return;
-    }
-
-    Try<Resources> consumed = protobuf::getConsumedResources(operation->info());
-    CHECK_SOME(consumed);
-
-    CHECK(totalUsedResources.contains(consumed.get()))
-      << "Tried to recover resources " << consumed.get()
-      << " which do not seem used";
-
-    CHECK(usedResources[slaveId].contains(consumed.get()))
-      << "Tried to recover resources " << consumed.get() << " of agent "
-      << slaveId << " which do not seem used";
-
-    totalUsedResources -= consumed.get();
-    usedResources[slaveId] -= consumed.get();
-    if (usedResources[slaveId].empty()) {
-      usedResources.erase(slaveId);
-    }
-
-    // If we are no longer subscribed to the role to which these
-    // resources are being returned to, and we have no more resources
-    // allocated to us for that role, stop tracking the framework
-    // under the role.
-    foreachkey (const std::string& role, consumed->allocations()) {
-      auto allocatedToRole = [&role](const Resource& resource) {
-        return resource.allocation_info().role() == role;
-      };
-
-      if (roles.count(role) == 0 &&
-          totalUsedResources.filter(allocatedToRole).empty()) {
-        CHECK(totalOfferedResources.filter(allocatedToRole).empty());
-        untrackUnderRole(role);
-      }
-    }
-  }
-
-  void removeOperation(Operation* operation)
-  {
-    const UUID& uuid = operation->uuid();
-
-    CHECK(operations.contains(uuid))
-      << "Unknown operation '" << operation->info().id()
-      << "' (uuid: " << uuid << ") "
-      << "of framework " << operation->framework_id();
+  void addOperation(Operation* operation);
 
-    if (!protobuf::isSpeculativeOperation(operation->info()) &&
-        !protobuf::isTerminalState(operation->latest_status().state())) {
-      recoverResources(operation);
-    }
+  Option<Operation*> getOperation(const OperationID& id);
 
-    if (operation->info().has_id()) {
-      operationUUIDs.erase(operation->info().id());
-    }
+  void recoverResources(Operation* operation);
 
-    operations.erase(uuid);
-  }
+  void removeOperation(Operation* operation);
 
-  const FrameworkID id() const { return info.id(); }
+  const FrameworkID id() const;
 
   // Update fields in 'info' using those in 'newInfo'. Currently this
   // only updates `role`/`roles`, 'name', 'failover_timeout', 'hostname',
   // 'webui_url', 'capabilities', and 'labels'.
-  void update(const FrameworkInfo& newInfo)
-  {
-    // We only merge 'info' from the same framework 'id'.
-    CHECK_EQ(info.id(), newInfo.id());
-
-    // Save the old list of roles for later.
-    std::set<std::string> oldRoles = roles;
-
-    // TODO(jmlvanre): Merge other fields as per design doc in
-    // MESOS-703.
-
-    info.clear_role();
-    info.clear_roles();
-
-    if (newInfo.has_role()) {
-      info.set_role(newInfo.role());
-    }
-
-    if (newInfo.roles_size() > 0) {
-      info.mutable_roles()->CopyFrom(newInfo.roles());
-    }
+  void update(const FrameworkInfo& newInfo);
 
-    roles = protobuf::framework::getRoles(newInfo);
+  void updateConnection(const process::UPID& newPid);
 
-    if (newInfo.user() != info.user()) {
-      LOG(WARNING) << "Cannot update FrameworkInfo.user to '" << newInfo.user()
-                   << "' for framework " << id() << ". Check MESOS-703";
-    }
-
-    info.set_name(newInfo.name());
-
-    if (newInfo.has_failover_timeout()) {
-      info.set_failover_timeout(newInfo.failover_timeout());
-    } else {
-      info.clear_failover_timeout();
-    }
-
-    if (newInfo.checkpoint() != info.checkpoint()) {
-      LOG(WARNING) << "Cannot update FrameworkInfo.checkpoint to '"
-                   << stringify(newInfo.checkpoint()) << "' for framework "
-                   << id() << ". Check MESOS-703";
-    }
-
-    if (newInfo.has_hostname()) {
-      info.set_hostname(newInfo.hostname());
-    } else {
-      info.clear_hostname();
-    }
-
-    if (newInfo.principal() != info.principal()) {
-      LOG(WARNING) << "Cannot update FrameworkInfo.principal to '"
-                   << newInfo.principal() << "' for framework " << id()
-                   << ". Check MESOS-703";
-    }
-
-    if (newInfo.has_webui_url()) {
-      info.set_webui_url(newInfo.webui_url());
-    } else {
-      info.clear_webui_url();
-    }
-
-    if (newInfo.capabilities_size() > 0) {
-      info.mutable_capabilities()->CopyFrom(newInfo.capabilities());
-    } else {
-      info.clear_capabilities();
-    }
-    capabilities = protobuf::framework::Capabilities(info.capabilities());
-
-    if (newInfo.has_labels()) {
-      info.mutable_labels()->CopyFrom(newInfo.labels());
-    } else {
-      info.clear_labels();
-    }
-
-    const std::set<std::string>& newRoles = roles;
-
-    const std::set<std::string> removedRoles = [&]() {
-      std::set<std::string> result = oldRoles;
-      foreach (const std::string& role, newRoles) {
-        result.erase(role);
-      }
-      return result;
-    }();
-
-    foreach (const std::string& role, removedRoles) {
-      auto allocatedToRole = [&role](const Resource& resource) {
-        return resource.allocation_info().role() == role;
-      };
-
-      // Stop tracking the framework under this role if there are
-      // no longer any resources allocated to it.
-      if (totalUsedResources.filter(allocatedToRole).empty()) {
-        CHECK(totalOfferedResources.filter(allocatedToRole).empty());
-        untrackUnderRole(role);
-      }
-    }
-
-    const std::set<std::string> addedRoles = [&]() {
-      std::set<std::string> result = newRoles;
-      foreach (const std::string& role, oldRoles) {
-        result.erase(role);
-      }
-      return result;
-    }();
-
-    foreach (const std::string& role, addedRoles) {
-      // NOTE: It's possible that we're already tracking this framework
-      // under the role because a framework can unsubscribe from a role
-      // while it still has resources allocated to the role.
-      if (!isTrackedUnderRole(role)) {
-        trackUnderRole(role);
-      }
-    }
-  }
-
-  void updateConnection(const process::UPID& newPid)
-  {
-    // Cleanup the HTTP connnection if this is a downgrade from HTTP
-    // to PID. Note that the connection may already be closed.
-    if (http.isSome()) {
-      closeHttpConnection();
-    }
-
-    // TODO(benh): unlink(oldPid);
-    pid = newPid;
-  }
-
-  void updateConnection(const HttpConnection& newHttp)
-  {
-    if (pid.isSome()) {
-      // Wipe the PID if this is an upgrade from PID to HTTP.
-      // TODO(benh): unlink(oldPid);
-      pid = None();
-    } else if (http.isSome()) {
-      // Cleanup the old HTTP connection.
-      // Note that master creates a new HTTP connection for every
-      // subscribe request, so 'newHttp' should always be different
-      // from 'http'.
-      closeHttpConnection();
-    }
-
-    CHECK_NONE(http);
-
-    http = newHttp;
-  }
+  void updateConnection(const HttpConnection& newHttp);
 
   // Closes the HTTP connection and stops the heartbeat.
   //
   // TODO(vinod): Currently `state` variable is set separately
   // from this method. We need to make sure these are in sync.
-  void closeHttpConnection()
-  {
-    CHECK_SOME(http);
-
-    if (connected() && !http->close()) {
-      LOG(WARNING) << "Failed to close HTTP pipe for " << *this;
-    }
-
-    http = None();
+  void closeHttpConnection();
 
-    CHECK_SOME(heartbeater);
+  void heartbeat();
 
-    terminate(heartbeater->get());
-    wait(heartbeater->get());
-
-    heartbeater = None();
-  }
-
-  void heartbeat()
-  {
-    CHECK_NONE(heartbeater);
-    CHECK_SOME(http);
-
-    // TODO(vinod): Make heartbeat interval configurable and include
-    // this information in the SUBSCRIBED response.
-    scheduler::Event event;
-    event.set_type(scheduler::Event::HEARTBEAT);
-
-    heartbeater =
-      new Heartbeater<scheduler::Event, v1::scheduler::Event>(
-          "framework " + stringify(info.id()),
-          event,
-          http.get(),
-          DEFAULT_HEARTBEAT_INTERVAL,
-          None(),
-          [this](const scheduler::Event& event) {
-            this->metrics.incrementEvent(event);
-          });
-
-    process::spawn(heartbeater->get());
-  }
-
-  bool active() const    { return state == ACTIVE; }
-  bool connected() const { return state == ACTIVE || state == INACTIVE; }
-  bool recovered() const { return state == RECOVERED; }
+  bool active() const;
+  bool connected() const;
+  bool recovered() const;
 
   bool isTrackedUnderRole(const std::string& role) const;
   void trackUnderRole(const std::string& role);
@@ -3042,30 +2506,7 @@ private:
             const Flags& masterFlags,
             const FrameworkInfo& _info,
             State state,
-            const process::Time& time)
-    : master(_master),
-      info(_info),
-      roles(protobuf::framework::getRoles(_info)),
-      capabilities(_info.capabilities()),
-      registeredTime(time),
-      reregisteredTime(time),
-      completedTasks(masterFlags.max_completed_tasks_per_framework),
-      unreachableTasks(masterFlags.max_unreachable_tasks_per_framework),
-      metrics(_info)
-  {
-    CHECK(_info.has_id());
-
-    setFrameworkState(state);
-
-    foreach (const std::string& role, roles) {
-      // NOTE: It's possible that we're already being tracked under the role
-      // because a framework can unsubscribe from a role while it still has
-      // resources allocated to the role.
-      if (!isTrackedUnderRole(role)) {
-        trackUnderRole(role);
-      }
-    }
-  }
+            const process::Time& time);
 
   Framework(const Framework&);              // No copying.
   Framework& operator=(const Framework&); // No assigning.

Reply via email to