This is an automated email from the ASF dual-hosted git repository.
tiagodolphine pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git
The following commit(s) were added to refs/heads/main by this push:
new ede781454 [KOGITO-9842] The initial load job task from DB is not
executed Job Service (#1888)
ede781454 is described below
commit ede78145494e4cf8278d3268c5db5868d065a548
Author: Tiago Dolphine <[email protected]>
AuthorDate: Mon Nov 6 08:26:53 2023 -0300
[KOGITO-9842] The initial load job task from DB is not executed Job Service
(#1888)
* KOGITO-9842 The initial load job task from DB is not executed Job Service
---
.../service/scheduler/JobSchedulerManager.java | 55 ++++++++++++++--------
.../service/scheduler/JobSchedulerManagerTest.java | 55 ++++++++++++++--------
2 files changed, 71 insertions(+), 39 deletions(-)
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java
index 040cba283..3649eafcd 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java
+++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java
@@ -21,12 +21,11 @@ package org.kie.kogito.jobs.service.scheduler;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Priority;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
-import javax.interceptor.Interceptor;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
@@ -40,7 +39,6 @@ import org.kie.kogito.jobs.service.utils.ErrorHandling;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
@ApplicationScoped
@@ -76,26 +74,45 @@ public class JobSchedulerManager {
@Inject
Vertx vertx;
- private AtomicBoolean enabled = new AtomicBoolean(false);
-
- void onStartup(@Observes @Priority(Interceptor.Priority.PLATFORM_AFTER) StartupEvent startupEvent) {
- if (loadJobIntervalInMinutes > schedulerChunkInMinutes) {
- LOGGER.warn("The loadJobIntervalInMinutes ({}) cannot be greater than schedulerChunkInMinutes ({}), " +
- "setting value {} for both",
- loadJobIntervalInMinutes,
- schedulerChunkInMinutes,
- schedulerChunkInMinutes);
- loadJobIntervalInMinutes = schedulerChunkInMinutes;
+ final AtomicBoolean enabled = new AtomicBoolean(false);
+
+ final AtomicLong periodicTimerIdForLoadJobs = new AtomicLong(-1l);
+
+ private void startJobsLoadingFromRepositoryTask() {
+ //guarantee it starts the task just in case it is not already active
+ if (periodicTimerIdForLoadJobs.get() < 0) {
+ if (loadJobIntervalInMinutes > schedulerChunkInMinutes) {
+ LOGGER.warn("The loadJobIntervalInMinutes ({}) cannot be greater than schedulerChunkInMinutes ({}), " +
+ "setting value {} for both",
+ loadJobIntervalInMinutes,
+ schedulerChunkInMinutes,
+ schedulerChunkInMinutes);
+ loadJobIntervalInMinutes = schedulerChunkInMinutes;
+ }
+ //first execution
+ vertx.runOnContext(this::loadJobDetails);
+ //next executions to run periodically
+ periodicTimerIdForLoadJobs.set(vertx.setPeriodic(TimeUnit.MINUTES.toMillis(loadJobIntervalInMinutes), id -> loadJobDetails()));
}
+ }
- //first execution
- vertx.runOnContext(this::loadJobDetails);
- //periodic execution
- vertx.setPeriodic(TimeUnit.MINUTES.toMillis(loadJobIntervalInMinutes), id -> loadJobDetails());
+ private void cancelJobsLoadingFromRepositoryTask() {
+ if (periodicTimerIdForLoadJobs.get() > 0) {
+ vertx.cancelTimer(periodicTimerIdForLoadJobs.get());
+ //negative id indicates this is not active anymore
+ periodicTimerIdForLoadJobs.set(-1);
+ }
}
- protected void onMessagingStatusChange(@Observes MessagingChangeEvent event) {
- this.enabled.set(event.isEnabled());
+ protected synchronized void onMessagingStatusChange(@Observes MessagingChangeEvent event) {
+ boolean wasEnabled = enabled.getAndSet(event.isEnabled());
+ if (enabled.get() && !wasEnabled) {
+ // good, avoid starting twice if we receive two consecutive enabled = true
+ startJobsLoadingFromRepositoryTask();
+ } else if (!enabled.get()) {
+ // but only cancel if we receive enabled = false, otherwise with two consecutive enable we are also cancelling.
+ cancelJobsLoadingFromRepositoryTask();
+ }
}
//Runs periodically loading the jobs from the repository in chunks
diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerTest.java
index 32da35def..17d7b8528 100644
--- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerTest.java
+++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerTest.java
@@ -21,6 +21,7 @@ package org.kie.kogito.jobs.service.scheduler;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
@@ -41,14 +42,15 @@ import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
-import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -80,6 +82,8 @@ class JobSchedulerManagerTest {
@BeforeEach
void setUp() {
+ reset(tested);
+ reset(scheduler);
this.scheduledJob = JobDetails
.builder()
.id(JOB_ID)
@@ -95,24 +99,14 @@ class JobSchedulerManagerTest {
.thenReturn(Optional.empty());
lenient().when(scheduler.schedule(scheduledJob))
.thenReturn(ReactiveStreams.of(scheduledJob).buildRs());
- tested.onMessagingStatusChange(new MessagingChangeEvent(true));
- }
-
- @Test
- void testOnStartup(@Mock StartupEvent event) {
- tested.onStartup(event);
- verify(vertx).runOnContext(captorFirstExecution.capture());
- verify(vertx).setPeriodic(eq(tested.loadJobIntervalInMinutes), captorPeriodic.capture());
- }
-
- @Test
- void testOnStartupInvalidInterval(@Mock StartupEvent event) {
- tested.schedulerChunkInMinutes = 10;
- tested.loadJobIntervalInMinutes = 20;
-
- tested.onStartup(event);
-
- assertThat(tested.loadJobIntervalInMinutes).isEqualTo(tested.schedulerChunkInMinutes);
+ ArgumentCaptor<Runnable> action = ArgumentCaptor.forClass(Runnable.class);
+ lenient().doAnswer(a -> {
+ ((Runnable) a.getArgument(0)).run();
+ return a;
+ }).when(vertx).runOnContext(action.capture());
+ AtomicLong counter = new AtomicLong(1);
+ lenient().when(vertx.setPeriodic(anyLong(), any(Consumer.class))).thenReturn(counter.incrementAndGet());
+ tested.enabled.set(true);
}
@Test
@@ -128,4 +122,25 @@ class JobSchedulerManagerTest {
tested.loadJobDetails();
verify(scheduler, never()).schedule(scheduledJob);
}
+
+ @Test
+ void onMessagingStatusChange() {
+ tested.enabled.set(false);
+ MessagingChangeEvent messagingChangeEvent = new MessagingChangeEvent(true);
+ tested.onMessagingStatusChange(messagingChangeEvent);
+ verify(tested).loadJobDetails();//called once
+ assertThat(tested.periodicTimerIdForLoadJobs.get()).isPositive();
+ assertThat(tested.enabled.get()).isTrue();
+
+ MessagingChangeEvent messagingChangeEventToFalse = new MessagingChangeEvent(false);
+ tested.onMessagingStatusChange(messagingChangeEventToFalse);
+ assertThat(tested.periodicTimerIdForLoadJobs.get()).isNegative();
+ assertThat(tested.enabled.get()).isFalse();
+ verify(tested).loadJobDetails();//still called once
+
+ tested.onMessagingStatusChange(messagingChangeEvent);
+ verify(tested, times(2)).loadJobDetails(); //called twice
+ assertThat(tested.periodicTimerIdForLoadJobs.get()).isPositive();
+ assertThat(tested.enabled.get()).isTrue();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]