Thomas Eckestad created FLINK-20244:
---------------------------------------
Summary: RMQSource does not ACK duplicated messages
Key: FLINK-20244
URL: https://issues.apache.org/jira/browse/FLINK-20244
Project: Flink
Issue Type: Bug
Components: Connectors / RabbitMQ
Affects Versions: 1.11.2, 1.12.0
Reporter: Thomas Eckestad
*Background*
The following has been observed when using RMQSource with
exactly-once-guarantees.
When a job is restarted from a checkpoint, and there were unacked messages
referenced by that checkpoint at the time of the restart, those messages are
requeued by RMQ and redelivered to the restarted job, but they are never ACKed.
Later, when the connection to RMQ is closed (because the job finishes or is
restarted again), they are requeued once more.
Looking at the source code, messages are ACKed by the RMQSource after a
checkpoint completes
(MessageAcknowledgingSourceBase::notifyCheckpointComplete).
Also, the source code of RMQSource::setMessageIdentifier() (on the master
branch; the ACK semantics do not seem to have changed since 1.11.2) makes it
clear that if an RMQ message carries a correlation ID which has already been
handled, that message is skipped and not processed further. It is also clear
that skipped messages are not added to the sessionIds list of messages
targeted for ACK to RMQ.
I believe all successfully consumed RMQ messages should be ACKed, regardless
of whether the message is ignored or processed by Flink. RMQ needs to know
that the consumer considers the message handled.
The following code is from RMQSource::setMessageIdentifier(). Note the return
before sessionIds.add():
...
    if (!addId(correlationId)) {
        // we have already processed this message
        return false;
    }
}
sessionIds.add(deliveryTag);
...
Directly related to the above, I have also noticed that RMQ connections are
leaked on internal job restart. From the Flink log (this stack trace is from
1.11.2):
2020-11-18 10:08:25,118 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask [] - Error during disposal of stream operator.
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:228) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:303) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1294) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicCancel(AutorecoveringChannel.java:482) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:192) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
AlreadyClosedException is not handled by RMQSource::close(). This results in
an RMQ connection thread somewhere being left behind. I triggered three
restarts of the job in a row and observed one new connection added to the
pile of connections for each restart. I triggered the restarts by killing the
active connection to RMQ using the RMQ admin GUI (management plugin, see the
exception details above).
I also tried to kill one of the leaked connections, but a new one is
instantly created in its place. The stack trace when doing this (1.11.2):
2020-11-18 10:27:51,715 ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler [] - An unexpected connection driver error occured
java.lang.NoClassDefFoundError: com/rabbitmq/client/AMQP$Connection$CloseOk$Builder
    at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:800) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:753) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:237) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:623) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
This problem is most probably present in versions other than 1.11.2 and 1.12.0 as well.
*Proposed fix (not verified)*
The fix to ACK all messages should be simple: in
RMQSource::setMessageIdentifier(), move sessionIds.add(deliveryTag) up a few
lines, before the test that checks whether correlation IDs are in use.
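A minimal sketch of the proposed reordering, using simplified stand-ins for the real RMQSource fields (the names mirror the Flink source, but this is not the actual class; addId() is modeled here as a plain set insert):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for the deduplication logic in
// RMQSource::setMessageIdentifier(); not the actual Flink class.
class DedupSketch {
    final Set<String> processedIds = new HashSet<>(); // stand-in for addId()'s backing state
    final List<Long> sessionIds = new ArrayList<>();  // delivery tags queued for ACK

    boolean addId(String correlationId) {
        return processedIds.add(correlationId); // false => already seen
    }

    // Proposed ordering: record the delivery tag *before* the duplicate
    // check, so duplicates are still ACKed on the next checkpoint.
    boolean setMessageIdentifier(String correlationId, long deliveryTag) {
        sessionIds.add(deliveryTag); // moved up: always queued for ACK
        if (!addId(correlationId)) {
            // we have already processed this message: skip it,
            // but its delivery tag is already queued for ACK above
            return false;
        }
        return true;
    }
}
```

With this ordering a redelivered duplicate is still skipped by Flink (the method returns false), but its delivery tag ends up in sessionIds and gets ACKed, so RMQ stops requeuing it.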
The fix for close() and the stale RMQ connections would be to log and ignore
any exceptions thrown by channel.basicCancel(), channel.close() and
connection.close(), so that a failure in channel.basicCancel() does not
prevent channel.close() or connection.close() from being called. Or perhaps
just call connection.close(), which should close the channel as well.
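A sketch of the log-and-continue teardown, with a hypothetical Closable functional interface standing in for the RabbitMQ channel/connection calls (the real types live in com.rabbitmq.client, and the real code would use a logger instead of System.err):

```java
// Hypothetical stand-in for each teardown step in RMQSource::close()
// (channel.basicCancel(), channel.close(), connection.close()).
interface Closable {
    void close() throws Exception;
}

class RobustClose {
    // Attempt every step; swallow and report exceptions such as
    // AlreadyClosedException instead of letting one step abort the rest.
    static int closeAll(Closable... steps) {
        int attempted = 0;
        for (Closable step : steps) {
            try {
                step.close();
            } catch (Exception e) {
                // In RMQSource::close() this would be a LOG.warn(...)
                System.err.println("Ignoring exception during close: " + e);
            }
            attempted++;
        }
        return attempted;
    }
}
```

The point is only the control flow: a thrown AlreadyClosedException from basicCancel() no longer skips the remaining close() calls, so the underlying connection thread is actually torn down.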
Another minor improvement, while working on this, would be to leverage the
support for acknowledging multiple messages at once, by calling
channel.basicAck(deliveryTagMax, true). Since the RMQSource never NACKs a
message and delivery tags are monotonically increasing, that should be
doable, shouldn't it?
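A sketch of that cumulative ACK, with a hypothetical FakeChannel recording calls in place of com.rabbitmq.client.Channel (whose basicAck(deliveryTag, multiple) has the same shape):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical channel stand-in that records basicAck calls;
// the real interface is com.rabbitmq.client.Channel.
class FakeChannel {
    final List<long[]> acks = new ArrayList<>(); // {deliveryTag, multiple ? 1 : 0}

    void basicAck(long deliveryTag, boolean multiple) {
        acks.add(new long[] { deliveryTag, multiple ? 1L : 0L });
    }
}

class CumulativeAck {
    // Delivery tags are monotonically increasing per channel, so acking
    // the maximum tag with multiple=true covers every lower unacked tag
    // in one round trip instead of one basicAck per message.
    static void ackAll(FakeChannel channel, List<Long> sessionIds) {
        if (sessionIds.isEmpty()) {
            return;
        }
        long max = sessionIds.stream().mapToLong(Long::longValue).max().getAsLong();
        channel.basicAck(max, true); // cumulative ACK up to and including max
        sessionIds.clear();
    }
}
```

This only holds under the assumption stated in the report: the source never NACKs, so every tag below the maximum is safe to acknowledge.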
--
This message was sent by Atlassian Jira
(v8.3.4#803005)