[
https://issues.apache.org/jira/browse/BEAM-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kenneth Knowles updated BEAM-7322:
----------------------------------
This Jira ticket has a pull request attached to it, but is still open. Did the
pull request resolve the issue? If so, could you please mark it resolved? This
will help the project have a clear view of its open issues.
> PubSubIO watermark does not advance for very low volumes
> --------------------------------------------------------
>
> Key: BEAM-7322
> URL: https://issues.apache.org/jira/browse/BEAM-7322
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Reporter: Tim Sell
> Priority: P3
> Attachments: data.json
>
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> I have identified an issue where the watermark does not advance when using
> the beam PubSubIO when volumes are very low.
> I have created a mini example project to demonstrate the behaviour with a
> python script for generating messages at different frequencies:
> https://github.com/tims/beam/tree/pubsub-watermark-example/pubsub-watermark
> [note: this is in a directory of a Beam fork for corp hoop jumping
> convenience on my end, it is not intended for merging].
> The behaviour is easily replicated if you apply a fixed window triggering
> after the watermark passes the end of the window.
> {code}
> pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
>     .apply(ParDo.of(new ParseScoreEventFn()))
>     .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.standardSeconds(60))
>         .discardingFiredPanes())
>     .apply(MapElements.into(kvs(strings(), integers()))
>         .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), scoreEvent.getScore())))
>     .apply(Count.perKey())
>     .apply(ParDo.of(Log.of("counted per key")));
> {code}
> With this triggering, using both the flink local runner and the direct runner,
> panes will be fired after a long delay (minutes) for low frequencies of
> messages in pubsub (seconds). The biggest issue is that it seems no panes
> will ever be emitted if you just send a few events and stop. This is
> particularly likely to trip up people new to Beam.
> If I change the triggering to have early firings I get exactly the emitted
> panes that you would expect.
> {code}
> .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>     .triggering(AfterWatermark.pastEndOfWindow()
>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>             .alignedTo(Duration.standardSeconds(60))))
>     .withAllowedLateness(Duration.standardSeconds(60))
>     .discardingFiredPanes())
> {code}
> I can use any variation of early firing triggers and they work as expected.
> We believe that the watermark is not advancing when the volume is too low
> because of the sampling that PubSubIO does to determine its watermark. It
> just never has a large enough sample.
> This problem occurs in the direct runner and flink runner, but not in the
> dataflow runner: dataflow uses its own PubSubIO implementation, which has
> access to internal details of pubsub and so doesn't need to do any sampling.
> For extra context from the user@ list:
> *Kenneth Knowles:*
> Thanks to your info, I think it is the configuration of MovingFunction [1]
> that is the likely culprit, but I don't totally understand why. It is
> configured like so:
> - store 60 seconds of data
> - update data every 5 seconds
> - require at least 10 messages to be 'significant'
> - require messages from at least 2 distinct 5 second update periods to be
> 'significant'
> I would expect a rate of 1 message per second to satisfy this. I may have
> read something wrong.
> Have you filed an issue in Jira [2]?
> Kenn
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508
> [2] https://issues.apache.org/jira/projects/BEAM/issues
> *Alexey Romanenko:*
> Not sure that this can be very helpful but I recall a similar issue with
> KinesisIO [1] [2] and it was a bug in MovingFunction which was fixed.
> [1] https://issues.apache.org/jira/browse/BEAM-5063
> [2] https://github.com/apache/beam/pull/6178
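For reference, the four thresholds Kenn lists in the quoted thread can be sketched as a small standalone check. This is only a simplified model under stated assumptions, not Beam's MovingFunction: the class name SignificanceSketch, its methods, and the bucketing by timestamp are all hypothetical and chosen for illustration.

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified model of the quoted thresholds; NOT Beam's MovingFunction implementation. */
final class SignificanceSketch {
  static final long WINDOW_MS = 60_000; // store 60 seconds of data
  static final long BUCKET_MS = 5_000;  // update data every 5 seconds
  static final int MIN_SAMPLES = 10;    // at least 10 messages
  static final int MIN_BUCKETS = 2;     // from at least 2 distinct update periods

  /** True if the messages seen in the last WINDOW_MS before nowMs meet both thresholds. */
  static boolean isSignificant(long[] timestampsMs, long nowMs) {
    int samples = 0;
    Set<Long> buckets = new HashSet<>();
    for (long t : timestampsMs) {
      if (t > nowMs - WINDOW_MS && t <= nowMs) {
        samples++;
        buckets.add(Math.floorDiv(t, BUCKET_MS)); // which 5-second period t falls in
      }
    }
    return samples >= MIN_SAMPLES && buckets.size() >= MIN_BUCKETS;
  }

  /** Hypothetical helper: timestamps 0, periodMs, 2 * periodMs, ... (count entries). */
  static long[] everyMs(long periodMs, int count) {
    long[] ts = new long[count];
    for (int i = 0; i < count; i++) {
      ts[i] = i * periodMs;
    }
    return ts;
  }

  public static void main(String[] args) {
    long[] steady = everyMs(1_000, 60); // one message per second for a minute
    long[] burst = everyMs(1_000, 3);   // three messages, then silence
    System.out.println(isSignificant(steady, 59_000));
    System.out.println(isSignificant(burst, 2_000));
    System.out.println(isSignificant(burst, 120_000));
  }
}
```

In this model a steady 1 message per second does satisfy both thresholds, which matches Kenn's expectation, while a handful of events followed by silence never does. It does not explain the delay reported at rates of a message every few seconds, so the actual cause may lie elsewhere in how the samples are maintained.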
--
This message was sent by Atlassian Jira
(v8.20.1#820001)