This is an automated email from the ASF dual-hosted git repository.

fjtiradosarti pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git


The following commit(s) were added to refs/heads/main by this push:
     new f68b92d884 [KOGITO-9276] Adding support for businessKey to resume a 
process (#3412)
f68b92d884 is described below

commit f68b92d8842bedb106cbe606805f11fea74b002e
Author: Francisco Javier Tirado Sarti 
<[email protected]>
AuthorDate: Tue Feb 27 11:58:32 2024 +0100

    [KOGITO-9276] Adding support for businessKey to resume a process (#3412)
    
    * [KOGITO-9276] Adding support for businessKey to resume a process
    
    * [KOGITO-9276] Cleaner alternative
    
    * [KOGITO-9276] Gonzalo's comments
---
 .../org/kie/kogito/process/ProcessInstances.java   |  4 ++
 .../kogito/event/impl/ProcessEventDispatcher.java  | 82 ++++++++++++++--------
 .../kogito/quarkus/workflows/AssuredTestUtils.java | 33 ++++++---
 .../kie/kogito/quarkus/workflows/EventFlowIT.java  | 19 +++--
 4 files changed, 94 insertions(+), 44 deletions(-)

diff --git 
a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java 
b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java
index d6233541d4..6cc0d95649 100644
--- a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java
+++ b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java
@@ -29,6 +29,10 @@ public interface ProcessInstances<T> {
 
     Optional<ProcessInstance<T>> findById(String id, ProcessInstanceReadMode 
mode);
 
+    default Optional<ProcessInstance<T>> findByBusinessKey(String id) {
+        return stream().filter(pi -> id.equals(pi.businessKey())).findAny();
+    }
+
     Stream<ProcessInstance<T>> stream(ProcessInstanceReadMode mode);
 
     default Stream<ProcessInstance<T>> stream() {
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
index 48d46a04fd..704e854865 100644
--- 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
@@ -37,7 +37,6 @@ import org.kie.kogito.correlation.CorrelationInstance;
 import org.kie.kogito.correlation.SimpleCorrelation;
 import org.kie.kogito.event.DataEvent;
 import org.kie.kogito.event.EventDispatcher;
-import org.kie.kogito.internal.utils.ConversionUtils;
 import org.kie.kogito.process.Process;
 import org.kie.kogito.process.ProcessInstance;
 import org.kie.kogito.process.ProcessService;
@@ -74,20 +73,52 @@ public class ProcessEventDispatcher<M extends Model, D> 
implements EventDispatch
             }
             return CompletableFuture.completedFuture(null);
         }
+        return resolveCorrelationId(event)
+                .map(kogitoReferenceId -> asCompletable(trigger, event, 
findById(kogitoReferenceId)))
+                .orElseGet(() -> {
+                    // check processInstanceId
+                    String processInstanceId = event.getKogitoReferenceId();
+                    if (processInstanceId != null) {
+                        return asCompletable(trigger, event, 
findById(processInstanceId));
+                    }
+                    // check businessKey
+                    String businessKey = event.getKogitoBusinessKey();
+                    if (businessKey != null) {
+                        return asCompletable(trigger, event, 
findByBusinessKey(businessKey));
+                    }
+                    // try to start a new instance if possible
+                    return CompletableFuture.supplyAsync(() -> 
startNewInstance(trigger, event), executor);
+                });
+    }
 
-        final String kogitoReferenceId = resolveCorrelationId(event);
-        if (!ConversionUtils.isEmpty(kogitoReferenceId)) {
-            return CompletableFuture.supplyAsync(() -> 
handleMessageWithReference(trigger, event, kogitoReferenceId), executor);
-        }
+    private CompletableFuture<ProcessInstance<M>> asCompletable(String 
trigger, DataEvent<D> event, Optional<ProcessInstance<M>> processInstance) {
 
-        //if the trigger is for a start event (model converter is set only for 
start node)
-        if (modelConverter.isPresent()) {
-            return CompletableFuture.supplyAsync(() -> 
startNewInstance(trigger, event), executor);
+        return CompletableFuture.supplyAsync(() -> processInstance.map(pi -> {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Sending signal {} to process instance id '{}'", 
trigger, pi.id());
+            }
+            signalProcessInstance(trigger, pi.id(), event);
+            return pi;
+        }).orElseGet(() -> startNewInstance(trigger, event)), executor);
+    }
+
+    private Optional<ProcessInstance<M>> findById(String id) {
+        LOGGER.debug("Received message with process instance id '{}'", id);
+        Optional<ProcessInstance<M>> result = process.instances().findById(id);
+        if (LOGGER.isDebugEnabled() && result.isEmpty()) {
+            LOGGER.debug("No instance found for process instance id '{}'", id);
         }
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("No matches found for trigger {} in process {}. 
Skipping consumed message {}", trigger, process.id(), event);
+        return result;
+
+    }
+
+    private Optional<ProcessInstance<M>> findByBusinessKey(String key) {
+        LOGGER.debug("Received message with business key '{}'", key);
+        Optional<ProcessInstance<M>> result = 
process.instances().findByBusinessKey(key);
+        if (LOGGER.isDebugEnabled() && result.isEmpty()) {
+            LOGGER.debug("No instance found for business key '{}'", key);
         }
-        return CompletableFuture.completedFuture(null);
+        return result;
     }
 
     private Optional<CompositeCorrelation> compositeCorrelation(DataEvent<?> 
event) {
@@ -95,10 +126,10 @@ public class ProcessEventDispatcher<M extends Model, D> 
implements EventDispatch
                 correlationKeys.stream().map(k -> new SimpleCorrelation<>(k, 
resolve(event, k))).collect(Collectors.toSet()))) : Optional.empty();
     }
 
-    private String resolveCorrelationId(DataEvent<?> event) {
+    private Optional<String> resolveCorrelationId(DataEvent<?> event) {
         return 
compositeCorrelation(event).flatMap(process.correlations()::find)
-                .map(CorrelationInstance::getCorrelatedId)
-                .orElseGet(event::getKogitoReferenceId);
+                .map(CorrelationInstance::getCorrelatedId);
+
     }
 
     private Object resolve(DataEvent<?> event, String key) {
@@ -113,22 +144,6 @@ public class ProcessEventDispatcher<M extends Model, D> 
implements EventDispatch
         }
     }
 
-    private ProcessInstance<M> handleMessageWithReference(String trigger, 
DataEvent<D> event, String instanceId) {
-        LOGGER.debug("Received message with reference id '{}' going to use it 
to send signal '{}'",
-                instanceId,
-                trigger);
-        return process.instances()
-                .findById(instanceId)
-                .map(instance -> {
-                    signalProcessInstance(trigger, instance.id(), event);
-                    return instance;
-                })
-                .orElseGet(() -> {
-                    LOGGER.info("Process instance with id '{}' not found for 
triggering signal '{}'", instanceId, trigger);
-                    return startNewInstance(trigger, event);
-                });
-    }
-
     private Optional<M> signalProcessInstance(String trigger, String id, 
DataEvent<D> event) {
         return processService.signalProcessInstance((Process) process, id, 
dataResolver.apply(event), "Message-" + trigger);
     }
@@ -139,7 +154,12 @@ public class ProcessEventDispatcher<M extends Model, D> 
implements EventDispatch
             return processService.createProcessInstance(process, 
event.getKogitoBusinessKey(), m.apply(dataResolver.apply(event)),
                     headersFromEvent(event), event.getKogitoStartFromNode(), 
trigger,
                     event.getKogitoProcessInstanceId(), 
compositeCorrelation(event).orElse(null));
-        }).orElse(null);
+        }).orElseGet(() -> {
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("No matches found for trigger {} in process {}. 
Skipping consumed message {}", trigger, process.id(), event);
+            }
+            return null;
+        });
     }
 
     protected Map<String, List<String>> headersFromEvent(DataEvent<D> event) {
diff --git 
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java
 
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java
index 92a031c10e..b620b2de52 100644
--- 
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java
+++ 
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java
@@ -22,13 +22,16 @@ import java.net.URI;
 import java.time.Duration;
 import java.time.OffsetDateTime;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.UUID;
 
 import org.kie.kogito.event.CloudEventMarshaller;
+import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;
 
 import io.cloudevents.CloudEvent;
 import io.cloudevents.core.builder.CloudEventBuilder;
 import io.restassured.http.ContentType;
+import io.restassured.specification.RequestSpecification;
 
 import static io.restassured.RestAssured.given;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -40,8 +43,11 @@ class AssuredTestUtils {
     }
 
     static String startProcess(String flowName) {
-        String id = startProcessNoCheck(flowName);
+        return startProcess(flowName, Optional.empty());
+    }
 
+    static String startProcess(String flowName, Optional<String> businessKey) {
+        String id = startProcessNoCheck(flowName, businessKey);
         given()
                 .contentType(ContentType.JSON)
                 .accept(ContentType.JSON)
@@ -52,11 +58,16 @@ class AssuredTestUtils {
     }
 
     static String startProcessNoCheck(String flowName) {
-        return given()
+        return startProcessNoCheck(flowName, Optional.empty());
+    }
+
+    static String startProcessNoCheck(String flowName, Optional<String> 
businessKey) {
+        RequestSpecification body = given()
                 .contentType(ContentType.JSON)
                 .when()
-                .body(Collections.singletonMap("workflowdata", 
Collections.emptyMap()))
-                .post("/" + flowName)
+                .body(Collections.singletonMap("workflowdata", 
Collections.emptyMap()));
+        businessKey.ifPresent(key -> body.queryParam("businessKey", key));
+        return body.post("/" + flowName)
                 .then()
                 .statusCode(201)
                 .extract().path("id");
@@ -73,15 +84,19 @@ class AssuredTestUtils {
                         .statusCode(404));
     }
 
-    static CloudEvent buildCloudEvent(String id, String type, 
CloudEventMarshaller<byte[]> marshaller) {
-        return CloudEventBuilder.v1()
+    static CloudEvent buildCloudEvent(String id, Optional<String> businessKey, 
String type, CloudEventMarshaller<byte[]> marshaller) {
+        io.cloudevents.core.v1.CloudEventBuilder builder = 
CloudEventBuilder.v1()
                 .withId(UUID.randomUUID().toString())
                 .withSource(URI.create(""))
                 .withType(type)
                 .withTime(OffsetDateTime.now())
-                .withExtension("kogitoprocrefid", id)
-                
.withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type,
 "This has been injected by the event")))
-                .build();
+                
.withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type,
 "This has been injected by the event")));
+        businessKey.ifPresentOrElse(key -> 
builder.withExtension(CloudEventExtensionConstants.BUSINESS_KEY, key), () -> 
builder.withExtension(CloudEventExtensionConstants.PROCESS_REFERENCE_ID, id));
+        return builder.build();
+    }
+
+    static CloudEvent buildCloudEvent(String id, String type, 
CloudEventMarshaller<byte[]> marshaller) {
+        return buildCloudEvent(id, Optional.empty(), type, marshaller);
     }
 
 }
diff --git 
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java
 
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java
index 50fcb8e822..91ede4a21f 100644
--- 
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java
+++ 
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java
@@ -21,6 +21,7 @@ package org.kie.kogito.quarkus.workflows;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Map;
+import java.util.Optional;
 
 import org.awaitility.core.ConditionTimeoutException;
 import org.junit.jupiter.api.BeforeAll;
@@ -57,7 +58,7 @@ class EventFlowIT {
 
     @Test
     void testNotStartingEvent() throws IOException {
-        doIt("nonStartEvent", "move");
+        doIt("nonStartEvent", Optional.of("manolo"), "move");
     }
 
     @Test
@@ -108,25 +109,35 @@ class EventFlowIT {
     }
 
     private void sendEvents(String id, String... eventTypes) throws 
IOException {
+        sendEvents(id, Optional.empty(), eventTypes);
+    }
+
+    private void sendEvents(String id, Optional<String> businessKey, String... 
eventTypes) throws IOException {
         for (String eventType : eventTypes) {
             given()
                     .contentType(ContentType.JSON)
                     .when()
-                    .body(generateCloudEvent(id, eventType))
+                    .body(generateCloudEvent(id, businessKey, eventType))
                     .post("/" + eventType)
                     .then()
                     .statusCode(202);
         }
     }
 
+    private void doIt(String flowName, Optional<String> businessKey, String... 
eventTypes) throws IOException {
+        String id = startProcess(flowName, businessKey);
+        sendEvents(id, businessKey, eventTypes);
+        waitForFinish(flowName, id, Duration.ofSeconds(15));
+    }
+
     private void doIt(String flowName, String... eventTypes) throws 
IOException {
         String id = startProcess(flowName);
         sendEvents(id, eventTypes);
         waitForFinish(flowName, id, Duration.ofSeconds(15));
     }
 
-    private byte[] generateCloudEvent(String id, String type) throws 
IOException {
+    private byte[] generateCloudEvent(String id, Optional<String> businessKey, 
String type) throws IOException {
         CloudEventMarshaller<byte[]> marshaller = 
marshallers.getOrDefault(type, defaultMarshaller);
-        return marshaller.marshall(buildCloudEvent(id, type, marshaller));
+        return marshaller.marshall(buildCloudEvent(id, businessKey, type, 
marshaller));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to