Repository: mesos Updated Branches: refs/heads/master ff19e9786 -> 2ea3819ef
Updated log to use the new interval set abstraction. Review: https://reviews.apache.org/r/18368 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2ea3819e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2ea3819e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2ea3819e Branch: refs/heads/master Commit: 2ea3819ef90c6e0c78be5059a1c385ae7cf01fbc Parents: ff19e97 Author: Jie Yu <[email protected]> Authored: Fri Feb 21 11:15:37 2014 -0800 Committer: Jie Yu <[email protected]> Committed: Thu Mar 6 10:58:40 2014 -0800 ---------------------------------------------------------------------- src/log/catchup.cpp | 70 +++++++++++++++++++++++++++++++++----------- src/log/catchup.hpp | 5 ++-- src/log/coordinator.cpp | 9 +++--- src/log/recover.cpp | 12 ++------ src/log/replica.cpp | 52 +++++++++++++++----------------- src/log/replica.hpp | 8 +++-- src/log/storage.hpp | 13 ++++---- src/tests/log_tests.cpp | 5 ++-- 8 files changed, 99 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/2ea3819e/src/log/catchup.cpp ---------------------------------------------------------------------- diff --git a/src/log/catchup.cpp b/src/log/catchup.cpp index dae4619..6315a85 100644 --- a/src/log/catchup.cpp +++ b/src/log/catchup.cpp @@ -36,7 +36,6 @@ using namespace process; using std::list; -using std::vector; namespace mesos { namespace internal { @@ -180,7 +179,7 @@ public: const Shared<Replica>& _replica, const Shared<Network>& _network, uint64_t _proposal, - const vector<uint64_t>& _positions, + const Interval<uint64_t>& _positions, const Duration& _timeout) : ProcessBase(ID::generate("log-bulk-catch-up")), quorum(_quorum), @@ -202,7 +201,7 @@ protected: static_cast<void(*)(const UPID&, bool)>(terminate), self(), true)); // Catch-up sequentially. - it = positions.begin(); + current = positions.lower(); catchup(); } @@ -224,7 +223,9 @@ private: void catchup() { - if (it == positions.end()) { + if (current >= positions.upper()) { + // Stop the process if there is nothing left to catch-up. This + // also handles the case where the input interval is empty. promise.set(Nothing()); terminate(self()); return; @@ -232,7 +233,7 @@ private: // Store the future so that we can discard it if the user wants to // cancel the catch-up operation. - catching = log::catchup(quorum, replica, network, proposal, *it) + catching = log::catchup(quorum, replica, network, proposal, current) .onDiscarded(defer(self(), &Self::discarded)) .onFailed(defer(self(), &Self::failed)) .onReady(defer(self(), &Self::succeeded)); @@ -243,7 +244,7 @@ private: void discarded() { - LOG(INFO) << "Unable to catch-up position " << *it + LOG(INFO) << "Unable to catch-up position " << current << " in " << timeout << ", retrying"; catchup(); @@ -252,7 +253,7 @@ private: void failed() { promise.fail( - "Failed to catch-up position " + stringify(*it) + + "Failed to catch-up position " + stringify(current) + ": " + catching.failure()); terminate(self()); @@ -260,7 +261,7 @@ private: void succeeded() { - ++it; + ++current; // The single position catch-up function: 'log::catchup' will // return the highest proposal number seen so far. We use this @@ -275,28 +276,23 @@ private: const size_t quorum; const Shared<Replica> replica; const Shared<Network> network; - const vector<uint64_t> positions; + const Interval<uint64_t> positions; const Duration timeout; uint64_t proposal; - vector<uint64_t>::const_iterator it; + uint64_t current; process::Promise<Nothing> promise; Future<uint64_t> catching; }; -///////////////////////////////////////////////// -// Public interfaces below. -///////////////////////////////////////////////// - - -Future<Nothing> catchup( +static Future<Nothing> catchup( size_t quorum, const Shared<Replica>& replica, const Shared<Network>& network, const Option<uint64_t>& proposal, - const vector<uint64_t>& positions, + const Interval<uint64_t>& positions, const Duration& timeout) { BulkCatchUpProcess* process = @@ -313,6 +309,46 @@ Future<Nothing> catchup( return future; } + +///////////////////////////////////////////////// +// Public interfaces below. +///////////////////////////////////////////////// + + +Future<Nothing> catchup( + size_t quorum, + const Shared<Replica>& replica, + const Shared<Network>& network, + const Option<uint64_t>& proposal, + const IntervalSet<uint64_t>& positions, + const Duration& timeout) +{ + // Necessary to disambiguate overloaded functions. + Future<Nothing> (*f)( + size_t quorum, + const Shared<Replica>& replica, + const Shared<Network>& network, + const Option<uint64_t>& proposal, + const Interval<uint64_t>& positions, + const Duration& timeout) = &catchup; + + Future<Nothing> future = Nothing(); + + foreach (const Interval<uint64_t>& interval, positions) { + future = future.then( + lambda::bind( + f, + quorum, + replica, + network, + proposal, + interval, + timeout)); + } + + return future; +} + } // namespace log { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/2ea3819e/src/log/catchup.hpp ---------------------------------------------------------------------- diff --git a/src/log/catchup.hpp b/src/log/catchup.hpp index 45dc016..5e23b57 100644 --- a/src/log/catchup.hpp +++ b/src/log/catchup.hpp @@ -21,12 +21,11 @@ #include <stdint.h> -#include <vector> - #include <process/future.hpp> #include <process/shared.hpp> #include <stout/duration.hpp> +#include <stout/interval.hpp> #include <stout/nothing.hpp> #include <stout/option.hpp> @@ -50,7 +49,7 @@ extern process::Future<Nothing> catchup( const process::Shared<Replica>& replica, const process::Shared<Network>& network, const Option<uint64_t>& proposal, - const std::vector<uint64_t>& positions, + const IntervalSet<uint64_t>& positions, const Duration& timeout = Seconds(10)); } // namespace log { http://git-wip-us.apache.org/repos/asf/mesos/blob/2ea3819e/src/log/coordinator.cpp ---------------------------------------------------------------------- diff --git a/src/log/coordinator.cpp b/src/log/coordinator.cpp index 6bfff1e..9a2faba 100644 --- a/src/log/coordinator.cpp +++ b/src/log/coordinator.cpp @@ -38,7 +38,6 @@ using namespace process; using std::string; -using std::vector; namespace mesos { namespace internal { @@ -83,8 +82,8 @@ private: Future<Nothing> updateProposal(uint64_t promised); Future<PromiseResponse> runPromisePhase(); Future<Option<uint64_t> > checkPromisePhase(const PromiseResponse& response); - Future<vector<uint64_t> > getMissingPositions(); - Future<Nothing> catchupMissingPositions(const vector<uint64_t>& positions); + Future<IntervalSet<uint64_t> > getMissingPositions(); + Future<Nothing> catchupMissingPositions(const IntervalSet<uint64_t>& positions); Future<Option<uint64_t> > updateIndexAfterElected(); void electingFinished(const Option<uint64_t>& position); void electingFailed(); @@ -216,14 +215,14 @@ Future<Option<uint64_t> > CoordinatorProcess::checkPromisePhase( } -Future<vector<uint64_t> > CoordinatorProcess::getMissingPositions() +Future<IntervalSet<uint64_t> > CoordinatorProcess::getMissingPositions() { return replica->missing(0, index); } Future<Nothing> CoordinatorProcess::catchupMissingPositions( - const vector<uint64_t>& positions) + const IntervalSet<uint64_t>& positions) { LOG(INFO) << "Coordinator attemping to fill missing position"; http://git-wip-us.apache.org/repos/asf/mesos/blob/2ea3819e/src/log/recover.cpp ---------------------------------------------------------------------- diff --git a/src/log/recover.cpp b/src/log/recover.cpp index 3403b47..a5819ec 100644 --- a/src/log/recover.cpp +++ b/src/log/recover.cpp @@ -20,7 +20,6 @@ #include <stdlib.h> #include <set> -#include <vector> #include <process/defer.hpp> #include <process/delay.hpp> @@ -43,7 +42,6 @@ using namespace process; using std::set; -using std::vector; namespace mesos { namespace internal { @@ -336,17 +334,13 @@ private: CHECK(highestEndPosition.isSome()); CHECK_LE(lowestBeginPosition.get(), highestEndPosition.get()); - uint64_t begin = lowestBeginPosition.get(); - uint64_t end = highestEndPosition.get(); - LOG(INFO) << "Starting catch-up from position " << lowestBeginPosition.get() << " to " << highestEndPosition.get(); - vector<uint64_t> positions; - for (uint64_t p = begin; p <= end; ++p) { - positions.push_back(p); - } + IntervalSet<uint64_t> positions( + Bound<uint64_t>::closed(lowestBeginPosition.get()), + Bound<uint64_t>::closed(highestEndPosition.get())); // Share the ownership of the replica. From this point until the // point where the ownership of the replica is regained, we should http://git-wip-us.apache.org/repos/asf/mesos/blob/2ea3819e/src/log/replica.cpp ---------------------------------------------------------------------- diff --git a/src/log/replica.cpp b/src/log/replica.cpp index 1f1a945..6db6d05 100644 --- a/src/log/replica.cpp +++ b/src/log/replica.cpp @@ -20,9 +20,6 @@ #include <algorithm> -#include <boost/icl/interval.hpp> -#include <boost/icl/interval_set.hpp> - #include <process/dispatch.hpp> #include <process/id.hpp> @@ -41,13 +38,10 @@ #include "log/replica.hpp" #include "log/storage.hpp" -using namespace boost::icl; - using namespace process; using std::list; using std::string; -using std::vector; namespace mesos { namespace internal { @@ -90,7 +84,7 @@ public: // Returns missing positions in the log (i.e., unlearned or holes) // within the specified range [from, to]. - vector<uint64_t> missing(uint64_t from, uint64_t to); + IntervalSet<uint64_t> missing(uint64_t from, uint64_t to); // Returns the beginning position of the log. uint64_t beginning(); @@ -148,10 +142,10 @@ private: uint64_t end; // Holes in the log. - interval_set<uint64_t> holes; + IntervalSet<uint64_t> holes; // Unlearned positions in the log. - interval_set<uint64_t> unlearned; + IntervalSet<uint64_t> unlearned; }; @@ -193,7 +187,7 @@ Result<Action> ReplicaProcess::read(uint64_t position) return Error("Attempted to read truncated position"); } else if (end < position) { return None(); // These semantics are assumed above! - } else if (contains(holes, position)) { + } else if (holes.contains(position)) { return None(); } @@ -253,22 +247,20 @@ bool ReplicaProcess::missing(uint64_t position) } else if (position > end) { return true; } else { - if (contains(unlearned, position) || contains(holes, position)) { - return true; - } else { - return false; - } + return unlearned.contains(position) || holes.contains(position); } } -vector<uint64_t> ReplicaProcess::missing(uint64_t from, uint64_t to) +// TODO(jieyu): Allow this method to take an Interval. +IntervalSet<uint64_t> ReplicaProcess::missing(uint64_t from, uint64_t to) { if (from > to) { - return vector<uint64_t>(); + // Empty interval. + return IntervalSet<uint64_t>(); } - interval_set<uint64_t> positions; + IntervalSet<uint64_t> positions; // Add unlearned positions. positions += unlearned; @@ -278,14 +270,13 @@ vector<uint64_t> ReplicaProcess::missing(uint64_t from, uint64_t to) // Add all the unknown positions beyond our end. if (to > end) { - positions += interval<uint64_t>::closed(end + 1, to); + positions += (Bound<uint64_t>::open(end), Bound<uint64_t>::closed(to)); } // Do not consider positions outside [from, to]. - positions &= interval<uint64_t>::closed(from, to); + positions &= (Bound<uint64_t>::closed(from), Bound<uint64_t>::closed(to)); - // Generate the resultant vector. - return vector<uint64_t>(elements_begin(positions), elements_end(positions)); + return positions; } @@ -678,14 +669,17 @@ bool ReplicaProcess::persist(const Action& action) // Update unlearned positions and deal with truncation actions. if (action.has_learned() && action.learned()) { unlearned -= action.position(); + if (action.has_type() && action.type() == Action::TRUNCATE) { // No longer consider truncated positions as holes (so that a // coordinator doesn't try and fill them). - holes -= interval<uint64_t>::open(0, action.truncate().to()); + holes -= (Bound<uint64_t>::open(0), + Bound<uint64_t>::open(action.truncate().to())); // No longer consider truncated positions as unlearned (so that // a coordinator doesn't try and fill them). - unlearned -= interval<uint64_t>::open(0, action.truncate().to()); + unlearned -= (Bound<uint64_t>::open(0), + Bound<uint64_t>::open(action.truncate().to())); // And update the beginning position. begin = std::max(begin, action.truncate().to()); @@ -697,7 +691,8 @@ bool ReplicaProcess::persist(const Action& action) // Update holes if we just wrote many positions past the last end. if (action.position() > end) { - holes += interval<uint64_t>::open(end, action.position()); + holes += (Bound<uint64_t>::open(end), + Bound<uint64_t>::open(action.position())); } // And update the end position. @@ -720,14 +715,14 @@ void ReplicaProcess::restore(const string& path) unlearned = state.get().unlearned; // Only use the learned positions to help determine the holes. - const interval_set<uint64_t>& learned = state.get().learned; + const IntervalSet<uint64_t>& learned = state.get().learned; // Holes are those positions in [begin, end] that are not in both // learned and unlearned sets. In the case of a brand new log (begin // and end are 0, and learned and unlearned are empty), we assume // position 0 is a hole, and a coordinator will simply fill it with // a no-op when it first gets elected. - holes += interval<uint64_t>::closed(begin, end); + holes += (Bound<uint64_t>::closed(begin), Bound<uint64_t>::closed(end)); holes -= learned; holes -= unlearned; @@ -765,7 +760,8 @@ Future<bool> Replica::missing(uint64_t position) const } -Future<vector<uint64_t> > Replica::missing(uint64_t from, uint64_t to) const +Future<IntervalSet<uint64_t> > Replica::missing( + uint64_t from, uint64_t to) const { return dispatch(process, &ReplicaProcess::missing, from, to); } http://git-wip-us.apache.org/repos/asf/mesos/blob/2ea3819e/src/log/replica.hpp ---------------------------------------------------------------------- diff --git a/src/log/replica.hpp b/src/log/replica.hpp index 08ddcb1..6c51a58 100644 --- a/src/log/replica.hpp +++ b/src/log/replica.hpp @@ -23,12 +23,13 @@ #include <list> #include <string> -#include <vector> #include <process/future.hpp> #include <process/pid.hpp> #include <process/protobuf.hpp> +#include <stout/interval.hpp> + #include "messages/log.hpp" namespace mesos { @@ -72,8 +73,9 @@ public: process::Future<bool> missing(uint64_t position) const; // Returns missing positions in the log (i.e., unlearned or holes) - // within the specified range [from, to]. - process::Future<std::vector<uint64_t> > missing( + // within the specified range [from, to]. We use interval set, a + // more compact representation of set, to store missing positions. + process::Future<IntervalSet<uint64_t> > missing( uint64_t from, uint64_t to) const; http://git-wip-us.apache.org/repos/asf/mesos/blob/2ea3819e/src/log/storage.hpp ---------------------------------------------------------------------- diff --git a/src/log/storage.hpp b/src/log/storage.hpp index c0eba1b..60bf37f 100644 --- a/src/log/storage.hpp +++ b/src/log/storage.hpp @@ -23,8 +23,7 @@ #include <string> -#include <boost/icl/interval_set.hpp> - +#include <stout/interval.hpp> #include <stout/nothing.hpp> #include <stout/try.hpp> @@ -44,11 +43,11 @@ public: uint64_t begin; // Beginning position of the log. uint64_t end; // Ending position of the log. - // Note that here we use boost interval set to store learned and - // unlearned positions in a more compact way. Adjacent positions - // will be merged and represented using an interval. - boost::icl::interval_set<uint64_t> learned; - boost::icl::interval_set<uint64_t> unlearned; + // Note that we use interval set here to store learned/unlearned + // positions in a more compact way. Adjacent positions will be + // merged and represented using an interval. + IntervalSet<uint64_t> learned; + IntervalSet<uint64_t> unlearned; }; virtual ~Storage() {} http://git-wip-us.apache.org/repos/asf/mesos/blob/2ea3819e/src/tests/log_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/log_tests.cpp b/src/tests/log_tests.cpp index 7f2a740..da827a1 100644 --- a/src/tests/log_tests.cpp +++ b/src/tests/log_tests.cpp @@ -69,7 +69,6 @@ using namespace process; using std::list; using std::set; using std::string; -using std::vector; using testing::_; using testing::Eq; @@ -1613,13 +1612,13 @@ TEST_F(RecoverTest, CatchupRetry) EXPECT_EQ(0u, electing.get().get()); } - vector<uint64_t> positions; + IntervalSet<uint64_t> positions; for (uint64_t position = 1; position <= 10; position++) { Future<uint64_t> appending = coord.append(stringify(position)); AWAIT_READY_FOR(appending, Seconds(10)); EXPECT_EQ(position, appending.get()); - positions.push_back(position); + positions += position; } Shared<Replica> replica3(new Replica(path3));
