This is an automated email from the ASF dual-hosted git repository.

zhfeng pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9ef5f081e82252cf061bf75362f766a44c8bf71a
Author: Zheng Feng <[email protected]>
AuthorDate: Thu Feb 24 15:03:56 2022 +0800

    CAMEL-17474: camel-jta - Mark exchange transacted in JtaTransactionErrorHandler (#7014)
    
    * CAMEL-17474: camel-jta - Mark exchange transacted in JtaTransactionErrorHandler
    
    * make changes according to comments
---
 .../TransactionalJtaTransactionPolicy.java         | 42 +++++++++++++++
 .../camel/jta/JtaTransactionErrorHandler.java      | 19 +++++++
 tests/camel-itest/pom.xml                          |  6 +++
 .../org/apache/camel/itest/tx/JtaRouteTest.java    | 60 ++++++++++++++++++++++
 4 files changed, 127 insertions(+)

diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionalJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionalJtaTransactionPolicy.java
index 680cc3a..a737996 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionalJtaTransactionPolicy.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/TransactionalJtaTransactionPolicy.java
@@ -16,6 +16,11 @@
  */
 package org.apache.camel.cdi.transaction;
 
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
 import javax.transaction.HeuristicMixedException;
@@ -31,6 +36,8 @@ import org.apache.camel.jta.JtaTransactionPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.stream.Collectors.toList;
+
 /**
  * Helper methods for transaction handling
  *
@@ -47,6 +54,13 @@ public abstract class TransactionalJtaTransactionPolicy 
extends JtaTransactionPo
             "java:pm/TransactionManager",
             "java:/TransactionManager"
     };
+    private static final String[] METHODS = new String[] {
+            "org.openejb.OpenEJB.getTransactionManager",
+            "com.arjuna.ats.jta.TransactionManager.transactionManager",
+            
"com.bluestone.jta.SaTransactionManagerFactory.SaGetTransactionManager",
+            "com.sun.jts.jta.TransactionManagerImpl.getTransactionManagerImpl",
+            
"com.inprise.visitransact.jta.TransactionManagerImpl.getTransactionManagerImpl",
+    };
 
     protected TransactionManager transactionManager;
 
@@ -69,6 +83,7 @@ public abstract class TransactionalJtaTransactionPolicy 
extends JtaTransactionPo
         }
     }
 
+    // todo: see 
@openjpa:openjpa-kernel/src/main/java/org/apache/openjpa/ee/AutomaticManagedRuntime.java
     private TransactionManager lookupTransactionManager() {
         TransactionManager tm;
         for (String jndiName : TRANSACTION_MANAGER_JNDI_NAMES) {
@@ -81,6 +96,33 @@ public abstract class TransactionalJtaTransactionPolicy 
extends JtaTransactionPo
                 LOG.debug("No JTA TransactionManager found at JNDI location 
[{}]", jndiName, ex);
             }
         }
+        List<ClassLoader> loaders = 
Stream.of(Thread.currentThread().getContextClassLoader(), 
getClass().getClassLoader())
+                .filter(Objects::nonNull)
+                .distinct()
+                .collect(toList());
+        for (String method : METHODS) {
+            final int sep = method.lastIndexOf('.');
+            try {
+                Class<?> clazz = null;
+                for (final ClassLoader loader : loaders) {
+                    try {
+                        clazz = loader.loadClass(method.substring(0, sep));
+                    } catch (final NoClassDefFoundError | 
ClassNotFoundException cnfe) {
+                        // continue
+                    }
+                }
+                if (clazz != null) {
+                    final Method getter = 
clazz.getDeclaredMethod(method.substring(sep + 1));
+                    getter.setAccessible(true);
+                    final TransactionManager txMgr = (TransactionManager) 
getter.invoke(null);
+                    if (txMgr != null) {
+                        return txMgr;
+                    }
+                }
+            } catch (final RuntimeException | ReflectiveOperationException | 
NoClassDefFoundError t) {
+                // no-op
+            }
+        }
         LOG.warn("Could not find the transaction manager through any of 
following locations: {}",
                 String.join(",", TRANSACTION_MANAGER_JNDI_NAMES));
         return null;
diff --git a/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandler.java b/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandler.java
index 03ca157..dd3542e 100644
--- a/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandler.java
+++ b/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandler.java
@@ -18,7 +18,9 @@ package org.apache.camel.jta;
 
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
@@ -26,6 +28,8 @@ import 
org.apache.camel.processor.errorhandler.RedeliveryErrorHandler;
 import org.apache.camel.processor.errorhandler.RedeliveryPolicy;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.spi.ErrorHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This error handler does redelivering. If the transaction fails it can be 
retried if configured to do so. In the
@@ -34,6 +38,7 @@ import org.apache.camel.spi.ErrorHandler;
  */
 public class JtaTransactionErrorHandler extends RedeliveryErrorHandler {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(JtaTransactionErrorHandler.class);
     private final JtaTransactionPolicy transactionPolicy;
     private final LoggingLevel rollbackLoggingLevel;
 
@@ -64,4 +69,18 @@ public class JtaTransactionErrorHandler extends 
RedeliveryErrorHandler {
         return answer;
     }
 
+    @Override
+    public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
+        if (!exchange.isTransacted()) {
+            try {
+                LOG.debug("Mark {} as transacted", exchange);
+                exchange.getUnitOfWork().beginTransactedBy("camel-jta");
+                return super.process(exchange, callback);
+            } finally {
+                exchange.getUnitOfWork().endTransactedBy("camel-jta");
+            }
+        }
+
+        return super.process(exchange, callback);
+    }
 }
diff --git a/tests/camel-itest/pom.xml b/tests/camel-itest/pom.xml
index 6f0a25e..675c69e 100644
--- a/tests/camel-itest/pom.xml
+++ b/tests/camel-itest/pom.xml
@@ -207,6 +207,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-cdi</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <!-- some TX tests using iBatis -->
         <dependency>
             <groupId>org.apache.derby</groupId>
diff --git a/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java
new file mode 100644
index 0000000..2c95a0b
--- /dev/null
+++ b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.tx;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.cdi.transaction.RequiresNewJtaTransactionPolicy;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+public class JtaRouteTest extends CamelTestSupport {
+    @EndpointInject("mock:splitted")
+    private MockEndpoint splitted;
+
+    @EndpointInject("direct:requires_new")
+    private ProducerTemplate start;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.getRegistry().bind("PROPAGATION_REQUIRES_NEW", new 
RequiresNewJtaTransactionPolicy());
+
+                from("direct:requires_new")
+                        .transacted("PROPAGATION_REQUIRES_NEW")
+                        
.split(body()).delimiter("_").to("direct:splitted").end()
+                        .log("after splitter log which you will never see...")
+                        .transform().constant("requires_new");
+
+                from("direct:splitted").to("mock:splitted");
+            }
+        };
+    }
+
+    @Test
+    void testTransactedSplit() throws Exception {
+        splitted.expectedBodiesReceived("requires", "new");
+
+        start.sendBody("requires_new");
+
+        splitted.assertIsSatisfied();
+    }
+}

Reply via email to