DefaultShutdownStrategy and ShutdownAware losing exchange
---------------------------------------------------------

                 Key: CAMEL-3348
                 URL: https://issues.apache.org/activemq/browse/CAMEL-3348
             Project: Apache Camel
          Issue Type: Bug
          Components: camel-core
    Affects Versions: 2.5.0
            Reporter: Damien Delautre


There's a problem when we shut down the CamelContext with a seda endpoint.

In the SedaConsumer, the exchange is removed from the queue and then, later, is 
added to the InflightRepository as shown in the following code (I put comments 
where it is done):

{code}
    public void run() {
        BlockingQueue<Exchange> queue = endpoint.getQueue();
        while (queue != null && isRunAllowed()) {
            final Exchange exchange;
            try {
                // The exchange is removed from the queue here
                exchange = queue.poll(1000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
                }
                continue;
            }
            if (exchange != null) {
                if (isRunAllowed()) {
                    try {
                        // Call to sendToConsumers, detailed below
                        sendToConsumers(exchange);

                        if (exchange.getException() != null) {
                            getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
                        }
                    } catch (Exception e) {
                        getExceptionHandler().handleException("Error processing exchange", exchange, e);
                    }
                } else {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange);
                    }
                    try {
                        queue.put(exchange);
                    } catch (InterruptedException e) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
                        }
                        continue;
                    }
                }
            }
        }
    }

    protected void sendToConsumers(Exchange exchange) throws Exception {
        int size = endpoint.getConsumers().size();

        if (size > 1) {

            if (LOG.isDebugEnabled()) {
                LOG.debug("Multicasting to " + endpoint.getConsumers().size() + " consumers for Exchange: " + exchange);
            }
           
            MulticastProcessor mp = endpoint.getConumserMulticastProcessor();

            AsyncProcessorHelper.process(mp, exchange, new AsyncCallback() {
                public void done(boolean doneSync) {
                }
            });
        } else {
            // This call creates the UnitOfWork (in UnitOfWorkProcessor), which
            // puts the exchange in the InflightRepository
            AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() {
                public void done(boolean doneSync) {

                }
            });
        }
    }
{code}

If the shutdown occurs between these two actions, the DefaultShutdownStrategy 
will shut down the route even though a message is still in progress, and that 
message will be lost.
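
To make the window concrete, here is a minimal single-threaded sketch of the
same sequence. It does not use Camel: the queue and the counter are
hypothetical stand-ins for the seda queue and the InflightRepository, and
outstanding() only imitates the sum that the shutdown check computes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: none of these names exist in Camel.
class SedaShutdownWindow {

    // Imitates the shutdown check: inflight exchanges plus pending (queued) exchanges.
    static int outstanding(BlockingQueue<String> queue, AtomicInteger inflight) {
        return inflight.get() + queue.size();
    }

    // Returns what the shutdown check would see before the poll,
    // inside the window, and after the exchange is registered as inflight.
    static int[] observe() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // stand-in for the seda queue
        AtomicInteger inflight = new AtomicInteger();               // stand-in for InflightRepository

        queue.add("exchange-1");
        int before = outstanding(queue, inflight);

        String exchange = queue.poll();          // the exchange leaves the queue here
        int inWindow = outstanding(queue, inflight); // held only by the consumer thread

        if (exchange != null) {
            inflight.incrementAndGet();          // what creating the UnitOfWork would do
        }
        int after = outstanding(queue, inflight);

        return new int[] {before, inWindow, after};
    }

    public static void main(String[] args) {
        int[] seen = observe();
        System.out.println("before poll:        " + seen[0]);
        System.out.println("inside the window:  " + seen[1]);
        System.out.println("after registration: " + seen[2]);
    }
}
```

A shutdown check that runs inside the window sees 0 and concludes that nothing
is left to wait for, although one exchange is still being handled.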

Here is the code of ShutdownTask in DefaultShutdownStrategy which causes the 
shutdown even if some messages are still in progress. (I put comments in it to 
show the state of the seda queue and the InflightRepository if it is called 
between the queue.poll() and the InflightRepository.add().)

{code}
                    for (Consumer consumer : order.getInputs()) {
                        // Number of inflight exchanges: 0, because the UnitOfWork has not been created yet
                        int inflight = context.getInflightRepository().size(consumer.getEndpoint());

                        if (consumer instanceof ShutdownAware) {
                            // Number of exchanges in the seda queue: 0, because the exchange has already been removed
                            inflight += ((ShutdownAware) consumer).getPendingExchangesSize();
                        }
                        if (inflight > 0) {
                            size += inflight;
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(inflight + " inflight and pending exchanges for consumer: " + consumer);
                            }
                        }
                    }
{code}

You can reproduce it by putting a breakpoint in the method 
{{sendToConsumers(Exchange exchange)}} in SedaConsumer and calling stop() on 
the CamelContext while the thread is suspended at the breakpoint.

We caught the problem in a unit test that exercises the shutdown, at a time 
when our test server was under heavy load.
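
One possible direction, offered only as a sketch and not as the fix Camel
should or did adopt: let the consumer count an exchange as pending from before
the poll until processing returns, so the value reported to the shutdown check
never drops to 0 while an exchange is in hand. The class below is hypothetical
and self-contained; only the method name getPendingExchangesSize() is borrowed
from ShutdownAware, and synchronous processing is assumed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical consumer, not Camel's SedaConsumer.
class TrackingSedaConsumer {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Number of consumer threads currently polling or processing.
    private final AtomicInteger inHand = new AtomicInteger();

    void offer(String exchange) {
        queue.add(exchange);
    }

    // Read the queue size first: an exchange polled after this read was
    // still counted in the queue, and one polled before it is covered by
    // inHand, which was incremented before the poll.
    int getPendingExchangesSize() {
        int queued = queue.size();
        return queued + inHand.get();
    }

    // Polls once and processes synchronously; returns true if an exchange was handled.
    boolean pollOnce(long timeoutMillis, Consumer<String> processor) {
        inHand.incrementAndGet();            // counted before leaving the queue
        try {
            String exchange = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (exchange == null) {
                return false;
            }
            processor.accept(exchange);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } finally {
            inHand.decrementAndGet();        // released only after processing returns
        }
    }

    public static void main(String[] args) {
        TrackingSedaConsumer consumer = new TrackingSedaConsumer();
        consumer.offer("exchange-1");
        int[] seenWhileProcessing = new int[1];
        consumer.pollOnce(1000, exchange ->
                seenWhileProcessing[0] = consumer.getPendingExchangesSize());
        System.out.println("pending while processing: " + seenWhileProcessing[0]);
        System.out.println("pending afterwards:       " + consumer.getPendingExchangesSize());
    }
}
```

The trade-off is that the count errs on the high side: an idle consumer that is
blocked in poll() also reports 1 until its poll times out, so a shutdown may
wait slightly longer than strictly necessary, but it no longer sees 0 while an
exchange is in hand.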

