This is an automated email from the ASF dual-hosted git repository.
porcelli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git
The following commit(s) were added to refs/heads/main by this push:
new 7d7ed071f [incubator-kie-issues#1578] Enhance JobSchedulerManager to
handle overdue jobs during the period job loading (#2131)
7d7ed071f is described below
commit 7d7ed071ff92bdd9dda280699566d617a0ca3b1a
Author: Martin Weiler <[email protected]>
AuthorDate: Fri Nov 1 09:39:47 2024 -0600
[incubator-kie-issues#1578] Enhance JobSchedulerManager to handle overdue
jobs during the period job loading (#2131)
---
.../jobs/service/scheduler/BaseTimerJobScheduler.java | 3 ++-
.../jobs/service/scheduler/JobSchedulerManager.java | 19 +++++++++++++++++++
.../scheduler/impl/TimerDelegateJobScheduler.java | 3 +++
.../kie/kogito/jobs/embedded/EmbeddedJobExecutor.java | 10 +++++++++-
4 files changed, 33 insertions(+), 2 deletions(-)
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java
index ac080cdb8..49e777ba8 100644
---
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java
@@ -343,7 +343,8 @@ public abstract class BaseTimerJobScheduler implements
ReactiveJobScheduler {
.build())
.map(jobRepository::save)
.flatMapCompletionStage(p -> p))
- .peek(job -> LOGGER.debug("Retry executed {}", job));
+ .peek(job -> LOGGER.debug("Retry executed {}", job))
+ .onError(errorHandler -> LOGGER.error("Failed to retrieve job
due to {}", errorHandler.getMessage()));
}
private PointInTimeTrigger getRetryTrigger() {
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java
index b0bd05fea..2211c3aee 100644
---
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java
@@ -118,6 +118,9 @@ public class JobSchedulerManager {
}
private void startJobsLoadingFromRepositoryTask() {
+ LOGGER.info(
+ "Starting with configuration: schedulerChunkInMinutes={},
loadJobIntervalInMinutes={}, loadJobFromCurrentTimeIntervalInMinutes={},
loadJobRetries={}, loadJobErrorStrategy={}",
+ schedulerChunkInMinutes, loadJobIntervalInMinutes,
loadJobFromCurrentTimeIntervalInMinutes, loadJobRetries, loadJobErrorStrategy);
//guarantee it starts the task just in case it is not already active
initialLoading.set(true);
if (periodicTimerIdForLoadJobs.get() < 0) {
@@ -129,6 +132,13 @@ public class JobSchedulerManager {
schedulerChunkInMinutes);
loadJobIntervalInMinutes = schedulerChunkInMinutes;
}
+ if (loadJobFromCurrentTimeIntervalInMinutes <
loadJobIntervalInMinutes) {
+ LOGGER.warn("The loadJobFromCurrentTimeIntervalInMinutes value
({}) is smaller than loadJobIntervalInMinutes ({}). " +
+ "This can potentially lead to overdue timers not
getting rescheduled during the periodic job loading.",
+ loadJobFromCurrentTimeIntervalInMinutes,
+ loadJobIntervalInMinutes);
+
+ }
//first execution
vertx.runOnContext(this::loadJobDetails);
//next executions to run periodically
@@ -198,6 +208,15 @@ public class JobSchedulerManager {
Date triggerFireTime = jobDetails.getTrigger().hasNextFireTime();
ZonedDateTime nextFireTime = triggerFireTime != null ?
DateUtil.instantToZonedDateTime(triggerFireTime.toInstant()) : null;
boolean scheduled =
scheduler.scheduled(jobDetails.getId()).isPresent();
+ // cancel an overdue timer to have it rescheduled
+ if (!initialLoading.get() && nextFireTime != null &&
nextFireTime.isBefore(DateUtil.now())) {
+ LOGGER.debug("Job found, id: {}, nextFireTime: {}, created: {},
status: {} is overdue and will be rescheduled", jobDetails.getId(),
+ nextFireTime,
+ jobDetails.getCreated(),
+ jobDetails.getStatus());
+ scheduler.cancel(jobDetails.getId());
+ return true;
+ }
LOGGER.debug("Job found, id: {}, nextFireTime: {}, created: {},
status: {}, already scheduled: {}", jobDetails.getId(),
nextFireTime,
jobDetails.getCreated(),
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java
index 6b30c30b0..b64bd87b9 100644
---
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java
@@ -62,6 +62,9 @@ public class TimerDelegateJobScheduler extends
BaseTimerJobScheduler {
@ConfigProperty(name =
"kogito.jobs-service.forceExecuteExpiredJobsOnServiceStart", defaultValue =
"true") boolean forceExecuteExpiredJobsOnServiceStart,
JobExecutorResolver jobExecutorResolver,
VertxTimerServiceScheduler delegate) {
super(jobRepository, backoffRetryMillis,
maxIntervalLimitToRetryMillis, schedulerChunkInMinutes,
forceExecuteExpiredJobs, forceExecuteExpiredJobsOnServiceStart);
+ LOGGER.info(
+ "Creating JobScheduler with backoffRetryMillis={},
maxIntervalLimitToRetryMillis={}, schedulerChunkInMinutes={},
forceExecuteExpiredJobs={}, forceExecuteExpiredJobsOnServiceStart={}",
+ backoffRetryMillis, maxIntervalLimitToRetryMillis,
schedulerChunkInMinutes, forceExecuteExpiredJobs,
forceExecuteExpiredJobsOnServiceStart);
this.jobExecutorResolver = jobExecutorResolver;
this.delegate = delegate;
}
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
index 347eedcb6..ead3bbf2a 100644
---
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
@@ -56,7 +56,15 @@ public class EmbeddedJobExecutor implements JobExecutor {
InVMRecipient recipient = (InVMRecipient)
recipientModel.getRecipient();
String timerId = recipient.getPayload().getData().timerId();
String processInstanceId =
recipient.getPayload().getData().processInstanceId();
- Optional<Process<? extends Model>> process =
processes.processByProcessInstanceId(processInstanceId);
+ Optional<Process<? extends Model>> process;
+ try {
+ process = processes.processByProcessInstanceId(processInstanceId);
+ } catch (Exception ex) {
+ return Uni.createFrom().failure(
+ new JobExecutionException(jobDetails.getId(),
+ "Unexpected error when executing Embedded request
for job: " + jobDetails.getId() + ". " + ex.getMessage(),
+ ex));
+ }
if (process.isEmpty()) {
return Uni.createFrom().item(
JobExecutionResponse.builder()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]