This is an automated email from the ASF dual-hosted git repository. zhfeng pushed a commit to branch camel-3.14.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9ef5f081e82252cf061bf75362f766a44c8bf71a Author: Zheng Feng <[email protected]> AuthorDate: Thu Feb 24 15:03:56 2022 +0800 CAMEL-17474: camel-jta - Mark exchange transacted in JtaTransacionErr… (#7014) * CAMEL-17474: camel-jta - Mark exchange transacted in JtaTransacionErrorHandler * make changes according to comments --- .../TransactionalJtaTransactionPolicy.java | 42 +++++++++++++++ .../camel/jta/JtaTransactionErrorHandler.java | 19 +++++++ tests/camel-itest/pom.xml | 6 +++ .../org/apache/camel/itest/tx/JtaRouteTest.java | 60 ++++++++++++++++++++++ 4 files changed, 127 insertions(+) diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionalJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionalJtaTransactionPolicy.java index 680cc3a..a737996 100644 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionalJtaTransactionPolicy.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionalJtaTransactionPolicy.java @@ -16,6 +16,11 @@ */ package org.apache.camel.cdi.transaction; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Objects; +import java.util.stream.Stream; + import javax.naming.InitialContext; import javax.naming.NamingException; import javax.transaction.HeuristicMixedException; @@ -31,6 +36,8 @@ import org.apache.camel.jta.JtaTransactionPolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.util.stream.Collectors.toList; + /** * Helper methods for transaction handling * @@ -47,6 +54,13 @@ public abstract class TransactionalJtaTransactionPolicy extends JtaTransactionPo "java:pm/TransactionManager", "java:/TransactionManager" }; + private static final String[] METHODS = new String[] { + "org.openejb.OpenEJB.getTransactionManager", + "com.arjuna.ats.jta.TransactionManager.transactionManager", + "com.bluestone.jta.SaTransactionManagerFactory.SaGetTransactionManager", + "com.sun.jts.jta.TransactionManagerImpl.getTransactionManagerImpl", + "com.inprise.visitransact.jta.TransactionManagerImpl.getTransactionManagerImpl", + }; protected TransactionManager transactionManager; @@ -69,6 +83,7 @@ public abstract class TransactionalJtaTransactionPolicy extends JtaTransactionPo } } + // todo: see @openjpa:openjpa-kernel/src/main/java/org/apache/openjpa/ee/AutomaticManagedRuntime.java private TransactionManager lookupTransactionManager() { TransactionManager tm; for (String jndiName : TRANSACTION_MANAGER_JNDI_NAMES) { @@ -81,6 +96,33 @@ public abstract class TransactionalJtaTransactionPolicy extends JtaTransactionPo LOG.debug("No JTA TransactionManager found at JNDI location [{}]", jndiName, ex); } } + List<ClassLoader> loaders = Stream.of(Thread.currentThread().getContextClassLoader(), getClass().getClassLoader()) + .filter(Objects::nonNull) + .distinct() + .collect(toList()); + for (String method : METHODS) { + final int sep = method.lastIndexOf('.'); + try { + Class<?> clazz = null; + for (final ClassLoader loader : loaders) { + try { + clazz = loader.loadClass(method.substring(0, sep)); + } catch (final NoClassDefFoundError | ClassNotFoundException cnfe) { + // continue + } + } + if (clazz != null) { + final Method getter = clazz.getDeclaredMethod(method.substring(sep + 1)); + getter.setAccessible(true); + final TransactionManager txMgr = (TransactionManager) getter.invoke(null); + if (txMgr != null) { + return txMgr; + } + } + } catch (final RuntimeException | ReflectiveOperationException | NoClassDefFoundError t) { + // no-op + } + } LOG.warn("Could not find the transaction manager through any of following locations: {}", String.join(",", TRANSACTION_MANAGER_JNDI_NAMES)); return null; diff --git a/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandler.java b/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandler.java index 03ca157..dd3542e 100644 --- a/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandler.java +++ b/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandler.java @@ -18,7 +18,9 @@ package org.apache.camel.jta; import java.util.concurrent.ScheduledExecutorService; +import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; import org.apache.camel.Predicate; import org.apache.camel.Processor; @@ -26,6 +28,8 @@ import org.apache.camel.processor.errorhandler.RedeliveryErrorHandler; import org.apache.camel.processor.errorhandler.RedeliveryPolicy; import org.apache.camel.spi.CamelLogger; import org.apache.camel.spi.ErrorHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This error handler does redelivering. If the transaction fails it can be retried if configured to do so. In the @@ -34,6 +38,7 @@ import org.apache.camel.spi.ErrorHandler; */ public class JtaTransactionErrorHandler extends RedeliveryErrorHandler { + private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionErrorHandler.class); private final JtaTransactionPolicy transactionPolicy; private final LoggingLevel rollbackLoggingLevel; @@ -64,4 +69,18 @@ public class JtaTransactionErrorHandler extends RedeliveryErrorHandler { return answer; } + @Override + public boolean process(final Exchange exchange, final AsyncCallback callback) { + if (!exchange.isTransacted()) { + try { + LOG.debug("Mark {} as transacted", exchange); + exchange.getUnitOfWork().beginTransactedBy("camel-jta"); + return super.process(exchange, callback); + } finally { + exchange.getUnitOfWork().endTransactedBy("camel-jta"); + } + } + + return super.process(exchange, callback); + } } diff --git a/tests/camel-itest/pom.xml b/tests/camel-itest/pom.xml index 6f0a25e..675c69e 100644 --- a/tests/camel-itest/pom.xml +++ b/tests/camel-itest/pom.xml @@ -207,6 +207,12 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-cdi</artifactId> + <scope>test</scope> + </dependency> + <!-- some TX tests using iBatis --> <dependency> <groupId>org.apache.derby</groupId> diff --git a/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java new file mode 100644 index 0000000..2c95a0b --- /dev/null +++ b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.itest.tx; + +import org.apache.camel.EndpointInject; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.cdi.transaction.RequiresNewJtaTransactionPolicy; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Test; + +public class JtaRouteTest extends CamelTestSupport { + @EndpointInject("mock:splitted") + private MockEndpoint splitted; + + @EndpointInject("direct:requires_new") + private ProducerTemplate start; + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.getRegistry().bind("PROPAGATION_REQUIRES_NEW", new RequiresNewJtaTransactionPolicy()); + + from("direct:requires_new") + .transacted("PROPAGATION_REQUIRES_NEW") + .split(body()).delimiter("_").to("direct:splitted").end() + .log("after splitter log which you will never see...") + .transform().constant("requires_new"); + + from("direct:splitted").to("mock:splitted"); + } + }; + } + + @Test + void testTransactedSplit() throws Exception { + splitted.expectedBodiesReceived("requires", "new"); + + start.sendBody("requires_new"); + + splitted.assertIsSatisfied(); + } +}
