[ 
https://issues.apache.org/jira/browse/CAMEL-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zdeněk Obst updated CAMEL-9567:
-------------------------------
    Description: 
When an SjmsConsumer is consuming JMS messages from a broker (e.g. ActiveMQ) 
and the *Camel Context is stopped, the consumer keeps reading new messages from 
the broker* even though it should only finish processing the messages it has 
already fetched. As a result, the Context does not stop while new messages 
remain in the broker, until the timeout for the stop operation expires.

When I investigated the code, it seemed that the suspend operation is not 
implemented (or the code does not check the isSuspended flag). I would expect 
the consumer to unregister its JMS listener on context stop (consumer suspend).
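
The expected behavior can be illustrated with a minimal plain-Java sketch. It 
uses neither Camel nor JMS: SuspendableConsumerSketch, pollOnce() and the 
in-memory queue are hypothetical stand-ins chosen for illustration, not the 
SjmsConsumer API. The only point is that once suspend() has been called, no 
further messages are taken from the broker.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a consumer; this is NOT the SjmsConsumer API. */
final class SuspendableConsumerSketch {

    private final Queue<String> broker; // in-memory stand-in for the JMS queue
    private final AtomicBoolean suspended = new AtomicBoolean(false);
    private int processed;

    SuspendableConsumerSketch(Queue<String> broker) {
        this.broker = broker;
    }

    /** Assumed to be called on context stop: no new messages are fetched afterwards. */
    void suspend() {
        suspended.set(true);
    }

    /** Fetches and processes one message unless suspended; returns true if one was processed. */
    boolean pollOnce() {
        if (suspended.get()) {
            return false; // suspended: leave the remaining messages on the broker
        }
        String message = broker.poll();
        if (message == null) {
            return false; // queue is empty
        }
        processed++;
        return true;
    }

    int processedCount() {
        return processed;
    }

    public static void main(String[] args) {
        Queue<String> broker = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 10; i++) {
            broker.add("message-" + i);
        }

        SuspendableConsumerSketch consumer = new SuspendableConsumerSketch(broker);
        for (int i = 0; i < 3; i++) {
            consumer.pollOnce(); // consume a few messages at the beginning
        }

        consumer.suspend(); // corresponds to stopping the context
        while (consumer.pollOnce()) {
            // never entered: the consumer is suspended
        }

        System.out.println("processed=" + consumer.processedCount()
                + ", left on broker=" + broker.size());
    }
}
```

Running main prints "processed=3, left on broker=7": the seven messages that 
were not yet fetched stay on the broker instead of keeping the stop operation 
busy.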

Here is the sample code I used for testing:
{code:java}
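import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.sjms.SjmsComponent;
import org.apache.camel.component.sjms.jms.ConnectionFactoryResource;
import org.apache.camel.impl.DefaultCamelContext;

public class Test {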

    public static void main(String[] args) throws Exception {
        // Pre-fill the JMS broker with many messages, e.g. 10,000

        RouteBuilder rb = new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("sjms:queue:test?consumerCount=5")
                        .process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                // Sleep so that not all messages are consumed instantly
                                Thread.sleep(1000);
                                System.out.println("Processed message " + exchange.getExchangeId());
                            }
                        });
            }
        };

        CamelContext context = new DefaultCamelContext();
        // 1000 s: a timeout big enough to be sure the consumer is the problem
        context.getShutdownStrategy().setTimeout(1000);
        addJmsComponent(context);
        context.addRoutes(rb);

        System.out.println("=====> Starting context");
        context.start();
        Thread.sleep(5 * 1000); // Consume a few messages at the beginning

        System.out.println("=====> Stopping context");
        context.stop();
        // Not reached as long as there are any messages left in the queue
        System.out.println("=====> Context stopped");
    }

    private static void addJmsComponent(CamelContext context) {
        // ActiveMQ is easiest for testing this
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        ConnectionFactoryResource connResource = new ConnectionFactoryResource(5, factory);
        SjmsComponent comp = new SjmsComponent();
        comp.setConnectionResource(connResource);
        context.addComponent("sjms", comp);
    }
}
{code}

The original mailing list thread:
http://camel.465427.n5.nabble.com/CamelContext-stop-with-SJMS-consumer-does-not-stop-consuming-messages-from-broker-td5777207.html


> Consumer does not suspend (on context) stop
> -------------------------------------------
>
>                 Key: CAMEL-9567
>                 URL: https://issues.apache.org/jira/browse/CAMEL-9567
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-sjms
>    Affects Versions: 2.16.2
>            Reporter: Zdeněk Obst
>              Labels: camel-sjms



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
