Repository: mesos Updated Branches: refs/heads/master 04d8d1d3a -> 077f122d5
Optimized ranges subtraction operation. The current ranges subtraction uses boost IntervalSet. Based on the profiling result of MESOS-8989, the ranges subtraction operation is about 2~3 times more expensive than that of addition. The conversion cost from Ranges to IntervalSet may be the culprit. This patch optimizes the ranges subtraction operation by converting Ranges to a vector of internal sub-ranges, sorting the vector based on sub-range start and then applying a one-pass algorithm similar to that of addition. Review: https://reviews.apache.org/r/67965/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/077f122d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/077f122d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/077f122d Branch: refs/heads/master Commit: 077f122d52671412a2ab5d992d535712cc154002 Parents: 04d8d1d Author: Meng Zhu <[email protected]> Authored: Fri Jul 27 10:26:22 2018 -0700 Committer: Vinod Kone <[email protected]> Committed: Fri Jul 27 10:26:22 2018 -0700 ---------------------------------------------------------------------- src/common/values.cpp | 115 ++++++++++++++++++++++++++++++++++++---- src/tests/values_tests.cpp | 16 +++++- src/v1/values.cpp | 115 ++++++++++++++++++++++++++++++++++++---- 3 files changed, 224 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/077f122d/src/common/values.cpp ---------------------------------------------------------------------- diff --git a/src/common/values.cpp b/src/common/values.cpp index afe4137..f04d115 100644 --- a/src/common/values.cpp +++ b/src/common/values.cpp @@ -229,6 +229,105 @@ void coalesce(Value::Ranges* result, vector<Range> ranges) CHECK_EQ(result->range_size(), count); } + +// Subtract `right_` from `left_`, and return the result as Value::Ranges. +Value::Ranges subtract(const Value::Ranges& left_, const Value::Ranges& right_) +{ + if (left_.range_size() == 0 || right_.range_size() == 0) { + return left_; + } + + // Convert the input `Ranges` to `vector<internal::Range>` and + // sort the vector based on the start of a range. + auto sortRanges = [](const Value::Ranges& ranges) { + vector<internal::Range> result; + result.reserve(ranges.range_size()); + + foreach (const Value::Range& range, ranges.range()) { + result.push_back({range.begin(), range.end()}); + } + + std::sort( + result.begin(), + result.end(), + [](const internal::Range& left, const internal::Range& right) { + return left.start < right.start; + }); + + return result; + }; + + Value::Ranges result; + + vector<internal::Range> left = sortRanges(left_); + vector<internal::Range> right = sortRanges(right_); + + vector<internal::Range>::iterator itLeft = left.begin(); + for (vector<internal::Range>::const_iterator itRight = right.cbegin(); + itLeft != left.end() && itRight != right.cend();) { + // Non-overlap: + // L: |___| + // R: |___| + if (itLeft->end < itRight->start) { + Value::Range* newRange = result.add_range(); + newRange->set_begin(itLeft->start); + newRange->set_end(itLeft->end); + + itLeft++; + continue; + } + + // Non-overlap: + // L: |___| + // R: |___| + if (itLeft->start > itRight->end) { + itRight++; + continue; + } + + if (itLeft->start < itRight->start) { + Value::Range* newRange = result.add_range(); + newRange->set_begin(itLeft->start); + newRange->set_end(itRight->start - 1); + + if (itLeft->end <= itRight->end) { + // L: |_____| + // R: |____| + itLeft++; + } else { + // L: |________| + // R: |___| + itLeft->start = itRight->end + 1; + itRight++; + } + } else { // itLeft->start >= itRight->start + if (itLeft->end <= itRight->end) { + // L: |____| + // R: |________| + itLeft++; + } else { + // L: |_____| + // R: |____| + itLeft->start = itRight->end + 1; + itRight++; + } + } + } + + // Traverse what's left in the `left`, if any. + while (itLeft != left.end()) { + // TODO(mzhu): Consider reserving the exact size. + Value::Range* newRange = result.add_range(); + newRange->set_begin(itLeft->start); + newRange->set_end(itLeft->end); + + itLeft++; + } + + return result; +} + + } // namespace internal { @@ -352,6 +451,7 @@ bool operator<=(const Value::Ranges& _left, const Value::Ranges& _right) } +// TODO(mzhu): Make this consistent with how we do subtraction. Value::Ranges operator+(const Value::Ranges& left, const Value::Ranges& right) { Value::Ranges result; @@ -362,12 +462,11 @@ Value::Ranges operator+(const Value::Ranges& left, const Value::Ranges& right) Value::Ranges operator-(const Value::Ranges& left, const Value::Ranges& right) { - Value::Ranges result; - coalesce(&result, {left}); - return result -= right; + return internal::subtract(left, right); } +// TODO(mzhu): Make this consistent with how we do subtraction. Value::Ranges& operator+=(Value::Ranges& left, const Value::Ranges& right) { coalesce(&left, {right}); @@ -375,15 +474,11 @@ Value::Ranges& operator+=(Value::Ranges& left, const Value::Ranges& right) } -Value::Ranges& operator-=(Value::Ranges& _left, const Value::Ranges& _right) +Value::Ranges& operator-=(Value::Ranges& left, const Value::Ranges& right) { - IntervalSet<uint64_t> left, right; + left = internal::subtract(left, right); - left = rangesToIntervalSet<uint64_t>(_left).get(); - right = rangesToIntervalSet<uint64_t>(_right).get(); - _left = intervalSetToRanges(left - right); - - return _left; + return left; } http://git-wip-us.apache.org/repos/asf/mesos/blob/077f122d/src/tests/values_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/values_tests.cpp b/src/tests/values_tests.cpp index 2f5abed..e4fcf98 100644 --- a/src/tests/values_tests.cpp +++ b/src/tests/values_tests.cpp @@ -428,12 +428,24 @@ TEST(ValuesTest, RangesAddition) // Test subtracting two ranges. TEST(ValuesTest, RangesSubtraction) { - // Completely subsumes. - Value::Ranges ranges1 = parse("[3-8]")->ranges(); + // Ranges1 is empty. + Value::Ranges ranges1 = parse("[]")->ranges(); Value::Ranges ranges2 = parse("[1-10]")->ranges(); EXPECT_EQ(parse("[]")->ranges(), ranges1 - ranges2); + // Ranges2 is empty. + ranges1 = parse("[3-8]")->ranges(); + ranges2 = parse("[]")->ranges(); + + EXPECT_EQ(parse("[3-8]")->ranges(), ranges1 - ranges2); + + // Completely subsumes. + ranges1 = parse("[3-8]")->ranges(); + ranges2 = parse("[1-10]")->ranges(); + + EXPECT_EQ(parse("[]")->ranges(), ranges1 - ranges2); + // Subsummed on left. ranges1 = parse("[3-8]")->ranges(); ranges2 = parse("[3-5]")->ranges(); http://git-wip-us.apache.org/repos/asf/mesos/blob/077f122d/src/v1/values.cpp ---------------------------------------------------------------------- diff --git a/src/v1/values.cpp b/src/v1/values.cpp index d2c31f6..1be9945 100644 --- a/src/v1/values.cpp +++ b/src/v1/values.cpp @@ -228,6 +228,105 @@ void coalesce(Value::Ranges* result, vector<Range> ranges) CHECK_EQ(result->range_size(), count); } + +// Subtract `right_` from `left_`, and return the result as Value::Ranges. +Value::Ranges subtract(const Value::Ranges& left_, const Value::Ranges& right_) +{ + if (left_.range_size() == 0 || right_.range_size() == 0) { + return left_; + } + + // Convert the input `Ranges` to `vector<internal::Range>` and + // sort the vector based on the start of a range. + auto sortRanges = [](const Value::Ranges& ranges) { + vector<internal::Range> result; + result.reserve(ranges.range_size()); + + foreach (const Value::Range& range, ranges.range()) { + result.push_back({range.begin(), range.end()}); + } + + std::sort( + result.begin(), + result.end(), + [](const internal::Range& left, const internal::Range& right) { + return left.start < right.start; + }); + + return result; + }; + + Value::Ranges result; + + vector<internal::Range> left = sortRanges(left_); + vector<internal::Range> right = sortRanges(right_); + + vector<internal::Range>::iterator itLeft = left.begin(); + for (vector<internal::Range>::const_iterator itRight = right.cbegin(); + itLeft != left.end() && itRight != right.cend();) { + // Non-overlap: + // L: |___| + // R: |___| + if (itLeft->end < itRight->start) { + Value::Range* newRange = result.add_range(); + newRange->set_begin(itLeft->start); + newRange->set_end(itLeft->end); + + itLeft++; + continue; + } + + // Non-overlap: + // L: |___| + // R: |___| + if (itLeft->start > itRight->end) { + itRight++; + continue; + } + + if (itLeft->start < itRight->start) { + Value::Range* newRange = result.add_range(); + newRange->set_begin(itLeft->start); + newRange->set_end(itRight->start - 1); + + if (itLeft->end <= itRight->end) { + // L: |_____| + // R: |____| + itLeft++; + } else { + // L: |________| + // R: |___| + itLeft->start = itRight->end + 1; + itRight++; + } + } else { // itLeft->start >= itRight->start + if (itLeft->end <= itRight->end) { + // L: |____| + // R: |________| + itLeft++; + } else { + // L: |_____| + // R: |____| + itLeft->start = itRight->end + 1; + itRight++; + } + } + } + + // Traverse what's left in the `left`, if any. + while (itLeft != left.end()) { + // TODO(mzhu): Consider reserving the exact size. + Value::Range* newRange = result.add_range(); + newRange->set_begin(itLeft->start); + newRange->set_end(itLeft->end); + + itLeft++; + } + + return result; +} + + } // namespace internal { @@ -380,6 +479,7 @@ bool operator<=(const Value::Ranges& _left, const Value::Ranges& _right) } +// TODO(mzhu): Make this consistent with how we do subtraction. Value::Ranges operator+(const Value::Ranges& left, const Value::Ranges& right) { Value::Ranges result; @@ -390,12 +490,11 @@ Value::Ranges operator+(const Value::Ranges& left, const Value::Ranges& right) Value::Ranges operator-(const Value::Ranges& left, const Value::Ranges& right) { - Value::Ranges result; - coalesce(&result, {left}); - return result -= right; + return internal::subtract(left, right); } +// TODO(mzhu): Make this consistent with how we do subtraction. Value::Ranges& operator+=(Value::Ranges& left, const Value::Ranges& right) { coalesce(&left, {right}); @@ -403,15 +502,11 @@ Value::Ranges& operator+=(Value::Ranges& left, const Value::Ranges& right) } -Value::Ranges& operator-=(Value::Ranges& _left, const Value::Ranges& _right) +Value::Ranges& operator-=(Value::Ranges& left, const Value::Ranges& right) { - IntervalSet<uint64_t> left, right; + left = internal::subtract(left, right); - left = rangesToIntervalSet(_left); - right = rangesToIntervalSet(_right); - _left = intervalSetToRanges(left - right); - - return _left; + return left; }
