This is an automated email from the ASF dual-hosted git repository.

martinweiler pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a9a0e4ca1 [incubator-kie-issues#2270] Ensure Kafka event emitter is 
participating in the existing transaction (#4239)
1a9a0e4ca1 is described below

commit 1a9a0e4ca1385dad12ec0a32a0a86b4f3c097458
Author: Martin Weiler <[email protected]>
AuthorDate: Wed May 6 13:08:50 2026 -0600

    [incubator-kie-issues#2270] Ensure Kafka event emitter is participating in 
the existing transaction (#4239)
    
    * [incubator-kie-issues#2270] Ensure Kafka event emitter is participating 
in the existing transaction
    
    * Add fallbackExecution in case of missing transaction
    
    * Add check for transaction status, eg. in case of transaction timeout
    
    * Add Quarkus integration test to verify rollback behavior
    
    * Add Spring Boot integration test to verify rollback behavior
    
    * Handle STATUS_NO_TRANSACTION
---
 .../events/TxEventEmitterQuarkusTemplate.java      |  20 +-
 .../events/TxEventEmitterSpringTemplate.java       |  40 ++--
 .../integration-tests-quarkus-transactions/pom.xml | 174 +++++++++++++++++
 .../src/main/resources/application.properties      |  39 ++++
 .../src/main/resources/tx_rollback_test.bpmn       | 145 ++++++++++++++
 .../quarkus/TransactionRollbackMessageIT.java      | 206 ++++++++++++++++++++
 .../src/test/resources/application.properties      |  22 +++
 quarkus/integration-tests/pom.xml                  |   1 +
 .../pom.xml                                        | 194 +++++++++++++++++++
 .../springboot/KogitoSpringbootApplication.java    |  30 +++
 .../src/main/resources/application.properties      |  38 ++++
 .../src/main/resources/tx_rollback_test.bpmn       | 145 ++++++++++++++
 .../springboot/TransactionRollbackMessageTest.java | 212 +++++++++++++++++++++
 springboot/integration-tests/pom.xml               |   1 +
 14 files changed, 1252 insertions(+), 15 deletions(-)

diff --git 
a/kogito-codegen-modules/kogito-codegen-events/src/main/resources/class-templates/events/TxEventEmitterQuarkusTemplate.java
 
b/kogito-codegen-modules/kogito-codegen-events/src/main/resources/class-templates/events/TxEventEmitterQuarkusTemplate.java
index b414d323c6..52ced75cd0 100644
--- 
a/kogito-codegen-modules/kogito-codegen-events/src/main/resources/class-templates/events/TxEventEmitterQuarkusTemplate.java
+++ 
b/kogito-codegen-modules/kogito-codegen-events/src/main/resources/class-templates/events/TxEventEmitterQuarkusTemplate.java
@@ -25,7 +25,8 @@ import jakarta.enterprise.event.Observes;
 import jakarta.enterprise.event.TransactionPhase;
 import jakarta.inject.Inject;
 import jakarta.inject.Named;
-import jakarta.transaction.Transactional;
+import jakarta.transaction.Status;
+import jakarta.transaction.TransactionManager;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -64,6 +65,9 @@ public class $ClassName$ extends 
AbstractQuarkusCloudEventEmitter<$Type$> {
     @Inject
     MessageDecoratorProvider messageDecorator;
 
+    @Inject
+    TransactionManager transactionManager;
+
     class EmitEventType {
         DataEvent<?> data;
 
@@ -73,18 +77,28 @@ public class $ClassName$ extends 
AbstractQuarkusCloudEventEmitter<$Type$> {
     }
 
     public void observe(@Observes(during = TransactionPhase.AFTER_SUCCESS) 
EmitEventType emitEventType) {
-        logger.debug("publishing event {}", emitEventType.data);
         try {
+            // Verify transaction was actually committed successfully
+            int status = transactionManager.getStatus();
+            if (status != Status.STATUS_COMMITTED && status != 
Status.STATUS_NO_TRANSACTION) {
+                logger.debug("Skipping event publication - transaction status 
is {} (not committed)", status);
+                return;
+            }
+
+            logger.debug("publishing event {}", emitEventType.data);
             Message<$Type$> message = 
messageDecorator.decorate(getMessage(emitEventType.data));
             emitter.send(message);
         } catch (IOException e) {
             throw new UncheckedIOException(e);
+        } catch (Exception e) {
+            logger.error("Error checking transaction status or publishing 
event", e);
+            throw new RuntimeException(e);
         }
     }
 
     @Override
-    @Transactional
     public void emit(DataEvent<?> dataEvent) {
+        logger.debug("emit event {}", dataEvent);
         event.fire(new EmitEventType(dataEvent));
     }
 
diff --git 
a/kogito-codegen-modules/kogito-codegen-events/src/main/resources/class-templates/events/TxEventEmitterSpringTemplate.java
 
b/kogito-codegen-modules/kogito-codegen-events/src/main/resources/class-templates/events/TxEventEmitterSpringTemplate.java
index f94c607e9f..1ac2ef60e4 100644
--- 
a/kogito-codegen-modules/kogito-codegen-events/src/main/resources/class-templates/events/TxEventEmitterSpringTemplate.java
+++ 
b/kogito-codegen-modules/kogito-codegen-events/src/main/resources/class-templates/events/TxEventEmitterSpringTemplate.java
@@ -20,42 +20,58 @@ package org.kie.kogito.addon.cloudevents.spring;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.io.UncheckedIOException;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ExecutionException;
 
-import org.kie.kogito.config.ConfigBean;
 import org.kie.kogito.event.CloudEventMarshaller;
 import org.kie.kogito.event.DataEvent;
 import org.kie.kogito.event.EventEmitter;
 import org.kie.kogito.event.EventMarshaller;
-import org.kie.kogito.event.EventUnmarshaller;
-import org.kie.kogito.event.KogitoEventStreams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.core.env.Environment;
+import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.event.TransactionPhase;
+import org.springframework.transaction.event.TransactionalEventListener;
 import org.kie.kogito.addon.cloudevents.spring.KogitoMessaging;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 /**
- * Spring implementation delegating to kafka template
+ * Spring transactional implementation using @TransactionalEventListener
+ * to ensure Kafka messages are only sent after database transaction commits
  */
 @Component("Emitter-$ChannelName$")
-@Transactional
 public class $ClassName$ implements EventEmitter {
 
+    private static final Logger logger = 
LoggerFactory.getLogger($ClassName$.class);
+
     @Autowired
     org.springframework.kafka.core.KafkaTemplate<String, $Type$> emitter;
 
     @Autowired
     ObjectMapper mapper;
 
+    @Autowired
+    ApplicationEventPublisher eventPublisher;
+
+    static class EmitEventType {
+        final DataEvent<?> data;
+
+        public EmitEventType(DataEvent<?> data) {
+            this.data = data;
+        }
+    }
+
+    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, 
fallbackExecution = true)
+    public void observe(EmitEventType emitEventType) {
+        logger.debug("publishing event {}", emitEventType.data);
+        emitter.send("$Topic$", toTopicType(emitEventType.data));
+    }
+
     @Override
     public void emit(DataEvent<?> event) {
-        emitter.send("$Topic$", toTopicType(event));
+        logger.debug("emit event {}", event);
+        eventPublisher.publishEvent(new EmitEventType(event));
     }
 
     private $Type$ toTopicTypeCloud(DataEvent<?> event) {
diff --git 
a/quarkus/integration-tests/integration-tests-quarkus-transactions/pom.xml 
b/quarkus/integration-tests/integration-tests-quarkus-transactions/pom.xml
new file mode 100644
index 0000000000..aab54aa620
--- /dev/null
+++ b/quarkus/integration-tests/integration-tests-quarkus-transactions/pom.xml
@@ -0,0 +1,174 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xmlns="http://maven.apache.org/POM/4.0.0";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.kie.kogito</groupId>
+    <artifactId>kogito-quarkus-integration-tests</artifactId>
+    <version>999-SNAPSHOT</version>
+  </parent>
+  <artifactId>integration-tests-quarkus-transactions</artifactId>
+  <name>Kogito :: Integration Tests :: Quarkus :: Transactions</name>
+
+  <properties>
+    <quarkus.test.list.include>false</quarkus.test.list.include>
+    
<java.module.name>org.kie.kogito.test.quarkus.transactions</java.module.name>
+  </properties>
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.kie.kogito</groupId>
+        <artifactId>kogito-quarkus-bom</artifactId>
+        <version>${project.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.jbpm</groupId>
+      <artifactId>jbpm-quarkus</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.kie</groupId>
+      <artifactId>kie-addons-quarkus-messaging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.quarkus</groupId>
+      <artifactId>quarkus-messaging-kafka</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka-clients</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.kie</groupId>
+      <artifactId>kie-addons-quarkus-persistence-jdbc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.quarkus</groupId>
+      <artifactId>quarkus-jdbc-postgresql</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.quarkus</groupId>
+      <artifactId>quarkus-resteasy</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.quarkus</groupId>
+      <artifactId>quarkus-resteasy-jackson</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.quarkus</groupId>
+      <artifactId>quarkus-junit5</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.rest-assured</groupId>
+      <artifactId>rest-assured</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.kie.kogito</groupId>
+      <artifactId>kogito-quarkus-test-utils</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.kie.kogito</groupId>
+      <artifactId>kogito-test-utils</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.assertj</groupId>
+      <artifactId>assertj-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>io.quarkus</groupId>
+          <artifactId>quarkus-maven-plugin</artifactId>
+          <configuration>
+            <noDeps>true</noDeps>
+            <skip>${skipTests}</skip>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+    <plugins>
+      <plugin>
+        <groupId>io.quarkus</groupId>
+        <artifactId>quarkus-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>build</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables combine.children="append">
+            
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
+            
<container.image.kafka>${container.image.kafka}</container.image.kafka>
+            <kogito.version>${project.version}</kogito.version>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <profiles>
+    <profile>
+      <id>native</id>
+      <activation>
+        <property>
+          <name>native</name>
+        </property>
+      </activation>
+      <properties>
+        <quarkus.native.enabled>true</quarkus.native.enabled>
+      </properties>
+    </profile>
+  </profiles>
+</project>
+
diff --git 
a/quarkus/integration-tests/integration-tests-quarkus-transactions/src/main/resources/application.properties
 
b/quarkus/integration-tests/integration-tests-quarkus-transactions/src/main/resources/application.properties
new file mode 100644
index 0000000000..132018c817
--- /dev/null
+++ 
b/quarkus/integration-tests/integration-tests-quarkus-transactions/src/main/resources/application.properties
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+quarkus.log.level=INFO
+#quarkus.log.category."org.kie.kogito".level=DEBUG
+
+# Maximum Java heap to be used during the native image generation
+quarkus.native.native-image-xmx=8g
+
+mp.messaging.incoming.tx_test_continue.connector=smallrye-kafka
+mp.messaging.incoming.tx_test_continue.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+mp.messaging.outgoing.tx_test_end.connector=smallrye-kafka
+mp.messaging.outgoing.tx_test_end.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+
+kogito.transactionEnabled=true
+kogito.faultToleranceEnabled=true
+
+# PostgreSQL persistence configuration for transaction rollback test
+kogito.persistence.type=postgresql
+quarkus.datasource.devservices.image-name=mirror.gcr.io/postgres:15.9-alpine3.20
+kie.flyway.enabled=true
+
diff --git 
a/quarkus/integration-tests/integration-tests-quarkus-transactions/src/main/resources/tx_rollback_test.bpmn
 
b/quarkus/integration-tests/integration-tests-quarkus-transactions/src/main/resources/tx_rollback_test.bpmn
new file mode 100644
index 0000000000..92c9b9196d
--- /dev/null
+++ 
b/quarkus/integration-tests/integration-tests-quarkus-transactions/src/main/resources/tx_rollback_test.bpmn
@@ -0,0 +1,145 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<bpmn2:definitions xmlns:bpmn2="http://www.omg.org/spec/BPMN/20100524/MODEL"; 
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"; 
xmlns:bpsim="http://www.bpsim.org/schemas/1.0"; 
xmlns:dc="http://www.omg.org/spec/DD/20100524/DC"; 
xmlns:di="http://www.omg.org/spec/DD/20100524/DI"; 
xmlns:drools="http://www.jboss.org/drools"; id="_TxRollbackTest" exporter="jBPM 
Process Modeler" exporterVersion="2.0" 
targetNamespace="http://www.omg.org/bpmn20";>
+  <bpmn2:itemDefinition id="_sleepTimeItem" structureRef="Integer"/>
+  <bpmn2:itemDefinition id="_messageItem" structureRef="String"/>
+  <bpmn2:itemDefinition id="__MESSAGE_CATCH_eventOutputXItem" 
structureRef="String"/>
+  <bpmn2:itemDefinition id="tx_test_continueType" structureRef="String"/>
+  <bpmn2:itemDefinition id="__END_MESSAGE_eventInputXItem" 
structureRef="String"/>
+  <bpmn2:itemDefinition id="tx_test_endType" structureRef="String"/>
+  <bpmn2:message id="_msg_continue" itemRef="tx_test_continueType" 
name="tx_test_continue"/>
+  <bpmn2:message id="_msg_end" itemRef="tx_test_endType" name="tx_test_end"/>
+  <bpmn2:process id="tx_rollback_test" 
drools:packageName="org.kie.kogito.integrationtests" drools:version="1.0" 
drools:adHoc="false" name="tx_rollback_test" isExecutable="true" 
processType="Public">
+    <bpmn2:property id="sleepTime" itemSubjectRef="_sleepTimeItem" 
name="sleepTime"/>
+    <bpmn2:property id="message" itemSubjectRef="_messageItem" name="message"/>
+    <bpmn2:sequenceFlow id="_SEQ_1" sourceRef="_START_EVENT" 
targetRef="_MESSAGE_CATCH"/>
+    <bpmn2:sequenceFlow id="_SEQ_2" sourceRef="_MESSAGE_CATCH" 
targetRef="_SLEEP_TASK"/>
+    <bpmn2:sequenceFlow id="_SEQ_3" sourceRef="_SLEEP_TASK" 
targetRef="_END_MESSAGE"/>
+    <bpmn2:startEvent id="_START_EVENT" name="Start">
+      <bpmn2:outgoing>_SEQ_1</bpmn2:outgoing>
+    </bpmn2:startEvent>
+    <bpmn2:intermediateCatchEvent id="_MESSAGE_CATCH" name="Wait for Continue">
+      <bpmn2:incoming>_SEQ_1</bpmn2:incoming>
+      <bpmn2:outgoing>_SEQ_2</bpmn2:outgoing>
+      <bpmn2:dataOutput id="_MESSAGE_CATCH_eventOutputX" drools:dtype="String" 
itemSubjectRef="__MESSAGE_CATCH_eventOutputXItem" name="event"/>
+      <bpmn2:dataOutputAssociation>
+        <bpmn2:sourceRef>_MESSAGE_CATCH_eventOutputX</bpmn2:sourceRef>
+        <bpmn2:targetRef>message</bpmn2:targetRef>
+      </bpmn2:dataOutputAssociation>
+      <bpmn2:outputSet>
+        
<bpmn2:dataOutputRefs>_MESSAGE_CATCH_eventOutputX</bpmn2:dataOutputRefs>
+      </bpmn2:outputSet>
+      <bpmn2:messageEventDefinition drools:msgref="tx_test_continue" 
messageRef="_msg_continue"/>
+    </bpmn2:intermediateCatchEvent>
+    <bpmn2:scriptTask id="_SLEEP_TASK" name="Sleep Task" 
scriptFormat="http://www.java.com/java";>
+      <bpmn2:extensionElements>
+        <drools:metaData name="elementname">
+          <drools:metaValue><![CDATA[Sleep Task]]></drools:metaValue>
+        </drools:metaData>
+      </bpmn2:extensionElements>
+      <bpmn2:incoming>_SEQ_2</bpmn2:incoming>
+      <bpmn2:outgoing>_SEQ_3</bpmn2:outgoing>
+      <bpmn2:script><![CDATA[if (sleepTime != null && sleepTime > 0) {
+    System.out.println("Sleeping for " + sleepTime + " milliseconds");
+    Thread.sleep(sleepTime);
+    System.out.println("Sleep completed");
+}]]></bpmn2:script>
+    </bpmn2:scriptTask>
+    <bpmn2:endEvent id="_END_MESSAGE" name="End">
+      <bpmn2:incoming>_SEQ_3</bpmn2:incoming>
+      <bpmn2:dataInput id="_END_MESSAGE_eventInputX" drools:dtype="String" 
itemSubjectRef="__END_MESSAGE_eventInputXItem" name="event"/>
+      <bpmn2:dataInputAssociation>
+        <bpmn2:sourceRef>message</bpmn2:sourceRef>
+        <bpmn2:targetRef>_END_MESSAGE_eventInputX</bpmn2:targetRef>
+      </bpmn2:dataInputAssociation>
+      <bpmn2:inputSet>
+        <bpmn2:dataInputRefs>_END_MESSAGE_eventInputX</bpmn2:dataInputRefs>
+      </bpmn2:inputSet>
+      <bpmn2:messageEventDefinition drools:msgref="tx_test_end" 
messageRef="_msg_end"/>
+    </bpmn2:endEvent>
+  </bpmn2:process>
+  <bpmndi:BPMNDiagram>
+    <bpmndi:BPMNPlane bpmnElement="tx_rollback_test">
+      <bpmndi:BPMNShape id="shape__START_EVENT" bpmnElement="_START_EVENT">
+        <dc:Bounds height="56" width="56" x="100" y="100"/>
+      </bpmndi:BPMNShape>
+      <bpmndi:BPMNShape id="shape__MESSAGE_CATCH" bpmnElement="_MESSAGE_CATCH">
+        <dc:Bounds height="56" width="56" x="230" y="100"/>
+      </bpmndi:BPMNShape>
+      <bpmndi:BPMNShape id="shape__SLEEP_TASK" bpmnElement="_SLEEP_TASK">
+        <dc:Bounds height="102" width="154" x="360" y="77"/>
+      </bpmndi:BPMNShape>
+      <bpmndi:BPMNShape id="shape__END_MESSAGE" bpmnElement="_END_MESSAGE">
+        <dc:Bounds height="56" width="56" x="590" y="100"/>
+      </bpmndi:BPMNShape>
+      <bpmndi:BPMNEdge id="edge__START_EVENT_to__MESSAGE_CATCH" 
bpmnElement="_SEQ_1">
+        <di:waypoint x="156" y="128"/>
+        <di:waypoint x="230" y="128"/>
+      </bpmndi:BPMNEdge>
+      <bpmndi:BPMNEdge id="edge__MESSAGE_CATCH_to__SLEEP_TASK" 
bpmnElement="_SEQ_2">
+        <di:waypoint x="286" y="128"/>
+        <di:waypoint x="360" y="128"/>
+      </bpmndi:BPMNEdge>
+      <bpmndi:BPMNEdge id="edge__SLEEP_TASK_to__END_MESSAGE" 
bpmnElement="_SEQ_3">
+        <di:waypoint x="514" y="128"/>
+        <di:waypoint x="590" y="128"/>
+      </bpmndi:BPMNEdge>
+    </bpmndi:BPMNPlane>
+  </bpmndi:BPMNDiagram>
+  <bpmn2:relationship type="BPSimData">
+    <bpmn2:extensionElements>
+      <bpsim:BPSimData>
+        <bpsim:Scenario id="default" name="Simulation scenario">
+          <bpsim:ScenarioParameters/>
+          <bpsim:ElementParameters elementRef="_START_EVENT">
+            <bpsim:TimeParameters>
+              <bpsim:ProcessingTime>
+                <bpsim:NormalDistribution mean="0" standardDeviation="0"/>
+              </bpsim:ProcessingTime>
+            </bpsim:TimeParameters>
+          </bpsim:ElementParameters>
+          <bpsim:ElementParameters elementRef="_SLEEP_TASK">
+            <bpsim:TimeParameters>
+              <bpsim:ProcessingTime>
+                <bpsim:NormalDistribution mean="0" standardDeviation="0"/>
+              </bpsim:ProcessingTime>
+            </bpsim:TimeParameters>
+            <bpsim:ResourceParameters>
+              <bpsim:Availability>
+                <bpsim:FloatingParameter value="0"/>
+              </bpsim:Availability>
+              <bpsim:Quantity>
+                <bpsim:FloatingParameter value="0"/>
+              </bpsim:Quantity>
+            </bpsim:ResourceParameters>
+            <bpsim:CostParameters>
+              <bpsim:UnitCost>
+                <bpsim:FloatingParameter value="0"/>
+              </bpsim:UnitCost>
+            </bpsim:CostParameters>
+          </bpsim:ElementParameters>
+        </bpsim:Scenario>
+      </bpsim:BPSimData>
+    </bpmn2:extensionElements>
+    <bpmn2:source>_TxRollbackTest</bpmn2:source>
+    <bpmn2:target>_TxRollbackTest</bpmn2:target>
+  </bpmn2:relationship>
+</bpmn2:definitions>
+
diff --git 
a/quarkus/integration-tests/integration-tests-quarkus-transactions/src/test/java/org/kie/kogito/integrationtests/quarkus/TransactionRollbackMessageIT.java
 
b/quarkus/integration-tests/integration-tests-quarkus-transactions/src/test/java/org/kie/kogito/integrationtests/quarkus/TransactionRollbackMessageIT.java
new file mode 100644
index 0000000000..03faadceae
--- /dev/null
+++ 
b/quarkus/integration-tests/integration-tests-quarkus-transactions/src/test/java/org/kie/kogito/integrationtests/quarkus/TransactionRollbackMessageIT.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.integrationtests.quarkus;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.kie.kogito.test.quarkus.QuarkusTestProperty;
+import org.kie.kogito.test.quarkus.kafka.KafkaTestClient;
+import org.kie.kogito.testcontainers.KogitoPostgreSqlContainer;
+import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource;
+import org.kie.kogito.testcontainers.quarkus.PostgreSqlQuarkusTestResource;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import io.restassured.RestAssured;
+import io.restassured.http.ContentType;
+
+import static io.restassured.RestAssured.given;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Integration test to verify that when a transaction rolls back due to 
persistence failure,
+ * no Kafka message is sent. This validates the transactional behavior of the 
event emitter.
+ * 
+ */
+@QuarkusIntegrationTest
+@QuarkusTestResource(KafkaQuarkusTestResource.class)
+@QuarkusTestResource(TransactionRollbackMessageIT.ExposedPostgreSqlResource.class)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class TransactionRollbackMessageIT {
+
+    private static final String TX_TEST_END_TOPIC = "tx_test_end";
+    private static final String TX_TEST_CONTINUE_TOPIC = "tx_test_continue";
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    public KafkaTestClient kafkaClient;
+    private AtomicInteger messageCount;
+    private CountDownLatch messageLatch;
+
+    @QuarkusTestProperty(name = KafkaQuarkusTestResource.KOGITO_KAFKA_PROPERTY)
+    private String kafkaBootstrapServers;
+
+    // Static reference to PostgreSQL container for stopping it during test
+    private static KogitoPostgreSqlContainer postgresContainer;
+
+    static {
+        RestAssured.enableLoggingOfRequestAndResponseIfValidationFails();
+    }
+
+    @BeforeEach
+    public void setup() {
+        kafkaClient = new KafkaTestClient(kafkaBootstrapServers);
+        messageCount = new AtomicInteger(0);
+        messageLatch = new CountDownLatch(1);
+
+        // Ensure PostgreSQL is running before each test
+        if (postgresContainer != null && !postgresContainer.isRunning()) {
+            postgresContainer.start();
+        }
+    }
+
+    @Test
+    @Order(1)
+    void testMessageSentWhenTransactionSucceeds() throws Exception {
+        kafkaClient.consume(TX_TEST_END_TOPIC, s -> {
+            messageCount.incrementAndGet();
+            messageLatch.countDown();
+        });
+
+        String pId = given().body(String.format("{ \"sleepTime\": 1000 }"))
+                .contentType(ContentType.JSON)
+                .when()
+                .post("/tx_rollback_test")
+                .then()
+                .statusCode(201)
+                .extract().body().path("id");
+
+        await().atMost(Duration.ofSeconds(5))
+                .untilAsserted(() -> given()
+                        .contentType(ContentType.JSON)
+                        .when()
+                        .get("/tx_rollback_test/" + pId)
+                        .then()
+                        .statusCode(200));
+
+        String continueMessage = createCloudEventsMessage(pId, "continue");
+        kafkaClient.produce(continueMessage, TX_TEST_CONTINUE_TOPIC);
+
+        // Wait for the process to complete and message to be sent
+        boolean messageReceived = messageLatch.await(5, TimeUnit.SECONDS);
+
+        // In a successful transaction, the message should be sent
+        assertThat(messageReceived).as("Message should be sent when 
transaction succeeds").isTrue();
+        assertThat(messageCount.get()).as("Message count should be at least 
1").isGreaterThanOrEqualTo(1);
+    }
+
+    @Test
+    // @Order(2)
+    void testNoMessageSentWhenTransactionRollsBack() throws Exception {
+        kafkaClient.consume(TX_TEST_END_TOPIC, s -> {
+            System.out.println("Received end message: " + s);
+            messageCount.incrementAndGet();
+            messageLatch.countDown();
+        });
+
+        String pId = given().body(String.format("{ \"sleepTime\": 5000 }"))
+                .contentType(ContentType.JSON)
+                .when()
+                .post("/tx_rollback_test")
+                .then()
+                .statusCode(201)
+                .extract().body().path("id");
+
+        await().atMost(Duration.ofSeconds(5))
+                .untilAsserted(() -> given()
+                        .contentType(ContentType.JSON)
+                        .when()
+                        .get("/tx_rollback_test/" + pId)
+                        .then()
+                        .statusCode(200));
+
+        String continueMessage = createCloudEventsMessage(pId, "continue");
+        kafkaClient.produce(continueMessage, TX_TEST_CONTINUE_TOPIC);
+
+        // Wait for the message to be consumed and sleep to start
+        Thread.sleep(1000);
+
+        // Stop PostgreSQL during the sleep to simulate database failure
+        if (postgresContainer != null) {
+            postgresContainer.stop();
+        } else {
+            System.out.println("WARNING: PostgreSQL container reference is 
null, cannot simulate failure");
+        }
+
+        // Verify that NO message was sent to the tx_test_end topic
+        // because the transaction should have rolled back
+        boolean messageReceived = messageLatch.await(5, TimeUnit.SECONDS);
+
+        // In a successful rollback scenario, no message should be received
+        assertThat(messageReceived).as("No message should be sent when 
transaction rolls back").isFalse();
+        assertThat(messageCount.get()).as("Message count should be 
0").isEqualTo(0);
+    }
+
+    private String createCloudEventsMessage(String processInstanceId, String 
messageContent) throws Exception {
+        Map<String, Object> cloudEvent = new HashMap<>();
+        cloudEvent.put("specversion", "1.0");
+        cloudEvent.put("id", UUID.randomUUID().toString());
+        cloudEvent.put("source", "test");
+        cloudEvent.put("type", "tx_test_continue");
+        cloudEvent.put("data", messageContent);
+        cloudEvent.put("kogitoprocrefid", processInstanceId);
+
+        return MAPPER.writeValueAsString(cloudEvent);
+    }
+
+    /**
+     * Custom PostgreSQL test resource that exposes the container instance
+     * so tests can stop it to simulate database failure.
+     */
+    public static class ExposedPostgreSqlResource extends 
PostgreSqlQuarkusTestResource {
+
+        @Override
+        public Map<String, String> start() {
+            Map<String, String> properties = super.start();
+            // Store reference to the container for test access
+            postgresContainer = getTestResource();
+            return properties;
+        }
+
+        @Override
+        public void stop() {
+            super.stop();
+            postgresContainer = null;
+        }
+    }
+}
diff --git 
a/quarkus/integration-tests/integration-tests-quarkus-transactions/src/test/resources/application.properties
 
b/quarkus/integration-tests/integration-tests-quarkus-transactions/src/test/resources/application.properties
new file mode 100644
index 0000000000..0d0c0da0e1
--- /dev/null
+++ 
b/quarkus/integration-tests/integration-tests-quarkus-transactions/src/test/resources/application.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Quarkus
+quarkus.http.test-port=0
+quarkus.kafka.devservices.enabled=false
diff --git a/quarkus/integration-tests/pom.xml 
b/quarkus/integration-tests/pom.xml
index 7e6f72b621..8b50f72814 100644
--- a/quarkus/integration-tests/pom.xml
+++ b/quarkus/integration-tests/pom.xml
@@ -47,6 +47,7 @@
     <module>integration-tests-quarkus-processes</module>
     <module>integration-tests-quarkus-processes-reactive</module>
     <module>integration-tests-quarkus-processes-persistence</module>
+    <module>integration-tests-quarkus-transactions</module>
     <module>integration-tests-quarkus-usertasks</module>
     <module>integration-tests-quarkus-usertask-listeners-rollback</module>
     <module>integration-tests-quarkus-wshumantasks</module>
diff --git 
a/springboot/integration-tests/integration-tests-springboot-transactions-it/pom.xml
 
b/springboot/integration-tests/integration-tests-springboot-transactions-it/pom.xml
new file mode 100644
index 0000000000..0218b881cf
--- /dev/null
+++ 
b/springboot/integration-tests/integration-tests-springboot-transactions-it/pom.xml
@@ -0,0 +1,194 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xmlns="http://maven.apache.org/POM/4.0.0";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.kie.kogito</groupId>
+    <artifactId>kogito-spring-boot-integration-tests</artifactId>
+    <version>999-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>integration-tests-springboot-transactions-it</artifactId>
+
+  <properties>
+    
<java.module.name>integration.tests.springboot.transactions.it</java.module.name>
+  </properties>
+  
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.kie.kogito</groupId>
+        <artifactId>kogito-spring-boot-bom</artifactId>
+        <version>${project.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.jbpm</groupId>
+      <artifactId>jbpm-with-drools-spring-boot-starter</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.kie</groupId>
+      <artifactId>kie-addons-springboot-messaging</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.kie</groupId>
+      <artifactId>kie-addons-springboot-events-process-kafka</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.kie</groupId>
+      <artifactId>kie-addons-springboot-persistence-jdbc</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-jdbc</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.junit.vintage</groupId>
+          <artifactId>junit-vintage-engine</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-engine</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>io.rest-assured</groupId>
+      <artifactId>rest-assured</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.kie.kogito</groupId>
+      <artifactId>kogito-spring-boot-test-utils</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <finalName>${project.artifactId}</finalName>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>${version.compiler.plugin}</version>
+        <configuration>
+          <release>${maven.compiler.release}</release>
+        </configuration>
+        <executions>
+          <execution>
+            <id>pre-kogito-generate-model</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.kie.kogito</groupId>
+        <artifactId>kogito-maven-plugin</artifactId>
+        <version>${project.version}</version> <!-- Needed, otherwise it would 
use the latest release found on Maven central -->
+        <executions>
+          <execution>
+            <phase>compile</phase>
+            <goals>
+              <goal>generateModel</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-maven-plugin</artifactId>
+        <version>${version.org.springframework.boot}</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>repackage</goal>
+            </goals>
+            <configuration>
+              <classifier>executable</classifier>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+        <configuration>
+          <classesDirectory>${project.build.outputDirectory}</classesDirectory>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${version.surefire.plugin}</version>
+        <configuration>
+          <systemPropertyVariables combine.children="append">
+            
<container.image.kafka>${container.image.kafka}</container.image.kafka>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
+
diff --git 
a/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/java/org/kie/kogito/integrationtests/springboot/KogitoSpringbootApplication.java
 
b/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/java/org/kie/kogito/integrationtests/springboot/KogitoSpringbootApplication.java
new file mode 100644
index 0000000000..4a460a4976
--- /dev/null
+++ 
b/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/java/org/kie/kogito/integrationtests/springboot/KogitoSpringbootApplication.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.integrationtests.springboot;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication(scanBasePackages = { "org.kie.kogito.**" })
+public class KogitoSpringbootApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(KogitoSpringbootApplication.class, args);
+    }
+}
diff --git 
a/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/resources/application.properties
 
b/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/resources/application.properties
new file mode 100644
index 0000000000..d322322578
--- /dev/null
+++ 
b/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/resources/application.properties
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+server.address=0.0.0.0
+spring.kafka.consumer.group-id=kogito-group
+spring.kafka.consumer.auto-offset-reset=earliest
+
+# Transaction rollback test topics
+kogito.addon.cloudevents.kafka.kogito_incoming_stream.tx_test_continue=tx_test_continue
+kogito.addon.cloudevents.kafka.kogito_outgoing_stream.tx_test_end=tx_test_end
+
+# PostgreSQL persistence configuration for transaction rollback test
+kogito.persistence.type=postgresql
+kie.flyway.enabled=true
+spring.flyway.enabled=false
+
+kogito.transactionEnabled=true
+kogito.faultToleranceEnabled=true
+
+# Connection URI will be set by test container
+kogito.persistence.postgresql.connection.uri=postgresql://kogito-user:kogito-pass@localhost:5432/kogito
+
diff --git 
a/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/resources/tx_rollback_test.bpmn
 
b/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/resources/tx_rollback_test.bpmn
new file mode 100644
index 0000000000..d91589fe80
--- /dev/null
+++ 
b/springboot/integration-tests/integration-tests-springboot-transactions-it/src/main/resources/tx_rollback_test.bpmn
@@ -0,0 +1,145 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<bpmn2:definitions xmlns:bpmn2="http://www.omg.org/spec/BPMN/20100524/MODEL"; 
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"; 
xmlns:bpsim="http://www.bpsim.org/schemas/1.0"; 
xmlns:dc="http://www.omg.org/spec/DD/20100524/DC"; 
xmlns:di="http://www.omg.org/spec/DD/20100524/DI"; 
xmlns:drools="http://www.jboss.org/drools"; id="_TxRollbackTestSpring" 
exporter="jBPM Process Modeler" exporterVersion="2.0" 
targetNamespace="http://www.omg.org/bpmn20";>
+  <bpmn2:itemDefinition id="_sleepTimeItem" structureRef="Integer"/>
+  <bpmn2:itemDefinition id="_messageItem" structureRef="String"/>
+  <bpmn2:itemDefinition id="__MESSAGE_CATCH_eventOutputXItem" 
structureRef="String"/>
+  <bpmn2:itemDefinition id="tx_test_continueType" structureRef="String"/>
+  <bpmn2:itemDefinition id="__END_MESSAGE_eventInputXItem" 
structureRef="String"/>
+  <bpmn2:itemDefinition id="tx_test_endType" structureRef="String"/>
+  <bpmn2:message id="_msg_continue" itemRef="tx_test_continueType" 
name="tx_test_continue"/>
+  <bpmn2:message id="_msg_end" itemRef="tx_test_endType" name="tx_test_end"/>
+  <bpmn2:process id="tx_rollback_test" 
drools:packageName="org.kie.kogito.integrationtests.springboot" 
drools:version="1.0" drools:adHoc="false" name="tx_rollback_test" 
isExecutable="true" processType="Public">
+    <bpmn2:property id="sleepTime" itemSubjectRef="_sleepTimeItem" 
name="sleepTime"/>
+    <bpmn2:property id="message" itemSubjectRef="_messageItem" name="message"/>
+    <bpmn2:sequenceFlow id="_SEQ_1" sourceRef="_START_EVENT" 
targetRef="_MESSAGE_CATCH"/>
+    <bpmn2:sequenceFlow id="_SEQ_2" sourceRef="_MESSAGE_CATCH" 
targetRef="_SLEEP_TASK"/>
+    <bpmn2:sequenceFlow id="_SEQ_3" sourceRef="_SLEEP_TASK" 
targetRef="_END_MESSAGE"/>
+    <bpmn2:startEvent id="_START_EVENT" name="Start">
+      <bpmn2:outgoing>_SEQ_1</bpmn2:outgoing>
+    </bpmn2:startEvent>
+    <bpmn2:intermediateCatchEvent id="_MESSAGE_CATCH" name="Wait for Continue">
+      <bpmn2:incoming>_SEQ_1</bpmn2:incoming>
+      <bpmn2:outgoing>_SEQ_2</bpmn2:outgoing>
+      <bpmn2:dataOutput id="_MESSAGE_CATCH_eventOutputX" drools:dtype="String" 
itemSubjectRef="__MESSAGE_CATCH_eventOutputXItem" name="event"/>
+      <bpmn2:dataOutputAssociation>
+        <bpmn2:sourceRef>_MESSAGE_CATCH_eventOutputX</bpmn2:sourceRef>
+        <bpmn2:targetRef>message</bpmn2:targetRef>
+      </bpmn2:dataOutputAssociation>
+      <bpmn2:outputSet>
+        
<bpmn2:dataOutputRefs>_MESSAGE_CATCH_eventOutputX</bpmn2:dataOutputRefs>
+      </bpmn2:outputSet>
+      <bpmn2:messageEventDefinition drools:msgref="tx_test_continue" 
messageRef="_msg_continue"/>
+    </bpmn2:intermediateCatchEvent>
+    <bpmn2:scriptTask id="_SLEEP_TASK" name="Sleep Task" 
scriptFormat="http://www.java.com/java";>
+      <bpmn2:extensionElements>
+        <drools:metaData name="elementname">
+          <drools:metaValue><![CDATA[Sleep Task]]></drools:metaValue>
+        </drools:metaData>
+      </bpmn2:extensionElements>
+      <bpmn2:incoming>_SEQ_2</bpmn2:incoming>
+      <bpmn2:outgoing>_SEQ_3</bpmn2:outgoing>
+      <bpmn2:script><![CDATA[if (sleepTime != null && sleepTime > 0) {
+    System.out.println("Sleeping for " + sleepTime + " milliseconds");
+    Thread.sleep(sleepTime);
+    System.out.println("Sleep completed");
+}]]></bpmn2:script>
+    </bpmn2:scriptTask>
+    <bpmn2:endEvent id="_END_MESSAGE" name="End">
+      <bpmn2:incoming>_SEQ_3</bpmn2:incoming>
+      <bpmn2:dataInput id="_END_MESSAGE_eventInputX" drools:dtype="String" 
itemSubjectRef="__END_MESSAGE_eventInputXItem" name="event"/>
+      <bpmn2:dataInputAssociation>
+        <bpmn2:sourceRef>message</bpmn2:sourceRef>
+        <bpmn2:targetRef>_END_MESSAGE_eventInputX</bpmn2:targetRef>
+      </bpmn2:dataInputAssociation>
+      <bpmn2:inputSet>
+        <bpmn2:dataInputRefs>_END_MESSAGE_eventInputX</bpmn2:dataInputRefs>
+      </bpmn2:inputSet>
+      <bpmn2:messageEventDefinition drools:msgref="tx_test_end" 
messageRef="_msg_end"/>
+    </bpmn2:endEvent>
+  </bpmn2:process>
+  <bpmndi:BPMNDiagram>
+    <bpmndi:BPMNPlane bpmnElement="tx_rollback_test">
+      <bpmndi:BPMNShape id="shape__START_EVENT" bpmnElement="_START_EVENT">
+        <dc:Bounds height="56" width="56" x="100" y="100"/>
+      </bpmndi:BPMNShape>
+      <bpmndi:BPMNShape id="shape__MESSAGE_CATCH" bpmnElement="_MESSAGE_CATCH">
+        <dc:Bounds height="56" width="56" x="230" y="100"/>
+      </bpmndi:BPMNShape>
+      <bpmndi:BPMNShape id="shape__SLEEP_TASK" bpmnElement="_SLEEP_TASK">
+        <dc:Bounds height="102" width="154" x="360" y="77"/>
+      </bpmndi:BPMNShape>
+      <bpmndi:BPMNShape id="shape__END_MESSAGE" bpmnElement="_END_MESSAGE">
+        <dc:Bounds height="56" width="56" x="590" y="100"/>
+      </bpmndi:BPMNShape>
+      <bpmndi:BPMNEdge id="edge__START_EVENT_to__MESSAGE_CATCH" 
bpmnElement="_SEQ_1">
+        <di:waypoint x="156" y="128"/>
+        <di:waypoint x="230" y="128"/>
+      </bpmndi:BPMNEdge>
+      <bpmndi:BPMNEdge id="edge__MESSAGE_CATCH_to__SLEEP_TASK" 
bpmnElement="_SEQ_2">
+        <di:waypoint x="286" y="128"/>
+        <di:waypoint x="360" y="128"/>
+      </bpmndi:BPMNEdge>
+      <bpmndi:BPMNEdge id="edge__SLEEP_TASK_to__END_MESSAGE" 
bpmnElement="_SEQ_3">
+        <di:waypoint x="514" y="128"/>
+        <di:waypoint x="590" y="128"/>
+      </bpmndi:BPMNEdge>
+    </bpmndi:BPMNPlane>
+  </bpmndi:BPMNDiagram>
+  <bpmn2:relationship type="BPSimData">
+    <bpmn2:extensionElements>
+      <bpsim:BPSimData>
+        <bpsim:Scenario id="default" name="Simulation scenario">
+          <bpsim:ScenarioParameters/>
+          <bpsim:ElementParameters elementRef="_START_EVENT">
+            <bpsim:TimeParameters>
+              <bpsim:ProcessingTime>
+                <bpsim:NormalDistribution mean="0" standardDeviation="0"/>
+              </bpsim:ProcessingTime>
+            </bpsim:TimeParameters>
+          </bpsim:ElementParameters>
+          <bpsim:ElementParameters elementRef="_SLEEP_TASK">
+            <bpsim:TimeParameters>
+              <bpsim:ProcessingTime>
+                <bpsim:NormalDistribution mean="0" standardDeviation="0"/>
+              </bpsim:ProcessingTime>
+            </bpsim:TimeParameters>
+            <bpsim:ResourceParameters>
+              <bpsim:Availability>
+                <bpsim:FloatingParameter value="0"/>
+              </bpsim:Availability>
+              <bpsim:Quantity>
+                <bpsim:FloatingParameter value="0"/>
+              </bpsim:Quantity>
+            </bpsim:ResourceParameters>
+            <bpsim:CostParameters>
+              <bpsim:UnitCost>
+                <bpsim:FloatingParameter value="0"/>
+              </bpsim:UnitCost>
+            </bpsim:CostParameters>
+          </bpsim:ElementParameters>
+        </bpsim:Scenario>
+      </bpsim:BPSimData>
+    </bpmn2:extensionElements>
+    <bpmn2:source>_TxRollbackTestSpring</bpmn2:source>
+    <bpmn2:target>_TxRollbackTestSpring</bpmn2:target>
+  </bpmn2:relationship>
+</bpmn2:definitions>
+
diff --git 
a/springboot/integration-tests/integration-tests-springboot-transactions-it/src/test/java/org/kie/kogito/integrationtests/springboot/TransactionRollbackMessageTest.java
 
b/springboot/integration-tests/integration-tests-springboot-transactions-it/src/test/java/org/kie/kogito/integrationtests/springboot/TransactionRollbackMessageTest.java
new file mode 100644
index 0000000000..22e560b61a
--- /dev/null
+++ 
b/springboot/integration-tests/integration-tests-springboot-transactions-it/src/test/java/org/kie/kogito/integrationtests/springboot/TransactionRollbackMessageTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.integrationtests.springboot;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.kie.kogito.test.springboot.kafka.KafkaTestClient;
+import org.kie.kogito.testcontainers.KogitoPostgreSqlContainer;
+import org.kie.kogito.testcontainers.springboot.KafkaSpringBootTestResource;
+import 
org.kie.kogito.testcontainers.springboot.PostgreSqlSpringBootTestResource;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.web.server.LocalServerPort;
+import org.springframework.context.ApplicationContextInitializer;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.event.ContextClosedEvent;
+import org.springframework.test.context.ContextConfiguration;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.restassured.RestAssured;
+import io.restassured.http.ContentType;
+
+import static io.restassured.RestAssured.given;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Integration test to verify that when a transaction rolls back due to 
persistence failure,
+ * no Kafka message is sent. This validates the transactional behavior of the 
event emitter.
+ */
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, 
classes = KogitoSpringbootApplication.class)
+@ContextConfiguration(initializers = { KafkaSpringBootTestResource.class, 
TransactionRollbackMessageTest.ExposedPostgreSqlResource.class })
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class TransactionRollbackMessageTest {
+
+    private static final String TX_TEST_END_TOPIC = "tx_test_end";
+    private static final String TX_TEST_CONTINUE_TOPIC = "tx_test_continue";
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    @LocalServerPort
+    int randomServerPort;
+
+    @Autowired
+    private KafkaTestClient kafkaClient;
+
+    private AtomicInteger messageCount;
+    private CountDownLatch messageLatch;
+
+    // Static reference to PostgreSQL container for stopping it during test
+    private static KogitoPostgreSqlContainer postgresContainer;
+
+    static {
+        RestAssured.enableLoggingOfRequestAndResponseIfValidationFails();
+    }
+
+    @BeforeEach
+    public void setup() {
+        RestAssured.port = randomServerPort;
+        messageCount = new AtomicInteger(0);
+        messageLatch = new CountDownLatch(1);
+
+        // Ensure PostgreSQL is running before each test
+        if (postgresContainer != null && !postgresContainer.isRunning()) {
+            postgresContainer.start();
+        }
+    }
+
+    @Test
+    @Order(1)
+    void testMessageSentWhenTransactionSucceeds() throws Exception {
+        kafkaClient.consume(TX_TEST_END_TOPIC, s -> {
+            messageCount.incrementAndGet();
+            messageLatch.countDown();
+        });
+
+        String pId = given().body(String.format("{ \"sleepTime\": 1000 }"))
+                .contentType(ContentType.JSON)
+                .when()
+                .post("/tx_rollback_test")
+                .then()
+                .statusCode(201)
+                .extract().body().path("id");
+
+        await().atMost(Duration.ofSeconds(5))
+                .untilAsserted(() -> given()
+                        .contentType(ContentType.JSON)
+                        .when()
+                        .get("/tx_rollback_test/" + pId)
+                        .then()
+                        .statusCode(200));
+
+        String continueMessage = createCloudEventsMessage(pId, "continue");
+        kafkaClient.produce(continueMessage, TX_TEST_CONTINUE_TOPIC);
+
+        // Wait for the process to complete and message to be sent
+        boolean messageReceived = messageLatch.await(5, TimeUnit.SECONDS);
+
+        // In a successful transaction, the message should be sent
+        assertThat(messageReceived).as("Message should be sent when 
transaction succeeds").isTrue();
+        assertThat(messageCount.get()).as("Message count should be at least 
1").isGreaterThanOrEqualTo(1);
+    }
+
+    @Test
+    @Order(2)
+    void testNoMessageSentWhenTransactionRollsBack() throws Exception {
+        kafkaClient.consume(TX_TEST_END_TOPIC, s -> {
+            System.out.println("Received end message: " + s);
+            messageCount.incrementAndGet();
+            messageLatch.countDown();
+        });
+
+        String pId = given().body(String.format("{ \"sleepTime\": 5000 }"))
+                .contentType(ContentType.JSON)
+                .when()
+                .post("/tx_rollback_test")
+                .then()
+                .statusCode(201)
+                .extract().body().path("id");
+
+        await().atMost(Duration.ofSeconds(5))
+                .untilAsserted(() -> given()
+                        .contentType(ContentType.JSON)
+                        .when()
+                        .get("/tx_rollback_test/" + pId)
+                        .then()
+                        .statusCode(200));
+
+        String continueMessage = createCloudEventsMessage(pId, "continue");
+        kafkaClient.produce(continueMessage, TX_TEST_CONTINUE_TOPIC);
+
+        // Wait for the message to be consumed and sleep to start
+        Thread.sleep(1000);
+
+        // Stop PostgreSQL during the sleep to simulate database failure
+        if (postgresContainer != null) {
+            postgresContainer.stop();
+        } else {
+            System.out.println("WARNING: PostgreSQL container reference is 
null, cannot simulate failure");
+        }
+
+        // Verify that NO message was sent to the tx_test_end topic
+        // because the transaction should have rolled back
+        boolean messageReceived = messageLatch.await(5, TimeUnit.SECONDS);
+
+        // In a successful rollback scenario, no message should be received
+        assertThat(messageReceived).as("No message should be sent when 
transaction rolls back").isFalse();
+        assertThat(messageCount.get()).as("Message count should be 
0").isEqualTo(0);
+    }
+
+    private String createCloudEventsMessage(String processInstanceId, String 
messageContent) throws Exception {
+        Map<String, Object> cloudEvent = new HashMap<>();
+        cloudEvent.put("specversion", "1.0");
+        cloudEvent.put("id", UUID.randomUUID().toString());
+        cloudEvent.put("source", "test");
+        cloudEvent.put("type", "tx_test_continue");
+        cloudEvent.put("data", messageContent);
+        cloudEvent.put("kogitoprocrefid", processInstanceId);
+
+        return MAPPER.writeValueAsString(cloudEvent);
+    }
+
+    /**
+     * Custom PostgreSQL test resource that exposes the container instance
+     * so tests can stop it to simulate database failure.
+     */
+    public static class ExposedPostgreSqlResource extends 
PostgreSqlSpringBootTestResource
+            implements 
ApplicationContextInitializer<ConfigurableApplicationContext> {
+
+        @Override
+        public void initialize(ConfigurableApplicationContext 
applicationContext) {
+            // Call parent to start the container and configure properties
+            super.initialize(applicationContext);
+            // Store reference to the container for test access
+            postgresContainer = getTestResource();
+        }
+
+        @Override
+        public void onApplicationEvent(ContextClosedEvent event) {
+            // Call parent to stop the container
+            super.onApplicationEvent(event);
+            postgresContainer = null;
+        }
+    }
+}
diff --git a/springboot/integration-tests/pom.xml 
b/springboot/integration-tests/pom.xml
index d39f56167f..2ba2b9046d 100644
--- a/springboot/integration-tests/pom.xml
+++ b/springboot/integration-tests/pom.xml
@@ -42,6 +42,7 @@
     <module>integration-tests-springboot-norest-it</module>
     <module>integration-tests-springboot-processes-it</module>
     <module>integration-tests-springboot-processes-persistence-it</module>
+    <module>integration-tests-springboot-transactions-it</module>
     <module>integration-tests-springboot-usertasks-it</module>
     
<module>integration-tests-springboot-usertask-listeners-rollback-it</module>
     <module>integration-tests-springboot-wshumantasks-it</module>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to