[
https://issues.apache.org/activemq/browse/CAMEL-3348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=63445#action_63445
]
Damien Delautre commented on CAMEL-3348:
----------------------------------------
Thank you for your quick fix!
I tested it, and if I leave my breakpoint where it was (for testing purposes),
it works.
But if I put a breakpoint at the line with tasks.incrementAndGet() in
SedaConsumer, to simulate getPendingExchangesSize() being called between
queue.poll() and tasks.incrementAndGet(), I can still reproduce the problem:
{code}
public void run() {
    BlockingQueue<Exchange> queue = endpoint.getQueue();
    while (queue != null && isRunAllowed()) {
        Exchange exchange = null;
        try {
            exchange = queue.poll(1000, TimeUnit.MILLISECONDS);
            if (exchange != null) {
                try {
                    tasks.incrementAndGet(); // if we put a breakpoint here, the problem will occur
                    sendToConsumers(exchange);
                    // log exception if an exception occurred and was not handled
                    if (exchange.getException() != null) {
                        getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
                    }
                } catch (Exception e) {
                    getExceptionHandler().handleException("Error processing exchange", exchange, e);
                } finally {
                    tasks.decrementAndGet();
                }
            }
        } catch (InterruptedException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
            }
            continue;
        } catch (Throwable e) {
            if (exchange != null) {
                getExceptionHandler().handleException("Error processing exchange", exchange, e);
            } else {
                getExceptionHandler().handleException(e);
            }
        }
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Ending this polling consumer thread, there are still " + tasks.get() + " threads left.");
    }
}
{code}
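To show the same window without a debugger, here is a small standalone sketch
(not Camel code, just the same poll-then-increment shape with made-up names): a
third thread computing queue.size() + inFlight.get(), the way
getPendingExchangesSize() does, can read 0 in the instant after poll() has
returned and before incrementAndGet() has run, even though a message is being
handled:
{code}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PollThenIncrementRace {

    static final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
    static final AtomicInteger inFlight = new AtomicInteger();

    // what a shutdown-style check would see: queued messages + messages being handled
    static int pendingSize() {
        return queue.size() + inFlight.get();
    }

    public static void main(String[] args) throws Exception {
        queue.put("message");

        Thread worker = new Thread(new Runnable() {
            public void run() {
                try {
                    String msg = queue.poll(1000, TimeUnit.MILLISECONDS);
                    // at this instant the message is in neither the queue nor the counter
                    pauseLikeABreakpoint();
                    if (msg != null) {
                        inFlight.incrementAndGet();
                        try {
                            // ... handle msg ...
                        } finally {
                            inFlight.decrementAndGet();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        worker.start();

        Thread.sleep(200); // let the worker pass poll() but not incrementAndGet()
        System.out.println("pending size seen by the check: " + pendingSize()); // prints 0
        worker.join();
    }

    // stands in for the breakpoint (or an unlucky thread pause)
    static void pauseLikeABreakpoint() throws InterruptedException {
        Thread.sleep(1000);
    }
}
{code}
The sleeps only stand in for the breakpoint; any thread pause at that exact
point has the same effect.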
The odds of it happening are lower now, but it can still occur if we are
really, really unlucky.
I see that you fixed this bug for version 2.6.0. When will it be released?
> DefaultShutdownStrategy and ShutdownAware (SedaConsumer) losing exchange
> ------------------------------------------------------------------------
>
> Key: CAMEL-3348
> URL: https://issues.apache.org/activemq/browse/CAMEL-3348
> Project: Apache Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 2.5.0
> Reporter: Damien Delautre
> Assignee: Claus Ibsen
> Fix For: 2.6.0
>
>
> There is a problem when we shut down the CamelContext with a seda endpoint.
> In SedaConsumer, the exchange is removed from the queue and only later added
> to the InflightRepository, as shown in the following code (I added comments
> where this happens):
> {code}
> public void run() {
>     BlockingQueue<Exchange> queue = endpoint.getQueue();
>     while (queue != null && isRunAllowed()) {
>         final Exchange exchange;
>         try {
>             exchange = queue.poll(1000, TimeUnit.MILLISECONDS); // the exchange is removed from the queue here
>         } catch (InterruptedException e) {
>             if (LOG.isDebugEnabled()) {
>                 LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
>             }
>             continue;
>         }
> 
>         if (exchange != null) {
>             if (isRunAllowed()) {
>                 try {
>                     sendToConsumers(exchange); // call to sendToConsumers, detailed below
>                     if (exchange.getException() != null) {
>                         getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
>                     }
>                 } catch (Exception e) {
>                     getExceptionHandler().handleException("Error processing exchange", exchange, e);
>                 }
>             } else {
>                 if (LOG.isWarnEnabled()) {
>                     LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange);
>                 }
>                 try {
>                     queue.put(exchange);
>                 } catch (InterruptedException e) {
>                     if (LOG.isDebugEnabled()) {
>                         LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
>                     }
>                     continue;
>                 }
>             }
>         }
>     }
> }
> 
> protected void sendToConsumers(Exchange exchange) throws Exception {
>     int size = endpoint.getConsumers().size();
>     if (size > 1) {
>         if (LOG.isDebugEnabled()) {
>             LOG.debug("Multicasting to " + endpoint.getConsumers().size() + " consumers for Exchange: " + exchange);
>         }
> 
>         MulticastProcessor mp = endpoint.getConumserMulticastProcessor();
>         AsyncProcessorHelper.process(mp, exchange, new AsyncCallback() {
>             public void done(boolean doneSync) {
>             }
>         });
>     } else {
>         // this call creates the UnitOfWork (in UnitOfWorkProcessor), which puts the exchange in the InflightRepository
>         AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() {
>             public void done(boolean doneSync) {
>             }
>         });
>     }
> }
> {code}
> If the shutdown occurs between these two actions, the DefaultShutdownStrategy
> will shut down the route even though a message is in progress, and that
> message will be lost.
> Here is the code of ShutdownTask in DefaultShutdownStrategy that triggers the
> shutdown even while messages are still in progress (I added comments showing
> the state of the seda queue and of the InflightRepository if it runs between
> queue.poll() and InflightRepository.add()):
> {code}
> for (Consumer consumer : order.getInputs()) {
>     // check the number of inflight exchanges, which is 0 because the UnitOfWork has not been created yet
>     int inflight = context.getInflightRepository().size(consumer.getEndpoint());
>     if (consumer instanceof ShutdownAware) {
>         // check the number of exchanges in the seda queue, which is 0 because the message has already been removed
>         inflight += ((ShutdownAware) consumer).getPendingExchangesSize();
>     }
>     if (inflight > 0) {
>         size += inflight;
>         if (LOG.isDebugEnabled()) {
>             LOG.debug(inflight + " inflight and pending exchanges for consumer: " + consumer);
>         }
>     }
> }
> {code}
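> Just to illustrate the ordering that would remove this gap, here is a rough,
> generic sketch (my own simplification with hypothetical names, not
> SedaConsumer and not a proposed patch): if the pending counter is raised
> before the poll and only dropped after processing finishes, the sum
> queue.size() + pending.get() may overcount for a moment, but it can never
> read 0 while a message is being handled, so a check like the one above cannot
> miss it:
> {code}
> import java.util.concurrent.BlockingQueue;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicInteger;
> 
> // hypothetical worker: the pending count covers a message from before it
> // leaves the queue until its processing has finished
> public class GapFreeWorker<T> implements Runnable {
> 
>     private final BlockingQueue<T> queue;
>     private final AtomicInteger pending = new AtomicInteger();
>     private volatile boolean runAllowed = true;
> 
>     public GapFreeWorker(BlockingQueue<T> queue) {
>         this.queue = queue;
>     }
> 
>     // what a getPendingExchangesSize()-style method would return in this sketch
>     public int getPendingSize() {
>         return queue.size() + pending.get();
>     }
> 
>     public void stop() {
>         runAllowed = false;
>     }
> 
>     public void run() {
>         while (runAllowed) {
>             pending.incrementAndGet();        // count a potential task before polling
>             try {
>                 T msg = queue.poll(1000, TimeUnit.MILLISECONDS);
>                 if (msg != null) {
>                     process(msg);             // covered by the counter the whole time
>                 }
>             } catch (InterruptedException e) {
>                 Thread.currentThread().interrupt();
>             } finally {
>                 pending.decrementAndGet();    // only now may the count drop to 0
>             }
>         }
>     }
> 
>     protected void process(T msg) {
>         // ...
>     }
> }
> {code}
> The trade-off in this sketch is that, until the loop itself exits, even an
> idle poll() counts as one pending task, so a shutdown check may have to wait
> up to one extra poll interval.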
> You can reproduce it by putting a breakpoint in the method {code}protected
> void sendToConsumers(Exchange exchange){code} in SedaConsumer and calling
> stop() on the CamelContext while the consumer thread is suspended at the
> breakpoint; a rough sketch of that setup follows below.
> We caught the problem in a unit test that exercises the shutdown, and again
> when our test server was under heavy load.
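> For completeness, here is a rough, hypothetical sketch of that setup (not our
> actual test; the endpoint URIs, body and timings are placeholders):
> {code}
> import org.apache.camel.CamelContext;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
> 
> public class SedaShutdownBreakpointRepro {
> 
>     public static void main(String[] args) throws Exception {
>         CamelContext context = new DefaultCamelContext();
>         context.addRoutes(new RouteBuilder() {
>             public void configure() {
>                 from("seda:start").to("log:done");
>             }
>         });
>         context.start();
> 
>         ProducerTemplate template = context.createProducerTemplate();
>         template.sendBody("seda:start", "this message must not be lost");
> 
>         // Run this under a debugger with a breakpoint in
>         // SedaConsumer.sendToConsumers(Exchange) that suspends only the
>         // consumer thread: the exchange is then off the queue but not yet in
>         // the InflightRepository when stop() runs below.
>         Thread.sleep(500); // rough pause so the consumer has polled the exchange
>         context.stop();    // the shutdown check sees 0 inflight + 0 pending
>     }
> }
> {code}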
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.