[
https://issues.apache.org/jira/browse/CAMEL-10229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Piero Cangianiello updated CAMEL-10229:
---------------------------------------
Description:
Run the following code and hit enter while one message is in unacked state (see
RabbitMQ console):
{code:java}
public static void main(String[] args) throws Exception {
    CamelContext context = new DefaultCamelContext();
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            // Consume with manual acks, at most one unacked message at a time
            from("rabbitmq://localhost/?queue=sourceQueue&skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false&prefetchEnabled=true&prefetchCount=1")
                // Slow processing down by 5000 ms so a message stays unacked long enough to hit enter
                .delayer(5000)
                .setHeader("rabbitmq.ROUTING_KEY", constant("destinationQueue"))
                .to("rabbitmq://localhost/?skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false")
                .routeId("myRoute");
        }
    });
    context.start();
    // Block until enter is pressed, then stop the context while a delivery is in progress
    new BufferedReader(new InputStreamReader(System.in)).readLine();
    context.stop();
}
{code}
you get the following exception:
{noformat}
com.rabbitmq.client.impl.DefaultExceptionHandler: Consumer
org.apache.camel.component.rabbitmq.RabbitConsumer@4c57777e
(amq.ctag-dWpQw46flmamv0dM_Fa_Qg) method handleDelivery for channel
AMQChannel(amqp://[email protected]:5672/,1) threw an exception for channel
AMQChannel(amqp://[email protected]:5672/,1):
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to
clean channel shutdown; protocol method: #method<channel.close>(reply-code=200,
reply-text=OK, class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:309)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:303)
at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1043)
at org.apache.camel.component.rabbitmq.RabbitConsumer.handleDelivery(RabbitConsumer.java:108)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}
I think this is caused by a race condition: the main thread runs
channel.close(); immediately after channel.basicCancel(tag); (see
org.apache.camel.component.rabbitmq.RabbitConsumer), without waiting for the
channel.basicAck(deliveryTag, false); call in handleDelivery() to complete.
Another bad side effect is a duplicated message on destinationQueue. For
example, if sourceQueue initially holds 10 messages and you hit enter while the
third one is being processed, you end up with 7 messages in sourceQueue and 4
messages in destinationQueue.
The correct behaviour should be the following:
1) Stop the consumer: channel.basicCancel(tag)
2) Wait for any delivery that the consumer is still processing
3) Let the consumer ack that message
4) Close the channel
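To make the proposed ordering concrete, here is a minimal sketch of the four steps. It is not the camel-rabbitmq implementation and does not use the RabbitMQ client: AckChannel, RecordingChannel, Consumer and runDemo are hypothetical names made up for illustration, and the in-memory channel only mimics the three calls mentioned above (basicCancel, basicAck, close). It assumes a single in-flight delivery, as with prefetchCount=1, and guards it with a lock that stop() must obtain before closing the channel.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class GracefulStopSketch {

    /** Hypothetical stand-in for the three channel calls named in the report. */
    interface AckChannel {
        void basicCancel(String consumerTag);
        void basicAck(long deliveryTag, boolean multiple);
        void close();
    }

    /** In-memory channel: records calls and rejects acks once it is closed. */
    static class RecordingChannel implements AckChannel {
        final List<String> calls = new CopyOnWriteArrayList<>();
        final CountDownLatch cancelled = new CountDownLatch(1);
        private volatile boolean open = true;

        public void basicCancel(String consumerTag) {
            calls.add("cancel");
            cancelled.countDown();
        }

        public void basicAck(long deliveryTag, boolean multiple) {
            if (!open) {
                throw new IllegalStateException("channel is already closed");
            }
            calls.add("ack:" + deliveryTag);
        }

        public void close() {
            open = false;
            calls.add("close");
        }
    }

    /** Consumer whose stop() waits for the running delivery before closing. */
    static class Consumer {
        private final AckChannel channel;
        private final String consumerTag;
        private final ReentrantLock deliveryLock = new ReentrantLock();

        Consumer(AckChannel channel, String consumerTag) {
            this.channel = channel;
            this.consumerTag = consumerTag;
        }

        void handleDelivery(long deliveryTag, Runnable processing) {
            deliveryLock.lock(); // held for the whole delivery, including the ack
            try {
                processing.run();
                channel.basicAck(deliveryTag, false);
            } finally {
                deliveryLock.unlock();
            }
        }

        /** Returns true if the running delivery finished within the timeout. */
        boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
            channel.basicCancel(consumerTag);                       // 1) stop the consumer
            boolean drained = deliveryLock.tryLock(timeout, unit);  // 2) + 3) wait for the delivery and its ack
            try {
                channel.close();                                    // 4) close the channel
            } finally {
                if (drained) {
                    deliveryLock.unlock();
                }
            }
            return drained;
        }
    }

    /** Stops the consumer while delivery 3 is still being processed. */
    static List<String> runDemo() {
        RecordingChannel channel = new RecordingChannel();
        Consumer consumer = new Consumer(channel, "demo-tag");
        CountDownLatch started = new CountDownLatch(1);
        Thread worker = new Thread(() -> consumer.handleDelivery(3L, () -> {
            started.countDown();
            try {
                channel.cancelled.await(); // keep "processing" until stop() has begun
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        worker.start();
        try {
            started.await();
            consumer.stop(5, TimeUnit.SECONDS);
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return channel.calls;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [cancel, ack:3, close]
    }
}
```

With this ordering the ack is always recorded before the close, so the "already closed" failure cannot occur for the in-flight message. The timeout on tryLock is a design choice so that a stuck delivery cannot block shutdown forever; in that case the channel is closed anyway and the message would be redelivered.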
was:
Same as the description above, except that the example code also called
createEndpoints(context); right after creating the CamelContext.
> Race condition when stopping context with autoack=false
> -------------------------------------------------------
>
> Key: CAMEL-10229
> URL: https://issues.apache.org/jira/browse/CAMEL-10229
> Project: Camel
> Issue Type: Bug
> Components: camel-rabbitmq
> Affects Versions: 2.17.3
> Reporter: Piero Cangianiello
> Labels: autoack, rabbitmq, stop
>