[
https://issues.apache.org/jira/browse/BEAM-6177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17122995#comment-17122995
]
Beam JIRA Bot commented on BEAM-6177:
-------------------------------------
This issue is P2 but has been unassigned without any comment for 60 days so it
has been labeled "stale-P2". If this issue is still affecting you, we care!
Please comment and remove the label. Otherwise, in 14 days the issue will be
moved to P3.
Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed
explanation of what these priorities mean.
> AfterProcessingTime not firing
> ------------------------------
>
> Key: BEAM-6177
> URL: https://issues.apache.org/jira/browse/BEAM-6177
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.8.0
> Reporter: Arnaud T
> Priority: P2
> Labels: stale-P2
>
> Hi,
>
> The documentation says that an AfterProcessingTime(X) trigger should fire X
> seconds after the first element is processed, but this trigger appears
> never to fire when using a global window on a steady influx of elements.
> Here is my pipeline:
>
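> {code:python}
> import apache_beam as beam
> from apache_beam import window
> from apache_beam.transforms.trigger import (
>     AccumulationMode, AfterProcessingTime, Repeatedly)
>
> # converter, PubSubRow, ListCombineFn and ClickHouseSink are not shown here.
> (p
>  | 'pubsub' >> beam.io.ReadFromPubSub(
>      topic=self._pubsub_topic).with_output_types(bytes)
>  | 'window' >> beam.WindowInto(
>      window.GlobalWindows(),
>      # Intended: fire 5 s (processing time) after each pane's first element.
>      trigger=Repeatedly(AfterProcessingTime(5)),
>      accumulation_mode=AccumulationMode.DISCARDING)
>  | 'decode' >> beam.FlatMap(converter.from_pubsub).with_output_types(PubSubRow)
>  | 'row' >> beam.Map(lambda x: converter.to_clickhouse(x.type, x.data))
>  | 'combine' >> beam.CombineGlobally(ListCombineFn()).without_defaults()
>  | 'clickhouse' >> ClickHouseSink(self._clickhouse_host,
>                                   self._clickhouse_port,
>                                   self._clickhouse_database)
> )
> {code}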
>
>
> I expected that every 5 seconds (as long as elements keep pouring in) the
> trigger would fire and my data would be combined. The idea of this pipeline
> is simply to read messages from PubSub, transform them into ClickHouse ORM
> models and then batch-save them into ClickHouse, using as much parallelism
> as possible. We do not care about order: elements can be inserted in any
> order and are not correlated with one another.
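The batching expected above can be modeled outside Beam. What follows is a minimal pure-Python sketch, not Beam's implementation; the function name `expected_panes` and the integer-second arrival times are assumptions made for illustration. It arms a timer on the first element of each pane, leaves that timer alone for later elements, and emits and discards the pane when the timer expires, which is how the report reads the documented behavior of `Repeatedly(AfterProcessingTime(5))` with `AccumulationMode.DISCARDING`.

```python
# Hypothetical pure-Python model (not Beam code) of the expected behavior of
# Repeatedly(AfterProcessingTime(delay)) in DISCARDING mode.


def expected_panes(arrival_times, delay):
    """Group sorted arrival times (seconds) into (fire_time, elements) panes.

    The first element of a pane arms a timer at arrival + delay; later
    elements leave it untouched. When the timer expires the pane is
    emitted and discarded, and the next element starts a new pane.
    """
    panes = []
    current = []
    fire_at = None
    for t in arrival_times:
        # Emit the pending pane if its timer expired at or before this arrival.
        if fire_at is not None and fire_at <= t:
            panes.append((fire_at, current))
            current, fire_at = [], None
        if fire_at is None:
            # Arm the timer only once per pane.
            fire_at = t + delay
        current.append(t)
    if current:
        # The stream ended; the pending timer still fires at fire_at.
        panes.append((fire_at, current))
    return panes


if __name__ == "__main__":
    # One element per second for 12 seconds, 5-second delay.
    for fire_time, elements in expected_panes(list(range(12)), 5):
        print(fire_time, elements)
    # 5 [0, 1, 2, 3, 4]
    # 10 [5, 6, 7, 8, 9]
    # 15 [10, 11]
```

Under this model a steady stream produces one batch roughly every 5 seconds, regardless of how many elements keep arriving.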
> The likely cause is the {{on_element(self, element, window, context)}}
> method of {{class AfterProcessingTime(TriggerFn)}} in _trigger.py_:
>
> {code:python}
> context.set_timer(
> '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
> {code}
> This overrides the previously set timer every time a new element arrives,
> so under a constant influx of elements the trigger fires only after no new
> element has arrived for X seconds.
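The effect described above can be reproduced with a small pure-Python model. This is a hypothetical sketch, not Beam code: `reset_timer_fire_times` is an illustrative name, and time is modeled as integer seconds. Each element re-arms the single timer at `now + delay`, replacing any pending one, which is how the report describes the quoted `set_timer` call with its fixed empty timer name.

```python
# Hypothetical pure-Python model (not Beam code): every element re-arms one
# shared timer, replacing any timer that is still pending.


def reset_timer_fire_times(arrival_times, delay):
    """Return the times the timer fires for sorted arrival times (seconds)."""
    fires = []
    fire_at = None
    for t in arrival_times:
        if fire_at is not None and fire_at <= t:
            # The pending timer expired before this element arrived.
            fires.append(fire_at)
        # Re-arm: this overrides whatever timer was pending.
        fire_at = t + delay
    if fire_at is not None:
        # After the last element, the final timer is left to expire.
        fires.append(fire_at)
    return fires


if __name__ == "__main__":
    # One element per second for 12 seconds: the timer is always pushed past
    # the next arrival, so it fires once, 5 s after the last element.
    print(reset_timer_fire_times(list(range(12)), 5))  # [16]
    # A 7-second gap lets the pending timer expire in between.
    print(reset_timer_fire_times([0, 1, 2, 10, 11], 5))  # [7, 16]
```

As long as elements arrive more often than every `delay` seconds, the model never fires during the stream, matching the behavior observed with the global window.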
>
>
> Please let me know whether I understood the documentation correctly, and
> whether I can help further.
>
> Thank you,
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)