This is an automated email from the ASF dual-hosted git repository.
fjtiradosarti pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git
The following commit(s) were added to refs/heads/main by this push:
new 5f8c28cb5a [KOGITO-9785] Option to configure timeout error handler
(#3255)
5f8c28cb5a is described below
commit 5f8c28cb5a1be625c253aa09320cea1f18f47151
Author: Daniele Martinoli <[email protected]>
AuthorDate: Tue Oct 24 20:38:03 2023 +0200
[KOGITO-9785] Option to configure timeout error handler (#3255)
* Timer node throws a TimerExpiredException in case there is an exception
scope configured for it
* Added standard header
* Added IT for timeout with error handler
* [KOGITO-9785] Implementing as error code
* Removed new exception
---------
Co-authored-by: Francisco Javier Tirado Sarti <[email protected]>
---
.../workflow/instance/node/TimerNodeInstance.java | 16 ++-
...lback-state-with-timeouts-error-handler.sw.json | 134 +++++++++++++++++++++
.../src/main/resources/specs/callbackResults.yaml | 49 ++++++++
.../quarkus/workflows/AbstractCallbackStateIT.java | 6 +-
.../CallbackStateWithTimeoutsErrorHandlerIT.java | 102 ++++++++++++++++
5 files changed, 304 insertions(+), 3 deletions(-)
diff --git
a/jbpm/jbpm-flow/src/main/java/org/jbpm/workflow/instance/node/TimerNodeInstance.java
b/jbpm/jbpm-flow/src/main/java/org/jbpm/workflow/instance/node/TimerNodeInstance.java
index 4588ddebed..c095d861bc 100755
---
a/jbpm/jbpm-flow/src/main/java/org/jbpm/workflow/instance/node/TimerNodeInstance.java
+++
b/jbpm/jbpm-flow/src/main/java/org/jbpm/workflow/instance/node/TimerNodeInstance.java
@@ -27,7 +27,9 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import org.jbpm.process.core.context.exception.ExceptionScope;
import org.jbpm.process.instance.InternalProcessRuntime;
+import org.jbpm.process.instance.context.exception.ExceptionScopeInstance;
import org.jbpm.workflow.core.Node;
import org.jbpm.workflow.core.node.TimerNode;
import org.kie.api.runtime.process.EventListener;
@@ -37,11 +39,15 @@ import org.kie.kogito.jobs.JobsService;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.process.BaseEventDescription;
import org.kie.kogito.process.EventDescription;
+import org.kie.kogito.process.workitem.WorkItemExecutionException;
import org.kie.kogito.services.uow.BaseWorkUnit;
import org.kie.kogito.timer.TimerInstance;
import org.kie.kogito.uow.WorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TimerNodeInstance extends StateBasedNodeInstance implements
EventListener {
+ private static final Logger logger =
LoggerFactory.getLogger(TimerNodeInstance.class);
private static final long serialVersionUID = 510l;
public static final String TIMER_TRIGGERED_EVENT = "timerTriggered";
@@ -111,7 +117,15 @@ public class TimerNodeInstance extends
StateBasedNodeInstance implements EventLi
@Override
public void triggerCompleted(boolean remove) {
- triggerCompleted(Node.CONNECTION_DEFAULT_TYPE, remove);
+ Exception e = new WorkItemExecutionException("TimedOut");
+ ExceptionScopeInstance esi = (ExceptionScopeInstance)
resolveContextInstance(ExceptionScope.EXCEPTION_SCOPE, e);
+ if (esi != null) {
+ logger.debug("Triggering exception handler for {}",
e.getClass().getName());
+ esi.handleException(e, getProcessContext(e));
+ } else {
+ logger.trace("No exception handler for {}",
e.getClass().getName());
+ triggerCompleted(Node.CONNECTION_DEFAULT_TYPE, remove);
+ }
}
@Override
diff --git
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/callback-state-with-timeouts-error-handler.sw.json
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/callback-state-with-timeouts-error-handler.sw.json
new file mode 100644
index 0000000000..d522faa1f2
--- /dev/null
+++
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/callback-state-with-timeouts-error-handler.sw.json
@@ -0,0 +1,134 @@
+{
+ "id": "callback_state_with_timeouts_error_handler",
+ "version": "1.0",
+ "name": "Callback State With Timeouts Error Handler",
+ "expressionLang": "jsonpath",
+ "description": "Callback State With Timeouts Error Handler Test",
+ "start": "CallbackState",
+ "events": [
+ {
+ "name": "callbackEvent",
+ "source": "",
+ "type": "callback_state_timeouts_event_type"
+ }
+ ],
+ "errors": [
+ {
+ "name": "callbackError",
+ "code": "java.lang.Exception"
+ },
+ {
+ "name": "timeoutError",
+ "code": "TimedOut"
+ }
+ ],
+ "functions": [
+ {
+ "name": "callbackFunction",
+ "type": "rest",
+ "operation": "classpath:specs/external-service.yaml#sendRequest"
+ },
+ {
+ "name": "publishSuccess",
+ "type": "asyncapi",
+ "operation": "specs/callbackResults.yaml#sendSuccess"
+ },
+ {
+ "name": "publishTimeoutExpired",
+ "type": "asyncapi",
+ "operation": "specs/callbackResults.yaml#sendTimeoutExpired"
+ },
+ {
+ "name": "publishFailure",
+ "type": "asyncapi",
+ "operation": "specs/callbackResults.yaml#sendFailed"
+ }
+ ],
+ "states": [
+ {
+ "name": "CallbackState",
+ "type": "callback",
+ "action": {
+ "name": "callbackAction",
+ "functionRef": {
+ "refName": "callbackFunction",
+ "arguments": {
+ "query": "$.query"
+ }
+ }
+ },
+ "eventRef": "callbackEvent",
+ "transition": "PublishSuccess",
+ "onErrors": [
+ {
+ "errorRef": "callbackError",
+ "transition": "PublishError"
+ },
+ {
+ "errorRef": "timeoutError",
+ "transition": "PublishTimeout"
+ }
+ ],
+ "timeouts": {
+ "eventTimeout": "PT5S"
+ }
+ },
+ {
+ "name": "PublishSuccess",
+ "type": "operation",
+ "actions": [
+ {
+ "name": "publishSuccess",
+ "functionRef": "publishSuccess"
+ }
+ ],
+ "transition": "FinalizeSuccessful"
+ },
+ {
+ "name": "FinalizeSuccessful",
+ "type": "inject",
+ "data": {
+ "lastExecutedState": "FinalizeSuccessful"
+ },
+ "end": true
+ },
+ {
+ "name": "PublishTimeout",
+ "type": "operation",
+ "actions": [
+ {
+ "name": "publishTimeoutExpired",
+ "functionRef": "publishTimeoutExpired"
+ }
+ ],
+ "transition": "FinalizeTimeout"
+ },
+ {
+ "name": "FinalizeTimeout",
+ "type": "inject",
+ "data": {
+ "lastExecutedState": "FinalizeTimeout"
+ },
+ "end": true
+ },
+ {
+ "name": "PublishError",
+ "type": "operation",
+ "actions": [
+ {
+ "name": "publishFailure",
+ "functionRef": "publishFailure"
+ }
+ ],
+ "transition": "FinalizeWithError"
+ },
+ {
+ "name": "FinalizeWithError",
+ "type": "inject",
+ "data": {
+ "lastExecutedState": "FinalizeWithError"
+ },
+ "end": true
+ }
+ ]
+}
\ No newline at end of file
diff --git
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/specs/callbackResults.yaml
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/specs/callbackResults.yaml
new file mode 100644
index 0000000000..d5b962c7c7
--- /dev/null
+++
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/specs/callbackResults.yaml
@@ -0,0 +1,49 @@
+asyncapi: '2.0.0'
+id: 'callbackResults'
+info:
+ title: Kafka Application
+ version: '1.0.0'
+ description: Kafka Application
+ license:
+ name: Apache 2.0
+ url: https://www.apache.org/licenses/LICENSE-2.0
+servers:
+ production:
+ url: localhost:9092
+ description: Development server
+ protocol: kafka
+ protocolVersion: '1.0.0'
+channels:
+ success:
+ description: A message channel for successful completions
+ publish:
+ operationId: sendSuccess
+ summary: Success
+ message:
+ $ref: '#/components/messages/message'
+ timeout:
+ description: A message channel for expired timeouts
+ publish:
+ operationId: sendTimeoutExpired
+ summary: Timeout Expired
+ message:
+ $ref: '#/components/messages/message'
+ error:
+ description: A message channel for failed executions
+ publish:
+ operationId: sendFailed
+ summary: Failed
+ message:
+ $ref: '#/components/messages/message'
+components:
+ messages:
+ message:
+ name: message
+ title: A message
+ summary: A message
+ contentType: application/json
+ payload:
+ $ref: "#/components/schemas/message"
+ schemas:
+ message:
+ type: object
diff --git
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AbstractCallbackStateIT.java
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AbstractCallbackStateIT.java
index 9d8d34f200..9b753f9566 100644
---
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AbstractCallbackStateIT.java
+++
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AbstractCallbackStateIT.java
@@ -73,7 +73,7 @@ abstract class AbstractCallbackStateIT {
}
}
- void executeCallbackStateSuccessfulPath(String callbackProcessPostUrl,
+ String executeCallbackStateSuccessfulPath(String callbackProcessPostUrl,
String callbackProcessGetByIdUrl,
String answer,
String callbackEventType,
@@ -104,9 +104,10 @@ abstract class AbstractCallbackStateIT {
// give some time for the event to be processed and the process to
finish.
assertProcessInstanceHasFinished(callbackProcessGetByIdUrl,
processInstanceId, 1, 180);
+ return processInstanceId;
}
- void executeCallbackStateWithErrorPath(String callbackProcessPostUrl,
String callbackProcessGetByIdUrl) throws Exception {
+ String executeCallbackStateWithErrorPath(String callbackProcessPostUrl,
String callbackProcessGetByIdUrl) throws Exception {
// start a new process instance and collect the results.
String processInput = buildProcessInput(GENERATE_ERROR_QUERY);
JsonPath result = newProcessInstance(callbackProcessPostUrl,
processInput);
@@ -125,6 +126,7 @@ abstract class AbstractCallbackStateIT {
// the process instance should not be there since an end state was
reached.
assertProcessInstanceNotExists(callbackProcessGetByIdUrl,
processInstanceId);
+ return processInstanceId;
}
protected static String buildProcessInput(String query) {
diff --git
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/CallbackStateWithTimeoutsErrorHandlerIT.java
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/CallbackStateWithTimeoutsErrorHandlerIT.java
new file mode 100644
index 0000000000..b4f7a4ed73
--- /dev/null
+++
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/CallbackStateWithTimeoutsErrorHandlerIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.quarkus.workflows;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+import org.kie.kogito.event.Converter;
+import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;
+import org.kie.kogito.event.impl.StringCloudEventUnmarshallerFactory;
+import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.cloudevents.CloudEvent;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static
org.kie.kogito.quarkus.workflows.ExternalServiceMock.SUCCESSFUL_QUERY;
+import static
org.kie.kogito.test.utils.ProcessInstancesRESTTestUtils.assertProcessInstanceExists;
+import static
org.kie.kogito.test.utils.ProcessInstancesRESTTestUtils.assertProcessInstanceHasFinished;
+import static
org.kie.kogito.test.utils.ProcessInstancesRESTTestUtils.newProcessInstanceAndGetId;
+
+@QuarkusIntegrationTest
+@QuarkusTestResource(ExternalServiceMock.class)
+@QuarkusTestResource(KafkaQuarkusTestResource.class)
+class CallbackStateWithTimeoutsErrorHandlerIT extends AbstractCallbackStateIT {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CallbackStateWithTimeoutsErrorHandlerIT.class);
+
+ private static final String CALLBACK_STATE_TIMEOUTS_SERVICE_URL =
"/callback_state_with_timeouts_error_handler";
+ private static final String CALLBACK_STATE_TIMEOUTS_GET_BY_ID_URL =
CALLBACK_STATE_TIMEOUTS_SERVICE_URL + "/{id}";
+ private static final String CALLBACK_STATE_TIMEOUTS_EVENT_TYPE =
"callback_state_timeouts_event_type";
+ private static final String CALLBACK_STATE_TIMEOUTS_TOPIC =
"callback_state_timeouts_event_type";
+
+ @Test
+ @SuppressWarnings("squid:S2699")
+ void callbackStateTimeoutsSuccessful() throws Exception {
+ String processInstanceId =
executeCallbackStateSuccessfulPath(CALLBACK_STATE_TIMEOUTS_SERVICE_URL,
+ CALLBACK_STATE_TIMEOUTS_GET_BY_ID_URL,
+ ANSWER,
+ CALLBACK_STATE_TIMEOUTS_EVENT_TYPE,
+ CALLBACK_STATE_TIMEOUTS_TOPIC);
+ waitForFinalizedEvent(processInstanceId, "success");
+ }
+
+ @Test
+ void callbackStateTimeoutsExceeded() throws Exception {
+ String processInput = buildProcessInput(SUCCESSFUL_QUERY);
+ String processInstanceId =
newProcessInstanceAndGetId(CALLBACK_STATE_TIMEOUTS_SERVICE_URL, processInput);
+ System.out.println("processInstanceId is " + processInstanceId);
+
+ assertProcessInstanceExists(CALLBACK_STATE_TIMEOUTS_GET_BY_ID_URL,
processInstanceId);
+
+
assertProcessInstanceHasFinished(CALLBACK_STATE_TIMEOUTS_GET_BY_ID_URL,
processInstanceId, 1, 10);
+ waitForFinalizedEvent(processInstanceId, "timeout");
+ }
+
+ @Test
+ @SuppressWarnings("squid:S2699")
+ void callbackStateWithError() throws Exception {
+ String processInstanceId =
executeCallbackStateWithErrorPath(CALLBACK_STATE_TIMEOUTS_SERVICE_URL,
CALLBACK_STATE_TIMEOUTS_GET_BY_ID_URL);
+ waitForFinalizedEvent(processInstanceId, "error");
+ }
+
+ private void waitForFinalizedEvent(String processInstanceId, String topic)
throws InterruptedException {
+ Converter<String, CloudEvent> converter = new
StringCloudEventUnmarshallerFactory(objectMapper).unmarshaller(Map.class).cloudEvent();
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ kafkaClient.consume(topic, v -> {
+ try {
+ CloudEvent event = converter.convert(v);
+ LOGGER.debug("Found on topic {} CE {}", topic, event);
+ if
(processInstanceId.equals(event.getExtension(CloudEventExtensionConstants.PROCESS_INSTANCE_ID)))
{
+ countDownLatch.countDown();
+ }
+ } catch (IOException e) {
+ LOGGER.info("Unmarshall exception", e);
+ }
+ });
+ countDownLatch.await(10, TimeUnit.SECONDS);
+ assertThat(countDownLatch.getCount()).isZero();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]