This is an automated email from the ASF dual-hosted git repository.

porcelli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d7ed071f [incubator-kie-issues#1578] Enhance JobSchedulerManager to 
handle overdue jobs during the period job loading (#2131)
7d7ed071f is described below

commit 7d7ed071ff92bdd9dda280699566d617a0ca3b1a
Author: Martin Weiler <[email protected]>
AuthorDate: Fri Nov 1 09:39:47 2024 -0600

    [incubator-kie-issues#1578] Enhance JobSchedulerManager to handle overdue 
jobs during the period job loading (#2131)
---
 .../jobs/service/scheduler/BaseTimerJobScheduler.java |  3 ++-
 .../jobs/service/scheduler/JobSchedulerManager.java   | 19 +++++++++++++++++++
 .../scheduler/impl/TimerDelegateJobScheduler.java     |  3 +++
 .../kie/kogito/jobs/embedded/EmbeddedJobExecutor.java | 10 +++++++++-
 4 files changed, 33 insertions(+), 2 deletions(-)

diff --git 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java
index ac080cdb8..49e777ba8 100644
--- 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java
@@ -343,7 +343,8 @@ public abstract class BaseTimerJobScheduler implements 
ReactiveJobScheduler {
                                 .build())
                         .map(jobRepository::save)
                         .flatMapCompletionStage(p -> p))
-                .peek(job -> LOGGER.debug("Retry executed {}", job));
+                .peek(job -> LOGGER.debug("Retry executed {}", job))
+                .onError(errorHandler -> LOGGER.error("Failed to retrieve job 
due to {}", errorHandler.getMessage()));
     }
 
     private PointInTimeTrigger getRetryTrigger() {
diff --git 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java
index b0bd05fea..2211c3aee 100644
--- 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java
@@ -118,6 +118,9 @@ public class JobSchedulerManager {
     }
 
     private void startJobsLoadingFromRepositoryTask() {
+        LOGGER.info(
+                "Starting with configuration: schedulerChunkInMinutes={}, 
loadJobIntervalInMinutes={}, loadJobFromCurrentTimeIntervalInMinutes={}, 
loadJobRetries={}, loadJobErrorStrategy={}",
+                schedulerChunkInMinutes, loadJobIntervalInMinutes, 
loadJobFromCurrentTimeIntervalInMinutes, loadJobRetries, loadJobErrorStrategy);
         //guarantee it starts the task just in case it is not already active
         initialLoading.set(true);
         if (periodicTimerIdForLoadJobs.get() < 0) {
@@ -129,6 +132,13 @@ public class JobSchedulerManager {
                         schedulerChunkInMinutes);
                 loadJobIntervalInMinutes = schedulerChunkInMinutes;
             }
+            if (loadJobFromCurrentTimeIntervalInMinutes < 
loadJobIntervalInMinutes) {
+                LOGGER.warn("The loadJobFromCurrentTimeIntervalInMinutes value 
({}) is smaller than loadJobIntervalInMinutes ({}). " +
+                        "This can potentially lead to overdue timers not 
getting rescheduled during the periodic job loading.",
+                        loadJobFromCurrentTimeIntervalInMinutes,
+                        loadJobIntervalInMinutes);
+
+            }
             //first execution
             vertx.runOnContext(this::loadJobDetails);
             //next executions to run periodically
@@ -198,6 +208,15 @@ public class JobSchedulerManager {
         Date triggerFireTime = jobDetails.getTrigger().hasNextFireTime();
         ZonedDateTime nextFireTime = triggerFireTime != null ? 
DateUtil.instantToZonedDateTime(triggerFireTime.toInstant()) : null;
         boolean scheduled = 
scheduler.scheduled(jobDetails.getId()).isPresent();
+        // cancel an overdue timer to have it rescheduled
+        if (!initialLoading.get() && nextFireTime != null && 
nextFireTime.isBefore(DateUtil.now())) {
+            LOGGER.debug("Job found, id: {}, nextFireTime: {}, created: {}, 
status: {} is overdue and will be rescheduled", jobDetails.getId(),
+                    nextFireTime,
+                    jobDetails.getCreated(),
+                    jobDetails.getStatus());
+            scheduler.cancel(jobDetails.getId());
+            return true;
+        }
         LOGGER.debug("Job found, id: {}, nextFireTime: {}, created: {}, 
status: {}, already scheduled: {}", jobDetails.getId(),
                 nextFireTime,
                 jobDetails.getCreated(),
diff --git 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java
index 6b30c30b0..b64bd87b9 100644
--- 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java
@@ -62,6 +62,9 @@ public class TimerDelegateJobScheduler extends 
BaseTimerJobScheduler {
             @ConfigProperty(name = 
"kogito.jobs-service.forceExecuteExpiredJobsOnServiceStart", defaultValue = 
"true") boolean forceExecuteExpiredJobsOnServiceStart,
             JobExecutorResolver jobExecutorResolver, 
VertxTimerServiceScheduler delegate) {
         super(jobRepository, backoffRetryMillis, 
maxIntervalLimitToRetryMillis, schedulerChunkInMinutes, 
forceExecuteExpiredJobs, forceExecuteExpiredJobsOnServiceStart);
+        LOGGER.info(
+                "Creating JobScheduler with backoffRetryMillis={}, 
maxIntervalLimitToRetryMillis={}, schedulerChunkInMinutes={}, 
forceExecuteExpiredJobs={}, forceExecuteExpiredJobsOnServiceStart={}",
+                backoffRetryMillis, maxIntervalLimitToRetryMillis, 
schedulerChunkInMinutes, forceExecuteExpiredJobs, 
forceExecuteExpiredJobsOnServiceStart);
         this.jobExecutorResolver = jobExecutorResolver;
         this.delegate = delegate;
     }
diff --git 
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
 
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
index 347eedcb6..ead3bbf2a 100644
--- 
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
+++ 
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
@@ -56,7 +56,15 @@ public class EmbeddedJobExecutor implements JobExecutor {
         InVMRecipient recipient = (InVMRecipient) 
recipientModel.getRecipient();
         String timerId = recipient.getPayload().getData().timerId();
         String processInstanceId = 
recipient.getPayload().getData().processInstanceId();
-        Optional<Process<? extends Model>> process = 
processes.processByProcessInstanceId(processInstanceId);
+        Optional<Process<? extends Model>> process;
+        try {
+            process = processes.processByProcessInstanceId(processInstanceId);
+        } catch (Exception ex) {
+            return Uni.createFrom().failure(
+                    new JobExecutionException(jobDetails.getId(),
+                            "Unexpected error when executing Embedded request 
for job: " + jobDetails.getId() + ". " + ex.getMessage(),
+                            ex));
+        }
         if (process.isEmpty()) {
             return Uni.createFrom().item(
                     JobExecutionResponse.builder()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to