This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 089eb7a  CAMEL-17474: camel-jta - Mark exchange transacted in 
JtaTransacionErr… (#7014)
089eb7a is described below

commit 089eb7a60c177c3f0466b28e941e1fcbc183a122
Author: Amos Feng <[email protected]>
AuthorDate: Wed Feb 23 20:53:55 2022 +0800

    CAMEL-17474: camel-jta - Mark exchange transacted in JtaTransacionErr… 
(#7014)
    
    * CAMEL-17474: camel-jta - Mark exchange transacted in 
JtaTransacionErrorHandler
    
    * make changes according to comments
---
 .../camel/jta/JtaTransactionErrorHandler.java      | 19 +++++++
 tests/camel-itest/pom.xml                          |  6 +++
 .../org/apache/camel/itest/tx/JtaRouteTest.java    | 60 ++++++++++++++++++++++
 3 files changed, 85 insertions(+)

diff --git 
a/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandler.java
 
b/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandler.java
index 03ca157..dd3542e 100644
--- 
a/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandler.java
+++ 
b/components/camel-jta/src/main/java/org/apache/camel/jta/JtaTransactionErrorHandler.java
@@ -18,7 +18,9 @@ package org.apache.camel.jta;
 
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
@@ -26,6 +28,8 @@ import 
org.apache.camel.processor.errorhandler.RedeliveryErrorHandler;
 import org.apache.camel.processor.errorhandler.RedeliveryPolicy;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.spi.ErrorHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This error handler does redelivering. If the transaction fails it can be 
retried if configured to do so. In the
@@ -34,6 +38,7 @@ import org.apache.camel.spi.ErrorHandler;
  */
 public class JtaTransactionErrorHandler extends RedeliveryErrorHandler {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(JtaTransactionErrorHandler.class);
     private final JtaTransactionPolicy transactionPolicy;
     private final LoggingLevel rollbackLoggingLevel;
 
@@ -64,4 +69,18 @@ public class JtaTransactionErrorHandler extends 
RedeliveryErrorHandler {
         return answer;
     }
 
+    @Override
+    public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
+        if (!exchange.isTransacted()) {
+            try {
+                LOG.debug("Mark {} as transacted", exchange);
+                exchange.getUnitOfWork().beginTransactedBy("camel-jta");
+                return super.process(exchange, callback);
+            } finally {
+                exchange.getUnitOfWork().endTransactedBy("camel-jta");
+            }
+        }
+
+        return super.process(exchange, callback);
+    }
 }
diff --git a/tests/camel-itest/pom.xml b/tests/camel-itest/pom.xml
index 48b06ca..0336367 100644
--- a/tests/camel-itest/pom.xml
+++ b/tests/camel-itest/pom.xml
@@ -207,6 +207,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-cdi-jta</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <!-- some TX tests using iBatis -->
         <dependency>
             <groupId>org.apache.derby</groupId>
diff --git 
a/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java 
b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java
new file mode 100644
index 0000000..2c95a0b
--- /dev/null
+++ 
b/tests/camel-itest/src/test/java/org/apache/camel/itest/tx/JtaRouteTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.tx;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.cdi.transaction.RequiresNewJtaTransactionPolicy;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+public class JtaRouteTest extends CamelTestSupport {
+    @EndpointInject("mock:splitted")
+    private MockEndpoint splitted;
+
+    @EndpointInject("direct:requires_new")
+    private ProducerTemplate start;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.getRegistry().bind("PROPAGATION_REQUIRES_NEW", new 
RequiresNewJtaTransactionPolicy());
+
+                from("direct:requires_new")
+                        .transacted("PROPAGATION_REQUIRES_NEW")
+                        
.split(body()).delimiter("_").to("direct:splitted").end()
+                        .log("after splitter log which you will never see...")
+                        .transform().constant("requires_new");
+
+                from("direct:splitted").to("mock:splitted");
+            }
+        };
+    }
+
+    @Test
+    void testTransactedSplit() throws Exception {
+        splitted.expectedBodiesReceived("requires", "new");
+
+        start.sendBody("requires_new");
+
+        splitted.assertIsSatisfied();
+    }
+}

Reply via email to