[
https://issues.apache.org/jira/browse/BEAM-6676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16768916#comment-16768916
]
Thomas Weise commented on BEAM-6676:
------------------------------------
{code:java}
from __future__ import absolute_import
import logging
import sys
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import userstate
class StatefulFn(beam.DoFn):
count_state_spec = userstate.CombiningValueStateSpec(
'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
def process(self, kv, count=beam.DoFn.StateParam(count_state_spec),
timer=beam.DoFn.TimerParam(timer_spec), window=beam.DoFn.WindowParam):
count.add(1)
timer.set(1000)
logging.info("###count %s, %d, window %s" % (kv, count.read(), window))
@userstate.on_timer(timer_spec)
def process_timer(self, count=beam.DoFn.StateParam(count_state_spec),
window=beam.DoFn.WindowParam):
logging.info("###count %d, window %s" % (count.read(), window))
def run(argv=None):
"""Build and run the pipeline."""
args = ["--runner=PortableRunner",
"--job_endpoint=localhost:8099",
"--streaming",
"--shutdown_sources_on_final_watermark"]
if argv:
args.extend(argv)
pipeline_options = PipelineOptions(args)
p = beam.Pipeline(options=pipeline_options)
_ = (p
| beam.Create([('k1', 1), ('k1', 2)])
| 'window' >> beam.WindowInto(window.FixedWindows(5))
| 'statefulCount' >> beam.ParDo(StatefulFn())
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run(sys.argv[1:])
{code}
> Python timers only working with GlobalWindow
> --------------------------------------------
>
> Key: BEAM-6676
> URL: https://issues.apache.org/jira/browse/BEAM-6676
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Affects Versions: 2.10.0
> Reporter: Thomas Weise
> Priority: Major
>
> Setting a timer with the Py SDK fails with fixed window (portable runner).
> Test case attached.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)