This is an automated email from the ASF dual-hosted git repository.

fjtiradosarti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git


The following commit(s) were added to refs/heads/main by this push:
     new 5f8c28cb5a [KOGITO-9785] Option to configure timeout error handler (#3255)
5f8c28cb5a is described below

commit 5f8c28cb5a1be625c253aa09320cea1f18f47151
Author: Daniele Martinoli <[email protected]>
AuthorDate: Tue Oct 24 20:38:03 2023 +0200

    [KOGITO-9785] Option to configure timeout error handler (#3255)
    
    * Timer node throws a TimerExpiredException in case there is an exception scope configured for it
    
    * Added standard header
    
    * Added IT for timeout with error handler
    
    * [KOGITO-9785] Implementing as error code
    
    * Removed new exception
    
    ---------
    
    Co-authored-by: Francisco Javier Tirado Sarti <[email protected]>
---
 .../workflow/instance/node/TimerNodeInstance.java  |  16 ++-
 ...lback-state-with-timeouts-error-handler.sw.json | 134 +++++++++++++++++++++
 .../src/main/resources/specs/callbackResults.yaml  |  49 ++++++++
 .../quarkus/workflows/AbstractCallbackStateIT.java |   6 +-
 .../CallbackStateWithTimeoutsErrorHandlerIT.java   | 102 ++++++++++++++++
 5 files changed, 304 insertions(+), 3 deletions(-)

diff --git a/jbpm/jbpm-flow/src/main/java/org/jbpm/workflow/instance/node/TimerNodeInstance.java b/jbpm/jbpm-flow/src/main/java/org/jbpm/workflow/instance/node/TimerNodeInstance.java
index 4588ddebed..c095d861bc 100755
--- a/jbpm/jbpm-flow/src/main/java/org/jbpm/workflow/instance/node/TimerNodeInstance.java
+++ b/jbpm/jbpm-flow/src/main/java/org/jbpm/workflow/instance/node/TimerNodeInstance.java
@@ -27,7 +27,9 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 
+import org.jbpm.process.core.context.exception.ExceptionScope;
 import org.jbpm.process.instance.InternalProcessRuntime;
+import org.jbpm.process.instance.context.exception.ExceptionScopeInstance;
 import org.jbpm.workflow.core.Node;
 import org.jbpm.workflow.core.node.TimerNode;
 import org.kie.api.runtime.process.EventListener;
@@ -37,11 +39,15 @@ import org.kie.kogito.jobs.JobsService;
 import org.kie.kogito.jobs.ProcessInstanceJobDescription;
 import org.kie.kogito.process.BaseEventDescription;
 import org.kie.kogito.process.EventDescription;
+import org.kie.kogito.process.workitem.WorkItemExecutionException;
 import org.kie.kogito.services.uow.BaseWorkUnit;
 import org.kie.kogito.timer.TimerInstance;
 import org.kie.kogito.uow.WorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TimerNodeInstance extends StateBasedNodeInstance implements 
EventListener {
+    private static final Logger logger = 
LoggerFactory.getLogger(TimerNodeInstance.class);
 
     private static final long serialVersionUID = 510l;
     public static final String TIMER_TRIGGERED_EVENT = "timerTriggered";
@@ -111,7 +117,15 @@ public class TimerNodeInstance extends StateBasedNodeInstance implements EventLi
 
     @Override
     public void triggerCompleted(boolean remove) {
-        triggerCompleted(Node.CONNECTION_DEFAULT_TYPE, remove);
+        Exception e = new WorkItemExecutionException("TimedOut");
+        ExceptionScopeInstance esi = (ExceptionScopeInstance) 
resolveContextInstance(ExceptionScope.EXCEPTION_SCOPE, e);
+        if (esi != null) {
+            logger.debug("Triggering exception handler for {}", 
e.getClass().getName());
+            esi.handleException(e, getProcessContext(e));
+        } else {
+            logger.trace("No exception handler for {}", 
e.getClass().getName());
+            triggerCompleted(Node.CONNECTION_DEFAULT_TYPE, remove);
+        }
     }
 
     @Override
diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/callback-state-with-timeouts-error-handler.sw.json b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/callback-state-with-timeouts-error-handler.sw.json
new file mode 100644
index 0000000000..d522faa1f2
--- /dev/null
+++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/callback-state-with-timeouts-error-handler.sw.json
@@ -0,0 +1,134 @@
+{
+  "id": "callback_state_with_timeouts_error_handler",
+  "version": "1.0",
+  "name": "Callback State With Timeouts Error Handler",
+  "expressionLang": "jsonpath",
+  "description": "Callback State With Timeouts Error Handler Test",
+  "start": "CallbackState",
+  "events": [
+    {
+      "name": "callbackEvent",
+      "source": "",
+      "type": "callback_state_timeouts_event_type"
+    }
+  ],
+  "errors": [
+    {
+      "name": "callbackError",
+      "code": "java.lang.Exception"
+    },
+    {
+      "name": "timeoutError",
+      "code": "TimedOut"
+    }
+  ],
+  "functions": [
+    {
+      "name": "callbackFunction",
+      "type": "rest",
+      "operation": "classpath:specs/external-service.yaml#sendRequest"
+    },
+    {
+      "name": "publishSuccess",
+      "type": "asyncapi",
+      "operation": "specs/callbackResults.yaml#sendSuccess"
+    },
+    {
+      "name": "publishTimeoutExpired",
+      "type": "asyncapi",
+      "operation": "specs/callbackResults.yaml#sendTimeoutExpired"
+    },
+    {
+      "name": "publishFailure",
+      "type": "asyncapi",
+      "operation": "specs/callbackResults.yaml#sendFailed"
+    }
+  ],
+  "states": [
+    {
+      "name": "CallbackState",
+      "type": "callback",
+      "action": {
+        "name": "callbackAction",
+        "functionRef": {
+          "refName": "callbackFunction",
+          "arguments": {
+            "query": "$.query"
+          }
+        }
+      },
+      "eventRef": "callbackEvent",
+      "transition": "PublishSuccess",
+      "onErrors": [
+        {
+          "errorRef": "callbackError",
+          "transition": "PublishError"
+        },
+        {
+          "errorRef": "timeoutError",
+          "transition": "PublishTimeout"
+        }
+      ],
+      "timeouts": {
+        "eventTimeout": "PT5S"
+      }
+    },
+    {
+      "name": "PublishSuccess",
+      "type": "operation",
+      "actions": [
+        {
+          "name": "publishSuccess",
+          "functionRef": "publishSuccess"
+        }
+      ],
+      "transition": "FinalizeSuccessful"
+    },
+    {
+      "name": "FinalizeSuccessful",
+      "type": "inject",
+      "data": {
+        "lastExecutedState": "FinalizeSuccessful"
+      },
+      "end": true
+    },
+    {
+      "name": "PublishTimeout",
+      "type": "operation",
+      "actions": [
+        {
+          "name": "publishTimeoutExpired",
+          "functionRef": "publishTimeoutExpired"
+        }
+      ],
+      "transition": "FinalizeTimeout"
+    },
+    {
+      "name": "FinalizeTimeout",
+      "type": "inject",
+      "data": {
+        "lastExecutedState": "FinalizeTimeout"
+      },
+      "end": true
+    },
+    {
+      "name": "PublishError",
+      "type": "operation",
+      "actions": [
+        {
+          "name": "publishFailure",
+          "functionRef": "publishFailure"
+        }
+      ],
+      "transition": "FinalizeWithError"
+    },
+    {
+      "name": "FinalizeWithError",
+      "type": "inject",
+      "data": {
+        "lastExecutedState": "FinalizeWithError"
+      },
+      "end": true
+    }
+  ]
+}
\ No newline at end of file
diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/specs/callbackResults.yaml b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/specs/callbackResults.yaml
new file mode 100644
index 0000000000..d5b962c7c7
--- /dev/null
+++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/specs/callbackResults.yaml
@@ -0,0 +1,49 @@
+asyncapi: '2.0.0'
+id: 'callbackResults'
+info:
+  title: Kafka Application
+  version: '1.0.0'
+  description: Kafka Application
+  license:
+    name: Apache 2.0
+    url: https://www.apache.org/licenses/LICENSE-2.0
+servers:
+  production:
+    url: localhost:9092
+    description: Development server
+    protocol: kafka
+    protocolVersion: '1.0.0'
+channels:
+  success:
+    description: A message channel for successful completions
+    publish:
+      operationId: sendSuccess
+      summary: Success
+      message:
+        $ref: '#/components/messages/message'
+  timeout:
+    description: A message channel for expired timeouts
+    publish:
+      operationId: sendTimeoutExpired
+      summary: Timeout Expired
+      message:
+        $ref: '#/components/messages/message'
+  error:
+    description: A message channel for failed executions
+    publish:
+      operationId: sendFailed
+      summary: Failed
+      message:
+        $ref: '#/components/messages/message'
+components:
+  messages:
+    message:
+      name: message
+      title: A message
+      summary: A message
+      contentType: application/json
+      payload:
+        $ref: "#/components/schemas/message"
+  schemas:
+    message:
+      type: object
diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AbstractCallbackStateIT.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AbstractCallbackStateIT.java
index 9d8d34f200..9b753f9566 100644
--- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AbstractCallbackStateIT.java
+++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AbstractCallbackStateIT.java
@@ -73,7 +73,7 @@ abstract class AbstractCallbackStateIT {
         }
     }
 
-    void executeCallbackStateSuccessfulPath(String callbackProcessPostUrl,
+    String executeCallbackStateSuccessfulPath(String callbackProcessPostUrl,
             String callbackProcessGetByIdUrl,
             String answer,
             String callbackEventType,
@@ -104,9 +104,10 @@ abstract class AbstractCallbackStateIT {
 
         // give some time for the event to be processed and the process to 
finish.
         assertProcessInstanceHasFinished(callbackProcessGetByIdUrl, 
processInstanceId, 1, 180);
+        return processInstanceId;
     }
 
-    void executeCallbackStateWithErrorPath(String callbackProcessPostUrl, 
String callbackProcessGetByIdUrl) throws Exception {
+    String executeCallbackStateWithErrorPath(String callbackProcessPostUrl, 
String callbackProcessGetByIdUrl) throws Exception {
         // start a new process instance and collect the results.
         String processInput = buildProcessInput(GENERATE_ERROR_QUERY);
         JsonPath result = newProcessInstance(callbackProcessPostUrl, 
processInput);
@@ -125,6 +126,7 @@ abstract class AbstractCallbackStateIT {
 
         // the process instance should not be there since an end state was 
reached.
         assertProcessInstanceNotExists(callbackProcessGetByIdUrl, 
processInstanceId);
+        return processInstanceId;
     }
 
     protected static String buildProcessInput(String query) {
diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/CallbackStateWithTimeoutsErrorHandlerIT.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/CallbackStateWithTimeoutsErrorHandlerIT.java
new file mode 100644
index 0000000000..b4f7a4ed73
--- /dev/null
+++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/CallbackStateWithTimeoutsErrorHandlerIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.quarkus.workflows;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+import org.kie.kogito.event.Converter;
+import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;
+import org.kie.kogito.event.impl.StringCloudEventUnmarshallerFactory;
+import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.cloudevents.CloudEvent;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static 
org.kie.kogito.quarkus.workflows.ExternalServiceMock.SUCCESSFUL_QUERY;
+import static 
org.kie.kogito.test.utils.ProcessInstancesRESTTestUtils.assertProcessInstanceExists;
+import static 
org.kie.kogito.test.utils.ProcessInstancesRESTTestUtils.assertProcessInstanceHasFinished;
+import static 
org.kie.kogito.test.utils.ProcessInstancesRESTTestUtils.newProcessInstanceAndGetId;
+
+@QuarkusIntegrationTest
+@QuarkusTestResource(ExternalServiceMock.class)
+@QuarkusTestResource(KafkaQuarkusTestResource.class)
+class CallbackStateWithTimeoutsErrorHandlerIT extends AbstractCallbackStateIT {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CallbackStateWithTimeoutsErrorHandlerIT.class);
+
+    private static final String CALLBACK_STATE_TIMEOUTS_SERVICE_URL = 
"/callback_state_with_timeouts_error_handler";
+    private static final String CALLBACK_STATE_TIMEOUTS_GET_BY_ID_URL = 
CALLBACK_STATE_TIMEOUTS_SERVICE_URL + "/{id}";
+    private static final String CALLBACK_STATE_TIMEOUTS_EVENT_TYPE = 
"callback_state_timeouts_event_type";
+    private static final String CALLBACK_STATE_TIMEOUTS_TOPIC = 
"callback_state_timeouts_event_type";
+
+    @Test
+    @SuppressWarnings("squid:S2699")
+    void callbackStateTimeoutsSuccessful() throws Exception {
+        String processInstanceId = 
executeCallbackStateSuccessfulPath(CALLBACK_STATE_TIMEOUTS_SERVICE_URL,
+                CALLBACK_STATE_TIMEOUTS_GET_BY_ID_URL,
+                ANSWER,
+                CALLBACK_STATE_TIMEOUTS_EVENT_TYPE,
+                CALLBACK_STATE_TIMEOUTS_TOPIC);
+        waitForFinalizedEvent(processInstanceId, "success");
+    }
+
+    @Test
+    void callbackStateTimeoutsExceeded() throws Exception {
+        String processInput = buildProcessInput(SUCCESSFUL_QUERY);
+        String processInstanceId = 
newProcessInstanceAndGetId(CALLBACK_STATE_TIMEOUTS_SERVICE_URL, processInput);
+        System.out.println("processInstanceId is " + processInstanceId);
+
+        assertProcessInstanceExists(CALLBACK_STATE_TIMEOUTS_GET_BY_ID_URL, 
processInstanceId);
+
+        
assertProcessInstanceHasFinished(CALLBACK_STATE_TIMEOUTS_GET_BY_ID_URL, 
processInstanceId, 1, 10);
+        waitForFinalizedEvent(processInstanceId, "timeout");
+    }
+
+    @Test
+    @SuppressWarnings("squid:S2699")
+    void callbackStateWithError() throws Exception {
+        String processInstanceId = 
executeCallbackStateWithErrorPath(CALLBACK_STATE_TIMEOUTS_SERVICE_URL, 
CALLBACK_STATE_TIMEOUTS_GET_BY_ID_URL);
+        waitForFinalizedEvent(processInstanceId, "error");
+    }
+
+    private void waitForFinalizedEvent(String processInstanceId, String topic) 
throws InterruptedException {
+        Converter<String, CloudEvent> converter = new 
StringCloudEventUnmarshallerFactory(objectMapper).unmarshaller(Map.class).cloudEvent();
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        kafkaClient.consume(topic, v -> {
+            try {
+                CloudEvent event = converter.convert(v);
+                LOGGER.debug("Found on topic {} CE {}", topic, event);
+                if 
(processInstanceId.equals(event.getExtension(CloudEventExtensionConstants.PROCESS_INSTANCE_ID)))
 {
+                    countDownLatch.countDown();
+                }
+            } catch (IOException e) {
+                LOGGER.info("Unmarshall exception", e);
+            }
+        });
+        countDownLatch.await(10, TimeUnit.SECONDS);
+        assertThat(countDownLatch.getCount()).isZero();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to