[ 
https://issues.apache.org/jira/browse/CAMEL-19575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikunj Kumar Gupta updated CAMEL-19575:
---------------------------------------
    Description: 
Minimal repro steps:
 * Create a RabbitMQ Docker container.
 ** 
{code:java}
docker run -it -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672 
rabbitmq:3.12-management {code}

 * Create a queue named 'demo' in RabbitMQ.
 * Add 2 messages to the queue.
 * Run the following code:
 ** 
{code:java}
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

/**
 * Stand-alone reproduction program for this issue.
 *
 * Builds a Camel context with a single route ("rabit") that consumes from the
 * RabbitMQ queue "demo" and spends 25 seconds on every message, then stops the
 * context after only 10 seconds so that shutdown is triggered while messages
 * are still being processed.
 */
public class RabitMqMain
{
   /**
    * Registers the route, starts the context, waits 10 seconds and stops it,
    * printing "STOPPING" before and "STOPPED" after the stop() call.
    *
    * @param args unused
    * @throws Exception propagated from route registration, context start/stop
    *                   or the sleep
    */
   public static void main(String[] args) throws Exception
   {
      CamelContext camelContext = new DefaultCamelContext();
      camelContext.addRoutes(new RouteBuilder()
      {
         @Override
         public void configure() throws Exception
         {
            // Endpoint URI assembled from literal option fragments: broker at
            // localhost:5672 (guest/guest, vhost "/"), queue "demo",
            // concurrentConsumers=3, prefetchCount=1, autoAck=false; the
            // skip* options turn off exchange/queue declaration and binding.
            String endpoint = "rabbitmq:?" +
                  "prefetchCount=1&" +
                  "autoAck=false&" +
                  "automaticRecoveryEnabled=true&" +
                  "skipExchangeDeclare=true&" +
                  "synchronous=true&" +
                  "skipQueueDeclare=true&" +
                  "skipQueueBind=true&" +
                  "portNumber=5672&" +
                  "prefetchSize=0&" +
                  "vhost=%2F&" +
                  "hostname=localhost&" +
                  "password=guest&" +
                  "requestedHeartbeat=60&" +
                  "topologyRecoveryEnabled=false&" +
                  "prefetchEnabled=true&" +
                  "connectionTimeout=60000&" +
                  "networkRecoveryInterval=10000&" +
                  "concurrentConsumers=3&" +
                  "queue=demo&" +
                  "username=guest";
            from(endpoint).routeId("rabit")
                  .process(new Processor()
                  {
                     // Simulates slow message handling: logs the body, sleeps,
                     // then logs it again.
                     @Override
                     public void process(Exchange exchange) throws Exception
                     {
                        // NOTE(review): unchecked cast - this throws
                        // ClassCastException if the body is not a String;
                        // confirm what type the consumer actually delivers.
                        String msg = (String) exchange.getIn().getBody();
                        System.out.println("started processing message - "+ 
msg);
                        // Longer than the 10sec main() waits before stop(), so
                        // messages are still in flight when shutdown begins.
                        Thread.sleep(25 * 1000); // 25sec
                        System.out.println("finished processing message - "+ 
msg);
                     }
                  });
         }
      });
      camelContext.start();
      // Let the consumers pick up messages before triggering shutdown.
      Thread.sleep(10 * 1000);
      System.out.println("STOPPING");
      // Per the report, messages published after this point are still consumed.
      camelContext.stop();
      System.out.println("STOPPED");

   }
}
{code}

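 * Once the code is running, add one more message to the queue after you see 
the "STOPPING" log (do this within 15 seconds, i.e. before the in-flight 
messages finish their 25-second processing).
 * You will see that the message added in the previous step also starts being 
processed, even though shutdown has already been triggered.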

  was:
Minimal Repro steps - 
 * create a rabbitmq docker container.
 ** 
{code:java}
docker run -it -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672 
rabbitmq:3.12-management {code}

 * create a queue named 'demo' in the rabbitmq
 * Add 2 messages into the queue.
 * Run the following code - 
 ** 
{code:java}
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

/**
 * Stand-alone reproduction program for this issue.
 *
 * Builds a Camel context with a single route ("rabit") that consumes from the
 * RabbitMQ queue "demo" and spends 15 seconds on every message, then stops the
 * context after only 10 seconds so that shutdown is triggered while messages
 * are still being processed.
 */
public class RabitMqMain
{
   /**
    * Registers the route, starts the context, waits 10 seconds and stops it,
    * printing "STOPPING" before and "STOPPED" after the stop() call.
    *
    * @param args unused
    * @throws Exception propagated from route registration, context start/stop
    *                   or the sleep
    */
   public static void main(String[] args) throws Exception
   {
      CamelContext camelContext = new DefaultCamelContext();
      camelContext.addRoutes(new RouteBuilder()
      {
         @Override
         public void configure() throws Exception
         {
            // Endpoint URI assembled from literal option fragments: broker at
            // localhost:5672 (guest/guest, vhost "/"), queue "demo",
            // concurrentConsumers=3, prefetchCount=1, autoAck=false; the
            // skip* options turn off exchange/queue declaration and binding.
            String endpoint = "rabbitmq:?" +
                  "prefetchCount=1&" +
                  "autoAck=false&" +
                  "automaticRecoveryEnabled=true&" +
                  "skipExchangeDeclare=true&" +
                  "synchronous=true&" +
                  "skipQueueDeclare=true&" +
                  "skipQueueBind=true&" +
                  "portNumber=5672&" +
                  "prefetchSize=0&" +
                  "vhost=%2F&" +
                  "hostname=localhost&" +
                  "password=guest&" +
                  "requestedHeartbeat=60&" +
                  "topologyRecoveryEnabled=false&" +
                  "prefetchEnabled=true&" +
                  "connectionTimeout=60000&" +
                  "networkRecoveryInterval=10000&" +
                  "concurrentConsumers=3&" +
                  "queue=demo&" +
                  "username=guest";
            from(endpoint).routeId("rabit")
                  .process(new Processor()
                  {
                     // Simulates slow message handling: logs the body, sleeps,
                     // then logs it again.
                     @Override
                     public void process(Exchange exchange) throws Exception
                     {
                        // NOTE(review): unchecked cast - this throws
                        // ClassCastException if the body is not a String;
                        // confirm what type the consumer actually delivers.
                        String msg = (String) exchange.getIn().getBody();
                        System.out.println("started processing message - "+ 
msg);
                        // Longer than the 10sec main() waits before stop(), so
                        // messages are still in flight when shutdown begins.
                        Thread.sleep(15 * 1000); // 15sec
                        System.out.println("finished processing message - "+ 
msg);
                     }
                  });
         }
      });
      camelContext.start();
      // Let the consumers pick up messages before triggering shutdown.
      Thread.sleep(10 * 1000);
      System.out.println("STOPPING");
      // Per the report, the consumer keeps consuming after shutdown is triggered.
      camelContext.stop();
      System.out.println("STOPPED");

   }
}
{code}


> RabbitMQConsumer keeps on consuming even when route shutdown is triggered.
> --------------------------------------------------------------------------
>
>                 Key: CAMEL-19575
>                 URL: https://issues.apache.org/jira/browse/CAMEL-19575
>             Project: Camel
>          Issue Type: Bug
>    Affects Versions: 3.21.0
>            Reporter: Nikunj Kumar Gupta
>            Priority: Major
>
> Minimal Repro steps - 
>  * create a rabbitmq docker container.
>  ** 
> {code:java}
> docker run -it -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672 
> rabbitmq:3.12-management {code}
>  * create a queue named 'demo' in the rabbitmq
>  * Add 2 messages into the queue.
>  * Run the following code - 
>  ** 
> {code:java}
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
> public class RabitMqMain
> {
>    public static void main(String[] args) throws Exception
>    {
>       CamelContext camelContext = new DefaultCamelContext();
>       camelContext.addRoutes(new RouteBuilder()
>       {
>          @Override
>          public void configure() throws Exception
>          {
>             String endpoint = "rabbitmq:?" +
>                   "prefetchCount=1&" +
>                   "autoAck=false&" +
>                   "automaticRecoveryEnabled=true&" +
>                   "skipExchangeDeclare=true&" +
>                   "synchronous=true&" +
>                   "skipQueueDeclare=true&" +
>                   "skipQueueBind=true&" +
>                   "portNumber=5672&" +
>                   "prefetchSize=0&" +
>                   "vhost=%2F&" +
>                   "hostname=localhost&" +
>                   "password=guest&" +
>                   "requestedHeartbeat=60&" +
>                   "topologyRecoveryEnabled=false&" +
>                   "prefetchEnabled=true&" +
>                   "connectionTimeout=60000&" +
>                   "networkRecoveryInterval=10000&" +
>                   "concurrentConsumers=3&" +
>                   "queue=demo&" +
>                   "username=guest";
>             from(endpoint).routeId("rabit")
>                   .process(new Processor()
>                   {
>                      @Override
>                      public void process(Exchange exchange) throws Exception
>                      {
>                         String msg = (String) exchange.getIn().getBody();
>                         System.out.println("started processing message - "+ 
> msg);
>                         Thread.sleep(25 * 1000); // 25sec
>                         System.out.println("finished processing message - "+ 
> msg);
>                      }
>                   });
>          }
>       });
>       camelContext.start();
>       Thread.sleep(10 * 1000);
>       System.out.println("STOPPING");
>       camelContext.stop();
>       System.out.println("STOPPED");
>    }
> }
> {code}
>  * Once you run this code, add one more message to the queue after you see 
> the "STOPPING" log (do this in less than 15 sec).
>  * Now you will see that the message you added in previous step has also 
> started to be processed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to