This is an automated email from the ASF dual-hosted git repository.

egonzalez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git


The following commit(s) were added to refs/heads/main by this push:
     new 986c3b816 [incubator-kie-issues#2108] Workflow Engine: Make 
JobScheduler publish necessary DataEvents when job execution fails (#2269)
986c3b816 is described below

commit 986c3b8160fa071862fbb673efcae49a29efc063
Author: Pere Fernández <[email protected]>
AuthorDate: Tue Sep 23 09:17:52 2025 +0200

    [incubator-kie-issues#2108] Workflow Engine: Make JobScheduler publish 
necessary DataEvents when job execution fails (#2269)
---
 .../kogito/app/jobs/impl/VertxJobScheduler.java    |  1 +
 .../kogito/app/jobs/impl/TestEventPublisher.java   | 14 ++++-
 .../app/jobs/impl/VertxJobSchedulerTest.java       | 70 +++++++++++++++++++++-
 3 files changed, 83 insertions(+), 2 deletions(-)

diff --git 
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
 
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
index 3f9974a74..b80ebdc8d 100644
--- 
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
+++ 
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
@@ -405,6 +405,7 @@ public class VertxJobScheduler implements JobScheduler, 
Handler<Long> {
                 } catch (Exception exception) {
                     LOG.trace("Timeout {} with jobId {} will be retried if 
possible", timerId, jobId, exception);
                     JobDetails nextJobDetails = computeRetryIfAny(jobDetails);
+                    fireEvents(nextJobDetails);
                     removeIfFinal(timerId, jobContext, nextJobDetails);
                     jobSchedulerListeners.forEach(l -> 
l.onFailure(jobDetails));
                     return new JobTimeoutExecution(nextJobDetails, exception);
diff --git 
a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestEventPublisher.java
 
b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestEventPublisher.java
index 28d07e599..10ea22ea2 100644
--- 
a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestEventPublisher.java
+++ 
b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestEventPublisher.java
@@ -19,6 +19,7 @@
 package org.kie.kogito.app.jobs.impl;
 
 import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.kie.kogito.event.DataEvent;
 import org.kie.kogito.event.EventPublisher;
@@ -30,10 +31,17 @@ public class TestEventPublisher implements EventPublisher {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TestEventPublisher.class);
 
+    private final AtomicInteger publishedEventsCount;
+
+    public TestEventPublisher() {
+        this.publishedEventsCount = new AtomicInteger(0);
+    }
+
     @Override
     public void publish(DataEvent<?> event) {
         JobInstanceDataEvent jobInstanceDataEvent = (JobInstanceDataEvent) 
event;
-        LOG.info("job event {}", new String(jobInstanceDataEvent.getData()));
+        this.publishedEventsCount.incrementAndGet();
+        LOG.info("job event {}, publishedEventsCount {}", new 
String(jobInstanceDataEvent.getData()), publishedEventsCount.get());
     }
 
     @Override
@@ -41,4 +49,8 @@ public class TestEventPublisher implements EventPublisher {
         events.forEach(this::publish);
     }
 
+    public int getPublishedEventsCount() {
+        return publishedEventsCount.get();
+    }
+
 }
diff --git 
a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/VertxJobSchedulerTest.java
 
b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/VertxJobSchedulerTest.java
index fc9427dcd..057dca7f1 100644
--- 
a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/VertxJobSchedulerTest.java
+++ 
b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/VertxJobSchedulerTest.java
@@ -207,11 +207,12 @@ public class VertxJobSchedulerTest {
         JobContextFactory jobContextFactory = new MemoryJobContextFactory();
         TestFailureJobExecutor latchJobExecutor = new 
TestFailureJobExecutor(2);
         LatchExecutionJobSchedulerListener latchExecutionJobSchedulerListener 
= new LatchExecutionJobSchedulerListener();
+        TestEventPublisher eventPublisher = new TestEventPublisher();
         JobScheduler jobScheduler = 
JobSchedulerBuilder.newJobSchedulerBuilder()
                 .withJobExecutors(latchJobExecutor)
                 .withRetryInterval(1000L)
                 .withJobEventAdapters(new TestJobDetailsEventAdapter())
-                .withEventPublishers(new TestEventPublisher())
+                .withEventPublishers(eventPublisher)
                 .withJobContextFactory(jobContextFactory)
                 .withJobStore(memoryJobStore)
                 .withJobSchedulerListeners(latchExecutionJobSchedulerListener)
@@ -285,6 +286,73 @@ public class VertxJobSchedulerTest {
         jobScheduler.close();
     }
 
+    @Test
+    public void testEventPublishedOnErrorWithNoRetries() throws Exception {
+
+        int EXPECTED_EVENTS = 3; // SCHEDULED, RUNNING, ERROR
+
+        final String jobId = "1";
+        JobStore memoryJobStore = new MemoryJobStore();
+        JobContextFactory jobContextFactory = new MemoryJobContextFactory();
+        TestFailureJobExecutor latchJobExecutor = new 
TestFailureJobExecutor(1);
+        TestEventPublisher eventPublisher = new TestEventPublisher();
+        LatchFailureJobSchedulerListener latchExecutionJobSchedulerListener = 
new LatchFailureJobSchedulerListener(1);
+        JobScheduler jobScheduler = 
JobSchedulerBuilder.newJobSchedulerBuilder()
+                .withMaxNumberOfRetries(0)
+                .withJobExecutors(latchJobExecutor)
+                .withRetryInterval(1000L)
+                .withJobEventAdapters(new TestJobDetailsEventAdapter())
+                .withEventPublishers(eventPublisher)
+                .withJobContextFactory(jobContextFactory)
+                .withJobStore(memoryJobStore)
+                .withJobSchedulerListeners(latchExecutionJobSchedulerListener)
+                .withJobDescriptorMergers(new TestJobDescriptionMerger())
+                .build();
+
+        jobScheduler.init();
+
+        jobScheduler.schedule(new TestJobDescription(jobId, 
ZonedDateTime.now().plus(Duration.ofSeconds(1))));
+        latchExecutionJobSchedulerListener.waitForExecution();
+        
assertThat(eventPublisher.getPublishedEventsCount()).isEqualTo(EXPECTED_EVENTS);
+        assertThat(memoryJobStore.find(jobContextFactory.newContext(), 
jobId)).isNotNull().extracting(JobDetails::getStatus).isEqualTo(JobStatus.ERROR);
+
+        jobScheduler.close();
+    }
+
+    @Test
+    public void testEventPublishedOnErrorWithRetry() throws Exception {
+        final int NUMBER_OF_FAILURES = 2; // first execution + number of 
retries
+        final int NUMBER_OF_RETRIES = NUMBER_OF_FAILURES - 1;
+        int EXPECTED_EVENTS = 6; // SCHEDULED, RUNNING, 2xRETRY, RUNNING, ERROR
+
+        final String jobId = "1";
+        JobStore memoryJobStore = new MemoryJobStore();
+        JobContextFactory jobContextFactory = new MemoryJobContextFactory();
+        TestFailureJobExecutor latchJobExecutor = new 
TestFailureJobExecutor(NUMBER_OF_FAILURES);
+        TestEventPublisher eventPublisher = new TestEventPublisher();
+        LatchFailureJobSchedulerListener latchExecutionJobSchedulerListener = 
new LatchFailureJobSchedulerListener(NUMBER_OF_FAILURES);
+        JobScheduler jobScheduler = 
JobSchedulerBuilder.newJobSchedulerBuilder()
+                .withMaxNumberOfRetries(NUMBER_OF_RETRIES)
+                .withJobExecutors(latchJobExecutor)
+                .withRetryInterval(1000L)
+                .withJobEventAdapters(new TestJobDetailsEventAdapter())
+                .withEventPublishers(eventPublisher)
+                .withJobContextFactory(jobContextFactory)
+                .withJobStore(memoryJobStore)
+                .withJobSchedulerListeners(latchExecutionJobSchedulerListener)
+                .withJobDescriptorMergers(new TestJobDescriptionMerger())
+                .build();
+
+        jobScheduler.init();
+
+        jobScheduler.schedule(new TestJobDescription(jobId, 
ZonedDateTime.now().plus(Duration.ofSeconds(1))));
+        latchExecutionJobSchedulerListener.waitForExecution();
+        
assertThat(eventPublisher.getPublishedEventsCount()).isEqualTo(EXPECTED_EVENTS);
+        assertThat(memoryJobStore.find(jobContextFactory.newContext(), 
jobId)).isNotNull().extracting(JobDetails::getStatus).isEqualTo(JobStatus.ERROR);
+
+        jobScheduler.close();
+    }
+
     @Test
     public void testBasicOverdueTime() throws Exception {
         final String jobId = "1";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to