This is an automated email from the ASF dual-hosted git repository. jstastnycz pushed a commit to branch sync-20250907 in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git
commit 69ef6e4144e562f282cecfd1281bd9d74c389687 Author: Enrique <[email protected]> AuthorDate: Mon Sep 1 10:05:13 2025 +0200 [incubator-kie-issues-2079] Set on error process instance when a job reaches the error state (#2264) * [incubator-kie-issues-2079] Set on error process instance when a job reaches the error state * fix exception handling registered * correction in error handling * add test case for error handling --- ...utInterceptor.java => JobTimeoutExecution.java} | 25 ++++--- .../kogito/app/jobs/api/JobTimeoutInterceptor.java | 2 +- .../kogito/app/jobs/impl/VertxJobScheduler.java | 82 +++++++++++----------- .../ErrorHandlingJobTimeoutInterceptor.java | 66 +++++++++++++++++ jobs/kogito-addons-quarkus-embedded-jobs/pom.xml | 4 ++ .../app/jobs/quarkus/QuarkusJobsService.java | 27 +++---- .../quarkus/TransactionJobTimeoutInterceptor.java} | 22 +++--- .../jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java | 28 +++++++- .../jobs/jpa/quarkus/TestExceptionHandler.java} | 31 +++++--- ...SchedulerListener.java => TestJobExecutor.java} | 39 ++++++---- .../jobs/jpa/quarkus/TestJobSchedulerListener.java | 5 ++ .../kogito-addons-springboot-embedded-jobs/pom.xml | 4 ++ .../app/jobs/springboot/SpringbootJobsService.java | 36 +++------- .../TransactionJobTimeoutInterceptor.java | 54 ++++++++++++++ .../jobs/springboot/SpringbootJPAJobStoreTest.java | 26 +++++++ ...ulerListener.java => TestExceptionHandler.java} | 32 +++++---- ...SchedulerListener.java => TestJobExecutor.java} | 38 ++++++---- .../jobs/springboot/TestJobSchedulerListener.java | 5 ++ 18 files changed, 370 insertions(+), 156 deletions(-) diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutExecution.java similarity index 61% copy from jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java copy to jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutExecution.java index ed3e7b20e..3fef7a854 100644 --- a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutExecution.java @@ -18,18 +18,27 @@ */ package org.kie.kogito.app.jobs.api; -import java.util.concurrent.Callable; +import org.kie.kogito.jobs.service.model.JobDetails; -public interface JobTimeoutInterceptor extends Comparable<JobTimeoutInterceptor> { +public class JobTimeoutExecution { - Callable<Void> chainIntercept(Callable<Void> callable); + private JobDetails jobDetails; + private Exception exception; - default Integer priority() { - return 10; + public JobTimeoutExecution(JobDetails jobDetails) { + this(jobDetails, null); } - @Override - default int compareTo(JobTimeoutInterceptor o) { - return this.priority().compareTo(o.priority()); + public JobTimeoutExecution(JobDetails jobDetails, Exception exception) { + this.jobDetails = jobDetails; + this.exception = exception; + } + + public Exception getException() { + return exception; + } + + public JobDetails getJobDetails() { + return jobDetails; } } diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java index ed3e7b20e..5c8d57d28 100644 --- a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java @@ -22,7 +22,7 @@ import java.util.concurrent.Callable; public interface JobTimeoutInterceptor extends Comparable<JobTimeoutInterceptor> { - Callable<Void> chainIntercept(Callable<Void> callable); + Callable<JobTimeoutExecution> chainIntercept(Callable<JobTimeoutExecution> callable); default Integer priority() { return 10; diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java index d8d15d1c7..3f9974a74 100644 --- a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java @@ -38,6 +38,7 @@ import org.kie.kogito.app.jobs.api.JobScheduler; import org.kie.kogito.app.jobs.api.JobSchedulerBuilder; import org.kie.kogito.app.jobs.api.JobSchedulerListener; import org.kie.kogito.app.jobs.api.JobSynchronization; +import org.kie.kogito.app.jobs.api.JobTimeoutExecution; import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor; import org.kie.kogito.app.jobs.integrations.ProcessInstanceJobDescriptionMerger; import org.kie.kogito.app.jobs.integrations.ProcessJobDescriptionMerger; @@ -232,11 +233,11 @@ public class VertxJobScheduler implements JobScheduler, Handler<Long> { @Override public void handle(Long timerId) { - Callable<Void> current = new Callable<Void>() { + Callable<JobTimeoutExecution> current = new Callable<JobTimeoutExecution>() { @Override - public Void call() throws Exception { + public JobTimeoutExecution call() throws Exception { syncWithJobStores(); - return null; + return new JobTimeoutExecution(null); } }; for (JobTimeoutInterceptor interceptor : interceptors) { @@ -304,12 +305,12 @@ public class VertxJobScheduler implements JobScheduler, Handler<Long> { this.refreshJobsIntervalTimerId = this.vertx.setPeriodic(0L, refreshJobsInterval, this); LOG.info("Initializing Job Service Logic \n" + - "MaxRefreshJobsIntervalWindow: {} (millis)\n" + - "MaxIntervalLimitToRetryMillis: {} (millis)\n" + - "MaxNumberOfRetries: {}\n" + - "RefreshJobsInterval: {} (millis)\n" + - "Number of worker threads {}\n" + - "Store: {}", + "\tMaxRefreshJobsIntervalWindow: {} (millis)\n" + + "\tMaxIntervalLimitToRetryMillis: {} (millis)\n" + + "\tMaxNumberOfRetries: {}\n" + + "\tRefreshJobsInterval: {} (millis)\n" + + "\tNumber of worker threads {}\n" + + "\tStore: {}", maxRefreshJobsIntervalWindow, retryInterval, maxNumberOfRetries, @@ -375,41 +376,40 @@ public class VertxJobScheduler implements JobScheduler, Handler<Long> { workerExecutor.executeBlocking(newTimeoutTask(timerId, jobId)); } - private Callable<Void> newTimeoutTask(Long timerId, String jobId) { - Callable<Void> current = new Callable<Void>() { + private Callable<JobTimeoutExecution> newTimeoutTask(Long timerId, String jobId) { + Callable<JobTimeoutExecution> current = new Callable<JobTimeoutExecution>() { @Override - public Void call() throws Exception { + public JobTimeoutExecution call() throws Exception { + + LOG.trace("Timeout task {} with jobId {} newTimeoutTask", timerId, jobId); + JobContext jobContext = jobContextFactory.newContext(); + // we check now if we should run + boolean shouldRun = jobStore.shouldRun(jobContext, jobId); + if (!shouldRun) { + LOG.trace("Timeout {} with jobId {} won't run", timerId, jobId); + VertxJobScheduler.this.jobsScheduled.remove(jobId); + return null; + } + + LOG.debug("Timeout {} with jobId {} will be executed", timerId, jobId); + JobDetails jobDetails = jobStore.find(jobContext, jobId); try { - LOG.trace("Timeout task {} with jobId {} newTimeoutTask", timerId, jobId); - JobContext jobContext = jobContextFactory.newContext(); - // we check now if we should run - boolean shouldRun = jobStore.shouldRun(jobContext, jobId); - if (!shouldRun) { - LOG.trace("Timeout {} with jobId {} won't run", timerId, jobId); - VertxJobScheduler.this.jobsScheduled.remove(jobId); - return null; - } - - LOG.debug("Timeout {} with jobId {} will be executed", timerId, jobId); - JobDetails jobDetails = jobStore.find(jobContext, jobId); - try { - JobDetails runningJobDetails = doRun(jobDetails); - LOG.trace("Timeout {} with jobId {} have been executed", timerId, jobId); - JobDetails executeJobDetails = doExecute(runningJobDetails); - LOG.trace("Timeout {} with jobId {} will be rescheduled if required", timerId, jobId); - JobDetails nextJobDetails = computeNextJobDetailsIfAny(executeJobDetails); - removeIfFinal(timerId, jobContext, nextJobDetails); - jobSchedulerListeners.forEach(l -> l.onExecution(jobDetails)); - } catch (Exception exception) { - LOG.trace("Timeout {} with jobId {} will be retried if possible", timerId, jobId, exception); - JobDetails nextJobDetails = computeRetryIfAny(jobDetails); - removeIfFinal(timerId, jobContext, nextJobDetails); - jobSchedulerListeners.forEach(l -> l.onFailure(jobDetails)); - } - } catch (Exception e) { - LOG.error("unexpected error during timeout execution", e); + JobDetails runningJobDetails = doRun(jobDetails); + LOG.trace("Timeout {} with jobId {} have been executed", timerId, jobId); + JobDetails executeJobDetails = doExecute(runningJobDetails); + LOG.trace("Timeout {} with jobId {} will be rescheduled if required", timerId, jobId); + JobDetails nextJobDetails = computeNextJobDetailsIfAny(executeJobDetails); + removeIfFinal(timerId, jobContext, nextJobDetails); + jobSchedulerListeners.forEach(l -> l.onExecution(jobDetails)); + return new JobTimeoutExecution(nextJobDetails); + } catch (Exception exception) { + LOG.trace("Timeout {} with jobId {} will be retried if possible", timerId, jobId, exception); + JobDetails nextJobDetails = computeRetryIfAny(jobDetails); + removeIfFinal(timerId, jobContext, nextJobDetails); + jobSchedulerListeners.forEach(l -> l.onFailure(jobDetails)); + return new JobTimeoutExecution(nextJobDetails, exception); } - return null; + } }; for (JobTimeoutInterceptor interceptor : interceptors) { diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ErrorHandlingJobTimeoutInterceptor.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ErrorHandlingJobTimeoutInterceptor.java new file mode 100644 index 000000000..c879088e8 --- /dev/null +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ErrorHandlingJobTimeoutInterceptor.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.app.jobs.integrations; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; + +import org.kie.kogito.app.jobs.api.JobTimeoutExecution; +import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor; +import org.kie.kogito.handler.ExceptionHandler; +import org.kie.kogito.jobs.service.model.JobStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ErrorHandlingJobTimeoutInterceptor implements JobTimeoutInterceptor { + + private static Logger LOG = LoggerFactory.getLogger(ErrorHandlingJobTimeoutInterceptor.class); + + private List<ExceptionHandler> exceptionHandlers; + + public ErrorHandlingJobTimeoutInterceptor(List<ExceptionHandler> exceptionHandlers) { + this.exceptionHandlers = new ArrayList<>(exceptionHandlers); + } + + @Override + public Integer priority() { + return 50; + } + + @Override + public Callable<JobTimeoutExecution> chainIntercept(Callable<JobTimeoutExecution> callable) { + return new Callable<JobTimeoutExecution>() { + @Override + public JobTimeoutExecution call() throws Exception { + JobTimeoutExecution execution = callable.call(); + if (execution.getJobDetails() != null && JobStatus.ERROR.equals(execution.getJobDetails().getStatus())) { + if (exceptionHandlers.isEmpty()) { + LOG.warn("there was an error in job {} but not handler were registered", execution.getJobDetails()); + } else { + LOG.error("there was error in job {}. Handling error {}", execution.getJobDetails(), execution.getException().getMessage()); + exceptionHandlers.stream().forEach(e -> e.handle(execution.getException())); + } + } + return execution; + } + }; + } + +} diff --git a/jobs/kogito-addons-quarkus-embedded-jobs/pom.xml b/jobs/kogito-addons-quarkus-embedded-jobs/pom.xml index 8d0d65573..64397a2c7 100644 --- a/jobs/kogito-addons-quarkus-embedded-jobs/pom.xml +++ b/jobs/kogito-addons-quarkus-embedded-jobs/pom.xml @@ -114,6 +114,10 @@ <artifactId>rest-assured</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + </dependency> </dependencies> diff --git a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java index a182ec713..686364c11 100644 --- a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java +++ b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java @@ -18,15 +18,13 @@ */ package org.kie.kogito.app.jobs.quarkus; -import java.util.concurrent.Callable; - import org.eclipse.microprofile.config.inject.ConfigProperty; import org.kie.kogito.app.jobs.api.JobExecutor; import org.kie.kogito.app.jobs.api.JobScheduler; import org.kie.kogito.app.jobs.api.JobSchedulerBuilder; import org.kie.kogito.app.jobs.api.JobSchedulerListener; import org.kie.kogito.app.jobs.api.JobSynchronization; -import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor; +import org.kie.kogito.app.jobs.integrations.ErrorHandlingJobTimeoutInterceptor; import org.kie.kogito.app.jobs.integrations.ProcessInstanceJobDescriptionJobInstanceEventAdapter; import org.kie.kogito.app.jobs.integrations.ProcessJobDescriptionJobInstanceEventAdapter; import org.kie.kogito.app.jobs.integrations.UserTaskInstanceJobDescriptionJobInstanceEventAdapter; @@ -34,10 +32,10 @@ import org.kie.kogito.app.jobs.quarkus.resource.RestApiConstants; import org.kie.kogito.app.jobs.spi.JobContextFactory; import org.kie.kogito.app.jobs.spi.JobStore; import org.kie.kogito.event.EventPublisher; +import org.kie.kogito.handler.ExceptionHandler; import org.kie.kogito.jobs.JobDescription; import org.kie.kogito.jobs.JobsService; -import io.quarkus.narayana.jta.QuarkusTransaction; import io.quarkus.runtime.Startup; import jakarta.annotation.PostConstruct; @@ -90,22 +88,11 @@ public class QuarkusJobsService implements JobsService { @Inject protected TransactionSynchronizationRegistry registry; + @Inject + Instance<ExceptionHandler> exceptionHandlers; + @PostConstruct public void init() { - JobTimeoutInterceptor txInterceptor = new JobTimeoutInterceptor() { - - @Override - public Callable<Void> chainIntercept(Callable<Void> callable) { - return new Callable<Void>() { - - @Override - public Void call() throws Exception { - return QuarkusTransaction.requiringNew().call(callable); - } - - }; - } - }; this.jobScheduler = JobSchedulerBuilder.newJobSchedulerBuilder() .withEventPublishers(eventPublisher.stream().toArray(EventPublisher[]::new)) .withJobSchedulerListeners(jobSchedulerListeners.stream().toArray(JobSchedulerListener[]::new)) @@ -120,7 +107,9 @@ public class QuarkusJobsService implements JobsService { .withRetryInterval(retryMillis) .withMaxNumberOfRetries(maxNumberOfRetries) .withRefreshJobsInterval(maxRefreshJobsIntervalWindow * 60 * 1000L) - .withTimeoutInterceptor(txInterceptor) + .withTimeoutInterceptor( + new TransactionJobTimeoutInterceptor(), + new ErrorHandlingJobTimeoutInterceptor(exceptionHandlers.stream().toList())) .withNumberOfWorkerThreads(numberOfWorkerThreads) .withJobSynchronization(new JobSynchronization() { diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/TransactionJobTimeoutInterceptor.java similarity index 58% copy from jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java copy to jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/TransactionJobTimeoutInterceptor.java index ed3e7b20e..40984a1bf 100644 --- a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java +++ b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/TransactionJobTimeoutInterceptor.java @@ -16,20 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -package org.kie.kogito.app.jobs.api; +package org.kie.kogito.app.jobs.quarkus; import java.util.concurrent.Callable; -public interface JobTimeoutInterceptor extends Comparable<JobTimeoutInterceptor> { +import org.kie.kogito.app.jobs.api.JobTimeoutExecution; +import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor; - Callable<Void> chainIntercept(Callable<Void> callable); +import io.quarkus.narayana.jta.QuarkusTransaction; - default Integer priority() { - return 10; - } +public class TransactionJobTimeoutInterceptor implements JobTimeoutInterceptor { @Override - default int compareTo(JobTimeoutInterceptor o) { - return this.priority().compareTo(o.priority()); + public Callable<JobTimeoutExecution> chainIntercept(Callable<JobTimeoutExecution> callable) { + return new Callable<JobTimeoutExecution>() { + + @Override + public JobTimeoutExecution call() throws Exception { + return QuarkusTransaction.requiringNew().call(callable); + } + + }; } } diff --git a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java index 334827e49..755636bf4 100644 --- a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java +++ b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java @@ -23,6 +23,8 @@ import java.time.Instant; import java.time.ZoneId; import java.util.concurrent.TimeUnit; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.kie.kogito.jobs.ExactExpirationTime; import org.kie.kogito.jobs.JobsService; @@ -43,9 +45,20 @@ public class QuarkusJPAJobStoreTest { @Inject TestJobSchedulerListener listener; + @Inject + TestJobExecutor testJobExecutor; + + @Inject + TestExceptionHandler exceptionHandler; + + @BeforeEach + public void init() { + testJobExecutor.reset(); + exceptionHandler.reset(); + } + @Test public void testBasicPersistence() throws Exception { - ProcessInstanceJobDescription jobDescription = new ProcessInstanceJobDescription("1", "-1", ExactExpirationTime.of(Instant.now().plus(Duration.ofSeconds(2)).atZone(ZoneId.of("UTC"))), 5, "processInstanceId", null, "processId", null, "nodeInstanceId"); @@ -57,4 +70,17 @@ public class QuarkusJPAJobStoreTest { } + @Test + public void testBasicError() throws Exception { + testJobExecutor.setNumberOfFailures(4); + ProcessInstanceJobDescription jobDescription = new ProcessInstanceJobDescription("1", "-1", + ExactExpirationTime.of(Instant.now().plus(Duration.ofSeconds(2)).atZone(ZoneId.of("UTC"))), 5, + "processInstanceId", null, "processId", null, "nodeInstanceId"); + + listener.setCount(4); + jobsService.scheduleJob(jobDescription); + + Awaitility.await().atMost(Duration.ofSeconds(5L)).untilAsserted(() -> assertThat(exceptionHandler.isError()).isTrue()); + } + } diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestExceptionHandler.java similarity index 59% copy from jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java copy to jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestExceptionHandler.java index ed3e7b20e..213c04fc2 100644 --- a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobTimeoutInterceptor.java +++ b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestExceptionHandler.java @@ -16,20 +16,33 @@ * specific language governing permissions and limitations * under the License. */ -package org.kie.kogito.app.jobs.api; +package org.kie.kogito.app.jobs.jpa.quarkus; -import java.util.concurrent.Callable; +import org.kie.kogito.handler.ExceptionHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public interface JobTimeoutInterceptor extends Comparable<JobTimeoutInterceptor> { +import jakarta.inject.Singleton; - Callable<Void> chainIntercept(Callable<Void> callable); +@Singleton +public class TestExceptionHandler implements ExceptionHandler { - default Integer priority() { - return 10; - } + private static final Logger LOG = LoggerFactory.getLogger(TestExceptionHandler.class); + + private boolean error; @Override - default int compareTo(JobTimeoutInterceptor o) { - return this.priority().compareTo(o.priority()); + public void handle(Exception th) { + LOG.info("error", th); + error = true; + } + + public void reset() { + error = false; } + + public boolean isError() { + return error; + } + } diff --git a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobExecutor.java similarity index 54% copy from jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java copy to jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobExecutor.java index 0957e4ddb..8cec79896 100644 --- a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java +++ b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobExecutor.java @@ -18,30 +18,39 @@ */ package org.kie.kogito.app.jobs.jpa.quarkus; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.kie.kogito.app.jobs.api.JobSchedulerListener; +import org.kie.kogito.app.jobs.api.JobExecutor; import org.kie.kogito.jobs.service.model.JobDetails; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import jakarta.inject.Singleton; -import jakarta.enterprise.context.ApplicationScoped; +@Singleton +public class TestJobExecutor implements JobExecutor { -@ApplicationScoped -public class TestJobSchedulerListener implements JobSchedulerListener { + private Logger LOG = LoggerFactory.getLogger(TestJobExecutor.class); + private int numberOfFailures; - private CountDownLatch latch; + @Override + public boolean accept(JobDetails jobDescription) { + return true; + } - void setCount(Integer count) { - latch = new CountDownLatch(count); + @Override + public void execute(JobDetails jobDescription) { + LOG.info("executing {}", jobDescription); + if (numberOfFailures > 0) { + --numberOfFailures; + throw new RuntimeException(); + } } - public boolean await(long timeout, TimeUnit unit) throws Exception { - return latch.await(timeout, unit); + public void setNumberOfFailures(int numberOfFailures) { + this.numberOfFailures = numberOfFailures; } - @Override - public void onExecution(JobDetails jobDetails) { - latch.countDown(); + public void reset() { + numberOfFailures = 0; } } diff --git a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java index 0957e4ddb..8dd195e25 100644 --- a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java +++ b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/TestJobSchedulerListener.java @@ -39,6 +39,11 @@ public class TestJobSchedulerListener implements JobSchedulerListener { return latch.await(timeout, unit); } + @Override + public void onFailure(JobDetails jobDetails) { + latch.countDown(); + } + @Override public void onExecution(JobDetails jobDetails) { latch.countDown(); diff --git a/jobs/kogito-addons-springboot-embedded-jobs/pom.xml b/jobs/kogito-addons-springboot-embedded-jobs/pom.xml index 672395b8a..66665e4f5 100644 --- a/jobs/kogito-addons-springboot-embedded-jobs/pom.xml +++ b/jobs/kogito-addons-springboot-embedded-jobs/pom.xml @@ -91,6 +91,10 @@ <artifactId>rest-assured</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + </dependency> </dependencies> <profiles> diff --git a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java index 8bdcaecda..ffc22a1c5 100644 --- a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java +++ b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java @@ -20,14 +20,13 @@ package org.kie.kogito.app.jobs.springboot; import java.util.Collections; import java.util.List; -import java.util.concurrent.Callable; import org.kie.kogito.app.jobs.api.JobExecutor; import org.kie.kogito.app.jobs.api.JobScheduler; import org.kie.kogito.app.jobs.api.JobSchedulerBuilder; import org.kie.kogito.app.jobs.api.JobSchedulerListener; import org.kie.kogito.app.jobs.api.JobSynchronization; -import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor; +import org.kie.kogito.app.jobs.integrations.ErrorHandlingJobTimeoutInterceptor; import org.kie.kogito.app.jobs.integrations.ProcessInstanceJobDescriptionJobInstanceEventAdapter; import org.kie.kogito.app.jobs.integrations.ProcessJobDescriptionJobInstanceEventAdapter; import org.kie.kogito.app.jobs.integrations.UserTaskInstanceJobDescriptionJobInstanceEventAdapter; @@ -35,6 +34,7 @@ import org.kie.kogito.app.jobs.spi.JobContextFactory; import org.kie.kogito.app.jobs.spi.JobStore; import org.kie.kogito.app.jobs.springboot.resource.RestApiConstants; import org.kie.kogito.event.EventPublisher; +import org.kie.kogito.handler.ExceptionHandler; import org.kie.kogito.jobs.JobDescription; import org.kie.kogito.jobs.JobsService; import org.springframework.beans.factory.annotation.Autowired; @@ -44,7 +44,6 @@ import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; -import org.springframework.transaction.support.TransactionTemplate; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -86,32 +85,13 @@ public class SpringbootJobsService implements JobsService { protected String serviceURL; @Autowired - private PlatformTransactionManager transactionManager; + protected PlatformTransactionManager transactionManager; + + @Autowired(required = false) + protected List<ExceptionHandler> exceptionHandlers; @PostConstruct public void init() { - JobTimeoutInterceptor txInterceptor = new JobTimeoutInterceptor() { - - @Override - public Callable<Void> chainIntercept(Callable<Void> callable) { - TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager); - return new Callable<Void>() { - - @Override - public Void call() throws Exception { - return transactionTemplate.execute(status -> { - try { - return callable.call(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - - }; - } - }; - this.jobScheduler = JobSchedulerBuilder.newJobSchedulerBuilder() .withEventPublishers(ofNullable(eventPublisher).toArray(EventPublisher[]::new)) .withJobSchedulerListeners(ofNullable(jobSchedulerListeners).stream().toArray(JobSchedulerListener[]::new)) @@ -126,7 +106,9 @@ public class SpringbootJobsService implements JobsService { .withRetryInterval(retryMillis) .withMaxNumberOfRetries(maxNumberOfRetries) .withRefreshJobsInterval(maxRefreshJobsIntervalWindow * 60 * 1000L) - .withTimeoutInterceptor(txInterceptor) + .withTimeoutInterceptor( + new TransactionJobTimeoutInterceptor(transactionManager), + new ErrorHandlingJobTimeoutInterceptor(ofNullable(exceptionHandlers).stream().toList())) .withNumberOfWorkerThreads(numberOfWorkerThreads) .withJobSynchronization(new JobSynchronization() { diff --git a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/TransactionJobTimeoutInterceptor.java b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/TransactionJobTimeoutInterceptor.java new file mode 100644 index 000000000..58d4b562d --- /dev/null +++ b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/TransactionJobTimeoutInterceptor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.app.jobs.springboot; + +import java.util.concurrent.Callable; + +import org.kie.kogito.app.jobs.api.JobTimeoutExecution; +import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.support.TransactionTemplate; + +public class TransactionJobTimeoutInterceptor implements JobTimeoutInterceptor { + + private TransactionTemplate transactionTemplate; + + public TransactionJobTimeoutInterceptor(PlatformTransactionManager transactionManager) { + this.transactionTemplate = new TransactionTemplate(transactionManager); + } + + @Override + public Callable<JobTimeoutExecution> chainIntercept(Callable<JobTimeoutExecution> callable) { + return new Callable<JobTimeoutExecution>() { + + @Override + public JobTimeoutExecution call() throws Exception { + return transactionTemplate.execute(status -> { + try { + return callable.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + }; + } + +} diff --git a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/SpringbootJPAJobStoreTest.java b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/SpringbootJPAJobStoreTest.java index 82f5d9533..6ad4e709b 100644 --- a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/SpringbootJPAJobStoreTest.java +++ b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/SpringbootJPAJobStoreTest.java @@ -23,6 +23,8 @@ import java.time.Instant; import java.time.ZoneId; import java.util.concurrent.TimeUnit; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.kie.kogito.jobs.ExactExpirationTime; import org.kie.kogito.jobs.JobsService; @@ -41,6 +43,18 @@ public class SpringbootJPAJobStoreTest { @Autowired TestJobSchedulerListener listener; + @Autowired + TestJobExecutor testJobExecutor; + + @Autowired + TestExceptionHandler exceptionHandler; + + @BeforeEach + public void init() { + testJobExecutor.reset(); + exceptionHandler.reset(); + } + @Test public void testBasicPersistence() throws Exception { @@ -55,4 +69,16 @@ public class SpringbootJPAJobStoreTest { } + @Test + public void testBasicError() throws Exception { + testJobExecutor.setNumberOfFailures(4); + ProcessInstanceJobDescription jobDescription = new ProcessInstanceJobDescription("1", "-1", + ExactExpirationTime.of(Instant.now().plus(Duration.ofSeconds(2)).atZone(ZoneId.of("UTC"))), 5, + "processInstanceId", null, "processId", null, "nodeInstanceId"); + + listener.setCount(4); + jobsService.scheduleJob(jobDescription); + + Awaitility.await().atMost(Duration.ofSeconds(5L)).untilAsserted(() -> assertThat(exceptionHandler.isError()).isTrue()); + } } diff --git a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestExceptionHandler.java similarity index 58% copy from jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java copy to jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestExceptionHandler.java index 79046f2f3..92e15bfb3 100644 --- a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java +++ b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestExceptionHandler.java @@ -18,29 +18,33 @@ */ package org.kie.kogito.app.jobs.springboot; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.kie.kogito.app.jobs.api.JobSchedulerListener; -import org.kie.kogito.jobs.service.model.JobDetails; +import org.kie.kogito.handler.ExceptionHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; @Component -public class TestJobSchedulerListener implements JobSchedulerListener { +@Scope(scopeName = ConfigurableBeanFactory.SCOPE_SINGLETON) +public class TestExceptionHandler implements ExceptionHandler { + + private static final Logger LOG = LoggerFactory.getLogger(TestExceptionHandler.class); - private CountDownLatch latch; + private boolean error; - void setCount(Integer count) { - latch = new CountDownLatch(count); + @Override + public void handle(Exception th) { + LOG.info("error", th); + error = true; } - public boolean await(long timeout, TimeUnit unit) throws Exception { - return latch.await(timeout, unit); + public void reset() { + error = false; } - @Override - public void onExecution(JobDetails jobDetails) { - latch.countDown(); + public boolean isError() { + return error; } } diff --git a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobExecutor.java similarity index 51% copy from jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java copy to jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobExecutor.java index 79046f2f3..52ee0627b 100644 --- a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java +++ b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobExecutor.java @@ -18,29 +18,41 @@ */ package org.kie.kogito.app.jobs.springboot; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.kie.kogito.app.jobs.api.JobSchedulerListener; +import org.kie.kogito.app.jobs.api.JobExecutor; import org.kie.kogito.jobs.service.model.JobDetails; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; @Component -public class TestJobSchedulerListener implements JobSchedulerListener { +@Scope(scopeName = ConfigurableBeanFactory.SCOPE_SINGLETON) +public class TestJobExecutor implements JobExecutor { + + private Logger LOG = LoggerFactory.getLogger(TestJobExecutor.class); + private int numberOfFailures; - private CountDownLatch latch; + @Override + public boolean accept(JobDetails jobDescription) { + return true; + } - void setCount(Integer count) { - latch = new CountDownLatch(count); + @Override + public void execute(JobDetails jobDescription) { + LOG.info("executing {}", jobDescription); + if (numberOfFailures > 0) { + --numberOfFailures; + throw new RuntimeException(); + } } - public boolean await(long timeout, TimeUnit unit) throws Exception { - return latch.await(timeout, unit); + public void setNumberOfFailures(int numberOfFailures) { + this.numberOfFailures = numberOfFailures; } - @Override - public void onExecution(JobDetails jobDetails) { - latch.countDown(); + public void reset() { + numberOfFailures = 0; } } diff --git a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java index 79046f2f3..56da55a26 100644 --- a/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java +++ b/jobs/kogito-addons-springboot-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/springboot/TestJobSchedulerListener.java @@ -38,6 +38,11 @@ public class TestJobSchedulerListener implements JobSchedulerListener { return latch.await(timeout, unit); } + @Override + public void onFailure(JobDetails jobDetails) { + latch.countDown(); + } + @Override public void onExecution(JobDetails jobDetails) { latch.countDown(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
