This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 476e75e  [BEAM-6934] Fixing timer firing timing issue (#8252)
476e75e is described below

commit 476e75e43c918fbe33af1bcc7097d5c7441f8775
Author: Pablo <[email protected]>
AuthorDate: Tue Apr 9 15:45:55 2019 -0700

    [BEAM-6934] Fixing timer firing timing issue (#8252)
    
    * Fixing timer firing timing issue
---
 .../runners/direct/watermark_manager.py            |  2 +
 sdks/python/apache_beam/transforms/userstate.py    |  7 +++-
 .../apache_beam/transforms/userstate_test.py       | 43 ++++++++++++++++++++++
 3 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 8b50919..c0688a4 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -160,6 +160,8 @@ class WatermarkManager(object):
     for applied_ptransform, tw in self._transform_to_watermarks.items():
       fired_timers, had_realtime_timer = tw.extract_transform_timers()
       if fired_timers:
+        # We should sort the timer firings, so they are fired in order.
+        fired_timers.sort(key=lambda ft: ft.timestamp)
         all_timers.append((applied_ptransform, fired_timers))
       if (had_realtime_timer
           and tw.output_watermark < WatermarkManager.WATERMARK_POS_INF):
diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py
index 0877068..891417b 100644
--- a/sdks/python/apache_beam/transforms/userstate.py
+++ b/sdks/python/apache_beam/transforms/userstate.py
@@ -131,7 +131,12 @@ def on_timer(timer_spec):
 
 
 def get_dofn_specs(dofn):
-  """Gets the state and timer specs for a DoFn, if any."""
+  """Gets the state and timer specs for a DoFn, if any.
+
+  Args:
+    dofn (apache_beam.transforms.core.DoFn): The DoFn instance to introspect for
+      timer and state specs.
+  """
 
   # Avoid circular import.
   from apache_beam.runners.common import MethodWrapper
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py
index cb91cc0..6935a3a 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -25,6 +25,7 @@ import mock
 import apache_beam as beam
 from apache_beam.coders import BytesCoder
 from apache_beam.coders import IterableCoder
+from apache_beam.coders import StrUtf8Coder
 from apache_beam.coders import VarIntCoder
 from apache_beam.runners.common import DoFnSignature
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -366,6 +367,48 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase):
         [b'A1A2A3', b'A1A2A3A4'],
         StatefulDoFnOnDirectRunnerTest.all_records)
 
+  def test_clearing_bag_state(self):
+    class BagStateClearingStatefulDoFn(beam.DoFn):
+
+      BAG_STATE = BagStateSpec('bag_state', StrUtf8Coder())
+      EMIT_TIMER = TimerSpec('emit_timer', TimeDomain.WATERMARK)
+      CLEAR_TIMER = TimerSpec('clear_timer', TimeDomain.WATERMARK)
+
+      def process(self,
+                  element,
+                  bag_state=beam.DoFn.StateParam(BAG_STATE),
+                  emit_timer=beam.DoFn.TimerParam(EMIT_TIMER),
+                  clear_timer=beam.DoFn.TimerParam(CLEAR_TIMER)):
+        value = element[1]
+        bag_state.add(value)
+        clear_timer.set(100)
+        emit_timer.set(1000)
+
+      @on_timer(EMIT_TIMER)
+      def emit_values(self, bag_state=beam.DoFn.StateParam(BAG_STATE)):
+        for value in bag_state.read():
+          yield value
+        yield 'extra'
+
+      @on_timer(CLEAR_TIMER)
+      def clear_values(self, bag_state=beam.DoFn.StateParam(BAG_STATE)):
+        bag_state.clear()
+
+    with TestPipeline() as p:
+      test_stream = (TestStream()
+                     .advance_watermark_to(0)
+                     .add_elements([('key', 'value')])
+                     .advance_watermark_to(100))
+
+      _ = (p
+           | test_stream
+           | beam.ParDo(BagStateClearingStatefulDoFn())
+           | beam.ParDo(self.record_dofn()))
+
+    self.assertEqual(
+        ['extra'],
+        StatefulDoFnOnDirectRunnerTest.all_records)
+
   def test_stateful_dofn_nonkeyed_input(self):
     p = TestPipeline()
     values = p | beam.Create([1, 2, 3])

Reply via email to