This is an automated email from the ASF dual-hosted git repository.

egonzalez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f1c9765a [incubator-kie-issues-2079] Set on error process instance 
when a job reaches the error state (#2264)
3f1c9765a is described below

commit 3f1c9765a0ecfe3b938cc8ca1bcc29ab0a285725
Author: Enrique <[email protected]>
AuthorDate: Mon Sep 1 10:05:13 2025 +0200

    [incubator-kie-issues-2079] Set on error process instance when a job 
reaches the error state (#2264)
    
    * [incubator-kie-issues-2079] Set on error process instance when a job 
reaches the error state
    
    * fix exception handling registered
    
    * correction in error handling
    
    * add test case for error handling
---
 ...utInterceptor.java => JobTimeoutExecution.java} | 25 ++++---
 .../kogito/app/jobs/api/JobTimeoutInterceptor.java |  2 +-
 .../kogito/app/jobs/impl/VertxJobScheduler.java    | 82 +++++++++++-----------
 .../ErrorHandlingJobTimeoutInterceptor.java        | 66 +++++++++++++++++
 jobs/kogito-addons-quarkus-embedded-jobs/pom.xml   |  4 ++
 .../app/jobs/quarkus/QuarkusJobsService.java       | 27 +++----
 .../quarkus/TransactionJobTimeoutInterceptor.java} | 22 +++---
 .../jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java   | 28 +++++++-
 .../jobs/jpa/quarkus/TestExceptionHandler.java}    | 31 +++++---
 ...SchedulerListener.java => TestJobExecutor.java} | 39 ++++++----
 .../jobs/jpa/quarkus/TestJobSchedulerListener.java |  5 ++
 .../kogito-addons-springboot-embedded-jobs/pom.xml |  4 ++
 .../app/jobs/springboot/SpringbootJobsService.java | 36 +++-------
 .../TransactionJobTimeoutInterceptor.java          | 54 ++++++++++++++
 .../jobs/springboot/SpringbootJPAJobStoreTest.java | 26 +++++++
 ...ulerListener.java => TestExceptionHandler.java} | 32 +++++----
 ...SchedulerListener.java => TestJobExecutor.java} | 38 ++++++----
 .../jobs/springboot/TestJobSchedulerListener.java  |  5 ++
 18 files changed, 370 insertions(+), 156 deletions(-)

diff --git 
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
 
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutExecution.java
similarity index 61%
copy from 
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
copy to 
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutExecution.java
index ed3e7b20e..3fef7a854 100644
--- 
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
+++ 
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutExecution.java
@@ -18,18 +18,27 @@
  */
 package org.kie.kogito.app.jobs.api;
 
-import java.util.concurrent.Callable;
+import org.kie.kogito.jobs.service.model.JobDetails;
 
-public interface JobTimeoutInterceptor extends 
Comparable<JobTimeoutInterceptor> {
+public class JobTimeoutExecution {
 
-    Callable<Void> chainIntercept(Callable<Void> callable);
+    private JobDetails jobDetails;
+    private Exception exception;
 
-    default Integer priority() {
-        return 10;
+    public JobTimeoutExecution(JobDetails jobDetails) {
+        this(jobDetails, null);
     }
 
-    @Override
-    default int compareTo(JobTimeoutInterceptor o) {
-        return this.priority().compareTo(o.priority());
+    public JobTimeoutExecution(JobDetails jobDetails, Exception exception) {
+        this.jobDetails = jobDetails;
+        this.exception = exception;
+    }
+
+    public Exception getException() {
+        return exception;
+    }
+
+    public JobDetails getJobDetails() {
+        return jobDetails;
     }
 }
diff --git 
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
 
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
index ed3e7b20e..5c8d57d28 100644
--- 
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
+++ 
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
@@ -22,7 +22,7 @@ import java.util.concurrent.Callable;
 
 public interface JobTimeoutInterceptor extends 
Comparable<JobTimeoutInterceptor> {
 
-    Callable<Void> chainIntercept(Callable<Void> callable);
+    Callable<JobTimeoutExecution> chainIntercept(Callable<JobTimeoutExecution> 
callable);
 
     default Integer priority() {
         return 10;
diff --git 
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
 
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
index d8d15d1c7..3f9974a74 100644
--- 
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
+++ 
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
@@ -38,6 +38,7 @@ import org.kie.kogito.app.jobs.api.JobScheduler;
 import org.kie.kogito.app.jobs.api.JobSchedulerBuilder;
 import org.kie.kogito.app.jobs.api.JobSchedulerListener;
 import org.kie.kogito.app.jobs.api.JobSynchronization;
+import org.kie.kogito.app.jobs.api.JobTimeoutExecution;
 import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor;
 import 
org.kie.kogito.app.jobs.integrations.ProcessInstanceJobDescriptionMerger;
 import org.kie.kogito.app.jobs.integrations.ProcessJobDescriptionMerger;
@@ -232,11 +233,11 @@ public class VertxJobScheduler implements JobScheduler, 
Handler<Long> {
 
     @Override
     public void handle(Long timerId) {
-        Callable<Void> current = new Callable<Void>() {
+        Callable<JobTimeoutExecution> current = new 
Callable<JobTimeoutExecution>() {
             @Override
-            public Void call() throws Exception {
+            public JobTimeoutExecution call() throws Exception {
                 syncWithJobStores();
-                return null;
+                return new JobTimeoutExecution(null);
             }
         };
         for (JobTimeoutInterceptor interceptor : interceptors) {
@@ -304,12 +305,12 @@ public class VertxJobScheduler implements JobScheduler, 
Handler<Long> {
         this.refreshJobsIntervalTimerId = this.vertx.setPeriodic(0L, 
refreshJobsInterval, this);
 
         LOG.info("Initializing Job Service Logic \n" +
-                "MaxRefreshJobsIntervalWindow: {} (millis)\n" +
-                "MaxIntervalLimitToRetryMillis: {} (millis)\n" +
-                "MaxNumberOfRetries: {}\n" +
-                "RefreshJobsInterval: {} (millis)\n" +
-                "Number of worker threads {}\n" +
-                "Store: {}",
+                "\tMaxRefreshJobsIntervalWindow: {} (millis)\n" +
+                "\tMaxIntervalLimitToRetryMillis: {} (millis)\n" +
+                "\tMaxNumberOfRetries: {}\n" +
+                "\tRefreshJobsInterval: {} (millis)\n" +
+                "\tNumber of worker threads {}\n" +
+                "\tStore: {}",
                 maxRefreshJobsIntervalWindow,
                 retryInterval,
                 maxNumberOfRetries,
@@ -375,41 +376,40 @@ public class VertxJobScheduler implements JobScheduler, 
Handler<Long> {
         workerExecutor.executeBlocking(newTimeoutTask(timerId, jobId));
     }
 
-    private Callable<Void> newTimeoutTask(Long timerId, String jobId) {
-        Callable<Void> current = new Callable<Void>() {
+    private Callable<JobTimeoutExecution> newTimeoutTask(Long timerId, String 
jobId) {
+        Callable<JobTimeoutExecution> current = new 
Callable<JobTimeoutExecution>() {
             @Override
-            public Void call() throws Exception {
+            public JobTimeoutExecution call() throws Exception {
+
+                LOG.trace("Timeout task {} with jobId {} newTimeoutTask", 
timerId, jobId);
+                JobContext jobContext = jobContextFactory.newContext();
+                // we check now if we should run
+                boolean shouldRun = jobStore.shouldRun(jobContext, jobId);
+                if (!shouldRun) {
+                    LOG.trace("Timeout {} with jobId {} won't run", timerId, 
jobId);
+                    VertxJobScheduler.this.jobsScheduled.remove(jobId);
+                    return null;
+                }
+
+                LOG.debug("Timeout {} with jobId {} will be executed", 
timerId, jobId);
+                JobDetails jobDetails = jobStore.find(jobContext, jobId);
                 try {
-                    LOG.trace("Timeout task {} with jobId {} newTimeoutTask", 
timerId, jobId);
-                    JobContext jobContext = jobContextFactory.newContext();
-                    // we check now if we should run
-                    boolean shouldRun = jobStore.shouldRun(jobContext, jobId);
-                    if (!shouldRun) {
-                        LOG.trace("Timeout {} with jobId {} won't run", 
timerId, jobId);
-                        VertxJobScheduler.this.jobsScheduled.remove(jobId);
-                        return null;
-                    }
-
-                    LOG.debug("Timeout {} with jobId {} will be executed", 
timerId, jobId);
-                    JobDetails jobDetails = jobStore.find(jobContext, jobId);
-                    try {
-                        JobDetails runningJobDetails = doRun(jobDetails);
-                        LOG.trace("Timeout {} with jobId {} have been 
executed", timerId, jobId);
-                        JobDetails executeJobDetails = 
doExecute(runningJobDetails);
-                        LOG.trace("Timeout {} with jobId {} will be 
rescheduled if required", timerId, jobId);
-                        JobDetails nextJobDetails = 
computeNextJobDetailsIfAny(executeJobDetails);
-                        removeIfFinal(timerId, jobContext, nextJobDetails);
-                        jobSchedulerListeners.forEach(l -> 
l.onExecution(jobDetails));
-                    } catch (Exception exception) {
-                        LOG.trace("Timeout {} with jobId {} will be retried if 
possible", timerId, jobId, exception);
-                        JobDetails nextJobDetails = 
computeRetryIfAny(jobDetails);
-                        removeIfFinal(timerId, jobContext, nextJobDetails);
-                        jobSchedulerListeners.forEach(l -> 
l.onFailure(jobDetails));
-                    }
-                } catch (Exception e) {
-                    LOG.error("unexpected error during timeout execution", e);
+                    JobDetails runningJobDetails = doRun(jobDetails);
+                    LOG.trace("Timeout {} with jobId {} have been executed", 
timerId, jobId);
+                    JobDetails executeJobDetails = 
doExecute(runningJobDetails);
+                    LOG.trace("Timeout {} with jobId {} will be rescheduled if 
required", timerId, jobId);
+                    JobDetails nextJobDetails = 
computeNextJobDetailsIfAny(executeJobDetails);
+                    removeIfFinal(timerId, jobContext, nextJobDetails);
+                    jobSchedulerListeners.forEach(l -> 
l.onExecution(jobDetails));
+                    return new JobTimeoutExecution(nextJobDetails);
+                } catch (Exception exception) {
+                    LOG.trace("Timeout {} with jobId {} will be retried if 
possible", timerId, jobId, exception);
+                    JobDetails nextJobDetails = computeRetryIfAny(jobDetails);
+                    removeIfFinal(timerId, jobContext, nextJobDetails);
+                    jobSchedulerListeners.forEach(l -> 
l.onFailure(jobDetails));
+                    return new JobTimeoutExecution(nextJobDetails, exception);
                 }
-                return null;
+
             }
         };
         for (JobTimeoutInterceptor interceptor : interceptors) {
diff --git 
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ErrorHandlingJobTimeoutInterceptor.java
 
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ErrorHandlingJobTimeoutInterceptor.java
new file mode 100644
index 000000000..c879088e8
--- /dev/null
+++ 
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ErrorHandlingJobTimeoutInterceptor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.app.jobs.integrations;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.kie.kogito.app.jobs.api.JobTimeoutExecution;
+import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor;
+import org.kie.kogito.handler.ExceptionHandler;
+import org.kie.kogito.jobs.service.model.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ErrorHandlingJobTimeoutInterceptor implements 
JobTimeoutInterceptor {
+
+    private static Logger LOG = 
LoggerFactory.getLogger(ErrorHandlingJobTimeoutInterceptor.class);
+
+    private List<ExceptionHandler> exceptionHandlers;
+
+    public ErrorHandlingJobTimeoutInterceptor(List<ExceptionHandler> 
exceptionHandlers) {
+        this.exceptionHandlers = new ArrayList<>(exceptionHandlers);
+    }
+
+    @Override
+    public Integer priority() {
+        return 50;
+    }
+
+    @Override
+    public Callable<JobTimeoutExecution> 
chainIntercept(Callable<JobTimeoutExecution> callable) {
+        return new Callable<JobTimeoutExecution>() {
+            @Override
+            public JobTimeoutExecution call() throws Exception {
+                JobTimeoutExecution execution = callable.call();
+                if (execution.getJobDetails() != null && 
JobStatus.ERROR.equals(execution.getJobDetails().getStatus())) {
+                    if (exceptionHandlers.isEmpty()) {
+                        LOG.warn("there was an error in job {} but not handler 
were registered", execution.getJobDetails());
+                    } else {
+                        LOG.error("there was error in job {}. Handling error 
{}", execution.getJobDetails(), execution.getException().getMessage());
+                        exceptionHandlers.stream().forEach(e -> 
e.handle(execution.getException()));
+                    }
+                }
+                return execution;
+            }
+        };
+    }
+
+}
diff --git a/jobs/kogito-addons-quarkus-embedded-jobs/pom.xml 
b/jobs/kogito-addons-quarkus-embedded-jobs/pom.xml
index 8d0d65573..64397a2c7 100644
--- a/jobs/kogito-addons-quarkus-embedded-jobs/pom.xml
+++ b/jobs/kogito-addons-quarkus-embedded-jobs/pom.xml
@@ -114,6 +114,10 @@
             <artifactId>rest-assured</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+        </dependency>
 
     </dependencies>
 
diff --git 
a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java
 
b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java
index a182ec713..686364c11 100644
--- 
a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java
+++ 
b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java
@@ -18,15 +18,13 @@
  */
 package org.kie.kogito.app.jobs.quarkus;
 
-import java.util.concurrent.Callable;
-
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.kie.kogito.app.jobs.api.JobExecutor;
 import org.kie.kogito.app.jobs.api.JobScheduler;
 import org.kie.kogito.app.jobs.api.JobSchedulerBuilder;
 import org.kie.kogito.app.jobs.api.JobSchedulerListener;
 import org.kie.kogito.app.jobs.api.JobSynchronization;
-import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor;
+import org.kie.kogito.app.jobs.integrations.ErrorHandlingJobTimeoutInterceptor;
 import 
org.kie.kogito.app.jobs.integrations.ProcessInstanceJobDescriptionJobInstanceEventAdapter;
 import 
org.kie.kogito.app.jobs.integrations.ProcessJobDescriptionJobInstanceEventAdapter;
 import 
org.kie.kogito.app.jobs.integrations.UserTaskInstanceJobDescriptionJobInstanceEventAdapter;
@@ -34,10 +32,10 @@ import 
org.kie.kogito.app.jobs.quarkus.resource.RestApiConstants;
 import org.kie.kogito.app.jobs.spi.JobContextFactory;
 import org.kie.kogito.app.jobs.spi.JobStore;
 import org.kie.kogito.event.EventPublisher;
+import org.kie.kogito.handler.ExceptionHandler;
 import org.kie.kogito.jobs.JobDescription;
 import org.kie.kogito.jobs.JobsService;
 
-import io.quarkus.narayana.jta.QuarkusTransaction;
 import io.quarkus.runtime.Startup;
 
 import jakarta.annotation.PostConstruct;
@@ -90,22 +88,11 @@ public class QuarkusJobsService implements JobsService {
     @Inject
     protected TransactionSynchronizationRegistry registry;
 
+    @Inject
+    Instance<ExceptionHandler> exceptionHandlers;
+
     @PostConstruct
     public void init() {
-        JobTimeoutInterceptor txInterceptor = new JobTimeoutInterceptor() {
-
-            @Override
-            public Callable<Void> chainIntercept(Callable<Void> callable) {
-                return new Callable<Void>() {
-
-                    @Override
-                    public Void call() throws Exception {
-                        return 
QuarkusTransaction.requiringNew().call(callable);
-                    }
-
-                };
-            }
-        };
         this.jobScheduler = JobSchedulerBuilder.newJobSchedulerBuilder()
                 
.withEventPublishers(eventPublisher.stream().toArray(EventPublisher[]::new))
                 
.withJobSchedulerListeners(jobSchedulerListeners.stream().toArray(JobSchedulerListener[]::new))
@@ -120,7 +107,9 @@ public class QuarkusJobsService implements JobsService {
                 .withRetryInterval(retryMillis)
                 .withMaxNumberOfRetries(maxNumberOfRetries)
                 .withRefreshJobsInterval(maxRefreshJobsIntervalWindow * 60 * 
1000L)
-                .withTimeoutInterceptor(txInterceptor)
+                .withTimeoutInterceptor(
+                        new TransactionJobTimeoutInterceptor(),
+                        new 
ErrorHandlingJobTimeoutInterceptor(exceptionHandlers.stream().toList()))
                 .withNumberOfWorkerThreads(numberOfWorkerThreads)
                 .withJobSynchronization(new JobSynchronization() {
 
diff --git 
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
 
b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/TransactionJobTimeoutInterceptor.java
similarity index 58%
copy from 
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
copy to 
jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/TransactionJobTimeoutInterceptor.java
index ed3e7b20e..40984a1bf 100644
--- 
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
+++ 
b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/TransactionJobTimeoutInterceptor.java
@@ -16,20 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.kie.kogito.app.jobs.api;
+package org.kie.kogito.app.jobs.quarkus;
 
 import java.util.concurrent.Callable;
 
-public interface JobTimeoutInterceptor extends 
Comparable<JobTimeoutInterceptor> {
+import org.kie.kogito.app.jobs.api.JobTimeoutExecution;
+import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor;
 
-    Callable<Void> chainIntercept(Callable<Void> callable);
+import io.quarkus.narayana.jta.QuarkusTransaction;
 
-    default Integer priority() {
-        return 10;
-    }
+public class TransactionJobTimeoutInterceptor implements JobTimeoutInterceptor 
{
 
     @Override
-    default int compareTo(JobTimeoutInterceptor o) {
-        return this.priority().compareTo(o.priority());
+    public Callable<JobTimeoutExecution> 
chainIntercept(Callable<JobTimeoutExecution> callable) {
+        return new Callable<JobTimeoutExecution>() {
+
+            @Override
+            public JobTimeoutExecution call() throws Exception {
+                return QuarkusTransaction.requiringNew().call(callable);
+            }
+
+        };
     }
 }
diff --git 
a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java
 
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java
index 334827e49..755636bf4 100644
--- 
a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java
+++ 
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java
@@ -23,6 +23,8 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.util.concurrent.TimeUnit;
 
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.kie.kogito.jobs.ExactExpirationTime;
 import org.kie.kogito.jobs.JobsService;
@@ -43,9 +45,20 @@ public class QuarkusJPAJobStoreTest {
     @Inject
     TestJobSchedulerListener listener;
 
+    @Inject
+    TestJobExecutor testJobExecutor;
+
+    @Inject
+    TestExceptionHandler exceptionHandler;
+
+    @BeforeEach
+    public void init() {
+        testJobExecutor.reset();
+        exceptionHandler.reset();
+    }
+
     @Test
     public void testBasicPersistence() throws Exception {
-
         ProcessInstanceJobDescription jobDescription = new 
ProcessInstanceJobDescription("1", "-1",
                 
ExactExpirationTime.of(Instant.now().plus(Duration.ofSeconds(2)).atZone(ZoneId.of("UTC"))),
 5,
                 "processInstanceId", null, "processId", null, 
"nodeInstanceId");
@@ -57,4 +70,17 @@ public class QuarkusJPAJobStoreTest {
 
     }
 
+    @Test
+    public void testBasicError() throws Exception {
+        testJobExecutor.setNumberOfFailures(4);
+        ProcessInstanceJobDescription jobDescription = new 
ProcessInstanceJobDescription("1", "-1",
+                
ExactExpirationTime.of(Instant.now().plus(Duration.ofSeconds(2)).atZone(ZoneId.of("UTC"))),
 5,
+                "processInstanceId", null, "processId", null, 
"nodeInstanceId");
+
+        listener.setCount(4);
+        jobsService.scheduleJob(jobDescription);
+
+        Awaitility.await().atMost(Duration.ofSeconds(5L)).untilAsserted(() -> 
assertThat(exceptionHandler.isError()).isTrue());
+    }
+
 }
diff --git 
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
 
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestExceptionHandler.java
similarity index 59%
copy from 
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
copy to 
jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestExceptionHandler.java
index ed3e7b20e..213c04fc2 100644
--- 
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
+++ 
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestExceptionHandler.java
@@ -16,20 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.kie.kogito.app.jobs.api;
+package org.kie.kogito.app.jobs.jpa.quarkus;
 
-import java.util.concurrent.Callable;
+import org.kie.kogito.handler.ExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public interface JobTimeoutInterceptor extends 
Comparable<JobTimeoutInterceptor> {
+import jakarta.inject.Singleton;
 
-    Callable<Void> chainIntercept(Callable<Void> callable);
+@Singleton
+public class TestExceptionHandler implements ExceptionHandler {
 
-    default Integer priority() {
-        return 10;
-    }
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestExceptionHandler.class);
+
+    private boolean error;
 
     @Override
-    default int compareTo(JobTimeoutInterceptor o) {
-        return this.priority().compareTo(o.priority());
+    public void handle(Exception th) {
+        LOG.info("error", th);
+        error = true;
+    }
+
+    public void reset() {
+        error = false;
     }
+
+    public boolean isError() {
+        return error;
+    }
+
 }
diff --git 
a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java
 
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobExecutor.java
similarity index 54%
copy from 
jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java
copy to 
jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobExecutor.java
index 0957e4ddb..8cec79896 100644
--- 
a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java
+++ 
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobExecutor.java
@@ -18,30 +18,39 @@
  */
 package org.kie.kogito.app.jobs.jpa.quarkus;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.kie.kogito.app.jobs.api.JobSchedulerListener;
+import org.kie.kogito.app.jobs.api.JobExecutor;
 import org.kie.kogito.jobs.service.model.JobDetails;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import jakarta.inject.Singleton;
 
-import jakarta.enterprise.context.ApplicationScoped;
+@Singleton
+public class TestJobExecutor implements JobExecutor {
 
-@ApplicationScoped
-public class TestJobSchedulerListener implements JobSchedulerListener {
+    private Logger LOG = LoggerFactory.getLogger(TestJobExecutor.class);
+    private int numberOfFailures;
 
-    private CountDownLatch latch;
+    @Override
+    public boolean accept(JobDetails jobDescription) {
+        return true;
+    }
 
-    void setCount(Integer count) {
-        latch = new CountDownLatch(count);
+    @Override
+    public void execute(JobDetails jobDescription) {
+        LOG.info("executing {}", jobDescription);
+        if (numberOfFailures > 0) {
+            --numberOfFailures;
+            throw new RuntimeException();
+        }
     }
 
-    public boolean await(long timeout, TimeUnit unit) throws Exception {
-        return latch.await(timeout, unit);
+    public void setNumberOfFailures(int numberOfFailures) {
+        this.numberOfFailures = numberOfFailures;
     }
 
-    @Override
-    public void onExecution(JobDetails jobDetails) {
-        latch.countDown();
+    public void reset() {
+        numberOfFailures = 0;
     }
 
 }
diff --git 
a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java
 
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java
index 0957e4ddb..8dd195e25 100644
--- 
a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java
+++ 
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java
@@ -39,6 +39,11 @@ public class TestJobSchedulerListener implements 
JobSchedulerListener {
         return latch.await(timeout, unit);
     }
 
+    @Override
+    public void onFailure(JobDetails jobDetails) {
+        latch.countDown();
+    }
+
     @Override
     public void onExecution(JobDetails jobDetails) {
         latch.countDown();
diff --git a/jobs/kogito-addons-springboot-embedded-jobs/pom.xml 
b/jobs/kogito-addons-springboot-embedded-jobs/pom.xml
index 672395b8a..66665e4f5 100644
--- a/jobs/kogito-addons-springboot-embedded-jobs/pom.xml
+++ b/jobs/kogito-addons-springboot-embedded-jobs/pom.xml
@@ -91,6 +91,10 @@
                        <artifactId>rest-assured</artifactId>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.awaitility</groupId>
+                       <artifactId>awaitility</artifactId>
+               </dependency>
        </dependencies>
 
        <profiles>
diff --git 
a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java
 
b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java
index 8bdcaecda..ffc22a1c5 100644
--- 
a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java
+++ 
b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java
@@ -20,14 +20,13 @@ package org.kie.kogito.app.jobs.springboot;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Callable;
 
 import org.kie.kogito.app.jobs.api.JobExecutor;
 import org.kie.kogito.app.jobs.api.JobScheduler;
 import org.kie.kogito.app.jobs.api.JobSchedulerBuilder;
 import org.kie.kogito.app.jobs.api.JobSchedulerListener;
 import org.kie.kogito.app.jobs.api.JobSynchronization;
-import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor;
+import org.kie.kogito.app.jobs.integrations.ErrorHandlingJobTimeoutInterceptor;
 import 
org.kie.kogito.app.jobs.integrations.ProcessInstanceJobDescriptionJobInstanceEventAdapter;
 import 
org.kie.kogito.app.jobs.integrations.ProcessJobDescriptionJobInstanceEventAdapter;
 import 
org.kie.kogito.app.jobs.integrations.UserTaskInstanceJobDescriptionJobInstanceEventAdapter;
@@ -35,6 +34,7 @@ import org.kie.kogito.app.jobs.spi.JobContextFactory;
 import org.kie.kogito.app.jobs.spi.JobStore;
 import org.kie.kogito.app.jobs.springboot.resource.RestApiConstants;
 import org.kie.kogito.event.EventPublisher;
+import org.kie.kogito.handler.ExceptionHandler;
 import org.kie.kogito.jobs.JobDescription;
 import org.kie.kogito.jobs.JobsService;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -44,7 +44,6 @@ import 
org.springframework.transaction.PlatformTransactionManager;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.transaction.support.TransactionSynchronization;
 import 
org.springframework.transaction.support.TransactionSynchronizationManager;
-import org.springframework.transaction.support.TransactionTemplate;
 
 import jakarta.annotation.PostConstruct;
 import jakarta.annotation.PreDestroy;
@@ -86,32 +85,13 @@ public class SpringbootJobsService implements JobsService {
     protected String serviceURL;
 
     @Autowired
-    private PlatformTransactionManager transactionManager;
+    protected PlatformTransactionManager transactionManager;
+
+    @Autowired(required = false)
+    protected List<ExceptionHandler> exceptionHandlers;
 
     @PostConstruct
     public void init() {
-        JobTimeoutInterceptor txInterceptor = new JobTimeoutInterceptor() {
-
-            @Override
-            public Callable<Void> chainIntercept(Callable<Void> callable) {
-                TransactionTemplate transactionTemplate = new 
TransactionTemplate(transactionManager);
-                return new Callable<Void>() {
-
-                    @Override
-                    public Void call() throws Exception {
-                        return transactionTemplate.execute(status -> {
-                            try {
-                                return callable.call();
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
-                            }
-                        });
-                    }
-
-                };
-            }
-        };
-
         this.jobScheduler = JobSchedulerBuilder.newJobSchedulerBuilder()
                 
.withEventPublishers(ofNullable(eventPublisher).toArray(EventPublisher[]::new))
                 
.withJobSchedulerListeners(ofNullable(jobSchedulerListeners).stream().toArray(JobSchedulerListener[]::new))
@@ -126,7 +106,9 @@ public class SpringbootJobsService implements JobsService {
                 .withRetryInterval(retryMillis)
                 .withMaxNumberOfRetries(maxNumberOfRetries)
                 .withRefreshJobsInterval(maxRefreshJobsIntervalWindow * 60 * 
1000L)
-                .withTimeoutInterceptor(txInterceptor)
+                .withTimeoutInterceptor(
+                        new 
TransactionJobTimeoutInterceptor(transactionManager),
+                        new 
ErrorHandlingJobTimeoutInterceptor(ofNullable(exceptionHandlers).stream().toList()))
                 .withNumberOfWorkerThreads(numberOfWorkerThreads)
                 .withJobSynchronization(new JobSynchronization() {
 
diff --git 
a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/TransactionJobTimeoutInterceptor.java
 
b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/TransactionJobTimeoutInterceptor.java
new file mode 100644
index 000000000..58d4b562d
--- /dev/null
+++ 
b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/TransactionJobTimeoutInterceptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.app.jobs.springboot;
+
+import java.util.concurrent.Callable;
+
+import org.kie.kogito.app.jobs.api.JobTimeoutExecution;
+import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.support.TransactionTemplate;
+
+public class TransactionJobTimeoutInterceptor implements JobTimeoutInterceptor 
{
+
+    private TransactionTemplate transactionTemplate;
+
+    public TransactionJobTimeoutInterceptor(PlatformTransactionManager 
transactionManager) {
+        this.transactionTemplate = new TransactionTemplate(transactionManager);
+    }
+
+    @Override
+    public Callable<JobTimeoutExecution> 
chainIntercept(Callable<JobTimeoutExecution> callable) {
+        return new Callable<JobTimeoutExecution>() {
+
+            @Override
+            public JobTimeoutExecution call() throws Exception {
+                return transactionTemplate.execute(status -> {
+                    try {
+                        return callable.call();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            }
+
+        };
+    }
+
+}
diff --git 
a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/SpringbootJPAJobStoreTest.java
 
b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/SpringbootJPAJobStoreTest.java
index 82f5d9533..6ad4e709b 100644
--- 
a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/SpringbootJPAJobStoreTest.java
+++ 
b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/SpringbootJPAJobStoreTest.java
@@ -23,6 +23,8 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.util.concurrent.TimeUnit;
 
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.kie.kogito.jobs.ExactExpirationTime;
 import org.kie.kogito.jobs.JobsService;
@@ -41,6 +43,18 @@ public class SpringbootJPAJobStoreTest {
     @Autowired
     TestJobSchedulerListener listener;
 
+    @Autowired
+    TestJobExecutor testJobExecutor;
+
+    @Autowired
+    TestExceptionHandler exceptionHandler;
+
+    @BeforeEach
+    public void init() {
+        testJobExecutor.reset();
+        exceptionHandler.reset();
+    }
+
     @Test
     public void testBasicPersistence() throws Exception {
 
@@ -55,4 +69,16 @@ public class SpringbootJPAJobStoreTest {
 
     }
 
+    @Test
+    public void testBasicError() throws Exception {
+        testJobExecutor.setNumberOfFailures(4);
+        ProcessInstanceJobDescription jobDescription = new 
ProcessInstanceJobDescription("1", "-1",
+                
ExactExpirationTime.of(Instant.now().plus(Duration.ofSeconds(2)).atZone(ZoneId.of("UTC"))),
 5,
+                "processInstanceId", null, "processId", null, 
"nodeInstanceId");
+
+        listener.setCount(4);
+        jobsService.scheduleJob(jobDescription);
+
+        Awaitility.await().atMost(Duration.ofSeconds(5L)).untilAsserted(() -> 
assertThat(exceptionHandler.isError()).isTrue());
+    }
 }
diff --git 
a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
 
b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestExceptionHandler.java
similarity index 58%
copy from 
jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
copy to 
jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestExceptionHandler.java
index 79046f2f3..92e15bfb3 100644
--- 
a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
+++ 
b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestExceptionHandler.java
@@ -18,29 +18,33 @@
  */
 package org.kie.kogito.app.jobs.springboot;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.kie.kogito.app.jobs.api.JobSchedulerListener;
-import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.handler.ExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
 
 @Component
-public class TestJobSchedulerListener implements JobSchedulerListener {
+@Scope(scopeName = ConfigurableBeanFactory.SCOPE_SINGLETON)
+public class TestExceptionHandler implements ExceptionHandler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestExceptionHandler.class);
 
-    private CountDownLatch latch;
+    private boolean error;
 
-    void setCount(Integer count) {
-        latch = new CountDownLatch(count);
+    @Override
+    public void handle(Exception th) {
+        LOG.info("error", th);
+        error = true;
     }
 
-    public boolean await(long timeout, TimeUnit unit) throws Exception {
-        return latch.await(timeout, unit);
+    public void reset() {
+        error = false;
     }
 
-    @Override
-    public void onExecution(JobDetails jobDetails) {
-        latch.countDown();
+    public boolean isError() {
+        return error;
     }
 
 }
diff --git 
a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
 
b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobExecutor.java
similarity index 51%
copy from 
jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
copy to 
jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobExecutor.java
index 79046f2f3..52ee0627b 100644
--- 
a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
+++ 
b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobExecutor.java
@@ -18,29 +18,41 @@
  */
 package org.kie.kogito.app.jobs.springboot;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.kie.kogito.app.jobs.api.JobSchedulerListener;
+import org.kie.kogito.app.jobs.api.JobExecutor;
 import org.kie.kogito.jobs.service.model.JobDetails;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
 
 @Component
-public class TestJobSchedulerListener implements JobSchedulerListener {
+@Scope(scopeName = ConfigurableBeanFactory.SCOPE_SINGLETON)
+public class TestJobExecutor implements JobExecutor {
+
+    private Logger LOG = LoggerFactory.getLogger(TestJobExecutor.class);
+    private int numberOfFailures;
 
-    private CountDownLatch latch;
+    @Override
+    public boolean accept(JobDetails jobDescription) {
+        return true;
+    }
 
-    void setCount(Integer count) {
-        latch = new CountDownLatch(count);
+    @Override
+    public void execute(JobDetails jobDescription) {
+        LOG.info("executing {}", jobDescription);
+        if (numberOfFailures > 0) {
+            --numberOfFailures;
+            throw new RuntimeException();
+        }
     }
 
-    public boolean await(long timeout, TimeUnit unit) throws Exception {
-        return latch.await(timeout, unit);
+    public void setNumberOfFailures(int numberOfFailures) {
+        this.numberOfFailures = numberOfFailures;
     }
 
-    @Override
-    public void onExecution(JobDetails jobDetails) {
-        latch.countDown();
+    public void reset() {
+        numberOfFailures = 0;
     }
 
 }
diff --git 
a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
 
b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
index 79046f2f3..56da55a26 100644
--- 
a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
+++ 
b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
@@ -38,6 +38,11 @@ public class TestJobSchedulerListener implements 
JobSchedulerListener {
         return latch.await(timeout, unit);
     }
 
+    @Override
+    public void onFailure(JobDetails jobDetails) {
+        latch.countDown();
+    }
+
     @Override
     public void onExecution(JobDetails jobDetails) {
         latch.countDown();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to