This is an automated email from the ASF dual-hosted git repository.
mweiler pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git
The following commit(s) were added to refs/heads/main by this push:
new 52348294fd [incubator-kie-issues#2211] Data not correctly handled for
broadcast messages (#4168)
52348294fd is described below
commit 52348294fdeb85e4b67dcac39affb14294f1dcae
Author: Martin Weiler <[email protected]>
AuthorDate: Wed Jan 28 08:59:20 2026 -0700
[incubator-kie-issues#2211] Data not correctly handled for broadcast
messages (#4168)
* [incubator-kie-issues#2211] Data not correctly handled for broadcast
messages
* Add test case
* Add missing licence header
---
.../kogito/event/impl/ProcessEventDispatcher.java | 34 ++---
.../src/main/resources/pong_message_signal.bpmn | 152 +++++++++++++++++++++
.../quarkus/PingPongMessageIT.java | 48 +++++--
3 files changed, 208 insertions(+), 26 deletions(-)
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
index 3992608a0a..1f6dc5bb69 100644
---
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
@@ -73,40 +73,42 @@ public class ProcessEventDispatcher<M extends Model, D>
implements EventDispatch
// now see if we have a particular one to check
Optional<ProcessInstance<M>> processInstance = null;
+ // obtain data from the event
+ Object data = dataResolver.apply(event);
// check correlation key
String processInstanceId = resolveCorrelationId(event).orElse(null);
- processInstance = signalTargetProcessInstance(processInstanceId,
trigger, event, this::findById);
+ processInstance = signalTargetProcessInstance(processInstanceId,
trigger, data, this::findById);
if (processInstance.isPresent()) {
- LOGGER.debug("sending event to process {} with correlation key {}
with trigger {} and payload {}", process.id(), processInstanceId, trigger,
event);
+ LOGGER.debug("sending event to process {} with correlation key {}
with trigger {} and payload {}", process.id(), processInstanceId, trigger,
data);
return processInstance.get();
}
// check processInstanceId
processInstanceId = event.getKogitoReferenceId();
- processInstance = signalTargetProcessInstance(processInstanceId,
trigger, event, this::findById);
+ processInstance = signalTargetProcessInstance(processInstanceId,
trigger, data, this::findById);
if (processInstance.isPresent()) {
- LOGGER.debug("sending event to process {} with reference key {}
with trigger {} and payload {}", process.id(), processInstanceId, trigger,
event);
+ LOGGER.debug("sending event to process {} with reference key {}
with trigger {} and payload {}", process.id(), processInstanceId, trigger,
data);
return processInstance.get();
}
// check businessKey
processInstanceId = event.getKogitoBusinessKey();
- processInstance = signalTargetProcessInstance(processInstanceId,
trigger, event, this::findByBusinessKey);
+ processInstance = signalTargetProcessInstance(processInstanceId,
trigger, data, this::findByBusinessKey);
if (processInstance.isPresent()) {
- LOGGER.debug("sending event to process {} with business key {}
with trigger {} and payload {}", process.id(), processInstanceId, trigger,
event);
+ LOGGER.debug("sending event to process {} with business key {}
with trigger {} and payload {}", process.id(), processInstanceId, trigger,
data);
return processInstance.get();
}
// we signal all the processes waiting for trigger (this covers
intermediate catch events)
- LOGGER.debug("sending event to process {} with trigger {} and payload
{}", process.id(), trigger, event);
- process.send(SignalFactory.of("Message-" + trigger, event));
+ LOGGER.debug("sending event to process {} with trigger {} and payload
{}", process.id(), trigger, data);
+ process.send(SignalFactory.of("Message-" + trigger, data));
// try to start a new instance if possible (this covers start events)
return startNewInstance(trigger, event);
}
- private Optional<ProcessInstance<M>> signalTargetProcessInstance(String
processInstanceId, String trigger, DataEvent<D> event, Function<String,
Optional<ProcessInstance<M>>> findProcessInstance) {
+ private Optional<ProcessInstance<M>> signalTargetProcessInstance(String
processInstanceId, String trigger, Object data, Function<String,
Optional<ProcessInstance<M>>> findProcessInstance) {
if (processInstanceId == null) {
return Optional.empty();
}
@@ -116,15 +118,15 @@ public class ProcessEventDispatcher<M extends Model, D>
implements EventDispatch
return Optional.empty();
}
- signalProcess(processInstance.get(), trigger, event);
+ signalProcess(processInstance.get(), trigger, data);
return processInstance;
}
- private void signalProcess(ProcessInstance<M> pi, String trigger,
DataEvent<D> event) {
+ private void signalProcess(ProcessInstance<M> pi, String trigger, Object
data) {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Sending signal {} to process instance id '{}'",
trigger, pi.id());
+ LOGGER.debug("Sending signal {} to process instance id '{}' with
data {}", trigger, pi.id(), data);
}
- signalProcessInstance(trigger, pi.id(), event);
+ signalProcessInstance(trigger, pi.id(), data);
}
private Optional<ProcessInstance<M>> findById(String id) {
@@ -169,15 +171,15 @@ public class ProcessEventDispatcher<M extends Model, D>
implements EventDispatch
}
}
- private Optional<M> signalProcessInstance(String trigger, String id,
DataEvent<D> event) {
- return processService.signalProcessInstance((Process) process, id,
dataResolver.apply(event), "Message-" + trigger);
+ private Optional<M> signalProcessInstance(String trigger, String id,
Object data) {
+ return processService.signalProcessInstance((Process) process, id,
data, "Message-" + trigger);
}
private ProcessInstance<M> startNewInstance(String trigger, DataEvent<D>
event) {
if (modelConverter.isEmpty()) {
return null;
}
- LOGGER.info("Starting new process instance with signal '{}'", trigger);
+ LOGGER.info("Starting new process instance with signal '{}' for event
{}", trigger, event);
return processService.createProcessInstance(
process,
event.getKogitoBusinessKey(),
diff --git
a/quarkus/integration-tests/integration-tests-quarkus-processes/src/main/resources/pong_message_signal.bpmn
b/quarkus/integration-tests/integration-tests-quarkus-processes/src/main/resources/pong_message_signal.bpmn
new file mode 100644
index 0000000000..accfba14b4
--- /dev/null
+++
b/quarkus/integration-tests/integration-tests-quarkus-processes/src/main/resources/pong_message_signal.bpmn
@@ -0,0 +1,152 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<bpmn2:definitions xmlns:bpmn2="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
xmlns:bpsim="http://www.bpsim.org/schemas/1.0"
xmlns:dc="http://www.omg.org/spec/DD/20100524/DC"
xmlns:di="http://www.omg.org/spec/DD/20100524/DI"
xmlns:drools="http://www.jboss.org/drools" xmlns:xsi="xsi"
id="_9uifcNtpED6_NOZ-9gzKoA"
xsi:schemaLocation="http://www.omg.org/spec/BPMN/20100524/MODEL BPMN20.xsd
http://www.jboss.org/drools drools.xsd http://www [...]
+ <bpmn2:itemDefinition id="_messageItem" structureRef="String"/>
+ <bpmn2:itemDefinition
id="__B09C78A1-B705-4C51-8320-CBD686C490B9_eventOutputXItem"
structureRef="String"/>
+ <bpmn2:itemDefinition id="pong_signalType" structureRef="String"/>
+ <bpmn2:itemDefinition id="_077EDF08-E4AA-4A5D-A937-FE6B8D122399"
structureRef=""/>
+ <bpmn2:itemDefinition id="_419CD643-BC29-45F4-88A6-DDC90AA3C172"
structureRef=""/>
+ <bpmn2:message id="_9ujGgNtpED6_NOZ-9gzKoA" itemRef="pong_signalType"
name="pong_signal"/>
+ <bpmn2:signal id="_100571" name="end"/>
+ <bpmn2:collaboration id="_7ECF2770-4FFD-4590-9E78-51926E1093C5"
name="Default Collaboration">
+ <bpmn2:participant id="_EA3D6BC1-FC7B-47B3-A86C-41A01DE96645" name="Pool
Participant" processRef="pong_message_signal"/>
+ </bpmn2:collaboration>
+ <bpmn2:process id="pong_message_signal" drools:packageName="com.example"
drools:version="1.0" drools:adHoc="false" name="pong_message_signal"
isExecutable="true" processType="Public">
+ <bpmn2:property id="message" itemSubjectRef="_messageItem" name="message"/>
+ <bpmn2:sequenceFlow id="_513B1506-4306-4ADA-8260-27E9FA48E6CE"
sourceRef="_25DE4666-14B3-4D1B-AE78-9A6E57AF0483"
targetRef="_D3B28191-88F6-45A5-B529-643F1160163E"/>
+ <bpmn2:sequenceFlow id="_FD0C7CCF-41FC-4867-9038-843076464724"
sourceRef="_8C258CA8-EA9F-4A52-B1B4-0B37A5E502CB"
targetRef="_25DE4666-14B3-4D1B-AE78-9A6E57AF0483"/>
+ <bpmn2:sequenceFlow id="_E5A9EC9C-3967-4610-84D7-9B9B8709648B"
sourceRef="_B09C78A1-B705-4C51-8320-CBD686C490B9"
targetRef="_8C258CA8-EA9F-4A52-B1B4-0B37A5E502CB"/>
+ <bpmn2:sequenceFlow id="_2B6C5698-4E02-47C9-80A8-BC5C640F2390"
sourceRef="_476F8DCD-530B-4C5D-A4A9-98238B051F3A"
targetRef="_B09C78A1-B705-4C51-8320-CBD686C490B9">
+ <bpmn2:extensionElements>
+ <drools:metaData name="isAutoConnection.target">
+ <drools:metaValue><![CDATA[true]]></drools:metaValue>
+ </drools:metaData>
+ </bpmn2:extensionElements>
+ </bpmn2:sequenceFlow>
+ <bpmn2:endEvent id="_D3B28191-88F6-45A5-B529-643F1160163E">
+ <bpmn2:incoming>_513B1506-4306-4ADA-8260-27E9FA48E6CE</bpmn2:incoming>
+ </bpmn2:endEvent>
+ <bpmn2:intermediateCatchEvent id="_25DE4666-14B3-4D1B-AE78-9A6E57AF0483">
+ <bpmn2:incoming>_FD0C7CCF-41FC-4867-9038-843076464724</bpmn2:incoming>
+ <bpmn2:outgoing>_513B1506-4306-4ADA-8260-27E9FA48E6CE</bpmn2:outgoing>
+ <bpmn2:signalEventDefinition signalRef="_100571"/>
+ </bpmn2:intermediateCatchEvent>
+ <bpmn2:scriptTask id="_8C258CA8-EA9F-4A52-B1B4-0B37A5E502CB" name="Task"
scriptFormat="http://www.java.com/java">
+ <bpmn2:extensionElements>
+ <drools:metaData name="elementname">
+ <drools:metaValue><![CDATA[Task]]></drools:metaValue>
+ </drools:metaData>
+ </bpmn2:extensionElements>
+ <bpmn2:incoming>_E5A9EC9C-3967-4610-84D7-9B9B8709648B</bpmn2:incoming>
+ <bpmn2:outgoing>_FD0C7CCF-41FC-4867-9038-843076464724</bpmn2:outgoing>
+ <bpmn2:script>System.out.println("Message " + message);
+kcontext.setVariable("message", message + " world");</bpmn2:script>
+ </bpmn2:scriptTask>
+ <bpmn2:intermediateCatchEvent id="_B09C78A1-B705-4C51-8320-CBD686C490B9">
+ <bpmn2:incoming>_2B6C5698-4E02-47C9-80A8-BC5C640F2390</bpmn2:incoming>
+ <bpmn2:outgoing>_E5A9EC9C-3967-4610-84D7-9B9B8709648B</bpmn2:outgoing>
+ <bpmn2:dataOutput
id="_B09C78A1-B705-4C51-8320-CBD686C490B9_eventOutputX" drools:dtype="String"
itemSubjectRef="__B09C78A1-B705-4C51-8320-CBD686C490B9_eventOutputXItem"
name="event"/>
+ <bpmn2:dataOutputAssociation>
+
<bpmn2:sourceRef>_B09C78A1-B705-4C51-8320-CBD686C490B9_eventOutputX</bpmn2:sourceRef>
+ <bpmn2:targetRef>message</bpmn2:targetRef>
+ </bpmn2:dataOutputAssociation>
+ <bpmn2:outputSet>
+
<bpmn2:dataOutputRefs>_B09C78A1-B705-4C51-8320-CBD686C490B9_eventOutputX</bpmn2:dataOutputRefs>
+ </bpmn2:outputSet>
+ <bpmn2:messageEventDefinition drools:msgref="pong_signal"
messageRef="_9ujGgNtpED6_NOZ-9gzKoA"/>
+ </bpmn2:intermediateCatchEvent>
+ <bpmn2:startEvent id="_476F8DCD-530B-4C5D-A4A9-98238B051F3A">
+ <bpmn2:outgoing>_2B6C5698-4E02-47C9-80A8-BC5C640F2390</bpmn2:outgoing>
+ </bpmn2:startEvent>
+ </bpmn2:process>
+ <bpmndi:BPMNDiagram>
+ <bpmndi:BPMNPlane bpmnElement="pong_message_signal">
+ <bpmndi:BPMNShape id="shape__476F8DCD-530B-4C5D-A4A9-98238B051F3A"
bpmnElement="_476F8DCD-530B-4C5D-A4A9-98238B051F3A">
+ <dc:Bounds height="56" width="56" x="146" y="232"/>
+ </bpmndi:BPMNShape>
+ <bpmndi:BPMNShape id="shape__B09C78A1-B705-4C51-8320-CBD686C490B9"
bpmnElement="_B09C78A1-B705-4C51-8320-CBD686C490B9">
+ <dc:Bounds height="56" width="56" x="263" y="232"/>
+ </bpmndi:BPMNShape>
+ <bpmndi:BPMNShape id="shape__8C258CA8-EA9F-4A52-B1B4-0B37A5E502CB"
bpmnElement="_8C258CA8-EA9F-4A52-B1B4-0B37A5E502CB">
+ <dc:Bounds height="102" width="154" x="399" y="209"/>
+ </bpmndi:BPMNShape>
+ <bpmndi:BPMNShape id="shape__25DE4666-14B3-4D1B-AE78-9A6E57AF0483"
bpmnElement="_25DE4666-14B3-4D1B-AE78-9A6E57AF0483">
+ <dc:Bounds height="56" width="56" x="633" y="232"/>
+ </bpmndi:BPMNShape>
+ <bpmndi:BPMNShape id="shape__D3B28191-88F6-45A5-B529-643F1160163E"
bpmnElement="_D3B28191-88F6-45A5-B529-643F1160163E">
+ <dc:Bounds height="56" width="56" x="769" y="232"/>
+ </bpmndi:BPMNShape>
+ <bpmndi:BPMNEdge
id="edge_shape__476F8DCD-530B-4C5D-A4A9-98238B051F3A_to_shape__B09C78A1-B705-4C51-8320-CBD686C490B9"
bpmnElement="_2B6C5698-4E02-47C9-80A8-BC5C640F2390">
+ <di:waypoint x="174" y="260"/>
+ <di:waypoint x="263" y="260"/>
+ </bpmndi:BPMNEdge>
+ <bpmndi:BPMNEdge
id="edge_shape__B09C78A1-B705-4C51-8320-CBD686C490B9_to_shape__8C258CA8-EA9F-4A52-B1B4-0B37A5E502CB"
bpmnElement="_E5A9EC9C-3967-4610-84D7-9B9B8709648B">
+ <di:waypoint x="291" y="260"/>
+ <di:waypoint x="476" y="260"/>
+ </bpmndi:BPMNEdge>
+ <bpmndi:BPMNEdge
id="edge_shape__8C258CA8-EA9F-4A52-B1B4-0B37A5E502CB_to_shape__25DE4666-14B3-4D1B-AE78-9A6E57AF0483"
bpmnElement="_FD0C7CCF-41FC-4867-9038-843076464724">
+ <di:waypoint x="476" y="260"/>
+ <di:waypoint x="661" y="260"/>
+ </bpmndi:BPMNEdge>
+ <bpmndi:BPMNEdge
id="edge_shape__25DE4666-14B3-4D1B-AE78-9A6E57AF0483_to_shape__D3B28191-88F6-45A5-B529-643F1160163E"
bpmnElement="_513B1506-4306-4ADA-8260-27E9FA48E6CE">
+ <di:waypoint x="661" y="260"/>
+ <di:waypoint x="797" y="260"/>
+ </bpmndi:BPMNEdge>
+ </bpmndi:BPMNPlane>
+ </bpmndi:BPMNDiagram>
+ <bpmn2:relationship type="BPSimData">
+ <bpmn2:extensionElements>
+ <bpsim:BPSimData>
+ <bpsim:Scenario id="default" name="Simulationscenario">
+ <bpsim:ScenarioParameters/>
+ <bpsim:ElementParameters
elementRef="_476F8DCD-530B-4C5D-A4A9-98238B051F3A">
+ <bpsim:TimeParameters>
+ <bpsim:ProcessingTime>
+ <bpsim:NormalDistribution mean="0" standardDeviation="0"/>
+ </bpsim:ProcessingTime>
+ </bpsim:TimeParameters>
+ </bpsim:ElementParameters>
+ <bpsim:ElementParameters
elementRef="_8C258CA8-EA9F-4A52-B1B4-0B37A5E502CB">
+ <bpsim:TimeParameters>
+ <bpsim:ProcessingTime>
+ <bpsim:NormalDistribution mean="0" standardDeviation="0"/>
+ </bpsim:ProcessingTime>
+ </bpsim:TimeParameters>
+ <bpsim:ResourceParameters>
+ <bpsim:Availability>
+ <bpsim:FloatingParameter value="0"/>
+ </bpsim:Availability>
+ <bpsim:Quantity>
+ <bpsim:FloatingParameter value="0"/>
+ </bpsim:Quantity>
+ </bpsim:ResourceParameters>
+ <bpsim:CostParameters>
+ <bpsim:UnitCost>
+ <bpsim:FloatingParameter value="0"/>
+ </bpsim:UnitCost>
+ </bpsim:CostParameters>
+ </bpsim:ElementParameters>
+ </bpsim:Scenario>
+ </bpsim:BPSimData>
+ </bpmn2:extensionElements>
+ <bpmn2:source>_9uifcNtpED6_NOZ-9gzKoA</bpmn2:source>
+ <bpmn2:target>_9uifcNtpED6_NOZ-9gzKoA</bpmn2:target>
+ </bpmn2:relationship>
+</bpmn2:definitions>
diff --git
a/quarkus/integration-tests/integration-tests-quarkus-processes/src/test/java/org/kie/kogito/integrationtests/quarkus/PingPongMessageIT.java
b/quarkus/integration-tests/integration-tests-quarkus-processes/src/test/java/org/kie/kogito/integrationtests/quarkus/PingPongMessageIT.java
index b4d8bd5910..bac0ef2387 100644
---
a/quarkus/integration-tests/integration-tests-quarkus-processes/src/test/java/org/kie/kogito/integrationtests/quarkus/PingPongMessageIT.java
+++
b/quarkus/integration-tests/integration-tests-quarkus-processes/src/test/java/org/kie/kogito/integrationtests/quarkus/PingPongMessageIT.java
@@ -63,7 +63,7 @@ public class PingPongMessageIT {
.statusCode(201)
.extract().body().path("id");
- validateSubProcess();
+ validateSubProcess("pong_message");
await().atMost(Duration.ofSeconds(5))
.untilAsserted(() -> given()
@@ -89,12 +89,12 @@ public class PingPongMessageIT {
.statusCode(404);
}
- private void validateSubProcess() {
+ private void validateSubProcess(String subProcessName) {
await().atMost(Duration.ofSeconds(5))
.untilAsserted(() -> given()
.contentType(ContentType.JSON)
.when()
- .get("/pong_message/")
+ .get("/" + subProcessName + "/")
.then()
.statusCode(200)
.body("$.size()", equalTo(1)));
@@ -102,23 +102,32 @@ public class PingPongMessageIT {
String pId = given()
.contentType(ContentType.JSON)
.when()
- .get("/pong_message/")
+ .get("/" + subProcessName + "/")
.then()
.statusCode(200)
.body("$.size()", equalTo(1))
.extract().body().path("[0].id");
+ await().atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> given()
+ .contentType(ContentType.JSON)
+ .when()
+ .get("/" + subProcessName + "/{pId}", pId)
+ .then()
+ .statusCode(200)
+ .body("message", equalTo("hello world")));
+
given()
.contentType(ContentType.JSON)
.when()
- .post("/pong_message/{pId}/end", pId)
+ .post("/" + subProcessName + "/{pId}/end", pId)
.then()
.statusCode(200);
given()
.contentType(ContentType.JSON)
.when()
- .get("/pong_message/{pId}", pId)
+ .get("/" + subProcessName + "/{pId}", pId)
.then()
.statusCode(404);
}
@@ -126,9 +135,10 @@ public class PingPongMessageIT {
@Test
void testPongWithValidMessage() throws InterruptedException {
kafkaClient.produce(
- "{\"specversion\": \"1.0\", \"id\":\"id1\", \"source\":
\"junit\", \"type\": \"pong_start\", \"event\": \"Hello World\" }",
+ "{\"specversion\": \"1.0\", \"id\":\"id1\", \"source\":
\"junit\", \"type\": \"pong_start\", \"data\": \"hello\" }",
"kogito_it_test");
- validateSubProcess();
+
+ validateSubProcess("pong_message");
}
@Test
@@ -139,9 +149,27 @@ public class PingPongMessageIT {
"kogito_it_test");
// sending valid message
kafkaClient.produce(
- "{\"specversion\": \"1.0\", \"id\":\"id1\", \"source\":
\"junit\", \"type\": \"pong_start\", \"event\": \"Hello World\" }",
+ "{\"specversion\": \"1.0\", \"id\":\"id1\", \"source\":
\"junit\", \"type\": \"pong_start\", \"data\": \"hello\" }",
"kogito_it_test");
- validateSubProcess();
+
+ validateSubProcess("pong_message");
+ }
+
+ @Test
+ void testPongWithValidMessageAfterSignal() throws InterruptedException {
+ String pId = given().body("{ \"message\": \"hello\" }")
+ .contentType(ContentType.JSON)
+ .when()
+ .post("/pong_message_signal")
+ .then()
+ .statusCode(201)
+ .extract().body().path("id");
+
+ kafkaClient.produce(
+ "{\"specversion\": \"1.0\", \"id\":\"id1\", \"source\":
\"junit\", \"type\": \"pong_signal\", \"data\": \"hello\" }",
+ "kogito_it_test");
+
+ validateSubProcess("pong_message_signal");
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]