This is an automated email from the ASF dual-hosted git repository.

fmariani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c806da0aa30e54a43f6b7c0cfe2083823e730ac9
Author: Croway <[email protected]>
AuthorDate: Sun Mar 22 11:36:31 2026 +0100

    JTA Shutdown rollback
---
 components/camel-jta/pom.xml                       |  63 +++++
 .../apache/camel/jta/TransactionErrorHandler.java  |  21 ++
 ...ansactionErrorHandlerGracePeriodShutdownIT.java | 262 +++++++++++++++++++++
 .../jta/TransactionErrorHandlerShutdownTest.java   | 148 ++++++++++++
 .../camel-jta/src/test/resources/log4j2.properties |  28 +++
 ...actionalClientDataSourceForcedShutdownTest.java | 125 ++++++++++
 .../camel/spring/spi/TransactionErrorHandler.java  |  26 ++
 parent/pom.xml                                     |   1 +
 8 files changed, 674 insertions(+)

diff --git a/components/camel-jta/pom.xml b/components/camel-jta/pom.xml
index 9bdb5f9c52fa..9fed72e22c55 100644
--- a/components/camel-jta/pom.xml
+++ b/components/camel-jta/pom.xml
@@ -53,5 +53,68 @@
             <artifactId>jakarta.transaction-api</artifactId>
             <version>${jakarta-transaction-api-version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-sql</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-infra-postgres</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${pgjdbc-driver-version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Narayana JTA transaction manager -->
+        <dependency>
+            <groupId>org.jboss.narayana.jta</groupId>
+            <artifactId>narayana-jta-jakarta</artifactId>
+            <version>${narayana-version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jboss</groupId>
+            <artifactId>jboss-transaction-spi-jakarta</artifactId>
+            <version>${jboss-transaction-spi-version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jboss.logging</groupId>
+            <artifactId>jboss-logging</artifactId>
+            <version>${jboss-logging-version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Agroal connection pool with Narayana integration -->
+        <dependency>
+            <groupId>io.agroal</groupId>
+            <artifactId>agroal-pool</artifactId>
+            <version>${agroal-version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.agroal</groupId>
+            <artifactId>agroal-narayana</artifactId>
+            <version>${agroal-version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/components/camel-jta/src/main/java/org/apache/camel/jta/TransactionErrorHandler.java b/components/camel-jta/src/main/java/org/apache/camel/jta/TransactionErrorHandler.java
index 2958309726fa..bdee9c0d97da 100644
--- a/components/camel-jta/src/main/java/org/apache/camel/jta/TransactionErrorHandler.java
+++ b/components/camel-jta/src/main/java/org/apache/camel/jta/TransactionErrorHandler.java
@@ -18,7 +18,9 @@ package org.apache.camel.jta;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 
 import jakarta.transaction.TransactionRolledbackException;
 
@@ -58,6 +60,7 @@ public class TransactionErrorHandler extends ErrorHandlerSupport
     private JtaTransactionPolicy transactionPolicy;
     private final String transactionKey;
     private final LoggingLevel rollbackLoggingLevel;
+    private final Set<Exchange> inflightTransactedExchanges = 
ConcurrentHashMap.newKeySet();
 
     /**
      * Creates the transaction error handler.
@@ -133,6 +136,7 @@ public class TransactionErrorHandler extends ErrorHandlerSupport
         try {
             // mark the beginning of this transaction boundary
             exchange.getUnitOfWork().beginTransactedBy(transactionKey);
+            inflightTransactedExchanges.add(exchange);
             // do in transaction
             logTransactionBegin(redelivered, ids);
             doInTransactionTemplate(exchange);
@@ -145,6 +149,7 @@ public class TransactionErrorHandler extends ErrorHandlerSupport
             exchange.setException(e);
             logTransactionRollback(redelivered, ids, e, false);
         } finally {
+            inflightTransactedExchanges.remove(exchange);
             // mark the end of this transaction boundary
             exchange.getUnitOfWork().endTransactedBy(transactionKey);
         }
@@ -196,6 +201,13 @@ public class TransactionErrorHandler extends ErrorHandlerSupport
                 // and now let process the exchange by the error handler
                 processByErrorHandler(exchange);
 
+                // if shutdown is being prepared (forced or not), mark the exchange for rollback
+                if (preparingShutdown) {
+                    LOG.debug("Forced shutdown in progress, marking exchange 
for rollback: {}",
+                            exchange.getExchangeId());
+                    exchange.setRollbackOnly(true);
+                }
+
                 // after handling and still an exception or marked as rollback
                 // only then rollback
                 if (exchange.getException() != null || 
exchange.isRollbackOnly()) {
@@ -346,5 +358,14 @@ public class TransactionErrorHandler extends ErrorHandlerSupport
         // prepare for shutdown, eg do not allow redelivery if configured
         LOG.trace("Prepare shutdown on error handler {}", this);
         preparingShutdown = true;
+        if (forced) {
+            // mark all in-flight transacted exchanges for rollback so the 
transaction
+            // is rolled back before the connection pool is destroyed during 
shutdown
+            for (Exchange exchange : inflightTransactedExchanges) {
+                LOG.debug("Marking in-flight transacted exchange for rollback 
due to forced shutdown: {}",
+                        exchange.getExchangeId());
+                exchange.setRollbackOnly(true);
+            }
+        }
     }
 }
diff --git a/components/camel-jta/src/test/java/org/apache/camel/jta/TransactionErrorHandlerGracePeriodShutdownIT.java b/components/camel-jta/src/test/java/org/apache/camel/jta/TransactionErrorHandlerGracePeriodShutdownIT.java
new file mode 100644
index 000000000000..64ad1bc97e70
--- /dev/null
+++ b/components/camel-jta/src/test/java/org/apache/camel/jta/TransactionErrorHandlerGracePeriodShutdownIT.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.jta;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import jakarta.transaction.Status;
+import jakarta.transaction.TransactionManager;
+
+import io.agroal.api.AgroalDataSource;
+import 
io.agroal.api.configuration.supplier.AgroalDataSourceConfigurationSupplier;
+import io.agroal.api.security.NamePrincipal;
+import io.agroal.api.security.SimplePassword;
+import io.agroal.narayana.NarayanaTransactionIntegration;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sql.SqlComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import 
org.apache.camel.test.infra.postgres.services.PostgresLocalContainerService;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.postgresql.ds.PGSimpleDataSource;
+import org.postgresql.xa.PGXADataSource;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test that mimics a production scenario where a transacted route 
performs a SQL operation followed by a
+ * long-running step (e.g., a stored procedure call or a delay that exceeds 
the shutdown grace period).
+ *
+ * <p>
+ * The key behavior being tested: when the shutdown grace period expires and 
forced shutdown is triggered, the
+ * {@link TransactionErrorHandler} marks the in-flight exchange as {@code 
rollbackOnly}. After the delay/procedure
+ * completes, the exchange finishes processing <b>without an exception</b>, 
but the {@code rollbackOnly} flag causes the
+ * transaction to be rolled back instead of committed. Without the fix, the 
INSERT would be committed.
+ */
+public class TransactionErrorHandlerGracePeriodShutdownIT {
+
+    @RegisterExtension
+    static PostgresLocalContainerService postgres = new 
PostgresLocalContainerService();
+
+    private CamelContext camelContext;
+    private AgroalDataSource agroalDataSource;
+    private PGSimpleDataSource plainDataSource;
+    private TransactionManager tm;
+
+    private final CountDownLatch firstInsertDone = new CountDownLatch(1);
+    private final CountDownLatch delayStarted = new CountDownLatch(1);
+    private final CountDownLatch releaseLatch = new CountDownLatch(1);
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        // Narayana transaction manager and synchronization registry
+        tm = com.arjuna.ats.jta.TransactionManager.transactionManager();
+        jakarta.transaction.TransactionSynchronizationRegistry tsr
+                = new 
com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple();
+
+        String jdbcUrl = postgres.jdbcUrl();
+
+        // Agroal DataSource with Narayana integration — same stack as Quarkus.
+        // Agroal automatically enlists connections in the active JTA 
transaction
+        // via NarayanaTransactionIntegration, no custom wrapper needed.
+        agroalDataSource = AgroalDataSource.from(
+                new AgroalDataSourceConfigurationSupplier()
+                        .connectionPoolConfiguration(pool -> pool
+                                .maxSize(5)
+                                .transactionIntegration(new 
NarayanaTransactionIntegration(tm, tsr))
+                                .connectionFactoryConfiguration(cf -> cf
+                                        
.connectionProviderClass(PGXADataSource.class)
+                                        .jdbcUrl(jdbcUrl)
+                                        .principal(new 
NamePrincipal(postgres.userName()))
+                                        .credential(new 
SimplePassword(postgres.password())))));
+
+        // Plain DataSource for assertions (outside of JTA)
+        plainDataSource = new PGSimpleDataSource();
+        plainDataSource.setServerNames(new String[] { postgres.host() });
+        plainDataSource.setPortNumbers(new int[] { postgres.port() });
+        plainDataSource.setDatabaseName(postgres.database());
+        plainDataSource.setUser(postgres.userName());
+        plainDataSource.setPassword(postgres.password());
+
+        // Create test table
+        try (Connection conn = plainDataSource.getConnection();
+             Statement stmt = conn.createStatement()) {
+            stmt.execute("DROP TABLE IF EXISTS orders");
+            stmt.execute("CREATE TABLE orders (id SERIAL PRIMARY KEY, item 
VARCHAR(255))");
+        }
+
+        // JtaTransactionPolicy backed by Narayana (same pattern as Quarkus 
RequiredJtaTransactionPolicy)
+        JtaTransactionPolicy requiredPolicy = new JtaTransactionPolicy() {
+            @Override
+            public void run(Runnable runnable) throws Throwable {
+                boolean isNew = tm.getStatus() == Status.STATUS_NO_TRANSACTION;
+                if (isNew) {
+                    tm.begin();
+                }
+                try {
+                    runnable.run();
+                } catch (Throwable e) {
+                    if (isNew) {
+                        tm.rollback();
+                    } else {
+                        tm.setRollbackOnly();
+                    }
+                    throw e;
+                }
+                if (isNew) {
+                    tm.commit();
+                }
+            }
+        };
+
+        camelContext = new DefaultCamelContext();
+
+        // Short shutdown timeout: simulates the production grace period 
expiring
+        camelContext.getShutdownStrategy().setTimeout(2);
+        camelContext.getShutdownStrategy().setTimeUnit(TimeUnit.SECONDS);
+
+        // Register Agroal DataSource for camel-sql
+        SqlComponent sqlComponent = new SqlComponent();
+        sqlComponent.setDataSource(agroalDataSource);
+        camelContext.addComponent("sql", sqlComponent);
+
+        camelContext.getRegistry().bind("PROPAGATION_REQUIRED", 
requiredPolicy);
+
+        // Route: SQL INSERT then a long delay.
+        // No steps after the delay — the exchange completes normally (no 
exception).
+        // The ONLY thing that should cause rollback is the rollbackOnly flag
+        // set by TransactionErrorHandler.prepareShutdown(forced=true).
+        camelContext.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start").routeId("transactedRoute")
+                        .transacted()
+                        // SQL insert — completes successfully and is enlisted 
in the JTA transaction
+                        .to("sql:INSERT INTO orders(item) VALUES ('first')")
+                        .process(exchange -> firstInsertDone.countDown())
+                        // Long delay (simulates stored procedure or 
long-running operation
+                        // that exceeds the shutdown grace period)
+                        .process(exchange -> {
+                            delayStarted.countDown();
+                            releaseLatch.await(30, TimeUnit.SECONDS);
+                        });
+            }
+        });
+
+        camelContext.start();
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        releaseLatch.countDown();
+        if (camelContext != null && camelContext.isStarted()) {
+            camelContext.stop();
+        }
+        if (agroalDataSource != null) {
+            agroalDataSource.close();
+        }
+    }
+
+    /**
+     * Simulates the production scenario:
+     * <ol>
+     * <li>Exchange enters transacted route</li>
+     * <li>SQL INSERT ('first') completes against PostgreSQL (enlisted in JTA 
tx)</li>
+     * <li>Long delay begins (simulating stored procedure exceeding grace 
period)</li>
+     * <li>CamelContext.stop() -> DefaultShutdownStrategy waits 2s -> forced 
shutdown</li>
+     * <li>TransactionErrorHandler.prepareShutdown(forced=true) marks exchange 
rollbackOnly</li>
+     * <li>context.stop() returns (timeout expired, in-flight exchange still 
running)</li>
+     * <li>Delay released, exchange completes normally (no exception)</li>
+     * <li>TransactionErrorHandler checks preparingShutdown -> sets 
rollbackOnly -> throws</li>
+     * <li>JtaTransactionPolicy.run() catches -> Narayana rolls back the JTA 
transaction</li>
+     * <li>Assert: the INSERT is NOT in the database (rolled back)</li>
+     * </ol>
+     *
+     * <p>
+     * Without the fix, step 8 does not happen — the exchange commits normally 
because there is no exception and
+     * rollbackOnly is never set. The INSERT persists in the database.
+     */
+    @Test
+    public void testForcedShutdownRollsBackDatabaseTransaction() throws 
Exception {
+        assertEquals(0, countOrders(), "Table should be empty initially");
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            // Send message in a background thread (the route blocks during 
the delay)
+            executor.submit(() -> {
+                try {
+                    
camelContext.createProducerTemplate().sendBody("direct:start", "trigger");
+                } catch (Exception e) {
+                    // expected — rollback triggers 
TransactionRolledbackException
+                }
+                return null;
+            });
+
+            assertTrue(firstInsertDone.await(10, TimeUnit.SECONDS),
+                    "SQL insert should have completed");
+
+            assertTrue(delayStarted.await(10, TimeUnit.SECONDS),
+                    "Long delay should have started");
+
+            // Stop the CamelContext. The DefaultShutdownStrategy will:
+            //   1. Stop the direct consumer (no new messages)
+            //   2. Wait up to 2s for in-flight exchanges
+            //   3. Exchange is stuck in the delay -> timeout expires
+            //   4. Forced shutdown -> prepareShutdown(false, true) on 
TransactionErrorHandler
+            //   5. context.stop() returns (in-flight exchange still blocked)
+            camelContext.stop();
+
+            // Now release the delay. The exchange wakes up and finishes 
processing.
+            // The route has no more steps, so processByErrorHandler() returns 
normally.
+            // Back in doInTransactionTemplate(), the preparingShutdown check 
(the fix)
+            // sets rollbackOnly, causing the transaction to be rolled back.
+            releaseLatch.countDown();
+
+            executor.shutdown();
+            assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS),
+                    "Exchange processing should have completed");
+
+            int finalCount = countOrders();
+            assertEquals(0, finalCount,
+                    "No rows should be in the database — the JTA transaction 
should have been rolled back. "
+                                        + "Found " + finalCount + " rows 
instead.");
+        } finally {
+            releaseLatch.countDown();
+            executor.shutdownNow();
+        }
+    }
+
+    private int countOrders() throws SQLException {
+        try (Connection conn = plainDataSource.getConnection();
+             Statement stmt = conn.createStatement();
+             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM orders")) {
+            rs.next();
+            return rs.getInt(1);
+        }
+    }
+}
diff --git a/components/camel-jta/src/test/java/org/apache/camel/jta/TransactionErrorHandlerShutdownTest.java b/components/camel-jta/src/test/java/org/apache/camel/jta/TransactionErrorHandlerShutdownTest.java
new file mode 100644
index 000000000000..d6b436080788
--- /dev/null
+++ b/components/camel-jta/src/test/java/org/apache/camel/jta/TransactionErrorHandlerShutdownTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.jta;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.engine.DefaultUnitOfWork;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test that verifies in-flight transacted exchanges are rolled back when a 
forced shutdown occurs on the JTA
+ * TransactionErrorHandler.
+ */
+public class TransactionErrorHandlerShutdownTest {
+
+    private CamelContext camelContext;
+    private final CountDownLatch processingStarted = new CountDownLatch(1);
+    private final CountDownLatch releaseLatch = new CountDownLatch(1);
+    private final AtomicBoolean rollbackTriggered = new AtomicBoolean(false);
+    private TransactionErrorHandler transactionErrorHandler;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        camelContext = new DefaultCamelContext();
+
+        // Create a test JtaTransactionPolicy that tracks whether rollback was 
triggered
+        JtaTransactionPolicy testPolicy = new JtaTransactionPolicy() {
+            @Override
+            public void run(Runnable runnable) throws Throwable {
+                try {
+                    runnable.run();
+                } catch (Throwable t) {
+                    rollbackTriggered.set(true);
+                    throw t;
+                }
+            }
+        };
+
+        // Create the TransactionErrorHandler with a processor that blocks
+        Processor blockingProcessor = new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                processingStarted.countDown();
+                releaseLatch.await(30, TimeUnit.SECONDS);
+            }
+        };
+
+        transactionErrorHandler = new TransactionErrorHandler(
+                camelContext, blockingProcessor, testPolicy, 
LoggingLevel.WARN);
+
+        camelContext.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:test").routeId("testRoute")
+                        .process(blockingProcessor);
+            }
+        });
+
+        camelContext.start();
+        transactionErrorHandler.start();
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        releaseLatch.countDown(); // ensure we don't hang
+        if (transactionErrorHandler != null) {
+            transactionErrorHandler.stop();
+        }
+        if (camelContext != null) {
+            camelContext.stop();
+        }
+    }
+
+    @Test
+    public void testForcedShutdownMarksExchangeForRollback() throws Exception {
+        AtomicReference<Exchange> exchangeRef = new AtomicReference<>();
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            // process an exchange through the TransactionErrorHandler in a 
separate thread
+            executor.submit(() -> {
+                Exchange exchange = 
camelContext.getEndpoint("direct:test").createExchange();
+                exchange.getIn().setBody("test");
+                // set up UnitOfWork so transacted tracking works
+                DefaultUnitOfWork uow = new DefaultUnitOfWork(exchange);
+                exchange.getExchangeExtension().setUnitOfWork(uow);
+                exchangeRef.set(exchange);
+                try {
+                    transactionErrorHandler.process(exchange);
+                } catch (Exception e) {
+                    // expected - rollback may throw
+                }
+                return null;
+            });
+
+            // wait for the exchange to enter the blocking processor
+            assertTrue(processingStarted.await(10, TimeUnit.SECONDS), 
"Exchange should have started processing");
+
+            // simulate forced shutdown
+            transactionErrorHandler.prepareShutdown(false, true);
+
+            // release the blocking processor
+            releaseLatch.countDown();
+
+            // wait for processing to complete
+            executor.shutdown();
+            assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS), 
"Processing should have completed");
+
+            // verify the transaction was rolled back
+            // before the fix: rollbackTriggered = false (exchange completes 
normally, transaction commits)
+            // after the fix: rollbackTriggered = true (exchange marked 
rollbackOnly, transaction rolls back)
+            assertTrue(rollbackTriggered.get(),
+                    "Transaction should have been rolled back due to forced 
shutdown");
+        } finally {
+            releaseLatch.countDown();
+            executor.shutdown();
+        }
+    }
+}
diff --git a/components/camel-jta/src/test/resources/log4j2.properties b/components/camel-jta/src/test/resources/log4j2.properties
new file mode 100644
index 000000000000..d9ae9de26bd8
--- /dev/null
+++ b/components/camel-jta/src/test/resources/log4j2.properties
@@ -0,0 +1,28 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+appender.file.type = File
+appender.file.name = file
+appender.file.fileName = target/camel-jta-test.log
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+appender.out.type = Console
+appender.out.name = out
+appender.out.layout.type = PatternLayout
+appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = file
diff --git a/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceForcedShutdownTest.java b/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceForcedShutdownTest.java
new file mode 100644
index 000000000000..b8258f108bd0
--- /dev/null
+++ b/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceForcedShutdownTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.interceptor;
+
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.Service;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spring.SpringRouteBuilder;
+import org.apache.camel.spring.spi.SpringTransactionPolicy;
+import org.apache.camel.spring.spi.TransactionErrorHandler;
+import org.apache.camel.support.service.ServiceHelper;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test that verifies in-flight transacted exchanges are rolled back when a 
forced shutdown occurs.
+ */
+public class TransactionalClientDataSourceForcedShutdownTest extends 
TransactionClientDataSourceSupport {
+
+    private final CountDownLatch firstInsertDone = new CountDownLatch(1);
+    private final CountDownLatch releaseLatch = new CountDownLatch(1);
+
+    @Test
+    public void testForcedShutdownRollsBackInFlightTransaction() throws 
Exception {
+        // verify initial state: 1 book from init.sql
+        int initialCount = jdbc.queryForObject("select count(*) from books", 
Integer.class);
+        assertEquals(1, initialCount, "Initial number of books");
+
+        // send message asynchronously since the route will block
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            executor.submit(() -> {
+                template.sendBody("direct:forceShutdown", "Hello World");
+                return null;
+            });
+
+            // wait for the first insert to complete
+            firstInsertDone.await(10, TimeUnit.SECONDS);
+
+            // find the TransactionErrorHandler in the route's services and 
call prepareShutdown
+            Route route = context.getRoute("forceShutdownRoute");
+            TransactionErrorHandler teh = findTransactionErrorHandler(route);
+            if (teh != null) {
+                teh.prepareShutdown(false, true);
+            }
+
+            // release the blocking processor so the exchange can complete
+            releaseLatch.countDown();
+
+            // give the exchange time to complete
+            Thread.sleep(2000);
+
+            // verify that the transaction was rolled back
+            // before the fix: count = 3 (original + 2 inserts committed)
+            // after the fix: count = 1 (both inserts rolled back)
+            int count = jdbc.queryForObject("select count(*) from books", 
Integer.class);
+            assertEquals(1, count, "Number of books after forced shutdown - 
transaction should have been rolled back");
+        } finally {
+            releaseLatch.countDown(); // ensure we don't hang if test fails
+            executor.shutdown();
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+        }
+    }
+
+    private TransactionErrorHandler findTransactionErrorHandler(Route route) {
+        Processor processor = route.getProcessor();
+        if (processor instanceof Service service) {
+            Set<Service> children = ServiceHelper.getChildServices(service, 
true);
+            for (Service child : children) {
+                if (child instanceof TransactionErrorHandler teh) {
+                    return teh;
+                }
+            }
+        }
+        return null;
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new SpringRouteBuilder() {
+            public void configure() throws Exception {
+                SpringTransactionPolicy required = 
lookup("PROPAGATION_REQUIRED", SpringTransactionPolicy.class);
+                errorHandler(transactionErrorHandler(required));
+
+                from("direct:forceShutdown").routeId("forceShutdownRoute")
+                        .policy(required)
+                        .setBody(constant("Tiger in 
Action")).bean("bookService")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws 
Exception {
+                                // signal that the first insert is done
+                                firstInsertDone.countDown();
+                                // block until released
+                                releaseLatch.await(30, TimeUnit.SECONDS);
+                            }
+                        })
+                        .setBody(constant("Elephant in 
Action")).bean("bookService");
+            }
+        };
+    }
+}
diff --git a/components/camel-spring-parent/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java b/components/camel-spring-parent/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
index f31374ddd8bb..8afd21871dbe 100644
--- a/components/camel-spring-parent/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
+++ b/components/camel-spring-parent/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.spring.spi;
 
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.camel.AsyncCallback;
@@ -49,6 +51,7 @@ public class TransactionErrorHandler extends RedeliveryErrorHandler {
     private final TransactionTemplate transactionTemplate;
     private final String transactionKey;
     private final LoggingLevel rollbackLoggingLevel;
+    private final Set<Exchange> inflightTransactedExchanges = 
ConcurrentHashMap.newKeySet();
 
     /**
      * Creates the transaction error handler.
@@ -149,6 +152,7 @@ public class TransactionErrorHandler extends RedeliveryErrorHandler {
             if (exchange.getUnitOfWork() != null) {
                 exchange.getUnitOfWork().beginTransactedBy(transactionKey);
             }
+            inflightTransactedExchanges.add(exchange);
 
             // do in transaction
             logTransactionBegin(redelivered, ids);
@@ -162,6 +166,7 @@ public class TransactionErrorHandler extends RedeliveryErrorHandler {
             exchange.setException(e);
             logTransactionRollback(redelivered, ids, e, false);
         } finally {
+            inflightTransactedExchanges.remove(exchange);
             // mark the end of this transaction boundary
             if (exchange.getUnitOfWork() != null) {
                 exchange.getUnitOfWork().endTransactedBy(transactionKey);
@@ -206,6 +211,13 @@ public class TransactionErrorHandler extends RedeliveryErrorHandler {
                 // and now let process the exchange by the error handler
                 processByErrorHandler(exchange);
 
+                // if forced shutdown is in progress, mark the exchange for rollback
+                if (preparingShutdown) {
+                    LOG.debug("Forced shutdown in progress, marking exchange 
for rollback: {}",
+                            exchange.getExchangeId());
+                    exchange.setRollbackOnly(true);
+                }
+
                 // after handling and still an exception or marked as rollback 
only then rollback
                 if (exchange.getException() != null || 
exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()) {
 
@@ -326,6 +338,20 @@ public class TransactionErrorHandler extends RedeliveryErrorHandler {
         }
     }
 
+    /**
+     * Prepares this error handler for shutdown and, when the shutdown is forced, flags every exchange
+     * currently tracked in {@code inflightTransactedExchanges} as rollback-only.
+     *
+     * @param suspendOnly passed through unchanged to the superclass implementation
+     * @param forced      when {@code true}, all tracked in-flight transacted exchanges are marked for rollback;
+     *                    when {@code false}, only the superclass preparation runs
+     */
+    @Override
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
+        super.prepareShutdown(suspendOnly, forced);
+        if (forced) {
+            // mark all in-flight transacted exchanges for rollback so the transaction
+            // is rolled back before the connection pool is destroyed during shutdown
+            for (Exchange exchange : inflightTransactedExchanges) {
+                LOG.debug("Marking in-flight transacted exchange for rollback due to forced shutdown: {}",
+                        exchange.getExchangeId());
+                exchange.setRollbackOnly(true);
+            }
+        }
+    }
+
     private static String propagationBehaviorToString(int propagationBehavior) 
{
         String rc;
         switch (propagationBehavior) {
diff --git a/parent/pom.xml b/parent/pom.xml
index ba5820c3fa1d..1fc83d65805f 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -60,6 +60,7 @@
         <!-- dependency versions -->
         <activemq-version>5.19.2</activemq-version>
         <activemq6-version>6.2.1</activemq6-version>
+        <agroal-version>3.0</agroal-version>
         <activemq-artemis-version>2.44.0</activemq-artemis-version>
         <allegro-converter-version>0.3.0</allegro-converter-version>
         <amazon-kinesis-client-version>3.4.1</amazon-kinesis-client-version>


Reply via email to