Repository: beam
Updated Branches:
  refs/heads/master f0467b72f -> 3746d4cad


Remove GroupedShuffleRangeTracker which is unused in the SDK


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c6d0d798
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c6d0d798
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c6d0d798

Branch: refs/heads/master
Commit: c6d0d7983b19ce2e01b7b06a12f704fef17a00cc
Parents: f0467b7
Author: chamik...@google.com <chamik...@google.com>
Authored: Wed Jun 21 10:37:11 2017 -0700
Committer: chamik...@google.com <chamik...@google.com>
Committed: Wed Jun 21 14:05:17 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/range_trackers.py    | 130 -------------
 .../apache_beam/io/range_trackers_test.py       | 186 -------------------
 2 files changed, 316 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c6d0d798/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index 9cb36e7..bef77d4 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -193,136 +193,6 @@ class OffsetRangeTracker(iobase.RangeTracker):
     self._split_points_unclaimed_callback = callback
 
 
-class GroupedShuffleRangeTracker(iobase.RangeTracker):
-  """For internal use only; no backwards-compatibility guarantees.
-
-  A 'RangeTracker' for positions used by'GroupedShuffleReader'.
-
-  These positions roughly correspond to hashes of keys. In case of hash
-  collisions, multiple groups can have the same position. In that case, the
-  first group at a particular position is considered a split point (because
-  it is the first to be returned when reading a position range starting at this
-  position), others are not.
-  """
-
-  def __init__(self, decoded_start_pos, decoded_stop_pos):
-    super(GroupedShuffleRangeTracker, self).__init__()
-    self._decoded_start_pos = decoded_start_pos
-    self._decoded_stop_pos = decoded_stop_pos
-    self._decoded_last_group_start = None
-    self._last_group_was_at_a_split_point = False
-    self._split_points_seen = 0
-    self._lock = threading.Lock()
-
-  def start_position(self):
-    return self._decoded_start_pos
-
-  def stop_position(self):
-    return self._decoded_stop_pos
-
-  def last_group_start(self):
-    return self._decoded_last_group_start
-
-  def _validate_decoded_group_start(self, decoded_group_start, split_point):
-    if self.start_position() and decoded_group_start < self.start_position():
-      raise ValueError('Trying to return record at %r which is before the'
-                       ' starting position at %r' %
-                       (decoded_group_start, self.start_position()))
-
-    if (self.last_group_start() and
-        decoded_group_start < self.last_group_start()):
-      raise ValueError('Trying to return group at %r which is before the'
-                       ' last-returned group at %r' %
-                       (decoded_group_start, self.last_group_start()))
-    if (split_point and self.last_group_start() and
-        self.last_group_start() == decoded_group_start):
-      raise ValueError('Trying to return a group at a split point with '
-                       'same position as the previous group: both at %r, '
-                       'last group was %sat a split point.' %
-                       (decoded_group_start,
-                        ('' if self._last_group_was_at_a_split_point
-                         else 'not ')))
-    if not split_point:
-      if self.last_group_start() is None:
-        raise ValueError('The first group [at %r] must be at a split point' %
-                         decoded_group_start)
-      if self.last_group_start() != decoded_group_start:
-        # This case is not a violation of general RangeTracker semantics, but it
-        # is contrary to how GroupingShuffleReader in particular works. Hitting
-        # it would mean it's behaving unexpectedly.
-        raise ValueError('Trying to return a group not at a split point, but '
-                         'with a different position than the previous group: '
-                         'last group was %r at %r, current at a %s split'
-                         ' point.' %
-                         (self.last_group_start()
-                          , decoded_group_start
-                          , ('' if self._last_group_was_at_a_split_point
-                             else 'non-')))
-
-  def try_claim(self, decoded_group_start):
-    with self._lock:
-      self._validate_decoded_group_start(decoded_group_start, True)
-      if (self.stop_position()
-          and decoded_group_start >= self.stop_position()):
-        return False
-
-      self._decoded_last_group_start = decoded_group_start
-      self._last_group_was_at_a_split_point = True
-      self._split_points_seen += 1
-      return True
-
-  def set_current_position(self, decoded_group_start):
-    with self._lock:
-      self._validate_decoded_group_start(decoded_group_start, False)
-      self._decoded_last_group_start = decoded_group_start
-      self._last_group_was_at_a_split_point = False
-
-  def try_split(self, decoded_split_position):
-    with self._lock:
-      if self.last_group_start() is None:
-        logging.info('Refusing to split %r at %r: unstarted'
-                     , self, decoded_split_position)
-        return
-
-      if decoded_split_position <= self.last_group_start():
-        logging.info('Refusing to split %r at %r: already past proposed split '
-                     'position'
-                     , self, decoded_split_position)
-        return
-
-      if ((self.stop_position()
-           and decoded_split_position >= self.stop_position())
-          or (self.start_position()
-              and decoded_split_position <= self.start_position())):
-        logging.error('Refusing to split %r at %r: proposed split position out '
-                      'of range', self, decoded_split_position)
-        return
-
-      logging.debug('Agreeing to split %r at %r'
-                    , self, decoded_split_position)
-      self._decoded_stop_pos = decoded_split_position
-
-      # Since GroupedShuffleRangeTracker cannot determine relative sizes of the
-      # two splits, returning 0.5 as the fraction below so that the framework
-      # assumes the splits to be of the same size.
-      return self._decoded_stop_pos, 0.5
-
-  def fraction_consumed(self):
-    # GroupingShuffle sources have special support on the service and the
-    # service will estimate progress from positions for us.
-    raise RuntimeError('GroupedShuffleRangeTracker does not measure fraction'
-                       ' consumed due to positions being opaque strings'
-                       ' that are interpreted by the service')
-
-  def split_points(self):
-    with self._lock:
-      splits_points_consumed = (
-          0 if self._split_points_seen <= 1 else (self._split_points_seen - 1))
-
-      return (splits_points_consumed,
-              iobase.RangeTracker.SPLIT_POINTS_UNKNOWN)
-
-
 class OrderedPositionRangeTracker(iobase.RangeTracker):
   """
   An abstract base class for range trackers whose positions are comparable.

http://git-wip-us.apache.org/repos/asf/beam/blob/c6d0d798/sdks/python/apache_beam/io/range_trackers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers_test.py b/sdks/python/apache_beam/io/range_trackers_test.py
index edb6386..3e92663 100644
--- a/sdks/python/apache_beam/io/range_trackers_test.py
+++ b/sdks/python/apache_beam/io/range_trackers_test.py
@@ -17,14 +17,11 @@
 
 """Unit tests for the range_trackers module."""
 
-import array
 import copy
 import logging
 import math
 import unittest
 
-
-from apache_beam.io import iobase
 from apache_beam.io import range_trackers
 
 
@@ -189,189 +186,6 @@ class OffsetRangeTrackerTest(unittest.TestCase):
                      (3, 41))
 
 
-class GroupedShuffleRangeTrackerTest(unittest.TestCase):
-
-  def bytes_to_position(self, bytes_array):
-    return array.array('B', bytes_array).tostring()
-
-  def test_try_return_record_in_infinite_range(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker('', '')
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([1, 2, 3])))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([1, 2, 5])))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([3, 6, 8, 10])))
-
-  def test_try_return_record_finite_range(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([1, 0, 0]), self.bytes_to_position([5, 0, 0]))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([1, 2, 3])))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([1, 2, 5])))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([3, 6, 8, 10])))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([4, 255, 255, 255])))
-    # Should fail for positions that are lexicographically equal to or larger
-    # than the defined stop position.
-    self.assertFalse(copy.copy(tracker).try_claim(
-        self.bytes_to_position([5, 0, 0])))
-    self.assertFalse(copy.copy(tracker).try_claim(
-        self.bytes_to_position([5, 0, 1])))
-    self.assertFalse(copy.copy(tracker).try_claim(
-        self.bytes_to_position([6, 0, 0])))
-
-  def test_try_return_record_with_non_split_point(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([1, 0, 0]), self.bytes_to_position([5, 0, 0]))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([1, 2, 3])))
-    tracker.set_current_position(self.bytes_to_position([1, 2, 3]))
-    tracker.set_current_position(self.bytes_to_position([1, 2, 3]))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([1, 2, 5])))
-    tracker.set_current_position(self.bytes_to_position([1, 2, 5]))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([3, 6, 8, 10])))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([4, 255, 255, 255])))
-
-  def test_first_record_non_split_point(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([3, 0, 0]), self.bytes_to_position([5, 0, 0]))
-    with self.assertRaises(ValueError):
-      tracker.set_current_position(self.bytes_to_position([3, 4, 5]))
-
-  def test_non_split_point_record_with_different_position(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([3, 0, 0]), self.bytes_to_position([5, 0, 0]))
-    self.assertTrue(tracker.try_claim(self.bytes_to_position([3, 4, 5])))
-    with self.assertRaises(ValueError):
-      tracker.set_current_position(self.bytes_to_position([3, 4, 6]))
-
-  def test_try_return_record_before_start(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([3, 0, 0]), self.bytes_to_position([5, 0, 0]))
-    with self.assertRaises(ValueError):
-      tracker.try_claim(self.bytes_to_position([1, 2, 3]))
-
-  def test_try_return_non_monotonic(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([3, 0, 0]), self.bytes_to_position([5, 0, 0]))
-    self.assertTrue(tracker.try_claim(self.bytes_to_position([3, 4, 5])))
-    self.assertTrue(tracker.try_claim(self.bytes_to_position([3, 4, 6])))
-    with self.assertRaises(ValueError):
-      tracker.try_claim(self.bytes_to_position([3, 2, 1]))
-
-  def test_try_return_identical_positions(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([3, 0, 0]), self.bytes_to_position([5, 0, 0]))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([3, 4, 5])))
-    with self.assertRaises(ValueError):
-      tracker.try_claim(self.bytes_to_position([3, 4, 5]))
-
-  def test_try_split_at_position_infinite_range(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker('', '')
-    # Should fail before first record is returned.
-    self.assertFalse(tracker.try_split(
-        self.bytes_to_position([3, 4, 5, 6])))
-
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([1, 2, 3])))
-
-    # Should now succeed.
-    self.assertIsNotNone(tracker.try_split(
-        self.bytes_to_position([3, 4, 5, 6])))
-    # Should not split at same or larger position.
-    self.assertIsNone(tracker.try_split(
-        self.bytes_to_position([3, 4, 5, 6])))
-    self.assertIsNone(tracker.try_split(
-        self.bytes_to_position([3, 4, 5, 6, 7])))
-    self.assertIsNone(tracker.try_split(
-        self.bytes_to_position([4, 5, 6, 7])))
-
-    # Should split at smaller position.
-    self.assertIsNotNone(tracker.try_split(
-        self.bytes_to_position([3, 2, 1])))
-
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([2, 3, 4])))
-
-    # Should not split at a position we're already past.
-    self.assertIsNone(tracker.try_split(
-        self.bytes_to_position([2, 3, 4])))
-    self.assertIsNone(tracker.try_split(
-        self.bytes_to_position([2, 3, 3])))
-
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([3, 2, 0])))
-    self.assertFalse(tracker.try_claim(
-        self.bytes_to_position([3, 2, 1])))
-
-  def test_try_test_split_at_position_finite_range(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([0, 0, 0]),
-        self.bytes_to_position([10, 20, 30]))
-    # Should fail before first record is returned.
-    self.assertFalse(tracker.try_split(
-        self.bytes_to_position([0, 0, 0])))
-    self.assertFalse(tracker.try_split(
-        self.bytes_to_position([3, 4, 5, 6])))
-
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([1, 2, 3])))
-
-    # Should now succeed.
-    self.assertTrue(tracker.try_split(
-        self.bytes_to_position([3, 4, 5, 6])))
-    # Should not split at same or larger position.
-    self.assertFalse(tracker.try_split(
-        self.bytes_to_position([3, 4, 5, 6])))
-    self.assertFalse(tracker.try_split(
-        self.bytes_to_position([3, 4, 5, 6, 7])))
-    self.assertFalse(tracker.try_split(
-        self.bytes_to_position([4, 5, 6, 7])))
-
-    # Should split at smaller position.
-    self.assertTrue(tracker.try_split(
-        self.bytes_to_position([3, 2, 1])))
-    # But not at a position at or before last returned record.
-    self.assertFalse(tracker.try_split(
-        self.bytes_to_position([1, 2, 3])))
-
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([2, 3, 4])))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([3, 2, 0])))
-    self.assertFalse(tracker.try_claim(
-        self.bytes_to_position([3, 2, 1])))
-
-  def test_split_points(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([1, 0, 0]),
-        self.bytes_to_position([5, 0, 0]))
-    self.assertEqual(tracker.split_points(),
-                     (0, iobase.RangeTracker.SPLIT_POINTS_UNKNOWN))
-    self.assertTrue(tracker.try_claim(self.bytes_to_position([1, 2, 3])))
-    self.assertEqual(tracker.split_points(),
-                     (0, iobase.RangeTracker.SPLIT_POINTS_UNKNOWN))
-    self.assertTrue(tracker.try_claim(self.bytes_to_position([1, 2, 5])))
-    self.assertEqual(tracker.split_points(),
-                     (1, iobase.RangeTracker.SPLIT_POINTS_UNKNOWN))
-    self.assertTrue(tracker.try_claim(self.bytes_to_position([3, 6, 8])))
-    self.assertEqual(tracker.split_points(),
-                     (2, iobase.RangeTracker.SPLIT_POINTS_UNKNOWN))
-    self.assertTrue(tracker.try_claim(self.bytes_to_position([4, 255, 255])))
-    self.assertEqual(tracker.split_points(),
-                     (3, iobase.RangeTracker.SPLIT_POINTS_UNKNOWN))
-    self.assertFalse(tracker.try_claim(self.bytes_to_position([5, 1, 0])))
-    self.assertEqual(tracker.split_points(),
-                     (3, iobase.RangeTracker.SPLIT_POINTS_UNKNOWN))
-
-
 class OrderedPositionRangeTrackerTest(unittest.TestCase):
 
   class DoubleRangeTracker(range_trackers.OrderedPositionRangeTracker):

Reply via email to