This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 476e75e [BEAM-6934] Fixing timer firing timing issue (#8252)
476e75e is described below
commit 476e75e43c918fbe33af1bcc7097d5c7441f8775
Author: Pablo <[email protected]>
AuthorDate: Tue Apr 9 15:45:55 2019 -0700
[BEAM-6934] Fixing timer firing timing issue (#8252)
* Fixing timer firing timing issue
---
.../runners/direct/watermark_manager.py | 2 +
sdks/python/apache_beam/transforms/userstate.py | 7 +++-
.../apache_beam/transforms/userstate_test.py | 43 ++++++++++++++++++++++
3 files changed, 51 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 8b50919..c0688a4 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -160,6 +160,8 @@ class WatermarkManager(object):
for applied_ptransform, tw in self._transform_to_watermarks.items():
fired_timers, had_realtime_timer = tw.extract_transform_timers()
if fired_timers:
+ # We should sort the timer firings, so they are fired in order.
+ fired_timers.sort(key=lambda ft: ft.timestamp)
all_timers.append((applied_ptransform, fired_timers))
if (had_realtime_timer
and tw.output_watermark < WatermarkManager.WATERMARK_POS_INF):
diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py
index 0877068..891417b 100644
--- a/sdks/python/apache_beam/transforms/userstate.py
+++ b/sdks/python/apache_beam/transforms/userstate.py
@@ -131,7 +131,12 @@ def on_timer(timer_spec):
def get_dofn_specs(dofn):
- """Gets the state and timer specs for a DoFn, if any."""
+ """Gets the state and timer specs for a DoFn, if any.
+
+ Args:
+ dofn (apache_beam.transforms.core.DoFn): The DoFn instance to introspect for
+ timer and state specs.
+ """
# Avoid circular import.
from apache_beam.runners.common import MethodWrapper
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py
index cb91cc0..6935a3a 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -25,6 +25,7 @@ import mock
import apache_beam as beam
from apache_beam.coders import BytesCoder
from apache_beam.coders import IterableCoder
+from apache_beam.coders import StrUtf8Coder
from apache_beam.coders import VarIntCoder
from apache_beam.runners.common import DoFnSignature
from apache_beam.testing.test_pipeline import TestPipeline
@@ -366,6 +367,48 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase):
[b'A1A2A3', b'A1A2A3A4'],
StatefulDoFnOnDirectRunnerTest.all_records)
+ def test_clearing_bag_state(self):
+ class BagStateClearingStatefulDoFn(beam.DoFn):
+
+ BAG_STATE = BagStateSpec('bag_state', StrUtf8Coder())
+ EMIT_TIMER = TimerSpec('emit_timer', TimeDomain.WATERMARK)
+ CLEAR_TIMER = TimerSpec('clear_timer', TimeDomain.WATERMARK)
+
+ def process(self,
+ element,
+ bag_state=beam.DoFn.StateParam(BAG_STATE),
+ emit_timer=beam.DoFn.TimerParam(EMIT_TIMER),
+ clear_timer=beam.DoFn.TimerParam(CLEAR_TIMER)):
+ value = element[1]
+ bag_state.add(value)
+ clear_timer.set(100)
+ emit_timer.set(1000)
+
+ @on_timer(EMIT_TIMER)
+ def emit_values(self, bag_state=beam.DoFn.StateParam(BAG_STATE)):
+ for value in bag_state.read():
+ yield value
+ yield 'extra'
+
+ @on_timer(CLEAR_TIMER)
+ def clear_values(self, bag_state=beam.DoFn.StateParam(BAG_STATE)):
+ bag_state.clear()
+
+ with TestPipeline() as p:
+ test_stream = (TestStream()
+ .advance_watermark_to(0)
+ .add_elements([('key', 'value')])
+ .advance_watermark_to(100))
+
+ _ = (p
+ | test_stream
+ | beam.ParDo(BagStateClearingStatefulDoFn())
+ | beam.ParDo(self.record_dofn()))
+
+ self.assertEqual(
+ ['extra'],
+ StatefulDoFnOnDirectRunnerTest.all_records)
+
def test_stateful_dofn_nonkeyed_input(self):
p = TestPipeline()
values = p | beam.Create([1, 2, 3])