This is an automated email from the ASF dual-hosted git repository.
egonzalez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git
The following commit(s) were added to refs/heads/main by this push:
new 986c3b816 [incubator-kie-issues#2108] Workflow Engine: Make
JobScheduler publish necessary DataEvents when job execution fails (#2269)
986c3b816 is described below
commit 986c3b8160fa071862fbb673efcae49a29efc063
Author: Pere Fernández <[email protected]>
AuthorDate: Tue Sep 23 09:17:52 2025 +0200
[incubator-kie-issues#2108] Workflow Engine: Make JobScheduler publish
necessary DataEvents when job execution fails (#2269)
---
.../kogito/app/jobs/impl/VertxJobScheduler.java | 1 +
.../kogito/app/jobs/impl/TestEventPublisher.java | 14 ++++-
.../app/jobs/impl/VertxJobSchedulerTest.java | 70 +++++++++++++++++++++-
3 files changed, 83 insertions(+), 2 deletions(-)
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
index 3f9974a74..b80ebdc8d 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
@@ -405,6 +405,7 @@ public class VertxJobScheduler implements JobScheduler, Handler<Long> {
} catch (Exception exception) {
LOG.trace("Timeout {} with jobId {} will be retried if possible", timerId, jobId, exception);
JobDetails nextJobDetails = computeRetryIfAny(jobDetails);
+ fireEvents(nextJobDetails);
removeIfFinal(timerId, jobContext, nextJobDetails);
jobSchedulerListeners.forEach(l ->
l.onFailure(jobDetails));
return new JobTimeoutExecution(nextJobDetails, exception);
diff --git
a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestEventPublisher.java
b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestEventPublisher.java
index 28d07e599..10ea22ea2 100644
---
a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestEventPublisher.java
+++
b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestEventPublisher.java
@@ -19,6 +19,7 @@
package org.kie.kogito.app.jobs.impl;
import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
@@ -30,10 +31,17 @@ public class TestEventPublisher implements EventPublisher {
private static final Logger LOG =
LoggerFactory.getLogger(TestEventPublisher.class);
+ private final AtomicInteger publishedEventsCount;
+
+ public TestEventPublisher() {
+ this.publishedEventsCount = new AtomicInteger(0);
+ }
+
@Override
public void publish(DataEvent<?> event) {
JobInstanceDataEvent jobInstanceDataEvent = (JobInstanceDataEvent)
event;
- LOG.info("job event {}", new String(jobInstanceDataEvent.getData()));
+ this.publishedEventsCount.incrementAndGet();
+ LOG.info("job event {}, publishedEventsCount {}", new String(jobInstanceDataEvent.getData()), publishedEventsCount.get());
}
@Override
@@ -41,4 +49,8 @@ public class TestEventPublisher implements EventPublisher {
events.forEach(this::publish);
}
+ public int getPublishedEventsCount() {
+ return publishedEventsCount.get();
+ }
+
}
diff --git
a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/VertxJobSchedulerTest.java
b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/VertxJobSchedulerTest.java
index fc9427dcd..057dca7f1 100644
---
a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/VertxJobSchedulerTest.java
+++
b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/VertxJobSchedulerTest.java
@@ -207,11 +207,12 @@ public class VertxJobSchedulerTest {
JobContextFactory jobContextFactory = new MemoryJobContextFactory();
TestFailureJobExecutor latchJobExecutor = new
TestFailureJobExecutor(2);
LatchExecutionJobSchedulerListener latchExecutionJobSchedulerListener
= new LatchExecutionJobSchedulerListener();
+ TestEventPublisher eventPublisher = new TestEventPublisher();
JobScheduler jobScheduler =
JobSchedulerBuilder.newJobSchedulerBuilder()
.withJobExecutors(latchJobExecutor)
.withRetryInterval(1000L)
.withJobEventAdapters(new TestJobDetailsEventAdapter())
- .withEventPublishers(new TestEventPublisher())
+ .withEventPublishers(eventPublisher)
.withJobContextFactory(jobContextFactory)
.withJobStore(memoryJobStore)
.withJobSchedulerListeners(latchExecutionJobSchedulerListener)
@@ -285,6 +286,73 @@ public class VertxJobSchedulerTest {
jobScheduler.close();
}
+ @Test
+ public void testEventPublishedOnErrorWithNoRetries() throws Exception {
+
+ int EXPECTED_EVENTS = 3; // SCHEDULED, RUNNING, ERROR
+
+ final String jobId = "1";
+ JobStore memoryJobStore = new MemoryJobStore();
+ JobContextFactory jobContextFactory = new MemoryJobContextFactory();
+ TestFailureJobExecutor latchJobExecutor = new
TestFailureJobExecutor(1);
+ TestEventPublisher eventPublisher = new TestEventPublisher();
+ LatchFailureJobSchedulerListener latchExecutionJobSchedulerListener =
new LatchFailureJobSchedulerListener(1);
+ JobScheduler jobScheduler =
JobSchedulerBuilder.newJobSchedulerBuilder()
+ .withMaxNumberOfRetries(0)
+ .withJobExecutors(latchJobExecutor)
+ .withRetryInterval(1000L)
+ .withJobEventAdapters(new TestJobDetailsEventAdapter())
+ .withEventPublishers(eventPublisher)
+ .withJobContextFactory(jobContextFactory)
+ .withJobStore(memoryJobStore)
+ .withJobSchedulerListeners(latchExecutionJobSchedulerListener)
+ .withJobDescriptorMergers(new TestJobDescriptionMerger())
+ .build();
+
+ jobScheduler.init();
+
+ jobScheduler.schedule(new TestJobDescription(jobId,
ZonedDateTime.now().plus(Duration.ofSeconds(1))));
+ latchExecutionJobSchedulerListener.waitForExecution();
+ assertThat(eventPublisher.getPublishedEventsCount()).isEqualTo(EXPECTED_EVENTS);
+ assertThat(memoryJobStore.find(jobContextFactory.newContext(),
jobId)).isNotNull().extracting(JobDetails::getStatus).isEqualTo(JobStatus.ERROR);
+
+ jobScheduler.close();
+ }
+
+ @Test
+ public void testEventPublishedOnErrorWithRetry() throws Exception {
+ final int NUMBER_OF_FAILURES = 2; // first execution + number of retries
+ final int NUMBER_OF_RETRIES = NUMBER_OF_FAILURES - 1;
+ int EXPECTED_EVENTS = 6; // SCHEDULED, RUNNING, 2xRETRY, RUNNING, ERROR
+
+ final String jobId = "1";
+ JobStore memoryJobStore = new MemoryJobStore();
+ JobContextFactory jobContextFactory = new MemoryJobContextFactory();
+ TestFailureJobExecutor latchJobExecutor = new
TestFailureJobExecutor(NUMBER_OF_FAILURES);
+ TestEventPublisher eventPublisher = new TestEventPublisher();
+ LatchFailureJobSchedulerListener latchExecutionJobSchedulerListener =
new LatchFailureJobSchedulerListener(NUMBER_OF_FAILURES);
+ JobScheduler jobScheduler =
JobSchedulerBuilder.newJobSchedulerBuilder()
+ .withMaxNumberOfRetries(NUMBER_OF_RETRIES)
+ .withJobExecutors(latchJobExecutor)
+ .withRetryInterval(1000L)
+ .withJobEventAdapters(new TestJobDetailsEventAdapter())
+ .withEventPublishers(eventPublisher)
+ .withJobContextFactory(jobContextFactory)
+ .withJobStore(memoryJobStore)
+ .withJobSchedulerListeners(latchExecutionJobSchedulerListener)
+ .withJobDescriptorMergers(new TestJobDescriptionMerger())
+ .build();
+
+ jobScheduler.init();
+
+ jobScheduler.schedule(new TestJobDescription(jobId,
ZonedDateTime.now().plus(Duration.ofSeconds(1))));
+ latchExecutionJobSchedulerListener.waitForExecution();
+ assertThat(eventPublisher.getPublishedEventsCount()).isEqualTo(EXPECTED_EVENTS);
+ assertThat(memoryJobStore.find(jobContextFactory.newContext(),
jobId)).isNotNull().extracting(JobDetails::getStatus).isEqualTo(JobStatus.ERROR);
+
+ jobScheduler.close();
+ }
+
@Test
public void testBasicOverdueTime() throws Exception {
final String jobId = "1";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]