[ https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17125225#comment-17125225 ]

Luka Jurukovski commented on FLINK-10195:
-----------------------------------------

[~austince]

Your example did point me to a feature of the AMQP protocol I was unaware of 
([basic.qos global flag|https://www.rabbitmq.com/consumer-prefetch.html]) that 
I wish I had known about at the time I was writing the ticket. Since each 
parallel consumer creates its own channel (from what I remember), this would 
be set per channel.
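
For reference, a minimal sketch of what that per-channel prefetch looks like with the plain RabbitMQ Java client (the host, queue name, and limit of 100 are made-up values, and this is the bare client rather than the Flink connector):

{code:java}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class PrefetchExample {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumed broker location

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Cap unacked deliveries on this channel at 100.
            // global = false: the limit applies to each consumer on the channel;
            // global = true: the limit is shared across all consumers on the channel.
            channel.basicQos(100, false);

            channel.basicConsume("some-queue", /* autoAck */ false,
                (consumerTag, delivery) -> {
                    // process, then ack so the broker is allowed to send more
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                },
                consumerTag -> { /* consumer cancelled */ });

            Thread.sleep(60_000); // keep consuming for a bit before the resources close
        }
    }
}
{code}

With auto-ack disabled, the broker stops sending once 100 deliveries are outstanding, which is what bounds the client-side buffer.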

That being said, while this does solve the OOM issue, it comes at a potential 
performance cost on high-volume queues. Correct me if I am wrong here, but I 
don't think this design helps if checkpoints are infrequent: RabbitMQ will not 
send more messages, even after they have been dequeued from the client's 
buffer, until the next checkpoint occurs and the messages are acked. So, for 
example, if a checkpoint occurs every 10 seconds and it takes 1 second for the 
job to process the configured qos limit of messages, the other 9 seconds are 
spent doing nothing. One would have to balance checkpoint interval, time to 
checkpoint, processing rate, and memory in order to not waste cycles.
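
To put rough, made-up numbers on that trade-off (the interval, prefetch, and rate below are illustrative, not measurements):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointPrefetchBudget {
    public static void main(String[] args) {
        // Illustrative, assumed numbers:
        final long checkpointIntervalMs = 10_000;   // checkpoint every 10 s
        final int prefetch = 1_000;                 // basic.qos limit on the channel
        final double processingRatePerSec = 1_000;  // records/s the job can actually handle

        // If acks only happen at checkpoints, at most `prefetch` records are
        // delivered per checkpoint interval, so the throughput ceiling is:
        double ceilingPerSec = prefetch / (checkpointIntervalMs / 1000.0); // 100 rec/s
        double busySeconds = prefetch / processingRatePerSec;              // ~1 s of work
        double idleSeconds = checkpointIntervalMs / 1000.0 - busySeconds;  // ~9 s idle

        System.out.printf("ceiling: %.0f rec/s, idle per interval: %.1f s%n",
                ceilingPerSec, idleSeconds);

        // The Flink-side knob is just the checkpoint interval:
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(checkpointIntervalMs);
    }
}
{code}

Shortening the interval or raising the prefetch closes that gap, but the prefetch is also exactly how much you are willing to buffer on the heap, so the two pull against each other.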

The only reason I have not contributed back the solution of doing a soft 
cancel of the consumer (while keeping the channel open) is that RabbitMQ's 
Java client doesn't (didn't?) have locking mechanisms around cancellation, 
which left no way of stopping delivery without causing an exception (as the 
auto-recovery mechanism might be in play at the same time) that has no good 
way of being handled. My solution was sufficient for our internal use case, 
but not something that I think is acceptable as a general solution.
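
For context, the shape of the soft cancel I mean is roughly the following. This is an illustrative sketch rather than the code we actually ran, and the race it comments on is between basicCancel and the client's automatic topology recovery re-registering the consumer:

{code:java}
import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ConsumerPauser {
    private final Channel channel;
    private final String queue;
    private volatile String consumerTag;

    public ConsumerPauser(Channel channel, String queue) {
        this.channel = channel;
        this.queue = queue;
    }

    /** Stop deliveries without closing the channel or the connection. */
    public void pause() throws IOException {
        if (consumerTag != null) {
            // If automatic recovery re-registers the consumer at the same time,
            // this call can race and throw; the client offers no lock to guard it.
            channel.basicCancel(consumerTag);
            consumerTag = null;
        }
    }

    /** Resume deliveries on the same channel once the checkpoint has completed. */
    public void resume(DeliverCallback onDelivery) throws IOException {
        consumerTag = channel.basicConsume(queue, /* autoAck */ false,
                onDelivery, tag -> { /* cancelled by broker or client */ });
    }
}
{code}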

I'll revisit this, as I have not looked at it since my part of the company 
moved away from RabbitMQ. While googling to refresh myself on some of the 
details for this response, I saw that the landscape may have changed since I 
last looked at this problem, or I may have overlooked some options.

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -----------------------------------------------------------------
>
>                 Key: FLINK-10195
>                 URL: https://issues.apache.org/jira/browse/FLINK-10195
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors/ RabbitMQ
>    Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>            Reporter: Luka Jurukovski
>            Assignee: Luka Jurukovski
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The connection between the RabbitMQ server and the client does not 
> apply backpressure appropriately when auto-acking is disabled. This becomes 
> very problematic when a downstream process throttles the data processing to 
> a rate slower than that at which RabbitMQ sends the data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically to "Integer Max" Deliveries). 
> Looking at RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising sawtooth shape.
> Upon further investigation it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior, however this isn't explicitly obvious in the 
> documentation or any of the examples I have seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
