[
https://issues.apache.org/jira/browse/BEAM-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Anonymous updated BEAM-8347:
----------------------------
Status: Triage Needed (was: Resolved)
> UnboundedRabbitMqReader can fail to advance watermark if no new data comes in
> -----------------------------------------------------------------------------
>
> Key: BEAM-8347
> URL: https://issues.apache.org/jira/browse/BEAM-8347
> Project: Beam
> Issue Type: Bug
> Components: io-java-rabbitmq
> Affects Versions: 2.15.0
> Environment: testing has been done using the DirectRunner. I also
> have DataflowRunner available
> Reporter: Daniel Robert
> Assignee: Jean-Baptiste Onofré
> Priority: P2
> Fix For: 2.18.0
>
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> I stumbled upon this and then saw a similar StackOverflow post:
> [https://stackoverflow.com/questions/55736593/apache-beam-rabbitmqio-watermark-doesnt-advance]
> When calling `advance()` if there are no messages, no state changes,
> including no changes to the CheckpointMark or Watermark. If there is a
> relatively constant rate of new messages coming in, this is not a problem. If
> data is bursty, and there are periods of no new messages coming in, the
> watermark will never advance.
> Contrast this with some of the logic in PubsubIO which will make provisions
> for periods of inactivity to advance the watermark (although it, too, is
> imperfect: https://issues.apache.org/jira/browse/BEAM-7322 )
> The example given in the StackOverflow post is something like this:
>
> {code:java}
> pipeline
> .apply(RabbitMqIO.read()
> .withUri("amqp://guest:guest@localhost:5672")
> .withQueue("test")
> .apply("Windowing",
> Window.<RabbitMqMessage>into(
> FixedWindows.of(Duration.standardSeconds(10)))
> .triggering(AfterWatermark.pastEndOfWindow())
> .withAllowedLateness(Duration.ZERO)
> .accumulatingFiredPanes()){code}
> If I push 2 messages into my rabbit queue, I see 2 unack'd messages and a
> window that never performs an on time trigger.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)