[ 
https://issues.apache.org/jira/browse/FLINK-20244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17367087#comment-17367087
 ] 

Michał Ciesielczyk commented on FLINK-20244:
--------------------------------------------

Hi [[email protected]], we've recently run into the same issues as well, and 
I agree with most of the changes you proposed. So there are actually three 
issues described here:
1) Lost ACKs for redelivered messages (only applicable when autoAck is 
disabled). This is critical. When used with a prefetch count in the consumer it 
may even cause the source to stop consuming any more messages. I think the 
proposed solution here is correct. We even tested it internally and it worked 
fine.
2) Connection leak. This is a bug as well: in case of failure the connection is 
not closed properly and will hang until the Flink TaskManager is either 
stopped or crashes. The solution is simple (as suggested by the reporter), and it 
should be possible to add a test case for that as well.
3) Improving ACK handling by ACKing multiple messages at once. This will 
improve performance slightly, but can be done only when the exactly-once 
mode is used (see more details on the requirements here: 
[https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/rabbitmq/#rabbitmq-source]
 ). Otherwise, it may lead to ACKing messages that were not yet sent (which 
means: data loss).

As we've already managed to solve the issue internally with a custom connector, 
I think I would be able to help with pushing the fixes back to Flink. The 
question is whether this ticket should be divided into 3 smaller ones (subtasks? 
related tasks?) with 3 separate PRs, or whether we can do it all under a single 
PR. Maybe [~austince] has some recommendations on how to proceed here?

> RMQSource does not ACK duplicated messages
> ------------------------------------------
>
>                 Key: FLINK-20244
>                 URL: https://issues.apache.org/jira/browse/FLINK-20244
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / RabbitMQ
>    Affects Versions: 1.11.2, 1.12.0
>            Reporter: Thomas Eckestad
>            Priority: Minor
>              Labels: auto-deprioritized-major
>
> *Background*
> The following has been observed when using RMQSource with 
> exactly-once-guarantees.
> When a job is restarted from a checkpoint, and there were unacked messages 
> referenced by that checkpoint at the time of the restart, those messages will be 
> requeued by RMQ and resent to the restarted job. But they will not be ACKed. 
> Later, when the connection to RMQ is closed (the job either finishes or is 
> restarted), they will be requeued again.
> When looking at the source code, messages are ACKed by the RMQSource after a 
> checkpoint is complete 
> (MessageAcknowledgingSourceBase::notifyCheckpointComplete).
> Also, when looking at the source code in RMQSource::setMessageIdentifier() 
> (on the master branch, the ACK semantics do not seem to have changed since 
> 1.11.2) it is clear that if an RMQ message carries a correlation ID which has 
> already been handled, that message is skipped and not processed further. It 
> is also clear that skipped messages are not added to the sessionIds list of 
> messages that are targeted for ACK to RMQ.
> I believe all successfully consumed RMQ messages should be ACKed; it is 
> irrelevant whether the message is ignored or processed by Flink. RMQ needs to 
> know that the consumer considers the message handled OK.
> The following code is from RMQSource::setMessageIdentifier(). Note the return 
> before sessionIds.add():
>     ...
>     if (!addId(correlationId)) {
>         // we have already processed this message
>         return false;
>     }
>   }
>   sessionIds.add(deliveryTag);
>     ...
> Directly related to the above, I also noticed that RMQ connections are leaked 
> on internal job restart. From the Flink log (this stack trace is from 1.11.2):
> 2020-11-18 10:08:25,118 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Error during 
> disposal of stream operator.
>  com.rabbitmq.client.AlreadyClosedException: connection is already closed due 
> to connection error; protocol method: 
> #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - 
> Closed via management plugin, class-id=0, method-id=0)
>  at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:228) 
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:303) 
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1294) 
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at 
> com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicCancel(AutorecoveringChannel.java:482)
>  
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at 
> org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:192)
>  
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
> AlreadyClosedException is not handled by RMQSource::close(). This results 
> in an RMQ connection thread somewhere being left behind. I triggered three 
> restarts of the job in a row and noticed one new connection added to the pile 
> of connections for each restart. I triggered the restarts by killing the 
> active connection to RMQ using the RMQ admin GUI (management plugin, see 
> the exception details above).
> I also tried to kill one of the leaked connections, but a new one is 
> instantly created when doing so. The stack trace when doing this (1.11.2):
> 2020-11-18 10:27:51,715 ERROR 
> com.rabbitmq.client.impl.ForgivingExceptionHandler [] - An unexpected 
> connection driver error occured
>  java.lang.NoClassDefFoundError: 
> com/rabbitmq/client/AMQP$Connection$CloseOk$Builder
>  at 
> com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:800)
>  
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at 
> com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:753)
>  
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at 
> com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:237) 
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at 
> com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
>  
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109) 
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:623) 
> [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) 
> [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at 
> com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) 
> [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at java.lang.Thread.run(Unknown Source) [?:?]
> These problems are most probably present in more versions than 1.11.2 and 
> 1.12.0.
> *Proposed fix (not verified)*
> The fix to ACK all messages should be simple: just move 
> sessionIds.add(deliveryTag) up a few lines, before the test that checks whether 
> correlation IDs are used, in RMQSource::setMessageIdentifier().
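> Something along these lines (just a sketch; the field and helper names 
> autoAck, usesCorrelationId, addId and sessionIds are taken from the snippet 
> above, and the exact method signature is assumed, so treat this as 
> illustrative rather than the final patch):
>     private boolean setMessageIdentifier(String correlationId, long deliveryTag) {
>         if (!autoAck) {
>             // Record the delivery tag before the duplicate check, so that even a
>             // skipped (already processed) message is ACKed on the next checkpoint.
>             sessionIds.add(deliveryTag);
>             if (usesCorrelationId && !addId(correlationId)) {
>                 // we have already processed this message: do not emit it again,
>                 // but it is still scheduled for an ACK
>                 return false;
>             }
>         }
>         return true;
>     }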
> The fix for close() and the stale RMQ connections would be to ignore and log 
> any exceptions thrown by channel.basicCancel(), channel.close() and 
> connection.close(), so that a failing channel.basicCancel() does not 
> prevent channel.close() or connection.close() from being called. Or maybe 
> just call connection.close(), which should close the channel as well.
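> A rough sketch of such a close() (the channel, connection, consumerTag and 
> LOG fields are assumed here and may differ from the actual RMQSource code):
>     @Override
>     public void close() throws Exception {
>         super.close();
>         try {
>             if (channel != null) {
>                 channel.basicCancel(consumerTag); // may throw AlreadyClosedException
>             }
>         } catch (Exception e) {
>             LOG.warn("Failed to cancel RMQ consumer while closing, ignoring.", e);
>         }
>         try {
>             if (connection != null) {
>                 connection.close(); // closes the channel as well
>             }
>         } catch (Exception e) {
>             LOG.warn("Failed to close RMQ connection, ignoring.", e);
>         }
>     }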
> Another minor thing that could be improved, while working on this, is to 
> leverage the support for ACKing multiple messages at once, by doing 
> channel.basicAck(deliveryTagMax, true). Since the RMQSource never NACKs a 
> message and delivery tags are monotonically increasing, that should be doable, 
> right?
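> For example, the method that acknowledges the checkpointed session IDs could 
> replace the per-message loop with a single cumulative ACK (sketch only; the 
> acknowledgeSessionIDs signature and surrounding transaction handling are 
> assumptions, and this is only safe under the exactly-once requirements):
>     @Override
>     protected void acknowledgeSessionIDs(List<Long> sessionIds) {
>         if (sessionIds.isEmpty()) {
>             return;
>         }
>         try {
>             // multiple=true acknowledges every delivery tag up to and including
>             // the given one, so ACKing the maximum tag covers the whole batch
>             channel.basicAck(Collections.max(sessionIds), true);
>         } catch (IOException e) {
>             throw new RuntimeException("Messages could not be acknowledged", e);
>         }
>     }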



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
