kennknowles opened a new issue, #19133:
URL: https://github.com/apache/beam/issues/19133

   Hi,
   
   Documentation says that an `AfterProcessingTime(X)` trigger should fire X
   seconds after the first element is processed, but this trigger appears never
   to fire when using a global window with a steady influx of elements.
   
   Here is my pipeline:
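   
   ```python
   import apache_beam as beam
   from apache_beam.transforms import window
   from apache_beam.transforms.trigger import (
       AccumulationMode, AfterProcessingTime, Repeatedly)
   
   # converter, PubSubRow, ListCombineFn, ClickHouseSink and the self._*
   # settings are part of the reporter's code and are not shown in this report.
   (p
    | 'pubsub' >> beam.io.ReadFromPubSub(
          topic=self._pubsub_topic).with_output_types(bytes)
    # One global window; intended to fire a pane 5 s (processing time)
    # after the first element of that pane.
    | 'window' >> beam.WindowInto(
          window.GlobalWindows(),
          trigger=Repeatedly(AfterProcessingTime(5)),
          accumulation_mode=AccumulationMode.DISCARDING)
    | 'decode' >> beam.FlatMap(converter.from_pubsub).with_output_types(PubSubRow)
    | 'row' >> beam.Map(lambda x: converter.to_clickhouse(x.type, x.data))
    # Combine each fired pane into one batch for insertion.
    | 'combine' >> beam.CombineGlobally(ListCombineFn()).without_defaults()
    | 'clickhouse' >> ClickHouseSink(
          self._clickhouse_host, self._clickhouse_port, self._clickhouse_database)
   )
   ```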
   
   I expect the trigger to fire every 5 seconds for as long as elements keep
   arriving, so that my data is combined in 5-second batches. The pipeline simply
   reads messages from PubSub, transforms them into ClickHouse ORM models, and
   batch-saves them into ClickHouse with as much parallelism as possible. Order
   does not matter: elements can be inserted in any order and are not correlated
   with one another.
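
The firing schedule expected here can be illustrated with a small plain-Python model. This is a simplified sketch, not Beam's implementation: it assumes arrival times given in seconds of processing time, and the function name `fire_times_documented` is hypothetical. The first element of a pane arms a timer for `arrival + delay`, later elements leave that timer alone, and once it fires the next element arms a fresh one.

```python
def fire_times_documented(arrivals, delay):
    """Hypothetical model of the documented Repeatedly(AfterProcessingTime(delay)).

    Assumption: the first element of a pane arms a timer at arrival + delay,
    later elements do not touch it, and after it fires the next element
    arms a new timer. Returns the processing times at which panes fire.
    """
    fires = []
    timer = None  # pending fire time, or None if no timer is armed
    for t in sorted(arrivals):
        if timer is not None and timer <= t:
            fires.append(timer)  # timer expires before this element arrives
            timer = None
        if timer is None:
            timer = t + delay  # first element of a new pane arms the timer
    if timer is not None:
        # Processing time keeps advancing after the last element,
        # so an armed timer eventually fires.
        fires.append(timer)
    return fires


print(fire_times_documented(range(20), 5))
```

With one element per second for 20 seconds and a 5-second delay, this model fires at 5, 10, 15 and 20 seconds, i.e. about every 5 seconds while elements keep arriving.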
   
   The potential issue is in the `on_element(self, element, window, context)`
   method of `class AfterProcessingTime(TriggerFn)` in `trigger.py`:
   
   ```
   
   context.set_timer(
       '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
   
   ```
   
   This overrides the previously set timer every time a new element arrives, so
   under a constant influx of elements the trigger fires only after no new
   elements have arrived for X seconds.
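
The overwrite behavior described above can be modeled in plain Python as well. Again this is a simplified sketch under stated assumptions (hypothetical function name `fire_times_reset`, arrival times in seconds of processing time), not Beam's code: every element re-arms the timer at `arrival + delay`, so a timer can only expire during a gap of at least `delay` seconds between two elements.

```python
def fire_times_reset(arrivals, delay):
    """Hypothetical model of a timer that every element overwrites.

    Assumption: each element sets the timer to arrival + delay, replacing
    any pending timer. Returns the processing times at which panes fire.
    """
    fires = []
    timer = None  # pending fire time, or None before the first element
    for t in sorted(arrivals):
        if timer is not None and timer <= t:
            fires.append(timer)  # only reached after a gap >= delay
        timer = t + delay  # every element overwrites the pending timer
    if timer is not None:
        # After the last element nothing resets the timer, so it fires.
        fires.append(timer)
    return fires


print(fire_times_reset(range(20), 5))
```

For one element per second over 20 seconds with a 5-second delay, this model fires only once, at 24 seconds (5 seconds after the last element); on a truly unbounded steady stream it would never fire. Arming the timer only when none is pending yields a fire roughly every `delay` seconds instead.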
   
   Please let me know whether I understood the documentation correctly, and
   whether I can help further.
   
   Thank you,
   
   Imported from Jira [BEAM-6177](https://issues.apache.org/jira/browse/BEAM-6177).
   Original Jira may contain additional context.
   Reported by: atellier.
