[ https://issues.apache.org/jira/browse/BEAM-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16953004#comment-16953004 ]

Daniel Robert edited comment on BEAM-8347 at 10/16/19 4:58 PM:
---------------------------------------------------------------

Alright, I've been playing around with this a lot locally (even rewriting the 
RabbitMqIO unbounded source/reader), and either I don't understand the expected 
use case or the design seems incorrect.

Notable concerns:

Splits:

Regarding {{Source.split()}}: the reader is designed to bind to a single queue. 
As such, there's no concurrency or other advantage in splitting the source into 
parallel readers; {{split()}} might as well return 
{{Collections.singletonList(this)}} regardless of the requested number of splits. 
Additionally, using a single reader decreases the likelihood of processing or 
acking messages out of queue order.
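A minimal sketch of that split behaviour, using a hypothetical {{SingleQueueSource}} stand-in rather than Beam's real {{UnboundedSource}} (whose {{split}} also takes {{PipelineOptions}}): whatever number of splits is requested, the source returns itself, so only one reader is ever bound to the queue.

{code:java}
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical stand-in for an unbounded source bound to one RabbitMQ queue.
 * Not Beam's API: the real UnboundedSource.split also takes PipelineOptions.
 */
class SingleQueueSource {
  private final String queue;

  SingleQueueSource(String queue) {
    this.queue = queue;
  }

  String queue() {
    return queue;
  }

  /**
   * Ignores desiredNumSplits: every reader would consume the same queue, so
   * splitting buys no parallelism. Exactly one source (this one) is returned.
   */
  List<SingleQueueSource> split(int desiredNumSplits) {
    return Collections.singletonList(this);
  }
}
{code}

Keeping one reader per queue is also what preserves the ordering property mentioned above.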

Watermarks:

Most of the issues are here. Looking at the documentation of 
{{UnboundedSource.UnboundedReader.getWatermark()}}:

{quote}[watermark] can be approximate. If records are read that violate this 
guarantee, they will be considered late, which will affect how they will be 
processed. ...

However, this value should be _as late as possible_. Downstream windows may not 
be able to close until this watermark passes their end.

For example, a source may know that the records it reads will be in timestamp 
order. In this case, the watermark can be the timestamp of the last record 
read. For a source that does not have natural timestamps, timestamps can be set 
to the time of reading, in which case the watermark is the current clock time.
{quote}
The current implementation retains the *oldest* timestamp. Further, the 
timestamp comes from the message's delivery properties, which, if consumption is 
single-threaded as suggested above, should be monotonically increasing. Switching 
to the most recent timestamp rather than the oldest would go a long way toward 
ensuring that windows advance.
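A minimal sketch of tracking the most recent timestamp, assuming a hypothetical {{LatestTimestampWatermark}} helper (not part of RabbitMqIO). It uses {{java.time.Instant}} to stay self-contained, whereas Beam readers actually return {{org.joda.time.Instant}}.

{code:java}
import java.time.Instant;

/**
 * Hypothetical watermark tracker (not part of RabbitMqIO) that keeps the most
 * recent record timestamp instead of the oldest. Uses java.time.Instant for a
 * self-contained example; Beam readers actually return org.joda.time.Instant.
 */
class LatestTimestampWatermark {
  // Earliest representable instant until the first record is seen.
  private Instant watermark = Instant.MIN;

  /** Called for each delivered message with its timestamp property. */
  void onRecord(Instant recordTimestamp) {
    // Only move forward: an older (out-of-order) timestamp never moves the watermark back.
    if (recordTimestamp.isAfter(watermark)) {
      watermark = recordTimestamp;
    }
  }

  Instant getWatermark() {
    return watermark;
  }
}
{code}

With records stamped 100 s, 105 s and 103 s, the watermark ends at 105 s; retaining the oldest would leave it stuck at 100 s.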

There is precedent for moving the watermark forward like this in some of the 
other unbounded sources, such as the Kafka ProcessingTimePolicy 
([https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L112])
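For the 'processing time' flavour, a sketch in the spirit of that Kafka policy (as I read it, records are stamped with the current time and the watermark is the current time). The class name and the injected {{java.time.Clock}} are assumptions made for testability, not existing RabbitMqIO API.

{code:java}
import java.time.Clock;
import java.time.Instant;

/**
 * Hypothetical processing-time policy for a RabbitMQ reader, modelled loosely on
 * KafkaIO's ProcessingTimePolicy: records are stamped with the read time and the
 * watermark is the current clock time.
 */
class ProcessingTimeWatermark {
  private final Clock clock;

  ProcessingTimeWatermark(Clock clock) {
    this.clock = clock;
  }

  /** Timestamp assigned to a record at the moment it is read. */
  Instant timestampForRecord() {
    return clock.instant();
  }

  /** Watermark is simply "now", so it keeps advancing even when the queue is idle. */
  Instant getWatermark() {
    return clock.instant();
  }
}
{code}

The trade-off is that the AMQP timestamp property is ignored entirely, which fits the 'single queue, processing time watermarking' use case rather than event-time processing.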

Watermarks with no new input:

When there are no new messages, it would be OK to advance the watermark as 
well. KafkaIO takes this approach when the stream is 'caught up': 
[https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L169]

It might be useful to set the watermark to {{NOW - 2 seconds}} when no messages 
are available (specifically here: 
[https://github.com/apache/beam/blob/master/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java#L467]), 
provided 'two seconds ago' is later than the current watermark.
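A minimal sketch of that idle rule, assuming a hypothetical {{IdleAdvancingWatermark}} helper with an injected {{java.time.Clock}}: messages push the watermark to their timestamp, and an empty poll moves it to {{now - 2s}} only if that is later than its current value.

{code:java}
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical idle-aware watermark for a RabbitMQ reader, following the
 * "caught up" idea from KafkaIO: when no message is available, move the
 * watermark to (now - 2s), but only if that is later than its current value.
 */
class IdleAdvancingWatermark {
  // Assumed grace period, matching the "NOW - 2 seconds" suggestion.
  static final Duration IDLE_DELTA = Duration.ofSeconds(2);

  private final Clock clock;
  private Instant watermark = Instant.MIN;

  IdleAdvancingWatermark(Clock clock) {
    this.clock = clock;
  }

  /** A message arrived: track the latest timestamp seen so far. */
  void onMessage(Instant timestamp) {
    if (timestamp.isAfter(watermark)) {
      watermark = timestamp;
    }
  }

  /** advance() found no message: let the watermark follow the clock. */
  void onIdle() {
    Instant idleWatermark = clock.instant().minus(IDLE_DELTA);
    if (idleWatermark.isAfter(watermark)) {
      watermark = idleWatermark;
    }
  }

  Instant getWatermark() {
    return watermark;
  }
}
{code}

One consequence: a message that arrives after an idle period with a timestamp more than two seconds old would be behind the watermark and treated as late.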

(I've also found an unrelated bug: start() 
[https://github.com/apache/beam/blob/master/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java#L432]
creates and uses a new ConnectionHandler even though one was already created at 
construction time, so the one constructed in start() is never properly 
stopped.)
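A sketch of the kind of fix I mean, with hypothetical stub classes ({{ConnectionHandlerStub}}, {{ReaderSketch}}) standing in for the real ConnectionHandler and reader so it runs without a broker: the handler is constructed once, start() reuses it, and close() stops that same instance.

{code:java}
/**
 * Hypothetical stand-in for RabbitMqIO's ConnectionHandler; it only records
 * lifecycle calls so the fix can be checked without a broker.
 */
class ConnectionHandlerStub {
  static int created = 0;
  boolean started = false;
  boolean stopped = false;

  ConnectionHandlerStub() {
    created++;
  }

  void start() {
    started = true;
  }

  void stop() {
    stopped = true;
  }
}

/** Sketch of the suggested fix: start() reuses the handler built in the constructor. */
class ReaderSketch implements AutoCloseable {
  private final ConnectionHandlerStub connectionHandler;

  ReaderSketch() {
    // The only place a handler is constructed.
    this.connectionHandler = new ConnectionHandlerStub();
  }

  boolean start() {
    // Reuse the existing handler instead of creating a second one that close() never sees.
    connectionHandler.start();
    return false; // this sketch does not read a message
  }

  ConnectionHandlerStub handler() {
    return connectionHandler;
  }

  @Override
  public void close() {
    // Stops the same handler that start() used.
    connectionHandler.stop();
  }
}
{code}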

----

I'm curious whether I'm way off here, or whether it'd be worth refactoring the 
RabbitMqIO reader to support these changes. Alternatively, would it make sense to 
produce a separate RabbitMQ reader for 'single queue, processing time 
watermarking' use cases?


> UnboundedRabbitMqReader can fail to advance watermark if no new data comes in
> -----------------------------------------------------------------------------
>
>                 Key: BEAM-8347
>                 URL: https://issues.apache.org/jira/browse/BEAM-8347
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-rabbitmq
>    Affects Versions: 2.15.0
>         Environment: Testing has been done using the DirectRunner. I also 
> have the DataflowRunner available.
>            Reporter: Daniel Robert
>            Priority: Major
>
> I stumbled upon this and then saw a similar StackOverflow post: 
> [https://stackoverflow.com/questions/55736593/apache-beam-rabbitmqio-watermark-doesnt-advance]
> When {{advance()}} is called and there are no messages, no state changes, 
> including the CheckpointMark and the watermark. If new messages arrive at a 
> relatively constant rate, this is not a problem. If data is bursty and there 
> are periods with no new messages, the watermark stops advancing until new 
> messages arrive.
> Contrast this with PubsubIO, which makes provisions for advancing the 
> watermark during periods of inactivity (although it, too, is imperfect: 
> https://issues.apache.org/jira/browse/BEAM-7322 )
> The example given in the StackOverflow post is something like this:
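> {code:java}
> import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
> import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
> import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
> import org.apache.beam.sdk.transforms.windowing.Window;
> import org.joda.time.Duration;
>
> pipeline
>   .apply(RabbitMqIO.read()
>       .withUri("amqp://guest:guest@localhost:5672")
>       .withQueue("test"))  // close read() so windowing applies to its output
>   .apply("Windowing", 
>     Window.<RabbitMqMessage>into(
>       FixedWindows.of(Duration.standardSeconds(10)))
>     .triggering(AfterWatermark.pastEndOfWindow())
>     .withAllowedLateness(Duration.ZERO)
>     .accumulatingFiredPanes());{code}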
> If I push 2 messages into my RabbitMQ queue, I see 2 unacked messages and a 
> window whose on-time trigger never fires.
>  
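To make the symptom concrete, a simplified model (plain millisecond arithmetic with zero window offset, not Beam's {{FixedWindows}} implementation; the {{FixedWindowMath}} name is hypothetical): a message at 12.345 s falls into the window [10 s, 20 s), and, as I understand the {{AfterWatermark.pastEndOfWindow()}} semantics, the on-time pane can only fire once the watermark reaches the window end. If the reader's watermark never moves past the last (or oldest) message timestamp, that condition is never met.

{code:java}
/**
 * Illustration only (plain millisecond arithmetic, not Beam's FixedWindows class):
 * which 10-second fixed window a message falls into, and whether a given
 * watermark has passed that window's end so the on-time pane can fire.
 */
final class FixedWindowMath {
  static final long SIZE_MILLIS = 10_000L;

  /** Start of the window containing the timestamp (zero offset, negatives handled). */
  static long windowStart(long timestampMillis) {
    return timestampMillis - Math.floorMod(timestampMillis, SIZE_MILLIS);
  }

  /** Exclusive end of that window. */
  static long windowEnd(long timestampMillis) {
    return windowStart(timestampMillis) + SIZE_MILLIS;
  }

  /**
   * The on-time pane can fire once the watermark passes the window's last
   * instant (end - 1 ms), i.e. once watermark >= end.
   */
  static boolean onTimeCanFire(long watermarkMillis, long timestampMillis) {
    return watermarkMillis >= windowEnd(timestampMillis);
  }
}
{code}

With the watermark stuck at 12.345 s, {{onTimeCanFire(12_345, 12_345)}} stays false; only a watermark of 20 s or later (for example from an idle-advance rule) lets the window fire.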



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
