[ https://issues.apache.org/jira/browse/FLINK-22698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17349245#comment-17349245 ]
Michał Ciesielczyk commented on FLINK-22698:
--------------------------------------------
Hi [~austince], we've encountered a similar issue after upgrading to Flink
1.12. It seems that recent changes to the RMQSource (introduced in 1.12) caused
this issue to appear, so 1.13 is most likely affected as well. In fact, it also
happens with a single queue: it's not possible to stop the job with a savepoint
when there is no data on the queue (the job ends up stuck in an unexpected
state). You can still cancel it, though, which seems to be a good workaround
here.
I've investigated the issue, and it's caused by the way message delivery is
implemented in the QueueingConsumer (or rather by how it's used). In the run
function, the source waits indefinitely for the next message to arrive, without
checking whether the source has been cancelled:
[https://github.com/apache/flink/blob/0a7ec4fe3f11dbcc1f56685c2211b60d8f496b2d/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L329]
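To illustrate the problem, here is a minimal standalone sketch (not the actual RMQSource code). It assumes a java.util.concurrent.BlockingQueue standing in for the QueueingConsumer's delivery queue, with take() playing the role of the untimed nextDelivery() call; the class and method names are hypothetical. After cancel(), the loop stays parked until one more message arrives:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified model of an untimed delivery loop; not the actual RMQSource code.
class BlockingDeliveryLoop implements Runnable {
    // Stand-in for the consumer's internal delivery queue (assumption).
    private final BlockingQueue<String> deliveries = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    @Override
    public void run() {
        try {
            while (running) {
                // take() waits indefinitely, so a cancel() issued while the thread is
                // parked here is only observed after the next message arrives.
                String message = deliveries.take();
                System.out.println("processing " + message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void cancel() {
        running = false;
    }

    void deliver(String message) {
        deliveries.add(message);
    }

    /**
     * Returns true if the loop ignores cancel() while the queue is empty and
     * only exits once another message is delivered.
     */
    static boolean hangsUntilNextMessage(long waitMillis) {
        BlockingDeliveryLoop source = new BlockingDeliveryLoop();
        Thread worker = new Thread(source);
        worker.setDaemon(true); // don't keep the JVM alive if the loop never exits
        worker.start();
        try {
            // Make sure the worker is parked inside take() before cancelling.
            while (worker.isAlive() && worker.getState() != Thread.State.WAITING) {
                Thread.sleep(5);
            }
            source.cancel();
            worker.join(waitMillis);
            boolean stuckAfterCancel = worker.isAlive();
            source.deliver("late message"); // wakes take(); the loop then sees running == false
            worker.join(waitMillis);
            return stuckAfterCancel && !worker.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("hangs until next message: " + hangsUntilNextMessage(300));
    }
}
```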
So I don't believe the fix lies in inspecting the isEndOfStream method, which
[~nicholasjiang] suggested.
This can easily be solved by adding a delivery timeout: setting it to a low
value, such as 15s, instead of waiting indefinitely solves the issue.
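A sketch of the delivery-timeout idea, under the same assumptions (a BlockingQueue as a stand-in for the consumer, hypothetical names). poll(timeout, unit) returns null when the timeout elapses, so the loop gets a chance to re-check the running flag even when the queue stays empty:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified model of a delivery loop with a bounded wait per message.
class TimedDeliveryLoop implements Runnable {
    // Stand-in for the consumer's internal delivery queue (assumption).
    private final BlockingQueue<String> deliveries = new LinkedBlockingQueue<>();
    private final long deliveryTimeoutMillis;
    private volatile boolean running = true;

    TimedDeliveryLoop(long deliveryTimeoutMillis) {
        this.deliveryTimeoutMillis = deliveryTimeoutMillis;
    }

    @Override
    public void run() {
        try {
            while (running) {
                // Wait at most deliveryTimeoutMillis; null means the timeout elapsed.
                String message = deliveries.poll(deliveryTimeoutMillis, TimeUnit.MILLISECONDS);
                if (message == null) {
                    continue; // no message: loop back and re-check running
                }
                System.out.println("processing " + message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void cancel() {
        running = false;
    }

    /** Returns true if the loop exits after cancel() even though no message ever arrives. */
    static boolean stopsOnEmptyQueue(long deliveryTimeoutMillis) {
        TimedDeliveryLoop source = new TimedDeliveryLoop(deliveryTimeoutMillis);
        Thread worker = new Thread(source);
        worker.setDaemon(true);
        worker.start();
        try {
            // Make sure the worker is parked inside poll() before cancelling.
            while (worker.isAlive() && worker.getState() != Thread.State.TIMED_WAITING) {
                Thread.sleep(5);
            }
            source.cancel();
            // The loop notices cancellation at most about one timeout after cancel().
            worker.join(deliveryTimeoutMillis * 10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped: " + stopsOnEmptyQueue(100));
    }
}
```

The timeout bounds how long a cancellation can go unnoticed: with 15s, stopping may take up to roughly 15s after the request, while a lower value reacts faster at the cost of more frequent wake-ups on an idle queue.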
We've already tested this and were able to reproduce the issue with additional
unit tests. I suggest adding a deliveryTimeout parameter to the configuration
(so that by default we could keep the current behavior). If you think it's a
good approach, I could create a PR to solve the problem.
> RabbitMQ source does not stop unless message arrives in queue
> -------------------------------------------------------------
>
> Key: FLINK-22698
> URL: https://issues.apache.org/jira/browse/FLINK-22698
> Project: Flink
> Issue Type: Bug
> Components: Connectors/ RabbitMQ
> Affects Versions: 1.12.0
> Reporter: Austin Cawley-Edwards
> Priority: Major
> Attachments: taskmanager_thread_dump.json
>
>
> In a streaming job with multiple RMQSources, a stop-with-savepoint request
> has unexpected behavior. Regular checkpoints and savepoints complete
> successfully; this behavior is seen only with the stop-with-savepoint
> request.
>
> *Expected Behavior:*
> The stop-with-savepoint request stops the job with a FINISHED state.
>
> *Actual Behavior:*
> The stop-with-savepoint request either times out or hangs indefinitely unless,
> after the request is made, a message arrives in every queue that the job
> consumes from.
>
> *Current workaround:*
> Send a sentinel value, which the deserialization schema checks for in its
> isEndOfStream method, to each of the queues consumed by the job. This is
> cumbersome and makes it difficult to do stateful upgrades, as coordination
> with another system is now necessary.
>
>
> The TaskManager thread dump is attached.
>
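For reference, the sentinel workaround from the issue description can be sketched as follows. Flink deserialization schemas expose an isEndOfStream method; to keep the example self-contained, the hypothetical StringSchema class below only mirrors that method, and the sentinel string is an arbitrary assumption that the producers and the job would have to agree on:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SentinelWorkaround {
    // Hypothetical sentinel; any value both the producer and the job agree on works.
    static final String SENTINEL = "__END_OF_STREAM__";

    /** Standalone stand-in for a deserialization schema (not Flink's interface). */
    static final class StringSchema {
        String deserialize(byte[] body) {
            return new String(body, StandardCharsets.UTF_8);
        }

        // Mirrors the role of isEndOfStream: returning true tells the source to stop.
        boolean isEndOfStream(String next) {
            return SENTINEL.equals(next);
        }
    }

    /** Consumes message bodies in order until the sentinel is seen; returns the emitted records. */
    static List<String> consume(List<byte[]> bodies) {
        StringSchema schema = new StringSchema();
        List<String> emitted = new ArrayList<>();
        for (byte[] body : bodies) {
            String record = schema.deserialize(body);
            if (schema.isEndOfStream(record)) {
                break; // in this sketch the sentinel itself is not emitted
            }
            emitted.add(record);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<byte[]> queue = List.of(
                "a".getBytes(StandardCharsets.UTF_8),
                "b".getBytes(StandardCharsets.UTF_8),
                SENTINEL.getBytes(StandardCharsets.UTF_8),
                "c".getBytes(StandardCharsets.UTF_8));
        System.out.println(consume(queue)); // prints [a, b]
    }
}
```

This shows why the workaround is cumbersome: something outside the job has to publish the sentinel to every consumed queue before each stop.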
--
This message was sent by Atlassian Jira
(v8.3.4#803005)