This is an automated email from the ASF dual-hosted git repository.

fjtiradosarti pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ea541a140 [Fix apache/incubator-kie-issues#2238] 
ProcessEventDispatcher fix (#4185)
4ea541a140 is described below

commit 4ea541a1407b1fa46c6ba70784b0a6b58cf60f98
Author: Francisco Javier Tirado Sarti 
<[email protected]>
AuthorDate: Mon Feb 9 12:20:44 2026 +0100

    [Fix apache/incubator-kie-issues#2238] ProcessEventDispatcher fix (#4185)
    
    * [Fix apache/incubator-kie-issues#2238] ProcessEventDispatcher fix
    
    A set of priorities is defined.
    If there is a correlation key, that's the process instance to be notified.
    Then, the kogito reference id.
    Then, the kogito business key.
    Then, we try to start a new process, but only if there is a start event
    listening.
    If none of them applies, we pass the trigger to the engine for correlation.
    
    If, for a correlation key or reference id, there is no existing process
    instance, then the event is discarded.
    
    Signed-off-by: fjtirado <[email protected]>
    
    * [Fix apache/incubator-kie-issues#2238] ProcessEventDispatcher fix
    
    Start instance (if applicable, meaning startNode exists) and broadcast
    
    Signed-off-by: fjtirado <[email protected]>
    
    * Apply suggestion from @pefernan
    
    Co-authored-by: Pere Fernández <[email protected]>
    
    * Apply suggestion from @pefernan and @mweiler
    
    Co-authored-by: Pere Fernández <[email protected]>
    
    ---------
    
    Signed-off-by: fjtirado <[email protected]>
    Co-authored-by: Pere Fernández <[email protected]>
---
 .../kogito/event/impl/ProcessEventDispatcher.java  | 63 +++++++++++-----------
 .../event/impl/ProcessEventDispatcherTest.java     | 20 ++-----
 .../kie/kogito/quarkus/workflows/EventFlowIT.java  |  8 +++
 3 files changed, 45 insertions(+), 46 deletions(-)

diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
index 1f6dc5bb69..349de3a9b0 100644
--- 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
@@ -70,41 +70,44 @@ public class ProcessEventDispatcher<M extends Model, D> 
implements EventDispatch
             }
             return null;
         }
-
-        // now see if we have a particular one to check
-        Optional<ProcessInstance<M>> processInstance = null;
         // obtain data from the event
         Object data = dataResolver.apply(event);
-        // check correlation key
-        String processInstanceId = resolveCorrelationId(event).orElse(null);
-        processInstance = signalTargetProcessInstance(processInstanceId, 
trigger, data, this::findById);
-        if (processInstance.isPresent()) {
-            LOGGER.debug("sending event to process {} with correlation key {} 
with trigger {} and payload {}", process.id(), processInstanceId, trigger, 
data);
-            return processInstance.get();
+        // check correlation key, if an instance associated to that 
correlation key exist, notify the instance, if it does not exist, ignore the 
event
+        Optional<String> correlationId = resolveCorrelationId(event);
+        if (correlationId.isPresent()) {
+            return signalTargetProcessInstance(correlationId.orElseThrow(), 
trigger, data, this::findById, "correlation");
         }
-
-        // check processInstanceId
-        processInstanceId = event.getKogitoReferenceId();
-        processInstance = signalTargetProcessInstance(processInstanceId, 
trigger, data, this::findById);
-        if (processInstance.isPresent()) {
-            LOGGER.debug("sending event to process {} with reference key {} 
with trigger {} and payload {}", process.id(), processInstanceId, trigger, 
data);
-            return processInstance.get();
+        // check process reference id, if the id exist, notify the instance, 
if it does not exist, ignore the event
+        String processInstanceId = event.getKogitoReferenceId();
+        if (processInstanceId != null) {
+            return signalTargetProcessInstance(processInstanceId, trigger, 
data, this::findById, "reference");
         }
-
         // check businessKey
         processInstanceId = event.getKogitoBusinessKey();
-        processInstance = signalTargetProcessInstance(processInstanceId, 
trigger, data, this::findByBusinessKey);
-        if (processInstance.isPresent()) {
-            LOGGER.debug("sending event to process {} with business key {} 
with trigger {} and payload {}", process.id(), processInstanceId, trigger, 
data);
-            return processInstance.get();
+        if (processInstanceId != null) {
+            Optional<ProcessInstance<M>> processInstance = 
signalTargetProcessInstance(processInstanceId, trigger, data, 
this::findByBusinessKey);
+            // business key is special case, since it might be used to notify 
a process instance identified by that business key or create a new one 
+            // using that business key
+            return processInstance.isPresent() ? processInstance.orElseThrow() 
: startNewInstance(trigger, event);
         }
-
+        // if we reach this point try to start a new instance if possible 
(this covers start events)
+        ProcessInstance<M> processInstance = startNewInstance(trigger, event);
         // we signal all the processes waiting for trigger (this covers 
intermediate catch events)
         LOGGER.debug("sending event to process {} with trigger {} and payload 
{}", process.id(), trigger, data);
         process.send(SignalFactory.of("Message-" + trigger, data));
+        return processInstance;
+    }
 
-        // try to start a new instance if possible (this covers start events)
-        return startNewInstance(trigger, event);
+    private ProcessInstance<M> signalTargetProcessInstance(String 
processInstanceId, String trigger, Object data, Function<String, 
Optional<ProcessInstance<M>>> findProcessInstance,
+            String messagePart) {
+        Optional<ProcessInstance<M>> processInstance = 
signalTargetProcessInstance(processInstanceId, trigger, data, 
findProcessInstance);
+        if (processInstance.isPresent()) {
+            LOGGER.debug("Event was sent to process {} with {} key {} with 
trigger {} and payload {}", process.id(), messagePart, processInstanceId, 
trigger, data);
+            return processInstance.get();
+        } else {
+            LOGGER.warn("Process {} with {} key {} with trigger {} and payload 
{} does not exist, ignoring event", process.id(), messagePart, 
processInstanceId, trigger, data);
+            return null;
+        }
 
     }
 
@@ -124,26 +127,26 @@ public class ProcessEventDispatcher<M extends Model, D> 
implements EventDispatch
 
     private void signalProcess(ProcessInstance<M> pi, String trigger, Object 
data) {
         if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Sending signal {} to process instance id '{}' with 
data {}", trigger, pi.id(), data);
+            LOGGER.debug("Sending signal {} to process {} with instance id 
'{}' with data {}", trigger, process.id(), pi.id(), data);
         }
         signalProcessInstance(trigger, pi.id(), data);
     }
 
     private Optional<ProcessInstance<M>> findById(String id) {
-        LOGGER.debug("Received message with process instance id '{}'", id);
+        LOGGER.debug("Received message with process {} with instance id '{}'", 
process.id(), id);
         Optional<ProcessInstance<M>> result = process.instances().findById(id);
         if (LOGGER.isDebugEnabled() && result.isEmpty()) {
-            LOGGER.debug("No instance found for process instance id '{}'", id);
+            LOGGER.debug("No instance found for process {} with instance id 
'{}'", process.id(), id);
         }
         return result;
 
     }
 
     private Optional<ProcessInstance<M>> findByBusinessKey(String key) {
-        LOGGER.debug("Received message with business key '{}'", key);
+        LOGGER.debug("Received message with process {} with business key 
'{}'", process.id(), key);
         Optional<ProcessInstance<M>> result = 
process.instances().findByBusinessKey(key);
         if (LOGGER.isDebugEnabled() && result.isEmpty()) {
-            LOGGER.debug("No instance found for business key '{}'", key);
+            LOGGER.debug("No instance found for process {} with business key 
'{}'", process.id(), key);
         }
         return result;
     }
@@ -179,7 +182,7 @@ public class ProcessEventDispatcher<M extends Model, D> 
implements EventDispatch
         if (modelConverter.isEmpty()) {
             return null;
         }
-        LOGGER.info("Starting new process instance with signal '{}' for event 
{}", trigger, event);
+        LOGGER.info("Starting new process of type {} with signal '{}' for 
event {}", process.id(), trigger, event);
         return processService.createProcessInstance(
                 process,
                 event.getKogitoBusinessKey(),
diff --git 
a/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java
 
b/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java
index ba2108d7c9..e6326dbc9a 100644
--- 
a/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java
+++ 
b/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java
@@ -18,9 +18,6 @@
  */
 package org.kie.kogito.event.impl;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
@@ -127,22 +124,13 @@ class ProcessEventDispatcherTest {
     }
 
     @Test
-    void testCloudEventNewInstanceWithReference() throws Exception {
+    void testCloudEventNewInstanceWithInvalidReference() throws Exception {
         EventDispatcher<DummyModel, TestEvent> dispatcher = new 
ProcessEventDispatcher<>(process, modelConverter(), processService, null, o -> 
o.getData());
         ProcessInstance<DummyModel> instance = 
dispatcher.dispatch(DUMMY_TOPIC, new TestCloudEvent<>(new TestEvent("pepe"), 
DUMMY_TOPIC, "source", "invalidReference"));
-
-        ArgumentCaptor<String> signal = ArgumentCaptor.forClass(String.class);
-        ArgumentCaptor<String> referenceId = 
ArgumentCaptor.forClass(String.class);
-        ArgumentCaptor<Map<String, List<String>>> headers = 
ArgumentCaptor.forClass(Map.class);
-
+        assertThat(instance).isNull();
         verify(processInstances, times(1)).findById("invalidReference");
-        verify(processService, never()).signalProcessInstance(eq(process), 
any(), any(), signal.capture());
-        verify(processService, times(1)).createProcessInstance(eq(process), 
any(), any(DummyModel.class), headers.capture(), any(), signal.capture(), 
referenceId.capture(), isNull());
-
-        assertThat(signal.getValue()).isEqualTo(DUMMY_TOPIC);
-        assertThat(referenceId.getValue()).isEqualTo("1");
-        assertThat(headers.getValue()).containsEntry("source", 
Arrays.asList("source"));
-        assertThat(processInstance).isEqualTo(instance);
+        verify(processService, never()).signalProcessInstance(eq(process), 
any(), any(), any());
+        verify(processService, never()).createProcessInstance(eq(process), 
any(), any(DummyModel.class), any(), any(), any(), any(), isNull());
     }
 
     @Test
diff --git 
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java
 
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java
index e236f26e8d..00e6fb988c 100644
--- 
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java
+++ 
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java
@@ -132,6 +132,14 @@ class EventFlowIT {
         waitForFinish(flowName, id, Duration.ofSeconds(5));
     }
 
+    @Test
+    void testWrongProcessIdNotProcessedRainy() throws IOException {
+        final String flowName = "nonStartEvent";
+        final String id = startProcess(flowName);
+        sendEvents(UUID.randomUUID().toString(), "move");
+        assertThrows(ConditionTimeoutException.class, () -> 
waitForFinish(flowName, id, Duration.ofSeconds(5)));
+    }
+
     private void sendEvents(String id, String... eventTypes) throws 
IOException {
         sendEvents(id, Optional.empty(), eventTypes);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to