[ 
https://issues.apache.org/jira/browse/CAMEL-14789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Szczesiak updated CAMEL-14789:
-------------------------------------
    Description: 
When the Remote Procedure Call (RPC) communication pattern is used, the RabbitMQ 
Camel component creates a server-named, auto-delete temporary reply queue. Next, 
the queue is bound to the exchange with a routing key equal to the queue name. 
Likewise, ReplyManager's replyTo field is set to the queue name. The reply queue 
created in this manner is then reused for subsequent RPC requests, and the 
message property rabbitmq.REPLY_TO is copied from the value previously stored 
by ReplyManager.

When a network error causes a connection failure, the RabbitMQ client's 
automatic recovery kicks in and tries to recover the affected entities (assuming 
the connection was created with automatic recovery enabled, which is the 
default). Because temporary queues are auto-deleted, the recovery process 
creates a new temporary queue whose name differs from the original one, and 
this is where the problem begins.

The creation of the new temporary queue is NOT detected by ReplyManager, and 
therefore the replyTo property is NOT updated. Also, the routing key no longer 
matches the queue name. This causes a problem when some RabbitMQ client 
implementations, such as Spring AMQP, are used on the server side. The RPC 
service receives our request, processes it, and replies to the default exchange 
with a routing key equal to the rabbitmq.REPLY_TO sent in our request. The RPC 
service provider perceives no problem because the response is sent successfully, 
but the RabbitMQ Camel component keeps waiting for the response to arrive on the 
original temporary queue, which no longer exists after the connection failure 
and recovery. Eventually, Camel throws ExchangeTimedOutException.
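
The failure mode can be reproduced in miniature without a broker. The sketch 
below is a plain-Java simulation, not Camel or RabbitMQ client code: FakeBroker, 
its methods, and the amq.gen-N naming are hypothetical stand-ins, assuming only 
that the default exchange routes by queue name and that an unroutable message is 
dropped without an error reaching the publisher.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a broker's default exchange,
// where the routing key must equal the name of an existing queue.
final class FakeBroker {
    private final Map<String, List<String>> queues = new HashMap<>();
    private int counter = 0;

    /** Declares a server-named temporary queue and returns the generated name. */
    String declareTemporaryQueue() {
        String name = "amq.gen-" + (++counter);
        queues.put(name, new ArrayList<>());
        return name;
    }

    /** Simulates connection loss plus recovery: the old queue is gone, a new name is issued. */
    String recover(String oldName) {
        queues.remove(oldName);
        return declareTemporaryQueue();
    }

    /** Returns false when no queue matches; a real publisher would not notice the drop. */
    boolean publishToDefaultExchange(String routingKey, String body) {
        List<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.add(body);
        return true;
    }
}

final class StaleReplyToDemo {
    /** Replies to the replyTo value cached before recovery and reports whether it was routed. */
    static boolean replyDeliveredAfterRecovery() {
        FakeBroker broker = new FakeBroker();
        String replyTo = broker.declareTemporaryQueue(); // cached once, as ReplyManager does
        broker.recover(replyTo);                         // new queue name is ignored
        return broker.publishToDefaultExchange(replyTo, "response");
    }

    public static void main(String[] args) {
        System.out.println("reply delivered: " + replyDeliveredAfterRecovery());
    }
}
```

Running main prints "reply delivered: false": the reply is addressed to a queue 
name that no longer exists, so the requester would wait until its timeout.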

Example:
 # After automatic recovery, an RPC request is sent to example-exchange with 
rabbitmq.REPLY_TO=amq.gen-0lLvpnj4ZMlkhxZIcCPVpA
 # After 20 seconds, org.apache.camel.ExchangeTimedOutException is thrown:

{code:java}
The OUT message was not received within: 20000 millis due reply message with 
correlationID: Camel-ID-hostname-1583401848872-0-1690397 not received on 
destination: amq.gen-0lLvpnj4ZMlkhxZIcCPVpA. 
Exchange[ID-hostname-1583401848872-0-1690391]{code}
 # Using the RabbitMQ management web app (or rabbitmqctl), we check 
example-exchange's bindings and see that routing key 
amq.gen-0lLvpnj4ZMlkhxZIcCPVpA now corresponds to queue 
amq.gen-zaRCP-p-JbXeSzJmzSp83g

*Proposed solution* is to add a QueueRecoveryListener that is notified when the 
temporary queue name changes due to recovery. On that event, the replyTo field 
will be updated with the new temporary queue name and the queue will be rebound 
to the exchange so that the routing key matches the queue name again. The change 
will be made to 
org.apache.camel.component.rabbitmq.reply#createListenerContainer.
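
A minimal sketch of the listener logic, assuming the callback receives the old 
and new queue names in the same shape as the RabbitMQ Java client's 
QueueRecoveryListener#queueRecovered(String oldName, String newName). 
QueueRenameListener and ReplyQueueTracker are hypothetical names defined locally 
so that the example compiles on its own; this is not the attached patch, and the 
rebinding step is only indicated by a comment.

```java
import java.util.concurrent.atomic.AtomicReference;

// Local stand-in (assumption) with the same callback shape as the
// RabbitMQ Java client's QueueRecoveryListener.
interface QueueRenameListener {
    void queueRecovered(String oldName, String newName);
}

// Hypothetical holder for the replyTo value that outgoing requests copy.
final class ReplyQueueTracker implements QueueRenameListener {
    private final AtomicReference<String> replyTo;

    ReplyQueueTracker(String initialQueueName) {
        this.replyTo = new AtomicReference<>(initialQueueName);
    }

    @Override
    public void queueRecovered(String oldName, String newName) {
        // Update only when the renamed queue is the reply queue being tracked;
        // other queues recovered on the same connection are ignored.
        replyTo.updateAndGet(current -> current.equals(oldName) ? newName : current);
        // A real fix would also rebind newName to the exchange here so that
        // the routing key matches the queue name again.
    }

    String currentReplyTo() {
        return replyTo.get();
    }

    public static void main(String[] args) {
        ReplyQueueTracker tracker = new ReplyQueueTracker("amq.gen-0lLvpnj4ZMlkhxZIcCPVpA");
        tracker.queueRecovered("amq.gen-0lLvpnj4ZMlkhxZIcCPVpA", "amq.gen-zaRCP-p-JbXeSzJmzSp83g");
        System.out.println(tracker.currentReplyTo());
    }
}
```

In the real component the listener would presumably be registered on the 
auto-recovering connection (AutorecoveringConnection#addQueueRecoveryListener in 
the Java client); that wiring is left out of the sketch.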

The attached patch also contains an integration test.

  was:
When the Remote Procedure Call (RPC) communication pattern is used, the RabbitMQ 
Camel component creates a server-named, auto-delete temporary reply queue. Next, 
the queue is bound to the exchange with a routing key equal to the queue name. 
Likewise, ReplyManager's replyTo field is set to the queue name. The reply queue 
created in this manner is then reused for subsequent RPC requests, and the 
message property rabbitmq.REPLY_TO is copied from the value previously stored 
by ReplyManager.

When a network error causes a connection failure, the RabbitMQ client's 
automatic recovery kicks in and tries to recover the affected entities (assuming 
the connection was created with automatic recovery enabled, which is the 
default). Because temporary queues are auto-deleted, the recovery process 
creates a new temporary queue whose name differs from the original one, and 
this is where the problem begins.

The creation of the new temporary queue is NOT detected by ReplyManager, and 
therefore the replyTo property is NOT updated. Also, the routing key no longer 
matches the queue name. This causes a problem when some RabbitMQ client 
implementations, such as Spring AMQP, are used on the server side. The RPC 
service receives our request, processes it, and replies to the default exchange 
with a routing key equal to the rabbitmq.REPLY_TO sent in our request. The RPC 
service provider perceives no problem because the response is sent successfully, 
but the RabbitMQ Camel component keeps waiting for the response to arrive on the 
original temporary queue, which no longer exists after the connection failure 
and recovery. Eventually, Camel throws ExchangeTimedOutException.

Example:
 # After automatic recovery, an RPC request is sent to example-exchange with 
rabbitmq.REPLY_TO=amq.gen-0lLvpnj4ZMlkhxZIcCPVpA
 # After 20 seconds, org.apache.camel.ExchangeTimedOutException is thrown:

{code:java}
The OUT message was not received within: 20000 millis due reply message with 
correlationID: 
Camel-ID-hostname-y579vjqenbwmcd1bpqmd5phtd-1583401848872-0-1690397 not 
received on destination: amq.gen-0lLvpnj4ZMlkhxZIcCPVpA. 
Exchange[ID-hostname-y579vjqenbwmcd1bpqmd5phtd-1583401848872-0-1690391]{code}
 # Using the RabbitMQ management web app (or rabbitmqctl), we check 
example-exchange's bindings and see that routing key 
amq.gen-0lLvpnj4ZMlkhxZIcCPVpA now corresponds to queue 
amq.gen-zaRCP-p-JbXeSzJmzSp83g

*Proposed solution* is to add a QueueRecoveryListener that is notified when the 
temporary queue name changes due to recovery. On that event, the replyTo field 
will be updated with the new temporary queue name and the queue will be rebound 
to the exchange so that the routing key matches the queue name again. The change 
will be made to 
org.apache.camel.component.rabbitmq.reply#createListenerContainer.

The attached patch also contains an integration test.


> Automatic recovery of temporary reply queue is not handled correctly
> --------------------------------------------------------------------
>
>                 Key: CAMEL-14789
>                 URL: https://issues.apache.org/jira/browse/CAMEL-14789
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-rabbitmq
>    Affects Versions: 2.25.0
>         Environment: * RabbitMQ server 3.8.3
>  * Erlang 22.3
>            Reporter: Robert Szczesiak
>            Priority: Major
>             Fix For: 2.25.1
>
>         Attachments: 
> 0001-CAMEL-14789-FIX-Automatic-recovery-of-temporary-repl.patch
>
>
> When the Remote Procedure Call (RPC) communication pattern is used, the 
> RabbitMQ Camel component creates a server-named, auto-delete temporary reply 
> queue. Next, the queue is bound to the exchange with a routing key equal to 
> the queue name. Likewise, ReplyManager's replyTo field is set to the queue 
> name. The reply queue created in this manner is then reused for subsequent 
> RPC requests, and the message property rabbitmq.REPLY_TO is copied from the 
> value previously stored by ReplyManager.
> When a network error causes a connection failure, the RabbitMQ client's 
> automatic recovery kicks in and tries to recover the affected entities 
> (assuming the connection was created with automatic recovery enabled, which 
> is the default). Because temporary queues are auto-deleted, the recovery 
> process creates a new temporary queue whose name differs from the original 
> one, and this is where the problem begins.
> The creation of the new temporary queue is NOT detected by ReplyManager, and 
> therefore the replyTo property is NOT updated. Also, the routing key no 
> longer matches the queue name. This causes a problem when some RabbitMQ 
> client implementations, such as Spring AMQP, are used on the server side. 
> The RPC service receives our request, processes it, and replies to the 
> default exchange with a routing key equal to the rabbitmq.REPLY_TO sent in 
> our request. The RPC service provider perceives no problem because the 
> response is sent successfully, but the RabbitMQ Camel component keeps 
> waiting for the response to arrive on the original temporary queue, which no 
> longer exists after the connection failure and recovery. Eventually, Camel 
> throws ExchangeTimedOutException.
> Example:
>  # After automatic recovery, an RPC request is sent to example-exchange with 
> rabbitmq.REPLY_TO=amq.gen-0lLvpnj4ZMlkhxZIcCPVpA
>  # After 20 seconds, org.apache.camel.ExchangeTimedOutException is thrown:
> {code:java}
> The OUT message was not received within: 20000 millis due reply message with 
> correlationID: Camel-ID-hostname-1583401848872-0-1690397 not received on 
> destination: amq.gen-0lLvpnj4ZMlkhxZIcCPVpA. 
> Exchange[ID-hostname-1583401848872-0-1690391]{code}
>  # Using the RabbitMQ management web app (or rabbitmqctl), we check 
> example-exchange's bindings and see that routing key 
> amq.gen-0lLvpnj4ZMlkhxZIcCPVpA now corresponds to queue 
> amq.gen-zaRCP-p-JbXeSzJmzSp83g
> *Proposed solution* is to add a QueueRecoveryListener that is notified when 
> the temporary queue name changes due to recovery. On that event, the replyTo 
> field will be updated with the new temporary queue name and the queue will 
> be rebound to the exchange so that the routing key matches the queue name 
> again. The change will be made to 
> org.apache.camel.component.rabbitmq.reply#createListenerContainer.
> The attached patch also contains an integration test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
