[ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592595#comment-16592595
 ] 

Stephan Ewen commented on FLINK-10195:
--------------------------------------

Thanks for the input. This seems to be a fundamental issue in the way the 
RabbitMQ consumer works: the connection thread "pushes" messages 
unconditionally, rather than letting the thread that processes them "pull" the 
messages, or at least having a flow control mechanism.

I don't have a good suggestion here. Your approach seems to be one possible 
option. Another one would be to embed this in the RabbitMQ client: have an 
ArrayBlockingQueue, offer records to it, turn off the consumer when the queue 
is full, and turn it on again once it is half empty. This is similar to your 
solution, but the connection thread does that itself, which is probably more 
robust.
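
A minimal sketch of that queue-based idea, in plain Java with no RabbitMQ 
dependency. The names FlowControlledBuffer and DeliverySwitch are hypothetical 
and not part of the RabbitMQ client or of Flink. How "turn off the consumer" 
maps onto actual client calls is left open here. As a simplification, the 
resume call is issued by whichever thread drains the queue, not by the 
connection thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical hook that pauses or resumes message delivery (an assumption, not a RabbitMQ API). */
@FunctionalInterface
interface DeliverySwitch {
    void setDeliveryEnabled(boolean enabled);
}

/** Bounded hand-off buffer between the connection thread and the processing thread. */
final class FlowControlledBuffer<T> {
    private final BlockingQueue<T> queue;
    private final int resumeThreshold;
    private final DeliverySwitch deliverySwitch;
    private boolean paused; // guarded by "this"

    FlowControlledBuffer(int capacity, DeliverySwitch deliverySwitch) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.resumeThreshold = capacity / 2; // "half empty"
        this.deliverySwitch = deliverySwitch;
    }

    /**
     * Called by the connection thread for each delivery. Never blocks.
     * Returns false if the queue was already full; the caller then has to
     * decide what to do with the record (for example, leave it unacked).
     */
    synchronized boolean offer(T record) {
        boolean accepted = queue.offer(record);
        if (queue.remainingCapacity() == 0 && !paused) {
            paused = true;
            deliverySwitch.setDeliveryEnabled(false);
        }
        return accepted;
    }

    /** Called by the processing thread. Blocks until a record is available. */
    T take() throws InterruptedException {
        T record = queue.take(); // wait outside the lock so offer() is never blocked
        synchronized (this) {
            if (paused && queue.size() <= resumeThreshold) {
                paused = false;
                deliverySwitch.setDeliveryEnabled(true);
            }
        }
        return record;
    }
}
```

Resuming at half capacity instead of as soon as one slot frees up avoids 
toggling the consumer on every record when the downstream operator is only 
slightly slower than the broker.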

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -----------------------------------------------------------------
>
>                 Key: FLINK-10195
>                 URL: https://issues.apache.org/jira/browse/FLINK-10195
>             Project: Flink
>          Issue Type: Bug
>          Components: RabbitMQ Connector
>    Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>            Reporter: Luka Jurukovski
>            Priority: Major
>
> The connection between the RabbitMQ server and the client does not 
> backpressure appropriately when auto acking is disabled. This becomes very 
> problematic when a downstream process throttles data processing to a rate 
> slower than RabbitMQ sends the data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically to "Integer Max" deliveries). 
> Looking at RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising sawtooth shape.
> Upon further investigation, it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are being removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior; however, this isn't explicitly obvious in the 
> documentation or any of the examples I have seen.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
