Moved contender and detector definitions into separate directories. Updated Makefile.am.
Review: https://reviews.apache.org/r/44544/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cfbca013 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cfbca013 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cfbca013 Branch: refs/heads/master Commit: cfbca0136af5fc07f87651bb0080f85a767a9925 Parents: a1d3d6b Author: Anurag Singh <anurag.prakash.si...@gmail.com> Authored: Wed Apr 6 15:08:18 2016 -0400 Committer: Kapil Arya <ka...@mesosphere.io> Committed: Wed Apr 6 18:36:18 2016 -0400 ---------------------------------------------------------------------- src/Makefile.am | 6 +- src/master/contender/contender.cpp | 255 ++++++++++++++++ src/master/contender/contender.hpp | 89 ++++++ src/master/detector/detector.cpp | 522 ++++++++++++++++++++++++++++++++ src/master/detector/detector.hpp | 98 ++++++ 5 files changed, 966 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/cfbca013/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index ba9cc8b..d095b98 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -624,8 +624,6 @@ libmesos_no_3rdparty_la_SOURCES += \ local/local.cpp \ logging/flags.cpp \ logging/logging.cpp \ - master/contender.cpp \ - master/detector.cpp \ master/flags.cpp \ master/http.cpp \ master/maintenance.cpp \ @@ -642,6 +640,8 @@ libmesos_no_3rdparty_la_SOURCES += \ master/allocator/mesos/hierarchical.cpp \ master/allocator/mesos/metrics.cpp \ master/allocator/sorter/drf/sorter.cpp \ + master/contender/contender.cpp \ + master/detector/detector.cpp \ messages/messages.cpp \ module/manager.cpp \ sched/sched.cpp \ @@ -739,8 +739,6 @@ libmesos_no_3rdparty_la_SOURCES += \ logging/flags.hpp \ logging/logging.hpp \ master/constants.hpp \ - master/contender.hpp \ - master/detector.hpp \ 
master/flags.hpp \ master/machine.hpp \ master/maintenance.hpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/cfbca013/src/master/contender/contender.cpp ---------------------------------------------------------------------- diff --git a/src/master/contender/contender.cpp b/src/master/contender/contender.cpp new file mode 100644 index 0000000..95cec3e --- /dev/null +++ b/src/master/contender/contender.cpp @@ -0,0 +1,255 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include <mesos/master/contender.hpp> + +#include <process/defer.hpp> +#include <process/id.hpp> +#include <process/process.hpp> + +#include <stout/check.hpp> +#include <stout/lambda.hpp> +#include <stout/protobuf.hpp> +// NOTE(review): includes old master/contender.hpp, not contender/contender.hpp. +#include "master/constants.hpp" +#include "master/contender.hpp" +#include "master/master.hpp" + +#include "zookeeper/contender.hpp" +#include "zookeeper/detector.hpp" +#include "zookeeper/group.hpp" +#include "zookeeper/url.hpp" + +using std::string; + +using namespace process; +using namespace zookeeper; + +namespace mesos { +namespace master { +namespace contender { + +using namespace internal; + +const Duration MASTER_CONTENDER_ZK_SESSION_TIMEOUT = Seconds(10); + +// Actor behind ZooKeeperMasterContender; owns the group and contender. +class ZooKeeperMasterContenderProcess + : public Process<ZooKeeperMasterContenderProcess> +{ +public: + explicit ZooKeeperMasterContenderProcess(const zookeeper::URL& url); + explicit ZooKeeperMasterContenderProcess(Owned<zookeeper::Group> group); + virtual ~ZooKeeperMasterContenderProcess(); + + // Unhide ProcessBase::initialize(), which the overload below hides. + using process::ProcessBase::initialize; + + void initialize(const MasterInfo& masterInfo); + + // MasterContender implementation. + virtual Future<Future<Nothing>> contend(); + +private: + Owned<zookeeper::Group> group; + LeaderContender* contender; // Owned; NULL until contend(). + + // The master this contender contends on behalf of.
+ Option<MasterInfo> masterInfo; + Option<Future<Future<Nothing>>> candidacy; // Result of the latest contend(). +}; + +// None: standalone; zk://: ZooKeeper; file://: recurse on contents; else Error. +Try<MasterContender*> MasterContender::create(const Option<string>& _mechanism) +{ + if (_mechanism.isNone()) { + return new StandaloneMasterContender(); + } + + string mechanism = _mechanism.get(); + + if (strings::startsWith(mechanism, "zk://")) { + Try<zookeeper::URL> url = zookeeper::URL::parse(mechanism); + if (url.isError()) { + return Error(url.error()); + } + if (url.get().path == "/") { + return Error( + "Expecting a (chroot) path for ZooKeeper ('/' is not supported)"); + } + return new ZooKeeperMasterContender(url.get()); + } else if (strings::startsWith(mechanism, "file://")) { + // Load the configuration out of a file. While Mesos and related + // programs always use <stout/flags> to process the command line + // arguments (and therefore file://) this entrypoint is exposed by + // libmesos, with frameworks currently calling it and expecting it + // to do the argument parsing for them which roughly matches the + // argument parsing Mesos will do. + // TODO(cmaloney): Rework the libmesos exposed APIs to expose + // a "flags" endpoint where the framework can pass the command + // line arguments and they will be parsed by <stout/flags> and the + // needed flags extracted, and then change this interface to + // require final values from the flags. This means that a + // framework doesn't need to know how the flags are passed to + // match mesos' command line arguments if it wants, but if it + // needs to inspect/manipulate arguments, it can.
+ LOG(WARNING) << "Specifying master election mechanism / ZooKeeper URL to " + "be read out of a file via 'file://' is deprecated inside " + "Mesos and will be removed in a future release."; + const string& path = mechanism.substr(7); // Drop the "file://" prefix. + const Try<string> read = os::read(path); + if (read.isError()) { + return Error("Failed to read from file at '" + path + "'"); + } + + return create(strings::trim(read.get())); + } + + CHECK(!strings::startsWith(mechanism, "file://")); + + return Error("Failed to parse '" + mechanism + "'"); +} + + +MasterContender::~MasterContender() {} + + +StandaloneMasterContender::~StandaloneMasterContender() +{ + if (promise != NULL) { + promise->set(Nothing()); // Leadership lost. + delete promise; + } +} + + +void StandaloneMasterContender::initialize(const MasterInfo& masterInfo) +{ + // We don't really need to store the master in this basic + // implementation so we just record an 'initialized' flag to make + // sure it is called. + initialized = true; +} + + +Future<Future<Nothing>> StandaloneMasterContender::contend() +{ + if (!initialized) { + return Failure("Initialize the contender first"); + } + + if (promise != NULL) { + LOG(INFO) << "Withdrawing the previous membership before recontending"; + promise->set(Nothing()); + delete promise; + } + + // Directly return a future that is always pending because it + // represents a membership/leadership that is not going to be lost + // until we 'withdraw'.
+ promise = new Promise<Nothing>(); + return promise->future(); +} + + +ZooKeeperMasterContender::ZooKeeperMasterContender(const zookeeper::URL& url) +{ + process = new ZooKeeperMasterContenderProcess(url); + spawn(process); +} + + +ZooKeeperMasterContender::ZooKeeperMasterContender(Owned<Group> group) +{ + process = new ZooKeeperMasterContenderProcess(group); + spawn(process); +} + + +ZooKeeperMasterContender::~ZooKeeperMasterContender() +{ + terminate(process); + process::wait(process); + delete process; +} + +// NOTE(review): runs on the caller's thread; contend() is dispatched. +void ZooKeeperMasterContender::initialize(const MasterInfo& masterInfo) +{ + process->initialize(masterInfo); +} + + +Future<Future<Nothing>> ZooKeeperMasterContender::contend() +{ + return dispatch(process, &ZooKeeperMasterContenderProcess::contend); +} + + +ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess( + const zookeeper::URL& url) + : ZooKeeperMasterContenderProcess(Owned<Group>( + new Group(url, MASTER_CONTENDER_ZK_SESSION_TIMEOUT))) {} + + +ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess( + Owned<Group> _group) + : ProcessBase(ID::generate("zookeeper-master-contender")), + group(_group), + contender(NULL) {} + + +ZooKeeperMasterContenderProcess::~ZooKeeperMasterContenderProcess() +{ + delete contender; +} + +void ZooKeeperMasterContenderProcess::initialize(const MasterInfo& _masterInfo) +{ + masterInfo = _masterInfo; +} + + +Future<Future<Nothing>> ZooKeeperMasterContenderProcess::contend() +{ + if (masterInfo.isNone()) { + return Failure("Initialize the contender first"); + } + + // Should not recontend if the last election is still ongoing. + if (candidacy.isSome() && candidacy.get().isPending()) { + return candidacy.get(); + } + + if (contender != NULL) { + LOG(INFO) << "Withdrawing the previous membership before recontending"; + delete contender; + } + + // Serialize the MasterInfo to JSON.
+ JSON::Object json = JSON::protobuf(masterInfo.get()); + + contender = new LeaderContender( + group.get(), + stringify(json), + mesos::internal::master::MASTER_INFO_JSON_LABEL); + candidacy = contender->contend(); + return candidacy.get(); +} + +} // namespace contender { +} // namespace master { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/cfbca013/src/master/contender/contender.hpp ---------------------------------------------------------------------- diff --git a/src/master/contender/contender.hpp b/src/master/contender/contender.hpp new file mode 100644 index 0000000..ba05551 --- /dev/null +++ b/src/master/contender/contender.hpp @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +#ifndef __MASTER_CONTENDER_HPP__ +#define __MASTER_CONTENDER_HPP__ + +#include <mesos/master/contender.hpp> + +#include <process/defer.hpp> +#include <process/future.hpp> +#include <process/owned.hpp> +#include <process/pid.hpp> + +#include <stout/lambda.hpp> +#include <stout/nothing.hpp> + +#include "messages/messages.hpp" + +#include "zookeeper/contender.hpp" +#include "zookeeper/group.hpp" +#include "zookeeper/url.hpp" + +namespace mesos { +namespace internal { +// NOTE(review): contender.cpp defines these in mesos::master::contender. +extern const Duration MASTER_CONTENDER_ZK_SESSION_TIMEOUT; + +class ZooKeeperMasterContenderProcess; // Defined in contender.cpp. + +// A basic implementation which assumes only one master is +// contending. +class StandaloneMasterContender : public MasterContender +{ +public: + StandaloneMasterContender() + : initialized(false), + promise(NULL) {} + + virtual ~StandaloneMasterContender(); + + // MasterContender implementation. + virtual void initialize(const MasterInfo& masterInfo); + + // In this basic implementation the outer Future directly returns + // and the inner Future stays pending because there is only one + // contender in the contest. + virtual process::Future<process::Future<Nothing> > contend(); + +private: + bool initialized; // Set by initialize(); contend() fails until then. + process::Promise<Nothing>* promise; // Owned; NULL until contend(). +}; + + +class ZooKeeperMasterContender : public MasterContender +{ +public: + // Creates a contender that uses ZooKeeper to determine (i.e., + // elect) a leading master. + explicit ZooKeeperMasterContender(const zookeeper::URL& url); + explicit ZooKeeperMasterContender(process::Owned<zookeeper::Group> group); + + virtual ~ZooKeeperMasterContender(); + + // MasterContender implementation.
+ virtual void initialize(const MasterInfo& masterInfo); + virtual process::Future<process::Future<Nothing> > contend(); + +private: + ZooKeeperMasterContenderProcess* process; // Owned; freed by the destructor. +}; + +} // namespace internal { +} // namespace mesos { + +#endif // __MASTER_CONTENDER_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/cfbca013/src/master/detector/detector.cpp ---------------------------------------------------------------------- diff --git a/src/master/detector/detector.cpp b/src/master/detector/detector.cpp new file mode 100644 index 0000000..ad9c209 --- /dev/null +++ b/src/master/detector/detector.cpp @@ -0,0 +1,522 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +#include <set> +#include <string> + +#include <process/defer.hpp> +#include <process/dispatch.hpp> +#include <process/future.hpp> +#include <process/id.hpp> +#include <process/logging.hpp> +#include <process/pid.hpp> +#include <process/process.hpp> + +#include <stout/duration.hpp> +#include <stout/foreach.hpp> +#include <stout/lambda.hpp> +#include <stout/protobuf.hpp> + +#include <mesos/master/detector.hpp> + +#include "common/protobuf_utils.hpp" +// NOTE(review): includes old master/detector.hpp, not detector/detector.hpp. +#include "master/constants.hpp" +#include "master/detector.hpp" +#include "master/master.hpp" + +#include "messages/messages.hpp" + +#include "zookeeper/detector.hpp" +#include "zookeeper/group.hpp" +#include "zookeeper/url.hpp" + +using namespace process; +using namespace zookeeper; + +using std::set; +using std::string; + +namespace mesos { +namespace master { +namespace detector { + +const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT = Seconds(10); + +// TODO(bmahler): Consider moving these kinds of helpers into +// libprocess or a common header within mesos. +namespace promises { + +// Sets every Promise in the set to 't', deletes it, and clears the set. +template <typename T> +void set(std::set<Promise<T>* >* promises, const T& t) +{ + foreach (Promise<T>* promise, *promises) { + promise->set(t); + delete promise; + } + promises->clear(); +} + + +// Fails every Promise in the set, deletes it, and clears the set. +template <typename T> +void fail(std::set<Promise<T>* >* promises, const string& failure) +{ + foreach (Promise<T>* promise, *promises) { + promise->fail(failure); + delete promise; + } + promises->clear(); +} + + +// Discards every Promise in the set, deletes it, and clears the set. +template <typename T> +void discard(std::set<Promise<T>* >* promises) +{ + foreach (Promise<T>* promise, *promises) { + promise->discard(); + delete promise; + } + promises->clear(); +} + + +// Discards and deletes the Promise (if any) whose future is 'future'.
+template <typename T> +void discard(std::set<Promise<T>* >* promises, const Future<T>& future) +{ + foreach (Promise<T>* promise, *promises) { + if (promise->future() == future) { + promise->discard(); + promises->erase(promise); // Safe: we return right after. + delete promise; + return; + } + } +} + +} // namespace promises { + + +class StandaloneMasterDetectorProcess + : public Process<StandaloneMasterDetectorProcess> +{ +public: + StandaloneMasterDetectorProcess() + : ProcessBase(ID::generate("standalone-master-detector")) {} + explicit StandaloneMasterDetectorProcess(const MasterInfo& _leader) + : ProcessBase(ID::generate("standalone-master-detector")), + leader(_leader) {} + + ~StandaloneMasterDetectorProcess() + { + promises::discard(&promises); + } + + void appoint(const Option<MasterInfo>& leader_) + { + leader = leader_; + + promises::set(&promises, leader); + } + + Future<Option<MasterInfo> > detect( + const Option<MasterInfo>& previous = None()) + { + if (leader != previous) { + return leader; + } + + Promise<Option<MasterInfo> >* promise = new Promise<Option<MasterInfo> >(); + + promise->future() + .onDiscard(defer(self(), &Self::discard, promise->future())); + + promises.insert(promise); + return promise->future(); + } + +private: + void discard(const Future<Option<MasterInfo> >& future) + { + // Discard the promise holding this future. + promises::discard(&promises, future); + } + + Option<MasterInfo> leader; // The appointed master. + set<Promise<Option<MasterInfo> >*> promises; // Pending detect() calls. +}; + + +class ZooKeeperMasterDetectorProcess + : public Process<ZooKeeperMasterDetectorProcess> +{ +public: + explicit ZooKeeperMasterDetectorProcess(const zookeeper::URL& url); + explicit ZooKeeperMasterDetectorProcess(Owned<Group> group); + ~ZooKeeperMasterDetectorProcess(); + + virtual void initialize(); + Future<Option<MasterInfo> > detect(const Option<MasterInfo>& previous); + +private: + void discard(const Future<Option<MasterInfo> >& future); + + // Invoked when the group leadership has changed.
+ void detected(const Future<Option<Group::Membership> >& leader); + + // Invoked when we have fetched the data associated with the leader. + void fetched( + const Group::Membership& membership, + const Future<Option<string> >& data); + + Owned<Group> group; + LeaderDetector detector; + + // The most recently detected leading master, if any. + Option<MasterInfo> leader; + set<Promise<Option<MasterInfo> >*> promises; // Pending detect() calls. + + // Non-retryable error; once set, detect() always fails. + Option<Error> error; +}; + +// None: standalone; zk://: ZooKeeper; file://: recurse; else parse as a PID. +Try<MasterDetector*> MasterDetector::create(const Option<string>& _mechanism) +{ + if (_mechanism.isNone()) { + return new StandaloneMasterDetector(); + } + + string mechanism = _mechanism.get(); + + if (strings::startsWith(mechanism, "zk://")) { + Try<zookeeper::URL> url = zookeeper::URL::parse(mechanism); + if (url.isError()) { + return Error(url.error()); + } + if (url.get().path == "/") { + return Error( + "Expecting a (chroot) path for ZooKeeper ('/' is not supported)"); + } + return new ZooKeeperMasterDetector(url.get()); + } else if (strings::startsWith(mechanism, "file://")) { + // Load the configuration out of a file. While Mesos and related + // programs always use <stout/flags> to process the command line + // arguments (and therefore file://) this entrypoint is exposed by + // libmesos, with frameworks currently calling it and expecting it + // to do the argument parsing for them which roughly matches the + // argument parsing Mesos will do. + // TODO(cmaloney): Rework the libmesos exposed APIs to expose + // a "flags" endpoint where the framework can pass the command + // line arguments and they will be parsed by <stout/flags> and the + // needed flags extracted, and then change this interface to + // require final values from the flags. This means that a + // framework doesn't need to know how the flags are passed to + // match mesos' command line arguments if it wants, but if it + // needs to inspect/manipulate arguments, it can.
+ LOG(WARNING) << "Specifying master detection mechanism / ZooKeeper URL to " + "be read out of a file via 'file://' is deprecated inside " + "Mesos and will be removed in a future release."; + const string& path = mechanism.substr(7); // Drop the "file://" prefix. + const Try<string> read = os::read(path); + if (read.isError()) { + return Error("Failed to read from file at '" + path + "'"); + } + + return create(strings::trim(read.get())); + } + + CHECK(!strings::startsWith(mechanism, "file://")); + + // Otherwise parse the mechanism as a PID, prefixing "master@" if absent. + UPID pid = mechanism.find("master@") == 0 + ? UPID(mechanism) + : UPID("master@" + mechanism); + + if (!pid) { + return Error("Failed to parse '" + mechanism + "'"); + } + + return new StandaloneMasterDetector( + internal::protobuf::createMasterInfo(pid)); +} + + +MasterDetector::~MasterDetector() {} + + +StandaloneMasterDetector::StandaloneMasterDetector() +{ + process = new StandaloneMasterDetectorProcess(); + spawn(process); +} + + +StandaloneMasterDetector::StandaloneMasterDetector(const MasterInfo& leader) +{ + process = new StandaloneMasterDetectorProcess(leader); + spawn(process); +} + + +StandaloneMasterDetector::StandaloneMasterDetector(const UPID& leader) +{ + process = new StandaloneMasterDetectorProcess( + mesos::internal::protobuf::createMasterInfo(leader)); + + spawn(process); +} + + +StandaloneMasterDetector::~StandaloneMasterDetector() +{ + terminate(process); + process::wait(process); + delete process; +} + + +void StandaloneMasterDetector::appoint(const Option<MasterInfo>& leader) +{ + dispatch(process, &StandaloneMasterDetectorProcess::appoint, leader); +} + + +void StandaloneMasterDetector::appoint(const UPID& leader) +{ + dispatch(process, + &StandaloneMasterDetectorProcess::appoint, + mesos::internal::protobuf::createMasterInfo(leader)); +} + + +Future<Option<MasterInfo> > StandaloneMasterDetector::detect( + const Option<MasterInfo>& previous) +{ + return dispatch(process, &StandaloneMasterDetectorProcess::detect, previous); +}
+ + +// TODO(benh): Get ZooKeeper timeout from configuration. +ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess( + const zookeeper::URL& url) + : ZooKeeperMasterDetectorProcess(Owned<Group>( + new Group(url.servers, + MASTER_DETECTOR_ZK_SESSION_TIMEOUT, + url.path, + url.authentication))) {} + + +ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess( + Owned<Group> _group) + : ProcessBase(ID::generate("zookeeper-master-detector")), + group(_group), + detector(group.get()), // Needs 'group', which is declared first. + leader(None()) {} + + +ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess() +{ + promises::discard(&promises); +} + + +void ZooKeeperMasterDetectorProcess::initialize() +{ + detector.detect() + .onAny(defer(self(), &Self::detected, lambda::_1)); +} + + +void ZooKeeperMasterDetectorProcess::discard( + const Future<Option<MasterInfo> >& future) +{ + // Discard the promise holding this future. + promises::discard(&promises, future); +} + + +Future<Option<MasterInfo> > ZooKeeperMasterDetectorProcess::detect( + const Option<MasterInfo>& previous) +{ + // Return immediately if the detector is no longer operational due + // to a non-retryable error. + if (error.isSome()) { + return Failure(error.get().message); + } + + if (leader != previous) { + return leader; + } + + Promise<Option<MasterInfo> >* promise = new Promise<Option<MasterInfo> >(); + + promise->future() + .onDiscard(defer(self(), &Self::discard, promise->future())); + + promises.insert(promise); + return promise->future(); +} + + +void ZooKeeperMasterDetectorProcess::detected( + const Future<Option<Group::Membership> >& _leader) +{ + CHECK(!_leader.isDiscarded()); + + if (_leader.isFailed()) { + LOG(ERROR) << "Failed to detect the leader: " << _leader.failure(); + + // Setting this error stops the detection loop and the detector + // transitions to an erroneous state. Further calls to detect() + // will directly fail as a result.
+ error = Error(_leader.failure()); + leader = None(); + + promises::fail(&promises, _leader.failure()); + + return; + } + + if (_leader.get().isNone()) { + leader = None(); + + promises::set(&promises, leader); + } else { + // Fetch the data associated with the leader. + group->data(_leader.get().get()) + .onAny(defer(self(), &Self::fetched, _leader.get().get(), lambda::_1)); + } + + // Keep trying to detect leadership changes. + detector.detect(_leader.get()) + .onAny(defer(self(), &Self::detected, lambda::_1)); +} + + +void ZooKeeperMasterDetectorProcess::fetched( + const Group::Membership& membership, + const Future<Option<string> >& data) +{ + CHECK(!data.isDiscarded()); + + if (data.isFailed()) { + leader = None(); + promises::fail(&promises, data.failure()); + return; + } else if (data.get().isNone()) { + // The membership vanished before its data could be read. + leader = None(); + promises::set(&promises, leader); + return; + } + + // Parse the data based on the membership label and cache the + // leader for subsequent requests. + Option<string> label = membership.label(); + if (label.isNone()) { + // If we are here it means some masters are still creating znodes + // with the old format.
+ UPID pid = UPID(data.get().get()); + LOG(WARNING) << "Leading master " << pid << " has data in old format"; + leader = mesos::internal::protobuf::createMasterInfo(pid); + } else if (label.isSome() && + label.get() == mesos::internal::master::MASTER_INFO_LABEL) { + MasterInfo info; + if (!info.ParseFromString(data.get().get())) { + leader = None(); + promises::fail(&promises, "Failed to parse data into MasterInfo"); + return; + } + LOG(WARNING) << "Leading master " << info.pid() + << " is using a Protobuf binary format when registering with " + << "ZooKeeper (" << label.get() << "): this will be deprecated" + << " as of Mesos 0.24 (see MESOS-2340)"; + leader = info; + } else if (label.isSome() && + label.get() == mesos::internal::master::MASTER_INFO_JSON_LABEL) { + Try<JSON::Object> object = JSON::parse<JSON::Object>(data.get().get()); + + if (object.isError()) { + leader = None(); + promises::fail( + &promises, + "Failed to parse data into valid JSON: " + object.error()); + return; + } + + Try<mesos::MasterInfo> info = + ::protobuf::parse<mesos::MasterInfo>(object.get()); + + if (info.isError()) { + leader = None(); + promises::fail( + &promises, + "Failed to parse JSON into a valid MasterInfo protocol buffer: " + + info.error()); + return; + } + + leader = info.get(); + } else { + leader = None(); + promises::fail( + &promises, + "Failed to parse data of unknown label '" + label.get() + "'"); + return; + } + + LOG(INFO) << "A new leading master (UPID=" + << UPID(leader.get().pid()) << ") is detected"; + + promises::set(&promises, leader); +} + + +ZooKeeperMasterDetector::ZooKeeperMasterDetector(const zookeeper::URL& url) +{ + process = new ZooKeeperMasterDetectorProcess(url); + spawn(process); +} + + +ZooKeeperMasterDetector::ZooKeeperMasterDetector(Owned<Group> group) +{ + process = new ZooKeeperMasterDetectorProcess(group); + spawn(process); +} + + +ZooKeeperMasterDetector::~ZooKeeperMasterDetector() +{ + terminate(process); + process::wait(process); + delete
process; +} + + +Future<Option<MasterInfo> > ZooKeeperMasterDetector::detect( + const Option<MasterInfo>& previous) +{ + return dispatch(process, &ZooKeeperMasterDetectorProcess::detect, previous); +} + +} // namespace detector { +} // namespace master { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/cfbca013/src/master/detector/detector.hpp ---------------------------------------------------------------------- diff --git a/src/master/detector/detector.hpp b/src/master/detector/detector.hpp new file mode 100644 index 0000000..8400265 --- /dev/null +++ b/src/master/detector/detector.hpp @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef __MASTER_DETECTOR_HPP__ +#define __MASTER_DETECTOR_HPP__ + +#include <string> + +#include <process/future.hpp> +#include <process/owned.hpp> + +#include <stout/option.hpp> +#include <stout/stringify.hpp> +#include <stout/try.hpp> + +#include "messages/messages.hpp" + +#include "zookeeper/detector.hpp" +#include "zookeeper/group.hpp" +#include "zookeeper/url.hpp" + +namespace mesos { +namespace internal { +// NOTE(review): detector.cpp defines these in mesos::master::detector. +extern const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT; + +// Forward declarations.
+class StandaloneMasterDetectorProcess; +class ZooKeeperMasterDetectorProcess; + +// A standalone implementation of the MasterDetector with no external +// discovery mechanism so the user has to manually appoint a leader +// to the detector for it to be detected. +class StandaloneMasterDetector : public MasterDetector +{ +public: + StandaloneMasterDetector(); + // Use this constructor if the leader is known beforehand so it is + // unnecessary to call 'appoint()' separately. + explicit StandaloneMasterDetector(const MasterInfo& leader); + + // Same as above but takes UPID as the parameter. + explicit StandaloneMasterDetector(const process::UPID& leader); + + virtual ~StandaloneMasterDetector(); + + // Appoint the leading master so it can be *detected*. + void appoint(const Option<MasterInfo>& leader); + + // Same as above but takes 'UPID' as the parameter. + void appoint(const process::UPID& leader); + + virtual process::Future<Option<MasterInfo> > detect( + const Option<MasterInfo>& previous = None()); + +private: + StandaloneMasterDetectorProcess* process; +}; + + +class ZooKeeperMasterDetector : public MasterDetector +{ +public: + // Creates a detector which uses ZooKeeper to determine (i.e., + // elect) a leading master. + explicit ZooKeeperMasterDetector(const zookeeper::URL& url); + // Used for testing purposes. + explicit ZooKeeperMasterDetector(process::Owned<zookeeper::Group> group); + virtual ~ZooKeeperMasterDetector(); + + // MasterDetector implementation. + // The detector transparently tries to recover from retryable + // errors until the group session expires, in which case the Future + // returns None. Non-retryable errors fail the Future. + virtual process::Future<Option<MasterInfo> > detect( + const Option<MasterInfo>& previous = None()); + +private: + ZooKeeperMasterDetectorProcess* process; +}; + +} // namespace internal { +} // namespace mesos { + +#endif // __MASTER_DETECTOR_HPP__