[
https://issues.apache.org/jira/browse/CAMEL-19575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Claus Ibsen resolved CAMEL-19575.
---------------------------------
Resolution: Fixed
> camel-rabbitmq - RabbitMQConsumer keeps on consuming even when route shutdown
> is triggered.
> -------------------------------------------------------------------------------------------
>
> Key: CAMEL-19575
> URL: https://issues.apache.org/jira/browse/CAMEL-19575
> Project: Camel
> Issue Type: Bug
> Affects Versions: 3.21.0
> Reporter: Nikunj Kumar Gupta
> Priority: Minor
> Labels: rabbitmq
> Fix For: 3.14.10, 3.20.7, 3.21.1, 3.22.0
>
>
> Minimal repro steps:
> * Create a RabbitMQ Docker container:
> {code:bash}
> docker run -it -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12-management
> {code}
> * Create a queue named 'demo' in RabbitMQ.
> * Add 2 messages to the queue.
> * Run the following code:
> {code:java}
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
>
> public class RabitMqMain
> {
>     public static void main(String[] args) throws Exception
>     {
>         CamelContext camelContext = new DefaultCamelContext();
>         camelContext.addRoutes(new RouteBuilder()
>         {
>             @Override
>             public void configure() throws Exception
>             {
>                 String endpoint = "rabbitmq:?" +
>                         "prefetchCount=1&" +
>                         "autoAck=false&" +
>                         "automaticRecoveryEnabled=true&" +
>                         "skipExchangeDeclare=true&" +
>                         "synchronous=true&" +
>                         "skipQueueDeclare=true&" +
>                         "skipQueueBind=true&" +
>                         "portNumber=5672&" +
>                         "prefetchSize=0&" +
>                         "vhost=%2F&" +
>                         "hostname=localhost&" +
>                         "password=guest&" +
>                         "requestedHeartbeat=60&" +
>                         "topologyRecoveryEnabled=false&" +
>                         "prefetchEnabled=true&" +
>                         "connectionTimeout=60000&" +
>                         "networkRecoveryInterval=10000&" +
>                         "concurrentConsumers=3&" +
>                         "queue=demo&" +
>                         "username=guest";
>                 from(endpoint).routeId("rabit")
>                         .process(new Processor()
>                         {
>                             @Override
>                             public void process(Exchange exchange) throws Exception
>                             {
>                                 String msg = exchange.toString();
>                                 System.out.println("started processing message - " + msg);
>                                 Thread.sleep(25 * 1000); // 25sec
>                                 System.out.println("finished processing message - " + msg);
>                             }
>                         });
>             }
>         });
>         camelContext.start();
>         Thread.sleep(10 * 1000);
>         System.out.println("STOPPING");
>         camelContext.stop();
>         System.out.println("STOPPED");
>     }
> }
> {code}
> * Once the code is running, wait for the "STOPPING" log and then add one more
> message to the queue within 15 seconds, while the in-flight messages are
> still being processed.
> * The message added in the previous step is also picked up and processed,
> even though shutdown of the route has already been triggered.
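
The behaviour the report expects (no new message is accepted once shutdown has been requested) can be sketched with the JDK alone. This is a hypothetical, single-threaded illustration only. It is not the code of camel-rabbitmq, nor the change applied for CAMEL-19575. The class `StopAwareConsumerSketch` and its methods `requestStop` and `pollOnce` are invented names, and an in-memory `LinkedBlockingQueue` stands in for the 'demo' queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a queue consumer; NOT Camel's RabbitMQConsumer. */
final class StopAwareConsumerSketch {
    private final BlockingQueue<String> queue;
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private final List<String> processed = new ArrayList<>();

    StopAwareConsumerSketch(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    /** Marks the consumer as shutting down (assumed equivalent of a route stop). */
    void requestStop() {
        stopping.set(true);
    }

    /** Tries to consume one message; returns true only if one was processed. */
    boolean pollOnce() {
        if (stopping.get()) {
            return false; // shutdown requested: leave the message on the queue
        }
        String msg = queue.poll();
        if (msg == null) {
            return false; // nothing to consume
        }
        processed.add(msg);
        return true;
    }

    List<String> processed() {
        return processed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> demo = new LinkedBlockingQueue<>();
        demo.add("m1");
        demo.add("m2");

        StopAwareConsumerSketch consumer = new StopAwareConsumerSketch(demo);
        consumer.pollOnce();
        consumer.pollOnce();

        consumer.requestStop(); // corresponds to the "STOPPING" log in the repro
        demo.add("m3");         // message added after shutdown was requested
        consumer.pollOnce();    // must not pick up m3

        // prints: processed=[m1, m2] left=[m3]
        System.out.println("processed=" + consumer.processed() + " left=" + demo);
    }
}
```

In this sketch the stop flag is checked before a message is taken, so a message that arrives after the stop request stays on the queue for a later consumer instead of being processed during shutdown.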
--
This message was sent by Atlassian Jira
(v8.20.10#820010)