robertwb commented on code in PR #32326:
URL: https://github.com/apache/beam/pull/32326#discussion_r1792502125


##########
sdks/python/apache_beam/runners/worker/bundle_processor.py:
##########
@@ -704,6 +745,187 @@ def commit(self):
       to_await.get()
 
 
+class RangeSet:
+  """For Internal Use only. A simple range set for ranges of [x,y)."""
+  def __init__(self) -> None:
+    # The start points and end points are stored separately in order.
+    self._sorted_starts = SortedList()
+    self._sorted_ends = SortedList()
+
+  def add(self, start: int, end: int) -> None:
+    if start >= end:
+      return
+
+    # ranges[:min_idx] and ranges[max_idx:] is unaffected by this insertion
+    # the first range whose end point >= the start of the new range
+    min_idx = self._sorted_ends.bisect_left(start)
+    # the first range whose start point > the end point of the new range
+    max_idx = self._sorted_starts.bisect_right(end)
+
+    if min_idx >= len(self._sorted_starts) or max_idx <= 0:
+      # the new range is beyond any current ranges
+      new_start = start
+      new_end = end
+    else:
+      # the new range overlaps with ranges[min_idx:max_idx]
+      new_start = min(start, self._sorted_starts[min_idx])
+      new_end = max(end, self._sorted_ends[max_idx - 1])
+
+      del self._sorted_starts[min_idx:max_idx]
+      del self._sorted_ends[min_idx:max_idx]
+
+    self._sorted_starts.add(new_start)
+    self._sorted_ends.add(new_end)
+
+  def __contains__(self, key: int) -> bool:
+    idx = self._sorted_starts.bisect_left(key)
+    return (idx < len(self._sorted_starts) and self._sorted_starts[idx] == key
+            ) or (idx > 0 and self._sorted_ends[idx - 1] > key)
+
+  def __len__(self) -> int:
+    assert len(self._sorted_starts) == len(self._sorted_ends)
+    return len(self._sorted_starts)
+
+  def __iter__(self) -> Iterator[Tuple[int, int]]:
+    for i in zip(self._sorted_starts, self._sorted_ends):
+      yield i
+
+  def __str__(self) -> str:
+    return str(list(zip(self._sorted_starts, self._sorted_ends)))
+
+
+class SynchronousOrderedListRuntimeState(userstate.OrderedListRuntimeState):
+  RANGE_MIN = -(1 << 63)
+  RANGE_MAX = (1 << 63) - 1
+
+  def __init__(
+      self,
+      state_handler: sdk_worker.CachingStateHandler,
+      state_key: beam_fn_api_pb2.StateKey,
+      value_coder: coders.Coder) -> None:
+    self._state_handler = state_handler
+    self._state_key = state_key
+    self._elem_coder = beam.coders.TupleCoder(
+        [coders.VarIntCoder(), coders.coders.LengthPrefixCoder(value_coder)])

Review Comment:
   I don't think a user should generally be referencing this one directly; it's
more of an internal coder.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to