wmedvede commented on code in PR #2214: URL: https://github.com/apache/incubator-kie-kogito-apps/pull/2214#discussion_r2094160143
########## jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/AbstractTimerJobScheduler.java: ########## @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.jobs.service.scheduler; + +import java.time.Duration; +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import org.kie.kogito.jobs.service.exception.InvalidScheduleTimeException; +import org.kie.kogito.jobs.service.exception.JobServiceException; +import org.kie.kogito.jobs.service.model.JobDetails; +import org.kie.kogito.jobs.service.model.JobExecutionResponse; +import org.kie.kogito.jobs.service.model.JobStatus; +import org.kie.kogito.jobs.service.model.ManageableJobHandle; +import org.kie.kogito.jobs.service.repository.JobRepository; +import org.kie.kogito.jobs.service.stream.JobEventPublisher; +import org.kie.kogito.jobs.service.utils.DateUtil; +import org.kie.kogito.timer.Trigger; +import org.kie.kogito.timer.impl.PointInTimeTrigger; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.kie.kogito.jobs.service.utils.ModelUtil.jobWithStatus; +import static org.kie.kogito.jobs.service.utils.ModelUtil.jobWithStatusAndHandle; + +/** + * Base reactive Job Scheduler that performs the fundamental operations and let to the concrete classes to + * implement the scheduling actions. + */ +public abstract class AbstractTimerJobScheduler implements JobScheduler<JobDetails> { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTimerJobScheduler.class); + + long backoffRetryMillis; + + long maxIntervalLimitToRetryMillis; + + /** + * Flag to allow and force a job with expirationTime in the past to be executed immediately. If false an + * exception will be thrown. + */ + boolean forceExecuteExpiredJobs; + + /** + * Flag to allow that jobs that might have overdue during an eventual service shutdown should be fired at the + * next service start. + */ + boolean forceExecuteExpiredJobsOnServiceStart; + + /** + * The current chunk size in minutes the scheduler handles, it is used to keep a limit number of jobs scheduled + * in the in-memory scheduler. 
+ */ + long schedulerChunkInMinutes; + + private JobRepository jobRepository; + + private final Map<String, SchedulerControlRecord> schedulerControl; + + protected static class SchedulerControlRecord { + private final String jobId; + private final long handleId; + private final ZonedDateTime scheduledTime; + + public SchedulerControlRecord(String jobId, long handleId, ZonedDateTime scheduledTime) { + this.jobId = jobId; + this.handleId = handleId; + this.scheduledTime = scheduledTime; + } + + public String getJobId() { + return jobId; + } + + public long getHandleId() { + return handleId; + } + + public ZonedDateTime getScheduledTime() { + return scheduledTime; + } + } + + protected AbstractTimerJobScheduler() { + this(null, 0, 0, 0, true, true); + } + + abstract protected Optional<JobEventPublisher> getJobEventPublisher(); + + protected AbstractTimerJobScheduler(JobRepository jobRepository, + long backoffRetryMillis, + long maxIntervalLimitToRetryMillis, + long schedulerChunkInMinutes, + boolean forceExecuteExpiredJobs, + boolean forceExecuteExpiredJobsOnServiceStart) { + this.jobRepository = jobRepository; + this.backoffRetryMillis = backoffRetryMillis; + this.maxIntervalLimitToRetryMillis = maxIntervalLimitToRetryMillis; + this.schedulerControl = new ConcurrentHashMap<>(); + this.schedulerChunkInMinutes = schedulerChunkInMinutes; + this.forceExecuteExpiredJobs = forceExecuteExpiredJobs; + this.forceExecuteExpiredJobsOnServiceStart = forceExecuteExpiredJobsOnServiceStart; + } + + /** + * Executed from the API to reflect client invocations. 
+ */ + @Override + public JobDetails schedule(JobDetails job) { + LOGGER.debug("Scheduling job: {}", job); + if (jobRepository.exists(job.getId())) { + LOGGER.trace("Job already exists {}", job); + jobRepository.delete(cancel(handleExistingJob(job))); + } + if (isOnCurrentSchedulerChunk(job)) { + LOGGER.trace("Job {} will be scheduled right away", job); + job = doJobScheduling(job); + } else { + LOGGER.trace("Job will not be scheduled {} but will be saved", job); + JobDetails savedJob = jobRepository.save(jobWithStatus(job, JobStatus.SCHEDULED)); + getJobEventPublisher().ifPresent(p -> p.publishJobStatusChange(savedJob)); + } + return job; + } + + /** + * Internal use, executed by the periodic loader only. Jobs processed by this method belongs to the current chunk. + */ + @Override + public JobDetails internalSchedule(JobDetails job, boolean onServiceStart) { + LOGGER.debug("Internal Scheduling, onServiceStart: {}, job: {}", onServiceStart, job); + if (jobRepository.exists(job.getId())) { + return handleInternalSchedule(job, onServiceStart); + } else { + return handleInternalScheduleDeletedJob(job); + } + } + + @Override + public JobDetails reschedule(String jobId, Trigger trigger) { + JobDetails currentJobDetails = jobRepository.get(jobId); + if (currentJobDetails == null) { + return null; + } + LOGGER.trace("about to reschedule {} with new trigger {}", currentJobDetails, trigger); + JobDetails rescheduleJobDetails = JobDetails.builder().id(jobId).trigger(trigger).build(); + JobDetails mergedJobDetails = jobRepository.merge(jobId, rescheduleJobDetails); + if (mergedJobDetails == null) { + return null; + } + + LOGGER.trace("about to reschedule the current merge {}", currentJobDetails); + getJobEventPublisher().ifPresent(p -> p.publishJobStatusChange(jobWithStatus(currentJobDetails, JobStatus.CANCELED))); + this.doCancel(mergedJobDetails); + + if (this.isOnCurrentSchedulerChunk(mergedJobDetails)) { + return schedule(mergedJobDetails); + } else { + JobDetails 
newJobDetails = this.jobRepository.save(jobWithStatus(mergedJobDetails, JobStatus.SCHEDULED)); + getJobEventPublisher().ifPresent(p -> p.publishJobStatusChange(newJobDetails)); + return newJobDetails; + } + + } + + /** + * Performs the given job scheduling process on the scheduler, after all the validations already made. + */ + private JobDetails doJobScheduling(JobDetails job) { + Date date = job.getTrigger().hasNextFireTime(); + ZonedDateTime dateTime = DateUtil.fromDate(new Date(date.getTime())); + Duration delay = this.calculateDelay(dateTime); + + if (!forceExecuteExpiredJobs && delay.isNegative()) { + throw new InvalidScheduleTimeException( + String.format("The expirationTime: %s, for job: %s should be greater than current time: %s.", + job.getTrigger().hasNextFireTime(), job.getId(), ZonedDateTime.now())); + } + LOGGER.trace("Job details before scheduling {}", job); + JobDetails savedJobDetails = jobRepository.save(jobWithStatus(job, JobStatus.SCHEDULED)); + LOGGER.trace("Saved job details before scheduling {} in {}", savedJobDetails, jobRepository.getClass().getName()); + ManageableJobHandle manageableJobHandle = scheduleRegistering(savedJobDetails, job.getTrigger()); + JobDetails scheduledJob = jobWithStatusAndHandle(savedJobDetails, JobStatus.SCHEDULED, manageableJobHandle); + getJobEventPublisher().ifPresent(p -> p.publishJobStatusChange(scheduledJob)); + return jobRepository.save(scheduledJob); + } + + /** + * Check if the job should be scheduled on the current chunk or saved to be scheduled later. 
+ */ + private boolean isOnCurrentSchedulerChunk(JobDetails job) { + ZonedDateTime jobDateTime = DateUtil.fromDate(job.getTrigger().hasNextFireTime()); + ZonedDateTime maxSchedulerChunk = DateUtil.now().plusMinutes(schedulerChunkInMinutes); + boolean isOnCurrentSchedulerChunk = jobDateTime.isBefore(maxSchedulerChunk); + LOGGER.debug("isOnCurrentSchedulerChunk job time {} < window time {} is {} for job {}", jobDateTime, maxSchedulerChunk, isOnCurrentSchedulerChunk, job); + return isOnCurrentSchedulerChunk; + } + + private JobDetails handleExistingJob(JobDetails job) { + JobDetails savedJobDetails = jobRepository.get(job.getId()); + switch (savedJobDetails.getStatus()) { + case SCHEDULED: + case RETRY: + // cancel the job. + return jobWithStatus(savedJobDetails, JobStatus.CANCELED); + default: + // uncommon, break the stream processing + return null; + } + } + + private JobDetails handleInternalSchedule(JobDetails job, boolean onStart) { + unregisterScheduledJob(job); + switch (job.getStatus()) { + case SCHEDULED: + Duration delay = calculateRawDelay(DateUtil.fromDate(job.getTrigger().hasNextFireTime())); + if (delay.isNegative() && onStart && !forceExecuteExpiredJobsOnServiceStart) { + return handleExpiredJob(job); + } else { + // other cases of potential overdue are because of slow processing of the jobs service, or the user + // configured to fire overdue triggers at service startup. Always schedule. + if (job.getScheduledId() != null) { + // cancel the existing timer if any. + doCancel(job); + } + + ManageableJobHandle handle = scheduleRegistering(job, job.getTrigger()); + JobDetails scheduledJob = jobWithStatusAndHandle(job, JobStatus.SCHEDULED, handle); + jobRepository.save(scheduledJob); + + } + case RETRY: + return handleRetry(job); + default: + // by definition there are no more cases, only SCHEDULED and RETRY cases are picked by the loader. 
+ return job; + } + } + + private JobDetails handleInternalScheduleDeletedJob(JobDetails job) { + LOGGER.warn("Job was removed from database: {}.", job); + return job; + } + + private Duration calculateDelay(ZonedDateTime expirationTime) { + Duration delay = Duration.between(DateUtil.now(), expirationTime); + if (!delay.isNegative()) { + return delay; + } + //in case forceExecuteExpiredJobs is true, execute the job immediately. + return forceExecuteExpiredJobs ? Duration.ofSeconds(1) : Duration.ofSeconds(-1); + } + + private Duration calculateRawDelay(ZonedDateTime expirationTime) { + return Duration.between(DateUtil.now(), expirationTime); + } + + public JobDetails handleJobExecutionSuccess(JobDetails futureJob) { + futureJob.getTrigger().nextFireTime(); + if (Objects.nonNull(futureJob.getTrigger().hasNextFireTime())) { + JobDetails nextJobDetails = JobDetails.builder().of(futureJob).incrementExecutionCounter().status(JobStatus.SCHEDULED).build(); + JobDetails newScheduledJobDetails = doJobScheduling(nextJobDetails); + jobRepository.save(newScheduledJobDetails); Review Comment: The job was already prepared and saved as part of doJobScheduling(nextJobDetails). I don't think these lines belong here; please take a look: jobRepository.save(newScheduledJobDetails); JobDetails excecutedJobDetails = jobWithStatus(futureJob, JobStatus.EXECUTED); getJobEventPublisher().ifPresent(p -> p.publishJobStatusChange(excecutedJobDetails)); -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
