Repository: mesos
Updated Branches:
  refs/heads/master ff19e9786 -> 2ea3819ef


Updated log to use the new interval set abstraction.

Review: https://reviews.apache.org/r/18368


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2ea3819e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2ea3819e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2ea3819e

Branch: refs/heads/master
Commit: 2ea3819ef90c6e0c78be5059a1c385ae7cf01fbc
Parents: ff19e97
Author: Jie Yu <[email protected]>
Authored: Fri Feb 21 11:15:37 2014 -0800
Committer: Jie Yu <[email protected]>
Committed: Thu Mar 6 10:58:40 2014 -0800

----------------------------------------------------------------------
 src/log/catchup.cpp     | 70 +++++++++++++++++++++++++++++++++-----------
 src/log/catchup.hpp     |  5 ++--
 src/log/coordinator.cpp |  9 +++---
 src/log/recover.cpp     | 12 ++------
 src/log/replica.cpp     | 52 +++++++++++++++-----------------
 src/log/replica.hpp     |  8 +++--
 src/log/storage.hpp     | 13 ++++----
 src/tests/log_tests.cpp |  5 ++--
 8 files changed, 99 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2ea3819e/src/log/catchup.cpp
----------------------------------------------------------------------
diff --git a/src/log/catchup.cpp b/src/log/catchup.cpp
index dae4619..6315a85 100644
--- a/src/log/catchup.cpp
+++ b/src/log/catchup.cpp
@@ -36,7 +36,6 @@
 using namespace process;
 
 using std::list;
-using std::vector;
 
 namespace mesos {
 namespace internal {
@@ -180,7 +179,7 @@ public:
       const Shared<Replica>& _replica,
       const Shared<Network>& _network,
       uint64_t _proposal,
-      const vector<uint64_t>& _positions,
+      const Interval<uint64_t>& _positions,
       const Duration& _timeout)
     : ProcessBase(ID::generate("log-bulk-catch-up")),
       quorum(_quorum),
@@ -202,7 +201,7 @@ protected:
         static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
 
     // Catch-up sequentially.
-    it = positions.begin();
+    current = positions.lower();
 
     catchup();
   }
@@ -224,7 +223,9 @@ private:
 
   void catchup()
   {
-    if (it == positions.end()) {
+    if (current >= positions.upper()) {
+      // Stop the process if there is nothing left to catch-up. This
+      // also handles the case where the input interval is empty.
       promise.set(Nothing());
       terminate(self());
       return;
@@ -232,7 +233,7 @@ private:
 
     // Store the future so that we can discard it if the user wants to
     // cancel the catch-up operation.
-    catching = log::catchup(quorum, replica, network, proposal, *it)
+    catching = log::catchup(quorum, replica, network, proposal, current)
       .onDiscarded(defer(self(), &Self::discarded))
       .onFailed(defer(self(), &Self::failed))
       .onReady(defer(self(), &Self::succeeded));
@@ -243,7 +244,7 @@ private:
 
   void discarded()
   {
-    LOG(INFO) << "Unable to catch-up position " << *it
+    LOG(INFO) << "Unable to catch-up position " << current
               << " in " << timeout << ", retrying";
 
     catchup();
@@ -252,7 +253,7 @@ private:
   void failed()
   {
     promise.fail(
-        "Failed to catch-up position " + stringify(*it) +
+        "Failed to catch-up position " + stringify(current) +
         ": " + catching.failure());
 
     terminate(self());
@@ -260,7 +261,7 @@ private:
 
   void succeeded()
   {
-    ++it;
+    ++current;
 
     // The single position catch-up function: 'log::catchup' will
     // return the highest proposal number seen so far. We use this
@@ -275,28 +276,23 @@ private:
   const size_t quorum;
   const Shared<Replica> replica;
   const Shared<Network> network;
-  const vector<uint64_t> positions;
+  const Interval<uint64_t> positions;
   const Duration timeout;
 
   uint64_t proposal;
-  vector<uint64_t>::const_iterator it;
+  uint64_t current;
 
   process::Promise<Nothing> promise;
   Future<uint64_t> catching;
 };
 
 
-/////////////////////////////////////////////////
-// Public interfaces below.
-/////////////////////////////////////////////////
-
-
-Future<Nothing> catchup(
+static Future<Nothing> catchup(
     size_t quorum,
     const Shared<Replica>& replica,
     const Shared<Network>& network,
     const Option<uint64_t>& proposal,
-    const vector<uint64_t>& positions,
+    const Interval<uint64_t>& positions,
     const Duration& timeout)
 {
   BulkCatchUpProcess* process =
@@ -313,6 +309,46 @@ Future<Nothing> catchup(
   return future;
 }
 
+
+/////////////////////////////////////////////////
+// Public interfaces below.
+/////////////////////////////////////////////////
+
+
+Future<Nothing> catchup(
+    size_t quorum,
+    const Shared<Replica>& replica,
+    const Shared<Network>& network,
+    const Option<uint64_t>& proposal,
+    const IntervalSet<uint64_t>& positions,
+    const Duration& timeout)
+{
+  // Necessary to disambiguate overloaded functions.
+  Future<Nothing> (*f)(
+      size_t quorum,
+      const Shared<Replica>& replica,
+      const Shared<Network>& network,
+      const Option<uint64_t>& proposal,
+      const Interval<uint64_t>& positions,
+      const Duration& timeout) = &catchup;
+
+  Future<Nothing> future = Nothing();
+
+  foreach (const Interval<uint64_t>& interval, positions) {
+    future = future.then(
+        lambda::bind(
+            f,
+            quorum,
+            replica,
+            network,
+            proposal,
+            interval,
+            timeout));
+  }
+
+  return future;
+}
+
 } // namespace log {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2ea3819e/src/log/catchup.hpp
----------------------------------------------------------------------
diff --git a/src/log/catchup.hpp b/src/log/catchup.hpp
index 45dc016..5e23b57 100644
--- a/src/log/catchup.hpp
+++ b/src/log/catchup.hpp
@@ -21,12 +21,11 @@
 
 #include <stdint.h>
 
-#include <vector>
-
 #include <process/future.hpp>
 #include <process/shared.hpp>
 
 #include <stout/duration.hpp>
+#include <stout/interval.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
 
@@ -50,7 +49,7 @@ extern process::Future<Nothing> catchup(
     const process::Shared<Replica>& replica,
     const process::Shared<Network>& network,
     const Option<uint64_t>& proposal,
-    const std::vector<uint64_t>& positions,
+    const IntervalSet<uint64_t>& positions,
     const Duration& timeout = Seconds(10));
 
 } // namespace log {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2ea3819e/src/log/coordinator.cpp
----------------------------------------------------------------------
diff --git a/src/log/coordinator.cpp b/src/log/coordinator.cpp
index 6bfff1e..9a2faba 100644
--- a/src/log/coordinator.cpp
+++ b/src/log/coordinator.cpp
@@ -38,7 +38,6 @@
 using namespace process;
 
 using std::string;
-using std::vector;
 
 namespace mesos {
 namespace internal {
@@ -83,8 +82,8 @@ private:
   Future<Nothing> updateProposal(uint64_t promised);
   Future<PromiseResponse> runPromisePhase();
   Future<Option<uint64_t> > checkPromisePhase(const PromiseResponse& response);
-  Future<vector<uint64_t> > getMissingPositions();
-  Future<Nothing> catchupMissingPositions(const vector<uint64_t>& positions);
+  Future<IntervalSet<uint64_t> > getMissingPositions();
+  Future<Nothing> catchupMissingPositions(const IntervalSet<uint64_t>& 
positions);
   Future<Option<uint64_t> > updateIndexAfterElected();
   void electingFinished(const Option<uint64_t>& position);
   void electingFailed();
@@ -216,14 +215,14 @@ Future<Option<uint64_t> > 
CoordinatorProcess::checkPromisePhase(
 }
 
 
-Future<vector<uint64_t> > CoordinatorProcess::getMissingPositions()
+Future<IntervalSet<uint64_t> > CoordinatorProcess::getMissingPositions()
 {
   return replica->missing(0, index);
 }
 
 
 Future<Nothing> CoordinatorProcess::catchupMissingPositions(
-    const vector<uint64_t>& positions)
+    const IntervalSet<uint64_t>& positions)
 {
   LOG(INFO) << "Coordinator attemping to fill missing position";
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2ea3819e/src/log/recover.cpp
----------------------------------------------------------------------
diff --git a/src/log/recover.cpp b/src/log/recover.cpp
index 3403b47..a5819ec 100644
--- a/src/log/recover.cpp
+++ b/src/log/recover.cpp
@@ -20,7 +20,6 @@
 #include <stdlib.h>
 
 #include <set>
-#include <vector>
 
 #include <process/defer.hpp>
 #include <process/delay.hpp>
@@ -43,7 +42,6 @@
 using namespace process;
 
 using std::set;
-using std::vector;
 
 namespace mesos {
 namespace internal {
@@ -336,17 +334,13 @@ private:
     CHECK(highestEndPosition.isSome());
     CHECK_LE(lowestBeginPosition.get(), highestEndPosition.get());
 
-    uint64_t begin = lowestBeginPosition.get();
-    uint64_t end = highestEndPosition.get();
-
     LOG(INFO) << "Starting catch-up from position "
               << lowestBeginPosition.get() << " to "
               << highestEndPosition.get();
 
-    vector<uint64_t> positions;
-    for (uint64_t p = begin; p <= end; ++p) {
-      positions.push_back(p);
-    }
+    IntervalSet<uint64_t> positions(
+        Bound<uint64_t>::closed(lowestBeginPosition.get()),
+        Bound<uint64_t>::closed(highestEndPosition.get()));
 
     // Share the ownership of the replica. From this point until the
     // point where the ownership of the replica is regained, we should

http://git-wip-us.apache.org/repos/asf/mesos/blob/2ea3819e/src/log/replica.cpp
----------------------------------------------------------------------
diff --git a/src/log/replica.cpp b/src/log/replica.cpp
index 1f1a945..6db6d05 100644
--- a/src/log/replica.cpp
+++ b/src/log/replica.cpp
@@ -20,9 +20,6 @@
 
 #include <algorithm>
 
-#include <boost/icl/interval.hpp>
-#include <boost/icl/interval_set.hpp>
-
 #include <process/dispatch.hpp>
 #include <process/id.hpp>
 
@@ -41,13 +38,10 @@
 #include "log/replica.hpp"
 #include "log/storage.hpp"
 
-using namespace boost::icl;
-
 using namespace process;
 
 using std::list;
 using std::string;
-using std::vector;
 
 namespace mesos {
 namespace internal {
@@ -90,7 +84,7 @@ public:
 
   // Returns missing positions in the log (i.e., unlearned or holes)
   // within the specified range [from, to].
-  vector<uint64_t> missing(uint64_t from, uint64_t to);
+  IntervalSet<uint64_t> missing(uint64_t from, uint64_t to);
 
   // Returns the beginning position of the log.
   uint64_t beginning();
@@ -148,10 +142,10 @@ private:
   uint64_t end;
 
   // Holes in the log.
-  interval_set<uint64_t> holes;
+  IntervalSet<uint64_t> holes;
 
   // Unlearned positions in the log.
-  interval_set<uint64_t> unlearned;
+  IntervalSet<uint64_t> unlearned;
 };
 
 
@@ -193,7 +187,7 @@ Result<Action> ReplicaProcess::read(uint64_t position)
     return Error("Attempted to read truncated position");
   } else if (end < position) {
     return None(); // These semantics are assumed above!
-  } else if (contains(holes, position)) {
+  } else if (holes.contains(position)) {
     return None();
   }
 
@@ -253,22 +247,20 @@ bool ReplicaProcess::missing(uint64_t position)
   } else if (position > end) {
     return true;
   } else {
-    if (contains(unlearned, position) || contains(holes, position)) {
-      return true;
-    } else {
-      return false;
-    }
+    return unlearned.contains(position) || holes.contains(position);
   }
 }
 
 
-vector<uint64_t> ReplicaProcess::missing(uint64_t from, uint64_t to)
+// TODO(jieyu): Allow this method to take an Interval.
+IntervalSet<uint64_t> ReplicaProcess::missing(uint64_t from, uint64_t to)
 {
   if (from > to) {
-    return vector<uint64_t>();
+    // Empty interval.
+    return IntervalSet<uint64_t>();
   }
 
-  interval_set<uint64_t> positions;
+  IntervalSet<uint64_t> positions;
 
   // Add unlearned positions.
   positions += unlearned;
@@ -278,14 +270,13 @@ vector<uint64_t> ReplicaProcess::missing(uint64_t from, 
uint64_t to)
 
   // Add all the unknown positions beyond our end.
   if (to > end) {
-    positions += interval<uint64_t>::closed(end + 1, to);
+    positions += (Bound<uint64_t>::open(end), Bound<uint64_t>::closed(to));
   }
 
   // Do not consider positions outside [from, to].
-  positions &= interval<uint64_t>::closed(from, to);
+  positions &= (Bound<uint64_t>::closed(from), Bound<uint64_t>::closed(to));
 
-  // Generate the resultant vector.
-  return vector<uint64_t>(elements_begin(positions), elements_end(positions));
+  return positions;
 }
 
 
@@ -678,14 +669,17 @@ bool ReplicaProcess::persist(const Action& action)
   // Update unlearned positions and deal with truncation actions.
   if (action.has_learned() && action.learned()) {
     unlearned -= action.position();
+
     if (action.has_type() && action.type() == Action::TRUNCATE) {
       // No longer consider truncated positions as holes (so that a
       // coordinator doesn't try and fill them).
-      holes -= interval<uint64_t>::open(0, action.truncate().to());
+      holes -= (Bound<uint64_t>::open(0),
+                Bound<uint64_t>::open(action.truncate().to()));
 
       // No longer consider truncated positions as unlearned (so that
       // a coordinator doesn't try and fill them).
-      unlearned -= interval<uint64_t>::open(0, action.truncate().to());
+      unlearned -= (Bound<uint64_t>::open(0),
+                    Bound<uint64_t>::open(action.truncate().to()));
 
       // And update the beginning position.
       begin = std::max(begin, action.truncate().to());
@@ -697,7 +691,8 @@ bool ReplicaProcess::persist(const Action& action)
 
   // Update holes if we just wrote many positions past the last end.
   if (action.position() > end) {
-    holes += interval<uint64_t>::open(end, action.position());
+    holes += (Bound<uint64_t>::open(end),
+              Bound<uint64_t>::open(action.position()));
   }
 
   // And update the end position.
@@ -720,14 +715,14 @@ void ReplicaProcess::restore(const string& path)
   unlearned = state.get().unlearned;
 
   // Only use the learned positions to help determine the holes.
-  const interval_set<uint64_t>& learned = state.get().learned;
+  const IntervalSet<uint64_t>& learned = state.get().learned;
 
   // Holes are those positions in [begin, end] that are not in both
   // learned and unlearned sets. In the case of a brand new log (begin
   // and end are 0, and learned and unlearned are empty), we assume
   // position 0 is a hole, and a coordinator will simply fill it with
   // a no-op when it first gets elected.
-  holes += interval<uint64_t>::closed(begin, end);
+  holes += (Bound<uint64_t>::closed(begin), Bound<uint64_t>::closed(end));
   holes -= learned;
   holes -= unlearned;
 
@@ -765,7 +760,8 @@ Future<bool> Replica::missing(uint64_t position) const
 }
 
 
-Future<vector<uint64_t> > Replica::missing(uint64_t from, uint64_t to) const
+Future<IntervalSet<uint64_t> > Replica::missing(
+    uint64_t from, uint64_t to) const
 {
   return dispatch(process, &ReplicaProcess::missing, from, to);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/2ea3819e/src/log/replica.hpp
----------------------------------------------------------------------
diff --git a/src/log/replica.hpp b/src/log/replica.hpp
index 08ddcb1..6c51a58 100644
--- a/src/log/replica.hpp
+++ b/src/log/replica.hpp
@@ -23,12 +23,13 @@
 
 #include <list>
 #include <string>
-#include <vector>
 
 #include <process/future.hpp>
 #include <process/pid.hpp>
 #include <process/protobuf.hpp>
 
+#include <stout/interval.hpp>
+
 #include "messages/log.hpp"
 
 namespace mesos {
@@ -72,8 +73,9 @@ public:
   process::Future<bool> missing(uint64_t position) const;
 
   // Returns missing positions in the log (i.e., unlearned or holes)
-  // within the specified range [from, to].
-  process::Future<std::vector<uint64_t> > missing(
+  // within the specified range [from, to]. We use interval set, a
+  // more compact representation of set, to store missing positions.
+  process::Future<IntervalSet<uint64_t> > missing(
       uint64_t from,
       uint64_t to) const;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2ea3819e/src/log/storage.hpp
----------------------------------------------------------------------
diff --git a/src/log/storage.hpp b/src/log/storage.hpp
index c0eba1b..60bf37f 100644
--- a/src/log/storage.hpp
+++ b/src/log/storage.hpp
@@ -23,8 +23,7 @@
 
 #include <string>
 
-#include <boost/icl/interval_set.hpp>
-
+#include <stout/interval.hpp>
 #include <stout/nothing.hpp>
 #include <stout/try.hpp>
 
@@ -44,11 +43,11 @@ public:
     uint64_t begin; // Beginning position of the log.
     uint64_t end; // Ending position of the log.
 
-    // Note that here we use boost interval set to store learned and
-    // unlearned positions in a more compact way. Adjacent positions
-    // will be merged and represented using an interval.
-    boost::icl::interval_set<uint64_t> learned;
-    boost::icl::interval_set<uint64_t> unlearned;
+    // Note that we use interval set here to store learned/unlearned
+    // positions in a more compact way. Adjacent positions will be
+    // merged and represented using an interval.
+    IntervalSet<uint64_t> learned;
+    IntervalSet<uint64_t> unlearned;
   };
 
   virtual ~Storage() {}

http://git-wip-us.apache.org/repos/asf/mesos/blob/2ea3819e/src/tests/log_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/log_tests.cpp b/src/tests/log_tests.cpp
index 7f2a740..da827a1 100644
--- a/src/tests/log_tests.cpp
+++ b/src/tests/log_tests.cpp
@@ -69,7 +69,6 @@ using namespace process;
 using std::list;
 using std::set;
 using std::string;
-using std::vector;
 
 using testing::_;
 using testing::Eq;
@@ -1613,13 +1612,13 @@ TEST_F(RecoverTest, CatchupRetry)
     EXPECT_EQ(0u, electing.get().get());
   }
 
-  vector<uint64_t> positions;
+  IntervalSet<uint64_t> positions;
 
   for (uint64_t position = 1; position <= 10; position++) {
     Future<uint64_t> appending = coord.append(stringify(position));
     AWAIT_READY_FOR(appending, Seconds(10));
     EXPECT_EQ(position, appending.get());
-    positions.push_back(position);
+    positions += position;
   }
 
   Shared<Replica> replica3(new Replica(path3));

Reply via email to