damccorm opened a new issue, #19714:
URL: https://github.com/apache/beam/issues/19714
Hi all,
I am seeing a potential issue with windowing on Cloud Pub/Sub via PubsubIO.
I am finding that FixedWindows do not trigger on either DirectRunner or
DataflowRunner after a GroupByKey transform.
A basic pipeline reproducing my use case looks like this:
```
Pipeline p = Pipeline.create();

// Read Pub/Sub messages, taking event timestamps from the "time" attribute.
PubsubIO.Read<PubsubMessage> read = PubsubIO
    .readMessagesWithAttributes()
    .withTimestampAttribute("time")
    .fromTopic("test-topic");

// 10-second fixed windows that fire once the watermark passes the end of the window.
// The element type is PubsubMessage so the transform can be applied to the read output.
Window<PubsubMessage> window = Window.<PubsubMessage>into(
        FixedWindows.of(Duration.standardSeconds(10L)))
    .triggering(AfterWatermark.pastEndOfWindow())
    .withAllowedLateness(Duration.standardSeconds(10L))
    .discardingFiredPanes();

PCollection<KV<String, Iterable<PubsubMessage>>> windowedMessages = p
    .apply("Read Events", read)
    .apply("Apply Window", window)
    .apply("Convert to KV", ParDo.of(new ConvertToMapOnKey()))
    .apply("Group by key", GroupByKey.<String, PubsubMessage>create())
    .apply("Log Pairs", ParDo.of(new LogGroupedEvents()));
```
LogGroupedEvents logs the key as a string along with the PubsubMessages
grouped under it, but this function never runs as expected.
I have simplified the pipeline to demonstrate the issue and removed the
actual use case, so it may seem odd that I am grouping and logging simple
messages; that is not what the real pipeline does.
If I swap the windowing function for one in the global window with a
repeated processing-time trigger, it works correctly:
```
// Global window that fires repeatedly, `duration` seconds (processing time)
// after the first element arrives in each pane.
Window<PubsubMessage> getDefaultWindow(Long duration) {
    return Window.<PubsubMessage>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(duration))))
        .withAllowedLateness(Duration.standardSeconds(10L))
        .discardingFiredPanes();
}
```
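As a rough illustration of why the snippet above keeps producing output, here is a toy model in plain Java (not Beam code) of a repeated processing-time trigger. The class and method names are hypothetical, and treating an arrival exactly at the firing time as the start of the next pane is a simplifying assumption. The point it sketches is that a pane fires a fixed delay after its first element arrives, regardless of event timestamps or the watermark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (hypothetical, not Beam code) of a repeated processing-time trigger. */
class ProcessingTimePaneSketch {

    /**
     * Groups arrival times (processing time in millis, ascending) into panes.
     * A pane opens at its first arrival and fires delayMillis later.
     * Event timestamps and the watermark play no part in this model.
     */
    static List<List<Long>> panes(List<Long> arrivalsMillis, long delayMillis) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long fireAt = 0L;
        for (long arrival : arrivalsMillis) {
            // The open pane's timer has expired: emit it and start a new pane.
            if (!current.isEmpty() && arrival >= fireAt) {
                fired.add(current);
                current = new ArrayList<>();
            }
            // The first element of a pane sets that pane's firing time.
            if (current.isEmpty()) {
                fireAt = arrival + delayMillis;
            }
            current.add(arrival);
        }
        // The last pane fires once its timer expires, even with no further input.
        if (!current.isEmpty()) {
            fired.add(current);
        }
        return fired;
    }

    public static void main(String[] args) {
        // Five arrivals with a 10-second delay, mirroring duration = 10.
        System.out.println(panes(Arrays.asList(0L, 3_000L, 12_000L, 15_000L, 40_000L), 10_000L));
    }
}
```

With these sample arrivals the model yields three panes: the first two arrivals, the next two, and the last one on its own.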
The FixedWindows behavior may be due to me not understanding windowing and
triggers, but according to the documentation and many examples online, a
simple FixedWindows is all that people use, because it is supposed to fire
a trigger automatically at the end of the window, per the Beam docs:
[https://beam.apache.org/documentation/programming-guide/#setting-your-pcollections-windowing-function](https://beam.apache.org/documentation/programming-guide/#setting-your-pcollections-windowing-function)
See example 7.3.1.
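For comparison, a minimal plain-Java sketch (not Beam code) of what "trigger at the end of the window" means for 10-second fixed windows with a watermark trigger. The class and method names are hypothetical, and it assumes a zero window offset and millisecond timestamps. It only models the general rule that the on-time pane depends on the watermark reaching the end of the window; it does not diagnose this issue.

```java
/** Toy model (hypothetical, not Beam code) of fixed windows with an end-of-window watermark trigger. */
class FixedWindowWatermarkSketch {

    /** Start of the fixed window containing the event timestamp (zero offset assumed). */
    static long windowStart(long eventMillis, long sizeMillis) {
        return eventMillis - Math.floorMod(eventMillis, sizeMillis);
    }

    /** Exclusive end of the fixed window containing the event timestamp. */
    static long windowEnd(long eventMillis, long sizeMillis) {
        return windowStart(eventMillis, sizeMillis) + sizeMillis;
    }

    /** In this model the on-time pane fires only once the watermark reaches the window end. */
    static boolean firesOnTime(long watermarkMillis, long eventMillis, long sizeMillis) {
        return watermarkMillis >= windowEnd(eventMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long size = 10_000L;   // 10-second windows, as in the pipeline above
        long event = 12_345L;  // sample event timestamp in millis (made up)
        System.out.println("window = [" + windowStart(event, size) + ", " + windowEnd(event, size) + ")");
        System.out.println("watermark at 0 fires: " + firesOnTime(0L, event, size));
        System.out.println("watermark at 20000 fires: " + firesOnTime(20_000L, event, size));
    }
}
```

In this model a watermark that never advances past the window end means the grouped output is never emitted, however much wall-clock time passes, which is the main difference from a processing-time trigger.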
I have been researching as much as I can about how windowing works
internally. We arrived at our solution with triggering by reading the source
code.
Let me know if there is any other information you need from me to help look
into this.
Imported from Jira
[BEAM-7853](https://issues.apache.org/jira/browse/BEAM-7853). Original Jira may
contain additional context.
Reported by: gregorskii.