[ 
https://issues.apache.org/jira/browse/BEAM-6934?focusedWorklogId=224734&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-224734
 ]

ASF GitHub Bot logged work on BEAM-6934:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Apr/19 01:14
            Start Date: 09/Apr/19 01:14
    Worklog Time Spent: 10m 
      Work Description: pabloem commented on pull request #8252: [BEAM-6934] Fixing timer firing timing issue
URL: https://github.com/apache/beam/pull/8252#discussion_r273293684
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/userstate_test.py
 ##########
 @@ -366,6 +367,47 @@ def expiry_callback(self, buffer=DoFn.StateParam(BUFFER_STATE),
         [b'A1A2A3', b'A1A2A3A4'],
         StatefulDoFnOnDirectRunnerTest.all_records)
 
+  def test_clearing_bag_state(self):
+    class BagStateClearingStatefulDoFn(beam.DoFn):
+
+      BAG_STATE = BagStateSpec('bag_state', StrUtf8Coder())
+      EMIT_TIMER = TimerSpec('emit_timer', TimeDomain.WATERMARK)
+      CLEAR_TIMER = TimerSpec('clear_timer', TimeDomain.WATERMARK)
+
+      def process(self,
+                  element,
+                  bag_state=beam.DoFn.StateParam(BAG_STATE),
+                  emit_timer=beam.DoFn.TimerParam(EMIT_TIMER),
+                  clear_timer=beam.DoFn.TimerParam(CLEAR_TIMER)):
+        value = element[1]
+        bag_state.add(value)
+        clear_timer.set(100)
+        emit_timer.set(1000)
+
+      @on_timer(EMIT_TIMER)
+      def emit_values(self, bag_state=beam.DoFn.StateParam(BAG_STATE)):
+        for value in bag_state.read():
+          yield value
+
+      @on_timer(CLEAR_TIMER)
+      def clear_values(self, bag_state=beam.DoFn.StateParam(BAG_STATE)):
+        bag_state.clear()
+
+    with TestPipeline() as p:
+      test_stream = (TestStream()
+                     .advance_watermark_to(0)
+                     .add_elements([('key', 'value')])
+                     .advance_watermark_to(100))
+
+      _ = (p
+           | test_stream
+           | beam.ParDo(BagStateClearingStatefulDoFn())
+           | beam.ParDo(self.record_dofn()))
+
+    self.assertEqual(
 
 Review comment:
   I've added a check for that. Thanks!
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 224734)
    Time Spent: 40m  (was: 0.5h)

> Unexpected TestStream behavior when testing stateful DoFn (DirectRunner)
> ------------------------------------------------------------------------
>
>                 Key: BEAM-6934
>                 URL: https://issues.apache.org/jira/browse/BEAM-6934
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Ahmet Altay
>            Assignee: Pablo Estrada
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> User reported issue:
> """
> I'm running into some unexpected behavior when trying to unit test a stateful 
> DoFn that uses watermark timers as well as bag state.  I'm following the 
> example here: 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L333
> Expected behavior:
> When using TestStream, if a stateful DoFn adds value 'foo' to BagState then 
> sets two watermark timers t1 and t2, where t1 clears the bag state and t2 
> reads from bag state and emits the contents, if t1.time < t2.time then 
> nothing should get emitted when the TestPipeline is run. (bag state should be 
> cleared by timer at t1 before it is read from by timer at t2)
> Actual behavior:
> For the scenario described above, results get emitted despite t1.time being 
> less than t2.time.
> I've created a gist with a demonstration of the problem:
> https://gist.github.com/jcruelty/3bf5ce5865110372a2d1650b1421cde1
> """
> (cc: [~ccy])
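
The ordering the reporter expects can be illustrated without Beam at all. The following is a minimal sketch, not the DirectRunner's implementation: `TinyStatefulSim` and all of its methods are hypothetical names invented for this illustration, and it assumes a timer becomes eligible once the watermark reaches its timestamp and that eligible timers fire earliest-first. Under those assumptions, a clear timer at t1=100 runs before an emit timer at t2=1000, so nothing is emitted.

```python
import heapq
import itertools


class TinyStatefulSim:
    """Toy stand-in for a runner (hypothetical): one key, one bag, watermark timers."""

    def __init__(self):
        self.bag = []
        self.emitted = []
        self._timers = []              # heap of (timestamp, seq, callback)
        self._seq = itertools.count()  # tie-breaker so callbacks are never compared

    def set_timer(self, timestamp, callback):
        heapq.heappush(self._timers, (timestamp, next(self._seq), callback))

    def advance_watermark_to(self, watermark):
        # Assumption: fire every eligible timer in timestamp order, earliest first.
        while self._timers and self._timers[0][0] <= watermark:
            _, _, callback = heapq.heappop(self._timers)
            callback()

    def process(self, value):
        self.bag.append(value)
        self.set_timer(100, self.clear_values)   # t1: clears the bag
        self.set_timer(1000, self.emit_values)   # t2: emits the bag contents

    def clear_values(self):
        self.bag.clear()

    def emit_values(self):
        self.emitted.extend(self.bag)


sim = TinyStatefulSim()
sim.process('foo')
sim.advance_watermark_to(float('inf'))  # end of input: all timers become eligible
print(sim.emitted)  # [] because the bag was cleared at t1 before t2 read it
```

The bug described above corresponds to the two callbacks running in the opposite order, in which case `sim.emitted` would contain 'foo'.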



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
