[
https://issues.apache.org/jira/browse/BEAM-8347?focusedWorklogId=337310&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-337310
]
ASF GitHub Bot logged work on BEAM-8347:
----------------------------------------
Author: ASF GitHub Bot
Created on: 01/Nov/19 13:46
Start Date: 01/Nov/19 13:46
Worklog Time Spent: 10m
Work Description: drobert commented on pull request #9820: [BEAM-8347]:
Consistently advance UnboundedRabbitMqReader watermark
URL: https://github.com/apache/beam/pull/9820#discussion_r341577585
##########
File path:
sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
##########
@@ -376,16 +389,11 @@ public void finalizeCheckpoint() throws IOException {
this.source = source;
this.current = null;
this.checkpointMark = checkpointMark != null ? checkpointMark : new
RabbitMQCheckpointMark();
- try {
- connectionHandler = new ConnectionHandler(source.spec.uri());
- } catch (Exception e) {
- throw new IOException(e);
- }
}
@Override
public Instant getWatermark() {
- return checkpointMark.oldestTimestamp;
+ return checkpointMark.latestTimestamp;
Review comment:
I'm not sure I follow.
> The goal is for the watermark to be a good prediction of the fact "no
future records will have timestamps before the watermark's timestamp".
In this implementation, the timestamp is effectively processing time. It is
the delivery timestamp (as it was even before this PR), which I believe is
set when the consumer picks up the message, not when the message was
enqueued. The queue is FIFO (we should probably document explicitly that
priority queues are unsupported).
As noted in the PR description, AmqpIO and certain Kafka timestamp-based
watermarking implementations use 'now' directly, so there's prior art here.
> when there's not a lot of messages (but in that case, we can imagine to
update the watermark before the finalize checkpoint
This use case is what prompted this PR originally.
> when we use an existing exchange (instead of creating one)
I don't see how this is relevant. Messages are pulled from a single queue,
regardless of how the exchange was declared.
> As it depends of the use case
I'm having a hard time seeing how the use case matters here. The only
available timestamp is delivery time, which should be monotonically increasing
as we're using a pull-based API against a FIFO queue.
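To make the argument concrete, here is a minimal sketch of the idea, not the
actual RabbitMqIO code. `DeliveryTimeWatermark` and its method names are
hypothetical, and it assumes every timestamp is a delivery time taken from a
single FIFO queue: the watermark follows the latest delivery time, and an
empty poll may move it forward to 'now'.

```java
import java.time.Instant;

/** Hypothetical helper (not part of Beam): a watermark driven by delivery time. */
final class DeliveryTimeWatermark {
  // Starts at the minimum instant so the first observation always advances it.
  private Instant latest = Instant.MIN;

  /** Record the delivery timestamp of a message that was just pulled. */
  void onMessage(Instant deliveryTime) {
    advanceTo(deliveryTime);
  }

  /**
   * Called when a poll returned no message. Assumption: any later message
   * will be delivered at or after 'now', so the watermark can move to 'now'.
   */
  void onEmptyPoll(Instant now) {
    advanceTo(now);
  }

  /** The current watermark; it never moves backwards. */
  Instant current() {
    return latest;
  }

  private void advanceTo(Instant candidate) {
    if (candidate.isAfter(latest)) {
      latest = candidate;
    }
  }

  public static void main(String[] args) {
    DeliveryTimeWatermark wm = new DeliveryTimeWatermark();
    Instant delivered = Instant.parse("2019-11-01T13:46:00Z");
    wm.onMessage(delivered);                    // a message arrives
    wm.onEmptyPoll(delivered.plusSeconds(30));  // then the queue goes quiet
    System.out.println(wm.current());
  }
}
```

With only `onMessage`, the watermark would stall as soon as the queue goes
quiet, which is the behaviour reported in the issue; the `onEmptyPoll` path is
what lets a window close during an idle period.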
Issue Time Tracking
-------------------
Worklog Id: (was: 337310)
Time Spent: 1h 20m (was: 1h 10m)
> UnboundedRabbitMqReader can fail to advance watermark if no new data comes in
> -----------------------------------------------------------------------------
>
> Key: BEAM-8347
> URL: https://issues.apache.org/jira/browse/BEAM-8347
> Project: Beam
> Issue Type: Bug
> Components: io-java-rabbitmq
> Affects Versions: 2.15.0
> Environment: testing has been done using the DirectRunner. I also
> have DataflowRunner available
> Reporter: Daniel Robert
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> I stumbled upon this and then saw a similar StackOverflow post:
> [https://stackoverflow.com/questions/55736593/apache-beam-rabbitmqio-watermark-doesnt-advance]
> When `advance()` is called and there are no messages, no state changes:
> neither the CheckpointMark nor the watermark is updated. If there is a
> relatively constant rate of new messages coming in, this is not a problem. If
> data is bursty, and there are periods of no new messages coming in, the
> watermark does not advance during those periods.
> Contrast this with PubsubIO, which makes provisions to advance the
> watermark during periods of inactivity (although it, too, is
> imperfect: https://issues.apache.org/jira/browse/BEAM-7322 )
> The example given in the StackOverflow post is something like this:
>
> {code:java}
> pipeline
>     .apply(RabbitMqIO.read()
>         .withUri("amqp://guest:guest@localhost:5672")
>         .withQueue("test"))
>     .apply("Windowing",
>         Window.<RabbitMqMessage>into(
>                 FixedWindows.of(Duration.standardSeconds(10)))
>             .triggering(AfterWatermark.pastEndOfWindow())
>             .withAllowedLateness(Duration.ZERO)
>             .accumulatingFiredPanes()){code}
> If I push 2 messages into my RabbitMQ queue, I see 2 unack'd messages and a
> window that never fires its on-time trigger.
>