Nikunj Kumar Gupta created CAMEL-19575:
------------------------------------------

             Summary: RabbitMQConsumer keeps on consuming even when route 
shutdown is triggered.
                 Key: CAMEL-19575
                 URL: https://issues.apache.org/jira/browse/CAMEL-19575
             Project: Camel
          Issue Type: Bug
    Affects Versions: 3.21.0
            Reporter: Nikunj Kumar Gupta


Minimal Repro steps - 
 * create a rabbitmq docker container.
 ** 
{code:java}
docker run -it -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672 
rabbitmq:3.12-management {code}

 * create a queue named 'demo' in the rabbitmq
 * Add 2 messages into the queue.
 * Run the following code - 
 ** 
{code:java}
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class RabitMqMain
{
   public static void main(String[] args) throws Exception
   {
      CamelContext camelContext = new DefaultCamelContext();
      camelContext.addRoutes(new RouteBuilder()
      {
         @Override
         public void configure() throws Exception
         {
            String endpoint = "rabbitmq:?" +
                  "prefetchCount=1&" +
                  "autoAck=false&" +
                  "automaticRecoveryEnabled=true&" +
                  "skipExchangeDeclare=true&" +
                  "synchronous=true&" +
                  "skipQueueDeclare=true&" +
                  "skipQueueBind=true&" +
                  "portNumber=5672&" +
                  "prefetchSize=0&" +
                  "vhost=%2F&" +
                  "hostname=localhost&" +
                  "password=guest&" +
                  "requestedHeartbeat=60&" +
                  "topologyRecoveryEnabled=false&" +
                  "prefetchEnabled=true&" +
                  "connectionTimeout=60000&" +
                  "networkRecoveryInterval=10000&" +
                  "concurrentConsumers=3&" +
                  "queue=demo&" +
                  "username=guest";
            from(endpoint).routeId("rabit")
                  .process(new Processor()
                  {
                     @Override
                     public void process(Exchange exchange) throws Exception
                     {
                        String msg = (String) exchange.getIn().getBody();
                        System.out.println("started processing message - "+ 
msg);
                        Thread.sleep(15 * 1000); // 15sec
                        System.out.println("finished processing message - "+ 
msg);
                     }
                  });
         }
      });
      camelContext.start();
      Thread.sleep(10 * 1000);
      System.out.println("STOPPING");
      camelContext.stop();
      System.out.println("STOPPED");

   }
}
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to