Repository: beam Updated Branches: refs/heads/master f0467b72f -> 3746d4cad
Remove GroupedShuffleRangeTracker which is unused in the SDK Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c6d0d798 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c6d0d798 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c6d0d798 Branch: refs/heads/master Commit: c6d0d7983b19ce2e01b7b06a12f704fef17a00cc Parents: f0467b7 Author: chamik...@google.com <chamik...@google.com> Authored: Wed Jun 21 10:37:11 2017 -0700 Committer: chamik...@google.com <chamik...@google.com> Committed: Wed Jun 21 14:05:17 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/range_trackers.py | 130 ------------- .../apache_beam/io/range_trackers_test.py | 186 ------------------- 2 files changed, 316 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c6d0d798/sdks/python/apache_beam/io/range_trackers.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py index 9cb36e7..bef77d4 100644 --- a/sdks/python/apache_beam/io/range_trackers.py +++ b/sdks/python/apache_beam/io/range_trackers.py @@ -193,136 +193,6 @@ class OffsetRangeTracker(iobase.RangeTracker): self._split_points_unclaimed_callback = callback -class GroupedShuffleRangeTracker(iobase.RangeTracker): - """For internal use only; no backwards-compatibility guarantees. - - A 'RangeTracker' for positions used by'GroupedShuffleReader'. - - These positions roughly correspond to hashes of keys. In case of hash - collisions, multiple groups can have the same position. In that case, the - first group at a particular position is considered a split point (because - it is the first to be returned when reading a position range starting at this - position), others are not. - """ - - def __init__(self, decoded_start_pos, decoded_stop_pos): - super(GroupedShuffleRangeTracker, self).__init__() - self._decoded_start_pos = decoded_start_pos - self._decoded_stop_pos = decoded_stop_pos - self._decoded_last_group_start = None - self._last_group_was_at_a_split_point = False - self._split_points_seen = 0 - self._lock = threading.Lock() - - def start_position(self): - return self._decoded_start_pos - - def stop_position(self): - return self._decoded_stop_pos - - def last_group_start(self): - return self._decoded_last_group_start - - def _validate_decoded_group_start(self, decoded_group_start, split_point): - if self.start_position() and decoded_group_start < self.start_position(): - raise ValueError('Trying to return record at %r which is before the' - ' starting position at %r' % - (decoded_group_start, self.start_position())) - - if (self.last_group_start() and - decoded_group_start < self.last_group_start()): - raise ValueError('Trying to return group at %r which is before the' - ' last-returned group at %r' % - (decoded_group_start, self.last_group_start())) - if (split_point and self.last_group_start() and - self.last_group_start() == decoded_group_start): - raise ValueError('Trying to return a group at a split point with ' - 'same position as the previous group: both at %r, ' - 'last group was %sat a split point.' % - (decoded_group_start, - ('' if self._last_group_was_at_a_split_point - else 'not '))) - if not split_point: - if self.last_group_start() is None: - raise ValueError('The first group [at %r] must be at a split point' % - decoded_group_start) - if self.last_group_start() != decoded_group_start: - # This case is not a violation of general RangeTracker semantics, but it - # is contrary to how GroupingShuffleReader in particular works. Hitting - # it would mean it's behaving unexpectedly. - raise ValueError('Trying to return a group not at a split point, but ' - 'with a different position than the previous group: ' - 'last group was %r at %r, current at a %s split' - ' point.' % - (self.last_group_start() - , decoded_group_start - , ('' if self._last_group_was_at_a_split_point - else 'non-'))) - - def try_claim(self, decoded_group_start): - with self._lock: - self._validate_decoded_group_start(decoded_group_start, True) - if (self.stop_position() - and decoded_group_start >= self.stop_position()): - return False - - self._decoded_last_group_start = decoded_group_start - self._last_group_was_at_a_split_point = True - self._split_points_seen += 1 - return True - - def set_current_position(self, decoded_group_start): - with self._lock: - self._validate_decoded_group_start(decoded_group_start, False) - self._decoded_last_group_start = decoded_group_start - self._last_group_was_at_a_split_point = False - - def try_split(self, decoded_split_position): - with self._lock: - if self.last_group_start() is None: - logging.info('Refusing to split %r at %r: unstarted' - , self, decoded_split_position) - return - - if decoded_split_position <= self.last_group_start(): - logging.info('Refusing to split %r at %r: already past proposed split ' - 'position' - , self, decoded_split_position) - return - - if ((self.stop_position() - and decoded_split_position >= self.stop_position()) - or (self.start_position() - and decoded_split_position <= self.start_position())): - logging.error('Refusing to split %r at %r: proposed split position out ' - 'of range', self, decoded_split_position) - return - - logging.debug('Agreeing to split %r at %r' - , self, decoded_split_position) - self._decoded_stop_pos = decoded_split_position - - # Since GroupedShuffleRangeTracker cannot determine relative sizes of the - # two splits, returning 0.5 as the fraction below so that the framework - # assumes the splits to be of the same size. - return self._decoded_stop_pos, 0.5 - - def fraction_consumed(self): - # GroupingShuffle sources have special support on the service and the - # service will estimate progress from positions for us. - raise RuntimeError('GroupedShuffleRangeTracker does not measure fraction' - ' consumed due to positions being opaque strings' - ' that are interpreted by the service') - - def split_points(self): - with self._lock: - splits_points_consumed = ( - 0 if self._split_points_seen <= 1 else (self._split_points_seen - 1)) - - return (splits_points_consumed, - iobase.RangeTracker.SPLIT_POINTS_UNKNOWN) - - class OrderedPositionRangeTracker(iobase.RangeTracker): """ An abstract base class for range trackers whose positions are comparable. http://git-wip-us.apache.org/repos/asf/beam/blob/c6d0d798/sdks/python/apache_beam/io/range_trackers_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/range_trackers_test.py b/sdks/python/apache_beam/io/range_trackers_test.py index edb6386..3e92663 100644 --- a/sdks/python/apache_beam/io/range_trackers_test.py +++ b/sdks/python/apache_beam/io/range_trackers_test.py @@ -17,14 +17,11 @@ """Unit tests for the range_trackers module.""" -import array import copy import logging import math import unittest - -from apache_beam.io import iobase from apache_beam.io import range_trackers @@ -189,189 +186,6 @@ class OffsetRangeTrackerTest(unittest.TestCase): (3, 41)) -class GroupedShuffleRangeTrackerTest(unittest.TestCase): - - def bytes_to_position(self, bytes_array): - return array.array('B', bytes_array).tostring() - - def test_try_return_record_in_infinite_range(self): - tracker = range_trackers.GroupedShuffleRangeTracker('', '') - self.assertTrue(tracker.try_claim( - self.bytes_to_position([1, 2, 3]))) - self.assertTrue(tracker.try_claim( - self.bytes_to_position([1, 2, 5]))) - self.assertTrue(tracker.try_claim( - self.bytes_to_position([3, 6, 8, 10]))) - - def test_try_return_record_finite_range(self): - tracker = range_trackers.GroupedShuffleRangeTracker( - self.bytes_to_position([1, 0, 0]), self.bytes_to_position([5, 0, 0])) - self.assertTrue(tracker.try_claim( - self.bytes_to_position([1, 2, 3]))) - self.assertTrue(tracker.try_claim( - self.bytes_to_position([1, 2, 5]))) - self.assertTrue(tracker.try_claim( - self.bytes_to_position([3, 6, 8, 10]))) - self.assertTrue(tracker.try_claim( - self.bytes_to_position([4, 255, 255, 255]))) - # Should fail for positions that are lexicographically equal to or larger - # than the defined stop position. - self.assertFalse(copy.copy(tracker).try_claim( - self.bytes_to_position([5, 0, 0]))) - self.assertFalse(copy.copy(tracker).try_claim( - self.bytes_to_position([5, 0, 1]))) - self.assertFalse(copy.copy(tracker).try_claim( - self.bytes_to_position([6, 0, 0]))) - - def test_try_return_record_with_non_split_point(self): - tracker = range_trackers.GroupedShuffleRangeTracker( - self.bytes_to_position([1, 0, 0]), self.bytes_to_position([5, 0, 0])) - self.assertTrue(tracker.try_claim( - self.bytes_to_position([1, 2, 3]))) - tracker.set_current_position(self.bytes_to_position([1, 2, 3])) - tracker.set_current_position(self.bytes_to_position([1, 2, 3])) - self.assertTrue(tracker.try_claim( - self.bytes_to_position([1, 2, 5]))) - tracker.set_current_position(self.bytes_to_position([1, 2, 5])) - self.assertTrue(tracker.try_claim( - self.bytes_to_position([3, 6, 8, 10]))) - self.assertTrue(tracker.try_claim( - self.bytes_to_position([4, 255, 255, 255]))) - - def test_first_record_non_split_point(self): - tracker = range_trackers.GroupedShuffleRangeTracker( - self.bytes_to_position([3, 0, 0]), self.bytes_to_position([5, 0, 0])) - with self.assertRaises(ValueError): - tracker.set_current_position(self.bytes_to_position([3, 4, 5])) - - def test_non_split_point_record_with_different_position(self): - tracker = range_trackers.GroupedShuffleRangeTracker( - self.bytes_to_position([3, 0, 0]), self.bytes_to_position([5, 0, 0])) - self.assertTrue(tracker.try_claim(self.bytes_to_position([3, 4, 5]))) - with self.assertRaises(ValueError): - tracker.set_current_position(self.bytes_to_position([3, 4, 6])) - - def test_try_return_record_before_start(self): - tracker = range_trackers.GroupedShuffleRangeTracker( - self.bytes_to_position([3, 0, 0]), self.bytes_to_position([5, 0, 0])) - with self.assertRaises(ValueError): - tracker.try_claim(self.bytes_to_position([1, 2, 3])) - - def test_try_return_non_monotonic(self): - tracker = range_trackers.GroupedShuffleRangeTracker( - self.bytes_to_position([3, 0, 0]), self.bytes_to_position([5, 0, 0])) - self.assertTrue(tracker.try_claim(self.bytes_to_position([3, 4, 5]))) - self.assertTrue(tracker.try_claim(self.bytes_to_position([3, 4, 6]))) - with self.assertRaises(ValueError): - tracker.try_claim(self.bytes_to_position([3, 2, 1])) - - def test_try_return_identical_positions(self): - tracker = range_trackers.GroupedShuffleRangeTracker( - self.bytes_to_position([3, 0, 0]), self.bytes_to_position([5, 0, 0])) - self.assertTrue(tracker.try_claim( - self.bytes_to_position([3, 4, 5]))) - with self.assertRaises(ValueError): - tracker.try_claim(self.bytes_to_position([3, 4, 5])) - - def test_try_split_at_position_infinite_range(self): - tracker = range_trackers.GroupedShuffleRangeTracker('', '') - # Should fail before first record is returned. - self.assertFalse(tracker.try_split( - self.bytes_to_position([3, 4, 5, 6]))) - - self.assertTrue(tracker.try_claim( - self.bytes_to_position([1, 2, 3]))) - - # Should now succeed. - self.assertIsNotNone(tracker.try_split( - self.bytes_to_position([3, 4, 5, 6]))) - # Should not split at same or larger position. - self.assertIsNone(tracker.try_split( - self.bytes_to_position([3, 4, 5, 6]))) - self.assertIsNone(tracker.try_split( - self.bytes_to_position([3, 4, 5, 6, 7]))) - self.assertIsNone(tracker.try_split( - self.bytes_to_position([4, 5, 6, 7]))) - - # Should split at smaller position. - self.assertIsNotNone(tracker.try_split( - self.bytes_to_position([3, 2, 1]))) - - self.assertTrue(tracker.try_claim( - self.bytes_to_position([2, 3, 4]))) - - # Should not split at a position we're already past. - self.assertIsNone(tracker.try_split( - self.bytes_to_position([2, 3, 4]))) - self.assertIsNone(tracker.try_split( - self.bytes_to_position([2, 3, 3]))) - - self.assertTrue(tracker.try_claim( - self.bytes_to_position([3, 2, 0]))) - self.assertFalse(tracker.try_claim( - self.bytes_to_position([3, 2, 1]))) - - def test_try_test_split_at_position_finite_range(self): - tracker = range_trackers.GroupedShuffleRangeTracker( - self.bytes_to_position([0, 0, 0]), - self.bytes_to_position([10, 20, 30])) - # Should fail before first record is returned. - self.assertFalse(tracker.try_split( - self.bytes_to_position([0, 0, 0]))) - self.assertFalse(tracker.try_split( - self.bytes_to_position([3, 4, 5, 6]))) - - self.assertTrue(tracker.try_claim( - self.bytes_to_position([1, 2, 3]))) - - # Should now succeed. - self.assertTrue(tracker.try_split( - self.bytes_to_position([3, 4, 5, 6]))) - # Should not split at same or larger position. - self.assertFalse(tracker.try_split( - self.bytes_to_position([3, 4, 5, 6]))) - self.assertFalse(tracker.try_split( - self.bytes_to_position([3, 4, 5, 6, 7]))) - self.assertFalse(tracker.try_split( - self.bytes_to_position([4, 5, 6, 7]))) - - # Should split at smaller position. - self.assertTrue(tracker.try_split( - self.bytes_to_position([3, 2, 1]))) - # But not at a position at or before last returned record. - self.assertFalse(tracker.try_split( - self.bytes_to_position([1, 2, 3]))) - - self.assertTrue(tracker.try_claim( - self.bytes_to_position([2, 3, 4]))) - self.assertTrue(tracker.try_claim( - self.bytes_to_position([3, 2, 0]))) - self.assertFalse(tracker.try_claim( - self.bytes_to_position([3, 2, 1]))) - - def test_split_points(self): - tracker = range_trackers.GroupedShuffleRangeTracker( - self.bytes_to_position([1, 0, 0]), - self.bytes_to_position([5, 0, 0])) - self.assertEqual(tracker.split_points(), - (0, iobase.RangeTracker.SPLIT_POINTS_UNKNOWN)) - self.assertTrue(tracker.try_claim(self.bytes_to_position([1, 2, 3]))) - self.assertEqual(tracker.split_points(), - (0, iobase.RangeTracker.SPLIT_POINTS_UNKNOWN)) - self.assertTrue(tracker.try_claim(self.bytes_to_position([1, 2, 5]))) - self.assertEqual(tracker.split_points(), - (1, iobase.RangeTracker.SPLIT_POINTS_UNKNOWN)) - self.assertTrue(tracker.try_claim(self.bytes_to_position([3, 6, 8]))) - self.assertEqual(tracker.split_points(), - (2, iobase.RangeTracker.SPLIT_POINTS_UNKNOWN)) - self.assertTrue(tracker.try_claim(self.bytes_to_position([4, 255, 255]))) - self.assertEqual(tracker.split_points(), - (3, iobase.RangeTracker.SPLIT_POINTS_UNKNOWN)) - self.assertFalse(tracker.try_claim(self.bytes_to_position([5, 1, 0]))) - self.assertEqual(tracker.split_points(), - (3, iobase.RangeTracker.SPLIT_POINTS_UNKNOWN)) - - class OrderedPositionRangeTrackerTest(unittest.TestCase): class DoubleRangeTracker(range_trackers.OrderedPositionRangeTracker):