[ 
https://issues.apache.org/jira/browse/BEAM-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía updated BEAM-7853:
-------------------------------
    Status: Open  (was: Triage Needed)

> PubsubIO and FixedWindows
> -------------------------
>
>                 Key: BEAM-7853
>                 URL: https://issues.apache.org/jira/browse/BEAM-7853
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model
>            Reporter: Gregory Parsons
>            Priority: Major
>
> Hi all,
> I may have found an issue with windowing on Cloud Pub/Sub via PubsubIO.
> FixedWindows do not trigger on either DirectRunner or DataflowRunner after
> a GroupByKey transform.
> A basic pipeline with my use case would look like:
>  
> {code:java}
> Pipeline p = Pipeline.create();
> PubsubIO.Read<PubsubMessage> read = PubsubIO
>     .readMessagesWithAttributes()
>     .withTimestampAttribute("time")
>     .fromTopic("test-topic");
> // The element type must match the PubsubMessage elements produced by the read.
> Window<PubsubMessage> window = Window.<PubsubMessage>into(
>     FixedWindows.of(Duration.standardSeconds(10L))
> )
>     .triggering(AfterWatermark.pastEndOfWindow())
>     .withAllowedLateness(Duration.standardSeconds(10L))
>     .discardingFiredPanes();
> PCollection<KV<String, Iterable<PubsubMessage>>> windowedMessages = p
>     .apply("Read Events", read)
>     .apply("Apply Window", window)
>     .apply("Convert to KV", ParDo.of(new ConvertToMapOnKey()))
>     .apply("Group by key", GroupByKey.<String, PubsubMessage>create())
>     .apply("Log Pairs", ParDo.of(new LogGroupedEvents()));{code}
>  
> LogGroupedEvents logs the key as a string together with the grouped
> PubsubMessages, but this function never runs correctly.
> I have simplified the pipeline to demonstrate the issue and removed the
> actual use case. It may therefore seem odd that I am grouping and logging
> simple messages, but that is not what the real pipeline does.
>  
> If I swap the windowing function for one that uses GlobalWindows with a
> repeated processing-time trigger, it works correctly.
> {code:java}
> Window<PubsubMessage> getDefaultWindow(Long duration) {
>     return Window.<PubsubMessage>into(new GlobalWindows())
>         // Fire repeatedly, `duration` seconds after the first element in each pane.
>         .triggering(Repeatedly.forever(
>             AfterProcessingTime
>                 .pastFirstElementInPane()
>                 .plusDelayOf(Duration.standardSeconds(duration))))
>         .withAllowedLateness(Duration.standardSeconds(10L))
>         .discardingFiredPanes();
> }
> {code}
>  
> This could be due to me not understanding windowing and triggers, but 
> according to the documentation and many examples online, a simple 
> FixedWindows is all that people use, because it should automatically fire 
> a trigger at the end of the window, per the Beam docs:
>  
> [https://beam.apache.org/documentation/programming-guide/#setting-your-pcollections-windowing-function]
> See example 7.3.1.
>  
> I have been researching as much as I can about how windowing works 
> internally. We arrived at our solution with triggering by reading the 
> source code.
>  
> Let me know if there is any other information you need from me to help look 
> into this.
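
For readers following the report, here is a minimal plain-Java sketch of the behaviour the reporter expects from 10-second fixed windows with an end-of-window watermark trigger. It does not use Beam at all: the class and method names (FixedWindowSketch, windowStart, canFire, assign) are hypothetical, it assumes a zero window offset, and it assumes a window becomes ready once the watermark reaches the window's end. It only illustrates that such a window cannot fire while the watermark stays behind the window end, whereas a processing-time trigger does not depend on the watermark. Whether a stalled watermark is the cause here is not established by the report.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: plain Java, not Beam's FixedWindows or AfterWatermark classes.
class FixedWindowSketch {

    // Start of the fixed window that contains timestampMillis (assumes zero offset).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Assumption: an end-of-window watermark trigger is ready once the
    // watermark has reached the end of the window [start, start + size).
    static boolean canFire(long windowStartMillis, long sizeMillis, long watermarkMillis) {
        return watermarkMillis >= windowStartMillis + sizeMillis;
    }

    // Group event timestamps by the start of the window they fall into.
    static Map<Long, List<Long>> assign(List<Long> timestamps, long sizeMillis) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts, sizeMillis), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10-second windows, as in the report
        Map<Long, List<Long>> windows =
            assign(Arrays.asList(1_000L, 9_999L, 10_000L, 25_000L), size);

        // Compare a watermark that has not advanced with one that has.
        for (long watermark : new long[] {0L, 20_000L}) {
            for (Map.Entry<Long, List<Long>> w : windows.entrySet()) {
                System.out.printf("watermark=%d window=[%d, %d) elements=%s canFire=%b%n",
                    watermark, w.getKey(), w.getKey() + size, w.getValue(),
                    canFire(w.getKey(), size, watermark));
            }
        }
    }
}
```

In this sketch no window is ready at watermark 0, while at watermark 20000 the first two windows are ready and the third is still waiting.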



