[ https://issues.apache.org/jira/browse/BEAM-2687?focusedWorklogId=170153&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-170153 ]

ASF GitHub Bot logged work on BEAM-2687:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Nov/18 08:02
            Start Date: 28/Nov/18 08:02
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #7102: [BEAM-2687] 
Correctly handle read-before-write semantics for user state.
URL: https://github.com/apache/beam/pull/7102
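
For context, the read-before-write semantics addressed by this change mean
that a DoFn which adds to a bag state and then reads that state within the
same bundle should observe its own pending writes. A minimal sketch of the
pattern the test below exercises (the spec and DoFn names here are
illustrative, not taken from the PR):

    import apache_beam as beam
    from apache_beam.transforms import userstate

    # Illustrative spec; the test in the diff below uses the name 'state'.
    BUFFER_SPEC = userstate.BagStateSpec('buffer', beam.coders.StrUtf8Coder())

    class ReadAfterWriteDoFn(beam.DoFn):
      """Adds an element to bag state, then immediately reads the bag back.

      With the local write buffering introduced by this PR, the read reflects
      the add made just above, even before anything has been flushed to the
      runner's state backend at the end of the bundle.
      """
      def process(self, kv, state=beam.DoFn.StateParam(BUFFER_SPEC)):
        _, element = kv
        state.add(element)              # write
        buffered = list(state.read())   # read sees the element just added
        if len(buffered) >= 3:
          state.clear()
          yield buffered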
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff once a pull request from a fork is merged, it is reproduced
below for the sake of provenance:

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 7a8fa07c9e46..9a59b8bbc2ba 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -306,6 +306,56 @@ def process_timer(self):
       expected = [('fired', ts) for ts in (20, 200)]
       assert_that(actual, equal_to(expected))
 
+  @unittest.skipIf(sys.version_info[0] == 3 and
+                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+                   'This test is flaky on Python 3. '
+                   'TODO: BEAM-5692')
+  def test_pardo_state_timers(self):
+    state_spec = userstate.BagStateSpec('state', beam.coders.StrUtf8Coder())
+    timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
+    elements = list('abcdefgh')
+    buffer_size = 3
+
+    class BufferDoFn(beam.DoFn):
+      def process(self,
+                  kv,
+                  ts=beam.DoFn.TimestampParam,
+                  timer=beam.DoFn.TimerParam(timer_spec),
+                  state=beam.DoFn.StateParam(state_spec)):
+        _, element = kv
+        state.add(element)
+        buffer = state.read()
+        # For real use, we'd keep track of this size separately.
+        if len(list(buffer)) >= 3:
+          state.clear()
+          yield buffer
+        else:
+          timer.set(ts + 1)
+
+      @userstate.on_timer(timer_spec)
+      def process_timer(self, state=beam.DoFn.StateParam(state_spec)):
+        buffer = state.read()
+        state.clear()
+        yield buffer
+
+    def is_buffered_correctly(actual):
+      # Pickling self in the closure for asserts gives errors (only on jenkins).
+      self = FnApiRunnerTest('__init__')
+      # Actual should be a grouping of the inputs into batches of size
+      # at most buffer_size, but the actual batching is nondeterministic
+      # based on ordering and trigger firing timing.
+      self.assertEqual(sorted(sum((list(b) for b in actual), [])), elements)
+      self.assertEqual(max(len(list(buffer)) for buffer in actual), buffer_size)
+
+    with self.create_pipeline() as p:
+      actual = (
+          p
+          | beam.Create(elements)
+          | beam.Map(lambda x: ('key', x))
+          | beam.ParDo(BufferDoFn()))
+
+      assert_that(actual, is_buffered_correctly)
+
   def test_group_by_key(self):
     with self.create_pipeline() as p:
       res = (p
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 6164f11ccb30..92a0c0ddc7f7 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -240,6 +240,25 @@ def add(self, value):
   def clear(self):
     self._underlying_bag_state.clear()
 
+  def _commit(self):
+    self._underlying_bag_state._commit()
+
+
+class _ConcatIterable(object):
+  """An iterable that is the concatination of two iterables.
+
+  Unlike itertools.chain, this allows reiteration.
+  """
+  def __init__(self, first, second):
+    self.first = first
+    self.second = second
+
+  def __iter__(self):
+    for elem in self.first:
+      yield elem
+    for elem in self.second:
+      yield elem
+
 
 # TODO(BEAM-5428): Implement cross-bundle state caching.
 class SynchronousBagRuntimeState(userstate.RuntimeState):
@@ -247,17 +266,31 @@ def __init__(self, state_handler, state_key, value_coder):
     self._state_handler = state_handler
     self._state_key = state_key
     self._value_coder = value_coder
+    self._cleared = False
+    self._added_elements = []
 
   def read(self):
-    return _StateBackedIterable(
-        self._state_handler, self._state_key, self._value_coder)
+    return _ConcatIterable(
+        [] if self._cleared else _StateBackedIterable(
+            self._state_handler, self._state_key, self._value_coder),
+        self._added_elements)
 
   def add(self, value):
-    self._state_handler.blocking_append(
-        self._state_key, self._value_coder.encode(value))
+    self._added_elements.append(value)
 
   def clear(self):
-    self._state_handler.blocking_clear(self._state_key)
+    self._cleared = True
+    self._added_elements = []
+
+  def _commit(self):
+    if self._cleared:
+      self._state_handler.blocking_clear(self._state_key)
+    if self._added_elements:
+      value_coder_impl = self._value_coder.get_impl()
+      out = coder_impl.create_OutputStream()
+      for element in self._added_elements:
+        value_coder_impl.encode_to_stream(element, out, True)
+      self._state_handler.blocking_append(self._state_key, out.get())
 
 
 class OutputTimer(object):
@@ -285,6 +318,7 @@ def __init__(
     self._window_coder = window_coder
     self._timer_specs = timer_specs
     self._timer_receivers = None
+    self._all_states = {}
 
   def update_timer_receivers(self, receivers):
     self._timer_receivers = {}
@@ -294,7 +328,13 @@ def update_timer_receivers(self, receivers):
   def get_timer(self, timer_spec, key, window):
     return OutputTimer(key, self._timer_receivers[timer_spec.name])
 
-  def get_state(self, state_spec, key, window):
+  def get_state(self, *args):
+    state_handle = self._all_states.get(args)
+    if state_handle is None:
+      state_handle = self._all_states[args] = self._create_state(*args)
+    return state_handle
+
+  def _create_state(self, state_spec, key, window):
     if isinstance(state_spec,
                   (userstate.BagStateSpec, userstate.CombiningValueStateSpec)):
       bag_state = SynchronousBagRuntimeState(
@@ -313,9 +353,13 @@ def get_state(self, state_spec, key, window):
     else:
       raise NotImplementedError(state_spec)
 
+  def commit(self):
+    for state in self._all_states.values():
+      state._commit()
+
   def reset(self):
     # TODO(BEAM-5428): Implement cross-bundle state caching.
-    pass
+    self._all_states = {}
 
 
 def memoize(func):
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 20dec1d69e93..0bbfd36375d8 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -506,6 +506,8 @@ def process_timer(self, tag, windowed_timer):
   def finish(self):
     with self.scoped_finish_state:
       self.dofn_runner.finish()
+      if self.user_state_context:
+        self.user_state_context.commit()
 
   def reset(self):
     super(DoOperation, self).reset()
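
The _ConcatIterable helper added in bundle_processor.py above exists because
the result of read() may be iterated more than once, whereas itertools.chain
returns a one-shot iterator. A small standalone sketch of the difference
(plain Python mirroring the helper; not part of the PR):

    import itertools

    class ConcatIterable(object):
      """Concatenation of two iterables that, unlike itertools.chain,
      can be re-iterated (as long as its inputs can be)."""
      def __init__(self, first, second):
        self.first = first
        self.second = second

      def __iter__(self):
        for elem in self.first:
          yield elem
        for elem in self.second:
          yield elem

    chained = itertools.chain([1, 2], [3])
    print(list(chained))  # [1, 2, 3]
    print(list(chained))  # [] -- the chain iterator is exhausted

    concat = ConcatIterable([1, 2], [3])
    print(list(concat))   # [1, 2, 3]
    print(list(concat))   # [1, 2, 3] -- re-iteration works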


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 170153)
    Time Spent: 8h 20m  (was: 8h 10m)

> Python SDK support for Stateful Processing
> ------------------------------------------
>
>                 Key: BEAM-2687
>                 URL: https://issues.apache.org/jira/browse/BEAM-2687
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Ahmet Altay
>            Assignee: Charles Chen
>            Priority: Major
>          Time Spent: 8h 20m
>  Remaining Estimate: 0h
>
> The Python SDK should support stateful processing
> (https://beam.apache.org/blog/2017/02/13/stateful-processing.html).
> In the meantime, the runner capability matrix should be updated to show the
> lack of this feature
> (https://beam.apache.org/documentation/runners/capability-matrix/).
> Use this as an umbrella issue for all related issues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
