[ https://issues.apache.org/jira/browse/BEAM-7322?focusedWorklogId=247954&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-247954 ]

ASF GitHub Bot logged work on BEAM-7322:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/May/19 09:37
            Start Date: 24/May/19 09:37
    Worklog Time Spent: 10m 
      Work Description: tims commented on pull request #8598: [BEAM-7322] Add threshold to PubSub unbounded source
URL: https://github.com/apache/beam/pull/8598#discussion_r287287094
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
 ##########
 @@ -973,6 +980,24 @@ public Instant getWatermark() {
       return new Instant(lastWatermarkMsSinceEpoch);
     }
 
 +    /**
 +     * In case of streams with low traffic, {@link MovingFunction} could never get enough samples
 +     * in {@link PubsubUnboundedSource#SAMPLE_PERIOD} to move watermark. To prevent this
 +     * situation, we need to check if watermark is stale (it was not updated during {@link
 +     * PubsubUnboundedSource#UPDATE_THRESHOLD}) and force its update if it is.
 +     *
 +     * @param nowMsSinceEpoch - current timestamp
 +     * @return should the watermark be updated
 +     */
 +    private boolean shouldUpdate(long nowMsSinceEpoch) {
 +      boolean hasEnoughSamples =
 +          minReadTimestampMsSinceEpoch.isSignificant()
 +              || minUnreadTimestampMsSinceEpoch.isSignificant();
 +      boolean isStale =
 +          lastWatermarkMsSinceEpoch < (nowMsSinceEpoch - UPDATE_THRESHOLD.getMillis());
 
 Review comment:
   Oops...
   I fixed it to use lastReceivedMsSinceEpoch.
   
   I am still struggling to write a test for this (one which would have caught the bug). I'm finding it difficult to understand the PubsubUnboundedSourceTest code, but I'm plodding away.
   
   I was trying to adapt the readManyMessages tests and ran into some issues with PubsubTestClient state not being cleared if a test fails to close the factory in the @After method, which then breaks all the other pubsub tests! This is because the PubsubTestClientFactory cannot be closed while it still has pending messages. Any test guidance would be appreciated.
   
   Also, what do you think of the idea of making SAMPLE_PERIOD configurable, so that we could advance the watermark very optimistically if we wanted to (e.g. when demonstrating or teaching Beam)? Once merged, this PR will ensure that very-low-traffic subscriptions' watermarks advance, but there will still be a two-minute delay. Could be a separate PR.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 247954)
    Time Spent: 1h 20m  (was: 1h 10m)

> PubSubIO watermark does not advance for very low volumes
> --------------------------------------------------------
>
>                 Key: BEAM-7322
>                 URL: https://issues.apache.org/jira/browse/BEAM-7322
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>            Reporter: Tim Sell
>            Priority: Minor
>         Attachments: data.json
>
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> I have identified an issue where the watermark does not advance when using the Beam PubsubIO and message volumes are very low.
> I have created a mini example project to demonstrate the behaviour, with a Python script for generating messages at different frequencies:
> https://github.com/tims/beam/tree/pubsub-watermark-example/pubsub-watermark
> [note: this is in a directory of a Beam fork for corp hoop-jumping convenience on my end; it is not intended for merging].
> The behaviour is easily replicated if you apply a fixed window that triggers after the watermark passes the end of the window.
> {code}
> pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
>     .apply(ParDo.of(new ParseScoreEventFn()))
>     .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.standardSeconds(60))
>         .discardingFiredPanes())
>     .apply(MapElements.into(kvs(strings(), integers()))
>         .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), scoreEvent.getScore())))
>     .apply(Count.perKey())
>     .apply(ParDo.of(Log.of("counted per key")));
> {code}
> With this triggering, using both the Flink local runner and the direct runner, panes will be fired after a long delay (minutes) for low frequencies of messages in pubsub (seconds). The biggest issue is that it seems no panes will ever be emitted if you just send a few events and stop. This is particularly likely to trip up people new to Beam.
> If I change the triggering to have early firings, I get exactly the emitted panes that you would expect.
> {code}
> .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>     .triggering(AfterWatermark.pastEndOfWindow()
>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>             .alignedTo(Duration.standardSeconds(60))))
>     .withAllowedLateness(Duration.standardSeconds(60))
>     .discardingFiredPanes())
> {code}
> I can use any variation of early firing triggers and they work as expected.
> We believe that the watermark is not advancing when the volume is too low because of the sampling that PubSubIO does to determine its watermark. It just never has a large enough sample.
> This problem occurs in the direct runner and the Flink runner, but not in the Dataflow runner (because Dataflow uses its own PubSubIO; it has access to internal details of pubsub and so doesn't need to do any sampling).
> For extra context from the user@ list:
> *Kenneth Knowles:*
> Thanks to your info, I think it is the configuration of MovingFunction [1] that is the likely culprit, but I don't totally understand why. It is configured like so:
>  - store 60 seconds of data
>  - update data every 5 seconds
>  - require at least 10 messages to be 'significant'
>  - require messages from at least 2 distinct 5-second update periods to be 'significant'
> I would expect a rate of 1 message per second to satisfy this. I may have read something wrong.
> Have you filed an issue in Jira [2]?
> Kenn
> [1] https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508
> [2] https://issues.apache.org/jira/projects/BEAM/issues
> *Alexey Romanenko:*
> Not sure that this can be very helpful, but I recall a similar issue with KinesisIO [1] [2], and it was a bug in MovingFunction which was fixed.
> [1] https://issues.apache.org/jira/browse/BEAM-5063
> [2] https://github.com/apache/beam/pull/6178
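
For context, here is a minimal, self-contained model of the significance check Kenn describes above. This is not Beam's actual MovingFunction; the 60-second window, 5-second buckets, 10-message and 2-bucket thresholds come from his summary, and everything else is an illustrative assumption. It shows why a stream producing only a couple of messages per minute never becomes "significant", which matches the "never has a large enough sample" hypothesis in the report:

{code}
import java.util.ArrayDeque;
import java.util.Deque;

public class SignificanceModel {
  static final long SAMPLE_PERIOD_MS = 60_000; // keep 60 seconds of data
  static final long SAMPLE_UPDATE_MS = 5_000;  // 5-second update buckets
  static final int MIN_SAMPLES = 10;           // require at least 10 messages...
  static final int MIN_BUCKET_SPREAD = 2;      // ...spread over at least 2 buckets

  // Timestamps (ms since epoch) of messages seen within the sample period.
  private final Deque<Long> samples = new ArrayDeque<>();

  void add(long nowMs) {
    samples.addLast(nowMs);
    // Evict samples that have fallen out of the 60-second window.
    while (!samples.isEmpty() && samples.peekFirst() < nowMs - SAMPLE_PERIOD_MS) {
      samples.removeFirst();
    }
  }

  boolean isSignificant() {
    long distinctBuckets =
        samples.stream().map(t -> t / SAMPLE_UPDATE_MS).distinct().count();
    return samples.size() >= MIN_SAMPLES && distinctBuckets >= MIN_BUCKET_SPREAD;
  }

  public static void main(String[] args) {
    SignificanceModel model = new SignificanceModel();
    // One message every 30 seconds: at most 3 samples ever sit in the window,
    // so isSignificant() stays false and a watermark gated on it would never move.
    for (long t = 0; t <= 300_000; t += 30_000) {
      model.add(t);
      System.out.println("t=" + (t / 1000) + "s significant=" + model.isSignificant());
    }
  }
}
{code}

By contrast, at 1 message per second the window holds roughly 60 samples spread across many buckets, which is why Kenn would expect that rate to satisfy the check.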



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
