[
https://issues.apache.org/jira/browse/CAMEL-19575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Nikunj Kumar Gupta updated CAMEL-19575:
---------------------------------------
Description:
Minimal repro steps:
* Create a RabbitMQ Docker container:
{code:bash}
docker run -it -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12-management
{code}
* Create a queue named 'demo' in RabbitMQ.
* Add 2 messages to the queue.
* Run the following code:
{code:java}
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
/**
 * Minimal reproduction for CAMEL-19575 ("RabbitMQConsumer keeps on consuming
 * even when route shutdown is triggered").
 *
 * Starts a Camel context with a single route that consumes from the RabbitMQ
 * queue "demo", holds each message for 25 seconds, and stops the context after
 * 10 seconds, i.e. while messages are still being processed.
 */
public class RabitMqMain
{
/**
 * Builds the route, lets it run for 10 seconds, then stops the context,
 * printing "STOPPING" before and "STOPPED" after the call to stop().
 *
 * @param args unused
 * @throws Exception propagated from the Camel calls and from Thread.sleep
 */
public static void main(String[] args) throws Exception
{
CamelContext camelContext = new DefaultCamelContext();
camelContext.addRoutes(new RouteBuilder()
{
@Override
public void configure() throws Exception
{
// Endpoint URI for the rabbitmq component, assembled one query option per
// line: queue "demo" on localhost:5672, vhost "/" (URL-encoded as %2F),
// credentials guest/guest.
// skipExchangeDeclare/skipQueueDeclare/skipQueueBind are all true, so the
// queue is expected to exist already (the repro steps create it by hand).
// autoAck=false, prefetchCount=1 and concurrentConsumers=3 are the consumer
// settings under which the issue was reported.
String endpoint = "rabbitmq:?" +
"prefetchCount=1&" +
"autoAck=false&" +
"automaticRecoveryEnabled=true&" +
"skipExchangeDeclare=true&" +
"synchronous=true&" +
"skipQueueDeclare=true&" +
"skipQueueBind=true&" +
"portNumber=5672&" +
"prefetchSize=0&" +
"vhost=%2F&" +
"hostname=localhost&" +
"password=guest&" +
"requestedHeartbeat=60&" +
"topologyRecoveryEnabled=false&" +
"prefetchEnabled=true&" +
"connectionTimeout=60000&" +
"networkRecoveryInterval=10000&" +
"concurrentConsumers=3&" +
"queue=demo&" +
"username=guest";
// Single route "rabit": every consumed message goes through a deliberately
// slow processor that logs, sleeps, and logs again.
from(endpoint).routeId("rabit")
.process(new Processor()
{
@Override
public void process(Exchange exchange) throws Exception
{
// NOTE(review): the direct cast assumes the body is already a String;
// exchange.getIn().getBody(String.class) would go through Camel's type
// converters instead - confirm what the RabbitMQ consumer delivers.
String msg = (String) exchange.getIn().getBody();
System.out.println("started processing message - "+
msg);
// Longer than the 10-second run time in main(), so stop() is requested
// while this message is still in flight.
Thread.sleep(25 * 1000); // 25sec
System.out.println("finished processing message - "+
msg);
}
});
}
});
camelContext.start();
// Let the route consume for 10 seconds before requesting shutdown.
Thread.sleep(10 * 1000);
System.out.println("STOPPING");
// Per the issue report, a message published to the queue after this point
// (while the in-flight messages are still sleeping) is also picked up.
camelContext.stop();
System.out.println("STOPPED");
}
}
{code}
* Once this code is running, wait for the "STOPPING" log line and then add one
more message to the queue within 15 seconds (that is, before the two 25-second
messages already in flight have finished).
* The newly added message also starts being processed, even though shutdown of
the context has already been triggered.
was:
Minimal Repro steps -
* create a rabbitmq docker container.
**
{code:java}
docker run -it -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672
rabbitmq:3.12-management {code}
* create a queue named 'demo' in the rabbitmq
* Add 2 messages into the queue.
* Run the following code -
**
{code:java}
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
/**
 * Earlier revision of the reproducer for CAMEL-19575 ("RabbitMQConsumer keeps
 * on consuming even when route shutdown is triggered").
 *
 * Starts a Camel context with a single route that consumes from the RabbitMQ
 * queue "demo", holds each message for 15 seconds, and stops the context after
 * 10 seconds, i.e. while messages are still being processed.
 */
public class RabitMqMain
{
/**
 * Builds the route, lets it run for 10 seconds, then stops the context,
 * printing "STOPPING" before and "STOPPED" after the call to stop().
 *
 * @param args unused
 * @throws Exception propagated from the Camel calls and from Thread.sleep
 */
public static void main(String[] args) throws Exception
{
CamelContext camelContext = new DefaultCamelContext();
camelContext.addRoutes(new RouteBuilder()
{
@Override
public void configure() throws Exception
{
// Endpoint URI for the rabbitmq component, assembled one query option per
// line: queue "demo" on localhost:5672, vhost "/" (URL-encoded as %2F),
// credentials guest/guest.
// skipExchangeDeclare/skipQueueDeclare/skipQueueBind are all true, so the
// queue is expected to exist already (the repro steps create it by hand).
String endpoint = "rabbitmq:?" +
"prefetchCount=1&" +
"autoAck=false&" +
"automaticRecoveryEnabled=true&" +
"skipExchangeDeclare=true&" +
"synchronous=true&" +
"skipQueueDeclare=true&" +
"skipQueueBind=true&" +
"portNumber=5672&" +
"prefetchSize=0&" +
"vhost=%2F&" +
"hostname=localhost&" +
"password=guest&" +
"requestedHeartbeat=60&" +
"topologyRecoveryEnabled=false&" +
"prefetchEnabled=true&" +
"connectionTimeout=60000&" +
"networkRecoveryInterval=10000&" +
"concurrentConsumers=3&" +
"queue=demo&" +
"username=guest";
// Single route "rabit": every consumed message goes through a deliberately
// slow processor that logs, sleeps, and logs again.
from(endpoint).routeId("rabit")
.process(new Processor()
{
@Override
public void process(Exchange exchange) throws Exception
{
// NOTE(review): the direct cast assumes the body is already a String;
// exchange.getIn().getBody(String.class) would go through Camel's type
// converters instead - confirm what the RabbitMQ consumer delivers.
String msg = (String) exchange.getIn().getBody();
System.out.println("started processing message - "+
msg);
// Longer than the 10-second run time in main(), so stop() is requested
// while this message is still in flight.
Thread.sleep(15 * 1000); // 15sec
System.out.println("finished processing message - "+
msg);
}
});
}
});
camelContext.start();
// Let the route consume for 10 seconds before requesting shutdown.
Thread.sleep(10 * 1000);
System.out.println("STOPPING");
camelContext.stop();
System.out.println("STOPPED");
}
}
{code}
> RabbitMQConsumer keeps on consuming even when route shutdown is triggered.
> --------------------------------------------------------------------------
>
> Key: CAMEL-19575
> URL: https://issues.apache.org/jira/browse/CAMEL-19575
> Project: Camel
> Issue Type: Bug
> Affects Versions: 3.21.0
> Reporter: Nikunj Kumar Gupta
> Priority: Major
>
> Minimal repro steps:
> * Create a RabbitMQ Docker container:
> {code:bash}
> docker run -it -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12-management
> {code}
> * Create a queue named 'demo' in RabbitMQ.
> * Add 2 messages to the queue.
> * Run the following code:
> {code:java}
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
> public class RabitMqMain // reproducer for CAMEL-19575: consumer keeps consuming after shutdown is triggered
> {
> public static void main(String[] args) throws Exception // builds the route, runs it for 10 s, then stops the context
> {
> CamelContext camelContext = new DefaultCamelContext();
> camelContext.addRoutes(new RouteBuilder()
> {
> @Override
> public void configure() throws Exception
> {
> String endpoint = "rabbitmq:?" + // endpoint URI, one query option per line
> "prefetchCount=1&" +
> "autoAck=false&" +
> "automaticRecoveryEnabled=true&" +
> "skipExchangeDeclare=true&" + // skip* flags are true: the queue is expected to exist already
> "synchronous=true&" +
> "skipQueueDeclare=true&" +
> "skipQueueBind=true&" +
> "portNumber=5672&" +
> "prefetchSize=0&" +
> "vhost=%2F&" + // URL-encoded "/"
> "hostname=localhost&" +
> "password=guest&" +
> "requestedHeartbeat=60&" +
> "topologyRecoveryEnabled=false&" +
> "prefetchEnabled=true&" +
> "connectionTimeout=60000&" +
> "networkRecoveryInterval=10000&" +
> "concurrentConsumers=3&" +
> "queue=demo&" +
> "username=guest";
> from(endpoint).routeId("rabit") // single route with a deliberately slow processor
> .process(new Processor()
> {
> @Override
> public void process(Exchange exchange) throws Exception
> {
> String msg = (String) exchange.getIn().getBody(); // NOTE(review): cast assumes a String body - confirm
> System.out.println("started processing message - "+
> msg);
> Thread.sleep(25 * 1000); // 25sec, longer than the 10 s run time in main()
> System.out.println("finished processing message - "+
> msg);
> }
> });
> }
> });
> camelContext.start();
> Thread.sleep(10 * 1000); // let the route consume for 10 s before requesting shutdown
> System.out.println("STOPPING");
> camelContext.stop(); // per the report, messages published after this point are still picked up
> System.out.println("STOPPED");
> }
> }
> {code}
> * Once this code is running, wait for the "STOPPING" log line and then add one
> more message to the queue within 15 seconds (that is, before the two 25-second
> messages already in flight have finished).
> * The newly added message also starts being processed, even though shutdown of
> the context has already been triggered.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)