[ 
https://issues.apache.org/jira/browse/BEAM-5264?focusedWorklogId=142219&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142219
 ]

ASF GitHub Bot logged work on BEAM-5264:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Sep/18 15:17
            Start Date: 07/Sep/18 15:17
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on a change in pull request #6304:
      [BEAM-5264] Reference DirectRunner implementation of Python User State
      and Timers API
URL: https://github.com/apache/beam/pull/6304#discussion_r215943639
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/userstate.py
 ##########
 @@ -161,3 +176,127 @@ def validate_stateful_dofn(dofn):
             ('The on_timer callback for %s is not the specified .%s method '
              'for DoFn %r (perhaps it was overwritten?).') % (
                  timer_spec, method_name, dofn))
+
+
+class RuntimeTimer(object):
+  """Timer interface object passed to user code."""
+
+  def __init__(self, timer_spec):
+    self._cleared = False
+    self._new_timestamp = None
+
+  def clear(self):
+    self._cleared = True
+    self._new_timestamp = None
+
+  def set(self, timestamp):
+    self._new_timestamp = timestamp
+
+
+class RuntimeState(object):
+  """State interface object passed to user code."""
+
+  def __init__(self, state_spec, state_tag, current_value_accessor):
+    self._state_spec = state_spec
+    self._state_tag = state_tag
+    self._current_value_accessor = current_value_accessor
+
+  @staticmethod
+  def for_spec(state_spec, state_tag, current_value_accessor):
+    if isinstance(state_spec, BagStateSpec):
+      return BagRuntimeState(state_spec, state_tag, current_value_accessor)
+    elif isinstance(state_spec, CombiningValueStateSpec):
+      return CombiningValueRuntimeState(state_spec, state_tag,
+                                        current_value_accessor)
+    else:
+      raise ValueError('Invalid state spec: %s' % state_spec)
+
+  def _encode(self, value):
+    return self._state_spec.coder.encode(value)
+
+  def _decode(self, value):
+    return self._state_spec.coder.decode(value)
+
+  def prefetch(self):
+    # The default implementation here does nothing.
+    pass
+
+
+# Sentinel designating an unread value.
+UNREAD_VALUE = object()
+
+
+class BagRuntimeState(RuntimeState):
+  """Bag state interface object passed to user code."""
+
+  def __init__(self, state_spec, state_tag, current_value_accessor):
+    super(BagRuntimeState, self).__init__(
+        state_spec, state_tag, current_value_accessor)
+    self._cached_value = UNREAD_VALUE
+    self._cleared = False
+    self._new_values = []
+
+  def read(self):
+    if self._cached_value is UNREAD_VALUE:
+      self._cached_value = self._current_value_accessor()
+    if not self._cleared:
+      encoded_values = itertools.chain(self._cached_value, self._new_values)
+    else:
+      encoded_values = self._new_values
+    return (self._decode(v) for v in encoded_values)
+
+  def add(self, value):
+    self._new_values.append(self._encode(value))
+
+  def clear(self):
+    self._cleared = True
+    self._cached_value = []
+    self._new_values = []
+
+
+class CombiningValueRuntimeState(RuntimeState):
+  """Combining value state interface object passed to user code."""
+
+  def __init__(self, state_spec, state_tag, current_value_accessor):
+    super(CombiningValueRuntimeState, self).__init__(
+        state_spec, state_tag, current_value_accessor)
+    self._current_accumulator = UNREAD_VALUE
+    self._modified = False
+    self._combine_fn = state_spec.combine_fn
+
+  def _read_initial_value(self):
+    if self._current_accumulator is UNREAD_VALUE:
+      existing_accumulators = list(
+          self._decode(a) for a in self._current_value_accessor())
+      if existing_accumulators:
+        self._current_accumulator = self._combine_fn.merge_accumulators(
+            existing_accumulators)
+      else:
+        self._current_accumulator = self._combine_fn.create_accumulator()
+
+  def read(self):
+    self._read_initial_value()
+    return self._combine_fn.extract_output(self._current_accumulator)
+
+  def add(self, value):
+    self._read_initial_value()
+    self._modified = True
+    self._current_accumulator = self._combine_fn.add_input(
+        self._current_accumulator, value)
+
+  def clear(self):
+    self._modified = True
+    self._current_accumulator = self._combine_fn.create_accumulator()
+
+
+class UserStateContext(object):
 
 Review comment:
   Perhaps this would be better named UserStateAndTimerFactory, analogous to
   how CounterFactory provides user-level objects that the user then
   queries/updates?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 142219)

> Reference DirectRunner implementation of Python user state and timers API
> -------------------------------------------------------------------------
>
>                 Key: BEAM-5264
>                 URL: https://issues.apache.org/jira/browse/BEAM-5264
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>    Affects Versions: 2.6.0
>            Reporter: Charles Chen
>            Assignee: Charles Chen
>            Priority: Major
>          Time Spent: 5h
>  Remaining Estimate: 0h
>
> This issue tracks the reference DirectRunner implementation of the Beam 
> Python User State and Timer API, described here: 
> [https://s.apache.org/beam-python-user-state-and-timers].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to