[ 
https://issues.apache.org/jira/browse/CAMEL-19575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen resolved CAMEL-19575.
---------------------------------
    Resolution: Fixed

> camel-rabbitmq - RabbitMQConsumer keeps on consuming even when route shutdown 
> is triggered.
> -------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-19575
>                 URL: https://issues.apache.org/jira/browse/CAMEL-19575
>             Project: Camel
>          Issue Type: Bug
>    Affects Versions: 3.21.0
>            Reporter: Nikunj Kumar Gupta
>            Priority: Minor
>              Labels: rabbitmq
>             Fix For: 3.14.10, 3.20.7, 3.21.1, 3.22.0
>
>
> Minimal repro steps:
>  * Start a RabbitMQ Docker container.
>  ** 
> {code:java}
> docker run -it -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12-management
> {code}
>  * Create a queue named 'demo' in RabbitMQ.
>  * Add 2 messages to the queue.
>  * Run the following code:
>  ** 
> {code:java}
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
> public class RabitMqMain
> {
>    public static void main(String[] args) throws Exception
>    {
>       CamelContext camelContext = new DefaultCamelContext();
>       camelContext.addRoutes(new RouteBuilder()
>       {
>          @Override
>          public void configure() throws Exception
>          {
>             String endpoint = "rabbitmq:?" +
>                   "prefetchCount=1&" +
>                   "autoAck=false&" +
>                   "automaticRecoveryEnabled=true&" +
>                   "skipExchangeDeclare=true&" +
>                   "synchronous=true&" +
>                   "skipQueueDeclare=true&" +
>                   "skipQueueBind=true&" +
>                   "portNumber=5672&" +
>                   "prefetchSize=0&" +
>                   "vhost=%2F&" +
>                   "hostname=localhost&" +
>                   "password=guest&" +
>                   "requestedHeartbeat=60&" +
>                   "topologyRecoveryEnabled=false&" +
>                   "prefetchEnabled=true&" +
>                   "connectionTimeout=60000&" +
>                   "networkRecoveryInterval=10000&" +
>                   "concurrentConsumers=3&" +
>                   "queue=demo&" +
>                   "username=guest";
>             from(endpoint).routeId("rabit")
>                   .process(new Processor()
>                   {
>                      @Override
>                      public void process(Exchange exchange) throws Exception
>                      {
>                         String msg = exchange.toString();
>                         System.out.println("started processing message - " + msg);
>                         Thread.sleep(25 * 1000); // 25sec
>                         System.out.println("finished processing message - " + msg);
>                      }
>                   });
>          }
>       });
>       camelContext.start();
>       Thread.sleep(10 * 1000);
>       System.out.println("STOPPING");
>       camelContext.stop();
>       System.out.println("STOPPED");
>    }
> }
> {code}
>  * After the "STOPPING" log appears, add one more message to the queue within 
> 15 seconds, while the in-flight messages are still being processed.
>  * The newly added message is also picked up and processed, even though 
> shutdown has already been triggered.
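
For comparison, the behaviour the report expects on shutdown (in-flight messages finish, no new message is picked up) can be sketched with plain JDK classes. This is only an illustration under stated assumptions, not the actual fix in camel-rabbitmq: the class GracefulStopSketch, the method runDemo and the message names "first" and "late" are hypothetical, and a BlockingQueue stands in for the RabbitMQ queue.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical simulation; it does not use Camel or the RabbitMQ client.
public class GracefulStopSketch {

    // Runs one simulated consumer and returns the messages it processed.
    static List<String> runDemo() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new CopyOnWriteArrayList<>();
        AtomicBoolean stopping = new AtomicBoolean(false);
        CountDownLatch inFlight = new CountDownLatch(1); // first message picked up
        CountDownLatch finish = new CountDownLatch(1);   // lets slow processing end

        Thread consumer = new Thread(() -> {
            try {
                // Check the stop flag before every attempt to take a message.
                while (!stopping.get()) {
                    String msg = queue.poll(50, TimeUnit.MILLISECONDS);
                    if (msg == null) {
                        continue; // nothing available yet; re-check the flag
                    }
                    if (stopping.get()) {
                        queue.offer(msg); // stop arrived meanwhile: hand it back
                        break;
                    }
                    inFlight.countDown();
                    finish.await(); // stands in for the 25 s sleep in the report
                    processed.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        queue.add("first");
        consumer.start();
        inFlight.await();    // "first" is now being processed
        stopping.set(true);  // corresponds to the "STOPPING" log line
        queue.add("late");   // message added after stop was requested
        finish.countDown();  // allow the in-flight message to complete
        consumer.join();
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed = " + runDemo());
    }
}
```

Running main prints processed = [first]: the in-flight message completes, while "late" stays in the queue. In the reported bug the equivalent of "late" is consumed anyway.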



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
