Daniel Robert created BEAM-8347:
-----------------------------------

             Summary: UnboundedRabbitMqReader can fail to advance watermark if 
no new data comes in
                 Key: BEAM-8347
                 URL: https://issues.apache.org/jira/browse/BEAM-8347
             Project: Beam
          Issue Type: Bug
          Components: io-java-rabbitmq
    Affects Versions: 2.15.0
         Environment: testing has been done using the DirectRunner. I also have 
DataflowRunner available
            Reporter: Daniel Robert


I stumbled upon this and then saw a similar StackOverflow post: 
[https://stackoverflow.com/questions/55736593/apache-beam-rabbitmqio-watermark-doesnt-advance]

When `advance()` is called and there are no messages, no state changes: 
neither the CheckpointMark nor the watermark is updated. If new messages 
arrive at a relatively constant rate, this is not a problem. If data is 
bursty, and there are periods with no new messages coming in, the watermark 
will never advance.
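
To illustrate the kind of behavior I would expect, here is a minimal sketch 
of a watermark tracker that keeps moving forward while the queue is idle. 
This is not the RabbitMqIO code and not a proposed patch: the class 
`IdleAwareWatermark`, its methods, and the `idleThreshold` parameter are all 
hypothetical names, and it uses `java.time` rather than Beam's Joda types so 
that it stands alone.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical helper (not part of Beam or RabbitMqIO): tracks a watermark
 * that can still move forward when polls of the queue come back empty.
 */
class IdleAwareWatermark {
  private final Duration idleThreshold;
  private Instant watermark;
  private Instant lastMessageSeenAt;

  IdleAwareWatermark(Instant start, Duration idleThreshold) {
    this.idleThreshold = idleThreshold;
    this.watermark = start;
    this.lastMessageSeenAt = start;
  }

  /** Call when a poll returned a message with the given event timestamp. */
  void onMessage(Instant eventTime, Instant now) {
    lastMessageSeenAt = now;
    // The watermark only ever moves forward.
    if (eventTime.isAfter(watermark)) {
      watermark = eventTime;
    }
  }

  /** Call when a poll found the queue empty. */
  void onEmptyPoll(Instant now) {
    // Leave the watermark alone until the source has been quiet for at
    // least idleThreshold.
    if (Duration.between(lastMessageSeenAt, now).compareTo(idleThreshold) < 0) {
      return;
    }
    // Assumption: once idle, "now minus idleThreshold" is a safe lower bound.
    Instant candidate = now.minus(idleThreshold);
    if (candidate.isAfter(watermark)) {
      watermark = candidate;
    }
  }

  Instant getWatermark() {
    return watermark;
  }
}
```

In a reader, `onMessage` would be called when `advance()` returns true and 
`onEmptyPoll` when it returns false, with `getWatermark()` returning the 
tracked value. The trade-off is that a message arriving after an idle period 
with an older timestamp would be treated as late.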

Contrast this with some of the logic in PubsubIO, which makes provisions for 
periods of inactivity to advance the watermark (although it, too, is imperfect: 
https://issues.apache.org/jira/browse/BEAM-7322 )

The example given in the StackOverflow post is something like this:

 
{code:java}
pipeline
  // Read from the local "test" queue.
  .apply(RabbitMqIO.read()
    .withUri("amqp://guest:guest@localhost:5672")
    .withQueue("test"))
  // 10-second fixed windows that should fire once the watermark passes
  // the end of the window.
  .apply("Windowing",
    Window.<RabbitMqMessage>into(
        FixedWindows.of(Duration.standardSeconds(10)))
      .triggering(AfterWatermark.pastEndOfWindow())
      .withAllowedLateness(Duration.ZERO)
      .accumulatingFiredPanes());
{code}
If I push 2 messages into my rabbit queue, I see 2 unack'd messages and a 
window that never performs an on-time trigger. 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
