This is an automated email from the ASF dual-hosted git repository. jstastnycz pushed a commit to branch sync-20250907 in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git
commit 2049ad3bc9f8afb03b4665180575682fec422e72 Author: Enrique <[email protected]> AuthorDate: Thu Aug 28 17:04:55 2025 +0200 [NO_ISSUE] fix race condition when reschedule (#2259) * [NO_ISSUE] fix race condition when reschedule * race condition revisited * fix number of retries * fix process merger * process job serialization * end time fix * fix compilation problem * fix type * fix formatting * fix marshaller * fix phatom scheduler in error state * add test exact time * fix inmediate schedule * fix retry millis * fix deserializer * fix formatting --- .../service/json/JobDescriptionDeserializer.java | 9 ++ .../service/json/JobDescriptionSerializer.java | 5 +- .../kie/kogito/jobs/service/model/JobStatus.java | 22 +-- jobs/README.md | 2 +- .../kogito/app/jobs/api/JobDescriptionMerger.java | 22 +-- .../kogito/app/jobs/api/JobSchedulerBuilder.java | 2 + .../kogito/app/jobs/impl/VertxJobScheduler.java | 162 +++++++++++++++------ ...tractJobDescriptionJobInstanceEventAdapter.java | 2 +- .../JobDescriptionHelper.java | 41 +----- ...tanceJobDescriptionJobInstanceEventAdapter.java | 2 +- .../ProcessInstanceJobDescriptionMerger.java | 51 +++++++ .../ProcessInstanceJobExecutor.java | 2 +- ...ocessJobDescriptionJobInstanceEventAdapter.java | 2 +- .../ProcessJobDescriptionMerger.java} | 26 ++-- .../ProcessJobExecutor.java | 2 +- ...tanceJobDescriptionJobInstanceEventAdapter.java | 2 +- .../UserTaskInstanceJobDescriptorMerger.java | 51 +++++++ .../UserTaskInstanceJobExecutor.java | 2 +- .../impl/LatchFailureJobSchedulerListener.java | 98 +++++++++++++ ...tAdapter.java => TestJobDescriptionMerger.java} | 23 ++- .../app/jobs/impl/TestJobDetailsEventAdapter.java | 2 +- .../app/jobs/impl/VertxJobSchedulerTest.java | 74 +++++++++- .../jobs/quarkus/QuarkusJobServiceProducer.java | 6 +- .../app/jobs/quarkus/QuarkusJobsService.java | 12 +- .../app/jobs/quarkus/resource/JobResourceV1.java | 2 +- .../SpringbootJobServiceConfiguration.java | 6 +- .../app/jobs/springboot/SpringbootJobsService.java | 12 +- .../jobs/springboot/resource/JobResourceV1.java | 2 +- 28 files changed, 479 insertions(+), 165 deletions(-) diff --git a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java index e7becd90e..16a1acb49 100644 --- a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java +++ b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java @@ -25,6 +25,7 @@ import org.kie.kogito.jobs.ExpirationTime; import org.kie.kogito.jobs.JobDescription; import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription; import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescriptionBuilder; +import org.kie.kogito.jobs.descriptors.ProcessJobDescription; import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription; import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescriptionBuilder; @@ -50,6 +51,14 @@ public class JobDescriptionDeserializer extends StdDeserializer<JobDescription> JsonNode node = jp.getCodec().readTree(jp); String jobDescriptionType = node.get("@type").asText(); switch (jobDescriptionType) { + case "ProcessJobDescription": { + String id = ofNullable(node.get("id")).map(JsonNode::textValue).orElse(null); + String processId = ofNullable(node.get("processId")).map(JsonNode::textValue).orElse(null); + Integer priority = ofNullable(node.get("priority")).map(JsonNode::asInt).orElse(0); + String expirationTimeType = node.get("expirationTime").get("@type").asText(); + ExpirationTime expirationTime = (ExpirationTime) ctxt.readTreeAsValue(node.get("expirationTime"), Class.forName(expirationTimeType)); + return ProcessJobDescription.of(expirationTime, priority, processId, id); + } case "ProcessInstanceJobDescription": { ProcessInstanceJobDescriptionBuilder builder = ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder(); ofNullable(node.get("id")).ifPresent(e -> builder.id(e.textValue())); diff --git a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java index 37544d67b..46d9bc987 100644 --- a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java +++ b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.kie.kogito.jobs.JobDescription; import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription; +import org.kie.kogito.jobs.descriptors.ProcessJobDescription; import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription; import com.fasterxml.jackson.core.JsonGenerator; @@ -43,7 +44,9 @@ public class JobDescriptionSerializer extends StdSerializer<JobDescription> { jgen.writeStringField("id", value.id()); jgen.writeNumberField("priority", value.priority()); jgen.writeObjectField("expirationTime", value.expirationTime()); - if (value instanceof ProcessInstanceJobDescription jobDescription) { + if (value instanceof ProcessJobDescription processJobDescription) { + jgen.writeStringField("processId", processJobDescription.processId()); + } else if (value instanceof ProcessInstanceJobDescription jobDescription) { jgen.writeStringField("timerId", jobDescription.timerId()); jgen.writeStringField("processInstanceId", jobDescription.processInstanceId()); jgen.writeStringField("rootProcessInstanceId", jobDescription.rootProcessInstanceId()); diff --git a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java index 02bdddb9c..2b891b494 100644 --- a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java +++ b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java @@ -19,20 +19,10 @@ package org.kie.kogito.jobs.service.model; public enum JobStatus { - ERROR(false), //final - RUNNING(false), - EXECUTED(true), //final - SCHEDULED(false), //active - RETRY(false), //active - CANCELED(true); //final - - private boolean finalStatus; - - JobStatus(boolean finalStatus) { - this.finalStatus = finalStatus; - } - - public boolean isFinalStatus() { - return finalStatus; - } + ERROR, //final + EXECUTED, //final + SCHEDULED, //active + RETRY, //active + CANCELED, //final + RUNNING } diff --git a/jobs/README.md b/jobs/README.md index a650159cf..49c4fe8de 100644 --- a/jobs/README.md +++ b/jobs/README.md @@ -34,7 +34,7 @@ At present Addons jobs supports quarkus and spring boot The properties supported are: * kogito.jobs-service.numberOfWorkerThreads: maximum of number of worker thread to execute timeouts (default is 10) * kogito.jobs-service.maxNumberOfRetries: numbers of retry of a failred job. After this number is reached the job will be set to failure. (default is 3 times) -* kogito.jobs-service.maxIntervalLimitToRetryMillis: interval used to retry the new job (default 60 seconds) +* kogito.jobs-service.retryMillis: interval used to retry the new job (default 60 seconds) * kogito.jobs-service.schedulerChunkInMinutes: max window minutes from actual date to the future to load timers in memory (default is 10 minutes) * kogito.service.url: url service is this collocated service. (default is localhost:8080) diff --git a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobDescriptionMerger.java similarity index 66% copy from jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java copy to jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobDescriptionMerger.java index 02bdddb9c..10bfbf05a 100644 --- a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobDescriptionMerger.java @@ -16,23 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.kie.kogito.jobs.service.model; +package org.kie.kogito.app.jobs.api; -public enum JobStatus { - ERROR(false), //final - RUNNING(false), - EXECUTED(true), //final - SCHEDULED(false), //active - RETRY(false), //active - CANCELED(true); //final +import org.kie.kogito.jobs.JobDescription; +import org.kie.kogito.timer.Trigger; - private boolean finalStatus; +public interface JobDescriptionMerger { - JobStatus(boolean finalStatus) { - this.finalStatus = finalStatus; - } + boolean accept(Object instance); + + JobDescription mergeTrigger(JobDescription jobDescription, Trigger trigger); - public boolean isFinalStatus() { - return finalStatus; - } } diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobSchedulerBuilder.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobSchedulerBuilder.java index 0ee283f46..77439524c 100644 --- a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobSchedulerBuilder.java +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobSchedulerBuilder.java @@ -56,4 +56,6 @@ public interface JobSchedulerBuilder { JobSchedulerBuilder withNumberOfWorkerThreads(Integer numberOfWorkerThreads); JobSchedulerBuilder withJobSynchronization(JobSynchronization jobSynchronization); + + JobSchedulerBuilder withJobDescriptorMergers(JobDescriptionMerger... jobDescriptionMergers); } diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java index cbbc19ae9..d8d15d1c7 100644 --- a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java @@ -31,6 +31,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.kie.kogito.app.jobs.api.JobDescriptionMerger; import org.kie.kogito.app.jobs.api.JobDetailsEventAdapter; import org.kie.kogito.app.jobs.api.JobExecutor; import org.kie.kogito.app.jobs.api.JobScheduler; @@ -38,6 +39,9 @@ import org.kie.kogito.app.jobs.api.JobSchedulerBuilder; import org.kie.kogito.app.jobs.api.JobSchedulerListener; import org.kie.kogito.app.jobs.api.JobSynchronization; import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor; +import org.kie.kogito.app.jobs.integrations.ProcessInstanceJobDescriptionMerger; +import org.kie.kogito.app.jobs.integrations.ProcessJobDescriptionMerger; +import org.kie.kogito.app.jobs.integrations.UserTaskInstanceJobDescriptorMerger; import org.kie.kogito.app.jobs.spi.JobContext; import org.kie.kogito.app.jobs.spi.JobContextFactory; import org.kie.kogito.app.jobs.spi.JobStore; @@ -48,7 +52,9 @@ import org.kie.kogito.event.EventPublisher; import org.kie.kogito.jobs.JobDescription; import org.kie.kogito.jobs.service.model.JobDetails; import org.kie.kogito.jobs.service.model.JobStatus; +import org.kie.kogito.jobs.service.model.RecipientInstance; import org.kie.kogito.jobs.service.utils.DateUtil; +import org.kie.kogito.timer.Trigger; import org.kie.kogito.timer.impl.SimpleTimerTrigger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +93,8 @@ public class VertxJobScheduler implements JobScheduler, Handler<Long> { private List<JobTimeoutInterceptor> interceptors; + private List<JobDescriptionMerger> jobDescriptionMergers; + private ConcurrentMap<String, TimerInfo> jobsScheduled; private Long refreshJobsIntervalTimerId; @@ -185,6 +193,12 @@ public class VertxJobScheduler implements JobScheduler, Handler<Long> { return this; } + @Override + public JobSchedulerBuilder withJobDescriptorMergers(JobDescriptionMerger... jobDescriptionMergers) { + VertxJobScheduler.this.jobDescriptionMergers.addAll(List.of(jobDescriptionMergers)); + return this; + } + } public VertxJobScheduler() { @@ -197,6 +211,10 @@ public class VertxJobScheduler implements JobScheduler, Handler<Long> { this.jobEventAdapters = new ArrayList<>(); this.jobSchedulerListeners = new ArrayList<>(); this.interceptors = new ArrayList<>(); + this.jobDescriptionMergers = new ArrayList<>(); + this.jobDescriptionMergers.add(new UserTaskInstanceJobDescriptorMerger()); + this.jobDescriptionMergers.add(new ProcessInstanceJobDescriptionMerger()); + this.jobDescriptionMergers.add(new ProcessJobDescriptionMerger()); this.jobSynchronization = new JobSynchronization() { @Override @@ -338,17 +356,7 @@ public class VertxJobScheduler implements JobScheduler, Handler<Long> { jobSchedulerListeners.forEach(l -> l.onReschedule(rescheduledJobDetails)); jobStore.update(jobContextFactory.newContext(), rescheduledJobDetails); - this.jobSynchronization.synchronize(new Runnable() { - - @Override - public void run() { - jobsScheduled.compute(jobDetails.getId(), (jobId, timerInfo) -> { - removeTimerInfo(timerInfo); - return addTimerInfo(rescheduledJobDetails); - }); - } - - }); + updateTxTimer(rescheduledJobDetails); return rescheduledJobDetails.getId(); } @@ -412,34 +420,68 @@ public class VertxJobScheduler implements JobScheduler, Handler<Long> { private void removeIfFinal(Long timerId, JobContext jobContext, JobDetails nextJobDetails) { String jobId = nextJobDetails.getId(); - if (!nextJobDetails.getStatus().isFinalStatus()) { - LOG.trace("Timeout {} with jobId {} will be updated", timerId, jobId); - jobStore.update(jobContext, nextJobDetails); - doSchedule(nextJobDetails); - } else { - LOG.trace("Timeout {} with jobId {} will be removed", timerId, jobId); - jobStore.remove(jobContext, jobId); + switch (nextJobDetails.getStatus()) { + case EXECUTED: + case CANCELED: + LOG.trace("Timeout {} with jobId {} will be removed", timerId, jobId); + removeTxTimer(nextJobDetails); + jobStore.remove(jobContext, jobId); + break; + case SCHEDULED: + case RETRY: + LOG.trace("Timeout {} with jobId {} will be updated and scheduled", timerId, jobId); + jobStore.update(jobContext, nextJobDetails); + doNextSchedule(nextJobDetails); + break; + case ERROR: + LOG.trace("Timeout {} with jobId {} will be set to error", timerId, jobId); + removeTxTimer(nextJobDetails); + jobStore.update(jobContext, nextJobDetails); + break; + default: + LOG.trace("Timeout {} with jobId {} is RUNNING and should not happen", timerId, jobId); + break; } } - // lifecycle calls - private JobDetails doSchedule(JobDetails jobDetails) { + // add tx timer and remove tx timer + private void updateTxTimer(JobDetails jobDetails) { this.jobSynchronization.synchronize(new Runnable() { - @Override public void run() { - jobsScheduled.compute(jobDetails.getId(), (jobId, timerInfo) -> { + // if the timer info does not exist we should not reschedule as it was executed or cancelled by + jobsScheduled.computeIfPresent(jobDetails.getId(), (jobId, timerInfo) -> { + removeTimerInfo(timerInfo); return addTimerInfo(jobDetails); }); } + }); + } + private void addTxTimer(JobDetails jobDetails) { + this.jobSynchronization.synchronize(new Runnable() { + @Override + public void run() { + jobsScheduled.computeIfAbsent(jobDetails.getId(), jobId -> { + return addTimerInfo(jobDetails); + }); + } }); + } - LOG.trace("doSchedule {}", jobDetails); - fireEvents(jobDetails); - return jobDetails; + private void removeTxTimer(JobDetails jobDetails) { + this.jobSynchronization.synchronize(new Runnable() { + @Override + public void run() { + jobsScheduled.computeIfPresent(jobDetails.getId(), (jobId, timerInfo) -> { + removeTimerInfo(timerInfo); + return null; + }); + } + }); } + // vertx calls private TimerInfo addTimerInfo(JobDetails jobDetails) { LOG.trace("addTimerInfo {}", jobDetails); // if it is negative means it should be executed right away @@ -462,6 +504,21 @@ public class VertxJobScheduler implements JobScheduler, Handler<Long> { this.vertx.cancelTimer(timerId); } + // lifecycle calls + private JobDetails doNextSchedule(JobDetails jobDetails) { + updateTxTimer(jobDetails); + LOG.trace("doNextSchedule {}", jobDetails); + fireEvents(jobDetails); + return jobDetails; + } + + private JobDetails doSchedule(JobDetails jobDetails) { + addTxTimer(jobDetails); + LOG.trace("doSchedule {}", jobDetails); + fireEvents(jobDetails); + return jobDetails; + } + private JobDetails doRun(JobDetails jobDetails) { JobDetails runJobDetails = JobDetails.builder().of(jobDetails).status(JobStatus.RUNNING).build(); LOG.trace("doRun {}", runJobDetails); @@ -470,18 +527,7 @@ public class VertxJobScheduler implements JobScheduler, Handler<Long> { } private JobDetails doCancel(JobDetails jobDetails) { - this.jobSynchronization.synchronize(new Runnable() { - - @Override - public void run() { - jobsScheduled.compute(jobDetails.getId(), (jobId, timerInfo) -> { - removeTimerInfo(timerInfo); - return null; - }); - } - - }); - + removeTxTimer(jobDetails); JobDetails canceledJobDetails = JobDetails.builder().of(jobDetails).status(JobStatus.CANCELED).build(); LOG.trace("doCancel {}", canceledJobDetails); fireEvents(canceledJobDetails); @@ -493,11 +539,6 @@ public class VertxJobScheduler implements JobScheduler, Handler<Long> { LOG.trace("valid executors are: {}", validExecutors); validExecutors.forEach(executor -> executor.execute(jobDetails)); JobDetails executedJobDetails = JobDetails.builder().of(jobDetails).status(JobStatus.EXECUTED).incrementExecutionCounter().build(); - this.jobSynchronization.synchronize(new Runnable() { - public void run() { - jobsScheduled.remove(jobDetails.getId()); - }; - }); LOG.trace("doExecute {}", executedJobDetails); fireEvents(executedJobDetails); return executedJobDetails; @@ -507,7 +548,13 @@ public class VertxJobScheduler implements JobScheduler, Handler<Long> { LOG.trace("doRetryIfAny {}", jobDetails); Integer retryCounter = jobDetails.getRetries(); if (retryCounter < this.maxNumberOfRetries) { + + Date now = Date.from(DateUtil.now().plus(Duration.ofMillis(retryInterval)).toInstant()); + Trigger newTrigger = setTriggerDate(jobDetails.getTrigger(), now); + JobDescription jobDescriptionMerged = setJobDescription(jobDetails, newTrigger); JobDetails retryJobDetails = JobDetails.builder().of(jobDetails) + .trigger(newTrigger) + .recipient(new RecipientInstance(new InVMRecipient(new InVMPayloadData(jobDescriptionMerged)))) .status(JobStatus.RETRY) .executionTimeout(jobDetails.getExecutionTimeout() + retryInterval) .incrementRetries() @@ -521,15 +568,40 @@ public class VertxJobScheduler implements JobScheduler, Handler<Long> { } } + private Trigger setTriggerDate(Trigger oldTrigger, Date newOriginDate) { + SimpleTimerTrigger oldSimpleTimerTrigger = (SimpleTimerTrigger) oldTrigger; + SimpleTimerTrigger newTrigger = new SimpleTimerTrigger( + newOriginDate, + oldSimpleTimerTrigger.getPeriod(), + oldSimpleTimerTrigger.getPeriodUnit(), + oldSimpleTimerTrigger.getRepeatCount(), + oldSimpleTimerTrigger.getEndTime(), + oldSimpleTimerTrigger.getZoneId()); + return newTrigger; + } + + private JobDescription setJobDescription(JobDetails jobDetails, Trigger newTrigger) { + JobDescription jobDescription = jobDetails.getRecipient().<InVMPayloadData> getRecipient().getPayload().getJobDescription(); + + JobDescription newJobDescription = jobDescriptionMergers.stream() + .filter(merger -> merger.accept(jobDescription)) + .map(merger -> merger.mergeTrigger(jobDescription, newTrigger)) + .findFirst() + .orElseThrow(); + return newJobDescription; + } + private JobDetails computeNextJobDetailsIfAny(JobDetails jobDetails) { // there is a problem here. If we retried the job the origin, the current time is different. // so we set the current time as the time of execution so we do execute things at fixed interval time. ((SimpleTimerTrigger) jobDetails.getTrigger()).setNextFireTime(Date.from(Instant.now())); - jobDetails.getTrigger().nextFireTime(); if (jobDetails.getTrigger().hasNextFireTime() != null) { - - JobDetails nextJobDetails = JobDetails.builder().of(jobDetails) + // we set the date for the trigger so we compute new job description + JobDescription jobDescriptionMerged = setJobDescription(jobDetails, jobDetails.getTrigger()); + JobDetails nextJobDetails = JobDetails.builder() + .of(jobDetails) + .recipient(new RecipientInstance(new InVMRecipient(new InVMPayloadData(jobDescriptionMerged)))) .status(JobStatus.SCHEDULED) .retries(0) .executionTimeout(jobDetails.getTrigger().hasNextFireTime().getTime()) diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/AbstractJobDescriptionJobInstanceEventAdapter.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/AbstractJobDescriptionJobInstanceEventAdapter.java similarity index 99% rename from jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/AbstractJobDescriptionJobInstanceEventAdapter.java rename to jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/AbstractJobDescriptionJobInstanceEventAdapter.java index e1947934d..c242aefc3 100644 --- a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/AbstractJobDescriptionJobInstanceEventAdapter.java +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/AbstractJobDescriptionJobInstanceEventAdapter.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.kie.kogito.app.jobs.integregations; +package org.kie.kogito.app.jobs.integrations; import java.util.Optional; diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/JobDescriptionHelper.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/JobDescriptionHelper.java similarity index 50% rename from jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/JobDescriptionHelper.java rename to jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/JobDescriptionHelper.java index bdfd350bb..66d6d1169 100644 --- a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/JobDescriptionHelper.java +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/JobDescriptionHelper.java @@ -16,52 +16,22 @@ * specific language governing permissions and limitations * under the License. */ -package org.kie.kogito.app.jobs.impl; +package org.kie.kogito.app.jobs.integrations; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; +import java.util.List; +import org.kie.kogito.app.jobs.api.JobDescriptionMerger; import org.kie.kogito.jobs.DurationExpirationTime; import org.kie.kogito.jobs.ExpirationTime; import org.kie.kogito.jobs.JobDescription; -import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription; -import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription; import org.kie.kogito.jobs.service.utils.DateUtil; import org.kie.kogito.timer.Trigger; import org.kie.kogito.timer.impl.SimpleTimerTrigger; public class JobDescriptionHelper { - public static JobDescription newJobDescription(JobDescription jobDescription, Trigger trigger) { - if (jobDescription instanceof ProcessInstanceJobDescription processInstanceJobDescription) { - ProcessInstanceJobDescription newProcessInstanceJobDescription = new ProcessInstanceJobDescription( - processInstanceJobDescription.id(), - processInstanceJobDescription.timerId(), - toExpirationTime(trigger), - processInstanceJobDescription.priority(), - processInstanceJobDescription.processInstanceId(), - processInstanceJobDescription.rootProcessInstanceId(), - processInstanceJobDescription.processId(), - processInstanceJobDescription.rootProcessId(), - processInstanceJobDescription.nodeInstanceId()); - return newProcessInstanceJobDescription; - } else if (jobDescription instanceof UserTaskInstanceJobDescription userTaskInstanceJobDescription) { - UserTaskInstanceJobDescription newUserTaskInstanceJobDescription = new UserTaskInstanceJobDescription( - userTaskInstanceJobDescription.id(), - toExpirationTime(trigger), - userTaskInstanceJobDescription.priority(), - userTaskInstanceJobDescription.userTaskInstanceId(), - userTaskInstanceJobDescription.processId(), - userTaskInstanceJobDescription.processInstanceId(), - userTaskInstanceJobDescription.nodeInstanceId(), - userTaskInstanceJobDescription.rootProcessInstanceId(), - userTaskInstanceJobDescription.rootProcessId()); - return newUserTaskInstanceJobDescription; - } else { - return jobDescription; - } - } - public static ExpirationTime toExpirationTime(Trigger trigger) { if (trigger instanceof SimpleTimerTrigger simpleTimerTrigger) { ZonedDateTime zoneDateTime = DateUtil.fromDate(simpleTimerTrigger.hasNextFireTime()); @@ -72,4 +42,9 @@ public class JobDescriptionHelper { } throw new IllegalArgumentException("this type of trigger is not supported " + trigger.getClass().getName()); } + + public static JobDescription newJobDescription(JobDescription jobDescription, Trigger trigger) { + List<JobDescriptionMerger> mergers = List.of(new ProcessInstanceJobDescriptionMerger(), new ProcessJobDescriptionMerger(), new UserTaskInstanceJobDescriptorMerger()); + return mergers.stream().filter(merger -> merger.accept(jobDescription)).map(merger -> merger.mergeTrigger(jobDescription, trigger)).findFirst().orElseThrow(); + } } diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessInstanceJobDescriptionJobInstanceEventAdapter.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobDescriptionJobInstanceEventAdapter.java similarity index 97% rename from jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessInstanceJobDescriptionJobInstanceEventAdapter.java rename to jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobDescriptionJobInstanceEventAdapter.java index 46f98f375..33670646a 100644 --- a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessInstanceJobDescriptionJobInstanceEventAdapter.java +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobDescriptionJobInstanceEventAdapter.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.kie.kogito.app.jobs.integregations; +package org.kie.kogito.app.jobs.integrations; import org.kie.kogito.jobs.JobDescription; import org.kie.kogito.jobs.api.JobBuilder; diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobDescriptionMerger.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobDescriptionMerger.java new file mode 100644 index 000000000..d25c21565 --- /dev/null +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobDescriptionMerger.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.app.jobs.integrations; + +import org.kie.kogito.app.jobs.api.JobDescriptionMerger; +import org.kie.kogito.jobs.JobDescription; +import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription; +import org.kie.kogito.timer.Trigger; + +public class ProcessInstanceJobDescriptionMerger implements JobDescriptionMerger { + + @Override + public boolean accept(Object instance) { + return instance instanceof ProcessInstanceJobDescription; + } + + @Override + public JobDescription mergeTrigger(JobDescription jobDescription, Trigger trigger) { + if (jobDescription instanceof ProcessInstanceJobDescription processInstanceJobDescription) { + ProcessInstanceJobDescription newProcessInstanceJobDescription = new ProcessInstanceJobDescription( + processInstanceJobDescription.id(), + processInstanceJobDescription.timerId(), + JobDescriptionHelper.toExpirationTime(trigger), + processInstanceJobDescription.priority(), + processInstanceJobDescription.processInstanceId(), + processInstanceJobDescription.rootProcessInstanceId(), + processInstanceJobDescription.processId(), + processInstanceJobDescription.rootProcessId(), + processInstanceJobDescription.nodeInstanceId()); + return newProcessInstanceJobDescription; + } + throw new IllegalArgumentException("jobDescription type not supported by this merger " + jobDescription); + } + +} diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessInstanceJobExecutor.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobExecutor.java similarity index 98% rename from jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessInstanceJobExecutor.java rename to jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobExecutor.java index d807d1443..ed7d77325 100644 --- a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessInstanceJobExecutor.java +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobExecutor.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.kie.kogito.app.jobs.integregations; +package org.kie.kogito.app.jobs.integrations; import org.kie.kogito.app.jobs.api.JobExecutor; import org.kie.kogito.app.jobs.impl.JobDetailsHelper; diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobDescriptionJobInstanceEventAdapter.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobDescriptionJobInstanceEventAdapter.java similarity index 97% copy from jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobDescriptionJobInstanceEventAdapter.java copy to jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobDescriptionJobInstanceEventAdapter.java index f13a8f683..d7ca4a6f7 100644 --- a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobDescriptionJobInstanceEventAdapter.java +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobDescriptionJobInstanceEventAdapter.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.kie.kogito.app.jobs.integregations; +package org.kie.kogito.app.jobs.integrations; import org.kie.kogito.jobs.JobDescription; import org.kie.kogito.jobs.api.JobBuilder; diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobDescriptionJobInstanceEventAdapter.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobDescriptionMerger.java similarity index 53% rename from jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobDescriptionJobInstanceEventAdapter.java rename to jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobDescriptionMerger.java index f13a8f683..e900f15d3 100644 --- a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobDescriptionJobInstanceEventAdapter.java +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobDescriptionMerger.java @@ -16,29 +16,31 @@ * specific language governing permissions and limitations * under the License. */ -package org.kie.kogito.app.jobs.integregations; +package org.kie.kogito.app.jobs.integrations; +import org.kie.kogito.app.jobs.api.JobDescriptionMerger; import org.kie.kogito.jobs.JobDescription; -import org.kie.kogito.jobs.api.JobBuilder; +import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription; import org.kie.kogito.jobs.descriptors.ProcessJobDescription; -import org.kie.kogito.jobs.service.model.JobDetails; +import org.kie.kogito.timer.Trigger; -public class ProcessJobDescriptionJobInstanceEventAdapter extends AbstractJobDescriptionJobInstanceEventAdapter { - - public ProcessJobDescriptionJobInstanceEventAdapter(String serviceURL) { - super(serviceURL); - } +public class ProcessJobDescriptionMerger implements JobDescriptionMerger { @Override - public boolean accept(JobDetails jobDetails) { - return extractJobDescription(jobDetails) instanceof ProcessJobDescription; + public boolean accept(Object instance) { + return instance instanceof ProcessInstanceJobDescription; } @Override - protected void doAdaptPayload(JobBuilder jobBuilder, JobDescription jobDescription) { + public JobDescription mergeTrigger(JobDescription jobDescription, Trigger trigger) { if (jobDescription instanceof ProcessJobDescription processJobDescription) { - jobBuilder.processId(processJobDescription.processId()); + ProcessJobDescription newProcessJobDescription = ProcessJobDescription.of( + JobDescriptionHelper.toExpirationTime(trigger), + processJobDescription.priority(), + processJobDescription.processId()); + return newProcessJobDescription; } + throw new IllegalArgumentException("jobDescription type not supported by this merger " + jobDescription); } } diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobExecutor.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobExecutor.java similarity index 98% rename from jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobExecutor.java rename to jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobExecutor.java index be86f848e..1a31f98ee 100644 --- a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobExecutor.java +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobExecutor.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.kie.kogito.app.jobs.integregations; +package org.kie.kogito.app.jobs.integrations; import org.kie.kogito.app.jobs.api.JobExecutor; import org.kie.kogito.app.jobs.impl.JobDetailsHelper; diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/UserTaskInstanceJobDescriptionJobInstanceEventAdapter.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobDescriptionJobInstanceEventAdapter.java similarity index 97% rename from jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/UserTaskInstanceJobDescriptionJobInstanceEventAdapter.java rename to jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobDescriptionJobInstanceEventAdapter.java index d9242b073..5dcfef63e 100644 --- a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/UserTaskInstanceJobDescriptionJobInstanceEventAdapter.java +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobDescriptionJobInstanceEventAdapter.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.kie.kogito.app.jobs.integregations; +package org.kie.kogito.app.jobs.integrations; import org.kie.kogito.jobs.JobDescription; import org.kie.kogito.jobs.api.JobBuilder; diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobDescriptorMerger.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobDescriptorMerger.java new file mode 100644 index 000000000..2d347a7f0 --- /dev/null +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobDescriptorMerger.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.app.jobs.integrations; + +import org.kie.kogito.app.jobs.api.JobDescriptionMerger; +import org.kie.kogito.jobs.JobDescription; +import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription; +import org.kie.kogito.timer.Trigger; + +public class UserTaskInstanceJobDescriptorMerger implements JobDescriptionMerger { + + @Override + public boolean accept(Object instance) { + return instance instanceof UserTaskInstanceJobDescription; + } + + @Override + public JobDescription mergeTrigger(JobDescription jobDescription, Trigger trigger) { + if (jobDescription instanceof UserTaskInstanceJobDescription userTaskInstanceJobDescription) { + UserTaskInstanceJobDescription newUserTaskInstanceJobDescription = new UserTaskInstanceJobDescription( + userTaskInstanceJobDescription.id(), + JobDescriptionHelper.toExpirationTime(trigger), + userTaskInstanceJobDescription.priority(), + userTaskInstanceJobDescription.userTaskInstanceId(), + userTaskInstanceJobDescription.processId(), + userTaskInstanceJobDescription.processInstanceId(), + userTaskInstanceJobDescription.nodeInstanceId(), + userTaskInstanceJobDescription.rootProcessInstanceId(), + userTaskInstanceJobDescription.rootProcessId()); + return newUserTaskInstanceJobDescription; + } + throw new IllegalArgumentException("jobDescription type not supported by this merger " + jobDescription); + } + +} diff --git a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/UserTaskInstanceJobExecutor.java b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobExecutor.java similarity index 97% rename from jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/UserTaskInstanceJobExecutor.java rename to jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobExecutor.java index 1a3cda6d3..86507cf93 100644 --- a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/UserTaskInstanceJobExecutor.java +++ b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobExecutor.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.kie.kogito.app.jobs.integregations; +package org.kie.kogito.app.jobs.integrations; import org.kie.kogito.app.jobs.api.JobExecutor; import org.kie.kogito.app.jobs.impl.JobDetailsHelper; diff --git a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/LatchFailureJobSchedulerListener.java b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/LatchFailureJobSchedulerListener.java new file mode 100644 index 000000000..6ae3df33e --- /dev/null +++ b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/LatchFailureJobSchedulerListener.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.app.jobs.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.kie.kogito.app.jobs.api.JobSchedulerListener; +import org.kie.kogito.jobs.service.model.JobDetails; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LatchFailureJobSchedulerListener implements JobSchedulerListener { + + private static final Logger LOG = LoggerFactory.getLogger(LatchFailureJobSchedulerListener.class); + + private List<JobDetails> jobDetailsList; + private CountDownLatch latch; + + private AtomicInteger count; + + public LatchFailureJobSchedulerListener() { + this(1); + } + + public LatchFailureJobSchedulerListener(Integer executions) { + latch = new CountDownLatch(executions); + count = new AtomicInteger(0); + jobDetailsList = new ArrayList<>(); + } + + @Override + public void onSchedule(JobDetails jobDetails) { + // do nothing + } + + @Override + public void onReschedule(JobDetails jobDetails) { + // do nothing + } + + @Override + public void onCancel(JobDetails jobDetails) { + // do nothing + } + + @Override + public void onFailure(JobDetails jobDetails) { + LOG.info("executing {}", jobDetails); + latch.countDown(); + count.incrementAndGet(); + jobDetailsList.add(jobDetails); + } + + @Override + public void onExecution(JobDetails jobDetails) { + + } + + public Integer getCount() { + return count.get(); + } + + public void waitForExecution() throws InterruptedException { + latch.await(); + } + + public void waitForExecution(Long timeout) throws InterruptedException { + latch.await(timeout, TimeUnit.MILLISECONDS); + } + + public List<JobDetails> getJobDetailsList() { + return jobDetailsList; + } + + public boolean isExecuted() { + return latch.getCount() == 0; + } +} diff --git a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDetailsEventAdapter.java b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDescriptionMerger.java similarity index 59% copy from jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDetailsEventAdapter.java copy to jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDescriptionMerger.java index b76839f31..e659815e4 100644 --- a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDetailsEventAdapter.java +++ b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDescriptionMerger.java @@ -18,25 +18,24 @@ */ package org.kie.kogito.app.jobs.impl; -import org.kie.kogito.app.jobs.integregations.AbstractJobDescriptionJobInstanceEventAdapter; +import org.kie.kogito.app.jobs.api.JobDescriptionMerger; +import org.kie.kogito.app.jobs.integrations.JobDescriptionHelper; import org.kie.kogito.jobs.JobDescription; -import org.kie.kogito.jobs.api.JobBuilder; -import org.kie.kogito.jobs.service.model.JobDetails; +import org.kie.kogito.timer.Trigger; -public class TestJobDetailsEventAdapter extends AbstractJobDescriptionJobInstanceEventAdapter { - - public TestJobDetailsEventAdapter() { - super("http://localhost:8080"); - } +public class TestJobDescriptionMerger implements JobDescriptionMerger { @Override - public boolean accept(JobDetails jobDetails) { - return extractJobDescription(jobDetails) instanceof TestJobDescription; + public boolean accept(Object instance) { + return instance instanceof TestJobDescription; } @Override - protected void doAdaptPayload(JobBuilder jobBuilder, JobDescription jobDescription) { - // do nothing + public JobDescription mergeTrigger(JobDescription jobDescription, Trigger trigger) { + if (jobDescription instanceof TestJobDescription testJobDescription) { + return new TestJobDescription(jobDescription.id(), JobDescriptionHelper.toExpirationTime(trigger)); + } + return null; } } diff --git a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDetailsEventAdapter.java b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDetailsEventAdapter.java index b76839f31..5b839b062 100644 --- a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDetailsEventAdapter.java +++ b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDetailsEventAdapter.java @@ -18,7 +18,7 @@ */ package org.kie.kogito.app.jobs.impl; -import org.kie.kogito.app.jobs.integregations.AbstractJobDescriptionJobInstanceEventAdapter; +import org.kie.kogito.app.jobs.integrations.AbstractJobDescriptionJobInstanceEventAdapter; import org.kie.kogito.jobs.JobDescription; import org.kie.kogito.jobs.api.JobBuilder; import org.kie.kogito.jobs.service.model.JobDetails; diff --git a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/VertxJobSchedulerTest.java b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/VertxJobSchedulerTest.java index 1ae15e176..fc9427dcd 100644 --- a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/VertxJobSchedulerTest.java +++ b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/VertxJobSchedulerTest.java @@ -21,6 +21,7 @@ package org.kie.kogito.app.jobs.impl; import java.time.Duration; import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; import org.junit.jupiter.api.Test; import org.kie.kogito.app.jobs.api.JobScheduler; @@ -30,8 +31,10 @@ import org.kie.kogito.app.jobs.spi.JobStore; import org.kie.kogito.app.jobs.spi.memory.MemoryJobContextFactory; import org.kie.kogito.app.jobs.spi.memory.MemoryJobStore; import org.kie.kogito.jobs.DurationExpirationTime; +import org.kie.kogito.jobs.ExactExpirationTime; import org.kie.kogito.jobs.ExpirationTime; import org.kie.kogito.jobs.service.model.JobDetails; +import org.kie.kogito.jobs.service.model.JobStatus; import org.kie.kogito.timer.impl.SimpleTimerTrigger; import static org.assertj.core.api.Assertions.assertThat; @@ -52,6 +55,7 @@ public class VertxJobSchedulerTest { .withJobContextFactory(jobContextFactory) .withJobStore(memoryJobStore) .withJobSchedulerListeners(latchExecutionJobSchedulerListener) + .withJobDescriptorMergers(new TestJobDescriptionMerger()) .build(); jobScheduler.init(); jobScheduler.schedule(new TestJobDescription(jobId, ZonedDateTime.now().plus(Duration.ofSeconds(1)))); @@ -76,6 +80,7 @@ public class VertxJobSchedulerTest { .withJobContextFactory(jobContextFactory) .withJobStore(memoryJobStore) .withJobSchedulerListeners(latchExecutionJobSchedulerListener) + .withJobDescriptorMergers(new TestJobDescriptionMerger()) .build(); jobScheduler.init(); ExpirationTime expirationTime = DurationExpirationTime.repeat(0, 1000L, 3); @@ -94,7 +99,7 @@ public class VertxJobSchedulerTest { final String jobId = "1"; JobStore memoryJobStore = new MemoryJobStore(); JobContextFactory jobContextFactory = new MemoryJobContextFactory(); - LatchExecutionJobSchedulerListener latchExecutionJobSchedulerListener = new LatchExecutionJobSchedulerListener(3); + LatchExecutionJobSchedulerListener latchExecutionJobSchedulerListener = new LatchExecutionJobSchedulerListener(6); TestJobExecutor latchJobExecutor = new TestJobExecutor(); JobScheduler jobScheduler = JobSchedulerBuilder.newJobSchedulerBuilder() .withJobExecutors(latchJobExecutor) @@ -103,13 +108,14 @@ public class VertxJobSchedulerTest { .withJobContextFactory(jobContextFactory) .withJobStore(memoryJobStore) .withJobSchedulerListeners(latchExecutionJobSchedulerListener) + .withJobDescriptorMergers(new TestJobDescriptionMerger()) .build(); jobScheduler.init(); ExpirationTime expirationTime = DurationExpirationTime.repeat(0, 1000L, SimpleTimerTrigger.INDEFINITELY); jobScheduler.schedule(new TestJobDescription(jobId, expirationTime)); latchExecutionJobSchedulerListener.waitForExecution(); - assertThat(latchJobExecutor.getJobsExecuted()).hasSize(3); + assertThat(latchJobExecutor.getJobsExecuted()).hasSize(6); assertThat(memoryJobStore.find(jobContextFactory.newContext(), jobId)).isNotNull(); assertThat(latchExecutionJobSchedulerListener.isExecuted()).isTrue(); jobScheduler.close(); @@ -128,6 +134,7 @@ public class VertxJobSchedulerTest { .withJobContextFactory(jobContextFactory) .withJobStore(memoryJobStore) .withJobSchedulerListeners(latchExecutionJobSchedulerListener) + .withJobDescriptorMergers(new TestJobDescriptionMerger()) .build(); jobScheduler.init(); @@ -152,6 +159,7 @@ public class VertxJobSchedulerTest { .withJobContextFactory(jobContextFactory) .withJobStore(memoryJobStore) .withJobSchedulerListeners(latchExecutionJobSchedulerListener) + .withJobDescriptorMergers(new TestJobDescriptionMerger()) .build(); jobScheduler.init(); @@ -179,6 +187,7 @@ public class VertxJobSchedulerTest { .withJobContextFactory(jobContextFactory) .withJobStore(memoryJobStore) .withJobSchedulerListeners(latchExecutionJobSchedulerListener) + .withJobDescriptorMergers(new TestJobDescriptionMerger()) .build(); jobScheduler.init(); @@ -206,6 +215,7 @@ public class VertxJobSchedulerTest { .withJobContextFactory(jobContextFactory) .withJobStore(memoryJobStore) .withJobSchedulerListeners(latchExecutionJobSchedulerListener) + .withJobDescriptorMergers(new TestJobDescriptionMerger()) .build(); jobScheduler.init(); @@ -216,6 +226,65 @@ public class VertxJobSchedulerTest { jobScheduler.close(); } + @Test + public void testExactTime() throws Exception { + final String jobId = "1"; + LatchExecutionJobSchedulerListener latchExecutionJobSchedulerListener = new LatchExecutionJobSchedulerListener(); + TestJobExecutor latchJobExecutor = new TestJobExecutor(); + JobStore memoryJobStore = new MemoryJobStore(); + JobContextFactory jobContextFactory = new MemoryJobContextFactory(); + JobScheduler jobScheduler = JobSchedulerBuilder.newJobSchedulerBuilder() + .withJobExecutors(latchJobExecutor) + .withJobEventAdapters(new TestJobDetailsEventAdapter()) + .withEventPublishers(new TestEventPublisher()) + .withJobContextFactory(jobContextFactory) + .withJobStore(memoryJobStore) + .withJobSchedulerListeners(latchExecutionJobSchedulerListener) + .withJobDescriptorMergers(new TestJobDescriptionMerger()) + .withRefreshJobsInterval(100000L) + .build(); + jobScheduler.init(); + ExpirationTime expirationTime = ExactExpirationTime.of(ZonedDateTime.now().plus(1, ChronoUnit.MILLIS)); + jobScheduler.schedule(new TestJobDescription(jobId, expirationTime)); + latchExecutionJobSchedulerListener.waitForExecution(1000L); + assertThat(latchJobExecutor.getJobsExecuted()).hasSize(1); + assertThat(memoryJobStore.find(jobContextFactory.newContext(), jobId)).isNull(); + assertThat(latchExecutionJobSchedulerListener.isExecuted()).isTrue(); + jobScheduler.close(); + + } + + @Test + public void testNumberOfRetries() throws Exception { + final int NUMBER_OF_FAILURES = 4; // first execution + number of retries + final int NUMBER_OF_RETRIES = NUMBER_OF_FAILURES - 1; + + final String jobId = "1"; + JobStore memoryJobStore = new MemoryJobStore(); + JobContextFactory jobContextFactory = new MemoryJobContextFactory(); + TestFailureJobExecutor latchJobExecutor = new TestFailureJobExecutor(NUMBER_OF_FAILURES); + LatchFailureJobSchedulerListener latchExecutionJobSchedulerListener = new LatchFailureJobSchedulerListener(NUMBER_OF_FAILURES); + JobScheduler jobScheduler = JobSchedulerBuilder.newJobSchedulerBuilder() + .withMaxNumberOfRetries(NUMBER_OF_RETRIES) + .withJobExecutors(latchJobExecutor) + .withRetryInterval(1000L) + .withJobEventAdapters(new TestJobDetailsEventAdapter()) + .withEventPublishers(new TestEventPublisher()) + .withJobContextFactory(jobContextFactory) + .withJobStore(memoryJobStore) + .withJobSchedulerListeners(latchExecutionJobSchedulerListener) + .withJobDescriptorMergers(new TestJobDescriptionMerger()) + .build(); + + jobScheduler.init(); + + jobScheduler.schedule(new TestJobDescription(jobId, ZonedDateTime.now().plus(Duration.ofSeconds(1)))); + latchExecutionJobSchedulerListener.waitForExecution(); + assertThat(memoryJobStore.find(jobContextFactory.newContext(), jobId)).isNotNull().extracting(JobDetails::getStatus).isEqualTo(JobStatus.ERROR); + + jobScheduler.close(); + } + @Test public void testBasicOverdueTime() throws Exception { final String jobId = "1"; @@ -235,6 +304,7 @@ public class VertxJobSchedulerTest { .withJobContextFactory(jobContextFactory) .withJobStore(memoryJobStore) .withJobSchedulerListeners(latchExecutionJobSchedulerListener) + .withJobDescriptorMergers(new TestJobDescriptionMerger()) .build(); jobScheduler.init(); latchExecutionJobSchedulerListener.waitForExecution(); diff --git a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobServiceProducer.java b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobServiceProducer.java index 7a46452f4..1a70b4cb0 100644 --- a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobServiceProducer.java +++ b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobServiceProducer.java @@ -19,9 +19,9 @@ package org.kie.kogito.app.jobs.quarkus; import org.kie.kogito.app.jobs.api.JobExecutor; -import org.kie.kogito.app.jobs.integregations.ProcessInstanceJobExecutor; -import org.kie.kogito.app.jobs.integregations.ProcessJobExecutor; -import org.kie.kogito.app.jobs.integregations.UserTaskInstanceJobExecutor; +import org.kie.kogito.app.jobs.integrations.ProcessInstanceJobExecutor; +import org.kie.kogito.app.jobs.integrations.ProcessJobExecutor; +import org.kie.kogito.app.jobs.integrations.UserTaskInstanceJobExecutor; import org.kie.kogito.app.jobs.spi.JobContextFactory; import org.kie.kogito.app.jobs.spi.JobStore; import org.kie.kogito.app.jobs.spi.memory.MemoryJobContextFactory; diff --git a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java index f1b9f5b2a..a182ec713 100644 --- a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java +++ b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java @@ -27,9 +27,9 @@ import org.kie.kogito.app.jobs.api.JobSchedulerBuilder; import org.kie.kogito.app.jobs.api.JobSchedulerListener; import org.kie.kogito.app.jobs.api.JobSynchronization; import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor; -import org.kie.kogito.app.jobs.integregations.ProcessInstanceJobDescriptionJobInstanceEventAdapter; -import org.kie.kogito.app.jobs.integregations.ProcessJobDescriptionJobInstanceEventAdapter; -import org.kie.kogito.app.jobs.integregations.UserTaskInstanceJobDescriptionJobInstanceEventAdapter; +import org.kie.kogito.app.jobs.integrations.ProcessInstanceJobDescriptionJobInstanceEventAdapter; +import org.kie.kogito.app.jobs.integrations.ProcessJobDescriptionJobInstanceEventAdapter; +import org.kie.kogito.app.jobs.integrations.UserTaskInstanceJobDescriptionJobInstanceEventAdapter; import org.kie.kogito.app.jobs.quarkus.resource.RestApiConstants; import org.kie.kogito.app.jobs.spi.JobContextFactory; import org.kie.kogito.app.jobs.spi.JobStore; @@ -78,8 +78,8 @@ public class QuarkusJobsService implements JobsService { @ConfigProperty(name = "kogito.jobs-service.maxNumberOfRetries", defaultValue = "3") protected Integer maxNumberOfRetries; - @ConfigProperty(name = "kogito.jobs-service.maxIntervalLimitToRetryMillis", defaultValue = "60000") - protected Long maxIntervalLimitToRetryMillis; + @ConfigProperty(name = "kogito.jobs-service.retryMillis", defaultValue = "100") + protected Long retryMillis; @ConfigProperty(name = "kogito.jobs-service.schedulerChunkInMinutes", defaultValue = "10") protected Long maxRefreshJobsIntervalWindow; @@ -117,7 +117,7 @@ public class QuarkusJobsService implements JobsService { new UserTaskInstanceJobDescriptionJobInstanceEventAdapter(serviceURL + RestApiConstants.JOBS_PATH)) .withJobExecutors(jobExecutors.stream().toArray(JobExecutor[]::new)) .withMaxRefreshJobsIntervalWindow(maxRefreshJobsIntervalWindow * 60 * 1000L) - .withRetryInterval(maxIntervalLimitToRetryMillis) + .withRetryInterval(retryMillis) .withMaxNumberOfRetries(maxNumberOfRetries) .withRefreshJobsInterval(maxRefreshJobsIntervalWindow * 60 * 1000L) .withTimeoutInterceptor(txInterceptor) diff --git a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/resource/JobResourceV1.java b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/resource/JobResourceV1.java index 774b02269..1a9bb7604 100644 --- a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/resource/JobResourceV1.java +++ b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/resource/JobResourceV1.java @@ -22,7 +22,7 @@ import org.eclipse.microprofile.openapi.annotations.Operation; import org.eclipse.microprofile.openapi.annotations.parameters.RequestBody; import org.eclipse.microprofile.openapi.annotations.tags.Tag; import org.kie.kogito.app.jobs.impl.InVMPayloadData; -import org.kie.kogito.app.jobs.impl.JobDescriptionHelper; +import org.kie.kogito.app.jobs.integrations.JobDescriptionHelper; import org.kie.kogito.app.jobs.spi.JobContextFactory; import org.kie.kogito.app.jobs.spi.JobStore; import org.kie.kogito.jobs.JobDescription; diff --git a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobServiceConfiguration.java b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobServiceConfiguration.java index 60db14d30..3f66ce946 100644 --- a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobServiceConfiguration.java +++ b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobServiceConfiguration.java @@ -19,9 +19,9 @@ package org.kie.kogito.app.jobs.springboot; import org.kie.kogito.app.jobs.api.JobExecutor; -import org.kie.kogito.app.jobs.integregations.ProcessInstanceJobExecutor; -import org.kie.kogito.app.jobs.integregations.ProcessJobExecutor; -import org.kie.kogito.app.jobs.integregations.UserTaskInstanceJobExecutor; +import org.kie.kogito.app.jobs.integrations.ProcessInstanceJobExecutor; +import org.kie.kogito.app.jobs.integrations.ProcessJobExecutor; +import org.kie.kogito.app.jobs.integrations.UserTaskInstanceJobExecutor; import org.kie.kogito.app.jobs.spi.JobContextFactory; import org.kie.kogito.app.jobs.spi.JobStore; import org.kie.kogito.app.jobs.spi.memory.MemoryJobContextFactory; diff --git a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java index c9be49922..8bdcaecda 100644 --- a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java +++ b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java @@ -28,9 +28,9 @@ import org.kie.kogito.app.jobs.api.JobSchedulerBuilder; import org.kie.kogito.app.jobs.api.JobSchedulerListener; import org.kie.kogito.app.jobs.api.JobSynchronization; import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor; -import org.kie.kogito.app.jobs.integregations.ProcessInstanceJobDescriptionJobInstanceEventAdapter; -import org.kie.kogito.app.jobs.integregations.ProcessJobDescriptionJobInstanceEventAdapter; -import org.kie.kogito.app.jobs.integregations.UserTaskInstanceJobDescriptionJobInstanceEventAdapter; +import org.kie.kogito.app.jobs.integrations.ProcessInstanceJobDescriptionJobInstanceEventAdapter; +import org.kie.kogito.app.jobs.integrations.ProcessJobDescriptionJobInstanceEventAdapter; +import org.kie.kogito.app.jobs.integrations.UserTaskInstanceJobDescriptionJobInstanceEventAdapter; import org.kie.kogito.app.jobs.spi.JobContextFactory; import org.kie.kogito.app.jobs.spi.JobStore; import org.kie.kogito.app.jobs.springboot.resource.RestApiConstants; @@ -76,8 +76,8 @@ public class SpringbootJobsService implements JobsService { @Value("${kogito.jobs-service.maxNumberOfRetries:3}") protected Integer maxNumberOfRetries; - @Value("${kogito.jobs-service.maxIntervalLimitToRetryMillis:60000}") - protected Long maxIntervalLimitToRetryMillis; + @Value("${kogito.jobs-service.retryMillis:100}") + protected Long retryMillis; @Value("${kogito.jobs-service.schedulerChunkInMinutes:10}") protected Long maxRefreshJobsIntervalWindow; @@ -123,7 +123,7 @@ public class SpringbootJobsService implements JobsService { new UserTaskInstanceJobDescriptionJobInstanceEventAdapter(serviceURL + RestApiConstants.JOBS_PATH)) .withJobExecutors(ofNullable(jobExecutors).toArray(JobExecutor[]::new)) .withMaxRefreshJobsIntervalWindow(maxRefreshJobsIntervalWindow * 60 * 1000L) - .withRetryInterval(maxIntervalLimitToRetryMillis) + .withRetryInterval(retryMillis) .withMaxNumberOfRetries(maxNumberOfRetries) .withRefreshJobsInterval(maxRefreshJobsIntervalWindow * 60 * 1000L) .withTimeoutInterceptor(txInterceptor) diff --git a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/resource/JobResourceV1.java b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/resource/JobResourceV1.java index f5cf2eaed..03d080380 100644 --- a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/resource/JobResourceV1.java +++ b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/resource/JobResourceV1.java @@ -22,7 +22,7 @@ import org.eclipse.microprofile.openapi.annotations.Operation; import org.eclipse.microprofile.openapi.annotations.parameters.RequestBody; import org.eclipse.microprofile.openapi.annotations.tags.Tag; import org.kie.kogito.app.jobs.impl.InVMPayloadData; -import org.kie.kogito.app.jobs.impl.JobDescriptionHelper; +import org.kie.kogito.app.jobs.integrations.JobDescriptionHelper; import org.kie.kogito.app.jobs.spi.JobContextFactory; import org.kie.kogito.app.jobs.spi.JobStore; import org.kie.kogito.jobs.JobDescription; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
