My bad: I accidentally pushed a patch that I had applied locally earlier; the
revert below undoes it. The build should be back to normal now!

On Wed, Aug 5, 2015 at 4:47 PM, <bmah...@apache.org> wrote:

> Repository: mesos
> Updated Branches:
>   refs/heads/master dbf35da46 -> 586307e54
>
>
> Revert "Add subscribe-> subscribed workflow for http frameworks"
>
> This reverts commit 1709e8a82fdae6a10893f0cfb9d2fae144d6a7a8.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/586307e5
> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/586307e5
> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/586307e5
>
> Branch: refs/heads/master
> Commit: 586307e546955c23aadf0ed5bfae9e00c0228f86
> Parents: dbf35da
> Author: Benjamin Mahler <benjamin.mah...@gmail.com>
> Authored: Wed Aug 5 16:47:36 2015 -0700
> Committer: Benjamin Mahler <benjamin.mah...@gmail.com>
> Committed: Wed Aug 5 16:47:36 2015 -0700
>
> ----------------------------------------------------------------------
>  src/master/http.cpp   |  21 +--
>  src/master/master.cpp | 343 ++++++---------------------------------------
>  src/master/master.hpp |  36 -----
>  3 files changed, 42 insertions(+), 358 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/586307e5/src/master/http.cpp
> ----------------------------------------------------------------------
> diff --git a/src/master/http.cpp b/src/master/http.cpp
> index 181db46..76e7080 100644
> --- a/src/master/http.cpp
> +++ b/src/master/http.cpp
> @@ -70,7 +70,6 @@ using process::http::InternalServerError;
>  using process::http::NotFound;
>  using process::http::NotImplemented;
>  using process::http::OK;
> -using process::http::Pipe;
>  using process::http::TemporaryRedirect;
>  using process::http::Unauthorized;
>  using process::http::UnsupportedMediaType;
> @@ -376,24 +375,10 @@ Future<Response> Master::Http::call(const Request& request) const
>      responseContentType = ContentType::PROTOBUF;
>    }
>
> -  switch (call.type()) {
> -    case scheduler::Call::SUBSCRIBE: {
> -      Pipe pipe;
> -      OK ok;
> +  // Silence unused warning for now.
> +  (void)responseContentType;
>
> -      ok.type = Response::PIPE;
> -      ok.reader = pipe.reader();
> -
> -      master->subscribe(
> -          call.subscribe(),
> -          responseContentType,
> -          pipe.writer());
> -
> -      return ok;
> -    }
> -    default:
> -      break;
> -  }
> +  // TODO(anand): Handle the call.
>
>    return NotImplemented();
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/586307e5/src/master/master.cpp
> ----------------------------------------------------------------------
> diff --git a/src/master/master.cpp b/src/master/master.cpp
> index e738607..50b9824 100644
> --- a/src/master/master.cpp
> +++ b/src/master/master.cpp
> @@ -1771,227 +1771,6 @@ void Master::reregisterFramework(
>
>
>  void Master::subscribe(
> -    const scheduler::Call::Subscribe& subscribe,
> -    ContentType contentType,
> -    Pipe::Writer writer)
> -{
> -  // TODO(anand): We need to ensure that framework is authenticated
> -  // before calling into this function. If not, we need to build the
> -  // authentication logic here before invoking subscribe(...)
> -
> -  const FrameworkInfo& frameworkInfo = subscribe.framework_info();
> -
> -  LOG(INFO) << "Received registration request for"
> -            << " http framework '" << frameworkInfo.name() << "'";
> -
> -  // Assign a new FrameworkID.
> -  FrameworkInfo frameworkInfo_ = frameworkInfo;
> -  frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId());
> -
> -  Framework* framework = new Framework(
> -      this,
> -      frameworkInfo_,
> -      writer,
> -      contentType);
> -
> -  // TODO(anand): Add validation/authorization logic before we invoke
> -  // the continuation function.
> -  this->_subscribe(framework, subscribe);
> -}
> -
> -
> -void Master::_subscribe(
> -    Framework* framework,
> -    const scheduler::Call::Subscribe& subscribe)
> -{
> -  const FrameworkInfo& frameworkInfo = subscribe.framework_info();
> -
> -  if (!frameworkInfo.has_id() || frameworkInfo.id() == "") {
> -    // TODO(anand): Make '(re-)registerFramework()' also call into
> -    // this method (MESOS-3182).
> -    LOG(INFO) << "Registering framework " << *framework
> -              << " with checkpointing "
> -              << (framework->info.checkpoint() ? "enabled" : "disabled")
> -              << " and capabilities " << framework->info.capabilities();
> -
> -    // TODO(vinod): Deprecate this in favor of authorization.
> -    bool rootSubmissions = flags.root_submissions;
> -
> -    if (framework->info.user() == "root" && rootSubmissions == false) {
> -      LOG(INFO) << "Framework " << *framework << " registering as root, but "
> -                << "root submissions are disabled on this cluster";
> -      FrameworkErrorMessage message;
> -      message.set_message("User 'root' is not allowed to run frameworks");
> -      framework->send(message);
> -      delete framework;
> -      return;
> -    }
> -
> -    addFramework(framework);
> -
> -    FrameworkRegisteredMessage message;
> -    message.mutable_framework_id()->MergeFrom(framework->id());
> -    message.mutable_master_info()->MergeFrom(info_);
> -    framework->send(message);
> -    return;
> -  }
> -
> -  const bool force = subscribe.has_force() ? subscribe.force() : false;
> -  const bool isRegistered = frameworks.registered.count(frameworkInfo.id());
> -
> -  // TODO(anand): Completed frameworks might try to subscribe
> -  // again later. This is part of validation checks in the
> -  // reregister(...) function. We need to refactor those checks into a
> -  // separate function and then use them here.
> -
> -  LOG(INFO) << "Re-registering framework " << frameworkInfo.id()
> -            << " (" << frameworkInfo.name() << ")  with checkpointing "
> -            << (frameworkInfo.checkpoint() ? "enabled" : "disabled")
> -            << " and capabilities " << frameworkInfo.capabilities();
> -
> -  if (isRegistered) {
> -    // Using the "failover" of the scheduler allows us to keep a
> -    // scheduler that got partitioned but didn't die (in ZooKeeper
> -    // speak this means didn't lose their session) and then
> -    // eventually tried to connect to this master even though
> -    // another instance of their scheduler has reconnected. This
> -    // might not be an issue in the future when the
> -    // master/allocator launches the scheduler can get restarted
> -    // (if necessary) by the master and the master will always
> -    // know which scheduler is the correct one.
> -
> -    Framework* registeredFramework =
> -      CHECK_NOTNULL(frameworks.registered[frameworkInfo.id()]);
> -
> -    // Update the framework's info fields based on those passed during
> -    // re-registration.
> -    LOG(INFO) << "Updating info for framework " << framework->id();
> -
> -    registeredFramework->updateFrameworkInfo(framework->info);
> -    allocator->updateFramework(
> -        registeredFramework->id(),
> -        registeredFramework->info);
> -
> -    registeredFramework->reregisteredTime = Clock::now();
> -
> -    if (force) {
> -      // TODO(benh): Should we check whether the new scheduler has
> -      // given us a different framework name, user name or executor
> -      // info?
> -      LOG(INFO) << "Framework " << *registeredFramework << " failed over";
> -      failoverFramework(registeredFramework, framework);
> -    } else {
> -      if (registeredFramework->connected) {
> -        // It's very hard to diffrentiate between retries from the
> -        // same scheduler or a old scheduler that got failed over to a
> -        // new one and is now retrying. An example scenario can be:
> -        // A scheduler sends us a SUBCRIBE request, we process it and
> -        // the stream breaks just before we could send a SUBSCRIBED
> -        // event back. If the scheduler retries now, we would see it
> -        // as connected. This scenario is no different from a old
> -        // scheduler retrying after it has already failed over.
> -        // Eventually the master would detect the disconnection, start
> -        // the failover clock and then the framework can connect. This
> -        // was slightly easier for pid frameworks as we used to check
> -        // if the pid values matched.
> -        FrameworkErrorMessage message;
> -        message.set_message("Framework is already connected");
> -        framework->send(message);
> -        delete framework;
> -        return;
> -      }
> -
> -      LOG(INFO) << "Allowing framework " << *registeredFramework
> -                << " to re-register with an already used id";
> -
> -      // Convert the framework to a http framework if it was pid based
> -      // in the past. Also, we don't need to set the readerClosed(...)
> -      // callbacks again as it was done already when the framework
> -      // object was created.
> -      if (registeredFramework->pid.isSome()) {
> -        registeredFramework->pid = None();
> -        registeredFramework->http = framework->http;
> -      } else {
> -        registeredFramework->updateWriter(framework->http.get().writer);
> -      }
> -
> -      // Remove any offers sent to this framework and reactivate it.
> -      // NOTE: We need to do this because the scheduler might have
> -      // replied to the offers but the driver might have dropped
> -      // those messages since it wasn't connected to the master.
> -      rescindOffersAndReactivate(registeredFramework);
> -
> -      FrameworkReregisteredMessage message;
> -
> message.mutable_framework_id()->MergeFrom(registeredFramework->id());
> -      message.mutable_master_info()->MergeFrom(info_);
> -      registeredFramework->send(message);
> -    }
> -  } else {
> -    // We don't have a framework with this ID, so we must be a newly
> -    // elected Mesos master to which either an existing scheduler or a
> -    // failed-over one is connecting. Add any tasks it has that have
> -    // been reported by reconnecting slaves.
> -
> -    // TODO(benh): Check for root submissions like above!
> -
> -    // Add active tasks and executors to the framework.
> -    foreachvalue (Slave* slave, slaves.registered) {
> -      foreachvalue (Task* task, slave->tasks[framework->id()]) {
> -        framework->addTask(task);
> -      }
> -      foreachvalue (const ExecutorInfo& executor,
> -                    slave->executors[framework->id()]) {
> -        framework->addExecutor(slave->id, executor);
> -      }
> -    }
> -
> -    // N.B. Need to add the framework _after_ we add its tasks
> -    // (above) so that we can properly determine the resources it's
> -    // currently using!
> -    addFramework(framework);
> -
> -    // TODO(bmahler): We have to send a registered message here for
> -    // the re-registering framework, per the API contract. Send
> -    // re-register here per MESOS-786; requires deprecation or it
> -    // will break frameworks.
> -    FrameworkRegisteredMessage message;
> -    message.mutable_framework_id()->MergeFrom(framework->id());
> -    message.mutable_master_info()->MergeFrom(info_);
> -    framework->send(message);
> -  }
> -
> -  CHECK(frameworks.registered.contains(frameworkInfo.id()))
> -    << "Unknown framework " << frameworkInfo.id()
> -    << " (" << frameworkInfo.name() << ")";
> -
> -  // Broadcast the new framework pid to all the slaves. We have to
> -  // broadcast because an executor might be running on a slave but
> -  // it currently isn't running any tasks. This could be a
> -  // potential scalability issue ...
> -  foreachvalue (Slave* slave, slaves.registered) {
> -    UpdateFrameworkMessage message;
> -    message.mutable_framework_id()->MergeFrom(frameworkInfo.id());
> -
> -    // TODO(anand): We set 'pid' to UPID() for http frameworks
> -    // as 'pid' was made optional in 0.24.0. In 0.25.0, we
> -    // no longer have to set pid here for http frameowrks.
> -    message.set_pid(framework->pid.getOrElse(UPID()));
> -    send(slave->pid, message);
> -  }
> -
> -  if (isRegistered) {
> -    // Mark the framework as disconnected before deleting so as not
> -    // to trigger closing the writer.
> -    framework->connected = false;
> -
> -    // We only updated attributes in the existing stored framework.
> -    // Delete the optimistically created framework object.
> -    delete framework;
> -  }
> -}
> -
> -
> -void Master::subscribe(
>      const UPID& from,
>      const scheduler::Call::Subscribe& subscribe)
>  {
> @@ -2202,7 +1981,7 @@ void Master::_subscribe(
>          << " because it is not expected from " << from;
>        FrameworkErrorMessage message;
>        message.set_message("Framework failed over");
> -      framework->send(message);
> +      send(from, message);
>        return;
>      } else {
>        LOG(INFO) << "Allowing framework " << *framework
> @@ -4886,11 +4665,11 @@ void Master::addFramework(Framework* framework)
>    CHECK(!frameworks.registered.contains(framework->id()))
>      << "Framework " << *framework << " already exists!";
>
> +  CHECK_SOME(framework->pid) << "adding http framework not implemented";
> +
>    frameworks.registered[framework->id()] = framework;
>
> -  if (framework->pid.isSome()) {
> -    link(framework->pid.get());
> -  }
> +  link(framework->pid.get());
>
>    // TODO(anand): For http frameworks, add a readerClosed()
>    // callback to invoke Master::exited() when the connection
> @@ -4916,87 +4695,24 @@ void Master::addFramework(Framework* framework)
>    // If the framework is authenticated, its principal should be in
>    // 'authenticated'. Otherwise look if it's supplied in
>    // FrameworkInfo.
> -  if (framework->pid.isSome()) {
> -    Option<string> principal = authenticated.get(framework->pid.get());
> -    if (principal.isNone() && framework->info.has_principal()) {
> -      principal = framework->info.principal();
> -    }
> -
> -    CHECK(!frameworks.principals.contains(framework->pid.get()));
> -    frameworks.principals.put(framework->pid.get(), principal);
> -
> -    // Export framework metrics if a principal is specified.
> -    if (principal.isSome()) {
> -      // Create new framework metrics if this framework is the first
> -      // one of this principal. Otherwise existing metrics are reused.
> -      if (!metrics->frameworks.contains(principal.get())) {
> -        metrics->frameworks.put(
> -            principal.get(),
> -            Owned<Metrics::Frameworks>(
> -              new Metrics::Frameworks(principal.get())));
> -      }
> -    }
> +  Option<string> principal = authenticated.get(framework->pid.get());
> +  if (principal.isNone() && framework->info.has_principal()) {
> +    principal = framework->info.principal();
>    }
> -}
>
> +  CHECK(!frameworks.principals.contains(framework->pid.get()));
> +  frameworks.principals.put(framework->pid.get(), principal);
>
> -void Master::rescindOffersAndReactivate(Framework* framework)
> -{
> -  // Remove the framework's offers (if they weren't removed before).
> -  // We do this after we have updated the pid and sent the framework
> -  // registered message so that the allocator can immediately re-offer
> -  // these resources to this framework if it wants.
> -  foreach (Offer* offer, utils::copy(framework->offers)) {
> -    allocator->recoverResources(
> -        offer->framework_id(), offer->slave_id(), offer->resources(), None());
> -    removeOffer(offer);
> -  }
> -
> -  // Reactivate the framework.
> -  // NOTE: We do this after recovering resources (above) so that
> -  // the allocator has the correct view of the framework's share.
> -  if (!framework->active) {
> -    framework->active = true;
> -    allocator->activateFramework(framework->id());
> +  // Export framework metrics if a principal is specified.
> +  if (principal.isSome()) {
> +    // Create new framework metrics if this framework is the first
> +    // one of this principal. Otherwise existing metrics are reused.
> +    if (!metrics->frameworks.contains(principal.get())) {
> +      metrics->frameworks.put(
> +          principal.get(),
> +          Owned<Metrics::Frameworks>(new Metrics::Frameworks(principal.get())));
> +    }
>    }
> -
> -  // Mark the framework as connected now after reactivating it.
> -  framework->connected = true;
> -}
> -
> -
> -// TODO(anand): Currently this function is only used to failover http
> -// frameworks. Extend this function to handle both pid/http.
> -// We can then get rid of the failoverFramework(...) overload for pid
> -// based frameworks.
> -void Master::failoverFramework(Framework* framework, Framework* newFramework)
> -{
> -  // Notify the old connected framework that it has failed over.
> -  // It might be a duplicated message too, but it is virtually
> -  // impossible for us to distinguish between them. We try to
> -  // do a best effort logic here of letting the old scheduler
> -  // know.
> -  if (framework->connected) {
> -    FrameworkErrorMessage message;
> -    message.set_message("Framework failed over");
> -    framework->send(message);
> -  }
> -
> -  // The framework was previously registered as pid based and is
> -  // now trying to failover as a http framework. Set it's pid
> -  // to None and convert it to a http framework.
> -  if (framework->pid.isSome()) {
> -    framework->pid = None();
> -    framework->http = newFramework->http;
> -  }
> -
> -  framework->updateWriter(newFramework->http.get().writer);
> -  rescindOffersAndReactivate(framework);
> -
> -  FrameworkRegisteredMessage message;
> -  message.mutable_framework_id()->MergeFrom(framework->id());
> -  message.mutable_master_info()->MergeFrom(info_);
> -  framework->send(message);
>  }
>
>
> @@ -5004,6 +4720,8 @@ void Master::failoverFramework(Framework* framework, Framework* newFramework)
>  // event of a scheduler failover.
>  void Master::failoverFramework(Framework* framework, const UPID& newPid)
>  {
> +  CHECK_SOME(framework->pid) << "http framework failover not implemented";
> +
>    const UPID oldPid = framework->pid.get();
>
>    // There are a few failover cases to consider:
> @@ -5026,8 +4744,7 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
>
>    framework->pid = newPid;
>    link(newPid);
> -
> -  rescindOffersAndReactivate(framework);
> +  framework->connected = true;
>
>    // The scheduler driver safely ignores any duplicate registration
>    // messages, so we don't need to compare the old and new pids here.
> @@ -5036,6 +4753,24 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
>    message.mutable_master_info()->MergeFrom(info_);
>    framework->send(message);
>
> +  // Remove the framework's offers (if they weren't removed before).
> +  // We do this after we have updated the pid and sent the framework
> +  // registered message so that the allocator can immediately re-offer
> +  // these resources to this framework if it wants.
> +  foreach (Offer* offer, utils::copy(framework->offers)) {
> +    allocator->recoverResources(
> +        offer->framework_id(), offer->slave_id(), offer->resources(), None());
> +    removeOffer(offer);
> +  }
> +
> +  // Reactivate the framework.
> +  // NOTE: We do this after recovering resources (above) so that
> +  // the allocator has the correct view of the framework's share.
> +  if (!framework->active) {
> +    framework->active = true;
> +    allocator->activateFramework(framework->id());
> +  }
> +
>    // 'Failover' the framework's metrics. i.e., change the lookup key
>    // for its metrics to 'newPid'.
>    if (oldPid != newPid && frameworks.principals.contains(oldPid)) {
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/586307e5/src/master/master.hpp
> ----------------------------------------------------------------------
> diff --git a/src/master/master.hpp b/src/master/master.hpp
> index c71e343..e441749 100644
> --- a/src/master/master.hpp
> +++ b/src/master/master.hpp
> @@ -584,14 +584,6 @@ protected:
>    // the event of a scheduler failover.
>   void failoverFramework(Framework* framework, const process::UPID& newPid);
>
> -  // Failover the old framework. Copies various attributes from the
> -  // new framework passed as argument.
> -  void failoverFramework(Framework* framework, Framework* newFramework);
> -
> -  // Rescinds the framework's offers and reactivates the framework
> -  // marking it as connected.
> -  void rescindOffersAndReactivate(Framework* framework);
> -
>    // Kill all of a framework's tasks, delete the framework object, and
>    // reschedule offers that were assigned to this framework.
>    void removeFramework(Framework* framework);
> @@ -706,23 +698,11 @@ private:
>        const Offer::Operation& operation,
>        const std::string& message);
>
> -  // Subscribes a http framework.
> -  void subscribe(
> -      const scheduler::Call::Subscribe& subscribe,
> -      ContentType contentType,
> -      process::http::Pipe::Writer writer);
> -
> -  // Continuation of subscribe().
> -  void _subscribe(
> -      Framework* framework,
> -      const scheduler::Call::Subscribe& subscribe);
> -
>    // Call handlers.
>    void receive(
>        const process::UPID& from,
>        const scheduler::Call& call);
>
> -  // Subscribes a pid framework.
>    void subscribe(
>        const process::UPID& from,
>        const scheduler::Call::Subscribe& subscribe);
> @@ -1523,22 +1503,6 @@ struct Framework
>      }
>    }
>
> -  void updateWriter(process::http::Pipe::Writer writer)
> -  {
> -    // Don't update with the same writer.
> -    if (http.get().writer == writer) {
> -      return;
> -    }
> -
> -    // Close the existing connection and update the writer.
> -    http.get().writer.close();
> -    http.get().writer = writer;
> -
> -    // Set up the reader closed callback.
> -    http.get().writer.readerClosed()
> -      .onAny(defer(master->self(), &Master::exited, id(), writer));
> -  }
> -
>    Master* const master;
>
>    FrameworkInfo info;
>
>

Reply via email to