[
https://issues.apache.org/jira/browse/FLINK-20244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371939#comment-17371939
]
Michał Ciesielczyk commented on FLINK-20244:
--------------------------------------------
Yes, I think we can close this one.
> RMQSource does not ACK duplicated messages
> ------------------------------------------
>
> Key: FLINK-20244
> URL: https://issues.apache.org/jira/browse/FLINK-20244
> Project: Flink
> Issue Type: Bug
> Components: Connectors/ RabbitMQ
> Affects Versions: 1.11.2, 1.12.0
> Reporter: Thomas Eckestad
> Priority: Minor
> Labels: auto-deprioritized-major
>
> *Background*
> The following has been observed when using RMQSource with
> exactly-once guarantees.
> When a job is restarted from a checkpoint and there were unacked messages
> referenced by that checkpoint at the time of the restart, those messages are
> requeued by RMQ and resent to the restarted job, but they are not acked.
> Later, when the connection to RMQ is closed (the job either finishes or is
> restarted), they are requeued again.
> Looking at the source code, messages are ACKed by the RMQSource after a
> checkpoint completes
> (MessageAcknowledgingSourceBase::notifyCheckpointComplete).
> Also, looking at the source code of RMQSource::setMessageIdentifier()
> (on the master branch; the ACK semantics do not seem to have changed since
> 1.11.2), it is clear that if an RMQ message carries a correlation ID that has
> already been handled, the message is skipped and not processed further. It
> is also clear that skipped messages are not added to the sessionIds list of
> messages that are targeted for ACK to RMQ.
> I believe all successfully consumed RMQ messages should be ACKed; it is
> irrelevant whether the message is ignored or processed by Flink. RMQ needs to
> know that the consumer considers the message as handled.
> The following code is from RMQSource::setMessageIdentifier(). Note the return
> before sessionIds.add():
> ...
>     if (!addId(correlationId)) {
>         // we have already processed this message
>         return false;
>     }
> }
> sessionIds.add(deliveryTag);
> ...
> Directly related to the above, I also noticed that RMQ connections are leaked
> on internal job restarts. From the Flink log (this stack trace is from 1.11.2):
> 2020-11-18 10:08:25,118 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask [] - Error during disposal of stream operator.
> com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
>     at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:228) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>     at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:303) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>     at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1294) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>     at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicCancel(AutorecoveringChannel.java:482) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>     at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:192) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
> AlreadyClosedException is not handled by RMQSource::close(). This results in
> an RMQ connection thread being left behind somewhere. I triggered three
> restarts of the job in a row and noticed one new connection added to the pile
> of connections for each restart. I triggered the restarts by killing the
> active connection to RMQ using the RMQ admin GUI (management plugin; see the
> exception details above).
> I also tried to kill one of the leaked connections, but a new one was
> instantly created when doing so. The traceback when doing this (1.11.2):
> 2020-11-18 10:27:51,715 ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler [] - An unexpected connection driver error occured
> java.lang.NoClassDefFoundError: com/rabbitmq/client/AMQP$Connection$CloseOk$Builder
>     at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:800) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>     at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:753) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>     at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:237) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>     at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>     at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>     at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:623) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>     at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>     at java.lang.Thread.run(Unknown Source) [?:?]
> These problems are most likely present in more versions than just 1.11.2 and
> 1.12.0.
> *Proposed fix (not verified)*
> The fix to ACK all messages should be simple: just move
> sessionIds.add(deliveryTag) a few lines up, before the check in
> RMQSource::setMessageIdentifier() that tests whether correlation IDs are used.
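The reordering proposed above can be sketched as a small, self-contained Java model. This is not Flink's RMQSource: the class DedupingAckTracker, the processedIds set (a stand-in for addId()) and pendingAcks() are hypothetical names, and checkpointing is left out entirely. It only illustrates that, once sessionIds.add(deliveryTag) runs before the correlation-ID check, a duplicate is still skipped but its delivery tag is queued for the next ACK.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
 * Hypothetical, stripped-down model of the dedup bookkeeping in
 * RMQSource::setMessageIdentifier(), with sessionIds.add(deliveryTag)
 * moved ahead of the correlation-ID check as proposed.
 */
class DedupingAckTracker {
    private final boolean usesCorrelationId;
    // Correlation IDs already handed to Flink (hypothetical stand-in for addId()).
    private final Set<String> processedIds = new HashSet<>();
    // Delivery tags to ACK once the next checkpoint completes.
    private final List<Long> sessionIds = new ArrayList<>();

    DedupingAckTracker(boolean usesCorrelationId) {
        this.usesCorrelationId = usesCorrelationId;
    }

    /** Returns true if the message should be emitted, false if it is a duplicate. */
    boolean setMessageIdentifier(String correlationId, long deliveryTag) {
        // Record the tag first, so duplicates are ACKed like every other message.
        sessionIds.add(deliveryTag);
        if (usesCorrelationId) {
            Objects.requireNonNull(correlationId, "correlation ID required when usesCorrelationId is set");
            if (!processedIds.add(correlationId)) {
                // Already processed: skip it, but its tag stays in sessionIds.
                return false;
            }
        }
        return true;
    }

    List<Long> pendingAcks() {
        return sessionIds;
    }

    public static void main(String[] args) {
        DedupingAckTracker tracker = new DedupingAckTracker(true);
        System.out.println(tracker.setMessageIdentifier("a", 1L)); // true
        System.out.println(tracker.setMessageIdentifier("a", 2L)); // false (duplicate)
        System.out.println(tracker.pendingAcks());                  // [1, 2]
    }
}
```

With this ordering, a redelivered message whose correlation ID was already handled would be acknowledged at the next completed checkpoint instead of being requeued again when the connection closes.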
> The fix for close() and the stale RMQ connections would be to ignore and log
> any exceptions thrown by channel.basicCancel(), channel.close() and
> connection.close(), so that a failing channel.basicCancel() call does not
> prevent channel.close() or connection.close() from being called. Or maybe
> just call connection.close(), which should close the channel as well.
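A minimal sketch of the "ignore and log" close() idea, assuming each cleanup step can be wrapped in its own try/catch. QuietCloser, Step and closeAll are hypothetical names; in RMQSource the three steps would be the RabbitMQ Java client calls channel.basicCancel(...), channel.close() and connection.close(), but they are modelled as lambdas here so the example runs without a broker.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Hypothetical sketch of a close() in which one failing step never
 * skips the steps after it. The steps stand in for basicCancel(),
 * channel.close() and connection.close().
 */
class QuietCloser {
    private static final Logger LOG = Logger.getLogger(QuietCloser.class.getName());

    /** A cleanup step that may throw, e.g. AlreadyClosedException or IOException. */
    @FunctionalInterface
    interface Step {
        void run() throws Exception;
    }

    /** Runs every step in order, logging (not rethrowing) failures; returns the failure count. */
    static int closeAll(Step... steps) {
        int failures = 0;
        for (Step step : steps) {
            try {
                step.run();
            } catch (Exception e) {
                failures++;
                LOG.log(Level.WARNING, "Ignoring exception during close", e);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        StringBuilder closed = new StringBuilder();
        int failures = closeAll(
            () -> { throw new IllegalStateException("channel already closed"); }, // basicCancel
            () -> closed.append("channel "),                                      // channel.close()
            () -> closed.append("connection"));                                   // connection.close()
        System.out.println(failures + " failure(s); closed: " + closed); // 1 failure(s); closed: channel connection
    }
}
```

Because each step is isolated, an exception from basicCancel() such as the AlreadyClosedException in the first stack trace would no longer stop connection.close() from running.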
> Another minor improvement, while working on this, would be to leverage the
> support for acking multiple messages at once by calling
> channel.basicAck(deliveryTagMax, true);. Since RMQSource never NACKs a
> message and delivery tags are monotonically increasing, that should be
> doable, right?
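A sketch of the multiple-ACK idea, under the assumptions stated in the question: all tags in sessionIds came from the current channel, tags increase monotonically, and nothing is ever NACKed. AckChannel is a hypothetical interface that only mirrors the signature of the RabbitMQ client's Channel.basicAck(long deliveryTag, boolean multiple); BatchAcker and acknowledgeSessionIds are illustrative names, not Flink code.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical sketch: acknowledge a checkpoint's delivery tags with a
 * single multiple-ACK instead of one basicAck call per tag.
 */
class BatchAcker {
    /** Mirrors the shape of com.rabbitmq.client.Channel#basicAck(long, boolean). */
    interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
    }

    /** ACKs every outstanding tag up to and including the largest one; no-op if empty. */
    static void acknowledgeSessionIds(AckChannel channel, List<Long> sessionIds) throws IOException {
        if (sessionIds.isEmpty()) {
            return;
        }
        // Assumes the tags come from this same channel and that every
        // outstanding tag <= max is meant to be ACKed (no NACKs).
        long deliveryTagMax = Collections.max(sessionIds);
        channel.basicAck(deliveryTagMax, true);
    }

    public static void main(String[] args) throws IOException {
        AckChannel printing = (tag, multiple) ->
            System.out.println("basicAck(" + tag + ", " + multiple + ")");
        acknowledgeSessionIds(printing, Arrays.asList(3L, 4L, 5L)); // basicAck(5, true)
    }
}
```

With multiple set to true, the broker acknowledges every unacknowledged delivery on that channel up to and including deliveryTagMax, so this is only equivalent to per-tag ACKs if no tag at or below the maximum must stay unacknowledged.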
--
This message was sent by Atlassian Jira
(v8.3.4#803005)