Arnaud T created BEAM-6177:
------------------------------

             Summary: AfterProcessingTime not firing
                 Key: BEAM-6177
                 URL: https://issues.apache.org/jira/browse/BEAM-6177
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
    Affects Versions: 2.8.0
            Reporter: Arnaud T
            Assignee: Ahmet Altay


Hi,

The documentation says that an AfterProcessingTime(X) trigger should fire X 
seconds after the first element is processed, but it appears that this trigger 
never fires when using a global window on a steady influx of elements.

Here is my pipeline:

 
{code:python}
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterProcessingTime, Repeatedly)

# converter, PubSubRow, ListCombineFn and ClickHouseSink are defined elsewhere
# in my project and are not shown here.
(p
 | 'pubsub' >> beam.io.ReadFromPubSub(
     topic=self._pubsub_topic).with_output_types(bytes)
 | 'window' >> beam.WindowInto(
     window.GlobalWindows(),
     trigger=Repeatedly(AfterProcessingTime(5)),
     accumulation_mode=AccumulationMode.DISCARDING)
 | 'decode' >> beam.FlatMap(converter.from_pubsub).with_output_types(PubSubRow)
 | 'row' >> beam.Map(lambda x: converter.to_clickhouse(x.type, x.data))
 | 'combine' >> beam.CombineGlobally(ListCombineFn()).without_defaults()
 | 'clickhouse' >> ClickHouseSink(
     self._clickhouse_host, self._clickhouse_port, self._clickhouse_database))
{code}
 

 

I expect the trigger to fire every 5 seconds for as long as elements keep 
arriving, so that my data is combined in 5-second batches. The pipeline simply 
reads messages from PubSub, transforms them into ClickHouse ORM models, and 
batch-saves them into ClickHouse with as much parallelism as possible. Ordering 
does not matter: elements can be inserted in any order and are not correlated 
with one another.

The potential issue is in _class 
AfterProcessingTime(TriggerFn)::on_element(self, element, window, context) in 
trigger.py_:

 
{code:python}
context.set_timer(
    '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
{code}
This overwrites the previously set timer every time a new element arrives, so 
with a constant influx of elements the trigger fires only once no element has 
arrived for X seconds.
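
To illustrate the difference, here is a minimal plain-Python simulation (it does not use Beam, and it is only a sketch of my understanding, not the SDK's actual timer machinery). The function name `fire_times` and its parameters are made up for this example. With `reset_on_each_element=True` every element overwrites the pending timer, as described above; with `False` the timer is armed only when none is pending, which is the behaviour I expected from the documentation.

```python
from typing import List


def fire_times(arrivals: List[float], delay: float,
               reset_on_each_element: bool) -> List[float]:
    """Simulate a repeated processing-time trigger over sorted arrival times.

    Hypothetical helper for illustration only; not part of Apache Beam.
    """
    fired = []
    timer = None  # deadline of the pending timer, or None if no timer is set
    for t in arrivals:
        # A pending timer whose deadline has been reached fires before this
        # element is handled, so the element starts the next pane.
        if timer is not None and timer <= t:
            fired.append(timer)
            timer = None
        # Either re-arm on every element, or only when no timer is pending.
        if timer is None or reset_on_each_element:
            timer = t + delay
    if timer is not None:
        fired.append(timer)  # the last timer fires once the input stops
    return fired


# One element per second for 20 seconds, with a 5-second delay.
arrivals = list(range(21))
print(fire_times(arrivals, 5, reset_on_each_element=True))   # [25]
print(fire_times(arrivals, 5, reset_on_each_element=False))  # [5, 10, 15, 20, 25]
```

In the first case the only firing happens 5 seconds after the last element, which matches what I observe in my pipeline.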

 

 

Please let me know whether I have understood the documentation correctly, and 
whether I can help further.

Thank you,

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
