This is an automated email from the ASF dual-hosted git repository.

tiagodolphine pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git


The following commit(s) were added to refs/heads/main by this push:
     new ede781454 [KOGITO-9842] The initial load job task from DB is not executed Job Service (#1888)
ede781454 is described below

commit ede78145494e4cf8278d3268c5db5868d065a548
Author: Tiago Dolphine <[email protected]>
AuthorDate: Mon Nov 6 08:26:53 2023 -0300

    [KOGITO-9842] The initial load job task from DB is not executed Job Service (#1888)
    
    * KOGITO-9842 The initial load job task from DB is not executed Job Service
---
 .../service/scheduler/JobSchedulerManager.java     | 55 ++++++++++++++--------
 .../service/scheduler/JobSchedulerManagerTest.java | 55 ++++++++++++++--------
 2 files changed, 71 insertions(+), 39 deletions(-)

diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java
index 040cba283..3649eafcd 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java
+++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java
@@ -21,12 +21,11 @@ package org.kie.kogito.jobs.service.scheduler;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
-import javax.annotation.Priority;
 import javax.enterprise.context.ApplicationScoped;
 import javax.enterprise.event.Observes;
 import javax.inject.Inject;
-import javax.interceptor.Interceptor;
 
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
@@ -40,7 +39,6 @@ import org.kie.kogito.jobs.service.utils.ErrorHandling;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.quarkus.runtime.StartupEvent;
 import io.vertx.mutiny.core.Vertx;
 
 @ApplicationScoped
@@ -76,26 +74,45 @@ public class JobSchedulerManager {
 
     @Inject
     Vertx vertx;
-    private AtomicBoolean enabled = new AtomicBoolean(false);
-
-    void onStartup(@Observes @Priority(Interceptor.Priority.PLATFORM_AFTER) StartupEvent startupEvent) {
-        if (loadJobIntervalInMinutes > schedulerChunkInMinutes) {
-            LOGGER.warn("The loadJobIntervalInMinutes ({}) cannot be greater than schedulerChunkInMinutes ({}), " +
-                    "setting value {} for both",
-                    loadJobIntervalInMinutes,
-                    schedulerChunkInMinutes,
-                    schedulerChunkInMinutes);
-            loadJobIntervalInMinutes = schedulerChunkInMinutes;
+    final AtomicBoolean enabled = new AtomicBoolean(false);
+
+    final AtomicLong periodicTimerIdForLoadJobs = new AtomicLong(-1l);
+
+    private void startJobsLoadingFromRepositoryTask() {
+        //guarantee it starts the task just in case it is not already active
+        if (periodicTimerIdForLoadJobs.get() < 0) {
+            if (loadJobIntervalInMinutes > schedulerChunkInMinutes) {
+                LOGGER.warn("The loadJobIntervalInMinutes ({}) cannot be greater than schedulerChunkInMinutes ({}), " +
+                        "setting value {} for both",
+                        loadJobIntervalInMinutes,
+                        schedulerChunkInMinutes,
+                        schedulerChunkInMinutes);
+                loadJobIntervalInMinutes = schedulerChunkInMinutes;
+            }
+            //first execution
+            vertx.runOnContext(this::loadJobDetails);
+            //next executions to run periodically
+            periodicTimerIdForLoadJobs.set(vertx.setPeriodic(TimeUnit.MINUTES.toMillis(loadJobIntervalInMinutes), id -> loadJobDetails()));
         }
+    }
 
-        //first execution
-        vertx.runOnContext(this::loadJobDetails);
-        //periodic execution
-        vertx.setPeriodic(TimeUnit.MINUTES.toMillis(loadJobIntervalInMinutes), id -> loadJobDetails());
+    private void cancelJobsLoadingFromRepositoryTask() {
+        if (periodicTimerIdForLoadJobs.get() > 0) {
+            vertx.cancelTimer(periodicTimerIdForLoadJobs.get());
+            //negative id indicates this is not active anymore
+            periodicTimerIdForLoadJobs.set(-1);
+        }
     }
 
-    protected void onMessagingStatusChange(@Observes MessagingChangeEvent event) {
-        this.enabled.set(event.isEnabled());
+    protected synchronized void onMessagingStatusChange(@Observes MessagingChangeEvent event) {
+        boolean wasEnabled = enabled.getAndSet(event.isEnabled());
+        if (enabled.get() && !wasEnabled) {
+            // good, avoid starting twice if we receive two consecutive enabled = true
+            startJobsLoadingFromRepositoryTask();
+        } else if (!enabled.get()) {
+            // but only cancel if we receive enabled = false, otherwise with two consecutive enable we are also cancelling.
+            cancelJobsLoadingFromRepositoryTask();
+        }
     }
 
     //Runs periodically loading the jobs from the repository in chunks
diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerTest.java
index 32da35def..17d7b8528 100644
--- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerTest.java
+++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerTest.java
@@ -21,6 +21,7 @@ package org.kie.kogito.jobs.service.scheduler;
 import java.time.ZonedDateTime;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
 import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
@@ -41,14 +42,15 @@ import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import io.quarkus.runtime.StartupEvent;
 import io.vertx.mutiny.core.Vertx;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -80,6 +82,8 @@ class JobSchedulerManagerTest {
 
     @BeforeEach
     void setUp() {
+        reset(tested);
+        reset(scheduler);
         this.scheduledJob = JobDetails
                 .builder()
                 .id(JOB_ID)
@@ -95,24 +99,14 @@ class JobSchedulerManagerTest {
                 .thenReturn(Optional.empty());
         lenient().when(scheduler.schedule(scheduledJob))
                 .thenReturn(ReactiveStreams.of(scheduledJob).buildRs());
-        tested.onMessagingStatusChange(new MessagingChangeEvent(true));
-    }
-
-    @Test
-    void testOnStartup(@Mock StartupEvent event) {
-        tested.onStartup(event);
-        verify(vertx).runOnContext(captorFirstExecution.capture());
-        verify(vertx).setPeriodic(eq(tested.loadJobIntervalInMinutes), captorPeriodic.capture());
-    }
-
-    @Test
-    void testOnStartupInvalidInterval(@Mock StartupEvent event) {
-        tested.schedulerChunkInMinutes = 10;
-        tested.loadJobIntervalInMinutes = 20;
-
-        tested.onStartup(event);
-
-        assertThat(tested.loadJobIntervalInMinutes).isEqualTo(tested.schedulerChunkInMinutes);
+        ArgumentCaptor<Runnable> action = ArgumentCaptor.forClass(Runnable.class);
+        lenient().doAnswer(a -> {
+            ((Runnable) a.getArgument(0)).run();
+            return a;
+        }).when(vertx).runOnContext(action.capture());
+        AtomicLong counter = new AtomicLong(1);
+        lenient().when(vertx.setPeriodic(anyLong(), any(Consumer.class))).thenReturn(counter.incrementAndGet());
+        tested.enabled.set(true);
     }
 
     @Test
@@ -128,4 +122,25 @@ class JobSchedulerManagerTest {
         tested.loadJobDetails();
         verify(scheduler, never()).schedule(scheduledJob);
     }
+
+    @Test
+    void onMessagingStatusChange() {
+        tested.enabled.set(false);
+        MessagingChangeEvent messagingChangeEvent = new MessagingChangeEvent(true);
+        tested.onMessagingStatusChange(messagingChangeEvent);
+        verify(tested).loadJobDetails();//called once
+        assertThat(tested.periodicTimerIdForLoadJobs.get()).isPositive();
+        assertThat(tested.enabled.get()).isTrue();
+
+        MessagingChangeEvent messagingChangeEventToFalse = new MessagingChangeEvent(false);
+        tested.onMessagingStatusChange(messagingChangeEventToFalse);
+        assertThat(tested.periodicTimerIdForLoadJobs.get()).isNegative();
+        assertThat(tested.enabled.get()).isFalse();
+        verify(tested).loadJobDetails();//still called once
+
+        tested.onMessagingStatusChange(messagingChangeEvent);
+        verify(tested, times(2)).loadJobDetails(); //called twice
+        assertThat(tested.periodicTimerIdForLoadJobs.get()).isPositive();
+        assertThat(tested.enabled.get()).isTrue();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to