robertwb commented on code in PR #32326:
URL: https://github.com/apache/beam/pull/32326#discussion_r1792518055


##########
sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:
##########
@@ -1061,24 +1074,74 @@ def get_raw(self,
           'Unknown state type: ' + state_key.WhichOneof('type'))
 
     with self._lock:
-      full_state = self._state[self._to_key(state_key)]
-      if self._use_continuation_tokens:
-        # The token is "nonce:index".
-        if not continuation_token:
-          token_base = b'token_%x' % len(self._continuations)
-          self._continuations[token_base] = tuple(full_state)
-          return b'', b'%s:0' % token_base
+      if state_key.WhichOneof('type') == 'ordered_list_user_state':
+        maybe_start = state_key.ordered_list_user_state.range.start
+        maybe_end = state_key.ordered_list_user_state.range.end
+        persistent_state_key = beam_fn_api_pb2.StateKey()
+        persistent_state_key.CopyFrom(state_key)
+        persistent_state_key.ordered_list_user_state.ClearField("range")
+
+        available_keys = self._ordered_list_keys[self._to_key(
+            persistent_state_key)]
+
+        if self._use_continuation_tokens and continuation_token:

Review Comment:
   As before, let's not re-implement separate continuation token logic here, 
since this is only for testing. Instead, assign `output` (as computed on line 
1118) to `full_state` and share the rest of the code.
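
A minimal sketch of that shape, not the actual `worker_handlers.py` code: only the computation of `full_state` differs per state type, and a single "nonce:index" token path serves both. `TinyStateStore`, its fields, the `kind`/`start`/`end` parameters, and `read_all` are hypothetical names for illustration, and the continuation branch is an assumption modeled on the token format in the removed lines.

```python
from typing import Dict, List, Optional, Tuple


class TinyStateStore:
  """Hypothetical stand-in for a test-only state handler (not Beam's class)."""
  def __init__(self) -> None:
    self._bags: Dict[str, List[bytes]] = {}
    # key -> {sort_key: payload}; stands in for ordered-list state.
    self._ordered: Dict[str, Dict[int, bytes]] = {}
    self._continuations: Dict[bytes, Tuple[bytes, ...]] = {}

  def get_raw(
      self,
      kind: str,
      key: str,
      start: int = 0,
      end: int = 0,
      continuation_token: Optional[bytes] = None
  ) -> Tuple[bytes, Optional[bytes]]:
    # Step 1: the only per-type logic is how full_state is produced.
    if kind == 'ordered_list':
      entries = self._ordered.get(key, {})
      full_state = [entries[k] for k in sorted(entries) if start <= k < end]
    else:
      full_state = self._bags.get(key, [])

    # Step 2: shared paging. The token is "nonce:index".
    if not continuation_token:
      token_base = b'token_%x' % len(self._continuations)
      self._continuations[token_base] = tuple(full_state)
      return b'', b'%s:0' % token_base
    token_base, index = continuation_token.split(b':')
    ix = int(index)
    chunks = self._continuations[token_base]
    if ix == len(chunks):
      return b'', None
    return chunks[ix], b'%s:%d' % (token_base, ix + 1)


def read_all(store: TinyStateStore, kind: str, key: str, **kw) -> List[bytes]:
  """Follows continuation tokens until the store reports no more data."""
  out: List[bytes] = []
  token: Optional[bytes] = None
  while True:
    data, token = store.get_raw(kind, key, continuation_token=token, **kw)
    if data:
      out.append(data)
    if token is None:
      return out
```

With this layout, a new state type only needs to fill in `full_state`; the snapshot stored under the nonce is what later pages are served from.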



##########
sdks/python/apache_beam/runners/worker/bundle_processor.py:
##########
@@ -704,6 +745,187 @@ def commit(self):
       to_await.get()
 
 
+class RangeSet:
+  """For Internal Use only. A simple range set for ranges of [x,y)."""

Review Comment:
   Thanks. +1 for consistency even if the name isn't the best.



##########
sdks/python/apache_beam/runners/worker/bundle_processor.py:
##########
@@ -704,6 +745,187 @@ def commit(self):
       to_await.get()
 
 
+class RangeSet:
+  """For Internal Use only. A simple range set for ranges of [x,y)."""
+  def __init__(self) -> None:
+    # The start points and end points are stored separately in order.
+    self._sorted_starts = SortedList()
+    self._sorted_ends = SortedList()
+
+  def add(self, start: int, end: int) -> None:
+    if start >= end:
+      return
+
+    # ranges[:min_idx] and ranges[max_idx:] is unaffected by this insertion
+    # the first range whose end point >= the start of the new range
+    min_idx = self._sorted_ends.bisect_left(start)
+    # the first range whose start point > the end point of the new range
+    max_idx = self._sorted_starts.bisect_right(end)
+
+    if min_idx >= len(self._sorted_starts) or max_idx <= 0:
+      # the new range is beyond any current ranges
+      new_start = start
+      new_end = end
+    else:
+      # the new range overlaps with ranges[min_idx:max_idx]
+      new_start = min(start, self._sorted_starts[min_idx])
+      new_end = max(end, self._sorted_ends[max_idx - 1])
+
+      del self._sorted_starts[min_idx:max_idx]
+      del self._sorted_ends[min_idx:max_idx]
+
+    self._sorted_starts.add(new_start)
+    self._sorted_ends.add(new_end)
+
+  def __contains__(self, key: int) -> bool:
+    idx = self._sorted_starts.bisect_left(key)
+    return (idx < len(self._sorted_starts) and self._sorted_starts[idx] == key
+            ) or (idx > 0 and self._sorted_ends[idx - 1] > key)
+
+  def __len__(self) -> int:
+    assert len(self._sorted_starts) == len(self._sorted_ends)
+    return len(self._sorted_starts)
+
+  def __iter__(self) -> Iterator[Tuple[int, int]]:
+    for i in zip(self._sorted_starts, self._sorted_ends):

Review Comment:
   BTW, you don't need to do 
   
   ```
   for i in foo:
     yield i
   ```
   
   when you can just do `return iter(foo)`. 
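
A small illustration of the two forms side by side. `Pairs` is a hypothetical container, and plain lists stand in for the `SortedList` fields so the snippet has no third-party dependency.

```python
from typing import Iterable, Iterator, List, Tuple


class Pairs:
  """Hypothetical container pairing up two parallel lists."""
  def __init__(self, starts: Iterable[int], ends: Iterable[int]) -> None:
    self._starts: List[int] = list(starts)
    self._ends: List[int] = list(ends)

  def iter_with_loop(self) -> Iterator[Tuple[int, int]]:
    # Generator form: re-yields every item of the underlying iterable.
    for pair in zip(self._starts, self._ends):
      yield pair

  def __iter__(self) -> Iterator[Tuple[int, int]]:
    # Direct form: hand back an iterator over the same items.
    return iter(zip(self._starts, self._ends))


pairs = Pairs([1, 5], [3, 8])
same_items = list(pairs) == list(pairs.iter_with_loop())
```

Both produce the same items. One behavioral difference to keep in mind: the generator form does not run any of its body until the first `next()`, whereas the direct form builds the `zip` object as soon as `__iter__` is called.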



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
