This is an automated email from the ASF dual-hosted git repository.
egonzalez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git
The following commit(s) were added to refs/heads/main by this push:
new b8c93b8ae [incubator-kie-issues-711] Refactoring job service to allow
collocated service for Quarkus (#1925)
b8c93b8ae is described below
commit b8c93b8ae2188f5c4ea51d54d27e235c6dfee286
Author: Enrique <[email protected]>
AuthorDate: Mon Jan 8 10:25:04 2024 +0100
[incubator-kie-issues-711] Refactoring job service to allow collocated
service for Quarkus (#1925)
* [incubator-kie-issues-711] Refactoring job service to allow collocated
service for Quarkus
* fix debug information
* fix in-vm events
* set correct logger level
* add lifecycle test
* fix formatting
* Quarkus 3 upgrades
* - renames
- cleanup
* - Fixed alternatives
* - Fix Data Audit Jobs Events
- Format
---------
Co-authored-by: Pere Fernández <[email protected]>
---
data-audit/.gitignore | 167 ---------------------
data-audit/data-audit-common/.gitignore | 1 -
.../kogito-addons-data-audit-jpa-common/.gitignore | 1 -
data-index/data-index-common/.gitignore | 1 -
data-index/data-index-graphql/.gitignore | 1 -
.../data-index-service-common/.gitignore | 1 -
.../data-index-service-infinispan/.gitignore | 1 -
.../data-index-storage-postgresql/.gitignore | 1 -
.../kie/kogito/jobs/service/job/DelegateJob.java | 12 +-
.../json/DurationExpirationTimeDeserializer.java | 61 ++++++++
.../json/DurationExpirationTimeSerializer.java | 51 +++++++
.../json/ExactExpirationTimeDeserializer.java | 47 ++++++
.../json/ExactExpirationTimeSerializer.java | 45 ++++++
.../jobs/service/json/JacksonConfiguration.java | 12 ++
.../ProcessInstanceJobDescriptionDeserializer.java | 67 +++++++++
.../ProcessInstanceJobDescriptionSerializer.java | 52 +++++++
.../repository/impl/BaseReactiveJobRepository.java | 12 +-
.../repository/impl/InMemoryJobRepository.java | 6 +-
.../scheduler/impl/TimerDelegateJobScheduler.java | 45 +-----
.../jobs/service/stream/JobEventPublisher.java} | 20 ++-
...bStreams.java => JobStreamsEventPublisher.java} | 42 +++++-
.../repository/impl/BaseJobRepositoryTest.java | 6 +-
.../repository/impl/InMemoryJobRepositoryTest.java | 2 +-
.../impl/TimerDelegateJobSchedulerTest.java | 39 -----
.../impl/VertxTimerServiceSchedulerTest.java | 6 +-
.../infinispan/InfinispanJobRepository.java | 6 +-
.../jobs/service/adapter/JobDetailsAdapter.java | 9 --
.../service/exception/JobExecutionException.java | 5 +
.../repository/mongodb/MongoDBJobRepository.java | 6 +-
.../postgresql/PostgreSqlJobRepository.java | 6 +-
.../src/main/resources/application.properties | 1 -
.../kogito-addons-quarkus-jobs/pom.xml | 80 ++++++++++
.../kogito/jobs/embedded/EmbeddedJobExecutor.java | 83 ++++++++++
.../jobs/embedded/EmbeddedJobServiceEvent.java} | 20 +--
.../kogito/jobs/embedded/EmbeddedJobsService.java | 98 ++++++++++++
.../kie/kogito/jobs/embedded/InVMPayloadData.java | 53 +++++++
.../kie/kogito/jobs/embedded/InVMRecipient.java} | 40 ++---
.../jobs/embedded/JobInVMEventPublisher.java | 156 +++++++++++++++++++
.../src/main/resources/META-INF/beans.xml | 0
.../src/main/resources/application.properties | 1 +
.../jobs/embedded/EmbeddedJobsServiceTests.java | 65 ++++++++
.../kie/kogito/jobs/embedded/TestApplication.java} | 39 ++---
.../kogito/jobs/embedded/TestEventPublisher.java | 61 ++++++++
.../kie/kogito/jobs/embedded/TestProcesses.java | 52 +++++++
jobs-service/kogito-addons-jobs-service/pom.xml | 6 +-
45 files changed, 1132 insertions(+), 354 deletions(-)
diff --git a/data-audit/.gitignore b/data-audit/.gitignore
deleted file mode 100644
index a3191deaf..000000000
--- a/data-audit/.gitignore
+++ /dev/null
@@ -1,167 +0,0 @@
-/target/
-!.mvn/wrapper/maven-wrapper.jar
-
-### STS ###
-.apt_generated
-.classpath
-.factorypath
-.project
-.settings
-.springBeans
-.sts4-cache
-
-
-ObjectStore/
-PutObjectStoreDirHere/
-
-# Created by https://www.gitignore.io/api/git,java,maven,eclipse,windows
-
-### Eclipse ###
-
-.metadata
-bin/
-tmp/
-*.tmp
-*.bak
-*.swp
-*~.nib
-local.properties
-.settings/
-.loadpath
-.recommenders
-
-# External tool builders
-.externalToolBuilders/
-
-# Locally stored "Eclipse launch configurations"
-*.launch
-
-# PyDev specific (Python IDE for Eclipse)
-*.pydevproject
-
-# CDT-specific (C/C++ Development Tooling)
-.cproject
-
-# CDT- autotools
-.autotools
-
-# Java annotation processor (APT)
-.factorypath
-
-# PDT-specific (PHP Development Tools)
-.buildpath
-
-# sbteclipse plugin
-.target
-
-# Tern plugin
-.tern-project
-
-# TeXlipse plugin
-.texlipse
-
-# STS (Spring Tool Suite)
-.springBeans
-
-# Code Recommenders
-.recommenders/
-
-# Annotation Processing
-.apt_generated/
-
-# Scala IDE specific (Scala & Java development for Eclipse)
-.cache-main
-.scala_dependencies
-.worksheet
-
-### Eclipse Patch ###
-# Eclipse Core
-.project
-
-# JDT-specific (Eclipse Java Development Tools)
-.classpath
-
-# Annotation Processing
-.apt_generated
-
-.sts4-cache/
-
-### Git ###
-# Created by git for backups. To disable backups in Git:
-# $ git config --global mergetool.keepBackup false
-*.orig
-
-# Created by git when using merge tools for conflicts
-*.BACKUP.*
-*.BASE.*
-*.LOCAL.*
-*.REMOTE.*
-*_BACKUP_*.txt
-*_BASE_*.txt
-*_LOCAL_*.txt
-*_REMOTE_*.txt
-
-### Java ###
-# Compiled class file
-*.class
-
-# Log file
-*.log
-
-# BlueJ files
-*.ctxt
-
-# Mobile Tools for Java (J2ME)
-.mtj.tmp/
-
-# Package Files #
-*.jar
-*.war
-*.nar
-*.ear
-*.zip
-*.tar.gz
-*.rar
-
-# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
-hs_err_pid*
-
-### Maven ###
-target/
-pom.xml.tag
-pom.xml.releaseBackup
-pom.xml.versionsBackup
-pom.xml.next
-release.properties
-dependency-reduced-pom.xml
-buildNumber.properties
-.mvn/timing.properties
-.mvn/wrapper/maven-wrapper.jar
-
-### Windows ###
-# Windows thumbnail cache files
-Thumbs.db
-ehthumbs.db
-ehthumbs_vista.db
-
-# Dump file
-*.stackdump
-
-# Folder config file
-[Dd]esktop.ini
-
-# Recycle Bin used on file shares
-$RECYCLE.BIN/
-
-# Windows Installer files
-*.cab
-*.msi
-*.msix
-*.msm
-*.msp
-
-# Windows shortcuts
-*.lnk
-
-
-# End of https://www.gitignore.io/api/git,java,maven,eclipse,windows
diff --git a/data-audit/data-audit-common/.gitignore
b/data-audit/data-audit-common/.gitignore
deleted file mode 100644
index b83d22266..000000000
--- a/data-audit/data-audit-common/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
diff --git
a/data-audit/kogito-addons-data-audit-jpa/kogito-addons-data-audit-jpa-common/.gitignore
b/data-audit/kogito-addons-data-audit-jpa/kogito-addons-data-audit-jpa-common/.gitignore
deleted file mode 100644
index b83d22266..000000000
---
a/data-audit/kogito-addons-data-audit-jpa/kogito-addons-data-audit-jpa-common/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
diff --git a/data-index/data-index-common/.gitignore
b/data-index/data-index-common/.gitignore
deleted file mode 100644
index b83d22266..000000000
--- a/data-index/data-index-common/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
diff --git a/data-index/data-index-graphql/.gitignore
b/data-index/data-index-graphql/.gitignore
deleted file mode 100644
index b83d22266..000000000
--- a/data-index/data-index-graphql/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
diff --git a/data-index/data-index-service/data-index-service-common/.gitignore
b/data-index/data-index-service/data-index-service-common/.gitignore
deleted file mode 100644
index b83d22266..000000000
--- a/data-index/data-index-service/data-index-service-common/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
diff --git
a/data-index/data-index-service/data-index-service-infinispan/.gitignore
b/data-index/data-index-service/data-index-service-infinispan/.gitignore
deleted file mode 100644
index b83d22266..000000000
--- a/data-index/data-index-service/data-index-service-infinispan/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
diff --git
a/data-index/data-index-storage/data-index-storage-postgresql/.gitignore
b/data-index/data-index-storage/data-index-storage-postgresql/.gitignore
deleted file mode 100644
index b83d22266..000000000
--- a/data-index/data-index-storage/data-index-storage-postgresql/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/job/DelegateJob.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/job/DelegateJob.java
index 0cd6cc081..20796efa2 100644
---
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/job/DelegateJob.java
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/job/DelegateJob.java
@@ -24,7 +24,7 @@ import
org.kie.kogito.jobs.service.exception.JobExecutionException;
import org.kie.kogito.jobs.service.executor.JobExecutorResolver;
import org.kie.kogito.jobs.service.model.JobDetailsContext;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
-import org.kie.kogito.jobs.service.stream.JobStreams;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import org.kie.kogito.timer.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,11 +40,11 @@ public class DelegateJob implements Job<JobDetailsContext> {
private final JobExecutorResolver jobExecutorResolver;
- private final JobStreams jobStreams;
+ private final JobEventPublisher jobEventPublisher;
- public DelegateJob(JobExecutorResolver executorResolver, JobStreams
jobStreams) {
+ public DelegateJob(JobExecutorResolver executorResolver, JobEventPublisher
jobEventPublisher) {
this.jobExecutorResolver = executorResolver;
- this.jobStreams = jobStreams;
+ this.jobEventPublisher = jobEventPublisher;
}
@Override
@@ -55,11 +55,11 @@ public class DelegateJob implements Job<JobDetailsContext> {
.map(jobExecutorResolver::get)
.map(executor -> executor.execute(ctx.getJobDetails()))
.orElseThrow(() -> new IllegalStateException("JobDetails cannot be null from context " + ctx))
- .onItem().invoke(jobStreams::publishJobSuccess)
+ .onItem().invoke(jobEventPublisher::publishJobSuccess)
.onFailure(JobExecutionException.class).invoke(ex -> {
String jobId = ((JobExecutionException) ex).getJobId();
LOGGER.error("Error executing job {}", jobId, ex);
- jobStreams.publishJobError(JobExecutionResponse.builder()
+ jobEventPublisher.publishJobError(JobExecutionResponse.builder()
.message(ex.getMessage())
.now()
.jobId(jobId)
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/DurationExpirationTimeDeserializer.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/DurationExpirationTimeDeserializer.java
new file mode 100644
index 000000000..f23b7aab4
--- /dev/null
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/DurationExpirationTimeDeserializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.json;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+
+import org.kie.kogito.jobs.DurationExpirationTime;
+
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+public class DurationExpirationTimeDeserializer extends
StdDeserializer<DurationExpirationTime> {
+
+ private static final long serialVersionUID = -8307549297456060422L;
+
+ public DurationExpirationTimeDeserializer() {
+ super(DurationExpirationTime.class);
+ }
+
+ @Override
+ public DurationExpirationTime deserialize(JsonParser jp,
DeserializationContext ctxt) throws IOException, JacksonException {
+
+ JsonNode node = jp.getCodec().readTree(jp);
+
+ ZonedDateTime time = ctxt.readTreeAsValue(node.get("expirationTime"),
ZonedDateTime.class);
+ Long repeatInterval = null;
+ if (node.has("repeatInterval")) {
+ repeatInterval = node.get("repeatInterval").asLong();
+ }
+ Integer repeatLimit = null;
+ if (node.has("repeatLimit")) {
+ repeatLimit = node.get("repeatLimit").asInt(0);
+ }
+
+ Duration res = Duration.between(Instant.now(), time.toInstant());
+ return DurationExpirationTime.repeat(res.toMillis(), repeatInterval,
repeatLimit);
+ }
+
+}
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/DurationExpirationTimeSerializer.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/DurationExpirationTimeSerializer.java
new file mode 100644
index 000000000..f995dbfe4
--- /dev/null
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/DurationExpirationTimeSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.json;
+
+import java.io.IOException;
+
+import org.kie.kogito.jobs.DurationExpirationTime;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+public class DurationExpirationTimeSerializer extends
StdSerializer<DurationExpirationTime> {
+
+ private static final long serialVersionUID = -8307549297456060422L;
+
+ public DurationExpirationTimeSerializer() {
+ super(DurationExpirationTime.class);
+ }
+
+ @Override
+ public void serialize(DurationExpirationTime value, JsonGenerator jgen,
SerializerProvider provider) throws IOException {
+ jgen.writeStartObject();
+ jgen.writeStringField("@type", DurationExpirationTime.class.getName());
+ jgen.writeObjectField("expirationTime", value.get());
+ if (value.repeatInterval() != null) {
+ jgen.writeNumberField("repeatInterval", value.repeatInterval());
+ }
+ if (value.repeatLimit() != null) {
+ jgen.writeNumberField("repeatLimit", value.repeatLimit());
+ }
+ jgen.writeEndObject();
+ }
+
+}
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ExactExpirationTimeDeserializer.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ExactExpirationTimeDeserializer.java
new file mode 100644
index 000000000..32ffee185
--- /dev/null
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ExactExpirationTimeDeserializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.json;
+
+import java.io.IOException;
+import java.time.ZonedDateTime;
+
+import org.kie.kogito.jobs.ExactExpirationTime;
+
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+public class ExactExpirationTimeDeserializer extends
StdDeserializer<ExactExpirationTime> {
+
+ private static final long serialVersionUID = -8307549297456060422L;
+
+ public ExactExpirationTimeDeserializer() {
+ super(ExactExpirationTime.class);
+ }
+
+ @Override
+ public ExactExpirationTime deserialize(JsonParser jp,
DeserializationContext ctxt) throws IOException, JacksonException {
+ JsonNode node = jp.getCodec().readTree(jp);
+ ZonedDateTime time = ctxt.readTreeAsValue(node.get("expirationTime"),
ZonedDateTime.class);
+ return ExactExpirationTime.of(time);
+ }
+
+}
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ExactExpirationTimeSerializer.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ExactExpirationTimeSerializer.java
new file mode 100644
index 000000000..f77a6f8d8
--- /dev/null
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ExactExpirationTimeSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.json;
+
+import java.io.IOException;
+
+import org.kie.kogito.jobs.ExactExpirationTime;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+public class ExactExpirationTimeSerializer extends
StdSerializer<ExactExpirationTime> {
+
+ private static final long serialVersionUID = -8307549297456060422L;
+
+ public ExactExpirationTimeSerializer() {
+ super(ExactExpirationTime.class);
+ }
+
+ @Override
+ public void serialize(ExactExpirationTime value, JsonGenerator jgen,
SerializerProvider provider) throws IOException {
+ jgen.writeStartObject();
+ jgen.writeStringField("@type", ExactExpirationTime.class.getName());
+ jgen.writeObjectField("expirationTime", value.get());
+ jgen.writeEndObject();
+ }
+
+}
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
index a3d5bc41e..cfbbd024a 100644
---
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
@@ -18,12 +18,16 @@
*/
package org.kie.kogito.jobs.service.json;
+import org.kie.kogito.jobs.DurationExpirationTime;
+import org.kie.kogito.jobs.ExactExpirationTime;
+import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.service.api.serlialization.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.cloudevents.jackson.JsonFormat;
@@ -41,8 +45,16 @@ public class JacksonConfiguration {
public ObjectMapperCustomizer customizer() {
return objectMapper -> {
LOGGER.debug("Jackson customization initialized.");
+ SimpleModule kogitoCustomModule = new SimpleModule();
+ kogitoCustomModule.addSerializer(ProcessInstanceJobDescription.class, new ProcessInstanceJobDescriptionSerializer());
+ kogitoCustomModule.addDeserializer(ProcessInstanceJobDescription.class, new ProcessInstanceJobDescriptionDeserializer());
+ kogitoCustomModule.addSerializer(DurationExpirationTime.class, new
DurationExpirationTimeSerializer());
+ kogitoCustomModule.addDeserializer(DurationExpirationTime.class,
new DurationExpirationTimeDeserializer());
+ kogitoCustomModule.addSerializer(ExactExpirationTime.class, new
ExactExpirationTimeSerializer());
+ kogitoCustomModule.addDeserializer(ExactExpirationTime.class, new
ExactExpirationTimeDeserializer());
objectMapper
.registerModule(new JavaTimeModule())
+ .registerModule(kogitoCustomModule)
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE)
.registerModule(JsonFormat.getCloudEventJacksonModule());
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java
new file mode 100644
index 000000000..8b60fd644
--- /dev/null
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.json;
+
+import java.io.IOException;
+
+import org.kie.kogito.jobs.ExpirationTime;
+import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.ProcessInstanceJobDescriptionBuilder;
+
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import static java.util.Optional.ofNullable;
+
+public class ProcessInstanceJobDescriptionDeserializer extends
StdDeserializer<ProcessInstanceJobDescription> {
+
+ private static final long serialVersionUID = -8307549297456060422L;
+
+ public ProcessInstanceJobDescriptionDeserializer() {
+ super(ProcessInstanceJobDescription.class);
+ }
+
+ @Override
+ public ProcessInstanceJobDescription deserialize(JsonParser jp,
DeserializationContext ctxt) throws IOException, JacksonException {
+ ProcessInstanceJobDescriptionBuilder builder =
ProcessInstanceJobDescription.builder();
+
+ JsonNode node = jp.getCodec().readTree(jp);
+ ofNullable(node.get("id")).ifPresent(e -> builder.id(e.asText()));
+ ofNullable(node.get("timerId")).ifPresent(e ->
builder.timerId(e.asText()));
+ ofNullable(node.get("priority")).ifPresent(e ->
builder.priority(e.asInt()));
+ ofNullable(node.get("processInstanceId")).ifPresent(e ->
builder.processInstanceId(e.asText()));
+ ofNullable(node.get("rootProcessInstanceId")).ifPresent(e ->
builder.rootProcessInstanceId(e.asText()));
+ ofNullable(node.get("processId")).ifPresent(e ->
builder.processId(e.asText()));
+ ofNullable(node.get("rootProcessId")).ifPresent(e ->
builder.rootProcessId(e.asText()));
+ ofNullable(node.get("nodeInstanceId")).ifPresent(e ->
builder.nodeInstanceId(e.asText()));
+
+ String type = node.get("expirationTime").get("@type").asText();
+ try {
+ builder.expirationTime((ExpirationTime)
ctxt.readTreeAsValue(node.get("expirationTime"), Class.forName(type)));
+ } catch (ClassNotFoundException | IOException e1) {
+ e1.printStackTrace();
+ }
+
+ return builder.build();
+ }
+
+}
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java
new file mode 100644
index 000000000..379da2a0e
--- /dev/null
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.json;
+
+import java.io.IOException;
+
+import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+public class ProcessInstanceJobDescriptionSerializer extends
StdSerializer<ProcessInstanceJobDescription> {
+
+ private static final long serialVersionUID = -8307549297456060422L;
+
+ public ProcessInstanceJobDescriptionSerializer() {
+ super(ProcessInstanceJobDescription.class);
+ }
+
+ @Override
+ public void serialize(ProcessInstanceJobDescription value, JsonGenerator
jgen, SerializerProvider provider) throws IOException {
+ jgen.writeStartObject();
+ jgen.writeStringField("id", value.id());
+ jgen.writeStringField("timerId", value.timerId());
+ jgen.writeObjectField("expirationTime", value.expirationTime());
+ jgen.writeNumberField("priority", value.priority());
+ jgen.writeStringField("processInstanceId", value.processInstanceId());
+ jgen.writeStringField("rootProcessInstanceId",
value.rootProcessInstanceId());
+ jgen.writeStringField("processId", value.processId());
+ jgen.writeStringField("rootProcessId", value.rootProcessId());
+ jgen.writeStringField("nodeInstanceId", value.nodeInstanceId());
+ jgen.writeEndObject();
+ }
+
+}
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.java
index 2d7a402ac..2271f46dc 100644
---
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.java
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.java
@@ -30,7 +30,7 @@ import
org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
-import org.kie.kogito.jobs.service.stream.JobStreams;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import io.vertx.core.Vertx;
@@ -38,11 +38,11 @@ public abstract class BaseReactiveJobRepository implements
ReactiveJobRepository
private Vertx vertx;
- private JobStreams jobStreams;
+ private JobEventPublisher jobEventPublisher;
- protected BaseReactiveJobRepository(Vertx vertx, JobStreams jobStreams) {
+ protected BaseReactiveJobRepository(Vertx vertx, JobEventPublisher
jobEventPublisher) {
this.vertx = vertx;
- this.jobStreams = jobStreams;
+ this.jobEventPublisher = jobEventPublisher;
}
public <T> CompletionStage<T> runAsync(Supplier<T> function) {
@@ -62,7 +62,7 @@ public abstract class BaseReactiveJobRepository implements
ReactiveJobRepository
@Override
public CompletionStage<JobDetails> save(JobDetails job) {
return doSave(job)
- .thenApply(jobStreams::publishJobStatusChange);
+ .thenApply(jobEventPublisher::publishJobStatusChange);
}
public abstract CompletionStage<JobDetails> doSave(JobDetails job);
@@ -70,7 +70,7 @@ public abstract class BaseReactiveJobRepository implements
ReactiveJobRepository
@Override
public CompletionStage<JobDetails> delete(JobDetails job) {
return delete(job.getId())
- .thenApply(j -> jobStreams.publishJobStatusChange(job));
+ .thenApply(j -> jobEventPublisher.publishJobStatusChange(job));
}
@Override
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepository.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepository.java
index c559eefa0..5b61e1f40 100644
---
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepository.java
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepository.java
@@ -33,7 +33,7 @@ import
org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
-import org.kie.kogito.jobs.service.stream.JobStreams;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import org.kie.kogito.jobs.service.utils.DateUtil;
import io.quarkus.arc.DefaultBean;
@@ -53,8 +53,8 @@ public class InMemoryJobRepository extends
BaseReactiveJobRepository implements
}
@Inject
- public InMemoryJobRepository(Vertx vertx, JobStreams jobStreams) {
- super(vertx, jobStreams);
+ public InMemoryJobRepository(Vertx vertx, JobEventPublisher
jobEventPublisher) {
+ super(vertx, jobEventPublisher);
}
@Override
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java
index 6f5f028b3..97a8f0a90 100644
---
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java
@@ -20,24 +20,18 @@ package org.kie.kogito.jobs.service.scheduler.impl;
import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.config.inject.ConfigProperty;
-import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
-import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.service.executor.JobExecutorResolver;
import org.kie.kogito.jobs.service.job.DelegateJob;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobDetailsContext;
-import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.model.ManageableJobHandle;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.BaseTimerJobScheduler;
-import org.kie.kogito.jobs.service.stream.AvailableStreams;
-import org.kie.kogito.jobs.service.stream.JobStreams;
-import org.kie.kogito.jobs.service.utils.ErrorHandling;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import org.kie.kogito.timer.Trigger;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
@@ -58,7 +52,7 @@ public class TimerDelegateJobScheduler extends
BaseTimerJobScheduler {
private VertxTimerServiceScheduler delegate;
- private JobStreams jobStreams;
+ private JobEventPublisher jobEventPublisher;
protected TimerDelegateJobScheduler() {
}
@@ -70,11 +64,11 @@ public class TimerDelegateJobScheduler extends
BaseTimerJobScheduler {
@ConfigProperty(name =
"kogito.jobs-service.schedulerChunkInMinutes", defaultValue = "10") long
schedulerChunkInMinutes,
@ConfigProperty(name =
"kogito.jobs-service.forceExecuteExpiredJobs", defaultValue = "true") boolean
forceExecuteExpiredJobs,
JobExecutorResolver jobExecutorResolver,
VertxTimerServiceScheduler delegate,
- JobStreams jobStreams) {
+ JobEventPublisher jobEventPublisher) {
super(jobRepository, backoffRetryMillis,
maxIntervalLimitToRetryMillis, schedulerChunkInMinutes,
forceExecuteExpiredJobs);
this.jobExecutorResolver = jobExecutorResolver;
this.delegate = delegate;
- this.jobStreams = jobStreams;
+ this.jobEventPublisher = jobEventPublisher;
}
@Override
@@ -82,7 +76,7 @@ public class TimerDelegateJobScheduler extends
BaseTimerJobScheduler {
LOGGER.debug("Job Scheduling {}", job);
return ReactiveStreams
.of(job)
- .map(j -> delegate.scheduleJob(new
DelegateJob(jobExecutorResolver, jobStreams), new JobDetailsContext(j),
+ .map(j -> delegate.scheduleJob(new
DelegateJob(jobExecutorResolver, jobEventPublisher), new JobDetailsContext(j),
trigger.orElse(j.getTrigger())));
}
@@ -100,33 +94,4 @@ public class TimerDelegateJobScheduler extends
BaseTimerJobScheduler {
.buildRs();
}
- //Stream Processors
-
- @Incoming(AvailableStreams.JOB_ERROR_EVENTS)
- @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
- public CompletionStage<Boolean> jobErrorProcessor(JobExecutionResponse
response) {
- LOGGER.warn("Error received {}", response);
- return
ErrorHandling.skipErrorPublisherBuilder(this::handleJobExecutionError, response)
- .findFirst()
- .run()
- .thenApply(Optional::isPresent)
- .exceptionally(e -> {
- LOGGER.error("Error handling error {}", response, e);
- return false;
- });
- }
-
- @Incoming(AvailableStreams.JOB_SUCCESS_EVENTS)
- @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
- public CompletionStage<Boolean> jobSuccessProcessor(JobExecutionResponse
response) {
- LOGGER.debug("Success received to be processed {}", response);
- return
ErrorHandling.skipErrorPublisherBuilder(this::handleJobExecutionSuccess,
response)
- .findFirst()
- .run()
- .thenApply(Optional::isPresent)
- .exceptionally(e -> {
- LOGGER.error("Error handling error {}", response, e);
- return false;
- });
- }
}
diff --git
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobEventPublisher.java
similarity index 66%
copy from
jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
copy to
jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobEventPublisher.java
index 3a65ab42e..4342cf00c 100644
---
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobEventPublisher.java
@@ -16,18 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.jobs.service.exception;
+package org.kie.kogito.jobs.service.stream;
-public class JobExecutionException extends JobServiceException {
+import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.jobs.service.model.JobExecutionResponse;
- private String jobId;
+public interface JobEventPublisher {
- public JobExecutionException(String jobId, String message) {
- super(message);
- this.jobId = jobId;
- }
+ JobExecutionResponse publishJobError(JobExecutionResponse response);
- public String getJobId() {
- return jobId;
- }
-}
\ No newline at end of file
+ JobExecutionResponse publishJobSuccess(JobExecutionResponse response);
+
+ JobDetails publishJobStatusChange(JobDetails scheduledJob);
+}
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreams.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreamsEventPublisher.java
similarity index 69%
rename from
jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreams.java
rename to
jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreamsEventPublisher.java
index a951695b1..f8ef12319 100644
---
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreams.java
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreamsEventPublisher.java
@@ -18,6 +18,9 @@
*/
package org.kie.kogito.jobs.service.stream;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
@@ -26,6 +29,8 @@ import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
+import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler;
+import org.kie.kogito.jobs.service.utils.ErrorHandling;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,9 +44,12 @@ import jakarta.inject.Inject;
* received item.
*/
@ApplicationScoped
-public class JobStreams {
+public class JobStreamsEventPublisher implements JobEventPublisher {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(JobStreamsEventPublisher.class);
- private static final Logger LOGGER =
LoggerFactory.getLogger(JobStreams.class);
+ @Inject
+ ReactiveJobScheduler scheduler;
/**
* Publish on Stream of Job Error events
@@ -82,8 +90,36 @@ public class JobStreams {
return scheduledJob;
}
- //Broadcast Events from Emitter to Streams
+ //Stream Processors
+ @Incoming(AvailableStreams.JOB_ERROR_EVENTS)
+ @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
+ public CompletionStage<Boolean> jobErrorProcessor(JobExecutionResponse
response) {
+ LOGGER.warn("Error received {}", response);
+ return
ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionError,
response)
+ .findFirst()
+ .run()
+ .thenApply(Optional::isPresent)
+ .exceptionally(e -> {
+ LOGGER.error("Error handling error {}", response, e);
+ return false;
+ });
+ }
+
+ @Incoming(AvailableStreams.JOB_SUCCESS_EVENTS)
+ @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
+ public CompletionStage<Boolean> jobSuccessProcessor(JobExecutionResponse
response) {
+ LOGGER.debug("Success received to be processed {}", response);
+ return
ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionSuccess,
response)
+ .findFirst()
+ .run()
+ .thenApply(Optional::isPresent)
+ .exceptionally(e -> {
+ LOGGER.error("Error handling error {}", response, e);
+ return false;
+ });
+ }
+ // Broadcast Events from Emitter to Streams
@Incoming(AvailableStreams.JOB_ERROR)
@Outgoing(AvailableStreams.JOB_ERROR_EVENTS)
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
diff --git
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/BaseJobRepositoryTest.java
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/BaseJobRepositoryTest.java
index f992ca071..cfbadc4f8 100644
---
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/BaseJobRepositoryTest.java
+++
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/BaseJobRepositoryTest.java
@@ -34,7 +34,7 @@ import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.Recipient;
import org.kie.kogito.jobs.service.model.RecipientInstance;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
-import org.kie.kogito.jobs.service.stream.JobStreams;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.kie.kogito.jobs.service.utils.FunctionsUtil;
import org.kie.kogito.timer.impl.PointInTimeTrigger;
@@ -56,8 +56,8 @@ public abstract class BaseJobRepositoryTest {
createAndSaveJob(ID);
}
- public JobStreams mockJobStreams() {
- final JobStreams mock = mock(JobStreams.class);
+ public JobEventPublisher mockJobEventPublisher() {
+ final JobEventPublisher mock = mock(JobEventPublisher.class);
lenient().when(mock.publishJobStatusChange(any(JobDetails.class))).thenAnswer(a
-> a.getArgument(0));
lenient().when(mock.publishJobSuccess(any(JobExecutionResponse.class))).thenAnswer(a
-> a.getArgument(0));
lenient().when(mock.publishJobError(any(JobExecutionResponse.class))).thenAnswer(a
-> a.getArgument(0));
diff --git
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
index cb7f67f27..6b975c4f0 100644
---
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
+++
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
@@ -36,7 +36,7 @@ class InMemoryJobRepositoryTest extends BaseJobRepositoryTest
{
@BeforeEach
public void setUp() throws Exception {
- tested = new InMemoryJobRepository(vertx, mockJobStreams());
+ tested = new InMemoryJobRepository(vertx, mockJobEventPublisher());
super.setUp();
}
diff --git
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobSchedulerTest.java
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobSchedulerTest.java
index 9c5ca3bc9..f272a790a 100644
---
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobSchedulerTest.java
+++
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobSchedulerTest.java
@@ -22,7 +22,6 @@ import java.util.Optional;
import java.util.UUID;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
-import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -46,10 +45,8 @@ import org.reactivestreams.Publisher;
import io.smallrye.mutiny.Multi;
import static mutiny.zero.flow.adapters.AdaptersToFlow.publisher;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -99,42 +96,6 @@ class TimerDelegateJobSchedulerTest extends
BaseTimerJobSchedulerTest {
verify(timer, never()).removeJob(any(ManageableJobHandle.class));
}
- @Test
- void testJobSuccessProcessor() {
- JobExecutionResponse response = getJobResponse();
- doReturn(ReactiveStreams.of(JobDetails.builder().build()))
- .when(tested).handleJobExecutionSuccess(response);
- tested.jobSuccessProcessor(response).thenAccept(r ->
assertThat(r).isTrue());
- verify(tested).handleJobExecutionSuccess(response);
- }
-
- @Test
- void testJobSuccessProcessorFail() {
- JobExecutionResponse response = getJobResponse();
- doReturn(ReactiveStreams.failed(new RuntimeException()))
- .when(tested).handleJobExecutionSuccess(response);
- tested.jobSuccessProcessor(response).thenAccept(r ->
assertThat(r).isFalse());
- verify(tested).handleJobExecutionSuccess(response);
- }
-
- @Test
- void testJobErrorProcessor() {
- JobExecutionResponse response = getJobResponse();
- doReturn(ReactiveStreams.of(JobDetails.builder().build()))
- .when(tested).handleJobExecutionError(response);
- tested.jobErrorProcessor(response).thenAccept(r ->
assertThat(r).isTrue());
- verify(tested).handleJobExecutionError(response);
- }
-
- @Test
- void testJobErrorProcessorFail() {
- JobExecutionResponse response = getJobResponse();
- doReturn(ReactiveStreams.failed(new RuntimeException()))
- .when(tested).handleJobExecutionError(response);
- tested.jobErrorProcessor(response).thenAccept(r ->
assertThat(r).isFalse());
- verify(tested).handleJobExecutionError(response);
- }
-
private JobExecutionResponse getJobResponse() {
return JobExecutionResponse.builder()
.jobId(UUID.randomUUID().toString())
diff --git
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceSchedulerTest.java
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceSchedulerTest.java
index 4f7850245..3978fccb1 100644
---
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceSchedulerTest.java
+++
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceSchedulerTest.java
@@ -29,7 +29,7 @@ import org.kie.kogito.jobs.service.job.DelegateJob;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobDetailsContext;
import org.kie.kogito.jobs.service.model.ManageableJobHandle;
-import org.kie.kogito.jobs.service.stream.JobStreams;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.kie.kogito.timer.Job;
import org.kie.kogito.timer.JobContext;
@@ -62,7 +62,7 @@ class VertxTimerServiceSchedulerTest {
private JobExecutorResolver jobExecutorResolver;
@Mock
- private JobStreams jobStreams;
+ private JobEventPublisher jobEventPublisher;
@Captor
private ArgumentCaptor<JobDetails> jobCaptor;
@@ -110,7 +110,7 @@ class VertxTimerServiceSchedulerTest {
trigger = new PointInTimeTrigger(timestamp, null, null);
jobDetails = JobDetails.builder().build();
context = new JobDetailsContext(jobDetails);
- job = new DelegateJob(jobExecutorResolver, jobStreams);
+ job = new DelegateJob(jobExecutorResolver, jobEventPublisher);
return tested.scheduleJob(job, context, trigger);
}
diff --git
a/jobs-service/jobs-service-infinispan/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanJobRepository.java
b/jobs-service/jobs-service-infinispan/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanJobRepository.java
index 210e37435..8b9a25fa7 100644
---
a/jobs-service/jobs-service-infinispan/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanJobRepository.java
+++
b/jobs-service/jobs-service-infinispan/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanJobRepository.java
@@ -35,7 +35,7 @@ import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.repository.impl.BaseReactiveJobRepository;
-import org.kie.kogito.jobs.service.stream.JobStreams;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import io.vertx.core.Vertx;
@@ -58,9 +58,9 @@ public class InfinispanJobRepository extends
BaseReactiveJobRepository implement
@Inject
public InfinispanJobRepository(Vertx vertx,
- JobStreams jobStreams,
+ JobEventPublisher jobEventPublisher,
RemoteCacheManager remoteCacheManager) {
- super(vertx, jobStreams);
+ super(vertx, jobEventPublisher);
this.remoteCacheManager = remoteCacheManager;
}
diff --git
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/adapter/JobDetailsAdapter.java
b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/adapter/JobDetailsAdapter.java
index 3149da797..1e112838a 100644
---
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/adapter/JobDetailsAdapter.java
+++
b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/adapter/JobDetailsAdapter.java
@@ -29,8 +29,6 @@ import org.kie.kogito.jobs.service.api.Recipient;
import org.kie.kogito.jobs.service.api.Retry;
import org.kie.kogito.jobs.service.api.Schedule;
import org.kie.kogito.jobs.service.api.TemporalUnit;
-import org.kie.kogito.jobs.service.api.recipient.http.HttpRecipient;
-import org.kie.kogito.jobs.service.api.recipient.sink.SinkRecipient;
import org.kie.kogito.jobs.service.api.schedule.timer.TimerSchedule;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobStatus;
@@ -161,7 +159,6 @@ public class JobDetailsAdapter {
}
public static Recipient<?> toRecipient(JobDetails jobDetails) {
- checkIsSupported(jobDetails.getRecipient().getRecipient());
return jobDetails.getRecipient().getRecipient();
}
@@ -170,15 +167,9 @@ public class JobDetailsAdapter {
}
public static org.kie.kogito.jobs.service.model.Recipient
from(Recipient<?> recipient) {
- checkIsSupported(recipient);
return new RecipientInstance(recipient);
}
- static void checkIsSupported(Recipient<?> recipient) {
- if (!(recipient instanceof HttpRecipient) && !(recipient
instanceof SinkRecipient)) {
- throw new UnsupportedOperationException("Only HttpRecipient
and SinkRecipient are supported");
- }
- }
}
public static class RetryAdapter {
diff --git
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
index 3a65ab42e..830c227c6 100644
---
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
+++
b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
@@ -22,6 +22,11 @@ public class JobExecutionException extends
JobServiceException {
private String jobId;
+ public JobExecutionException(String jobId, String message, Throwable th) {
+ super(message, th);
+ this.jobId = jobId;
+ }
+
public JobExecutionException(String jobId, String message) {
super(message);
this.jobId = jobId;
diff --git
a/jobs-service/jobs-service-mongodb/src/main/java/org/kie/kogito/jobs/service/repository/mongodb/MongoDBJobRepository.java
b/jobs-service/jobs-service-mongodb/src/main/java/org/kie/kogito/jobs/service/repository/mongodb/MongoDBJobRepository.java
index 4d89e4226..6bc29ec4b 100644
---
a/jobs-service/jobs-service-mongodb/src/main/java/org/kie/kogito/jobs/service/repository/mongodb/MongoDBJobRepository.java
+++
b/jobs-service/jobs-service-mongodb/src/main/java/org/kie/kogito/jobs/service/repository/mongodb/MongoDBJobRepository.java
@@ -30,7 +30,7 @@ import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.repository.impl.BaseReactiveJobRepository;
import org.kie.kogito.jobs.service.repository.marshaller.JobDetailsMarshaller;
-import org.kie.kogito.jobs.service.stream.JobStreams;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import com.mongodb.client.model.FindOneAndReplaceOptions;
@@ -87,10 +87,10 @@ public class MongoDBJobRepository extends
BaseReactiveJobRepository implements R
}
@Inject
- public MongoDBJobRepository(Vertx vertx, JobStreams jobStreams,
ReactiveMongoClient mongoClient,
+ public MongoDBJobRepository(Vertx vertx, JobEventPublisher
jobEventPublisher, ReactiveMongoClient mongoClient,
@ConfigProperty(name = DATABASE_PROPERTY) String database,
JobDetailsMarshaller jobDetailsMarshaller) {
- super(vertx, jobStreams);
+ super(vertx, jobEventPublisher);
this.jobDetailsMarshaller = jobDetailsMarshaller;
this.collection =
mongoClient.getDatabase(database).getCollection(JOB_DETAILS_COLLECTION);
}
diff --git
a/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepository.java
b/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepository.java
index 132b35182..fa95138fa 100644
---
a/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepository.java
+++
b/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepository.java
@@ -34,7 +34,7 @@ import
org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.repository.impl.BaseReactiveJobRepository;
import org.kie.kogito.jobs.service.repository.marshaller.RecipientMarshaller;
import org.kie.kogito.jobs.service.repository.marshaller.TriggerMarshaller;
-import org.kie.kogito.jobs.service.stream.JobStreams;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.kie.kogito.timer.Trigger;
@@ -74,9 +74,9 @@ public class PostgreSqlJobRepository extends
BaseReactiveJobRepository implement
}
@Inject
- public PostgreSqlJobRepository(Vertx vertx, JobStreams jobStreams, PgPool
client,
+ public PostgreSqlJobRepository(Vertx vertx, JobEventPublisher
jobEventPublisher, PgPool client,
TriggerMarshaller triggerMarshaller, RecipientMarshaller
recipientMarshaller) {
- super(vertx, jobStreams);
+ super(vertx, jobEventPublisher);
this.client = client;
this.triggerMarshaller = triggerMarshaller;
this.recipientMarshaller = recipientMarshaller;
diff --git
a/jobs-service/jobs-service-postgresql/src/main/resources/application.properties
b/jobs-service/jobs-service-postgresql/src/main/resources/application.properties
index 4dc2184d4..5070c9069 100644
---
a/jobs-service/jobs-service-postgresql/src/main/resources/application.properties
+++
b/jobs-service/jobs-service-postgresql/src/main/resources/application.properties
@@ -20,7 +20,6 @@
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=
quarkus.datasource.password=
-quarkus.datasource.reactive.url=
quarkus.datasource.jdbc.url=
quarkus.flyway.migrate-at-start=true
quarkus.datasource.health.enabled=true
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/pom.xml
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/pom.xml
new file mode 100644
index 000000000..c090cb87e
--- /dev/null
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>kogito-addons-jobs-service</artifactId>
+ <groupId>org.kie.kogito</groupId>
+ <version>999-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>kogito-addons-quarkus-jobs</artifactId>
+
+ <name>Jobs Collocated Quarkus Addon - Runtime</name>
+ <description>Run Jobs Service embedded with the application.</description>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>jobs-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>jobs-service-internal-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>kogito-services</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>kogito-timer</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>jobs-service-common</artifactId>
+
+ <exclusions>
+ <exclusion>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-resteasy-reactive</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-resteasy-reactive-jackson</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- Testing dependencies -->
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-junit5</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-junit5-mockito</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>jobs-service-inmemory</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+
+</project>
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
new file mode 100644
index 000000000..bbc84c113
--- /dev/null
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.embedded;
+
+import org.kie.kogito.Application;
+import org.kie.kogito.Model;
+import org.kie.kogito.jobs.service.api.Recipient;
+import org.kie.kogito.jobs.service.exception.JobExecutionException;
+import org.kie.kogito.jobs.service.executor.JobExecutor;
+import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.jobs.service.model.JobExecutionResponse;
+import org.kie.kogito.jobs.service.model.RecipientInstance;
+import org.kie.kogito.process.Process;
+import org.kie.kogito.process.Processes;
+import org.kie.kogito.services.jobs.impl.TriggerJobCommand;
+
+import io.smallrye.mutiny.Uni;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Alternative;
+import jakarta.inject.Inject;
+
+@ApplicationScoped
+@Alternative
+public class EmbeddedJobExecutor implements JobExecutor {
+
+ @Inject
+ Processes processes;
+
+ @Inject
+ Application application;
+
+ @Override
+ public Uni<JobExecutionResponse> execute(JobDetails jobDetails) {
+
+ String correlationId = jobDetails.getCorrelationId();
+ RecipientInstance recipientModel = (RecipientInstance)
jobDetails.getRecipient();
+ InVMRecipient recipient = (InVMRecipient)
recipientModel.getRecipient();
+ String timerId = recipient.getPayload().getData().timerId();
+ String processId = recipient.getPayload().getData().processId();
+ Process<? extends Model> process = processes.processById(processId);
+ String processInstanceId =
recipient.getPayload().getData().processInstanceId();
+ Integer limit = jobDetails.getRetries();
+
+ TriggerJobCommand command = new TriggerJobCommand(processInstanceId,
correlationId, timerId, limit, process, application.unitOfWorkManager());
+
+ return Uni.createFrom().item(command::execute)
+ .onFailure()
+ .transform(
+ unexpected -> new
JobExecutionException(jobDetails.getId(), "Unexpected error when executing
Embedded request for job: " + jobDetails.getId() + ". " +
unexpected.getMessage(),
+ unexpected))
+ .onItem()
+ .transform(res -> JobExecutionResponse.builder()
+ .message("Embedded job executed")
+ .code(String.valueOf(200))
+ .now()
+ .jobId(jobDetails.getId())
+ .build());
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Class<? extends Recipient> type() {
+ return InVMRecipient.class;
+ }
+
+}
diff --git
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobServiceEvent.java
similarity index 71%
copy from
jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
copy to
jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobServiceEvent.java
index 3a65ab42e..2c004389c 100644
---
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobServiceEvent.java
@@ -16,18 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.jobs.service.exception;
+package org.kie.kogito.jobs.embedded;
-public class JobExecutionException extends JobServiceException {
+import org.kie.kogito.jobs.service.model.JobDetails;
- private String jobId;
+public class EmbeddedJobServiceEvent {
- public JobExecutionException(String jobId, String message) {
- super(message);
- this.jobId = jobId;
+ private JobDetails jobDetails;
+
+ public EmbeddedJobServiceEvent(JobDetails jobDetails) {
+ this.jobDetails = jobDetails;
}
- public String getJobId() {
- return jobId;
+ public JobDetails getJobDetails() {
+ return jobDetails;
}
-}
\ No newline at end of file
+
+}
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java
new file mode 100644
index 000000000..95b9b0d92
--- /dev/null
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.embedded;
+
+import java.util.concurrent.ExecutionException;
+
+import org.kie.kogito.jobs.JobsService;
+import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.ProcessJobDescription;
+import org.kie.kogito.jobs.api.JobCallbackResourceDef;
+import org.kie.kogito.jobs.service.adapter.JobDetailsAdapter;
+import org.kie.kogito.jobs.service.api.Job;
+import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.jobs.service.model.JobStatus;
+import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.smallrye.mutiny.Uni;
+import io.smallrye.mutiny.infrastructure.Infrastructure;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Alternative;
+import jakarta.inject.Inject;
+
+import static mutiny.zero.flow.adapters.AdaptersToFlow.publisher;
+
+/**
+ * {@link JobsService} alternative that wraps each process-instance job in an
+ * {@link InVMRecipient} and hands it to the injected {@link ReactiveJobScheduler}.
+ * Scheduling and cancelling both block the calling thread until the scheduler has answered.
+ */
+@ApplicationScoped
+@Alternative
+public class EmbeddedJobsService implements JobsService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedJobsService.class);
+
+    @Inject
+    ReactiveJobScheduler scheduler;
+
+    public EmbeddedJobsService() {
+        LOGGER.info("Starting Embedded Job Service");
+    }
+
+    /**
+     * Not supported by this implementation: the request is only logged.
+     *
+     * @param description the process job that was requested
+     * @return always {@code null}
+     */
+    @Override
+    public String scheduleProcessJob(ProcessJobDescription description) {
+        LOGGER.debug("ScheduleProcessJob: {} not supported", description);
+        return null;
+    }
+
+    /**
+     * Builds a job addressed to an {@link InVMRecipient}, submits it to the scheduler on the
+     * default worker pool and waits for the scheduled {@link JobDetails}.
+     *
+     * @param description the process-instance job to schedule; its id is used both as job id
+     *        and as correlation id
+     * @return the id of the scheduled job, or {@code null} when scheduling failed or the
+     *         calling thread was interrupted while waiting
+     */
+    @Override
+    public String scheduleProcessInstanceJob(ProcessInstanceJobDescription description) {
+        try {
+            Job job = Job.builder()
+                    .id(description.id())
+                    .correlationId(description.id())
+                    .recipient(new InVMRecipient(new InVMPayloadData(description)))
+                    .schedule(JobCallbackResourceDef.buildSchedule(description))
+                    .build();
+
+            JobDetails jobDetails = JobDetailsAdapter.from(job);
+            LOGGER.debug("Embedded ScheduleProcessJob: {}", jobDetails);
+
+            JobDetails scheduled = Uni.createFrom()
+                    .publisher(publisher(scheduler.schedule(jobDetails)))
+                    .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
+                    .subscribe()
+                    .asCompletionStage()
+                    .get();
+            String outcome = scheduled.getId();
+
+            LOGGER.debug("Embedded ScheduleProcessJob: {} scheduled", outcome);
+            return outcome;
+        } catch (InterruptedException e) {
+            // Restore the flag so callers further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            LOGGER.error("interrupted execution", e);
+            return null;
+        } catch (ExecutionException e) {
+            // The scheduler itself failed; this is not an interruption, so say so in the log.
+            LOGGER.error("Embedded ScheduleProcessJob: {} failed", description.id(), e);
+            return null;
+        }
+    }
+
+    /**
+     * Asks the scheduler to cancel the job and waits for the answer.
+     *
+     * @param jobId id of the job to cancel
+     * @return {@code true} only when the scheduler reports the job with status
+     *         {@link JobStatus#CANCELED}; {@code false} when the cancellation failed or the
+     *         calling thread was interrupted while waiting
+     */
+    @Override
+    public boolean cancelJob(String jobId) {
+        LOGGER.debug("Embedded cancelJob: {}", jobId);
+        try {
+            return JobStatus.CANCELED.equals(scheduler.cancel(jobId).toCompletableFuture().get().getStatus());
+        } catch (InterruptedException e) {
+            // Restore the flag so callers further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            LOGGER.warn("Embedded cancelJob: {} interrupted", jobId, e);
+            return false;
+        } catch (ExecutionException e) {
+            // Previously swallowed without a trace; keep the best-effort result but leave a record.
+            LOGGER.warn("Embedded cancelJob: {} failed", jobId, e);
+            return false;
+        }
+    }
+
+}
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java
new file mode 100644
index 000000000..cb49a135b
--- /dev/null
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.embedded;
+
+import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.service.api.PayloadData;
+
+/**
+ * Payload carrying the {@link ProcessInstanceJobDescription} of a job that is delivered to an
+ * in-VM recipient. The same description is exposed through the bean-style accessor pair and
+ * through {@link #getData()}.
+ */
+public class InVMPayloadData extends PayloadData<ProcessInstanceJobDescription> {
+
+    private ProcessInstanceJobDescription jobDescription;
+
+    /** Creates an empty payload; the description can be supplied later through the setter. */
+    public InVMPayloadData() {
+        // do nothing
+    }
+
+    /**
+     * Creates a payload that already holds a description.
+     *
+     * @param data the job description to carry
+     */
+    public InVMPayloadData(ProcessInstanceJobDescription data) {
+        this.jobDescription = data;
+    }
+
+    /** @return the carried job description, the same object returned by {@link #getJobDescription()} */
+    @Override
+    public ProcessInstanceJobDescription getData() {
+        return jobDescription;
+    }
+
+    /** @return the carried job description, or {@code null} when none was set */
+    public ProcessInstanceJobDescription getJobDescription() {
+        return jobDescription;
+    }
+
+    /** @param jobDescription the job description to carry from now on */
+    public void setJobDescription(ProcessInstanceJobDescription jobDescription) {
+        this.jobDescription = jobDescription;
+    }
+
+    @Override
+    public String toString() {
+        return "InVMPayloadData [data=" + jobDescription + "]";
+    }
+}
diff --git
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMRecipient.java
similarity index 56%
copy from
jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
copy to
jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMRecipient.java
index cb7f67f27..39b700aea 100644
---
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMRecipient.java
@@ -16,32 +16,38 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.jobs.service.repository.impl;
+package org.kie.kogito.jobs.embedded;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
+import org.kie.kogito.jobs.service.api.Recipient;
-import io.vertx.core.Vertx;
+public class InVMRecipient extends Recipient<InVMPayloadData> {
-class InMemoryJobRepositoryTest extends BaseJobRepositoryTest {
+ private InVMPayloadData data;
- private InMemoryJobRepository tested;
- private static Vertx vertx;
+ public InVMRecipient() {
+ // do nothing
+ }
+
+ public void setData(InVMPayloadData data) {
+ this.data = data;
+ }
- @BeforeAll
- static void init() {
- vertx = Vertx.vertx();
+ public InVMPayloadData getData() {
+ return data;
}
- @BeforeEach
- public void setUp() throws Exception {
- tested = new InMemoryJobRepository(vertx, mockJobStreams());
- super.setUp();
+ public InVMRecipient(InVMPayloadData data) {
+ this.data = data;
}
@Override
- public ReactiveJobRepository tested() {
- return tested;
+ public InVMPayloadData getPayload() {
+ return data;
}
+
+ @Override
+ public String toString() {
+ return "InVMRecipient [data=" + data + "]";
+ }
+
}
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java
new file mode 100644
index 000000000..814db7e91
--- /dev/null
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.embedded;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.kie.kogito.event.EventPublisher;
+import org.kie.kogito.event.job.JobInstanceDataEvent;
+import org.kie.kogito.jobs.service.adapter.ScheduledJobAdapter;
+import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.jobs.service.model.JobExecutionResponse;
+import org.kie.kogito.jobs.service.model.ScheduledJob;
+import org.kie.kogito.jobs.service.resource.RestApiConstants;
+import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
+import org.kie.kogito.jobs.service.utils.ErrorHandling;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.event.Event;
+import jakarta.enterprise.event.ObservesAsync;
+import jakarta.enterprise.inject.Alternative;
+import jakarta.enterprise.inject.Instance;
+import jakarta.inject.Inject;
+
+import static java.util.stream.Collectors.toList;
+import static org.kie.kogito.jobs.service.events.JobDataEvent.JOB_EVENT_TYPE;
+
+/**
+ * {@link JobEventPublisher} alternative that keeps everything inside the VM.
+ * <ul>
+ * <li>Execution results are passed to the injected {@link ReactiveJobScheduler} and the caller
+ * blocks until the scheduler has handled them.</li>
+ * <li>Status changes are fired asynchronously as {@link EmbeddedJobServiceEvent} on the CDI bus;
+ * {@link #observe(EmbeddedJobServiceEvent)} turns each one into a {@link JobInstanceDataEvent}
+ * and forwards it to every {@link EventPublisher} collected at construction time.</li>
+ * </ul>
+ */
+@ApplicationScoped
+@Alternative
+public class JobInVMEventPublisher implements JobEventPublisher {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JobInVMEventPublisher.class);
+
+    private final String url;
+
+    private final List<EventPublisher> eventPublishers;
+
+    private final ObjectMapper objectMapper;
+
+    @Inject
+    ReactiveJobScheduler scheduler;
+
+    @Inject
+    Event<EmbeddedJobServiceEvent> bus;
+
+    /**
+     * @param url value of {@code kogito.service.url}, used as prefix of the event source
+     * @param eventPublishers publishers available at construction time; they are copied into a list once
+     * @param objectMapper mapper used to serialize the job into the event body
+     */
+    public JobInVMEventPublisher(
+            @ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String url,
+            Instance<EventPublisher> eventPublishers,
+            ObjectMapper objectMapper) {
+        this.url = url;
+        this.eventPublishers = eventPublishers.stream().collect(toList());
+        this.objectMapper = objectMapper;
+        LOGGER.info("JobInVMEventPublisher Started with url {}", url);
+    }
+
+    /**
+     * Lets the scheduler handle a failed execution and waits until it is done.
+     * Failures while doing so are logged, never propagated.
+     *
+     * @param response the failed execution response
+     * @return the same {@code response}
+     */
+    @Override
+    public JobExecutionResponse publishJobError(JobExecutionResponse response) {
+        try {
+            LOGGER.debug("publishJobError {}", response);
+
+            ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionError, response)
+                    .findFirst()
+                    .run()
+                    .thenApply(Optional::isPresent)
+                    .exceptionally(e -> {
+                        LOGGER.error("Error handling error {}", response, e);
+                        return false;
+                    }).toCompletableFuture().get();
+        } catch (InterruptedException e) {
+            // Restore the flag: the broad catch below used to hide the interruption from callers.
+            Thread.currentThread().interrupt();
+            LOGGER.error("error in publishJobError", e);
+        } catch (Exception e) {
+            LOGGER.error("error in publishJobError", e);
+        }
+        return response;
+    }
+
+    /**
+     * Lets the scheduler handle a successful execution and waits until it is done.
+     * Failures while doing so are logged, never propagated.
+     *
+     * @param response the successful execution response
+     * @return the same {@code response}
+     */
+    @Override
+    public JobExecutionResponse publishJobSuccess(JobExecutionResponse response) {
+        try {
+            LOGGER.debug("publishJobSuccess {}", response);
+
+            ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionSuccess, response)
+                    .findFirst()
+                    .run()
+                    .thenApply(Optional::isPresent)
+                    .exceptionally(e -> {
+                        LOGGER.error("Error handling error {}", response, e);
+                        return false;
+                    }).toCompletableFuture().get();
+        } catch (InterruptedException e) {
+            // Restore the flag: the broad catch below used to hide the interruption from callers.
+            Thread.currentThread().interrupt();
+            LOGGER.error("error in publishJobSuccess", e);
+        } catch (Exception e) {
+            LOGGER.error("error in publishJobSuccess", e);
+        }
+        return response;
+    }
+
+    /**
+     * Fires an {@link EmbeddedJobServiceEvent} for the job and waits until the asynchronous
+     * observers have completed. Nothing is fired when no {@link EventPublisher} is available.
+     *
+     * @param jobDetails the job whose status changed
+     * @return the same {@code jobDetails}
+     * @throws RuntimeException wrapping the failure when the delivery fails or the wait is interrupted
+     */
+    @Override
+    public JobDetails publishJobStatusChange(JobDetails jobDetails) {
+        LOGGER.debug("publishJobStatusChange {}", jobDetails);
+        if (eventPublishers.isEmpty()) {
+            return jobDetails;
+        }
+        try {
+            bus.fireAsync(new EmbeddedJobServiceEvent(jobDetails)).toCompletableFuture().get();
+            return jobDetails;
+        } catch (InterruptedException e) {
+            // Restore the flag before translating, otherwise the interruption is lost.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Asynchronous observer: serializes the job as a {@link ScheduledJob}, wraps it in a
+     * {@link JobInstanceDataEvent} and hands it to every known {@link EventPublisher}.
+     * A publisher that throws is logged by its own class and does not prevent the remaining
+     * publishers from receiving the event.
+     *
+     * @param serviceEvent the in-VM event carrying the job details
+     */
+    public void observe(@ObservesAsync EmbeddedJobServiceEvent serviceEvent) {
+        JobDetails jobDetails = serviceEvent.getJobDetails();
+        LOGGER.debug("Emmit in-vm publishJobStatusChange {}", jobDetails);
+
+        JobInstanceDataEvent event;
+        try {
+            ScheduledJob scheduledJob = ScheduledJobAdapter.of(jobDetails);
+            byte[] jsonContent = objectMapper.writeValueAsBytes(scheduledJob);
+            event = new JobInstanceDataEvent(JOB_EVENT_TYPE,
+                    url + RestApiConstants.JOBS_PATH,
+                    jsonContent,
+                    scheduledJob.getProcessInstanceId(),
+                    scheduledJob.getRootProcessInstanceId(),
+                    scheduledJob.getProcessId(),
+                    scheduledJob.getRootProcessId(),
+                    null);
+        } catch (Exception e) {
+            // Without an event there is nothing to deliver to any publisher.
+            LOGGER.error("Job status change event could not be built for job: {}", jobDetails.getId(), e);
+            return;
+        }
+
+        for (EventPublisher eventPublisher : eventPublishers) {
+            try {
+                eventPublisher.publish(event);
+            } catch (Exception e) {
+                // Name the publisher that failed; the list's own class says nothing useful.
+                LOGGER.error("Job status change propagation has failed at eventPublisher: {} execution.", eventPublisher.getClass(), e);
+            }
+        }
+    }
+}
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/resources/META-INF/beans.xml
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/resources/META-INF/beans.xml
new file mode 100644
index 000000000..e69de29bb
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/resources/application.properties
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/resources/application.properties
new file mode 100644
index 000000000..81cb27f1d
--- /dev/null
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/resources/application.properties
@@ -0,0 +1 @@
+quarkus.arc.selected-alternatives=org.kie.kogito.jobs.embedded.*
\ No newline at end of file
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTests.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTests.java
new file mode 100644
index 000000000..b602a9dff
--- /dev/null
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTests.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.embedded;
+
+import java.util.List;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.kie.kogito.event.DataEvent;
+import org.kie.kogito.jobs.DurationExpirationTime;
+import org.kie.kogito.jobs.JobsService;
+import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+
+import io.quarkus.test.junit.QuarkusTest;
+
+import jakarta.inject.Inject;
+
+/**
+ * Quarkus test that schedules one process-instance job through the injected
+ * {@link JobsService} and checks how many events the {@link TestEventPublisher} recorded.
+ */
+@QuarkusTest
+public class EmbeddedJobsServiceTests {
+
+ @Inject
+ JobsService jobService;
+
+ @Inject
+ TestEventPublisher publisher;
+
+ /**
+ * Schedules a job that expires immediately and expects exactly two recorded events.
+ */
+ @Test
+ public void testJobService() throws Exception {
+ // testing only we have the full lifecycle
+ // Arms the publisher's latch for two events.
+ // NOTE(review): nothing in this method waits on that latch before asserting; if the
+ // events are delivered asynchronously the size check below can race - confirm.
+ publisher.expectedEvents(2);
+
+ ProcessInstanceJobDescription description =
ProcessInstanceJobDescription.builder()
+ .generateId()
+ .timerId("-1")
+ .expirationTime(DurationExpirationTime.now())
+ .processInstanceId("1")
+ .rootProcessInstanceId(null)
+ .processId("processId")
+ .rootProcessId(null)
+ .nodeInstanceId("node_1")
+ .build();
+ jobService.scheduleProcessInstanceJob(description);
+
+ // Reads whatever the publisher has recorded at this instant.
+ List<DataEvent<?>> events = publisher.getEvents();
+ Assertions.assertEquals(2, events.size());
+
+ }
+
+}
diff --git
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestApplication.java
similarity index 50%
copy from
jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
copy to
jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestApplication.java
index cb7f67f27..62be9cfd9 100644
---
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestApplication.java
@@ -16,32 +16,35 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.jobs.service.repository.impl;
+package org.kie.kogito.jobs.embedded;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
+import org.kie.kogito.Application;
+import org.kie.kogito.Config;
+import org.kie.kogito.KogitoEngine;
+import org.kie.kogito.uow.UnitOfWork;
+import org.kie.kogito.uow.UnitOfWorkManager;
+import org.mockito.Mockito;
-import io.vertx.core.Vertx;
+import jakarta.enterprise.context.ApplicationScoped;
-class InMemoryJobRepositoryTest extends BaseJobRepositoryTest {
+@ApplicationScoped
+public class TestApplication implements Application {
- private InMemoryJobRepository tested;
- private static Vertx vertx;
-
- @BeforeAll
- static void init() {
- vertx = Vertx.vertx();
+ @Override
+ public Config config() {
+ return Mockito.mock(Config.class);
}
- @BeforeEach
- public void setUp() throws Exception {
- tested = new InMemoryJobRepository(vertx, mockJobStreams());
- super.setUp();
+ @Override
+ public <T extends KogitoEngine> T get(Class<T> clazz) {
+ return (T) Mockito.mock(KogitoEngine.class);
}
@Override
- public ReactiveJobRepository tested() {
- return tested;
+ public UnitOfWorkManager unitOfWorkManager() {
+ UnitOfWorkManager uowm = Mockito.mock(UnitOfWorkManager.class);
+
Mockito.when(uowm.newUnitOfWork()).thenReturn(Mockito.mock(UnitOfWork.class));
+ return uowm;
}
+
}
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestEventPublisher.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestEventPublisher.java
new file mode 100644
index 000000000..f5b43f7af
--- /dev/null
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestEventPublisher.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.embedded;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.kie.kogito.event.DataEvent;
+import org.kie.kogito.event.EventPublisher;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+/**
+ * Test {@link EventPublisher} that records every event it is given and counts down a latch
+ * once per recorded event.
+ */
+@ApplicationScoped
+public class TestEventPublisher implements EventPublisher {
+    private List<DataEvent<?>> events;
+
+    // Starts at zero so publishing before expectedEvents(...) is a no-op instead of an NPE.
+    private CountDownLatch latch = new CountDownLatch(0);
+
+    /** @return the live list of events recorded so far */
+    public List<DataEvent<?>> getEvents() {
+        return events;
+    }
+
+    public TestEventPublisher() {
+        events = new ArrayList<>();
+    }
+
+    /**
+     * Records a single event and counts the latch down once.
+     *
+     * @param event the published event
+     */
+    @Override
+    public void publish(DataEvent<?> event) {
+        events.add(event);
+        latch.countDown();
+    }
+
+    /**
+     * Records every event of the batch and counts the latch down once per event.
+     *
+     * @param events the published events
+     */
+    @Override
+    public void publish(Collection<DataEvent<?>> events) {
+        // The parameter shadows the field: without "this." the batch was added to itself
+        // and nothing was ever recorded.
+        this.events.addAll(events);
+        events.forEach(e -> latch.countDown());
+    }
+
+    /**
+     * Replaces the latch with a fresh one.
+     *
+     * @param numOfEvents number of events the new latch counts down from
+     */
+    public void expectedEvents(int numOfEvents) {
+        latch = new CountDownLatch(numOfEvents);
+    }
+
+}
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestProcesses.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestProcesses.java
new file mode 100644
index 000000000..b99aaea71
--- /dev/null
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestProcesses.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.embedded;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+
+import org.kie.kogito.Model;
+import org.kie.kogito.process.Process;
+import org.kie.kogito.process.ProcessInstance;
+import org.kie.kogito.process.ProcessInstances;
+import org.kie.kogito.process.Processes;
+import org.mockito.Mockito;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+/**
+ * Test {@link Processes} implementation backed entirely by Mockito mocks; it advertises no
+ * process ids but answers every lookup.
+ */
+@ApplicationScoped
+public class TestProcesses implements Processes {
+
+    /**
+     * Builds a fresh mocked process on every call, whatever the id.
+     *
+     * @param processId ignored
+     * @return a mocked process whose {@code instances().findById(..)} always yields a mocked
+     *         process instance
+     */
+    @Override
+    public Process<? extends Model> processById(String processId) {
+        ProcessInstance anyInstance = Mockito.mock(ProcessInstance.class);
+        ProcessInstances lookup = Mockito.mock(ProcessInstances.class);
+        Mockito.when(lookup.findById(Mockito.any())).thenReturn(Optional.of(anyInstance));
+
+        Process<? extends Model> stub = Mockito.mock(Process.class);
+        Mockito.when(stub.instances()).thenReturn(lookup);
+        return stub;
+    }
+
+    /** @return an empty collection */
+    @Override
+    public Collection<String> processIds() {
+        return Collections.emptyList();
+    }
+
+}
diff --git a/jobs-service/kogito-addons-jobs-service/pom.xml
b/jobs-service/kogito-addons-jobs-service/pom.xml
index 88130ac2c..45b239713 100644
--- a/jobs-service/kogito-addons-jobs-service/pom.xml
+++ b/jobs-service/kogito-addons-jobs-service/pom.xml
@@ -19,8 +19,9 @@
under the License.
-->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"
- xmlns="http://maven.apache.org/POM/4.0.0">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.kie.kogito</groupId>
@@ -33,6 +34,7 @@
<packaging>pom</packaging>
<modules>
+ <module>kogito-addons-quarkus-jobs</module>
<module>kogito-addons-quarkus-jobs-service-embedded</module>
</modules>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]