Repository: mesos
Updated Branches:
  refs/heads/master 04d8d1d3a -> 077f122d5


Optimized ranges subtraction operation.

The current ranges subtraction is implemented with Boost's
IntervalSet. According to the profiling results in MESOS-8989,
subtracting ranges is about 2-3 times more expensive than
adding them. The cost of converting Ranges to IntervalSet
may be the culprit.

This patch optimizes the ranges subtraction operation by
converting Ranges to a vector of internal sub-ranges, sorting
the vector based on sub-range start and then applying a
one-pass algorithm similar to that of addition.

Review: https://reviews.apache.org/r/67965/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/077f122d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/077f122d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/077f122d

Branch: refs/heads/master
Commit: 077f122d52671412a2ab5d992d535712cc154002
Parents: 04d8d1d
Author: Meng Zhu <[email protected]>
Authored: Fri Jul 27 10:26:22 2018 -0700
Committer: Vinod Kone <[email protected]>
Committed: Fri Jul 27 10:26:22 2018 -0700

----------------------------------------------------------------------
 src/common/values.cpp      | 115 ++++++++++++++++++++++++++++++++++++----
 src/tests/values_tests.cpp |  16 +++++-
 src/v1/values.cpp          | 115 ++++++++++++++++++++++++++++++++++++----
 3 files changed, 224 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/077f122d/src/common/values.cpp
----------------------------------------------------------------------
diff --git a/src/common/values.cpp b/src/common/values.cpp
index afe4137..f04d115 100644
--- a/src/common/values.cpp
+++ b/src/common/values.cpp
@@ -229,6 +229,105 @@ void coalesce(Value::Ranges* result, vector<Range> ranges)
   CHECK_EQ(result->range_size(), count);
 }
 
+
+// Subtract `right_` from `left_`, and return the result as Value::Ranges.
+Value::Ranges subtract(const Value::Ranges& left_, const Value::Ranges& right_)
+{
+  if (left_.range_size() == 0 || right_.range_size() == 0) {
+    return left_;
+  }
+
+  // Convert the input `Ranges` to `vector<internal::Range>` and
+  // sort the vector based on the start of a range.
+  auto sortRanges = [](const Value::Ranges& ranges) {
+    vector<internal::Range> result;
+    result.reserve(ranges.range_size());
+
+    foreach (const Value::Range& range, ranges.range()) {
+      result.push_back({range.begin(), range.end()});
+    }
+
+    std::sort(
+        result.begin(),
+        result.end(),
+        [](const internal::Range& left, const internal::Range& right) {
+          return left.start < right.start;
+        });
+
+    return result;
+  };
+
+  Value::Ranges result;
+
+  vector<internal::Range> left = sortRanges(left_);
+  vector<internal::Range> right = sortRanges(right_);
+
+  vector<internal::Range>::iterator itLeft = left.begin();
+  for (vector<internal::Range>::const_iterator itRight = right.cbegin();
+       itLeft != left.end() && itRight != right.cend();) {
+    // Non-overlap:
+    // L: |___|
+    // R:       |___|
+    if (itLeft->end < itRight->start) {
+      Value::Range* newRange = result.add_range();
+      newRange->set_begin(itLeft->start);
+      newRange->set_end(itLeft->end);
+
+      itLeft++;
+      continue;
+    }
+
+    // Non-overlap:
+    // L:       |___|
+    // R: |___|
+    if (itLeft->start > itRight->end) {
+      itRight++;
+      continue;
+    }
+
+    if (itLeft->start < itRight->start) {
+      Value::Range* newRange = result.add_range();
+      newRange->set_begin(itLeft->start);
+      newRange->set_end(itRight->start - 1);
+
+      if (itLeft->end <= itRight->end) {
+        // L: |_____|
+        // R:    |____|
+        itLeft++;
+      } else {
+        // L: |________|
+        // R:    |___|
+        itLeft->start = itRight->end + 1;
+        itRight++;
+      }
+    } else { // itLeft->start >= itRight->start
+      if (itLeft->end <= itRight->end) {
+        // L:   |____|
+        // R: |________|
+        itLeft++;
+      } else {
+        // L:   |_____|
+        // R: |____|
+        itLeft->start = itRight->end + 1;
+        itRight++;
+      }
+    }
+  }
+
+  // Traverse what's left in the `left`, if any.
+  while (itLeft != left.end()) {
+    // TODO(mzhu): Consider reserving the exact size.
+    Value::Range* newRange = result.add_range();
+    newRange->set_begin(itLeft->start);
+    newRange->set_end(itLeft->end);
+
+    itLeft++;
+  }
+
+  return result;
+}
+
+
 } // namespace internal {
 
 
@@ -352,6 +451,7 @@ bool operator<=(const Value::Ranges& _left, const Value::Ranges& _right)
 }
 
 
+// TODO(mzhu): Make this consistent with how we do subtraction.
 Value::Ranges operator+(const Value::Ranges& left, const Value::Ranges& right)
 {
   Value::Ranges result;
@@ -362,12 +462,11 @@ Value::Ranges operator+(const Value::Ranges& left, const Value::Ranges& right)
 
 Value::Ranges operator-(const Value::Ranges& left, const Value::Ranges& right)
 {
-  Value::Ranges result;
-  coalesce(&result, {left});
-  return result -= right;
+  return internal::subtract(left, right);
 }
 
 
+// TODO(mzhu): Make this consistent with how we do subtraction.
 Value::Ranges& operator+=(Value::Ranges& left, const Value::Ranges& right)
 {
   coalesce(&left, {right});
@@ -375,15 +474,11 @@ Value::Ranges& operator+=(Value::Ranges& left, const Value::Ranges& right)
 }
 
 
-Value::Ranges& operator-=(Value::Ranges& _left, const Value::Ranges& _right)
+Value::Ranges& operator-=(Value::Ranges& left, const Value::Ranges& right)
 {
-  IntervalSet<uint64_t> left, right;
+  left = internal::subtract(left, right);
 
-  left = rangesToIntervalSet<uint64_t>(_left).get();
-  right = rangesToIntervalSet<uint64_t>(_right).get();
-  _left = intervalSetToRanges(left - right);
-
-  return _left;
+  return left;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/077f122d/src/tests/values_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/values_tests.cpp b/src/tests/values_tests.cpp
index 2f5abed..e4fcf98 100644
--- a/src/tests/values_tests.cpp
+++ b/src/tests/values_tests.cpp
@@ -428,12 +428,24 @@ TEST(ValuesTest, RangesAddition)
 // Test subtracting two ranges.
 TEST(ValuesTest, RangesSubtraction)
 {
-  // Completely subsumes.
-  Value::Ranges ranges1 = parse("[3-8]")->ranges();
+  // Ranges1 is empty.
+  Value::Ranges ranges1 = parse("[]")->ranges();
   Value::Ranges ranges2 = parse("[1-10]")->ranges();
 
   EXPECT_EQ(parse("[]")->ranges(), ranges1 - ranges2);
 
+  // Ranges2 is empty.
+  ranges1 = parse("[3-8]")->ranges();
+  ranges2 = parse("[]")->ranges();
+
+  EXPECT_EQ(parse("[3-8]")->ranges(), ranges1 - ranges2);
+
+  // Completely subsumes.
+  ranges1 = parse("[3-8]")->ranges();
+  ranges2 = parse("[1-10]")->ranges();
+
+  EXPECT_EQ(parse("[]")->ranges(), ranges1 - ranges2);
+
   // Subsummed on left.
   ranges1 = parse("[3-8]")->ranges();
   ranges2 = parse("[3-5]")->ranges();

http://git-wip-us.apache.org/repos/asf/mesos/blob/077f122d/src/v1/values.cpp
----------------------------------------------------------------------
diff --git a/src/v1/values.cpp b/src/v1/values.cpp
index d2c31f6..1be9945 100644
--- a/src/v1/values.cpp
+++ b/src/v1/values.cpp
@@ -228,6 +228,105 @@ void coalesce(Value::Ranges* result, vector<Range> ranges)
   CHECK_EQ(result->range_size(), count);
 }
 
+
+// Subtract `right_` from `left_`, and return the result as Value::Ranges.
+Value::Ranges subtract(const Value::Ranges& left_, const Value::Ranges& right_)
+{
+  if (left_.range_size() == 0 || right_.range_size() == 0) {
+    return left_;
+  }
+
+  // Convert the input `Ranges` to `vector<internal::Range>` and
+  // sort the vector based on the start of a range.
+  auto sortRanges = [](const Value::Ranges& ranges) {
+    vector<internal::Range> result;
+    result.reserve(ranges.range_size());
+
+    foreach (const Value::Range& range, ranges.range()) {
+      result.push_back({range.begin(), range.end()});
+    }
+
+    std::sort(
+        result.begin(),
+        result.end(),
+        [](const internal::Range& left, const internal::Range& right) {
+          return left.start < right.start;
+        });
+
+    return result;
+  };
+
+  Value::Ranges result;
+
+  vector<internal::Range> left = sortRanges(left_);
+  vector<internal::Range> right = sortRanges(right_);
+
+  vector<internal::Range>::iterator itLeft = left.begin();
+  for (vector<internal::Range>::const_iterator itRight = right.cbegin();
+       itLeft != left.end() && itRight != right.cend();) {
+    // Non-overlap:
+    // L: |___|
+    // R:       |___|
+    if (itLeft->end < itRight->start) {
+      Value::Range* newRange = result.add_range();
+      newRange->set_begin(itLeft->start);
+      newRange->set_end(itLeft->end);
+
+      itLeft++;
+      continue;
+    }
+
+    // Non-overlap:
+    // L:       |___|
+    // R: |___|
+    if (itLeft->start > itRight->end) {
+      itRight++;
+      continue;
+    }
+
+    if (itLeft->start < itRight->start) {
+      Value::Range* newRange = result.add_range();
+      newRange->set_begin(itLeft->start);
+      newRange->set_end(itRight->start - 1);
+
+      if (itLeft->end <= itRight->end) {
+        // L: |_____|
+        // R:    |____|
+        itLeft++;
+      } else {
+        // L: |________|
+        // R:    |___|
+        itLeft->start = itRight->end + 1;
+        itRight++;
+      }
+    } else { // itLeft->start >= itRight->start
+      if (itLeft->end <= itRight->end) {
+        // L:   |____|
+        // R: |________|
+        itLeft++;
+      } else {
+        // L:   |_____|
+        // R: |____|
+        itLeft->start = itRight->end + 1;
+        itRight++;
+      }
+    }
+  }
+
+  // Traverse what's left in the `left`, if any.
+  while (itLeft != left.end()) {
+    // TODO(mzhu): Consider reserving the exact size.
+    Value::Range* newRange = result.add_range();
+    newRange->set_begin(itLeft->start);
+    newRange->set_end(itLeft->end);
+
+    itLeft++;
+  }
+
+  return result;
+}
+
+
 } // namespace internal {
 
 
@@ -380,6 +479,7 @@ bool operator<=(const Value::Ranges& _left, const Value::Ranges& _right)
 }
 
 
+// TODO(mzhu): Make this consistent with how we do subtraction.
 Value::Ranges operator+(const Value::Ranges& left, const Value::Ranges& right)
 {
   Value::Ranges result;
@@ -390,12 +490,11 @@ Value::Ranges operator+(const Value::Ranges& left, const Value::Ranges& right)
 
 Value::Ranges operator-(const Value::Ranges& left, const Value::Ranges& right)
 {
-  Value::Ranges result;
-  coalesce(&result, {left});
-  return result -= right;
+  return internal::subtract(left, right);
 }
 
 
+// TODO(mzhu): Make this consistent with how we do subtraction.
 Value::Ranges& operator+=(Value::Ranges& left, const Value::Ranges& right)
 {
   coalesce(&left, {right});
@@ -403,15 +502,11 @@ Value::Ranges& operator+=(Value::Ranges& left, const Value::Ranges& right)
 }
 
 
-Value::Ranges& operator-=(Value::Ranges& _left, const Value::Ranges& _right)
+Value::Ranges& operator-=(Value::Ranges& left, const Value::Ranges& right)
 {
-  IntervalSet<uint64_t> left, right;
+  left = internal::subtract(left, right);
 
-  left = rangesToIntervalSet(_left);
-  right = rangesToIntervalSet(_right);
-  _left = intervalSetToRanges(left - right);
-
-  return _left;
+  return left;
 }
 
 

Reply via email to