This is an automated email from the ASF dual-hosted git repository.

mweiler pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git


The following commit(s) were added to refs/heads/main by this push:
     new 30d29e928 [incubator-kie-issues#2171] Ensure jobs added or updated are 
added to the jobsScheduled map (#2281)
30d29e928 is described below

commit 30d29e928cc0a10f2168df4f6a204188742c5269
Author: Martin Weiler <[email protected]>
AuthorDate: Fri Nov 21 07:40:52 2025 -0700

    [incubator-kie-issues#2171] Ensure jobs added or updated are added to the 
jobsScheduled map (#2281)
    
    * [incubator-kie-issues#2171] Ensure jobs added or updated are added to the 
jobsScheduled map
    
    * Fix intermittent test failures that dependend on test execution order
---
 .../kogito/app/jobs/impl/VertxJobScheduler.java    | 25 ++++------------------
 .../jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java   | 13 ++++++++---
 2 files changed, 14 insertions(+), 24 deletions(-)

diff --git 
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
 
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
index b80ebdc8d..057b9d8d8 100644
--- 
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
+++ 
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
@@ -354,10 +354,9 @@ public class VertxJobScheduler implements JobScheduler, 
Handler<Long> {
         JobDetails rescheduledJobDetails = 
JobDetailsHelper.newScheduledJobDetails(jobDescription);
         LOG.trace("doSchedule {}", rescheduledJobDetails);
         fireEvents(rescheduledJobDetails);
-
         jobSchedulerListeners.forEach(l -> 
l.onReschedule(rescheduledJobDetails));
         jobStore.update(jobContextFactory.newContext(), rescheduledJobDetails);
-        updateTxTimer(rescheduledJobDetails);
+        addOrUpdateTxTimer(rescheduledJobDetails);
 
         return rescheduledJobDetails.getId();
     }
@@ -432,7 +431,7 @@ public class VertxJobScheduler implements JobScheduler, 
Handler<Long> {
             case RETRY:
                 LOG.trace("Timeout {} with jobId {} will be updated and 
scheduled", timerId, jobId);
                 jobStore.update(jobContext, nextJobDetails);
-                doNextSchedule(nextJobDetails);
+                doSchedule(nextJobDetails);
                 break;
             case ERROR:
                 LOG.trace("Timeout {} with jobId {} will be set to error", 
timerId, jobId);
@@ -446,23 +445,14 @@ public class VertxJobScheduler implements JobScheduler, 
Handler<Long> {
     }
 
     // add tx timer and remove tx timer
-    private void updateTxTimer(JobDetails jobDetails) {
+    private void addOrUpdateTxTimer(JobDetails jobDetails) {
         this.jobSynchronization.synchronize(new Runnable() {
             @Override
             public void run() {
-                // if the timer info does not exist we should not reschedule 
as it was executed or cancelled by 
                 jobsScheduled.computeIfPresent(jobDetails.getId(), (jobId, 
timerInfo) -> {
                     removeTimerInfo(timerInfo);
                     return addTimerInfo(jobDetails);
                 });
-            }
-        });
-    }
-
-    private void addTxTimer(JobDetails jobDetails) {
-        this.jobSynchronization.synchronize(new Runnable() {
-            @Override
-            public void run() {
                 jobsScheduled.computeIfAbsent(jobDetails.getId(), jobId -> {
                     return addTimerInfo(jobDetails);
                 });
@@ -506,15 +496,8 @@ public class VertxJobScheduler implements JobScheduler, 
Handler<Long> {
     }
 
     // lifecycle calls
-    private JobDetails doNextSchedule(JobDetails jobDetails) {
-        updateTxTimer(jobDetails);
-        LOG.trace("doNextSchedule {}", jobDetails);
-        fireEvents(jobDetails);
-        return jobDetails;
-    }
-
     private JobDetails doSchedule(JobDetails jobDetails) {
-        addTxTimer(jobDetails);
+        addOrUpdateTxTimer(jobDetails);
         LOG.trace("doSchedule {}", jobDetails);
         fireEvents(jobDetails);
         return jobDetails;
diff --git 
a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java
 
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java
index 755636bf4..4a7fdf404 100644
--- 
a/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java
+++ 
b/jobs/kogito-addons-quarkus-embedded-jobs/src/test/java/org/kie/kogito/app/jobs/jpa/quarkus/QuarkusJPAJobStoreTest.java
@@ -24,8 +24,10 @@ import java.time.ZoneId;
 import java.util.concurrent.TimeUnit;
 
 import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.kie.kogito.app.jobs.quarkus.QuarkusJobsService;
 import org.kie.kogito.jobs.ExactExpirationTime;
 import org.kie.kogito.jobs.JobsService;
 import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
@@ -53,13 +55,19 @@ public class QuarkusJPAJobStoreTest {
 
     @BeforeEach
     public void init() {
+        ((QuarkusJobsService) jobsService).init();
         testJobExecutor.reset();
         exceptionHandler.reset();
     }
 
+    @AfterEach
+    public void cleanup() {
+        ((QuarkusJobsService) jobsService).destroy();
+    }
+
     @Test
     public void testBasicPersistence() throws Exception {
-        ProcessInstanceJobDescription jobDescription = new 
ProcessInstanceJobDescription("1", "-1",
+        ProcessInstanceJobDescription jobDescription = new 
ProcessInstanceJobDescription("a", "-1",
                 
ExactExpirationTime.of(Instant.now().plus(Duration.ofSeconds(2)).atZone(ZoneId.of("UTC"))),
 5,
                 "processInstanceId", null, "processId", null, 
"nodeInstanceId");
 
@@ -73,7 +81,7 @@ public class QuarkusJPAJobStoreTest {
     @Test
     public void testBasicError() throws Exception {
         testJobExecutor.setNumberOfFailures(4);
-        ProcessInstanceJobDescription jobDescription = new 
ProcessInstanceJobDescription("1", "-1",
+        ProcessInstanceJobDescription jobDescription = new 
ProcessInstanceJobDescription("b", "-1",
                 
ExactExpirationTime.of(Instant.now().plus(Duration.ofSeconds(2)).atZone(ZoneId.of("UTC"))),
 5,
                 "processInstanceId", null, "processId", null, 
"nodeInstanceId");
 
@@ -82,5 +90,4 @@ public class QuarkusJPAJobStoreTest {
 
         Awaitility.await().atMost(Duration.ofSeconds(5L)).untilAsserted(() -> 
assertThat(exceptionHandler.isError()).isTrue());
     }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to