[
https://issues.apache.org/activemq/browse/CAMEL-3348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Claus Ibsen resolved CAMEL-3348.
--------------------------------
Resolution: Fixed
Fix Version/s: 2.6.0
trunk: 1037666.
Damien, could you test the fix on your system if possible?
> DefaultShutdownStrategy and ShutdownAware (SedaConsumer) losing exchange
> ------------------------------------------------------------------------
>
> Key: CAMEL-3348
> URL: https://issues.apache.org/activemq/browse/CAMEL-3348
> Project: Apache Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 2.5.0
> Reporter: Damien Delautre
> Assignee: Claus Ibsen
> Fix For: 2.6.0
>
>
> There is a problem when we shut down the CamelContext with a seda endpoint.
> In SedaConsumer, the exchange is removed from the queue and only later
> added to the InflightRepository, as shown in the following code (I put
> comments where each step happens):
> {code}
> public void run() {
>     BlockingQueue<Exchange> queue = endpoint.getQueue();
>     while (queue != null && isRunAllowed()) {
>         final Exchange exchange;
>         try {
>             // The exchange is removed from the queue here
>             exchange = queue.poll(1000, TimeUnit.MILLISECONDS);
>         } catch (InterruptedException e) {
>             if (LOG.isDebugEnabled()) {
>                 LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
>             }
>             continue;
>         }
>         if (exchange != null) {
>             if (isRunAllowed()) {
>                 try {
>                     sendToConsumers(exchange); // Call to sendToConsumers, detailed below
>                     if (exchange.getException() != null) {
>                         getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
>                     }
>                 } catch (Exception e) {
>                     getExceptionHandler().handleException("Error processing exchange", exchange, e);
>                 }
>             } else {
>                 if (LOG.isWarnEnabled()) {
>                     LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange);
>                 }
>                 try {
>                     queue.put(exchange);
>                 } catch (InterruptedException e) {
>                     if (LOG.isDebugEnabled()) {
>                         LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
>                     }
>                     continue;
>                 }
>             }
>         }
>     }
> }
>
> protected void sendToConsumers(Exchange exchange) throws Exception {
>     int size = endpoint.getConsumers().size();
>     if (size > 1) {
>         if (LOG.isDebugEnabled()) {
>             LOG.debug("Multicasting to " + endpoint.getConsumers().size() + " consumers for Exchange: " + exchange);
>         }
>
>         MulticastProcessor mp = endpoint.getConumserMulticastProcessor();
>         AsyncProcessorHelper.process(mp, exchange, new AsyncCallback() {
>             public void done(boolean doneSync) {
>             }
>         });
>     } else {
>         // This line creates the UnitOfWork (in UnitOfWorkProcessor), which
>         // puts the exchange in the InflightRepository
>         AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() {
>             public void done(boolean doneSync) {
>             }
>         });
>     }
> }
> {code}
> If the shutdown occurs between these two actions, the DefaultShutdownStrategy
> will shut down the route even though a message is in progress, and the
> message will be lost.
> Here is the code of ShutdownTask in DefaultShutdownStrategy that causes the
> shutdown even if some messages are still in progress. (I put comments in
> it to show the state of the seda queue and the InflightRepository when it is
> called between queue.poll() and InflightRepository.add().)
> {code}
> for (Consumer consumer : order.getInputs()) {
>     // Number of inflight exchanges: 0, because the UnitOfWork is not created yet
>     int inflight = context.getInflightRepository().size(consumer.getEndpoint());
>     if (consumer instanceof ShutdownAware) {
>         // Number of exchanges in the seda queue: 0, because the message is already removed
>         inflight += ((ShutdownAware) consumer).getPendingExchangesSize();
>     }
>     if (inflight > 0) {
>         size += inflight;
>         if (LOG.isDebugEnabled()) {
>             LOG.debug(inflight + " inflight and pending exchanges for consumer: " + consumer);
>         }
>     }
> }
> {code}
> You can reproduce it by putting a breakpoint in the method
> {code}protected void sendToConsumers(Exchange exchange){code} in SedaConsumer
> and calling stop() on the CamelContext while the thread is suspended at the
> breakpoint. We caught the problem in a unit test where we were testing the
> shutdown and when our test server was under heavy load.
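
The window described in the report can be stepped through without a debugger. The following is a minimal, single-threaded sketch and not Camel code: the class `SedaShutdownWindow` and its methods are hypothetical stand-ins, exchanges are modelled as strings, and a plain counter plays the role of the InflightRepository. It uses a non-blocking `poll()` where the real consumer uses a timed poll, and it says nothing about how the fix on trunk was implemented. It only shows that the sum computed by the shutdown check drops to 0 between the two steps.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class SedaShutdownWindow {
    // Stand-in for the seda endpoint queue (exchanges modelled as strings).
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Stand-in for the inflight count reported for the endpoint.
    private final AtomicInteger inflight = new AtomicInteger();

    public void enqueue(String exchange) {
        queue.add(exchange);
    }

    // Step 1 of the consumer loop: the exchange leaves the queue.
    // Non-blocking here; returns null when the queue is empty.
    public String poll() {
        return queue.poll();
    }

    // Step 2: the exchange becomes visible as inflight.
    public void markInflight(String exchange) {
        inflight.incrementAndGet();
    }

    // Mirrors the sum used by the shutdown check: inflight plus pending.
    public int inflightAndPending() {
        return inflight.get() + queue.size();
    }

    public static void main(String[] args) {
        SedaShutdownWindow w = new SedaShutdownWindow();
        w.enqueue("exchange-1");
        System.out.println("after enqueue:      " + w.inflightAndPending()); // 1
        String exchange = w.poll();
        // A shutdown check running at this point sees nothing to wait for.
        System.out.println("after poll:         " + w.inflightAndPending()); // 0
        w.markInflight(exchange);
        System.out.println("after markInflight: " + w.inflightAndPending()); // 1
    }
}
```

The middle reading is the one that matters: the exchange exists and is about to be processed, yet neither counter includes it.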