This is an automated email from the ASF dual-hosted git repository.
fjtiradosarti pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git
The following commit(s) were added to refs/heads/main by this push:
new f68b92d884 [KOGITO-9276] Adding support for businessKey to resume a
process (#3412)
f68b92d884 is described below
commit f68b92d8842bedb106cbe606805f11fea74b002e
Author: Francisco Javier Tirado Sarti
<[email protected]>
AuthorDate: Tue Feb 27 11:58:32 2024 +0100
[KOGITO-9276] Adding support for businessKey to resume a process (#3412)
* [KOGITO-9276] Adding support for businessKey to resume a process
* [KOGITO-9276] Cleaner alternative
* [KOGITO-9276] Gonzalo's comments
---
.../org/kie/kogito/process/ProcessInstances.java | 4 ++
.../kogito/event/impl/ProcessEventDispatcher.java | 82 ++++++++++++++--------
.../kogito/quarkus/workflows/AssuredTestUtils.java | 33 ++++++---
.../kie/kogito/quarkus/workflows/EventFlowIT.java | 19 +++--
4 files changed, 94 insertions(+), 44 deletions(-)
diff --git
a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java
b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java
index d6233541d4..6cc0d95649 100644
--- a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java
+++ b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java
@@ -29,6 +29,10 @@ public interface ProcessInstances<T> {
Optional<ProcessInstance<T>> findById(String id, ProcessInstanceReadMode
mode);
+ default Optional<ProcessInstance<T>> findByBusinessKey(String id) {
+ return stream().filter(pi -> id.equals(pi.businessKey())).findAny();
+ }
+
Stream<ProcessInstance<T>> stream(ProcessInstanceReadMode mode);
default Stream<ProcessInstance<T>> stream() {
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
index 48d46a04fd..704e854865 100644
---
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java
@@ -37,7 +37,6 @@ import org.kie.kogito.correlation.CorrelationInstance;
import org.kie.kogito.correlation.SimpleCorrelation;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventDispatcher;
-import org.kie.kogito.internal.utils.ConversionUtils;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessService;
@@ -74,20 +73,52 @@ public class ProcessEventDispatcher<M extends Model, D>
implements EventDispatch
}
return CompletableFuture.completedFuture(null);
}
+ return resolveCorrelationId(event)
+ .map(kogitoReferenceId -> asCompletable(trigger, event,
findById(kogitoReferenceId)))
+ .orElseGet(() -> {
+ // check processInstanceId
+ String processInstanceId = event.getKogitoReferenceId();
+ if (processInstanceId != null) {
+ return asCompletable(trigger, event,
findById(processInstanceId));
+ }
+ // check businessKey
+ String businessKey = event.getKogitoBusinessKey();
+ if (businessKey != null) {
+ return asCompletable(trigger, event,
findByBusinessKey(businessKey));
+ }
+ // try to start a new instance if possible
+ return CompletableFuture.supplyAsync(() ->
startNewInstance(trigger, event), executor);
+ });
+ }
- final String kogitoReferenceId = resolveCorrelationId(event);
- if (!ConversionUtils.isEmpty(kogitoReferenceId)) {
- return CompletableFuture.supplyAsync(() ->
handleMessageWithReference(trigger, event, kogitoReferenceId), executor);
- }
+ private CompletableFuture<ProcessInstance<M>> asCompletable(String
trigger, DataEvent<D> event, Optional<ProcessInstance<M>> processInstance) {
- //if the trigger is for a start event (model converter is set only for
start node)
- if (modelConverter.isPresent()) {
- return CompletableFuture.supplyAsync(() ->
startNewInstance(trigger, event), executor);
+ return CompletableFuture.supplyAsync(() -> processInstance.map(pi -> {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Sending signal {} to process instance id '{}'",
trigger, pi.id());
+ }
+ signalProcessInstance(trigger, pi.id(), event);
+ return pi;
+ }).orElseGet(() -> startNewInstance(trigger, event)), executor);
+ }
+
+ private Optional<ProcessInstance<M>> findById(String id) {
+ LOGGER.debug("Received message with process instance id '{}'", id);
+ Optional<ProcessInstance<M>> result = process.instances().findById(id);
+ if (LOGGER.isDebugEnabled() && result.isEmpty()) {
+ LOGGER.debug("No instance found for process instance id '{}'", id);
}
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("No matches found for trigger {} in process {}.
Skipping consumed message {}", trigger, process.id(), event);
+ return result;
+
+ }
+
+ private Optional<ProcessInstance<M>> findByBusinessKey(String key) {
+ LOGGER.debug("Received message with business key '{}'", key);
+ Optional<ProcessInstance<M>> result =
process.instances().findByBusinessKey(key);
+ if (LOGGER.isDebugEnabled() && result.isEmpty()) {
+ LOGGER.debug("No instance found for business key '{}'", key);
}
- return CompletableFuture.completedFuture(null);
+ return result;
}
private Optional<CompositeCorrelation> compositeCorrelation(DataEvent<?>
event) {
@@ -95,10 +126,10 @@ public class ProcessEventDispatcher<M extends Model, D>
implements EventDispatch
correlationKeys.stream().map(k -> new SimpleCorrelation<>(k,
resolve(event, k))).collect(Collectors.toSet()))) : Optional.empty();
}
- private String resolveCorrelationId(DataEvent<?> event) {
+ private Optional<String> resolveCorrelationId(DataEvent<?> event) {
return
compositeCorrelation(event).flatMap(process.correlations()::find)
- .map(CorrelationInstance::getCorrelatedId)
- .orElseGet(event::getKogitoReferenceId);
+ .map(CorrelationInstance::getCorrelatedId);
+
}
private Object resolve(DataEvent<?> event, String key) {
@@ -113,22 +144,6 @@ public class ProcessEventDispatcher<M extends Model, D>
implements EventDispatch
}
}
- private ProcessInstance<M> handleMessageWithReference(String trigger,
DataEvent<D> event, String instanceId) {
- LOGGER.debug("Received message with reference id '{}' going to use it
to send signal '{}'",
- instanceId,
- trigger);
- return process.instances()
- .findById(instanceId)
- .map(instance -> {
- signalProcessInstance(trigger, instance.id(), event);
- return instance;
- })
- .orElseGet(() -> {
- LOGGER.info("Process instance with id '{}' not found for
triggering signal '{}'", instanceId, trigger);
- return startNewInstance(trigger, event);
- });
- }
-
private Optional<M> signalProcessInstance(String trigger, String id,
DataEvent<D> event) {
return processService.signalProcessInstance((Process) process, id,
dataResolver.apply(event), "Message-" + trigger);
}
@@ -139,7 +154,12 @@ public class ProcessEventDispatcher<M extends Model, D>
implements EventDispatch
return processService.createProcessInstance(process,
event.getKogitoBusinessKey(), m.apply(dataResolver.apply(event)),
headersFromEvent(event), event.getKogitoStartFromNode(),
trigger,
event.getKogitoProcessInstanceId(),
compositeCorrelation(event).orElse(null));
- }).orElse(null);
+ }).orElseGet(() -> {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("No matches found for trigger {} in process {}.
Skipping consumed message {}", trigger, process.id(), event);
+ }
+ return null;
+ });
}
protected Map<String, List<String>> headersFromEvent(DataEvent<D> event) {
diff --git
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java
index 92a031c10e..b620b2de52 100644
---
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java
+++
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java
@@ -22,13 +22,16 @@ import java.net.URI;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
+import java.util.Optional;
import java.util.UUID;
import org.kie.kogito.event.CloudEventMarshaller;
+import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.restassured.http.ContentType;
+import io.restassured.specification.RequestSpecification;
import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -40,8 +43,11 @@ class AssuredTestUtils {
}
static String startProcess(String flowName) {
- String id = startProcessNoCheck(flowName);
+ return startProcess(flowName, Optional.empty());
+ }
+ static String startProcess(String flowName, Optional<String> businessKey) {
+ String id = startProcessNoCheck(flowName, businessKey);
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
@@ -52,11 +58,16 @@ class AssuredTestUtils {
}
static String startProcessNoCheck(String flowName) {
- return given()
+ return startProcessNoCheck(flowName, Optional.empty());
+ }
+
+ static String startProcessNoCheck(String flowName, Optional<String>
businessKey) {
+ RequestSpecification body = given()
.contentType(ContentType.JSON)
.when()
- .body(Collections.singletonMap("workflowdata",
Collections.emptyMap()))
- .post("/" + flowName)
+ .body(Collections.singletonMap("workflowdata",
Collections.emptyMap()));
+ businessKey.ifPresent(key -> body.queryParam("businessKey", key));
+ return body.post("/" + flowName)
.then()
.statusCode(201)
.extract().path("id");
@@ -73,15 +84,19 @@ class AssuredTestUtils {
.statusCode(404));
}
- static CloudEvent buildCloudEvent(String id, String type,
CloudEventMarshaller<byte[]> marshaller) {
- return CloudEventBuilder.v1()
+ static CloudEvent buildCloudEvent(String id, Optional<String> businessKey,
String type, CloudEventMarshaller<byte[]> marshaller) {
+ io.cloudevents.core.v1.CloudEventBuilder builder =
CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create(""))
.withType(type)
.withTime(OffsetDateTime.now())
- .withExtension("kogitoprocrefid", id)
-
.withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type,
"This has been injected by the event")))
- .build();
+
.withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type,
"This has been injected by the event")));
+ businessKey.ifPresentOrElse(key ->
builder.withExtension(CloudEventExtensionConstants.BUSINESS_KEY, key), () ->
builder.withExtension(CloudEventExtensionConstants.PROCESS_REFERENCE_ID, id));
+ return builder.build();
+ }
+
+ static CloudEvent buildCloudEvent(String id, String type,
CloudEventMarshaller<byte[]> marshaller) {
+ return buildCloudEvent(id, Optional.empty(), type, marshaller);
}
}
diff --git
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java
index 50fcb8e822..91ede4a21f 100644
---
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java
+++
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java
@@ -21,6 +21,7 @@ package org.kie.kogito.quarkus.workflows;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
+import java.util.Optional;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.BeforeAll;
@@ -57,7 +58,7 @@ class EventFlowIT {
@Test
void testNotStartingEvent() throws IOException {
- doIt("nonStartEvent", "move");
+ doIt("nonStartEvent", Optional.of("manolo"), "move");
}
@Test
@@ -108,25 +109,35 @@ class EventFlowIT {
}
private void sendEvents(String id, String... eventTypes) throws
IOException {
+ sendEvents(id, Optional.empty(), eventTypes);
+ }
+
+ private void sendEvents(String id, Optional<String> businessKey, String...
eventTypes) throws IOException {
for (String eventType : eventTypes) {
given()
.contentType(ContentType.JSON)
.when()
- .body(generateCloudEvent(id, eventType))
+ .body(generateCloudEvent(id, businessKey, eventType))
.post("/" + eventType)
.then()
.statusCode(202);
}
}
+ private void doIt(String flowName, Optional<String> businessKey, String...
eventTypes) throws IOException {
+ String id = startProcess(flowName, businessKey);
+ sendEvents(id, businessKey, eventTypes);
+ waitForFinish(flowName, id, Duration.ofSeconds(15));
+ }
+
private void doIt(String flowName, String... eventTypes) throws
IOException {
String id = startProcess(flowName);
sendEvents(id, eventTypes);
waitForFinish(flowName, id, Duration.ofSeconds(15));
}
- private byte[] generateCloudEvent(String id, String type) throws
IOException {
+ private byte[] generateCloudEvent(String id, Optional<String> businessKey,
String type) throws IOException {
CloudEventMarshaller<byte[]> marshaller =
marshallers.getOrDefault(type, defaultMarshaller);
- return marshaller.marshall(buildCloudEvent(id, type, marshaller));
+ return marshaller.marshall(buildCloudEvent(id, businessKey, type,
marshaller));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]