This is an automated email from the ASF dual-hosted git repository.
fjtiradosarti pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git
The following commit(s) were added to refs/heads/main by this push:
new 4ea541a140 [Fix apache/incubator-kie-issues#2238]
ProcessEventDispatcher fix (#4185)
4ea541a140 is described below
commit 4ea541a1407b1fa46c6ba70784b0a6b58cf60f98
Author: Francisco Javier Tirado Sarti
<[email protected]>
AuthorDate: Mon Feb 9 12:20:44 2026 +0100
[Fix apache/incubator-kie-issues#2238] ProcessEventDispatcher fix (#4185)
* [Fix apache/incubator-kie-issues#2238] ProcessEventDispatcher fix
A set of priorities is defined.
If there is a correlation key, that's the process instance to be notified
Then, the kogito reference id
Then, the kogito business key
Then, we try to start a new process, but only if there is a start event
listening.
If none of them applies, then we pass the trigger to the engine for correlation
If, for a correlation key or reference id, there is no existing process
instance, then the event is discarded.
Signed-off-by: fjtirado <[email protected]>
* [Fix apache/incubator-kie-issues#2238] ProcessEventDispatcher fix
Start instance (if applicable, meaning startNode exists) and broadcast
Signed-off-by: fjtirado <[email protected]>
* Apply suggestion from @pefernan
Co-authored-by: Pere Fernández <[email protected]>
* Apply suggestion from @pefernan and @mweiler
Co-authored-by: Pere Fernández <[email protected]>
---------
Signed-off-by: fjtirado <[email protected]>
Co-authored-by: Pere Fernández <[email protected]>
---
.../kogito/event/impl/ProcessEventDispatcher.java | 63 +++++++++++-----------
.../event/impl/ProcessEventDispatcherTest.java | 20 ++-----
.../kie/kogito/quarkus/workflows/EventFlowIT.java | 8 +++
3 files changed, 45 insertions(+), 46 deletions(-)
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
index 1f6dc5bb69..349de3a9b0 100644
---
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
@@ -70,41 +70,44 @@ public class ProcessEventDispatcher<M extends Model, D>
implements EventDispatch
}
return null;
}
-
- // now see if we have a particular one to check
- Optional<ProcessInstance<M>> processInstance = null;
// obtain data from the event
Object data = dataResolver.apply(event);
- // check correlation key
- String processInstanceId = resolveCorrelationId(event).orElse(null);
- processInstance = signalTargetProcessInstance(processInstanceId,
trigger, data, this::findById);
- if (processInstance.isPresent()) {
- LOGGER.debug("sending event to process {} with correlation key {}
with trigger {} and payload {}", process.id(), processInstanceId, trigger,
data);
- return processInstance.get();
+ // check correlation key, if an instance associated to that
correlation key exist, notify the instance, if it does not exist, ignore the
event
+ Optional<String> correlationId = resolveCorrelationId(event);
+ if (correlationId.isPresent()) {
+ return signalTargetProcessInstance(correlationId.orElseThrow(),
trigger, data, this::findById, "correlation");
}
-
- // check processInstanceId
- processInstanceId = event.getKogitoReferenceId();
- processInstance = signalTargetProcessInstance(processInstanceId,
trigger, data, this::findById);
- if (processInstance.isPresent()) {
- LOGGER.debug("sending event to process {} with reference key {}
with trigger {} and payload {}", process.id(), processInstanceId, trigger,
data);
- return processInstance.get();
+ // check process reference id, if the id exist, notify the instance,
if it does not exist, ignore the event
+ String processInstanceId = event.getKogitoReferenceId();
+ if (processInstanceId != null) {
+ return signalTargetProcessInstance(processInstanceId, trigger,
data, this::findById, "reference");
}
-
// check businessKey
processInstanceId = event.getKogitoBusinessKey();
- processInstance = signalTargetProcessInstance(processInstanceId,
trigger, data, this::findByBusinessKey);
- if (processInstance.isPresent()) {
- LOGGER.debug("sending event to process {} with business key {}
with trigger {} and payload {}", process.id(), processInstanceId, trigger,
data);
- return processInstance.get();
+ if (processInstanceId != null) {
+ Optional<ProcessInstance<M>> processInstance =
signalTargetProcessInstance(processInstanceId, trigger, data,
this::findByBusinessKey);
+ // business key is special case, since it might be used to notify
a process instance identified by that business key or create a new one
+ // using that business key
+ return processInstance.isPresent() ? processInstance.orElseThrow()
: startNewInstance(trigger, event);
}
-
+ // if we reach this point try to start a new instance if possible
(this covers start events)
+ ProcessInstance<M> processInstance = startNewInstance(trigger, event);
// we signal all the processes waiting for trigger (this covers
intermediate catch events)
LOGGER.debug("sending event to process {} with trigger {} and payload
{}", process.id(), trigger, data);
process.send(SignalFactory.of("Message-" + trigger, data));
+ return processInstance;
+ }
- // try to start a new instance if possible (this covers start events)
- return startNewInstance(trigger, event);
+ private ProcessInstance<M> signalTargetProcessInstance(String
processInstanceId, String trigger, Object data, Function<String,
Optional<ProcessInstance<M>>> findProcessInstance,
+ String messagePart) {
+ Optional<ProcessInstance<M>> processInstance =
signalTargetProcessInstance(processInstanceId, trigger, data,
findProcessInstance);
+ if (processInstance.isPresent()) {
+ LOGGER.debug("Event was sent to process {} with {} key {} with
trigger {} and payload {}", process.id(), messagePart, processInstanceId,
trigger, data);
+ return processInstance.get();
+ } else {
+ LOGGER.warn("Process {} with {} key {} with trigger {} and payload
{} does not exist, ignoring event", process.id(), messagePart,
processInstanceId, trigger, data);
+ return null;
+ }
}
@@ -124,26 +127,26 @@ public class ProcessEventDispatcher<M extends Model, D>
implements EventDispatch
private void signalProcess(ProcessInstance<M> pi, String trigger, Object
data) {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Sending signal {} to process instance id '{}' with
data {}", trigger, pi.id(), data);
+ LOGGER.debug("Sending signal {} to process {} with instance id
'{}' with data {}", trigger, process.id(), pi.id(), data);
}
signalProcessInstance(trigger, pi.id(), data);
}
private Optional<ProcessInstance<M>> findById(String id) {
- LOGGER.debug("Received message with process instance id '{}'", id);
+ LOGGER.debug("Received message with process {} with instance id '{}'",
process.id(), id);
Optional<ProcessInstance<M>> result = process.instances().findById(id);
if (LOGGER.isDebugEnabled() && result.isEmpty()) {
- LOGGER.debug("No instance found for process instance id '{}'", id);
+ LOGGER.debug("No instance found for process {} with instance id
'{}'", process.id(), id);
}
return result;
}
private Optional<ProcessInstance<M>> findByBusinessKey(String key) {
- LOGGER.debug("Received message with business key '{}'", key);
+ LOGGER.debug("Received message with process {} with business key
'{}'", process.id(), key);
Optional<ProcessInstance<M>> result =
process.instances().findByBusinessKey(key);
if (LOGGER.isDebugEnabled() && result.isEmpty()) {
- LOGGER.debug("No instance found for business key '{}'", key);
+ LOGGER.debug("No instance found for process {} with business key
'{}'", process.id(), key);
}
return result;
}
@@ -179,7 +182,7 @@ public class ProcessEventDispatcher<M extends Model, D>
implements EventDispatch
if (modelConverter.isEmpty()) {
return null;
}
- LOGGER.info("Starting new process instance with signal '{}' for event
{}", trigger, event);
+ LOGGER.info("Starting new process of type {} with signal '{}' for
event {}", process.id(), trigger, event);
return processService.createProcessInstance(
process,
event.getKogitoBusinessKey(),
diff --git
a/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java
b/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java
index ba2108d7c9..e6326dbc9a 100644
---
a/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java
+++
b/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java
@@ -18,9 +18,6 @@
*/
package org.kie.kogito.event.impl;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@@ -127,22 +124,13 @@ class ProcessEventDispatcherTest {
}
@Test
- void testCloudEventNewInstanceWithReference() throws Exception {
+ void testCloudEventNewInstanceWithInvalidReference() throws Exception {
EventDispatcher<DummyModel, TestEvent> dispatcher = new
ProcessEventDispatcher<>(process, modelConverter(), processService, null, o ->
o.getData());
ProcessInstance<DummyModel> instance =
dispatcher.dispatch(DUMMY_TOPIC, new TestCloudEvent<>(new TestEvent("pepe"),
DUMMY_TOPIC, "source", "invalidReference"));
-
- ArgumentCaptor<String> signal = ArgumentCaptor.forClass(String.class);
- ArgumentCaptor<String> referenceId =
ArgumentCaptor.forClass(String.class);
- ArgumentCaptor<Map<String, List<String>>> headers =
ArgumentCaptor.forClass(Map.class);
-
+ assertThat(instance).isNull();
verify(processInstances, times(1)).findById("invalidReference");
- verify(processService, never()).signalProcessInstance(eq(process),
any(), any(), signal.capture());
- verify(processService, times(1)).createProcessInstance(eq(process),
any(), any(DummyModel.class), headers.capture(), any(), signal.capture(),
referenceId.capture(), isNull());
-
- assertThat(signal.getValue()).isEqualTo(DUMMY_TOPIC);
- assertThat(referenceId.getValue()).isEqualTo("1");
- assertThat(headers.getValue()).containsEntry("source",
Arrays.asList("source"));
- assertThat(processInstance).isEqualTo(instance);
+ verify(processService, never()).signalProcessInstance(eq(process),
any(), any(), any());
+ verify(processService, never()).createProcessInstance(eq(process),
any(), any(DummyModel.class), any(), any(), any(), any(), isNull());
}
@Test
diff --git
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java
index e236f26e8d..00e6fb988c 100644
---
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java
+++
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java
@@ -132,6 +132,14 @@ class EventFlowIT {
waitForFinish(flowName, id, Duration.ofSeconds(5));
}
+ @Test
+ void testWrongProcessIdNotProcessedRainy() throws IOException {
+ final String flowName = "nonStartEvent";
+ final String id = startProcess(flowName);
+ sendEvents(UUID.randomUUID().toString(), "move");
+ assertThrows(ConditionTimeoutException.class, () ->
waitForFinish(flowName, id, Duration.ofSeconds(5)));
+ }
+
private void sendEvents(String id, String... eventTypes) throws
IOException {
sendEvents(id, Optional.empty(), eventTypes);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]