[ 
https://issues.apache.org/activemq/browse/CAMEL-3348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=63428#action_63428
 ] 

Claus Ibsen commented on CAMEL-3348:
------------------------------------

Well spotted.



> DefaultShutdownStrategy and ShutdownAware (SedaConsumer) losing exchange
> ------------------------------------------------------------------------
>
>                 Key: CAMEL-3348
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-3348
>             Project: Apache Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.5.0
>            Reporter: Damien Delautre
>            Assignee: Claus Ibsen
>
> There is a problem when the Camel context is shut down while a seda endpoint
> is in use. In SedaConsumer, the exchange is removed from the queue and only
> later added to the InflightRepository, as shown in the following code (I put
> comments where each step happens):
> {code}
> public void run() {
>         BlockingQueue<Exchange> queue = endpoint.getQueue();
>         while (queue != null && isRunAllowed()) {
>             final Exchange exchange;
>             try {
>                 // The exchange is removed here from the queue
>                 exchange = queue.poll(1000, TimeUnit.MILLISECONDS);
>             } catch (InterruptedException e) {
>                 if (LOG.isDebugEnabled()) {
>                     LOG.debug("Sleep interrupted, are we stopping? "
>                             + (isStopping() || isStopped()));
>                 }
>                 continue;
>             }
>             if (exchange != null) {
>                 if (isRunAllowed()) {
>                     try {
>                         // Call to sendToConsumers, detailed below
>                         sendToConsumers(exchange);
>                         if (exchange.getException() != null) {
>                             getExceptionHandler().handleException(
>                                     "Error processing exchange", exchange,
>                                     exchange.getException());
>                         }
>                     } catch (Exception e) {
>                         getExceptionHandler().handleException(
>                                 "Error processing exchange", exchange, e);
>                     }
>                 } else {
>                     if (LOG.isWarnEnabled()) {
>                         LOG.warn("This consumer is stopped during polling an "
>                                 + "exchange, so putting it back on the seda queue: "
>                                 + exchange);
>                     }
>                     try {
>                         queue.put(exchange);
>                     } catch (InterruptedException e) {
>                         if (LOG.isDebugEnabled()) {
>                             LOG.debug("Sleep interrupted, are we stopping? "
>                                     + (isStopping() || isStopped()));
>                         }
>                         continue;
>                     }
>                 }
>             }
>         }
>     }
>     protected void sendToConsumers(Exchange exchange) throws Exception {
>         int size = endpoint.getConsumers().size();
>         if (size > 1) {
>             if (LOG.isDebugEnabled()) {
>                 LOG.debug("Multicasting to " + endpoint.getConsumers().size()
>                         + " consumers for Exchange: " + exchange);
>             }
>            
>             MulticastProcessor mp = endpoint.getConumserMulticastProcessor();
>             AsyncProcessorHelper.process(mp, exchange, new AsyncCallback() {
>                 public void done(boolean doneSync) {
>                 }
>             });
>         } else {
>             // This line will create the UnitOfWork (in UnitOfWorkProcessor),
>             // which will put the exchange in the InflightRepository
>             AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() {
>                 public void done(boolean doneSync) {
>                 }
>             });
>         }
>     }
> {code}
> If the shutdown occurs between these two actions, the DefaultShutdownStrategy
> will shut down the route even though a message is still in progress, and that
> message will be lost.
> Here is the code of ShutdownTask in DefaultShutdownStrategy which allows the
> shutdown to proceed even though some messages are still in progress. (I put
> comments in it to show the state of the seda queue and the InflightRepository
> when it is called between the queue.poll() and the InflightRepository.add().)
> {code}
> for (Consumer consumer : order.getInputs()) {
>                         // check the number of inflight exchanges, which is 0
>                         // because the UnitOfWork is not created yet
>                         int inflight = context.getInflightRepository().size(consumer.getEndpoint());
>                         if (consumer instanceof ShutdownAware) {
>                             // check the number of exchanges in the seda queue,
>                             // which is 0 because the message is already removed
>                             inflight += ((ShutdownAware) consumer).getPendingExchangesSize();
>                         }
>                         if (inflight > 0) {
>                             size += inflight;
>                             if (LOG.isDebugEnabled()) {
>                                 LOG.debug(inflight
>                                         + " inflight and pending exchanges for consumer: "
>                                         + consumer);
>                             }
>                         }
>                     }
> {code}
> You can reproduce it by putting a breakpoint in the method
> {code}protected void sendToConsumers(Exchange exchange){code} in SedaConsumer
> and calling stop() on the CamelContext while the thread is suspended at the
> breakpoint.
> We caught the problem in a unit test that tests the shutdown, and when our
> test server was under heavy load.
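
For illustration, the window described in the report can be modelled without Camel or a debugger. The following is only a minimal sketch under stated assumptions, not Camel code: the class name ShutdownRaceDemo, the String standing in for an Exchange, and the AtomicInteger standing in for the InflightRepository are all made up for this example. Two latches play the role of the breakpoint, holding the consumer thread between the poll and the inflight registration while the main thread computes the same "pending + inflight" sum that the shutdown check relies on.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone model of the race; none of these names exist in Camel.
public class ShutdownRaceDemo {

    /**
     * Returns the "pending + inflight" count that a shutdown check would see
     * while the consumer thread sits between poll() and inflight registration.
     */
    public static int observePendingDuringHandOff() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // stands in for the seda queue
        AtomicInteger inflight = new AtomicInteger();              // stands in for the InflightRepository
        CountDownLatch polled = new CountDownLatch(1);             // consumer has taken the exchange
        CountDownLatch resume = new CountDownLatch(1);             // consumer may continue

        queue.add("exchange-1");

        Thread consumer = new Thread(() -> {
            try {
                // The exchange leaves the queue here...
                String exchange = queue.poll(1000, TimeUnit.MILLISECONDS);
                polled.countDown();
                resume.await(); // plays the role of the breakpoint
                if (exchange != null) {
                    // ...and is only registered as inflight here.
                    inflight.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            polled.await();
            // What the shutdown check would compute inside the window.
            int seen = inflight.get() + queue.size();
            resume.countDown();
            consumer.join();
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while observing the race", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("pending + inflight seen during hand-off: "
                + observePendingDuringHandOff());
    }
}
```

In this model the observed sum is 0 even though one exchange is still being handled, which is the state in which a shutdown would proceed and the message would be dropped.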

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
