Repository: camel
Updated Branches:
  refs/heads/camel-2.18.x cc704cd2e -> c82284cfe
  refs/heads/master 60952ba8d -> f6c27f9a3
CAMEL-10873: camel-sjms transacted routes dead-lock when exceptions are thrown by asynchronous processors. Thanks to Daniele Fognini for test case.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f6c27f9a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f6c27f9a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f6c27f9a

Branch: refs/heads/master
Commit: f6c27f9a304118536d3a9c760b8841933bb80a7e
Parents: 60952ba
Author: Claus Ibsen <[email protected]>
Authored: Fri Feb 24 14:17:09 2017 +0100
Committer: Claus Ibsen <[email protected]>
Committed: Fri Feb 24 14:17:43 2017 +0100

----------------------------------------------------------------------
 .../sjms/consumer/AbstractMessageHandler.java   |   8 +-
 .../tx/SessionTransactionSynchronization.java   |  36 +++----
 .../sjms/tx/TransactedAsyncExceptionTest.java   | 108 +++++++++++++++++++
 3 files changed, 131 insertions(+), 21 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/f6c27f9a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
index 2cbc2ea..2b61bc3 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
@@ -75,9 +75,6 @@ public abstract class AbstractMessageHandler implements MessageListener { log.debug("Processing Exchange.id:{}", exchange.getExchangeId()); if (isTransacted()) { - if (synchronization != null) { - 
exchange.addOnCompletion(synchronization); - } if (isSharedJMSSession()) { // Propagate a JMS Session as an initiator if sharedJMSSession is enabled exchange.getIn().setHeader(SjmsConstants.JMS_SESSION, getSession()); @@ -87,6 +84,11 @@ public abstract class AbstractMessageHandler implements MessageListener { if (isTransacted() || isSynchronous()) { log.debug("Handling synchronous message: {}", exchange.getIn().getBody()); handleMessage(exchange); + if (exchange.isFailed()) { + synchronization.onFailure(exchange); + } else { + synchronization.onComplete(exchange); + } } else { log.debug("Handling asynchronous message: {}", exchange.getIn().getBody()); executor.execute(new Runnable() { http://git-wip-us.apache.org/repos/asf/camel/blob/f6c27f9a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java index 433c20e..d2cf09f 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java @@ -20,16 +20,19 @@ import javax.jms.Session; import org.apache.camel.Exchange; import org.apache.camel.component.sjms.TransactionCommitStrategy; -import org.apache.camel.spi.Synchronization; +import org.apache.camel.support.SynchronizationAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * SessionTransactionSynchronization is called at the completion of each {@link org.apache.camel.Exhcnage}. + * SessionTransactionSynchronization is called at the completion of each {@link org.apache.camel.Exchange}. 
+ * <p/> + * The commit or rollback on the {@link Session} must be performed from the same thread that consumed the message. */ -public class SessionTransactionSynchronization implements Synchronization { - private Logger log = LoggerFactory.getLogger(getClass()); - private Session session; +public class SessionTransactionSynchronization extends SynchronizationAdapter { + private static final Logger LOG = LoggerFactory.getLogger(SessionTransactionSynchronization.class); + + private final Session session; private final TransactionCommitStrategy commitStrategy; public SessionTransactionSynchronization(Session session, TransactionCommitStrategy commitStrategy) { @@ -41,42 +44,39 @@ public class SessionTransactionSynchronization implements Synchronization { } } - /** - * @param exchange - * @see org.apache.camel.spi.Synchronization#onFailure(org.apache.camel.Exchange) - */ + @Override public void onFailure(Exchange exchange) { try { if (commitStrategy.rollback(exchange)) { - log.debug("Processing failure of Exchange id:{}", exchange.getExchangeId()); + LOG.debug("Processing failure of Exchange id: {}", exchange.getExchangeId()); if (session != null && session.getTransacted()) { session.rollback(); } } } catch (Exception e) { - log.warn("Failed to rollback the session: {}", e.getMessage()); + LOG.warn("Failed to rollback the session: {}", e.getMessage()); } } - /** - * @param exchange - * @see org.apache.camel.spi.Synchronization#onComplete(org.apache.camel.Exchange - *) - */ @Override public void onComplete(Exchange exchange) { try { if (commitStrategy.commit(exchange)) { - log.debug("Processing completion of Exchange id:{}", exchange.getExchangeId()); + LOG.debug("Processing completion of Exchange id: {}", exchange.getExchangeId()); if (session != null && session.getTransacted()) { session.commit(); } } } catch (Exception e) { - log.warn("Failed to commit the session: {}", e.getMessage()); + LOG.warn("Failed to commit the session: {}", e.getMessage()); 
exchange.setException(e); } } + @Override + public boolean allowHandover() { + // must not handover as we should be synchronous + return false; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/f6c27f9a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedAsyncExceptionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedAsyncExceptionTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedAsyncExceptionTest.java new file mode 100644 index 0000000..14a31fc --- /dev/null +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedAsyncExceptionTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.sjms.tx; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.sjms.SjmsComponent; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class TransactedAsyncExceptionTest extends CamelTestSupport { + + private static final String BROKER_URI = "vm://tqc_test_broker?broker.persistent=false&broker.useJmx=false"; + + private static final int TRANSACTION_REDELIVERY_COUNT = 10; + + @Test + public void testRouteWithThread() throws Exception { + String destination = "sjms:queue:async.exception"; + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + AtomicInteger counter = new AtomicInteger(); + + from(destination + "?acknowledgementMode=SESSION_TRANSACTED&transacted=true") + .threads() + .process(exchange -> { + if (counter.incrementAndGet() < TRANSACTION_REDELIVERY_COUNT) { + throw new IllegalArgumentException(); + } + }) + .to("mock:async.exception"); + } + }); + + template.sendBody(destination, "begin"); + + MockEndpoint mockEndpoint = context.getEndpoint("mock:async.exception", MockEndpoint.class); + + mockEndpoint.expectedMessageCount(1); + if (!mockEndpoint.await(getShutdownTimeout(), TimeUnit.SECONDS)) { + dumpThreads(); + } + assertMockEndpointsSatisfied(getShutdownTimeout(), TimeUnit.SECONDS); + } + + private void dumpThreads() { + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + for (ThreadInfo threadInfo : threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 
Integer.MAX_VALUE)) { + if (Thread.State.BLOCKED.equals(threadInfo.getThreadState())) { + log.error("blocked thread: {}", threadInfo); + } else { + log.info("normal thread: {}", threadInfo); + } + log.info("full stack: {}", Arrays.stream(threadInfo.getStackTrace()).map(Object::toString).collect(Collectors.joining("\n\t"))); + } + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URI); + + connectionFactory.getRedeliveryPolicy().setInitialRedeliveryDelay(0); + connectionFactory.getRedeliveryPolicy().setRedeliveryDelay(0); + connectionFactory.getRedeliveryPolicy().setUseCollisionAvoidance(false); + connectionFactory.getRedeliveryPolicy().setUseExponentialBackOff(false); + connectionFactory.getRedeliveryPolicy().setMaximumRedeliveries(TRANSACTION_REDELIVERY_COUNT); + + SjmsComponent component = new SjmsComponent(); + component.setConnectionFactory(connectionFactory); + camelContext.addComponent("sjms", component); + + return camelContext; + } + + @Override + protected int getShutdownTimeout() { + return 2; + } + +}
