Gregory Parsons created BEAM-7853:
-------------------------------------

             Summary: PubsubIO and FixedWindows
                 Key: BEAM-7853
                 URL: https://issues.apache.org/jira/browse/BEAM-7853
             Project: Beam
          Issue Type: Bug
          Components: beam-model
            Reporter: Gregory Parsons


Hi all,

I may have found an issue with windowing when reading from Cloud Pub/Sub with PubsubIO.

I am finding that FixedWindows never fire after a GroupByKey transform, on both 
the DirectRunner and the DataflowRunner.

A basic pipeline for my use case looks like this:
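{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

Pipeline p = Pipeline.create();

// Event time is taken from the "time" attribute of each message.
// fromTopic() expects a path of the form projects/<project_id>/topics/<topic_name>.
PubsubIO.Read<PubsubMessage> read = PubsubIO
    .readMessagesWithAttributes()
    .withTimestampAttribute("time")
    .fromTopic("test-topic");

// 10-second fixed windows that should fire once the watermark passes the end of each window.
Window<PubsubMessage> window = Window.<PubsubMessage>into(
        FixedWindows.of(Duration.standardSeconds(10L)))
    .triggering(AfterWatermark.pastEndOfWindow())
    .withAllowedLateness(Duration.standardSeconds(10L))
    .discardingFiredPanes();

// ConvertToMapOnKey and LogGroupedEvents are my own DoFns (not shown).
PCollection<KV<String, Iterable<PubsubMessage>>> windowedMessages = p
    .apply("Read Events", read)
    .apply("Apply Window", window)
    .apply("Convert to KV", ParDo.of(new ConvertToMapOnKey()))
    .apply("Group by key", GroupByKey.<String, PubsubMessage>create());

windowedMessages.apply("Log Pairs", ParDo.of(new LogGroupedEvents()));
p.run();
{code}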

LogGroupedEvents would log the key as a string along with the grouped 
PubsubMessages, but it never runs correctly because the windows never fire.

To demonstrate the issue I have stripped the pipeline down and removed its actual 
use case, so grouping and logging simple messages may look odd; that is not what 
the real pipeline does.

If I swap the windowing function for a global window with a repeating 
processing-time trigger, it works correctly:
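{code:java}
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// A single global window that fires repeatedly, durationSeconds of processing
// time after the first element arrives in each pane.
Window<PubsubMessage> getDefaultWindow(long durationSeconds) {
    return Window.<PubsubMessage>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(durationSeconds))))
        .withAllowedLateness(Duration.standardSeconds(10L))
        .discardingFiredPanes();
}
{code}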
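One difference worth noting is that this workaround does not wait on the event-time watermark at all. The following is a simplified, plain-Java sketch (no Beam dependency) of how Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(d)) schedules firings for one key in a global window: each pane opens with its first element and fires d later in wall-clock time. The class and method names are hypothetical, and the model ignores watermarks, lateness and runner-specific timer behaviour.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Beam's implementation: processing-time firings of
// Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
// for one key in a global window.
class ProcessingTimeTriggerSketch {

    /**
     * @param arrivalsMillis processing times (ms) at which elements arrive, ascending
     * @param delayMillis    the plusDelayOf delay in ms
     * @return processing times (ms) at which panes fire
     */
    static List<Long> firingTimes(long[] arrivalsMillis, long delayMillis) {
        List<Long> firings = new ArrayList<>();
        Long pendingFire = null; // fire time of the currently open pane, if any
        for (long arrival : arrivalsMillis) {
            // Fire the open pane if its timer expired before this element arrived.
            if (pendingFire != null && pendingFire <= arrival) {
                firings.add(pendingFire);
                pendingFire = null;
            }
            // The first element of a new pane starts that pane's delay timer.
            if (pendingFire == null) {
                pendingFire = arrival + delayMillis;
            }
        }
        if (pendingFire != null) {
            firings.add(pendingFire); // the last open pane eventually fires too
        }
        return firings;
    }

    public static void main(String[] args) {
        long[] arrivals = {0L, 3_000L, 12_000L, 25_000L, 27_000L};
        // No watermark is consulted, so panes fire even if event time never advances.
        System.out.println(firingTimes(arrivals, 10_000L)); // [10000, 22000, 35000]
    }
}
```

In this model, elements arriving at 0 s and 3 s share the first pane (fired at 10 s), and every later pane is timed from its own first element.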
 

This could be due to my misunderstanding of windowing and triggers. However, 
according to the documentation and many examples online, a plain FixedWindows is 
all people use, because per the Beam docs it should automatically fire a trigger 
at the end of the window:

[https://beam.apache.org/documentation/programming-guide/#setting-your-pcollections-windowing-function]

See example 7.3.1.
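For reference, here is a simplified model of what the documentation describes for fixed windows fired by AfterWatermark.pastEndOfWindow(): an element with event timestamp t lands in the window [t - (t mod size), start + size), and that window's on-time pane can only fire once the input watermark is past the window's last timestamp (end minus 1 ms). This is a plain-Java sketch with hypothetical names, assuming zero window offset and non-negative millisecond timestamps; it is not Beam's code. In this model, a window whose end the watermark never passes never fires, which would match the symptom described here.

```java
// Hypothetical model, not Beam's implementation: fixed-window assignment and
// the AfterWatermark.pastEndOfWindow() firing condition. Assumes offset 0 and
// non-negative event timestamps in milliseconds.
class FixedWindowSketch {

    /** Start (inclusive) of the fixed window containing the event timestamp. */
    static long windowStart(long eventMillis, long sizeMillis) {
        return eventMillis - (eventMillis % sizeMillis);
    }

    /** End (exclusive) of that window. */
    static long windowEnd(long eventMillis, long sizeMillis) {
        return windowStart(eventMillis, sizeMillis) + sizeMillis;
    }

    /** The on-time pane fires once the watermark is past the window's last timestamp. */
    static boolean onTimePaneFires(long eventMillis, long sizeMillis, long watermarkMillis) {
        long maxTimestamp = windowEnd(eventMillis, sizeMillis) - 1;
        return watermarkMillis > maxTimestamp;
    }

    public static void main(String[] args) {
        long size = 10_000L; // corresponds to FixedWindows.of(Duration.standardSeconds(10L))
        long event = 12_345L;
        System.out.println(windowStart(event, size) + ".." + windowEnd(event, size)); // 10000..20000
        // A watermark stuck inside the window means the pane does not fire yet.
        System.out.println(onTimePaneFires(event, size, 15_000L)); // false
        System.out.println(onTimePaneFires(event, size, 20_000L)); // true
    }
}
```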

 

I have researched how windowing works internally as much as I can, and we 
arrived at our trigger-based workaround by reading the source code.

Let me know if there is any other information you need from me to help look 
into this.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
