This is an automated email from the ASF dual-hosted git repository.

fjtiradosarti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ad01d7b4b [Fix #3979] Fix InMemoryJobService handling in StaticWorkflowApplication (#3980)
4ad01d7b4b is described below

commit 4ad01d7b4b23be6e08cd89dfd393ba4dc5c48102
Author: Francisco Javier Tirado Sarti <[email protected]>
AuthorDate: Thu Jul 10 15:29:43 2025 +0200

    [Fix #3979] Fix InMemoryJobService handling in StaticWorkflowApplication (#3980)
    
    * [Fix #3979] Fix InMemoryJobService handling in StaticWorkflowApplication
    
    * [Fix #3979] Allow setting up a custom JobService
---
 .../services/jobs/impl/InMemoryJobService.java     | 45 +++++++---------------
 .../services/jobs/impl/StaticJobService.java       |  9 ++---
 .../kogito/process/impl/StaticProcessConfig.java   | 10 ++++-
 .../executor/StaticWorkflowApplication.java        | 21 ++++++++--
 .../StaticFluentWorkflowApplicationTest.java       | 14 -------
 5 files changed, 45 insertions(+), 54 deletions(-)

diff --git 
a/api/kogito-services/src/main/java/org/kie/kogito/services/jobs/impl/InMemoryJobService.java
 
b/api/kogito-services/src/main/java/org/kie/kogito/services/jobs/impl/InMemoryJobService.java
index fb6fd55e96..d3dc0c3b23 100644
--- 
a/api/kogito-services/src/main/java/org/kie/kogito/services/jobs/impl/InMemoryJobService.java
+++ 
b/api/kogito-services/src/main/java/org/kie/kogito/services/jobs/impl/InMemoryJobService.java
@@ -66,24 +66,15 @@ public class InMemoryJobService implements JobsService, 
AutoCloseable {
     @Override
     public String scheduleJob(JobDescription jobDescription) {
         LOGGER.debug("ScheduleProcessJob: {}", jobDescription);
-        ScheduledFuture<?> future;
-        long delay = calculateDelay(jobDescription);
-        Long interval = jobDescription.expirationTime().repeatInterval();
-        Optional<JobExecutorFactory> jobExecutorFactoryFound = 
findJobExecutorFactory(jobDescription);
-
-        if (jobExecutorFactoryFound.isEmpty()) {
-            throw new IllegalArgumentException("Could not schedule " + 
jobDescription + ". No job executor factory provided");
-        }
-
-        JobExecutorFactory jobExecutorFactory = jobExecutorFactoryFound.get();
-
-        if (interval != null) {
-            future = 
scheduler.scheduleAtFixedRate(jobExecutorFactory.createNewRepeteableRunnable(this,
 jobDescription), delay, interval, TimeUnit.MILLISECONDS);
-        } else {
-            future = 
scheduler.schedule(jobExecutorFactory.createNewRunnable(this, jobDescription), 
delay, TimeUnit.MILLISECONDS);
-        }
-        scheduledJobs.put(jobDescription.id(), future);
-        return jobDescription.id();
+        return findJobExecutorFactory(jobDescription).map(jobExecutorFactory 
-> {
+            long delay = calculateDelay(jobDescription);
+            Long interval = jobDescription.expirationTime().repeatInterval();
+            String jobId = jobDescription.id();
+            scheduledJobs.put(jobId,
+                    interval != null ? 
scheduler.scheduleAtFixedRate(jobExecutorFactory.createNewRepeteableRunnable(this,
 jobDescription), delay, interval, TimeUnit.MILLISECONDS)
+                            : 
scheduler.schedule(jobExecutorFactory.createNewRunnable(this, jobDescription), 
delay, TimeUnit.MILLISECONDS));
+            return jobId;
+        }).orElseThrow(() -> new IllegalArgumentException("Could not schedule 
ProcessInstanceJobDescription " + jobDescription + ". No job executor factory 
provided"));
     }
 
     private Optional<JobExecutorFactory> findJobExecutorFactory(JobDescription 
jobDescription) {
@@ -97,13 +88,8 @@ public class InMemoryJobService implements JobsService, 
AutoCloseable {
 
     public boolean cancelJob(String id, boolean force) {
         LOGGER.debug("Cancel Job: {}", id);
-        if (scheduledJobs.containsKey(id)) {
-            ScheduledFuture<?> future = scheduledJobs.remove(id);
-            if (!future.isDone()) {
-                return future.cancel(force);
-            }
-        }
-        return false;
+        ScheduledFuture<?> future = scheduledJobs.remove(id);
+        return future != null && !future.isDone() && future.cancel(force);
     }
 
     @Override
@@ -117,18 +103,15 @@ public class InMemoryJobService implements JobsService, 
AutoCloseable {
 
     protected long calculateDelay(JobDescription description) {
         long delay = Duration.between(ZonedDateTime.now(), 
description.expirationTime().get()).toMillis();
-        if (delay <= 0) {
-            return 1;
-        }
-        return delay;
+        return delay <= 0 ? 1 : delay;
     }
 
     @Override
     public void close() throws Exception {
         LOGGER.info("closing in memory job service");
-        scheduledJobs.clear();
-        scheduledJobs.forEach((k, v) -> v.cancel(true));
         scheduler.shutdownNow();
+        scheduledJobs.values().forEach(v -> v.cancel(true));
+        scheduledJobs.clear();
     }
 
     public void clearJobExecutorFactories() {
diff --git 
a/api/kogito-services/src/main/java/org/kie/kogito/services/jobs/impl/StaticJobService.java
 
b/api/kogito-services/src/main/java/org/kie/kogito/services/jobs/impl/StaticJobService.java
index 94c8027a34..21bdac6b6e 100644
--- 
a/api/kogito-services/src/main/java/org/kie/kogito/services/jobs/impl/StaticJobService.java
+++ 
b/api/kogito-services/src/main/java/org/kie/kogito/services/jobs/impl/StaticJobService.java
@@ -20,12 +20,11 @@ package org.kie.kogito.services.jobs.impl;
 
 public class StaticJobService {
 
-    private static InMemoryJobService INSTANCE;
+    private static class InstanceHolder {
+        private static InMemoryJobService INSTANCE = new InMemoryJobService();
+    }
 
     public static InMemoryJobService staticJobService() {
-        if (INSTANCE == null) {
-            INSTANCE = new InMemoryJobService();
-        }
-        return INSTANCE;
+        return InstanceHolder.INSTANCE;
     }
 }
diff --git 
a/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/StaticProcessConfig.java
 
b/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/StaticProcessConfig.java
index 654c828649..456248960a 100644
--- 
a/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/StaticProcessConfig.java
+++ 
b/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/StaticProcessConfig.java
@@ -61,7 +61,15 @@ public class StaticProcessConfig implements ProcessConfig {
             WorkItemHandlerConfig workItemHandlerConfig,
             ProcessEventListenerConfig processEventListenerConfig,
             UnitOfWorkManager unitOfWorkManager) {
-        this(workItemHandlerConfig, processEventListenerConfig, 
unitOfWorkManager, staticJobService(), null, new NoOpIdentityProvider(), null);
+        this(workItemHandlerConfig, processEventListenerConfig, 
unitOfWorkManager, staticJobService());
+    }
+
+    public StaticProcessConfig(
+            WorkItemHandlerConfig workItemHandlerConfig,
+            ProcessEventListenerConfig processEventListenerConfig,
+            UnitOfWorkManager unitOfWorkManager,
+            JobsService jobsService) {
+        this(workItemHandlerConfig, processEventListenerConfig, 
unitOfWorkManager, jobsService, null, new NoOpIdentityProvider(), null);
     }
 
     public StaticProcessConfig(
diff --git 
a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java
 
b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java
index 405c19d767..2a51a2669e 100644
--- 
a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java
+++ 
b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java
@@ -54,6 +54,7 @@ import org.kie.kogito.event.impl.EventFactoryUtils;
 import org.kie.kogito.internal.process.event.DefaultKogitoProcessEventListener;
 import org.kie.kogito.internal.process.event.KogitoProcessEventListener;
 import org.kie.kogito.internal.process.workitem.KogitoWorkItemHandler;
+import org.kie.kogito.jobs.JobsService;
 import org.kie.kogito.process.Process;
 import org.kie.kogito.process.ProcessInstance;
 import org.kie.kogito.process.ProcessInstancesFactory;
@@ -66,6 +67,10 @@ import 
org.kie.kogito.serverless.workflow.models.JsonNodeModel;
 import org.kie.kogito.serverless.workflow.parser.ServerlessWorkflowParser;
 import org.kie.kogito.serverless.workflow.utils.ConfigResolverHolder;
 import org.kie.kogito.serverless.workflow.utils.MultiSourceConfigResolver;
+import org.kie.kogito.services.jobs.impl.InMemoryJobContext;
+import org.kie.kogito.services.jobs.impl.InMemoryJobService;
+import org.kie.kogito.services.jobs.impl.InMemoryProcessJobExecutorFactory;
+import org.kie.kogito.services.jobs.impl.StaticJobService;
 import org.kie.kogito.services.uow.CollectingUnitOfWorkFactory;
 import org.kie.kogito.services.uow.DefaultUnitOfWorkManager;
 import org.kie.kogito.services.uow.UnitOfWorkExecutor;
@@ -139,6 +144,7 @@ public class StaticWorkflowApplication extends 
StaticApplication implements Auto
         private Optional<UnitOfWorkManager> manager = Optional.empty();
         private Collection<EventPublisher> publishers = new ArrayList<>();
         private ExecutorService executor;
+        private Optional<JobsService> jobsService = Optional.empty();
 
         private WorkflowApplicationBuilder() {
         }
@@ -179,6 +185,11 @@ public class StaticWorkflowApplication extends 
StaticApplication implements Auto
             return this;
         }
 
+        public WorkflowApplicationBuilder withJobsService(JobsService 
jobsService) {
+            this.jobsService = Optional.ofNullable(jobsService);
+            return this;
+        }
+
         public StaticWorkflowApplication build() {
             if (properties == null) {
                 this.properties = loadApplicationDotProperties();
@@ -186,7 +197,8 @@ public class StaticWorkflowApplication extends 
StaticApplication implements Auto
             Map<String, SynchronousQueue<JsonNodeModel>> queues = new 
ConcurrentHashMap<>();
             listeners.add(new StaticCompletionEventListener(queues));
             StaticWorkflowApplication application =
-                    new StaticWorkflowApplication(properties, queues, 
listeners, manager.orElseGet(() -> new DefaultUnitOfWorkManager(new 
CollectingUnitOfWorkFactory())), executor);
+                    new StaticWorkflowApplication(properties, queues, 
listeners, manager.orElseGet(() -> new DefaultUnitOfWorkManager(new 
CollectingUnitOfWorkFactory())), executor,
+                            jobsService.orElseGet(() -> 
StaticJobService.staticJobService()));
             application.applicationRegisters.forEach(register -> 
register.register(application));
             EventManager eventManager = application.manager.eventManager();
             eventManager.setService(serviceName);
@@ -228,9 +240,12 @@ public class StaticWorkflowApplication extends 
StaticApplication implements Auto
     }
 
     private StaticWorkflowApplication(Map<String, Object> properties, 
Map<String, SynchronousQueue<JsonNodeModel>> queues, 
Collection<KogitoProcessEventListener> listeners,
-            UnitOfWorkManager manager, ExecutorService executor) {
+            UnitOfWorkManager manager, ExecutorService executor, JobsService 
jobsService) {
         super(new StaticConfig(new Addons(Collections.emptySet()), new 
StaticProcessConfig(new CachedWorkItemHandlerConfig(),
-                new DefaultProcessEventListenerConfig(listeners), manager), 
new StaticConfigBean()));
+                new DefaultProcessEventListenerConfig(listeners), manager, 
jobsService), new StaticConfigBean()));
+        if (jobsService instanceof InMemoryJobService inMemoryJobService) {
+            inMemoryJobService.registerJobExecutorFactory(new 
InMemoryProcessJobExecutorFactory(new InMemoryJobContext(null, manager, 
processes, null)));
+        }
         if (!properties.isEmpty()) {
             
ConfigResolverHolder.setConfigResolver(MultiSourceConfigResolver.withSystemProperties(properties));
         }
diff --git 
a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/test/java/org/kie/kogito/serverless/workflow/executor/StaticFluentWorkflowApplicationTest.java
 
b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/test/java/org/kie/kogito/serverless/workflow/executor/StaticFluentWorkflowApplicationTest.java
index 37eb91e4b9..be9fab9961 100644
--- 
a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/test/java/org/kie/kogito/serverless/workflow/executor/StaticFluentWorkflowApplicationTest.java
+++ 
b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/test/java/org/kie/kogito/serverless/workflow/executor/StaticFluentWorkflowApplicationTest.java
@@ -28,10 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.junit.jupiter.api.Test;
 import org.kie.api.event.process.ProcessCompletedEvent;
 import org.kie.kogito.internal.process.event.DefaultKogitoProcessEventListener;
-import org.kie.kogito.jobs.JobsService;
 import org.kie.kogito.process.Process;
-import org.kie.kogito.process.ProcessConfig;
-import org.kie.kogito.process.Processes;
 import org.kie.kogito.process.validation.ValidationException;
 import org.kie.kogito.serverless.workflow.actions.SysoutAction;
 import org.kie.kogito.serverless.workflow.actions.WorkflowLogLevel;
@@ -41,10 +38,6 @@ import 
org.kie.kogito.serverless.workflow.models.JsonNodeModel;
 import org.kie.kogito.serverless.workflow.parser.types.SysOutTypeHandler;
 import org.kie.kogito.serverless.workflow.utils.ExpressionHandlerUtils;
 import org.kie.kogito.serverless.workflow.utils.KogitoProcessContextResolver;
-import org.kie.kogito.services.jobs.impl.InMemoryJobContext;
-import org.kie.kogito.services.jobs.impl.InMemoryJobService;
-import org.kie.kogito.services.jobs.impl.InMemoryProcessJobExecutorFactory;
-import org.kie.kogito.uow.UnitOfWorkManager;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -165,13 +158,6 @@ public class StaticFluentWorkflowApplicationTest {
         OperationStateBuilder sleepState = operation().action(call(expr("inc", 
".count=.count+1")).sleepAfter(Duration.ofSeconds(1)));
 
         try (StaticWorkflowApplication application = 
StaticWorkflowApplication.create()) {
-            ProcessConfig processConfig = 
application.config().get(ProcessConfig.class);
-            JobsService jobService = processConfig.jobsService();
-            Processes processes = application.get(Processes.class);
-            UnitOfWorkManager unitOfWorkManager = 
processConfig.unitOfWorkManager();
-            if (jobService instanceof InMemoryJobService inMemoryJobService) {
-                inMemoryJobService.registerJobExecutorFactory(new 
InMemoryProcessJobExecutorFactory(new InMemoryJobContext(null, 
unitOfWorkManager, processes, null)));
-            }
             Workflow workflow = workflow("Polling").start(startTask)
                     .next(sleepState)
                     .next(pollTask)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to