This is an automated email from the ASF dual-hosted git repository.
fjtiradosarti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git
The following commit(s) were added to refs/heads/main by this push:
new 4ad01d7b4b [Fix #3979] Fix InMemoryJobService handling in StaticWorkflowApplication (#3980)
4ad01d7b4b is described below
commit 4ad01d7b4b23be6e08cd89dfd393ba4dc5c48102
Author: Francisco Javier Tirado Sarti <[email protected]>
AuthorDate: Thu Jul 10 15:29:43 2025 +0200
[Fix #3979] Fix InMemoryJobService handling in StaticWorkflowApplication (#3980)
* [Fix #3979] Fix InMemoryJobService handling in StaticWorkflowApplication
* [Fix #3979] Allow setting up a custom JobService
---
.../services/jobs/impl/InMemoryJobService.java | 45 +++++++---------------
.../services/jobs/impl/StaticJobService.java | 9 ++---
.../kogito/process/impl/StaticProcessConfig.java | 10 ++++-
.../executor/StaticWorkflowApplication.java | 21 ++++++++--
.../StaticFluentWorkflowApplicationTest.java | 14 -------
5 files changed, 45 insertions(+), 54 deletions(-)
diff --git
a/api/kogito-services/src/main/java/org/kie/kogito/services/jobs/impl/InMemoryJobService.java
b/api/kogito-services/src/main/java/org/kie/kogito/services/jobs/impl/InMemoryJobService.java
index fb6fd55e96..d3dc0c3b23 100644
---
a/api/kogito-services/src/main/java/org/kie/kogito/services/jobs/impl/InMemoryJobService.java
+++
b/api/kogito-services/src/main/java/org/kie/kogito/services/jobs/impl/InMemoryJobService.java
@@ -66,24 +66,15 @@ public class InMemoryJobService implements JobsService,
AutoCloseable {
@Override
public String scheduleJob(JobDescription jobDescription) {
LOGGER.debug("ScheduleProcessJob: {}", jobDescription);
- ScheduledFuture<?> future;
- long delay = calculateDelay(jobDescription);
- Long interval = jobDescription.expirationTime().repeatInterval();
- Optional<JobExecutorFactory> jobExecutorFactoryFound =
findJobExecutorFactory(jobDescription);
-
- if (jobExecutorFactoryFound.isEmpty()) {
- throw new IllegalArgumentException("Could not schedule " +
jobDescription + ". No job executor factory provided");
- }
-
- JobExecutorFactory jobExecutorFactory = jobExecutorFactoryFound.get();
-
- if (interval != null) {
- future =
scheduler.scheduleAtFixedRate(jobExecutorFactory.createNewRepeteableRunnable(this,
jobDescription), delay, interval, TimeUnit.MILLISECONDS);
- } else {
- future =
scheduler.schedule(jobExecutorFactory.createNewRunnable(this, jobDescription),
delay, TimeUnit.MILLISECONDS);
- }
- scheduledJobs.put(jobDescription.id(), future);
- return jobDescription.id();
+ return findJobExecutorFactory(jobDescription).map(jobExecutorFactory
-> {
+ long delay = calculateDelay(jobDescription);
+ Long interval = jobDescription.expirationTime().repeatInterval();
+ String jobId = jobDescription.id();
+ scheduledJobs.put(jobId,
+ interval != null ?
scheduler.scheduleAtFixedRate(jobExecutorFactory.createNewRepeteableRunnable(this,
jobDescription), delay, interval, TimeUnit.MILLISECONDS)
+ :
scheduler.schedule(jobExecutorFactory.createNewRunnable(this, jobDescription),
delay, TimeUnit.MILLISECONDS));
+ return jobId;
+ }).orElseThrow(() -> new IllegalArgumentException("Could not schedule
ProcessInstanceJobDescription " + jobDescription + ". No job executor factory
provided"));
}
private Optional<JobExecutorFactory> findJobExecutorFactory(JobDescription
jobDescription) {
@@ -97,13 +88,8 @@ public class InMemoryJobService implements JobsService,
AutoCloseable {
public boolean cancelJob(String id, boolean force) {
LOGGER.debug("Cancel Job: {}", id);
- if (scheduledJobs.containsKey(id)) {
- ScheduledFuture<?> future = scheduledJobs.remove(id);
- if (!future.isDone()) {
- return future.cancel(force);
- }
- }
- return false;
+ ScheduledFuture<?> future = scheduledJobs.remove(id);
+ return future != null && !future.isDone() && future.cancel(force);
}
@Override
@@ -117,18 +103,15 @@ public class InMemoryJobService implements JobsService,
AutoCloseable {
protected long calculateDelay(JobDescription description) {
long delay = Duration.between(ZonedDateTime.now(),
description.expirationTime().get()).toMillis();
- if (delay <= 0) {
- return 1;
- }
- return delay;
+ return delay <= 0 ? 1 : delay;
}
@Override
public void close() throws Exception {
LOGGER.info("closing in memory job service");
- scheduledJobs.clear();
- scheduledJobs.forEach((k, v) -> v.cancel(true));
scheduler.shutdownNow();
+ scheduledJobs.values().forEach(v -> v.cancel(true));
+ scheduledJobs.clear();
}
public void clearJobExecutorFactories() {
diff --git
a/api/kogito-services/src/main/java/org/kie/kogito/services/jobs/impl/StaticJobService.java
b/api/kogito-services/src/main/java/org/kie/kogito/services/jobs/impl/StaticJobService.java
index 94c8027a34..21bdac6b6e 100644
---
a/api/kogito-services/src/main/java/org/kie/kogito/services/jobs/impl/StaticJobService.java
+++
b/api/kogito-services/src/main/java/org/kie/kogito/services/jobs/impl/StaticJobService.java
@@ -20,12 +20,11 @@ package org.kie.kogito.services.jobs.impl;
public class StaticJobService {
- private static InMemoryJobService INSTANCE;
+ private static class InstanceHolder {
+ private static InMemoryJobService INSTANCE = new InMemoryJobService();
+ }
public static InMemoryJobService staticJobService() {
- if (INSTANCE == null) {
- INSTANCE = new InMemoryJobService();
- }
- return INSTANCE;
+ return InstanceHolder.INSTANCE;
}
}
diff --git
a/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/StaticProcessConfig.java
b/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/StaticProcessConfig.java
index 654c828649..456248960a 100644
---
a/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/StaticProcessConfig.java
+++
b/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/StaticProcessConfig.java
@@ -61,7 +61,15 @@ public class StaticProcessConfig implements ProcessConfig {
WorkItemHandlerConfig workItemHandlerConfig,
ProcessEventListenerConfig processEventListenerConfig,
UnitOfWorkManager unitOfWorkManager) {
- this(workItemHandlerConfig, processEventListenerConfig,
unitOfWorkManager, staticJobService(), null, new NoOpIdentityProvider(), null);
+ this(workItemHandlerConfig, processEventListenerConfig,
unitOfWorkManager, staticJobService());
+ }
+
+ public StaticProcessConfig(
+ WorkItemHandlerConfig workItemHandlerConfig,
+ ProcessEventListenerConfig processEventListenerConfig,
+ UnitOfWorkManager unitOfWorkManager,
+ JobsService jobsService) {
+ this(workItemHandlerConfig, processEventListenerConfig,
unitOfWorkManager, jobsService, null, new NoOpIdentityProvider(), null);
}
public StaticProcessConfig(
diff --git
a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java
b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java
index 405c19d767..2a51a2669e 100644
---
a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java
+++
b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java
@@ -54,6 +54,7 @@ import org.kie.kogito.event.impl.EventFactoryUtils;
import org.kie.kogito.internal.process.event.DefaultKogitoProcessEventListener;
import org.kie.kogito.internal.process.event.KogitoProcessEventListener;
import org.kie.kogito.internal.process.workitem.KogitoWorkItemHandler;
+import org.kie.kogito.jobs.JobsService;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstancesFactory;
@@ -66,6 +67,10 @@ import
org.kie.kogito.serverless.workflow.models.JsonNodeModel;
import org.kie.kogito.serverless.workflow.parser.ServerlessWorkflowParser;
import org.kie.kogito.serverless.workflow.utils.ConfigResolverHolder;
import org.kie.kogito.serverless.workflow.utils.MultiSourceConfigResolver;
+import org.kie.kogito.services.jobs.impl.InMemoryJobContext;
+import org.kie.kogito.services.jobs.impl.InMemoryJobService;
+import org.kie.kogito.services.jobs.impl.InMemoryProcessJobExecutorFactory;
+import org.kie.kogito.services.jobs.impl.StaticJobService;
import org.kie.kogito.services.uow.CollectingUnitOfWorkFactory;
import org.kie.kogito.services.uow.DefaultUnitOfWorkManager;
import org.kie.kogito.services.uow.UnitOfWorkExecutor;
@@ -139,6 +144,7 @@ public class StaticWorkflowApplication extends
StaticApplication implements Auto
private Optional<UnitOfWorkManager> manager = Optional.empty();
private Collection<EventPublisher> publishers = new ArrayList<>();
private ExecutorService executor;
+ private Optional<JobsService> jobsService = Optional.empty();
private WorkflowApplicationBuilder() {
}
@@ -179,6 +185,11 @@ public class StaticWorkflowApplication extends
StaticApplication implements Auto
return this;
}
+ public WorkflowApplicationBuilder withJobsService(JobsService
jobsService) {
+ this.jobsService = Optional.ofNullable(jobsService);
+ return this;
+ }
+
public StaticWorkflowApplication build() {
if (properties == null) {
this.properties = loadApplicationDotProperties();
@@ -186,7 +197,8 @@ public class StaticWorkflowApplication extends
StaticApplication implements Auto
Map<String, SynchronousQueue<JsonNodeModel>> queues = new
ConcurrentHashMap<>();
listeners.add(new StaticCompletionEventListener(queues));
StaticWorkflowApplication application =
- new StaticWorkflowApplication(properties, queues,
listeners, manager.orElseGet(() -> new DefaultUnitOfWorkManager(new
CollectingUnitOfWorkFactory())), executor);
+ new StaticWorkflowApplication(properties, queues,
listeners, manager.orElseGet(() -> new DefaultUnitOfWorkManager(new
CollectingUnitOfWorkFactory())), executor,
+ jobsService.orElseGet(() ->
StaticJobService.staticJobService()));
application.applicationRegisters.forEach(register ->
register.register(application));
EventManager eventManager = application.manager.eventManager();
eventManager.setService(serviceName);
@@ -228,9 +240,12 @@ public class StaticWorkflowApplication extends
StaticApplication implements Auto
}
private StaticWorkflowApplication(Map<String, Object> properties,
Map<String, SynchronousQueue<JsonNodeModel>> queues,
Collection<KogitoProcessEventListener> listeners,
- UnitOfWorkManager manager, ExecutorService executor) {
+ UnitOfWorkManager manager, ExecutorService executor, JobsService
jobsService) {
super(new StaticConfig(new Addons(Collections.emptySet()), new
StaticProcessConfig(new CachedWorkItemHandlerConfig(),
- new DefaultProcessEventListenerConfig(listeners), manager),
new StaticConfigBean()));
+ new DefaultProcessEventListenerConfig(listeners), manager,
jobsService), new StaticConfigBean()));
+ if (jobsService instanceof InMemoryJobService inMemoryJobService) {
+ inMemoryJobService.registerJobExecutorFactory(new
InMemoryProcessJobExecutorFactory(new InMemoryJobContext(null, manager,
processes, null)));
+ }
if (!properties.isEmpty()) {
ConfigResolverHolder.setConfigResolver(MultiSourceConfigResolver.withSystemProperties(properties));
}
diff --git
a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/test/java/org/kie/kogito/serverless/workflow/executor/StaticFluentWorkflowApplicationTest.java
b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/test/java/org/kie/kogito/serverless/workflow/executor/StaticFluentWorkflowApplicationTest.java
index 37eb91e4b9..be9fab9961 100644
---
a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/test/java/org/kie/kogito/serverless/workflow/executor/StaticFluentWorkflowApplicationTest.java
+++
b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/test/java/org/kie/kogito/serverless/workflow/executor/StaticFluentWorkflowApplicationTest.java
@@ -28,10 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test;
import org.kie.api.event.process.ProcessCompletedEvent;
import org.kie.kogito.internal.process.event.DefaultKogitoProcessEventListener;
-import org.kie.kogito.jobs.JobsService;
import org.kie.kogito.process.Process;
-import org.kie.kogito.process.ProcessConfig;
-import org.kie.kogito.process.Processes;
import org.kie.kogito.process.validation.ValidationException;
import org.kie.kogito.serverless.workflow.actions.SysoutAction;
import org.kie.kogito.serverless.workflow.actions.WorkflowLogLevel;
@@ -41,10 +38,6 @@ import
org.kie.kogito.serverless.workflow.models.JsonNodeModel;
import org.kie.kogito.serverless.workflow.parser.types.SysOutTypeHandler;
import org.kie.kogito.serverless.workflow.utils.ExpressionHandlerUtils;
import org.kie.kogito.serverless.workflow.utils.KogitoProcessContextResolver;
-import org.kie.kogito.services.jobs.impl.InMemoryJobContext;
-import org.kie.kogito.services.jobs.impl.InMemoryJobService;
-import org.kie.kogito.services.jobs.impl.InMemoryProcessJobExecutorFactory;
-import org.kie.kogito.uow.UnitOfWorkManager;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode;
@@ -165,13 +158,6 @@ public class StaticFluentWorkflowApplicationTest {
OperationStateBuilder sleepState = operation().action(call(expr("inc",
".count=.count+1")).sleepAfter(Duration.ofSeconds(1)));
try (StaticWorkflowApplication application =
StaticWorkflowApplication.create()) {
- ProcessConfig processConfig =
application.config().get(ProcessConfig.class);
- JobsService jobService = processConfig.jobsService();
- Processes processes = application.get(Processes.class);
- UnitOfWorkManager unitOfWorkManager =
processConfig.unitOfWorkManager();
- if (jobService instanceof InMemoryJobService inMemoryJobService) {
- inMemoryJobService.registerJobExecutorFactory(new
InMemoryProcessJobExecutorFactory(new InMemoryJobContext(null,
unitOfWorkManager, processes, null)));
- }
Workflow workflow = workflow("Polling").start(startTask)
.next(sleepState)
.next(pollTask)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]