Repository: mesos Updated Branches: refs/heads/master 5766058e7 -> 4f9dd54ca
Refactored coalesce() for Value::Ranges. The goal of this refactoring was to reuse the Ranges objects as much as possible, as previously there was substantial time spent in allocation/destruction (MESOS-3051). Review: https://reviews.apache.org/r/38158 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4f9dd54c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4f9dd54c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4f9dd54c Branch: refs/heads/master Commit: 4f9dd54cad125f9059cb0646d29f3e8ea680128f Parents: 5766058 Author: Joerg Schad <[email protected]> Authored: Wed Sep 23 09:42:16 2015 -0700 Committer: Michael Park <[email protected]> Committed: Wed Sep 23 10:38:56 2015 -0700 ---------------------------------------------------------------------- src/common/values.cpp | 469 +++++++++++++++++++++---------------- src/tests/resources_tests.cpp | 2 +- src/tests/values_tests.cpp | 312 +++++++++++++++++++++++- 3 files changed, 573 insertions(+), 210 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/4f9dd54c/src/common/values.cpp ---------------------------------------------------------------------- diff --git a/src/common/values.cpp b/src/common/values.cpp index 750264e..d32e0e6 100644 --- a/src/common/values.cpp +++ b/src/common/values.cpp @@ -18,7 +18,12 @@ #include <stdint.h> -#include <iostream> +#include <algorithm> +#include <initializer_list> +#include <ostream> +#include <string> +#include <tuple> +#include <utility> #include <vector> #include <glog/logging.h> @@ -40,98 +45,6 @@ using std::vector; namespace mesos { -namespace internal { -namespace values { - -Try<Value> parse(const std::string& text) -{ - Value value; - - // Remove any spaces from the text. 
- string temp; - foreach (const char c, text) { - if (c != ' ') { - temp += c; - } - } - - if (temp.length() == 0) { - return Error("Expecting non-empty string"); - } - - // TODO(ynie): Find a better way to check brackets. - if (!strings::checkBracketsMatching(temp, '{', '}') || - !strings::checkBracketsMatching(temp, '[', ']') || - !strings::checkBracketsMatching(temp, '(', ')')) { - return Error("Mismatched brackets"); - } - - size_t index = temp.find('['); - if (index == 0) { - // This is a ranges. - Value::Ranges ranges; - const vector<string>& tokens = strings::tokenize(temp, "[]-,\n"); - if (tokens.size() % 2 != 0) { - return Error("Expecting one or more \"ranges\""); - } else { - for (size_t i = 0; i < tokens.size(); i += 2) { - Value::Range *range = ranges.add_range(); - - int j = i; - try { - range->set_begin(boost::lexical_cast<uint64_t>((tokens[j++]))); - range->set_end(boost::lexical_cast<uint64_t>(tokens[j++])); - } catch (const boost::bad_lexical_cast&) { - return Error( - "Expecting non-negative integers in '" + tokens[j - 1] + "'"); - } - } - - value.set_type(Value::RANGES); - value.mutable_ranges()->MergeFrom(ranges); - return value; - } - } else if (index == string::npos) { - size_t index = temp.find('{'); - if (index == 0) { - // This is a set. - Value::Set set; - const vector<string>& tokens = strings::tokenize(temp, "{},\n"); - for (size_t i = 0; i < tokens.size(); i++) { - set.add_item(tokens[i]); - } - - value.set_type(Value::SET); - value.mutable_set()->MergeFrom(set); - return value; - } else if (index == string::npos) { - try { - Value::Scalar scalar; - scalar.set_value(boost::lexical_cast<double>(temp)); - // This is a Scalar. - value.set_type(Value::SCALAR); - value.mutable_scalar()->MergeFrom(scalar); - return value; - } catch (const boost::bad_lexical_cast&) { - // This is a Text. 
- Value::Text text; - text.set_value(temp); - value.set_type(Value::TEXT); - value.mutable_text()->MergeFrom(text); - return value; - } - } else { - return Error("Unexpected '{' found"); - } - } - - return Error("Unexpected '[' found"); -} - -} // namespace values { -} // namespace internal { - - ostream& operator<<(ostream& stream, const Value::Scalar& scalar) { return stream << scalar.value(); @@ -179,115 +92,190 @@ Value::Scalar& operator-=(Value::Scalar& left, const Value::Scalar& right) return left; } -namespace ranges { -static void add(Value::Ranges* result, int64_t begin, int64_t end) +namespace internal { + +struct Range +{ + uint64_t start; + uint64_t end; +}; + + +// Coalesces the vector of ranges provided and modifies `result` to contain the +// solution. +// The algorithm first sorts all the individual intervals so that we can iterate +// over them sequentially. +// The algorithm does a single pass, after the sort, and builds up the solution +// in place. It then modifies the `result` with as few steps as possible. The +// expensive part of this operation is modification of the protobuf, which is +// why we prefer to build up the solution in a temporary vector. +void coalesce(Value::Ranges* result, vector<Range> ranges) { - if (begin > end) { + // Exit early if empty. + if (ranges.empty()) { + result->clear_range(); return; } - Value::Range* range = result->add_range(); - range->set_begin(begin); - range->set_end(end); + + std::sort( + ranges.begin(), + ranges.end(), + [](const Range& left, const Range& right) { + return std::tie(left.start, left.end) < + std::tie(right.start, right.end); + }); + + // We build up initial state of the current range. + CHECK(!ranges.empty()); + int count = 1; + Range current = ranges.front(); + + // In a single pass, we compute the size of the end result, as well as modify + // in place the intermediate data structure to build up result as we + // solve it. 
+ foreach (const Range& range, ranges) { + // Skip if this range is equivalent to the current range. + if (range.start == current.start && range.end == current.end) { + continue; + } + + // If the current range just needs to be extended on the right. + if (range.start == current.start && range.end > current.end) { + current.end = range.end; + } else if (range.start > current.start) { + // If we are starting farther ahead, then there are 2 cases: + if (range.start <= current.end + 1) { + // 1. Ranges are overlapping or adjacent and we can merge them. + current.end = max(current.end, range.end); + } else { + // 2. No overlap and we are adding a new range. + ranges[count - 1] = current; + ++count; + current = range; + } + } + } + + // Record the state of the last range into the ranges vector. + ranges[count - 1] = current; + + CHECK(count <= static_cast<int>(ranges.size())); + + // Shrink result if it is too large by deleting trailing subrange. + if (count < result->range_size()) { + result->mutable_range()->DeleteSubrange( + count, result->range_size() - count); + } + + // Reserve enough space so we allocate the pointer array just once. + result->mutable_range()->Reserve(count); + + // Copy the solution from ranges vector into result. + for (int i = 0; i < count; ++i) { + // result might be small and needs to be extended. + if (i >= result->range_size()) { + result->add_range(); + } + + CHECK(i < result->range_size()); + result->mutable_range(i)->set_begin(ranges[i].start); + result->mutable_range(i)->set_end(ranges[i].end); + } + + CHECK_EQ(result->range_size(), count); } -} // namespace ranges { +} // namespace internal { -// Coalesce the given 'range' into already coalesced 'ranges'. -static void coalesce(Value::Ranges* ranges, const Value::Range& range) +// Coalesce the given addedRanges into 'result' ranges. +void coalesce( + Value::Ranges* result, + std::initializer_list<Value::Ranges> addedRanges) { - // Note that we assume that ranges has already been coalesced. 
+ size_t rangesSum = result->range_size(); + foreach (const Value::Ranges& range, addedRanges) { + rangesSum += range.range_size(); + } - Value::Ranges result; - Value::Range temp = range; - - for (int i = 0; i < ranges->range_size(); i++) { - const Value::Range& current = ranges->range(i); - - // Check if current and range overlap. Note, we only need to - // compare with range and not with temp to check for overlap - // because we expect ranges to be coalesced to begin with! - if (current.begin() <= range.end() + 1 && - current.end() >= range.begin() - 1) { - // current: | | - // range: | | - // range: | | - // Update temp with new boundaries. - temp.set_begin(min(temp.begin(), min(range.begin(), current.begin()))); - temp.set_end(max(temp.end(), max(range.end(), current.end()))); - } else { // No overlap. - result.add_range()->MergeFrom(current); + vector<internal::Range> ranges; + ranges.reserve(rangesSum); + + // Merges ranges into a vector. + auto fill = [&ranges](const Value::Ranges& inputs) { + foreach (const Value::Range& range, inputs.range()) { + ranges.push_back({range.begin(), range.end()}); } + }; + + // Merge both ranges into the vector; + fill(*result); + foreach (const Value::Ranges& range, addedRanges) { + fill(range); } - result.add_range()->MergeFrom(temp); - *ranges = result; + internal::coalesce(result, std::move(ranges)); } -// Coalesce the given un-coalesced 'uranges' into already coalesced 'ranges'. -static void coalesce(Value::Ranges* ranges, const Value::Ranges& uranges) +// Coalesce the given Value::Ranges 'ranges'. +void coalesce(Value::Ranges* result) { - // Note that we assume that ranges has already been coalesced. + coalesce(result, {Value::Ranges()}); +} - for (int i = 0; i < uranges.range_size(); i++) { - coalesce(ranges, uranges.range(i)); - } + +// Coalesce the given range 'addedRange' into 'result' ranges. 
+void coalesce(Value::Ranges* result, const Value::Range& addedRange) +{ + Value::Ranges ranges; + Value::Range* range = ranges.add_range(); + range->CopyFrom(addedRange); + coalesce(result, {ranges}); } -static void remove(Value::Ranges* ranges, const Value::Range& range) +// Removes a range from already coalesced ranges. +// The algorithm constructs a new vector of ranges which is then +// coalesced into a Value::Ranges instance. +static void remove(Value::Ranges* _ranges, const Value::Range& removal) { - // Note that we assume that ranges has already been coalesced. + vector<internal::Range> ranges; + ranges.reserve(_ranges->range_size()); - Value::Ranges result; + foreach (const Value::Range& range, _ranges->range()) { + // Skip if the entire range is subsumed by `removal`. + if (range.begin() >= removal.begin() && range.end() <= removal.end()) { + continue; + } - for (int i = 0; i < ranges->range_size(); i++) { - const Value::Range& current = ranges->range(i); - - // Note that these if/else if conditionals are in a particular - // order. In particular, the last two assume that the "subsumes" - // checks have already occurred. - if (range.begin() <= current.begin() && range.end() >= current.end()) { - // Range subsumes current. - // current: | | - // range: | | - // range: | | - // range: | | - // range: | | - } else if (range.begin() >= current.begin() && - range.end() <= current.end()) { - // Range is subsumed by current. - // current: | | - // range: | | - // range: | | - // range: | | - ranges::add(&result, current.begin(), range.begin() - 1); - ranges::add(&result, range.end() + 1, current.end()); - } else if (range.begin() <= current.begin() && - range.end() >= current.begin()) { - // Range overlaps to the left. - // current: | | - // range: | | - // range: | | - ranges::add(&result, range.end() + 1, current.end()); - } else if (range.begin() <= current.end() && range.end() >= current.end()) { - // Range overlaps to the right. 
- // current: | | - // range: | | - // range: | | - ranges::add(&result, current.begin(), range.begin() - 1); + // Divide if the range subsumes the `removal`. + if (range.begin() < removal.begin() && range.end() > removal.end()) { + // Front. + ranges.emplace_back(internal::Range{range.begin(), removal.begin() - 1}); + // Back. + ranges.emplace_back(internal::Range{removal.end() + 1, range.end()}); + } + + // Fully Emplace if the range doesn't intersect. + if (range.end() < removal.begin() || range.begin() > removal.end()) { + ranges.emplace_back(internal::Range{range.begin(), range.end()}); } else { - // Range doesn't overlap current. - // current: | | - // range: | | - // range: | | - ranges::add(&result, current.begin(), current.end()); + // Trim if the range does intersect. + if (range.end() > removal.end()) { + // Trim front. + ranges.emplace_back(internal::Range{removal.end() + 1, range.end()}); + } else { + // Trim back. + CHECK(range.begin() < removal.begin()); + ranges.emplace_back( + internal::Range{range.begin(), removal.begin() - 1}); + } } } - *ranges = result; + internal::coalesce(_ranges, std::move(ranges)); } @@ -308,10 +296,10 @@ ostream& operator<<(ostream& stream, const Value::Ranges& ranges) bool operator==(const Value::Ranges& _left, const Value::Ranges& _right) { Value::Ranges left; - coalesce(&left, _left); + coalesce(&left, {_left}); Value::Ranges right; - coalesce(&right, _right); + coalesce(&right, {_right}); if (left.range_size() == right.range_size()) { for (int i = 0; i < left.range_size(); i++) { @@ -340,10 +328,10 @@ bool operator==(const Value::Ranges& _left, const Value::Ranges& _right) bool operator<=(const Value::Ranges& _left, const Value::Ranges& _right) { Value::Ranges left; - coalesce(&left, _left); + coalesce(&left, {_left}); Value::Ranges right; - coalesce(&right, _right); + coalesce(&right, {_right}); for (int i = 0; i < left.range_size(); i++) { // Make sure this range is a subset of a range in right. 
@@ -367,10 +355,7 @@ bool operator<=(const Value::Ranges& _left, const Value::Ranges& _right) Value::Ranges operator+(const Value::Ranges& left, const Value::Ranges& right) { Value::Ranges result; - - coalesce(&result, left); - coalesce(&result, right); - + coalesce(&result, {left, right}); return result; } @@ -378,45 +363,24 @@ Value::Ranges operator+(const Value::Ranges& left, const Value::Ranges& right) Value::Ranges operator-(const Value::Ranges& left, const Value::Ranges& right) { Value::Ranges result; - - coalesce(&result, left); - coalesce(&result, right); - - for (int i = 0; i < right.range_size(); i++) { - remove(&result, right.range(i)); - } - - return result; + coalesce(&result, {left}); + return result -= right; } Value::Ranges& operator+=(Value::Ranges& left, const Value::Ranges& right) { - Value::Ranges temp; - - coalesce(&temp, left); - - left = temp; - - coalesce(&left, right); - + coalesce(&left, {right}); return left; } Value::Ranges& operator-=(Value::Ranges& left, const Value::Ranges& right) { - Value::Ranges temp; - - coalesce(&temp, left); - coalesce(&temp, right); - - left = temp; - - for (int i = 0; i < right.range_size(); i++) { + coalesce(&left); + for (int i = 0; i < right.range_size(); ++i) { remove(&left, right.range(i)); } - return left; } @@ -583,4 +547,93 @@ bool operator==(const Value::Text& left, const Value::Text& right) return left.value() == right.value(); } + +namespace internal { +namespace values { + +Try<Value> parse(const string& text) +{ + Value value; + + // Remove any spaces from the text. + string temp; + foreach (const char c, text) { + if (c != ' ') { + temp += c; + } + } + + if (temp.length() == 0) { + return Error("Expecting non-empty string"); + } + + // TODO(ynie): Find a better way to check brackets. 
+ if (!strings::checkBracketsMatching(temp, '{', '}') || + !strings::checkBracketsMatching(temp, '[', ']') || + !strings::checkBracketsMatching(temp, '(', ')')) { + return Error("Mismatched brackets"); + } + + size_t index = temp.find('['); + if (index == 0) { + // This is a Value::Ranges. + value.set_type(Value::RANGES); + Value::Ranges* ranges = value.mutable_ranges(); + const vector<string>& tokens = strings::tokenize(temp, "[]-,\n"); + if (tokens.size() % 2 != 0) { + return Error("Expecting one or more \"ranges\""); + } else { + for (size_t i = 0; i < tokens.size(); i += 2) { + Value::Range* range = ranges->add_range(); + + int j = i; + try { + range->set_begin(boost::lexical_cast<uint64_t>((tokens[j++]))); + range->set_end(boost::lexical_cast<uint64_t>(tokens[j++])); + } catch (const boost::bad_lexical_cast&) { + return Error( + "Expecting non-negative integers in '" + tokens[j - 1] + "'"); + } + } + + coalesce(ranges); + + return value; + } + } else if (index == string::npos) { + size_t index = temp.find('{'); + if (index == 0) { + // This is a set. + value.set_type(Value::SET); + Value::Set* set = value.mutable_set(); + const vector<string>& tokens = strings::tokenize(temp, "{},\n"); + for (size_t i = 0; i < tokens.size(); i++) { + set->add_item(tokens[i]); + } + return value; + } else if (index == string::npos) { + try { + // This is a scalar. + value.set_type(Value::SCALAR); + Value::Scalar* scalar = value.mutable_scalar(); + scalar->set_value(boost::lexical_cast<double>(temp)); + return value; + } catch (const boost::bad_lexical_cast&) { + // This is a text. 
+ value.set_type(Value::TEXT); + Value::Text* text = value.mutable_text(); + text->set_value(temp); + return value; + } + } else { + return Error("Unexpected '{' found"); + } + } + + return Error("Unexpected '[' found"); +} + +} // namespace values { +} // namespace internal { + } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/4f9dd54c/src/tests/resources_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resources_tests.cpp b/src/tests/resources_tests.cpp index 0318885..6584fc6 100644 --- a/src/tests/resources_tests.cpp +++ b/src/tests/resources_tests.cpp @@ -502,7 +502,7 @@ TEST(ResourcesTest, RangesSubset) EXPECT_EQ(1, ports2.ranges().range_size()); EXPECT_EQ(1, ports3.ranges().range_size()); EXPECT_EQ(2, ports4.ranges().range_size()); - EXPECT_EQ(2, ports5.ranges().range_size()); + EXPECT_EQ(1, ports5.ranges().range_size()); Resources r1; r1 += ports1; http://git-wip-us.apache.org/repos/asf/mesos/blob/4f9dd54c/src/tests/values_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/values_tests.cpp b/src/tests/values_tests.cpp index fc35d97..b8302f5 100644 --- a/src/tests/values_tests.cpp +++ b/src/tests/values_tests.cpp @@ -33,10 +33,14 @@ using namespace mesos::internal::values; using std::string; namespace mesos { + extern void coalesce(Value::Ranges* ranges); + extern void coalesce(Value::Ranges* ranges, const Value::Range& range); +} // namespace mesos { + +namespace mesos { namespace internal { namespace tests { - TEST(ValuesTest, ValidInput) { // Test parsing scalar type. @@ -102,6 +106,312 @@ TEST(ValuesTest, SetSubtraction) EXPECT_EQ(set3, parse("{sda4}").get().set()); } + +// Test a simple range parse. 
+TEST(ValuesTest, RangesParse) +{ + Value::Ranges ranges; + + Value::Range* range = ranges.add_range(); + range->set_begin(1); + range->set_end(10); + + Value::Ranges parsed = + parse("[1-10]").get().ranges(); + + EXPECT_EQ(ranges, parsed); +} + + +// Unit test coalescing given ranges. +TEST(ValuesTest, RangesCoalesce) +{ + // Multiple overlap with explicit coalesce. + // Explicitly construct [1-4, 3-5, 7-8, 8-10]. + Value::Ranges ranges; + Value::Range* range = ranges.add_range(); + range->set_begin(1); + range->set_end(4); + range = ranges.add_range(); + range->set_begin(3); + range->set_end(5); + range = ranges.add_range(); + range->set_begin(7); + range->set_end(8); + range = ranges.add_range(); + range->set_begin(8); + range->set_end(10); + + mesos::coalesce(&ranges); + + // Should be [1-5, 7-10]. + ASSERT_EQ(2, ranges.range_size()); + EXPECT_EQ(1, ranges.range(0).begin()); + EXPECT_EQ(5, ranges.range(0).end()); + EXPECT_EQ(7, ranges.range(1).begin()); + EXPECT_EQ(10, ranges.range(1).end()); +} + + +// Test coalescing given ranges via parse. +// Note: As the == operator triggers coalesce as well, we should +// check ranges_size() explicitly. +TEST(ValuesTest, RangesCoalesceParse) +{ + // Test multiple ranges against explicitly constructed. + Value::Ranges ranges; + + Value::Range* range = ranges.add_range(); + range->set_begin(1); + range->set_end(10); + Value::Ranges parsed = + parse("[4-6, 6-8, 5-9, 3-4, 2-5, 4-6, 1-1, 10-10, 4-6]").get().ranges(); + + EXPECT_EQ(ranges, parsed); + EXPECT_EQ(1, parsed.range_size()); + + // Simple overlap. + parsed = parse("[4-6, 6-8]").get().ranges(); + Value::Ranges expected = parse("[4-8]").get().ranges(); + + EXPECT_EQ(parsed, expected); + EXPECT_EQ(1, parsed.range_size()); + + // Simple overlap unordered. + parsed = parse("[6-8, 4-6]").get().ranges(); + expected = parse("[4-8]").get().ranges(); + + EXPECT_EQ(parsed, expected); + EXPECT_EQ(1, parsed.range_size()); + + // Subsumed range. 
+ parsed = parse("[1-10, 8-10]").get().ranges(); + expected = parse("[1-10]").get().ranges(); + + EXPECT_EQ(parsed, expected); + EXPECT_EQ(1, parsed.range_size()); + + // Completely subsumed. + parsed = parse("[1-11, 8-10]").get().ranges(); + expected = parse("[1-11]").get().ranges(); + + EXPECT_EQ(parsed, expected); + EXPECT_EQ(1, parsed.range_size()); + + // Multiple overlapping ranges. + parsed = parse("[1-4, 4-5, 7-8, 8-10]").get().ranges(); + expected = parse("[1-5, 7-10]").get().ranges(); + + EXPECT_EQ(parsed, expected); + EXPECT_EQ(2, parsed.range_size()); + + // Multiple overlap mixed. + parsed = parse("[7-8, 1-4, [8-10], 4-5]").get().ranges(); + expected = parse("[1-5, 7-10]").get().ranges(); + + EXPECT_EQ(parsed, expected); + EXPECT_EQ(2, parsed.range_size()); + + // Simple neighboring with overlap. + parsed = parse("[4-6, 7-8]").get().ranges(); + expected = parse("[4-8]").get().ranges(); + + EXPECT_EQ(parsed, expected); + EXPECT_EQ(1, parsed.range_size()); + + // Single neighbouring. + parsed = parse("[4-6, 7-7]").get().ranges(); + expected = parse("[4-7]").get().ranges(); + + EXPECT_EQ(parsed, expected); + EXPECT_EQ(1, parsed.range_size()); + + // Multiple ranges coalescing into a single one. + parsed = parse("[4-6, 7-7, 8-8, 9-9]").get().ranges(); + expected = parse("[4-9]").get().ranges(); + + EXPECT_EQ(parsed, expected); + EXPECT_EQ(1, parsed.range_size()); + + // Multiple ranges coalescing into multiple ranges. + parsed = parse("[4-6, 7-7, 9-10, 9-11]").get().ranges(); + expected = parse("[4-7, 9-11]").get().ranges(); + + EXPECT_EQ(parsed, expected); + EXPECT_EQ(2, parsed.range_size()); + + // Multiple duplicates. + parsed = parse("[6-8, 6-8, 4-6, 6-8]").get().ranges(); + expected = parse("[4-8]").get().ranges(); + EXPECT_EQ(parsed, expected); + EXPECT_EQ(1, parsed.range_size()); +} + + +// Test adding a new range to an existing coalesced range. +// Note: As the == operator triggers coalesce as well, we should +// check ranges_size() explicitly. 
+TEST(ValuesTest, AddRangeCoalesce) +{ + // Multiple overlap with explicit coalesce. + // Explicitly construct [1-4, 8-10]. + Value::Ranges ranges; + Value::Range* range = ranges.add_range(); + range->set_begin(1); + range->set_end(4); + range = ranges.add_range(); + range->set_begin(8); + range->set_end(10); + + // Range [4-8]. + Value::Range range2; + range2.set_begin(4); + range2.set_end(8); + + mesos::coalesce(&ranges, range2); + + // Should be coalesced to [1-10]. + ASSERT_EQ(1, ranges.range_size()); + EXPECT_EQ(1, ranges.range(0).begin()); + EXPECT_EQ(10, ranges.range(0).end()); + + // Multiple neighboring with explicit coalesce. + // Range [5-7]. + range2.set_begin(5); + range2.set_end(7); + + mesos::coalesce(&ranges, range2); + + // Should be coalesced to [1-10]. + ASSERT_EQ(1, ranges.range_size()); + EXPECT_EQ(1, ranges.range(0).begin()); + EXPECT_EQ(10, ranges.range(0).end()); + + // Completely subsumed. + // Range [5-7] (as before). + range2.set_begin(5); + range2.set_end(7); + + mesos::coalesce(&ranges, range2); + + // Should be coalesced to [1-10]. + ASSERT_EQ(1, ranges.range_size()); + EXPECT_EQ(1, ranges.range(0).begin()); + EXPECT_EQ(10, ranges.range(0).end()); + + // Non-overlapping. + // Range [20-21]. + range2.set_begin(20); + range2.set_end(21); + + mesos::coalesce(&ranges, range2); + + // Should be coalesced to [1-10, 20-21]. + ASSERT_EQ(2, ranges.range_size()); + EXPECT_EQ(1, ranges.range(0).begin()); + EXPECT_EQ(10, ranges.range(0).end()); + EXPECT_EQ(20, ranges.range(1).begin()); + EXPECT_EQ(21, ranges.range(1).end()); +} + +// Test adding two ranges. +TEST(ValuesTest, RangesAddition) +{ + // Overlaps on right. + Value::Ranges ranges1 = parse("[3-8]").get().ranges(); + Value::Ranges ranges2 = parse("[4-10]").get().ranges(); + + EXPECT_EQ(parse("[3-10]").get().ranges(), ranges1 + ranges2); + + // Overlaps on left. 
+ ranges1 = parse("[3-8]").get().ranges(); + ranges2 = parse("[1-4]").get().ranges(); + + EXPECT_EQ(parse("[1-8]").get().ranges(), ranges1 + ranges2); + + // Completely subsumed. + ranges1 = parse("[2-3]").get().ranges(); + ranges2 = parse("[1-4]").get().ranges(); + + EXPECT_EQ(parse("[1-4]").get().ranges(), ranges1 + ranges2); + + // Neighbouring right. + ranges1 = parse("[2-3]").get().ranges(); + ranges2 = parse("[4-6]").get().ranges(); + + EXPECT_EQ(parse("[2-6]").get().ranges(), ranges1 + ranges2); + + // Neighbouring left. + ranges1 = parse("[3-5]").get().ranges(); + ranges2 = parse("[1-2]").get().ranges(); + + EXPECT_EQ(parse("[1-5]").get().ranges(), ranges1 + ranges2); + + // Fills gap. + ranges1 = parse("[3-5, 7-8]").get().ranges(); + ranges2 = parse("[6-6]").get().ranges(); + + EXPECT_EQ(parse("[3-8]").get().ranges(), ranges1 + ranges2); + + // Fills double gap. + ranges1 = parse("[1-4, 9-10, 20-22, 26-30]").get().ranges(); + ranges2 = parse("[5-8, 23-25]").get().ranges(); + + EXPECT_EQ(parse("[1-10, 20-30]").get().ranges(), ranges1 + ranges2); +} + +// Test subtracting two ranges. +TEST(ValuesTest, RangesSubtraction) +{ + // Completely subsumes. + Value::Ranges ranges1 = parse("[3-8]").get().ranges(); + Value::Ranges ranges2 = parse("[1-10]").get().ranges(); + + EXPECT_EQ(parse("[]").get().ranges(), ranges1 - ranges2); + + // Subsumed on left. + ranges1 = parse("[3-8]").get().ranges(); + ranges2 = parse("[3-5]").get().ranges(); + + EXPECT_EQ(parse("[6-8]").get().ranges(), ranges1 - ranges2); + + // Subsumed on right. + ranges1 = parse("[3-8]").get().ranges(); + ranges2 = parse("[5-8]").get().ranges(); + + EXPECT_EQ(parse("[3-4]").get().ranges(), ranges1 - ranges2); + + // Subsumed in the middle. + ranges1 = parse("[3-8]").get().ranges(); + ranges2 = parse("[5-6]").get().ranges(); + + EXPECT_EQ(parse("[3-4, 7-8]").get().ranges(), ranges1 - ranges2); + + // Overlaps to the left. 
+ ranges1 = parse("[3-8]").get().ranges(); + ranges2 = parse("[1-3]").get().ranges(); + + EXPECT_EQ(parse("[4-8]").get().ranges(), ranges1 - ranges2); + + // Overlaps to the right. + ranges1 = parse("[3-8]").get().ranges(); + ranges2 = parse("[5-10]").get().ranges(); + + EXPECT_EQ(parse("[3-4]").get().ranges(), ranges1 - ranges2); + + // Doesn't overlap right. + ranges1 = parse("[3-8]").get().ranges(); + ranges2 = parse("[9-10]").get().ranges(); + + EXPECT_EQ(parse("[3-8]").get().ranges(), ranges1 - ranges2); + + // Doesn't overlap left. + ranges1 = parse("[3-8]").get().ranges(); + ranges2 = parse("[1-2]").get().ranges(); + + EXPECT_EQ(parse("[3-8]").get().ranges(), ranges1 - ranges2); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
