[ https://issues.apache.org/jira/browse/BEAM-2687?focusedWorklogId=145598&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145598 ]
ASF GitHub Bot logged work on BEAM-2687:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Sep/18 08:53
Start Date: 19/Sep/18 08:53
Worklog Time Spent: 10m
Work Description: charlesccychen commented on a change in pull request
#6349: [BEAM-2687] Implement State over the Fn API
URL: https://github.com/apache/beam/pull/6349#discussion_r218718655
##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -198,6 +203,85 @@ def is_globally_windowed(self):
             == sideinputs._global_window_mapping_fn)
+class CombiningValueRuntimeState(userstate.RuntimeState):
+  def __init__(self, underlying_bag_state, combinefn):
+    self._combinefn = combinefn
+    self._underlying_bag_state = underlying_bag_state
+
+  def _read_accumulator(self, rewrite=True):
+    merged_accumulator = self._combinefn.merge_accumulators(
+        self._underlying_bag_state.read())
+    if rewrite:
+      self._underlying_bag_state.clear()
+      self._underlying_bag_state.add(merged_accumulator)
+    return merged_accumulator
+
+  def read(self):
+    return self._combinefn.extract_output(self._read_accumulator())
+
+  def add(self, value):
+    # Prefer blind writes, but don't let them grow unboundedly.
+    if random.random() < 0.5:
+      accumulator = self._read_accumulator(False)
+      self._underlying_bag_state.clear()
+    else:
+      accumulator = self._combinefn.create_accumulator()
+    self._underlying_bag_state.add(
+        self._combinefn.add_input(accumulator, value))
+
+  def clear(self):
+    self._underlying_bag_state.clear()
+
+
+# TODO(BEAM-5428): Implement cross-bundle state caching.
+class SynchronousBagRuntimeState(userstate.RuntimeState):
+  def __init__(self, state_handler, state_key, value_coder):
+    self._state_handler = state_handler
+    self._state_key = state_key
+    self._value_coder = value_coder
+
+  def read(self):
+    return IterableState(
+        self._state_handler, self._state_key, self._value_coder)
+
+  def add(self, value):
+    self._state_handler.blocking_append(
+        self._state_key, self._value_coder.encode(value))
+
+  def clear(self):
+    self._state_handler.blocking_clear(self._state_key)
+
+
+class UserStateContext(userstate.UserStateContext):
Review comment:
Should we name this something else (maybe `FnApiUserStateContext`?) to
differentiate it from the base?
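
A minimal sketch of what the rename could look like. The base class here is a local stand-in rather than the real `userstate.UserStateContext`, and the constructor argument and `get_state` body are hypothetical placeholders, not the code from this PR:

```python
class UserStateContext(object):
  """Stand-in for userstate.UserStateContext (assumption, not the Beam class)."""

  def get_state(self, state_spec, key, window):
    raise NotImplementedError(type(self))


class FnApiUserStateContext(UserStateContext):
  """Hypothetical renamed subclass; the name no longer shadows the base."""

  def __init__(self, state_handler):
    # state_handler is an assumed placeholder for the Fn API state client.
    self._state_handler = state_handler

  def get_state(self, state_spec, key, window):
    # Placeholder body for illustration only; the real lookup is in the PR.
    return (state_spec, key, window)


ctx = FnApiUserStateContext(state_handler=None)
```

With distinct names, a reader of `bundle_processor.py` can tell the Fn API implementation apart from the base class without checking which module the name resolves to.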
Issue Time Tracking
-------------------
Worklog Id: (was: 145598)
Time Spent: 1h 10m (was: 1h)
> Python SDK support for Stateful Processing
> ------------------------------------------
>
> Key: BEAM-2687
> URL: https://issues.apache.org/jira/browse/BEAM-2687
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: Ahmet Altay
> Assignee: Charles Chen
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> The Python SDK should support stateful processing
> (https://beam.apache.org/blog/2017/02/13/stateful-processing.html).
> In the meantime, the runner capability matrix should be updated to show the lack
> of this feature
> (https://beam.apache.org/documentation/runners/capability-matrix/).
> Use this as an umbrella issue for all related issues.