Gregory Parsons created BEAM-7853:
-------------------------------------
Summary: PubsubIO and FixedWindows
Key: BEAM-7853
URL: https://issues.apache.org/jira/browse/BEAM-7853
Project: Beam
Issue Type: Bug
Components: beam-model
Reporter: Gregory Parsons
Hi all,
I may have found an issue with windowing when reading from Cloud Pub/Sub via PubsubIO.
FixedWindows never seem to fire after a GroupByKey transform, on either the
DirectRunner or the DataflowRunner.
A basic pipeline with my use case would look like:
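{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

Pipeline p = Pipeline.create();
// fromTopic expects "projects/<project-id>/topics/<topic>"; <project-id> is a placeholder.
PubsubIO.Read<PubsubMessage> read = PubsubIO
    .readMessagesWithAttributes()
    .withTimestampAttribute("time") // event time comes from the "time" attribute
    .fromTopic("projects/<project-id>/topics/test-topic");
// The elements are PubsubMessages, so the window is typed on PubsubMessage.
Window<PubsubMessage> window = Window.<PubsubMessage>into(
        FixedWindows.of(Duration.standardSeconds(10L)))
    .triggering(AfterWatermark.pastEndOfWindow())
    .withAllowedLateness(Duration.standardSeconds(10L))
    .discardingFiredPanes();
// ConvertToMapOnKey (a DoFn<PubsubMessage, KV<String, PubsubMessage>>) and
// LogGroupedEvents are my own DoFns and are not shown here.
PCollection<KV<String, Iterable<PubsubMessage>>> grouped = p
    .apply("Read Events", read)
    .apply("Apply Window", window)
    .apply("Convert to KV", ParDo.of(new ConvertToMapOnKey()))
    .apply("Group by key", GroupByKey.<String, PubsubMessage>create());
grouped.apply("Log Pairs", ParDo.of(new LogGroupedEvents()));
p.run();
{code}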
LogGroupedEvents logs the key as a string along with the grouped PubsubMessages.
However, this step never runs correctly: the windows never seem to fire.
I have simplified the pipeline to demonstrate the issue and removed the actual
use case, so grouping and logging simple messages may look odd; that is not what
the real pipeline does.
If I swap this window for a global window with a repeated processing-time trigger, it works correctly:
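{code:java}
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Workaround: a single global window that fires repeatedly on processing time.
Window<PubsubMessage> getDefaultWindow(long duration) {
    return Window.<PubsubMessage>into(new GlobalWindows())
        // Fire `duration` seconds (processing time) after the first element in each pane.
        .triggering(Repeatedly.forever(
            AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(duration))))
        .withAllowedLateness(Duration.standardSeconds(10L))
        .discardingFiredPanes();
}
{code}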
This could be due to my misunderstanding of windowing and triggers, but according
to the documentation and many examples online, people simply use FixedWindows,
because the default trigger fires automatically at the end of the window, per
the Beam docs:
[https://beam.apache.org/documentation/programming-guide/#setting-your-pcollections-windowing-function]
See example 7.3.1.
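The end-of-window behavior described in the Beam docs can be illustrated with a small, stdlib-only Java model. This is a hypothetical simplification, not Beam's implementation: FixedWindowModel and its methods are made up for illustration, the window offset is assumed to be zero, and allowed lateness and late data are ignored. It shows that each element is assigned to a fixed window [start, start + size) by its event timestamp, and that a window's pane is emitted only once the watermark reaches the end of that window.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model (not Beam's code) of FixedWindows + an end-of-window watermark trigger.
final class FixedWindowModel {
    private final long sizeMillis;
    // Buffered elements keyed by window start; TreeMap keeps windows in time order.
    private final TreeMap<Long, List<String>> buffered = new TreeMap<>();

    FixedWindowModel(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    // Start of the fixed window containing the timestamp (zero offset assumed).
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps negative timestamps in the correct window.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Buffers an element in the window its event timestamp falls into.
    void add(long timestampMillis, String element) {
        buffered.computeIfAbsent(windowStart(timestampMillis, sizeMillis), k -> new ArrayList<>())
                .add(element);
    }

    // Advances the watermark and emits every window whose end (start + size)
    // is at or before it. Windows the watermark has not reached stay buffered.
    Map<Long, List<String>> advanceWatermarkTo(long watermarkMillis) {
        Map<Long, List<String>> fired = new TreeMap<>();
        while (!buffered.isEmpty() && buffered.firstKey() + sizeMillis <= watermarkMillis) {
            Map.Entry<Long, List<String>> window = buffered.pollFirstEntry();
            fired.put(window.getKey(), window.getValue());
        }
        return fired;
    }

    public static void main(String[] args) {
        FixedWindowModel model = new FixedWindowModel(10_000L); // 10-second windows
        model.add(1_000L, "a");
        model.add(9_999L, "b");
        model.add(12_000L, "c");
        // Watermark still inside [0, 10000): nothing fires.
        System.out.println(model.advanceWatermarkTo(9_000L));  // prints {}
        // Watermark reaches the end of [0, 10000): that window fires.
        System.out.println(model.advanceWatermarkTo(10_000L)); // prints {0=[a, b]}
        // If the watermark never advances further, [10000, 20000) is never emitted.
    }
}
{code}

In this model, a window whose end the watermark never reaches is never emitted, which is one possible way to end up with fixed windows that appear not to fire.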
I have researched as much as I can about how windowing works internally, and we
arrived at our trigger-based workaround by reading the source code.
Let me know if there is any other information you need from me to help look
into this.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)