[ https://issues.apache.org/jira/browse/BEAM-2687?focusedWorklogId=169639&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-169639 ]
ASF GitHub Bot logged work on BEAM-2687:
----------------------------------------
Author: ASF GitHub Bot
Created on: 27/Nov/18 06:42
Start Date: 27/Nov/18 06:42
Worklog Time Spent: 10m
Work Description: ryan-williams commented on a change in pull request
#7102: [BEAM-2687] Correctly handle read-before-write semantics for user state.
URL: https://github.com/apache/beam/pull/7102#discussion_r236535048
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
##########
@@ -306,6 +306,55 @@ def process_timer(self):
expected = [('fired', ts) for ts in (20, 200)]
assert_that(actual, equal_to(expected))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test is flaky on Python 3. '
+ 'TODO: BEAM-5692')
+ def test_pardo_state_timers(self):
+ state_spec = userstate.BagStateSpec('state', beam.coders.StrUtf8Coder())
+ timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
+ elements = list('abcdefgh')
+ buffer_size = 3
+
+ class BufferDoFn(beam.DoFn):
+ def process(self,
+ kv,
+ ts=beam.DoFn.TimestampParam,
+ timer=beam.DoFn.TimerParam(timer_spec),
+ state=beam.DoFn.StateParam(state_spec)):
+ _, element = kv
+ state.add(element)
+ buffer = state.read()
+ # For real use, we'd keep track of this size separately.
+ if len(list(buffer)) >= 3:
+ state.clear()
+ yield buffer
+ else:
+ timer.set(ts + 1)
+
+ @userstate.on_timer(timer_spec)
+ def process_timer(self, state=beam.DoFn.StateParam(state_spec)):
+ buffer = state.read()
+ state.clear()
+ yield buffer
+
+ def is_buffered_correctly(actual):
+ # Issues pickling closure of self on jenkins.
+ self = FnApiRunnerTest('__init__')
+ # assert sorted(sum((list(b) for b in actual), [])) == elements
+ # assert max(len(list(buffer)) for buffer in actual) == buffer_size
Review comment:
```suggestion
```
Issue Time Tracking
-------------------
Worklog Id: (was: 169639)
Time Spent: 7h 40m (was: 7.5h)
> Python SDK support for Stateful Processing
> ------------------------------------------
>
> Key: BEAM-2687
> URL: https://issues.apache.org/jira/browse/BEAM-2687
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: Ahmet Altay
> Assignee: Charles Chen
> Priority: Major
> Time Spent: 7h 40m
> Remaining Estimate: 0h
>
> The Python SDK should support stateful processing
> (https://beam.apache.org/blog/2017/02/13/stateful-processing.html).
> In the meantime, the runner capability matrix should be updated to show the lack
> of this feature
> (https://beam.apache.org/documentation/runners/capability-matrix/).
> Use this as an umbrella issue for all related issues.