[
https://issues.apache.org/jira/browse/BEAM-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17250905#comment-17250905
]
Kalanyu Zintus-art commented on BEAM-7853:
------------------------------------------
I've been troubled by this for the past two years, and now I know it's this
issue. I was using PubsubIO to generate side inputs, and it never respected
FixedWindows, causing faulty behavior where the side inputs would not fire and
would eventually clog the pipeline.
> PubsubIO and FixedWindows
> -------------------------
>
> Key: BEAM-7853
> URL: https://issues.apache.org/jira/browse/BEAM-7853
> Project: Beam
> Issue Type: Bug
> Components: beam-model
> Reporter: Gregory Parsons
> Priority: P3
>
> Hi all,
> I may have found an issue with windowing on Cloud Pub/Sub (PubsubIO).
> FixedWindows do not fire on either the DirectRunner or the
> DataflowRunner after a GroupByKey transform.
> A basic pipeline with my use case would look like:
>
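> {code:java}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
> import org.apache.beam.sdk.transforms.GroupByKey;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
> import org.apache.beam.sdk.transforms.windowing.Window;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
>
> Pipeline p = Pipeline.create();
> // fromTopic expects the full path "projects/<project-id>/topics/<topic-name>"
> PubsubIO.Read<PubsubMessage> read = PubsubIO
>     .readMessagesWithAttributes()
>     .withTimestampAttribute("time")
>     .fromTopic("projects/<project-id>/topics/test-topic");
> // The window must be typed for the elements it is applied to (PubsubMessage, not String)
> Window<PubsubMessage> window = Window.<PubsubMessage>into(
>         FixedWindows.of(Duration.standardSeconds(10L)))
>     .triggering(AfterWatermark.pastEndOfWindow())
>     .withAllowedLateness(Duration.standardSeconds(10L))
>     .discardingFiredPanes();
> // ConvertToMapOnKey and LogGroupedEvents are user DoFns (not shown)
> PCollection<KV<String, Iterable<PubsubMessage>>> windowedMessages = p
>     .apply("Read Events", read)
>     .apply("Apply Window", window)
>     .apply("Convert to KV", ParDo.of(new ConvertToMapOnKey()))
>     .apply("Group by key", GroupByKey.<String, PubsubMessage>create());
> windowedMessages.apply("Log Pairs", ParDo.of(new LogGroupedEvents()));
> p.run();
> {code}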
>
> LogGroupedEvents logs the key as a string along with the grouped
> PubsubMessages, but it never runs as expected.
> I have simplified the pipeline to demonstrate the issue and removed the
> actual use case, so it may seem odd that I am grouping and logging simple
> messages; that is not what the real pipeline does.
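As a rough illustration of what the windowed GroupByKey above is expected to
group, the dependency-free sketch below assigns each event timestamp to a
10-second window the way FixedWindows does with a zero offset (window start =
timestamp minus timestamp mod size) and then groups values per key and per
window. WindowedGroupingSketch and its methods are hypothetical names used only
for illustration; the model ignores triggers and the watermark, so it says
nothing about when the groups are actually emitted.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, dependency-free model of FixedWindows + GroupByKey.
// Only window assignment is modelled; triggers and watermarks are ignored.
final class WindowedGroupingSketch {

    // Start of the fixed window containing timestampMillis, assuming a zero offset.
    // floorMod keeps the result correct for negative timestamps too.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Groups values under "key@windowStart", mimicking how GroupByKey groups
    // per key and per window once a windowing function has been applied.
    static Map<String, List<String>> group(
            List<String> keys, List<Long> timestamps, List<String> values, long sizeMillis) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (int i = 0; i < keys.size(); i++) {
            String groupKey = keys.get(i) + "@" + windowStart(timestamps.get(i), sizeMillis);
            groups.computeIfAbsent(groupKey, k -> new ArrayList<>()).add(values.get(i));
        }
        return groups;
    }

    public static void main(String[] args) {
        long size = 10_000L; // FixedWindows.of(Duration.standardSeconds(10L))
        Map<String, List<String>> result = group(
                List.of("a", "a", "b", "a"),
                List.of(1_000L, 9_999L, 5_000L, 10_000L),
                List.of("m1", "m2", "m3", "m4"),
                size);
        // prints {a@0=[m1, m2], a@10000=[m4], b@0=[m3]}
        System.out.println(result);
    }
}
{code}

Note that 9,999 ms and 10,000 ms land in different windows: windows are
half-open intervals [start, start + size).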
>
> If I swap the windowing function for a global window with a repeated
> processing-time trigger, it works correctly:
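> {code:java}
> import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
> import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
> import org.apache.beam.sdk.transforms.windowing.Repeatedly;
> import org.apache.beam.sdk.transforms.windowing.Window;
> import org.joda.time.Duration;
>
> // Generic so it can be applied to PCollection<PubsubMessage>, not only PCollection<String>
> <T> Window<T> getDefaultWindow(long durationSeconds) {
>     return Window.<T>into(new GlobalWindows())
>         .triggering(Repeatedly.forever(
>             AfterProcessingTime
>                 .pastFirstElementInPane()
>                 .plusDelayOf(Duration.standardSeconds(durationSeconds))))
>         .withAllowedLateness(Duration.standardSeconds(10L))
>         .discardingFiredPanes();
> }
> {code}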
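For comparison, the workaround does not depend on event time at all. The
sketch below is a simplified, dependency-free model (hypothetical names, not
Beam code) of Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(d)) with discardingFiredPanes() in a single global window: the
first element of a pane starts a delay of d measured in arrival (processing)
time, and everything that arrives before the delay elapses is emitted together.
It assumes an element arriving exactly at the firing time goes into the next
pane, which is a simplification.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a repeated processing-time trigger in the global window
// with discarding panes. Only arrival (processing) times matter here; event
// timestamps and the watermark play no role.
final class ProcessingTimePanesSketch {

    // arrivalMillis must be sorted ascending. Returns the arrival times grouped per pane.
    static List<List<Long>> panes(List<Long> arrivalMillis, long delayMillis) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long fireAt = Long.MIN_VALUE;
        for (long arrival : arrivalMillis) {
            if (!current.isEmpty() && arrival >= fireAt) {
                // The delay started by this pane's first element elapsed before this
                // arrival, so the pane has already fired: record it and start a new one
                // (discardingFiredPanes, so nothing carries over).
                result.add(current);
                current = new ArrayList<>();
            }
            if (current.isEmpty()) {
                // The first element in a pane starts the processing-time delay.
                fireAt = arrival + delayMillis;
            }
            current.add(arrival);
        }
        if (!current.isEmpty()) {
            result.add(current); // assumed to fire once its delay elapses
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 s delay, as getDefaultWindow(10L) would configure
        // prints [[0, 3000, 9000], [12000], [30000]]
        System.out.println(panes(List.of(0L, 3_000L, 9_000L, 12_000L, 30_000L), 10_000L));
    }
}
{code}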
>
> This could be due to my misunderstanding of windowing and triggers, but
> according to the documentation and many examples online, a simple
> FixedWindows is all people use, because per the Beam docs it should
> automatically fire a trigger at the end of the window:
>
> [https://beam.apache.org/documentation/programming-guide/#setting-your-pcollections-windowing-function]
> See section 7.3.1.
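The rule referenced above can be modelled without Beam. In the sketch below
(hypothetical names, a simplification rather than Beam's implementation), the
on-time pane for AfterWatermark.pastEndOfWindow() fires once the watermark
reaches the window end, and data for that window becomes droppable once the
watermark reaches the window end plus the allowed lateness (10 s in the
pipeline above). If the watermark reported by the source never reaches a
window's end, that window never fires; the sketch does not say why the
watermark might stall in this case.

{code:java}
// Hypothetical, simplified model of event-time firing for FixedWindows with
// AfterWatermark.pastEndOfWindow() and an allowed lateness. Not Beam code.
final class WatermarkFiringSketch {

    // End (exclusive) of the fixed window containing timestampMillis, zero offset assumed.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis) + sizeMillis;
    }

    // Assumed rule: the on-time pane fires once the watermark reaches the window end.
    static boolean hasFired(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis;
    }

    // Assumed rule: data for the window is droppable once the watermark reaches
    // the window end plus the allowed lateness.
    static boolean isDroppable(long windowEndMillis, long allowedLatenessMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis + allowedLatenessMillis;
    }

    public static void main(String[] args) {
        long size = 10_000L;     // FixedWindows.of(Duration.standardSeconds(10L))
        long lateness = 10_000L; // withAllowedLateness(Duration.standardSeconds(10L))
        long end = windowEnd(4_000L, size); // an event at 4 s belongs to [0 s, 10 s)

        System.out.println(hasFired(end, 9_000L));               // false: watermark at 9 s
        System.out.println(hasFired(end, 10_000L));              // true: watermark at 10 s
        System.out.println(isDroppable(end, lateness, 15_000L)); // false: within lateness
        System.out.println(isDroppable(end, lateness, 25_000L)); // true: past end + lateness
    }
}
{code}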
>
> I have been researching how windowing works internally as much as I can.
> We arrived at our trigger-based workaround by reading the source code.
>
> Let me know if there is any other information you need from me to help look
> into this.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)