This is an automated email from the ASF dual-hosted git repository.
egonzalez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git
The following commit(s) were added to refs/heads/main by this push:
new 3f1c9765a [incubator-kie-issues-2079] Set on error process instance
when a job reaches the error state (#2264)
3f1c9765a is described below
commit 3f1c9765a0ecfe3b938cc8ca1bcc29ab0a285725
Author: Enrique <[email protected]>
AuthorDate: Mon Sep 1 10:05:13 2025 +0200
[incubator-kie-issues-2079] Set on error process instance when a job
reaches the error state (#2264)
* [incubator-kie-issues-2079] Set on error process instance when a job
reaches the error state
* fix exception handling registered
* correction in error handling
* add test case for error handling
---
...utInterceptor.java => JobTimeoutExecution.java} | 25 ++++---
.../kogito/app/jobs/api/JobTimeoutInterceptor.java | 2 +-
.../kogito/app/jobs/impl/VertxJobScheduler.java | 82 +++++++++++-----------
.../ErrorHandlingJobTimeoutInterceptor.java | 66 +++++++++++++++++
jobs/kogito-addons-quarkus-embedded-jobs/pom.xml | 4 ++
.../app/jobs/quarkus/QuarkusJobsService.java | 27 +++----
.../quarkus/TransactionJobTimeoutInterceptor.java} | 22 +++---
.../jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java | 28 +++++++-
.../jobs/jpa/quarkus/TestExceptionHandler.java} | 31 +++++---
...SchedulerListener.java => TestJobExecutor.java} | 39 ++++++----
.../jobs/jpa/quarkus/TestJobSchedulerListener.java | 5 ++
.../kogito-addons-springboot-embedded-jobs/pom.xml | 4 ++
.../app/jobs/springboot/SpringbootJobsService.java | 36 +++-------
.../TransactionJobTimeoutInterceptor.java | 54 ++++++++++++++
.../jobs/springboot/SpringbootJPAJobStoreTest.java | 26 +++++++
...ulerListener.java => TestExceptionHandler.java} | 32 +++++----
...SchedulerListener.java => TestJobExecutor.java} | 38 ++++++----
.../jobs/springboot/TestJobSchedulerListener.java | 5 ++
18 files changed, 370 insertions(+), 156 deletions(-)
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutExecution.java
similarity index 61%
copy from
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
copy to
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutExecution.java
index ed3e7b20e..3fef7a854 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutExecution.java
@@ -18,18 +18,27 @@
*/
package org.kie.kogito.app.jobs.api;
-import java.util.concurrent.Callable;
+import org.kie.kogito.jobs.service.model.JobDetails;
-public interface JobTimeoutInterceptor extends
Comparable<JobTimeoutInterceptor> {
+public class JobTimeoutExecution {
- Callable<Void> chainIntercept(Callable<Void> callable);
+ private JobDetails jobDetails;
+ private Exception exception;
- default Integer priority() {
- return 10;
+ public JobTimeoutExecution(JobDetails jobDetails) {
+ this(jobDetails, null);
}
- @Override
- default int compareTo(JobTimeoutInterceptor o) {
- return this.priority().compareTo(o.priority());
+ public JobTimeoutExecution(JobDetails jobDetails, Exception exception) {
+ this.jobDetails = jobDetails;
+ this.exception = exception;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+
+ public JobDetails getJobDetails() {
+ return jobDetails;
}
}
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
index ed3e7b20e..5c8d57d28 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
@@ -22,7 +22,7 @@ import java.util.concurrent.Callable;
public interface JobTimeoutInterceptor extends
Comparable<JobTimeoutInterceptor> {
- Callable<Void> chainIntercept(Callable<Void> callable);
+ Callable<JobTimeoutExecution> chainIntercept(Callable<JobTimeoutExecution>
callable);
default Integer priority() {
return 10;
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
index d8d15d1c7..3f9974a74 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
@@ -38,6 +38,7 @@ import org.kie.kogito.app.jobs.api.JobScheduler;
import org.kie.kogito.app.jobs.api.JobSchedulerBuilder;
import org.kie.kogito.app.jobs.api.JobSchedulerListener;
import org.kie.kogito.app.jobs.api.JobSynchronization;
+import org.kie.kogito.app.jobs.api.JobTimeoutExecution;
import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor;
import
org.kie.kogito.app.jobs.integrations.ProcessInstanceJobDescriptionMerger;
import org.kie.kogito.app.jobs.integrations.ProcessJobDescriptionMerger;
@@ -232,11 +233,11 @@ public class VertxJobScheduler implements JobScheduler,
Handler<Long> {
@Override
public void handle(Long timerId) {
- Callable<Void> current = new Callable<Void>() {
+ Callable<JobTimeoutExecution> current = new
Callable<JobTimeoutExecution>() {
@Override
- public Void call() throws Exception {
+ public JobTimeoutExecution call() throws Exception {
syncWithJobStores();
- return null;
+ return new JobTimeoutExecution(null);
}
};
for (JobTimeoutInterceptor interceptor : interceptors) {
@@ -304,12 +305,12 @@ public class VertxJobScheduler implements JobScheduler,
Handler<Long> {
this.refreshJobsIntervalTimerId = this.vertx.setPeriodic(0L,
refreshJobsInterval, this);
LOG.info("Initializing Job Service Logic \n" +
- "MaxRefreshJobsIntervalWindow: {} (millis)\n" +
- "MaxIntervalLimitToRetryMillis: {} (millis)\n" +
- "MaxNumberOfRetries: {}\n" +
- "RefreshJobsInterval: {} (millis)\n" +
- "Number of worker threads {}\n" +
- "Store: {}",
+ "\tMaxRefreshJobsIntervalWindow: {} (millis)\n" +
+ "\tMaxIntervalLimitToRetryMillis: {} (millis)\n" +
+ "\tMaxNumberOfRetries: {}\n" +
+ "\tRefreshJobsInterval: {} (millis)\n" +
+ "\tNumber of worker threads {}\n" +
+ "\tStore: {}",
maxRefreshJobsIntervalWindow,
retryInterval,
maxNumberOfRetries,
@@ -375,41 +376,40 @@ public class VertxJobScheduler implements JobScheduler,
Handler<Long> {
workerExecutor.executeBlocking(newTimeoutTask(timerId, jobId));
}
- private Callable<Void> newTimeoutTask(Long timerId, String jobId) {
- Callable<Void> current = new Callable<Void>() {
+ private Callable<JobTimeoutExecution> newTimeoutTask(Long timerId, String
jobId) {
+ Callable<JobTimeoutExecution> current = new
Callable<JobTimeoutExecution>() {
@Override
- public Void call() throws Exception {
+ public JobTimeoutExecution call() throws Exception {
+
+ LOG.trace("Timeout task {} with jobId {} newTimeoutTask",
timerId, jobId);
+ JobContext jobContext = jobContextFactory.newContext();
+ // we check now if we should run
+ boolean shouldRun = jobStore.shouldRun(jobContext, jobId);
+ if (!shouldRun) {
+ LOG.trace("Timeout {} with jobId {} won't run", timerId,
jobId);
+ VertxJobScheduler.this.jobsScheduled.remove(jobId);
+ return null;
+ }
+
+ LOG.debug("Timeout {} with jobId {} will be executed",
timerId, jobId);
+ JobDetails jobDetails = jobStore.find(jobContext, jobId);
try {
- LOG.trace("Timeout task {} with jobId {} newTimeoutTask",
timerId, jobId);
- JobContext jobContext = jobContextFactory.newContext();
- // we check now if we should run
- boolean shouldRun = jobStore.shouldRun(jobContext, jobId);
- if (!shouldRun) {
- LOG.trace("Timeout {} with jobId {} won't run",
timerId, jobId);
- VertxJobScheduler.this.jobsScheduled.remove(jobId);
- return null;
- }
-
- LOG.debug("Timeout {} with jobId {} will be executed",
timerId, jobId);
- JobDetails jobDetails = jobStore.find(jobContext, jobId);
- try {
- JobDetails runningJobDetails = doRun(jobDetails);
- LOG.trace("Timeout {} with jobId {} have been
executed", timerId, jobId);
- JobDetails executeJobDetails =
doExecute(runningJobDetails);
- LOG.trace("Timeout {} with jobId {} will be
rescheduled if required", timerId, jobId);
- JobDetails nextJobDetails =
computeNextJobDetailsIfAny(executeJobDetails);
- removeIfFinal(timerId, jobContext, nextJobDetails);
- jobSchedulerListeners.forEach(l ->
l.onExecution(jobDetails));
- } catch (Exception exception) {
- LOG.trace("Timeout {} with jobId {} will be retried if
possible", timerId, jobId, exception);
- JobDetails nextJobDetails =
computeRetryIfAny(jobDetails);
- removeIfFinal(timerId, jobContext, nextJobDetails);
- jobSchedulerListeners.forEach(l ->
l.onFailure(jobDetails));
- }
- } catch (Exception e) {
- LOG.error("unexpected error during timeout execution", e);
+ JobDetails runningJobDetails = doRun(jobDetails);
+ LOG.trace("Timeout {} with jobId {} have been executed",
timerId, jobId);
+ JobDetails executeJobDetails =
doExecute(runningJobDetails);
+ LOG.trace("Timeout {} with jobId {} will be rescheduled if
required", timerId, jobId);
+ JobDetails nextJobDetails =
computeNextJobDetailsIfAny(executeJobDetails);
+ removeIfFinal(timerId, jobContext, nextJobDetails);
+ jobSchedulerListeners.forEach(l ->
l.onExecution(jobDetails));
+ return new JobTimeoutExecution(nextJobDetails);
+ } catch (Exception exception) {
+ LOG.trace("Timeout {} with jobId {} will be retried if
possible", timerId, jobId, exception);
+ JobDetails nextJobDetails = computeRetryIfAny(jobDetails);
+ removeIfFinal(timerId, jobContext, nextJobDetails);
+ jobSchedulerListeners.forEach(l ->
l.onFailure(jobDetails));
+ return new JobTimeoutExecution(nextJobDetails, exception);
}
- return null;
+
}
};
for (JobTimeoutInterceptor interceptor : interceptors) {
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ErrorHandlingJobTimeoutInterceptor.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ErrorHandlingJobTimeoutInterceptor.java
new file mode 100644
index 000000000..c879088e8
--- /dev/null
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ErrorHandlingJobTimeoutInterceptor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.app.jobs.integrations;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.kie.kogito.app.jobs.api.JobTimeoutExecution;
+import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor;
+import org.kie.kogito.handler.ExceptionHandler;
+import org.kie.kogito.jobs.service.model.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ErrorHandlingJobTimeoutInterceptor implements
JobTimeoutInterceptor {
+
+ private static Logger LOG =
LoggerFactory.getLogger(ErrorHandlingJobTimeoutInterceptor.class);
+
+ private List<ExceptionHandler> exceptionHandlers;
+
+ public ErrorHandlingJobTimeoutInterceptor(List<ExceptionHandler>
exceptionHandlers) {
+ this.exceptionHandlers = new ArrayList<>(exceptionHandlers);
+ }
+
+ @Override
+ public Integer priority() {
+ return 50;
+ }
+
+ @Override
+ public Callable<JobTimeoutExecution>
chainIntercept(Callable<JobTimeoutExecution> callable) {
+ return new Callable<JobTimeoutExecution>() {
+ @Override
+ public JobTimeoutExecution call() throws Exception {
+ JobTimeoutExecution execution = callable.call();
+ if (execution.getJobDetails() != null &&
JobStatus.ERROR.equals(execution.getJobDetails().getStatus())) {
+ if (exceptionHandlers.isEmpty()) {
+ LOG.warn("there was an error in job {} but not handler
were registered", execution.getJobDetails());
+ } else {
+ LOG.error("there was error in job {}. Handling error
{}", execution.getJobDetails(), execution.getException().getMessage());
+ exceptionHandlers.stream().forEach(e ->
e.handle(execution.getException()));
+ }
+ }
+ return execution;
+ }
+ };
+ }
+
+}
diff --git a/jobs/kogito-addons-quarkus-embedded-jobs/pom.xml
b/jobs/kogito-addons-quarkus-embedded-jobs/pom.xml
index 8d0d65573..64397a2c7 100644
--- a/jobs/kogito-addons-quarkus-embedded-jobs/pom.xml
+++ b/jobs/kogito-addons-quarkus-embedded-jobs/pom.xml
@@ -114,6 +114,10 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
</dependencies>
diff --git
a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java
b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java
index a182ec713..686364c11 100644
---
a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java
+++
b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java
@@ -18,15 +18,13 @@
*/
package org.kie.kogito.app.jobs.quarkus;
-import java.util.concurrent.Callable;
-
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.kie.kogito.app.jobs.api.JobExecutor;
import org.kie.kogito.app.jobs.api.JobScheduler;
import org.kie.kogito.app.jobs.api.JobSchedulerBuilder;
import org.kie.kogito.app.jobs.api.JobSchedulerListener;
import org.kie.kogito.app.jobs.api.JobSynchronization;
-import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor;
+import org.kie.kogito.app.jobs.integrations.ErrorHandlingJobTimeoutInterceptor;
import
org.kie.kogito.app.jobs.integrations.ProcessInstanceJobDescriptionJobInstanceEventAdapter;
import
org.kie.kogito.app.jobs.integrations.ProcessJobDescriptionJobInstanceEventAdapter;
import
org.kie.kogito.app.jobs.integrations.UserTaskInstanceJobDescriptionJobInstanceEventAdapter;
@@ -34,10 +32,10 @@ import
org.kie.kogito.app.jobs.quarkus.resource.RestApiConstants;
import org.kie.kogito.app.jobs.spi.JobContextFactory;
import org.kie.kogito.app.jobs.spi.JobStore;
import org.kie.kogito.event.EventPublisher;
+import org.kie.kogito.handler.ExceptionHandler;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.JobsService;
-import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.runtime.Startup;
import jakarta.annotation.PostConstruct;
@@ -90,22 +88,11 @@ public class QuarkusJobsService implements JobsService {
@Inject
protected TransactionSynchronizationRegistry registry;
+ @Inject
+ Instance<ExceptionHandler> exceptionHandlers;
+
@PostConstruct
public void init() {
- JobTimeoutInterceptor txInterceptor = new JobTimeoutInterceptor() {
-
- @Override
- public Callable<Void> chainIntercept(Callable<Void> callable) {
- return new Callable<Void>() {
-
- @Override
- public Void call() throws Exception {
- return
QuarkusTransaction.requiringNew().call(callable);
- }
-
- };
- }
- };
this.jobScheduler = JobSchedulerBuilder.newJobSchedulerBuilder()
.withEventPublishers(eventPublisher.stream().toArray(EventPublisher[]::new))
.withJobSchedulerListeners(jobSchedulerListeners.stream().toArray(JobSchedulerListener[]::new))
@@ -120,7 +107,9 @@ public class QuarkusJobsService implements JobsService {
.withRetryInterval(retryMillis)
.withMaxNumberOfRetries(maxNumberOfRetries)
.withRefreshJobsInterval(maxRefreshJobsIntervalWindow * 60 *
1000L)
- .withTimeoutInterceptor(txInterceptor)
+ .withTimeoutInterceptor(
+ new TransactionJobTimeoutInterceptor(),
+ new
ErrorHandlingJobTimeoutInterceptor(exceptionHandlers.stream().toList()))
.withNumberOfWorkerThreads(numberOfWorkerThreads)
.withJobSynchronization(new JobSynchronization() {
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/TransactionJobTimeoutInterceptor.java
similarity index 58%
copy from
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
copy to
jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/TransactionJobTimeoutInterceptor.java
index ed3e7b20e..40984a1bf 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
+++
b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/TransactionJobTimeoutInterceptor.java
@@ -16,20 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.app.jobs.api;
+package org.kie.kogito.app.jobs.quarkus;
import java.util.concurrent.Callable;
-public interface JobTimeoutInterceptor extends
Comparable<JobTimeoutInterceptor> {
+import org.kie.kogito.app.jobs.api.JobTimeoutExecution;
+import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor;
- Callable<Void> chainIntercept(Callable<Void> callable);
+import io.quarkus.narayana.jta.QuarkusTransaction;
- default Integer priority() {
- return 10;
- }
+public class TransactionJobTimeoutInterceptor implements JobTimeoutInterceptor
{
@Override
- default int compareTo(JobTimeoutInterceptor o) {
- return this.priority().compareTo(o.priority());
+ public Callable<JobTimeoutExecution>
chainIntercept(Callable<JobTimeoutExecution> callable) {
+ return new Callable<JobTimeoutExecution>() {
+
+ @Override
+ public JobTimeoutExecution call() throws Exception {
+ return QuarkusTransaction.requiringNew().call(callable);
+ }
+
+ };
}
}
diff --git
a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java
index 334827e49..755636bf4 100644
---
a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java
+++
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java
@@ -23,6 +23,8 @@ import java.time.Instant;
import java.time.ZoneId;
import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.jobs.ExactExpirationTime;
import org.kie.kogito.jobs.JobsService;
@@ -43,9 +45,20 @@ public class QuarkusJPAJobStoreTest {
@Inject
TestJobSchedulerListener listener;
+ @Inject
+ TestJobExecutor testJobExecutor;
+
+ @Inject
+ TestExceptionHandler exceptionHandler;
+
+ @BeforeEach
+ public void init() {
+ testJobExecutor.reset();
+ exceptionHandler.reset();
+ }
+
@Test
public void testBasicPersistence() throws Exception {
-
ProcessInstanceJobDescription jobDescription = new
ProcessInstanceJobDescription("1", "-1",
ExactExpirationTime.of(Instant.now().plus(Duration.ofSeconds(2)).atZone(ZoneId.of("UTC"))),
5,
"processInstanceId", null, "processId", null,
"nodeInstanceId");
@@ -57,4 +70,17 @@ public class QuarkusJPAJobStoreTest {
}
+ @Test
+ public void testBasicError() throws Exception {
+ testJobExecutor.setNumberOfFailures(4);
+ ProcessInstanceJobDescription jobDescription = new
ProcessInstanceJobDescription("1", "-1",
+
ExactExpirationTime.of(Instant.now().plus(Duration.ofSeconds(2)).atZone(ZoneId.of("UTC"))),
5,
+ "processInstanceId", null, "processId", null,
"nodeInstanceId");
+
+ listener.setCount(4);
+ jobsService.scheduleJob(jobDescription);
+
+ Awaitility.await().atMost(Duration.ofSeconds(5L)).untilAsserted(() ->
assertThat(exceptionHandler.isError()).isTrue());
+ }
+
}
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestExceptionHandler.java
similarity index 59%
copy from
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
copy to
jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestExceptionHandler.java
index ed3e7b20e..213c04fc2 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java
+++
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestExceptionHandler.java
@@ -16,20 +16,33 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.app.jobs.api;
+package org.kie.kogito.app.jobs.jpa.quarkus;
-import java.util.concurrent.Callable;
+import org.kie.kogito.handler.ExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public interface JobTimeoutInterceptor extends
Comparable<JobTimeoutInterceptor> {
+import jakarta.inject.Singleton;
- Callable<Void> chainIntercept(Callable<Void> callable);
+@Singleton
+public class TestExceptionHandler implements ExceptionHandler {
- default Integer priority() {
- return 10;
- }
+ private static final Logger LOG =
LoggerFactory.getLogger(TestExceptionHandler.class);
+
+ private boolean error;
@Override
- default int compareTo(JobTimeoutInterceptor o) {
- return this.priority().compareTo(o.priority());
+ public void handle(Exception th) {
+ LOG.info("error", th);
+ error = true;
+ }
+
+ public void reset() {
+ error = false;
}
+
+ public boolean isError() {
+ return error;
+ }
+
}
diff --git
a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobExecutor.java
similarity index 54%
copy from
jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java
copy to
jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobExecutor.java
index 0957e4ddb..8cec79896 100644
---
a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java
+++
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobExecutor.java
@@ -18,30 +18,39 @@
*/
package org.kie.kogito.app.jobs.jpa.quarkus;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.kie.kogito.app.jobs.api.JobSchedulerListener;
+import org.kie.kogito.app.jobs.api.JobExecutor;
import org.kie.kogito.jobs.service.model.JobDetails;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import jakarta.inject.Singleton;
-import jakarta.enterprise.context.ApplicationScoped;
+@Singleton
+public class TestJobExecutor implements JobExecutor {
-@ApplicationScoped
-public class TestJobSchedulerListener implements JobSchedulerListener {
+ private Logger LOG = LoggerFactory.getLogger(TestJobExecutor.class);
+ private int numberOfFailures;
- private CountDownLatch latch;
+ @Override
+ public boolean accept(JobDetails jobDescription) {
+ return true;
+ }
- void setCount(Integer count) {
- latch = new CountDownLatch(count);
+ @Override
+ public void execute(JobDetails jobDescription) {
+ LOG.info("executing {}", jobDescription);
+ if (numberOfFailures > 0) {
+ --numberOfFailures;
+ throw new RuntimeException();
+ }
}
- public boolean await(long timeout, TimeUnit unit) throws Exception {
- return latch.await(timeout, unit);
+ public void setNumberOfFailures(int numberOfFailures) {
+ this.numberOfFailures = numberOfFailures;
}
- @Override
- public void onExecution(JobDetails jobDetails) {
- latch.countDown();
+ public void reset() {
+ numberOfFailures = 0;
}
}
diff --git
a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java
index 0957e4ddb..8dd195e25 100644
---
a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java
+++
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java
@@ -39,6 +39,11 @@ public class TestJobSchedulerListener implements
JobSchedulerListener {
return latch.await(timeout, unit);
}
+ @Override
+ public void onFailure(JobDetails jobDetails) {
+ latch.countDown();
+ }
+
@Override
public void onExecution(JobDetails jobDetails) {
latch.countDown();
diff --git a/jobs/kogito-addons-springboot-embedded-jobs/pom.xml
b/jobs/kogito-addons-springboot-embedded-jobs/pom.xml
index 672395b8a..66665e4f5 100644
--- a/jobs/kogito-addons-springboot-embedded-jobs/pom.xml
+++ b/jobs/kogito-addons-springboot-embedded-jobs/pom.xml
@@ -91,6 +91,10 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
</dependencies>
<profiles>
diff --git
a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java
b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java
index 8bdcaecda..ffc22a1c5 100644
---
a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java
+++
b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java
@@ -20,14 +20,13 @@ package org.kie.kogito.app.jobs.springboot;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.Callable;
import org.kie.kogito.app.jobs.api.JobExecutor;
import org.kie.kogito.app.jobs.api.JobScheduler;
import org.kie.kogito.app.jobs.api.JobSchedulerBuilder;
import org.kie.kogito.app.jobs.api.JobSchedulerListener;
import org.kie.kogito.app.jobs.api.JobSynchronization;
-import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor;
+import org.kie.kogito.app.jobs.integrations.ErrorHandlingJobTimeoutInterceptor;
import
org.kie.kogito.app.jobs.integrations.ProcessInstanceJobDescriptionJobInstanceEventAdapter;
import
org.kie.kogito.app.jobs.integrations.ProcessJobDescriptionJobInstanceEventAdapter;
import
org.kie.kogito.app.jobs.integrations.UserTaskInstanceJobDescriptionJobInstanceEventAdapter;
@@ -35,6 +34,7 @@ import org.kie.kogito.app.jobs.spi.JobContextFactory;
import org.kie.kogito.app.jobs.spi.JobStore;
import org.kie.kogito.app.jobs.springboot.resource.RestApiConstants;
import org.kie.kogito.event.EventPublisher;
+import org.kie.kogito.handler.ExceptionHandler;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.JobsService;
import org.springframework.beans.factory.annotation.Autowired;
@@ -44,7 +44,6 @@ import
org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
import
org.springframework.transaction.support.TransactionSynchronizationManager;
-import org.springframework.transaction.support.TransactionTemplate;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@@ -86,32 +85,13 @@ public class SpringbootJobsService implements JobsService {
protected String serviceURL;
@Autowired
- private PlatformTransactionManager transactionManager;
+ protected PlatformTransactionManager transactionManager;
+
+ @Autowired(required = false)
+ protected List<ExceptionHandler> exceptionHandlers;
@PostConstruct
public void init() {
- JobTimeoutInterceptor txInterceptor = new JobTimeoutInterceptor() {
-
- @Override
- public Callable<Void> chainIntercept(Callable<Void> callable) {
- TransactionTemplate transactionTemplate = new
TransactionTemplate(transactionManager);
- return new Callable<Void>() {
-
- @Override
- public Void call() throws Exception {
- return transactionTemplate.execute(status -> {
- try {
- return callable.call();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- }
-
- };
- }
- };
-
this.jobScheduler = JobSchedulerBuilder.newJobSchedulerBuilder()
.withEventPublishers(ofNullable(eventPublisher).toArray(EventPublisher[]::new))
.withJobSchedulerListeners(ofNullable(jobSchedulerListeners).stream().toArray(JobSchedulerListener[]::new))
@@ -126,7 +106,9 @@ public class SpringbootJobsService implements JobsService {
.withRetryInterval(retryMillis)
.withMaxNumberOfRetries(maxNumberOfRetries)
.withRefreshJobsInterval(maxRefreshJobsIntervalWindow * 60 *
1000L)
- .withTimeoutInterceptor(txInterceptor)
+ .withTimeoutInterceptor(
+ new
TransactionJobTimeoutInterceptor(transactionManager),
+ new
ErrorHandlingJobTimeoutInterceptor(ofNullable(exceptionHandlers).stream().toList()))
.withNumberOfWorkerThreads(numberOfWorkerThreads)
.withJobSynchronization(new JobSynchronization() {
diff --git
a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/TransactionJobTimeoutInterceptor.java
b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/TransactionJobTimeoutInterceptor.java
new file mode 100644
index 000000000..58d4b562d
--- /dev/null
+++
b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/TransactionJobTimeoutInterceptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.app.jobs.springboot;
+
+import java.util.concurrent.Callable;
+
+import org.kie.kogito.app.jobs.api.JobTimeoutExecution;
+import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.support.TransactionTemplate;
+
+public class TransactionJobTimeoutInterceptor implements JobTimeoutInterceptor
{
+
+ private TransactionTemplate transactionTemplate;
+
+ public TransactionJobTimeoutInterceptor(PlatformTransactionManager
transactionManager) {
+ this.transactionTemplate = new TransactionTemplate(transactionManager);
+ }
+
+ @Override
+ public Callable<JobTimeoutExecution>
chainIntercept(Callable<JobTimeoutExecution> callable) {
+ return new Callable<JobTimeoutExecution>() {
+
+ @Override
+ public JobTimeoutExecution call() throws Exception {
+ return transactionTemplate.execute(status -> {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ };
+ }
+
+}
diff --git
a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/SpringbootJPAJobStoreTest.java
b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/SpringbootJPAJobStoreTest.java
index 82f5d9533..6ad4e709b 100644
---
a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/SpringbootJPAJobStoreTest.java
+++
b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/SpringbootJPAJobStoreTest.java
@@ -23,6 +23,8 @@ import java.time.Instant;
import java.time.ZoneId;
import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.jobs.ExactExpirationTime;
import org.kie.kogito.jobs.JobsService;
@@ -41,6 +43,18 @@ public class SpringbootJPAJobStoreTest {
@Autowired
TestJobSchedulerListener listener;
+ @Autowired
+ TestJobExecutor testJobExecutor;
+
+ @Autowired
+ TestExceptionHandler exceptionHandler;
+
+ @BeforeEach
+ public void init() {
+ testJobExecutor.reset();
+ exceptionHandler.reset();
+ }
+
@Test
public void testBasicPersistence() throws Exception {
@@ -55,4 +69,16 @@ public class SpringbootJPAJobStoreTest {
}
+ @Test
+ public void testBasicError() throws Exception {
+ testJobExecutor.setNumberOfFailures(4);
+ ProcessInstanceJobDescription jobDescription = new
ProcessInstanceJobDescription("1", "-1",
+
ExactExpirationTime.of(Instant.now().plus(Duration.ofSeconds(2)).atZone(ZoneId.of("UTC"))),
5,
+ "processInstanceId", null, "processId", null,
"nodeInstanceId");
+
+ listener.setCount(4);
+ jobsService.scheduleJob(jobDescription);
+
+ Awaitility.await().atMost(Duration.ofSeconds(5L)).untilAsserted(() ->
assertThat(exceptionHandler.isError()).isTrue());
+ }
}
diff --git
a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestExceptionHandler.java
similarity index 58%
copy from
jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
copy to
jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestExceptionHandler.java
index 79046f2f3..92e15bfb3 100644
---
a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
+++
b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestExceptionHandler.java
@@ -18,29 +18,33 @@
*/
package org.kie.kogito.app.jobs.springboot;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.kie.kogito.app.jobs.api.JobSchedulerListener;
-import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.handler.ExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@Component
-public class TestJobSchedulerListener implements JobSchedulerListener {
+@Scope(scopeName = ConfigurableBeanFactory.SCOPE_SINGLETON)
+public class TestExceptionHandler implements ExceptionHandler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TestExceptionHandler.class);
- private CountDownLatch latch;
+ private boolean error;
- void setCount(Integer count) {
- latch = new CountDownLatch(count);
+ @Override
+ public void handle(Exception th) {
+ LOG.info("error", th);
+ error = true;
}
- public boolean await(long timeout, TimeUnit unit) throws Exception {
- return latch.await(timeout, unit);
+ public void reset() {
+ error = false;
}
- @Override
- public void onExecution(JobDetails jobDetails) {
- latch.countDown();
+ public boolean isError() {
+ return error;
}
}
diff --git
a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobExecutor.java
similarity index 51%
copy from
jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
copy to
jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobExecutor.java
index 79046f2f3..52ee0627b 100644
---
a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
+++
b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobExecutor.java
@@ -18,29 +18,41 @@
*/
package org.kie.kogito.app.jobs.springboot;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.kie.kogito.app.jobs.api.JobSchedulerListener;
+import org.kie.kogito.app.jobs.api.JobExecutor;
import org.kie.kogito.jobs.service.model.JobDetails;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@Component
-public class TestJobSchedulerListener implements JobSchedulerListener {
+@Scope(scopeName = ConfigurableBeanFactory.SCOPE_SINGLETON)
+public class TestJobExecutor implements JobExecutor {
+
+ private Logger LOG = LoggerFactory.getLogger(TestJobExecutor.class);
+ private int numberOfFailures;
- private CountDownLatch latch;
+ @Override
+ public boolean accept(JobDetails jobDescription) {
+ return true;
+ }
- void setCount(Integer count) {
- latch = new CountDownLatch(count);
+ @Override
+ public void execute(JobDetails jobDescription) {
+ LOG.info("executing {}", jobDescription);
+ if (numberOfFailures > 0) {
+ --numberOfFailures;
+ throw new RuntimeException();
+ }
}
- public boolean await(long timeout, TimeUnit unit) throws Exception {
- return latch.await(timeout, unit);
+ public void setNumberOfFailures(int numberOfFailures) {
+ this.numberOfFailures = numberOfFailures;
}
- @Override
- public void onExecution(JobDetails jobDetails) {
- latch.countDown();
+ public void reset() {
+ numberOfFailures = 0;
}
}
diff --git
a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
index 79046f2f3..56da55a26 100644
---
a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
+++
b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java
@@ -38,6 +38,11 @@ public class TestJobSchedulerListener implements
JobSchedulerListener {
return latch.await(timeout, unit);
}
+ @Override
+ public void onFailure(JobDetails jobDetails) {
+ latch.countDown();
+ }
+
@Override
public void onExecution(JobDetails jobDetails) {
latch.countDown();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]