Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 8f4551c4e -> 93a95d68b
Generic ordered position range tracker.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4768227c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4768227c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4768227c

Branch: refs/heads/python-sdk
Commit: 4768227c62ac1c43e64bba604639bc4e80607edb
Parents: 8f4551c
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Fri Nov 4 11:18:31 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Mon Nov 7 17:53:06 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/range_trackers.py    | 75 ++++++++++++++++++++
 .../apache_beam/io/range_trackers_test.py       | 73 +++++++++++++++++++
 2 files changed, 148 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4768227c/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index 080e2f3..b42ff1b 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -286,6 +286,81 @@ class GroupedShuffleRangeTracker(iobase.RangeTracker):
                        ' that are interpreted by the service')
 
 
+class OrderedPositionRangeTracker(iobase.RangeTracker):
+  """
+  An abstract base class for range trackers whose positions are comparable.
+
+  Subclasses only need to implement the mapping from position ranges to and from the
+  closed interval [0, 1].
+  """
+
+  UNSTARTED = object()
+
+  def __init__(self, start_position=None, stop_position=None):
+    self._start_position = start_position
+    self._stop_position = stop_position
+    self._lock = threading.Lock()
+    self._last_claim = self.UNSTARTED
+
+  def start_position(self):
+    return self._start_position
+
+  def stop_position(self):
+    with self._lock:
+      return self._stop_position
+
+  def try_claim(self, position):
+    with self._lock:
+      if self._last_claim is not self.UNSTARTED and position < self._last_claim:
+        raise ValueError(
+            "Positions must be claimed in order: "
+            "claim '%s' attempted after claim '%s'" % (
+                position, self._last_claim))
+      elif self._start_position is not None and position < self._start_position:
+        raise ValueError("Claim '%s' is before start '%s'" % (
+            position, self._start_position))
+      if self._stop_position is None or position < self._stop_position:
+        self._last_claim = position
+        return True
+
+  def position_at_fraction(self, fraction):
+    return self.fraction_to_position(
+        fraction, self._start_position, self._stop_position)
+
+  def try_split(self, position):
+    with self._lock:
+      if ((self._stop_position is not None and position > self._stop_position)
+          or (self._start_position is not None
+              and position <= self._start_position)):
+        raise ValueError("Split at '%s' not in range %s" % (
+            position, [self._start_position, self._stop_position]))
+      if self._last_claim is self.UNSTARTED or self._last_claim < position:
+        fraction = self.position_to_fraction(
+            position, start=self._start_position, end=self._stop_position)
+        self._stop_position = position
+        return position, fraction
+
+  def fraction_consumed(self):
+    if self._last_claim is self.UNSTARTED:
+      return 0
+    else:
+      return self.position_to_fraction(
+          self._last_claim, self._start_position, self._stop_position)
+
+  def position_to_fraction(self, pos, start, end):
+    """
+    Converts a position `pos` between `start` and `end` (inclusive) to a
+    fraction between 0 and 1.
+    """
+    raise NotImplementedError
+
+  def fraction_to_position(self, fraction, start, end):
+    """
+    Converts a fraction between 0 and 1 to a position between start and end.
+    """
+    raise NotImplementedError
+
+
 class UnsplittableRangeTracker(iobase.RangeTracker):
   """A RangeTracker that always ignores split requests.
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4768227c/sdks/python/apache_beam/io/range_trackers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers_test.py b/sdks/python/apache_beam/io/range_trackers_test.py
index c4c1e28..161103c 100644
--- a/sdks/python/apache_beam/io/range_trackers_test.py
+++ b/sdks/python/apache_beam/io/range_trackers_test.py
@@ -319,6 +319,79 @@ class GroupedShuffleRangeTrackerTest(unittest.TestCase):
         self.bytes_to_position([3, 2, 1])))
 
 
+class OrderedPositionRangeTrackerTest(unittest.TestCase):
+
+  class DoubleRangeTracker(range_trackers.OrderedPositionRangeTracker):
+
+    @staticmethod
+    def fraction_to_position(fraction, start, end):
+      return start + (end - start) * fraction
+
+    @staticmethod
+    def position_to_fraction(pos, start, end):
+      return float(pos - start) / (end - start)
+
+  def test_try_claim(self):
+    tracker = self.DoubleRangeTracker(10, 20)
+    self.assertTrue(tracker.try_claim(10))
+    self.assertTrue(tracker.try_claim(15))
+    self.assertFalse(tracker.try_claim(20))
+    self.assertFalse(tracker.try_claim(25))
+
+  def test_fraction_consumed(self):
+    tracker = self.DoubleRangeTracker(10, 20)
+    self.assertEqual(0, tracker.fraction_consumed())
+    tracker.try_claim(10)
+    self.assertEqual(0, tracker.fraction_consumed())
+    tracker.try_claim(15)
+    self.assertEqual(.5, tracker.fraction_consumed())
+    tracker.try_claim(17)
+    self.assertEqual(.7, tracker.fraction_consumed())
+    tracker.try_claim(25)
+    self.assertEqual(.7, tracker.fraction_consumed())
+
+  def test_try_split(self):
+    tracker = self.DoubleRangeTracker(10, 20)
+    tracker.try_claim(15)
+    self.assertEqual(.5, tracker.fraction_consumed())
+    # Split at 18.
+    self.assertEqual((18, 0.8), tracker.try_split(18))
+    # Fraction consumed reflects smaller range.
+    self.assertEqual(.625, tracker.fraction_consumed())
+    # We can claim anything less than 18,
+    self.assertTrue(tracker.try_claim(17))
+    # but can't split before claimed 17,
+    self.assertIsNone(tracker.try_split(16))
+    # nor claim anything after 18.
+    self.assertFalse(tracker.try_claim(19))
+
+  def test_claim_order(self):
+    tracker = self.DoubleRangeTracker(10, 20)
+    tracker.try_claim(12)
+    tracker.try_claim(15)
+    with self.assertRaises(ValueError):
+      tracker.try_claim(13)
+
+  def test_out_of_range(self):
+    tracker = self.DoubleRangeTracker(10, 20)
+    # Can't claim before range.
+    with self.assertRaises(ValueError):
+      tracker.try_claim(-5)
+    # Can't split before range.
+    with self.assertRaises(ValueError):
+      tracker.try_split(-5)
+    # Can't split at start position.
+    with self.assertRaises(ValueError):
+      tracker.try_split(10)
+    # Can't split after range.
+    with self.assertRaises(ValueError):
+      tracker.try_split(25)
+    tracker.try_split(15)
+    # Can't split after modified range.
+    with self.assertRaises(ValueError):
+      tracker.try_split(17)
+
+
 class UnsplittableRangeTrackerTest(unittest.TestCase):
 
   def test_try_claim(self):