This is an automated email from the ASF dual-hosted git repository.

egonzalez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git


The following commit(s) were added to refs/heads/main by this push:
     new b8c93b8ae [incubator-kie-issues-711] Refactoring job service to allow 
collocated service for Quarkus (#1925)
b8c93b8ae is described below

commit b8c93b8ae2188f5c4ea51d54d27e235c6dfee286
Author: Enrique <[email protected]>
AuthorDate: Mon Jan 8 10:25:04 2024 +0100

    [incubator-kie-issues-711] Refactoring job service to allow collocated 
service for Quarkus (#1925)
    
    * [incubator-kie-issues-711] Refactoring job service to allow collocated 
service for Quarkus
    
    * fix debug information
    
    * fix in-vm events
    
    * set correct logger level
    
    * add lifecycle test
    
    * fix formatting
    
    * Quarkus 3 upgrades
    
    * - renames
    - cleanup
    
    * - Fixed alternatives
    
    * - Fix Data Audit Jobs Events
    - Format
    
    ---------
    
    Co-authored-by: Pere Fernández <[email protected]>
---
 data-audit/.gitignore                              | 167 ---------------------
 data-audit/data-audit-common/.gitignore            |   1 -
 .../kogito-addons-data-audit-jpa-common/.gitignore |   1 -
 data-index/data-index-common/.gitignore            |   1 -
 data-index/data-index-graphql/.gitignore           |   1 -
 .../data-index-service-common/.gitignore           |   1 -
 .../data-index-service-infinispan/.gitignore       |   1 -
 .../data-index-storage-postgresql/.gitignore       |   1 -
 .../kie/kogito/jobs/service/job/DelegateJob.java   |  12 +-
 .../json/DurationExpirationTimeDeserializer.java   |  61 ++++++++
 .../json/DurationExpirationTimeSerializer.java     |  51 +++++++
 .../json/ExactExpirationTimeDeserializer.java      |  47 ++++++
 .../json/ExactExpirationTimeSerializer.java        |  45 ++++++
 .../jobs/service/json/JacksonConfiguration.java    |  12 ++
 .../ProcessInstanceJobDescriptionDeserializer.java |  67 +++++++++
 .../ProcessInstanceJobDescriptionSerializer.java   |  52 +++++++
 .../repository/impl/BaseReactiveJobRepository.java |  12 +-
 .../repository/impl/InMemoryJobRepository.java     |   6 +-
 .../scheduler/impl/TimerDelegateJobScheduler.java  |  45 +-----
 .../jobs/service/stream/JobEventPublisher.java}    |  20 ++-
 ...bStreams.java => JobStreamsEventPublisher.java} |  42 +++++-
 .../repository/impl/BaseJobRepositoryTest.java     |   6 +-
 .../repository/impl/InMemoryJobRepositoryTest.java |   2 +-
 .../impl/TimerDelegateJobSchedulerTest.java        |  39 -----
 .../impl/VertxTimerServiceSchedulerTest.java       |   6 +-
 .../infinispan/InfinispanJobRepository.java        |   6 +-
 .../jobs/service/adapter/JobDetailsAdapter.java    |   9 --
 .../service/exception/JobExecutionException.java   |   5 +
 .../repository/mongodb/MongoDBJobRepository.java   |   6 +-
 .../postgresql/PostgreSqlJobRepository.java        |   6 +-
 .../src/main/resources/application.properties      |   1 -
 .../kogito-addons-quarkus-jobs/pom.xml             |  80 ++++++++++
 .../kogito/jobs/embedded/EmbeddedJobExecutor.java  |  83 ++++++++++
 .../jobs/embedded/EmbeddedJobServiceEvent.java}    |  20 +--
 .../kogito/jobs/embedded/EmbeddedJobsService.java  |  98 ++++++++++++
 .../kie/kogito/jobs/embedded/InVMPayloadData.java  |  53 +++++++
 .../kie/kogito/jobs/embedded/InVMRecipient.java}   |  40 ++---
 .../jobs/embedded/JobInVMEventPublisher.java       | 156 +++++++++++++++++++
 .../src/main/resources/META-INF/beans.xml          |   0
 .../src/main/resources/application.properties      |   1 +
 .../jobs/embedded/EmbeddedJobsServiceTests.java    |  65 ++++++++
 .../kie/kogito/jobs/embedded/TestApplication.java} |  39 ++---
 .../kogito/jobs/embedded/TestEventPublisher.java   |  61 ++++++++
 .../kie/kogito/jobs/embedded/TestProcesses.java    |  52 +++++++
 jobs-service/kogito-addons-jobs-service/pom.xml    |   6 +-
 45 files changed, 1132 insertions(+), 354 deletions(-)

diff --git a/data-audit/.gitignore b/data-audit/.gitignore
deleted file mode 100644
index a3191deaf..000000000
--- a/data-audit/.gitignore
+++ /dev/null
@@ -1,167 +0,0 @@
-/target/
-!.mvn/wrapper/maven-wrapper.jar
-
-### STS ###
-.apt_generated
-.classpath
-.factorypath
-.project
-.settings
-.springBeans
-.sts4-cache
-
-
-ObjectStore/
-PutObjectStoreDirHere/
-
-# Created by https://www.gitignore.io/api/git,java,maven,eclipse,windows
-
-### Eclipse ###
-
-.metadata
-bin/
-tmp/
-*.tmp
-*.bak
-*.swp
-*~.nib
-local.properties
-.settings/
-.loadpath
-.recommenders
-
-# External tool builders
-.externalToolBuilders/
-
-# Locally stored "Eclipse launch configurations"
-*.launch
-
-# PyDev specific (Python IDE for Eclipse)
-*.pydevproject
-
-# CDT-specific (C/C++ Development Tooling)
-.cproject
-
-# CDT- autotools
-.autotools
-
-# Java annotation processor (APT)
-.factorypath
-
-# PDT-specific (PHP Development Tools)
-.buildpath
-
-# sbteclipse plugin
-.target
-
-# Tern plugin
-.tern-project
-
-# TeXlipse plugin
-.texlipse
-
-# STS (Spring Tool Suite)
-.springBeans
-
-# Code Recommenders
-.recommenders/
-
-# Annotation Processing
-.apt_generated/
-
-# Scala IDE specific (Scala & Java development for Eclipse)
-.cache-main
-.scala_dependencies
-.worksheet
-
-### Eclipse Patch ###
-# Eclipse Core
-.project
-
-# JDT-specific (Eclipse Java Development Tools)
-.classpath
-
-# Annotation Processing
-.apt_generated
-
-.sts4-cache/
-
-### Git ###
-# Created by git for backups. To disable backups in Git:
-# $ git config --global mergetool.keepBackup false
-*.orig
-
-# Created by git when using merge tools for conflicts
-*.BACKUP.*
-*.BASE.*
-*.LOCAL.*
-*.REMOTE.*
-*_BACKUP_*.txt
-*_BASE_*.txt
-*_LOCAL_*.txt
-*_REMOTE_*.txt
-
-### Java ###
-# Compiled class file
-*.class
-
-# Log file
-*.log
-
-# BlueJ files
-*.ctxt
-
-# Mobile Tools for Java (J2ME)
-.mtj.tmp/
-
-# Package Files #
-*.jar
-*.war
-*.nar
-*.ear
-*.zip
-*.tar.gz
-*.rar
-
-# virtual machine crash logs, see 
http://www.java.com/en/download/help/error_hotspot.xml
-hs_err_pid*
-
-### Maven ###
-target/
-pom.xml.tag
-pom.xml.releaseBackup
-pom.xml.versionsBackup
-pom.xml.next
-release.properties
-dependency-reduced-pom.xml
-buildNumber.properties
-.mvn/timing.properties
-.mvn/wrapper/maven-wrapper.jar
-
-### Windows ###
-# Windows thumbnail cache files
-Thumbs.db
-ehthumbs.db
-ehthumbs_vista.db
-
-# Dump file
-*.stackdump
-
-# Folder config file
-[Dd]esktop.ini
-
-# Recycle Bin used on file shares
-$RECYCLE.BIN/
-
-# Windows Installer files
-*.cab
-*.msi
-*.msix
-*.msm
-*.msp
-
-# Windows shortcuts
-*.lnk
-
-
-# End of https://www.gitignore.io/api/git,java,maven,eclipse,windows
diff --git a/data-audit/data-audit-common/.gitignore 
b/data-audit/data-audit-common/.gitignore
deleted file mode 100644
index b83d22266..000000000
--- a/data-audit/data-audit-common/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
diff --git 
a/data-audit/kogito-addons-data-audit-jpa/kogito-addons-data-audit-jpa-common/.gitignore
 
b/data-audit/kogito-addons-data-audit-jpa/kogito-addons-data-audit-jpa-common/.gitignore
deleted file mode 100644
index b83d22266..000000000
--- 
a/data-audit/kogito-addons-data-audit-jpa/kogito-addons-data-audit-jpa-common/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
diff --git a/data-index/data-index-common/.gitignore 
b/data-index/data-index-common/.gitignore
deleted file mode 100644
index b83d22266..000000000
--- a/data-index/data-index-common/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
diff --git a/data-index/data-index-graphql/.gitignore 
b/data-index/data-index-graphql/.gitignore
deleted file mode 100644
index b83d22266..000000000
--- a/data-index/data-index-graphql/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
diff --git a/data-index/data-index-service/data-index-service-common/.gitignore 
b/data-index/data-index-service/data-index-service-common/.gitignore
deleted file mode 100644
index b83d22266..000000000
--- a/data-index/data-index-service/data-index-service-common/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
diff --git 
a/data-index/data-index-service/data-index-service-infinispan/.gitignore 
b/data-index/data-index-service/data-index-service-infinispan/.gitignore
deleted file mode 100644
index b83d22266..000000000
--- a/data-index/data-index-service/data-index-service-infinispan/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
diff --git 
a/data-index/data-index-storage/data-index-storage-postgresql/.gitignore 
b/data-index/data-index-storage/data-index-storage-postgresql/.gitignore
deleted file mode 100644
index b83d22266..000000000
--- a/data-index/data-index-storage/data-index-storage-postgresql/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
diff --git 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/job/DelegateJob.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/job/DelegateJob.java
index 0cd6cc081..20796efa2 100644
--- 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/job/DelegateJob.java
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/job/DelegateJob.java
@@ -24,7 +24,7 @@ import 
org.kie.kogito.jobs.service.exception.JobExecutionException;
 import org.kie.kogito.jobs.service.executor.JobExecutorResolver;
 import org.kie.kogito.jobs.service.model.JobDetailsContext;
 import org.kie.kogito.jobs.service.model.JobExecutionResponse;
-import org.kie.kogito.jobs.service.stream.JobStreams;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
 import org.kie.kogito.timer.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,11 +40,11 @@ public class DelegateJob implements Job<JobDetailsContext> {
 
     private final JobExecutorResolver jobExecutorResolver;
 
-    private final JobStreams jobStreams;
+    private final JobEventPublisher jobEventPublisher;
 
-    public DelegateJob(JobExecutorResolver executorResolver, JobStreams 
jobStreams) {
+    public DelegateJob(JobExecutorResolver executorResolver, JobEventPublisher 
jobEventPublisher) {
         this.jobExecutorResolver = executorResolver;
-        this.jobStreams = jobStreams;
+        this.jobEventPublisher = jobEventPublisher;
     }
 
     @Override
@@ -55,11 +55,11 @@ public class DelegateJob implements Job<JobDetailsContext> {
                 .map(jobExecutorResolver::get)
                 .map(executor -> executor.execute(ctx.getJobDetails()))
                 .orElseThrow(() -> new IllegalStateException("JobDetails 
cannot be null from context " + ctx))
-                .onItem().invoke(jobStreams::publishJobSuccess)
+                .onItem().invoke(jobEventPublisher::publishJobSuccess)
                 .onFailure(JobExecutionException.class).invoke(ex -> {
                     String jobId = ((JobExecutionException) ex).getJobId();
                     LOGGER.error("Error executing job {}", jobId, ex);
-                    jobStreams.publishJobError(JobExecutionResponse.builder()
+                    
jobEventPublisher.publishJobError(JobExecutionResponse.builder()
                             .message(ex.getMessage())
                             .now()
                             .jobId(jobId)
diff --git 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/DurationExpirationTimeDeserializer.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/DurationExpirationTimeDeserializer.java
new file mode 100644
index 000000000..f23b7aab4
--- /dev/null
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/DurationExpirationTimeDeserializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.json;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+
+import org.kie.kogito.jobs.DurationExpirationTime;
+
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+public class DurationExpirationTimeDeserializer extends 
StdDeserializer<DurationExpirationTime> {
+
+    private static final long serialVersionUID = -8307549297456060422L;
+
+    public DurationExpirationTimeDeserializer() {
+        super(DurationExpirationTime.class);
+    }
+
+    @Override
+    public DurationExpirationTime deserialize(JsonParser jp, 
DeserializationContext ctxt) throws IOException, JacksonException {
+
+        JsonNode node = jp.getCodec().readTree(jp);
+
+        ZonedDateTime time = ctxt.readTreeAsValue(node.get("expirationTime"), 
ZonedDateTime.class);
+        Long repeatInterval = null;
+        if (node.has("repeatInterval")) {
+            repeatInterval = node.get("repeatInterval").asLong();
+        }
+        Integer repeatLimit = null;
+        if (node.has("repeatLimit")) {
+            repeatLimit = node.get("repeatLimit").asInt(0);
+        }
+
+        Duration res = Duration.between(Instant.now(), time.toInstant());
+        return DurationExpirationTime.repeat(res.toMillis(), repeatInterval, 
repeatLimit);
+    }
+
+}
diff --git 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/DurationExpirationTimeSerializer.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/DurationExpirationTimeSerializer.java
new file mode 100644
index 000000000..f995dbfe4
--- /dev/null
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/DurationExpirationTimeSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.json;
+
+import java.io.IOException;
+
+import org.kie.kogito.jobs.DurationExpirationTime;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+public class DurationExpirationTimeSerializer extends 
StdSerializer<DurationExpirationTime> {
+
+    private static final long serialVersionUID = -8307549297456060422L;
+
+    public DurationExpirationTimeSerializer() {
+        super(DurationExpirationTime.class);
+    }
+
+    @Override
+    public void serialize(DurationExpirationTime value, JsonGenerator jgen, 
SerializerProvider provider) throws IOException {
+        jgen.writeStartObject();
+        jgen.writeStringField("@type", DurationExpirationTime.class.getName());
+        jgen.writeObjectField("expirationTime", value.get());
+        if (value.repeatInterval() != null) {
+            jgen.writeNumberField("repeatInterval", value.repeatInterval());
+        }
+        if (value.repeatLimit() != null) {
+            jgen.writeNumberField("repeatLimit", value.repeatLimit());
+        }
+        jgen.writeEndObject();
+    }
+
+}
diff --git 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ExactExpirationTimeDeserializer.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ExactExpirationTimeDeserializer.java
new file mode 100644
index 000000000..32ffee185
--- /dev/null
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ExactExpirationTimeDeserializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.json;
+
+import java.io.IOException;
+import java.time.ZonedDateTime;
+
+import org.kie.kogito.jobs.ExactExpirationTime;
+
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+public class ExactExpirationTimeDeserializer extends 
StdDeserializer<ExactExpirationTime> {
+
+    private static final long serialVersionUID = -8307549297456060422L;
+
+    public ExactExpirationTimeDeserializer() {
+        super(ExactExpirationTime.class);
+    }
+
+    @Override
+    public ExactExpirationTime deserialize(JsonParser jp, 
DeserializationContext ctxt) throws IOException, JacksonException {
+        JsonNode node = jp.getCodec().readTree(jp);
+        ZonedDateTime time = ctxt.readTreeAsValue(node.get("expirationTime"), 
ZonedDateTime.class);
+        return ExactExpirationTime.of(time);
+    }
+
+}
diff --git 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ExactExpirationTimeSerializer.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ExactExpirationTimeSerializer.java
new file mode 100644
index 000000000..f77a6f8d8
--- /dev/null
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ExactExpirationTimeSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.json;
+
+import java.io.IOException;
+
+import org.kie.kogito.jobs.ExactExpirationTime;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+public class ExactExpirationTimeSerializer extends 
StdSerializer<ExactExpirationTime> {
+
+    private static final long serialVersionUID = -8307549297456060422L;
+
+    public ExactExpirationTimeSerializer() {
+        super(ExactExpirationTime.class);
+    }
+
+    @Override
+    public void serialize(ExactExpirationTime value, JsonGenerator jgen, 
SerializerProvider provider) throws IOException {
+        jgen.writeStartObject();
+        jgen.writeStringField("@type", ExactExpirationTime.class.getName());
+        jgen.writeObjectField("expirationTime", value.get());
+        jgen.writeEndObject();
+    }
+
+}
diff --git 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
index a3d5bc41e..cfbbd024a 100644
--- 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
@@ -18,12 +18,16 @@
  */
 package org.kie.kogito.jobs.service.json;
 
+import org.kie.kogito.jobs.DurationExpirationTime;
+import org.kie.kogito.jobs.ExactExpirationTime;
+import org.kie.kogito.jobs.ProcessInstanceJobDescription;
 import org.kie.kogito.jobs.service.api.serlialization.SerializationUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 
 import io.cloudevents.jackson.JsonFormat;
@@ -41,8 +45,16 @@ public class JacksonConfiguration {
     public ObjectMapperCustomizer customizer() {
         return objectMapper -> {
             LOGGER.debug("Jackson customization initialized.");
+            SimpleModule kogitoCustomModule = new SimpleModule();
+            
kogitoCustomModule.addSerializer(ProcessInstanceJobDescription.class, new 
ProcessInstanceJobDescriptionSerializer());
+            
kogitoCustomModule.addDeserializer(ProcessInstanceJobDescription.class, new 
ProcessInstanceJobDescriptionDeserializer());
+            kogitoCustomModule.addSerializer(DurationExpirationTime.class, new 
DurationExpirationTimeSerializer());
+            kogitoCustomModule.addDeserializer(DurationExpirationTime.class, 
new DurationExpirationTimeDeserializer());
+            kogitoCustomModule.addSerializer(ExactExpirationTime.class, new 
ExactExpirationTimeSerializer());
+            kogitoCustomModule.addDeserializer(ExactExpirationTime.class, new 
ExactExpirationTimeDeserializer());
             objectMapper
                     .registerModule(new JavaTimeModule())
+                    .registerModule(kogitoCustomModule)
                     .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
                     
.disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE)
                     .registerModule(JsonFormat.getCloudEventJacksonModule());
diff --git 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java
new file mode 100644
index 000000000..8b60fd644
--- /dev/null
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.json;
+
+import java.io.IOException;
+
+import org.kie.kogito.jobs.ExpirationTime;
+import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.ProcessInstanceJobDescriptionBuilder;
+
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import static java.util.Optional.ofNullable;
+
+public class ProcessInstanceJobDescriptionDeserializer extends 
StdDeserializer<ProcessInstanceJobDescription> {
+
+    private static final long serialVersionUID = -8307549297456060422L;
+
+    public ProcessInstanceJobDescriptionDeserializer() {
+        super(ProcessInstanceJobDescription.class);
+    }
+
+    @Override
+    public ProcessInstanceJobDescription deserialize(JsonParser jp, 
DeserializationContext ctxt) throws IOException, JacksonException {
+        ProcessInstanceJobDescriptionBuilder builder = 
ProcessInstanceJobDescription.builder();
+
+        JsonNode node = jp.getCodec().readTree(jp);
+        ofNullable(node.get("id")).ifPresent(e -> builder.id(e.asText()));
+        ofNullable(node.get("timerId")).ifPresent(e -> 
builder.timerId(e.asText()));
+        ofNullable(node.get("priority")).ifPresent(e -> 
builder.priority(e.asInt()));
+        ofNullable(node.get("processInstanceId")).ifPresent(e -> 
builder.processInstanceId(e.asText()));
+        ofNullable(node.get("rootProcessInstanceId")).ifPresent(e -> 
builder.rootProcessInstanceId(e.asText()));
+        ofNullable(node.get("processId")).ifPresent(e -> 
builder.processId(e.asText()));
+        ofNullable(node.get("rootProcessId")).ifPresent(e -> 
builder.rootProcessId(e.asText()));
+        ofNullable(node.get("nodeInstanceId")).ifPresent(e -> 
builder.nodeInstanceId(e.asText()));
+
+        String type = node.get("expirationTime").get("@type").asText();
+        try {
+            builder.expirationTime((ExpirationTime) 
ctxt.readTreeAsValue(node.get("expirationTime"), Class.forName(type)));
+        } catch (ClassNotFoundException | IOException e1) {
+            e1.printStackTrace();
+        }
+
+        return builder.build();
+    }
+
+}
diff --git 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java
new file mode 100644
index 000000000..379da2a0e
--- /dev/null
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.json;
+
+import java.io.IOException;
+
+import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+public class ProcessInstanceJobDescriptionSerializer extends 
StdSerializer<ProcessInstanceJobDescription> {
+
+    private static final long serialVersionUID = -8307549297456060422L;
+
+    public ProcessInstanceJobDescriptionSerializer() {
+        super(ProcessInstanceJobDescription.class);
+    }
+
+    @Override
+    public void serialize(ProcessInstanceJobDescription value, JsonGenerator 
jgen, SerializerProvider provider) throws IOException {
+        jgen.writeStartObject();
+        jgen.writeStringField("id", value.id());
+        jgen.writeStringField("timerId", value.timerId());
+        jgen.writeObjectField("expirationTime", value.expirationTime());
+        jgen.writeNumberField("priority", value.priority());
+        jgen.writeStringField("processInstanceId", value.processInstanceId());
+        jgen.writeStringField("rootProcessInstanceId", 
value.rootProcessInstanceId());
+        jgen.writeStringField("processId", value.processId());
+        jgen.writeStringField("rootProcessId", value.rootProcessId());
+        jgen.writeStringField("nodeInstanceId", value.nodeInstanceId());
+        jgen.writeEndObject();
+    }
+
+}
diff --git 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.java
index 2d7a402ac..2271f46dc 100644
--- 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.java
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.java
@@ -30,7 +30,7 @@ import 
org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
 import org.kie.kogito.jobs.service.model.JobDetails;
 import org.kie.kogito.jobs.service.model.JobStatus;
 import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
-import org.kie.kogito.jobs.service.stream.JobStreams;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
 
 import io.vertx.core.Vertx;
 
@@ -38,11 +38,11 @@ public abstract class BaseReactiveJobRepository implements 
ReactiveJobRepository
 
     private Vertx vertx;
 
-    private JobStreams jobStreams;
+    private JobEventPublisher jobEventPublisher;
 
-    protected BaseReactiveJobRepository(Vertx vertx, JobStreams jobStreams) {
+    protected BaseReactiveJobRepository(Vertx vertx, JobEventPublisher 
jobEventPublisher) {
         this.vertx = vertx;
-        this.jobStreams = jobStreams;
+        this.jobEventPublisher = jobEventPublisher;
     }
 
     public <T> CompletionStage<T> runAsync(Supplier<T> function) {
@@ -62,7 +62,7 @@ public abstract class BaseReactiveJobRepository implements 
ReactiveJobRepository
     @Override
     public CompletionStage<JobDetails> save(JobDetails job) {
         return doSave(job)
-                .thenApply(jobStreams::publishJobStatusChange);
+                .thenApply(jobEventPublisher::publishJobStatusChange);
     }
 
     public abstract CompletionStage<JobDetails> doSave(JobDetails job);
@@ -70,7 +70,7 @@ public abstract class BaseReactiveJobRepository implements 
ReactiveJobRepository
     @Override
     public CompletionStage<JobDetails> delete(JobDetails job) {
         return delete(job.getId())
-                .thenApply(j -> jobStreams.publishJobStatusChange(job));
+                .thenApply(j -> jobEventPublisher.publishJobStatusChange(job));
     }
 
     @Override
diff --git 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepository.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepository.java
index c559eefa0..5b61e1f40 100644
--- 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepository.java
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepository.java
@@ -33,7 +33,7 @@ import 
org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
 import org.kie.kogito.jobs.service.model.JobDetails;
 import org.kie.kogito.jobs.service.model.JobStatus;
 import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
-import org.kie.kogito.jobs.service.stream.JobStreams;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
 import org.kie.kogito.jobs.service.utils.DateUtil;
 
 import io.quarkus.arc.DefaultBean;
@@ -53,8 +53,8 @@ public class InMemoryJobRepository extends 
BaseReactiveJobRepository implements
     }
 
     @Inject
-    public InMemoryJobRepository(Vertx vertx, JobStreams jobStreams) {
-        super(vertx, jobStreams);
+    public InMemoryJobRepository(Vertx vertx, JobEventPublisher 
jobEventPublisher) {
+        super(vertx, jobEventPublisher);
     }
 
     @Override
diff --git 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java
index 6f5f028b3..97a8f0a90 100644
--- 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java
@@ -20,24 +20,18 @@ package org.kie.kogito.jobs.service.scheduler.impl;
 
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.CompletionStage;
 
 import org.eclipse.microprofile.config.inject.ConfigProperty;
-import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
-import org.eclipse.microprofile.reactive.messaging.Incoming;
 import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
 import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
 import org.kie.kogito.jobs.service.executor.JobExecutorResolver;
 import org.kie.kogito.jobs.service.job.DelegateJob;
 import org.kie.kogito.jobs.service.model.JobDetails;
 import org.kie.kogito.jobs.service.model.JobDetailsContext;
-import org.kie.kogito.jobs.service.model.JobExecutionResponse;
 import org.kie.kogito.jobs.service.model.ManageableJobHandle;
 import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
 import org.kie.kogito.jobs.service.scheduler.BaseTimerJobScheduler;
-import org.kie.kogito.jobs.service.stream.AvailableStreams;
-import org.kie.kogito.jobs.service.stream.JobStreams;
-import org.kie.kogito.jobs.service.utils.ErrorHandling;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
 import org.kie.kogito.timer.Trigger;
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
@@ -58,7 +52,7 @@ public class TimerDelegateJobScheduler extends 
BaseTimerJobScheduler {
 
     private VertxTimerServiceScheduler delegate;
 
-    private JobStreams jobStreams;
+    private JobEventPublisher jobEventPublisher;
 
     protected TimerDelegateJobScheduler() {
     }
@@ -70,11 +64,11 @@ public class TimerDelegateJobScheduler extends 
BaseTimerJobScheduler {
             @ConfigProperty(name = 
"kogito.jobs-service.schedulerChunkInMinutes", defaultValue = "10") long 
schedulerChunkInMinutes,
             @ConfigProperty(name = 
"kogito.jobs-service.forceExecuteExpiredJobs", defaultValue = "true") boolean 
forceExecuteExpiredJobs,
             JobExecutorResolver jobExecutorResolver, 
VertxTimerServiceScheduler delegate,
-            JobStreams jobStreams) {
+            JobEventPublisher jobEventPublisher) {
         super(jobRepository, backoffRetryMillis, 
maxIntervalLimitToRetryMillis, schedulerChunkInMinutes, 
forceExecuteExpiredJobs);
         this.jobExecutorResolver = jobExecutorResolver;
         this.delegate = delegate;
-        this.jobStreams = jobStreams;
+        this.jobEventPublisher = jobEventPublisher;
     }
 
     @Override
@@ -82,7 +76,7 @@ public class TimerDelegateJobScheduler extends 
BaseTimerJobScheduler {
         LOGGER.debug("Job Scheduling {}", job);
         return ReactiveStreams
                 .of(job)
-                .map(j -> delegate.scheduleJob(new 
DelegateJob(jobExecutorResolver, jobStreams), new JobDetailsContext(j),
+                .map(j -> delegate.scheduleJob(new 
DelegateJob(jobExecutorResolver, jobEventPublisher), new JobDetailsContext(j),
                         trigger.orElse(j.getTrigger())));
     }
 
@@ -100,33 +94,4 @@ public class TimerDelegateJobScheduler extends 
BaseTimerJobScheduler {
                 .buildRs();
     }
 
-    //Stream Processors
-
-    @Incoming(AvailableStreams.JOB_ERROR_EVENTS)
-    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
-    public CompletionStage<Boolean> jobErrorProcessor(JobExecutionResponse 
response) {
-        LOGGER.warn("Error received {}", response);
-        return 
ErrorHandling.skipErrorPublisherBuilder(this::handleJobExecutionError, response)
-                .findFirst()
-                .run()
-                .thenApply(Optional::isPresent)
-                .exceptionally(e -> {
-                    LOGGER.error("Error handling error {}", response, e);
-                    return false;
-                });
-    }
-
-    @Incoming(AvailableStreams.JOB_SUCCESS_EVENTS)
-    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
-    public CompletionStage<Boolean> jobSuccessProcessor(JobExecutionResponse 
response) {
-        LOGGER.debug("Success received to be processed {}", response);
-        return 
ErrorHandling.skipErrorPublisherBuilder(this::handleJobExecutionSuccess, 
response)
-                .findFirst()
-                .run()
-                .thenApply(Optional::isPresent)
-                .exceptionally(e -> {
-                    LOGGER.error("Error handling error {}", response, e);
-                    return false;
-                });
-    }
 }
diff --git 
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobEventPublisher.java
similarity index 66%
copy from 
jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
copy to 
jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobEventPublisher.java
index 3a65ab42e..4342cf00c 100644
--- 
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobEventPublisher.java
@@ -16,18 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.kie.kogito.jobs.service.exception;
+package org.kie.kogito.jobs.service.stream;
 
-public class JobExecutionException extends JobServiceException {
+import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.jobs.service.model.JobExecutionResponse;
 
-    private String jobId;
+public interface JobEventPublisher {
 
-    public JobExecutionException(String jobId, String message) {
-        super(message);
-        this.jobId = jobId;
-    }
+    JobExecutionResponse publishJobError(JobExecutionResponse response);
 
-    public String getJobId() {
-        return jobId;
-    }
-}
\ No newline at end of file
+    JobExecutionResponse publishJobSuccess(JobExecutionResponse response);
+
+    JobDetails publishJobStatusChange(JobDetails scheduledJob);
+}
diff --git 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreams.java
 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreamsEventPublisher.java
similarity index 69%
rename from 
jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreams.java
rename to 
jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreamsEventPublisher.java
index a951695b1..f8ef12319 100644
--- 
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreams.java
+++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreamsEventPublisher.java
@@ -18,6 +18,9 @@
  */
 package org.kie.kogito.jobs.service.stream;
 
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+
 import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
 import org.eclipse.microprofile.reactive.messaging.Channel;
 import org.eclipse.microprofile.reactive.messaging.Emitter;
@@ -26,6 +29,8 @@ import org.eclipse.microprofile.reactive.messaging.OnOverflow;
 import org.eclipse.microprofile.reactive.messaging.Outgoing;
 import org.kie.kogito.jobs.service.model.JobDetails;
 import org.kie.kogito.jobs.service.model.JobExecutionResponse;
+import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler;
+import org.kie.kogito.jobs.service.utils.ErrorHandling;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,9 +44,12 @@ import jakarta.inject.Inject;
  * received item.
  */
 @ApplicationScoped
-public class JobStreams {
+public class JobStreamsEventPublisher implements JobEventPublisher {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JobStreamsEventPublisher.class);
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(JobStreams.class);
+    @Inject
+    ReactiveJobScheduler scheduler;
 
     /**
      * Publish on Stream of Job Error events
@@ -82,8 +90,36 @@ public class JobStreams {
         return scheduledJob;
     }
 
-    //Broadcast Events from Emitter to Streams
+    //Stream Processors
+    @Incoming(AvailableStreams.JOB_ERROR_EVENTS)
+    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
+    public CompletionStage<Boolean> jobErrorProcessor(JobExecutionResponse 
response) {
+        LOGGER.warn("Error received {}", response);
+        return 
ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionError, 
response)
+                .findFirst()
+                .run()
+                .thenApply(Optional::isPresent)
+                .exceptionally(e -> {
+                    LOGGER.error("Error handling error {}", response, e);
+                    return false;
+                });
+    }
+
+    @Incoming(AvailableStreams.JOB_SUCCESS_EVENTS)
+    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
+    public CompletionStage<Boolean> jobSuccessProcessor(JobExecutionResponse 
response) {
+        LOGGER.debug("Success received to be processed {}", response);
+        return 
ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionSuccess, 
response)
+                .findFirst()
+                .run()
+                .thenApply(Optional::isPresent)
+                .exceptionally(e -> {
+                    LOGGER.error("Error handling error {}", response, e);
+                    return false;
+                });
+    }
 
+    // Broadcast Events from Emitter to Streams
     @Incoming(AvailableStreams.JOB_ERROR)
     @Outgoing(AvailableStreams.JOB_ERROR_EVENTS)
     @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
diff --git 
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/BaseJobRepositoryTest.java
 
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/BaseJobRepositoryTest.java
index f992ca071..cfbadc4f8 100644
--- 
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/BaseJobRepositoryTest.java
+++ 
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/BaseJobRepositoryTest.java
@@ -34,7 +34,7 @@ import org.kie.kogito.jobs.service.model.JobStatus;
 import org.kie.kogito.jobs.service.model.Recipient;
 import org.kie.kogito.jobs.service.model.RecipientInstance;
 import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
-import org.kie.kogito.jobs.service.stream.JobStreams;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
 import org.kie.kogito.jobs.service.utils.DateUtil;
 import org.kie.kogito.jobs.service.utils.FunctionsUtil;
 import org.kie.kogito.timer.impl.PointInTimeTrigger;
@@ -56,8 +56,8 @@ public abstract class BaseJobRepositoryTest {
         createAndSaveJob(ID);
     }
 
-    public JobStreams mockJobStreams() {
-        final JobStreams mock = mock(JobStreams.class);
+    public JobEventPublisher mockJobEventPublisher() {
+        final JobEventPublisher mock = mock(JobEventPublisher.class);
         
lenient().when(mock.publishJobStatusChange(any(JobDetails.class))).thenAnswer(a 
-> a.getArgument(0));
         
lenient().when(mock.publishJobSuccess(any(JobExecutionResponse.class))).thenAnswer(a
 -> a.getArgument(0));
         
lenient().when(mock.publishJobError(any(JobExecutionResponse.class))).thenAnswer(a
 -> a.getArgument(0));
diff --git 
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
 
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
index cb7f67f27..6b975c4f0 100644
--- 
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
+++ 
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
@@ -36,7 +36,7 @@ class InMemoryJobRepositoryTest extends BaseJobRepositoryTest 
{
 
     @BeforeEach
     public void setUp() throws Exception {
-        tested = new InMemoryJobRepository(vertx, mockJobStreams());
+        tested = new InMemoryJobRepository(vertx, mockJobEventPublisher());
         super.setUp();
     }
 
diff --git 
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobSchedulerTest.java
 
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobSchedulerTest.java
index 9c5ca3bc9..f272a790a 100644
--- 
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobSchedulerTest.java
+++ 
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobSchedulerTest.java
@@ -22,7 +22,6 @@ import java.util.Optional;
 import java.util.UUID;
 
 import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
-import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -46,10 +45,8 @@ import org.reactivestreams.Publisher;
 import io.smallrye.mutiny.Multi;
 
 import static mutiny.zero.flow.adapters.AdaptersToFlow.publisher;
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -99,42 +96,6 @@ class TimerDelegateJobSchedulerTest extends 
BaseTimerJobSchedulerTest {
         verify(timer, never()).removeJob(any(ManageableJobHandle.class));
     }
 
-    @Test
-    void testJobSuccessProcessor() {
-        JobExecutionResponse response = getJobResponse();
-        doReturn(ReactiveStreams.of(JobDetails.builder().build()))
-                .when(tested).handleJobExecutionSuccess(response);
-        tested.jobSuccessProcessor(response).thenAccept(r -> 
assertThat(r).isTrue());
-        verify(tested).handleJobExecutionSuccess(response);
-    }
-
-    @Test
-    void testJobSuccessProcessorFail() {
-        JobExecutionResponse response = getJobResponse();
-        doReturn(ReactiveStreams.failed(new RuntimeException()))
-                .when(tested).handleJobExecutionSuccess(response);
-        tested.jobSuccessProcessor(response).thenAccept(r -> 
assertThat(r).isFalse());
-        verify(tested).handleJobExecutionSuccess(response);
-    }
-
-    @Test
-    void testJobErrorProcessor() {
-        JobExecutionResponse response = getJobResponse();
-        doReturn(ReactiveStreams.of(JobDetails.builder().build()))
-                .when(tested).handleJobExecutionError(response);
-        tested.jobErrorProcessor(response).thenAccept(r -> 
assertThat(r).isTrue());
-        verify(tested).handleJobExecutionError(response);
-    }
-
-    @Test
-    void testJobErrorProcessorFail() {
-        JobExecutionResponse response = getJobResponse();
-        doReturn(ReactiveStreams.failed(new RuntimeException()))
-                .when(tested).handleJobExecutionError(response);
-        tested.jobErrorProcessor(response).thenAccept(r -> 
assertThat(r).isFalse());
-        verify(tested).handleJobExecutionError(response);
-    }
-
     private JobExecutionResponse getJobResponse() {
         return JobExecutionResponse.builder()
                 .jobId(UUID.randomUUID().toString())
diff --git 
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceSchedulerTest.java
 
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceSchedulerTest.java
index 4f7850245..3978fccb1 100644
--- 
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceSchedulerTest.java
+++ 
b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceSchedulerTest.java
@@ -29,7 +29,7 @@ import org.kie.kogito.jobs.service.job.DelegateJob;
 import org.kie.kogito.jobs.service.model.JobDetails;
 import org.kie.kogito.jobs.service.model.JobDetailsContext;
 import org.kie.kogito.jobs.service.model.ManageableJobHandle;
-import org.kie.kogito.jobs.service.stream.JobStreams;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
 import org.kie.kogito.jobs.service.utils.DateUtil;
 import org.kie.kogito.timer.Job;
 import org.kie.kogito.timer.JobContext;
@@ -62,7 +62,7 @@ class VertxTimerServiceSchedulerTest {
     private JobExecutorResolver jobExecutorResolver;
 
     @Mock
-    private JobStreams jobStreams;
+    private JobEventPublisher jobEventPublisher;
 
     @Captor
     private ArgumentCaptor<JobDetails> jobCaptor;
@@ -110,7 +110,7 @@ class VertxTimerServiceSchedulerTest {
         trigger = new PointInTimeTrigger(timestamp, null, null);
         jobDetails = JobDetails.builder().build();
         context = new JobDetailsContext(jobDetails);
-        job = new DelegateJob(jobExecutorResolver, jobStreams);
+        job = new DelegateJob(jobExecutorResolver, jobEventPublisher);
         return tested.scheduleJob(job, context, trigger);
     }
 
diff --git 
a/jobs-service/jobs-service-infinispan/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanJobRepository.java
 
b/jobs-service/jobs-service-infinispan/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanJobRepository.java
index 210e37435..8b9a25fa7 100644
--- 
a/jobs-service/jobs-service-infinispan/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanJobRepository.java
+++ 
b/jobs-service/jobs-service-infinispan/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanJobRepository.java
@@ -35,7 +35,7 @@ import org.kie.kogito.jobs.service.model.JobDetails;
 import org.kie.kogito.jobs.service.model.JobStatus;
 import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
 import org.kie.kogito.jobs.service.repository.impl.BaseReactiveJobRepository;
-import org.kie.kogito.jobs.service.stream.JobStreams;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
 
 import io.vertx.core.Vertx;
 
@@ -58,9 +58,9 @@ public class InfinispanJobRepository extends 
BaseReactiveJobRepository implement
 
     @Inject
     public InfinispanJobRepository(Vertx vertx,
-            JobStreams jobStreams,
+            JobEventPublisher jobEventPublisher,
             RemoteCacheManager remoteCacheManager) {
-        super(vertx, jobStreams);
+        super(vertx, jobEventPublisher);
         this.remoteCacheManager = remoteCacheManager;
     }
 
diff --git 
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/adapter/JobDetailsAdapter.java
 
b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/adapter/JobDetailsAdapter.java
index 3149da797..1e112838a 100644
--- 
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/adapter/JobDetailsAdapter.java
+++ 
b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/adapter/JobDetailsAdapter.java
@@ -29,8 +29,6 @@ import org.kie.kogito.jobs.service.api.Recipient;
 import org.kie.kogito.jobs.service.api.Retry;
 import org.kie.kogito.jobs.service.api.Schedule;
 import org.kie.kogito.jobs.service.api.TemporalUnit;
-import org.kie.kogito.jobs.service.api.recipient.http.HttpRecipient;
-import org.kie.kogito.jobs.service.api.recipient.sink.SinkRecipient;
 import org.kie.kogito.jobs.service.api.schedule.timer.TimerSchedule;
 import org.kie.kogito.jobs.service.model.JobDetails;
 import org.kie.kogito.jobs.service.model.JobStatus;
@@ -161,7 +159,6 @@ public class JobDetailsAdapter {
         }
 
         public static Recipient<?> toRecipient(JobDetails jobDetails) {
-            checkIsSupported(jobDetails.getRecipient().getRecipient());
             return jobDetails.getRecipient().getRecipient();
         }
 
@@ -170,15 +167,9 @@ public class JobDetailsAdapter {
         }
 
         public static org.kie.kogito.jobs.service.model.Recipient 
from(Recipient<?> recipient) {
-            checkIsSupported(recipient);
             return new RecipientInstance(recipient);
         }
 
-        static void checkIsSupported(Recipient<?> recipient) {
-            if (!(recipient instanceof HttpRecipient) && !(recipient 
instanceof SinkRecipient)) {
-                throw new UnsupportedOperationException("Only HttpRecipient 
and SinkRecipient are supported");
-            }
-        }
     }
 
     public static class RetryAdapter {
diff --git 
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
 
b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
index 3a65ab42e..830c227c6 100644
--- 
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
+++ 
b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
@@ -22,6 +22,11 @@ public class JobExecutionException extends 
JobServiceException {
 
     private String jobId;
 
+    public JobExecutionException(String jobId, String message, Throwable th) {
+        super(message, th);
+        this.jobId = jobId;
+    }
+
     public JobExecutionException(String jobId, String message) {
         super(message);
         this.jobId = jobId;
diff --git 
a/jobs-service/jobs-service-mongodb/src/main/java/org/kie/kogito/jobs/service/repository/mongodb/MongoDBJobRepository.java
 
b/jobs-service/jobs-service-mongodb/src/main/java/org/kie/kogito/jobs/service/repository/mongodb/MongoDBJobRepository.java
index 4d89e4226..6bc29ec4b 100644
--- 
a/jobs-service/jobs-service-mongodb/src/main/java/org/kie/kogito/jobs/service/repository/mongodb/MongoDBJobRepository.java
+++ 
b/jobs-service/jobs-service-mongodb/src/main/java/org/kie/kogito/jobs/service/repository/mongodb/MongoDBJobRepository.java
@@ -30,7 +30,7 @@ import org.kie.kogito.jobs.service.model.JobStatus;
 import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
 import org.kie.kogito.jobs.service.repository.impl.BaseReactiveJobRepository;
 import org.kie.kogito.jobs.service.repository.marshaller.JobDetailsMarshaller;
-import org.kie.kogito.jobs.service.stream.JobStreams;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
 
 import com.mongodb.client.model.FindOneAndReplaceOptions;
 
@@ -87,10 +87,10 @@ public class MongoDBJobRepository extends 
BaseReactiveJobRepository implements R
     }
 
     @Inject
-    public MongoDBJobRepository(Vertx vertx, JobStreams jobStreams, 
ReactiveMongoClient mongoClient,
+    public MongoDBJobRepository(Vertx vertx, JobEventPublisher 
jobEventPublisher, ReactiveMongoClient mongoClient,
             @ConfigProperty(name = DATABASE_PROPERTY) String database,
             JobDetailsMarshaller jobDetailsMarshaller) {
-        super(vertx, jobStreams);
+        super(vertx, jobEventPublisher);
         this.jobDetailsMarshaller = jobDetailsMarshaller;
         this.collection = 
mongoClient.getDatabase(database).getCollection(JOB_DETAILS_COLLECTION);
     }
diff --git 
a/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepository.java
 
b/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepository.java
index 132b35182..fa95138fa 100644
--- 
a/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepository.java
+++ 
b/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepository.java
@@ -34,7 +34,7 @@ import 
org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
 import org.kie.kogito.jobs.service.repository.impl.BaseReactiveJobRepository;
 import org.kie.kogito.jobs.service.repository.marshaller.RecipientMarshaller;
 import org.kie.kogito.jobs.service.repository.marshaller.TriggerMarshaller;
-import org.kie.kogito.jobs.service.stream.JobStreams;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
 import org.kie.kogito.jobs.service.utils.DateUtil;
 import org.kie.kogito.timer.Trigger;
 
@@ -74,9 +74,9 @@ public class PostgreSqlJobRepository extends 
BaseReactiveJobRepository implement
     }
 
     @Inject
-    public PostgreSqlJobRepository(Vertx vertx, JobStreams jobStreams, PgPool 
client,
+    public PostgreSqlJobRepository(Vertx vertx, JobEventPublisher 
jobEventPublisher, PgPool client,
             TriggerMarshaller triggerMarshaller, RecipientMarshaller 
recipientMarshaller) {
-        super(vertx, jobStreams);
+        super(vertx, jobEventPublisher);
         this.client = client;
         this.triggerMarshaller = triggerMarshaller;
         this.recipientMarshaller = recipientMarshaller;
diff --git 
a/jobs-service/jobs-service-postgresql/src/main/resources/application.properties
 
b/jobs-service/jobs-service-postgresql/src/main/resources/application.properties
index 4dc2184d4..5070c9069 100644
--- 
a/jobs-service/jobs-service-postgresql/src/main/resources/application.properties
+++ 
b/jobs-service/jobs-service-postgresql/src/main/resources/application.properties
@@ -20,7 +20,6 @@
 quarkus.datasource.db-kind=postgresql
 quarkus.datasource.username=
 quarkus.datasource.password=
-quarkus.datasource.reactive.url=
 quarkus.datasource.jdbc.url=
 quarkus.flyway.migrate-at-start=true
 quarkus.datasource.health.enabled=true
diff --git 
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/pom.xml 
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/pom.xml
new file mode 100644
index 000000000..c090cb87e
--- /dev/null
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>kogito-addons-jobs-service</artifactId>
+        <groupId>org.kie.kogito</groupId>
+        <version>999-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>kogito-addons-quarkus-jobs</artifactId>
+
+    <name>Jobs Collocated Quarkus Addon - Runtime</name>
+    <description>Run Jobs Service embedded with the application.</description>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.kie.kogito</groupId>
+            <artifactId>jobs-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.kie.kogito</groupId>
+            <artifactId>jobs-service-internal-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.kie.kogito</groupId>
+            <artifactId>kogito-services</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.kie.kogito</groupId>
+            <artifactId>kogito-timer</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.kie.kogito</groupId>
+            <artifactId>jobs-service-common</artifactId>
+
+            <exclusions>
+                <exclusion>
+                    <groupId>io.quarkus</groupId>
+                    <artifactId>quarkus-resteasy-reactive</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.quarkus</groupId>
+                    <artifactId>quarkus-resteasy-reactive-jackson</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- Testing dependencies -->
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-junit5</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-junit5-mockito</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.kie.kogito</groupId>
+            <artifactId>jobs-service-inmemory</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+
+</project>
diff --git 
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
 
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
new file mode 100644
index 000000000..bbc84c113
--- /dev/null
+++ 
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.embedded;
+
+import org.kie.kogito.Application;
+import org.kie.kogito.Model;
+import org.kie.kogito.jobs.service.api.Recipient;
+import org.kie.kogito.jobs.service.exception.JobExecutionException;
+import org.kie.kogito.jobs.service.executor.JobExecutor;
+import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.jobs.service.model.JobExecutionResponse;
+import org.kie.kogito.jobs.service.model.RecipientInstance;
+import org.kie.kogito.process.Process;
+import org.kie.kogito.process.Processes;
+import org.kie.kogito.services.jobs.impl.TriggerJobCommand;
+
+import io.smallrye.mutiny.Uni;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Alternative;
+import jakarta.inject.Inject;
+
+@ApplicationScoped
+@Alternative
+public class EmbeddedJobExecutor implements JobExecutor {
+
+    @Inject
+    Processes processes;
+
+    @Inject
+    Application application;
+
+    @Override
+    public Uni<JobExecutionResponse> execute(JobDetails jobDetails) {
+
+        String correlationId = jobDetails.getCorrelationId();
+        RecipientInstance recipientModel = (RecipientInstance) jobDetails.getRecipient();
+        InVMRecipient recipient = (InVMRecipient) recipientModel.getRecipient();
+        String timerId = recipient.getPayload().getData().timerId();
+        String processId = recipient.getPayload().getData().processId();
+        Process<? extends Model> process = processes.processById(processId);
+        String processInstanceId = recipient.getPayload().getData().processInstanceId();
+        Integer limit = jobDetails.getRetries();
+
+        TriggerJobCommand command = new TriggerJobCommand(processInstanceId, correlationId, timerId, limit, process, application.unitOfWorkManager());
+
+        return Uni.createFrom().item(command::execute)
+                .onFailure()
+                .transform(
+                        unexpected -> new JobExecutionException(jobDetails.getId(), "Unexpected error when executing Embedded request for job: " + jobDetails.getId() + ". " + unexpected.getMessage(),
+                                unexpected))
+                .onItem()
+                .transform(res -> JobExecutionResponse.builder()
+                        .message("Embedded job executed")
+                        .code(String.valueOf(200))
+                        .now()
+                        .jobId(jobDetails.getId())
+                        .build());
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Class<? extends Recipient> type() {
+        return InVMRecipient.class;
+    }
+
+}
diff --git a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobServiceEvent.java
similarity index 71%
copy from jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
copy to jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobServiceEvent.java
index 3a65ab42e..2c004389c 100644
--- a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/exception/JobExecutionException.java
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobServiceEvent.java
@@ -16,18 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.kie.kogito.jobs.service.exception;
+package org.kie.kogito.jobs.embedded;
 
-public class JobExecutionException extends JobServiceException {
+import org.kie.kogito.jobs.service.model.JobDetails;
 
-    private String jobId;
+public class EmbeddedJobServiceEvent {
 
-    public JobExecutionException(String jobId, String message) {
-        super(message);
-        this.jobId = jobId;
+    private JobDetails jobDetails;
+
+    public EmbeddedJobServiceEvent(JobDetails jobDetails) {
+        this.jobDetails = jobDetails;
     }
 
-    public String getJobId() {
-        return jobId;
+    public JobDetails getJobDetails() {
+        return jobDetails;
     }
-}
\ No newline at end of file
+
+}
diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java
new file mode 100644
index 000000000..95b9b0d92
--- /dev/null
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.embedded;
+
+import java.util.concurrent.ExecutionException;
+
+import org.kie.kogito.jobs.JobsService;
+import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.ProcessJobDescription;
+import org.kie.kogito.jobs.api.JobCallbackResourceDef;
+import org.kie.kogito.jobs.service.adapter.JobDetailsAdapter;
+import org.kie.kogito.jobs.service.api.Job;
+import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.jobs.service.model.JobStatus;
+import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.smallrye.mutiny.Uni;
+import io.smallrye.mutiny.infrastructure.Infrastructure;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Alternative;
+import jakarta.inject.Inject;
+
+import static mutiny.zero.flow.adapters.AdaptersToFlow.publisher;
+
+@ApplicationScoped
+@Alternative
+public class EmbeddedJobsService implements JobsService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedJobsService.class);
+
+    @Inject
+    ReactiveJobScheduler scheduler;
+
+    public EmbeddedJobsService() {
+        LOGGER.info("Starting Embedded Job Service");
+    }
+
+    @Override
+    public String scheduleProcessJob(ProcessJobDescription description) {
+        LOGGER.debug("ScheduleProcessJob: {} not supported", description);
+        return null;
+    }
+
+    @Override
+    public String scheduleProcessInstanceJob(ProcessInstanceJobDescription description) {
+        try {
+            Job job = Job.builder()
+                    .id(description.id())
+                    .correlationId(description.id())
+                    .recipient(new InVMRecipient(new InVMPayloadData(description)))
+                    .schedule(JobCallbackResourceDef.buildSchedule(description))
+                    .build();
+
+            JobDetails jobDetails = JobDetailsAdapter.from(job);
+            LOGGER.debug("Embedded ScheduleProcessJob: {}", jobDetails);
+
+            String outcome = null;
+
+            JobDetails uni = Uni.createFrom().publisher(publisher(scheduler.schedule(jobDetails))).runSubscriptionOn(Infrastructure.getDefaultWorkerPool()).subscribe().asCompletionStage().get();
+            outcome = uni.getId();
+
+            LOGGER.debug("Embedded ScheduleProcessJob: {} scheduled", outcome);
+            return outcome;
+        } catch (InterruptedException | ExecutionException e) {
+            LOGGER.error("interrupted execution", e);
+            return null;
+        }
+    }
+
+    @Override
+    public boolean cancelJob(String jobId) {
+        try {
+            LOGGER.debug("Embedded cancelJob: {}", jobId);
+            return JobStatus.CANCELED.equals(scheduler.cancel(jobId).toCompletableFuture().get().getStatus());
+        } catch (InterruptedException | ExecutionException e) {
+            return false;
+        }
+    }
+
+}
diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java
new file mode 100644
index 000000000..cb49a135b
--- /dev/null
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.embedded;
+
+import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.service.api.PayloadData;
+
+public class InVMPayloadData extends PayloadData<ProcessInstanceJobDescription> {
+
+    private ProcessInstanceJobDescription jobDescription;
+
+    public InVMPayloadData() {
+        // do nothing
+    }
+
+    public void setJobDescription(ProcessInstanceJobDescription jobDescription) {
+        this.jobDescription = jobDescription;
+    }
+
+    public ProcessInstanceJobDescription getJobDescription() {
+        return jobDescription;
+    }
+
+    @Override
+    public ProcessInstanceJobDescription getData() {
+        return jobDescription;
+    }
+
+    public InVMPayloadData(ProcessInstanceJobDescription data) {
+        this.jobDescription = data;
+    }
+
+    @Override
+    public String toString() {
+        return "InVMPayloadData [data=" + jobDescription + "]";
+    }
+}
diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMRecipient.java
similarity index 56%
copy from jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
copy to jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMRecipient.java
index cb7f67f27..39b700aea 100644
--- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMRecipient.java
@@ -16,32 +16,38 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.kie.kogito.jobs.service.repository.impl;
+package org.kie.kogito.jobs.embedded;
 
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
+import org.kie.kogito.jobs.service.api.Recipient;
 
-import io.vertx.core.Vertx;
+public class InVMRecipient extends Recipient<InVMPayloadData> {
 
-class InMemoryJobRepositoryTest extends BaseJobRepositoryTest {
+    private InVMPayloadData data;
 
-    private InMemoryJobRepository tested;
-    private static Vertx vertx;
+    public InVMRecipient() {
+        // do nothing
+    }
+
+    public void setData(InVMPayloadData data) {
+        this.data = data;
+    }
 
-    @BeforeAll
-    static void init() {
-        vertx = Vertx.vertx();
+    public InVMPayloadData getData() {
+        return data;
     }
 
-    @BeforeEach
-    public void setUp() throws Exception {
-        tested = new InMemoryJobRepository(vertx, mockJobStreams());
-        super.setUp();
+    public InVMRecipient(InVMPayloadData data) {
+        this.data = data;
     }
 
     @Override
-    public ReactiveJobRepository tested() {
-        return tested;
+    public InVMPayloadData getPayload() {
+        return data;
     }
+
+    @Override
+    public String toString() {
+        return "InVMRecipient [data=" + data + "]";
+    }
+
 }
diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java
new file mode 100644
index 000000000..814db7e91
--- /dev/null
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.embedded;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.kie.kogito.event.EventPublisher;
+import org.kie.kogito.event.job.JobInstanceDataEvent;
+import org.kie.kogito.jobs.service.adapter.ScheduledJobAdapter;
+import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.jobs.service.model.JobExecutionResponse;
+import org.kie.kogito.jobs.service.model.ScheduledJob;
+import org.kie.kogito.jobs.service.resource.RestApiConstants;
+import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
+import org.kie.kogito.jobs.service.utils.ErrorHandling;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.event.Event;
+import jakarta.enterprise.event.ObservesAsync;
+import jakarta.enterprise.inject.Alternative;
+import jakarta.enterprise.inject.Instance;
+import jakarta.inject.Inject;
+
+import static java.util.stream.Collectors.toList;
+import static org.kie.kogito.jobs.service.events.JobDataEvent.JOB_EVENT_TYPE;
+
+@ApplicationScoped
+@Alternative
+public class JobInVMEventPublisher implements JobEventPublisher {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JobInVMEventPublisher.class);
+
+    private final String url;
+
+    private final List<EventPublisher> eventPublishers;
+
+    private final ObjectMapper objectMapper;
+
+    @Inject
+    ReactiveJobScheduler scheduler;
+
+    @Inject
+    Event<EmbeddedJobServiceEvent> bus;
+
+    public JobInVMEventPublisher(
+            @ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String url,
+            Instance<EventPublisher> eventPublishers,
+            ObjectMapper objectMapper) {
+        this.url = url;
+        this.eventPublishers = eventPublishers.stream().collect(toList());
+        this.objectMapper = objectMapper;
+        LOGGER.info("JobInVMEventPublisher Started with url {}", url);
+    }
+
+    @Override
+    public JobExecutionResponse publishJobError(JobExecutionResponse response) {
+        try {
+            LOGGER.debug("publishJobError {}", response);
+
+            ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionError, response)
+                    .findFirst()
+                    .run()
+                    .thenApply(Optional::isPresent)
+                    .exceptionally(e -> {
+                        LOGGER.error("Error handling error {}", response, e);
+                        return false;
+                    }).toCompletableFuture().get();
+
+            return response;
+        } catch (Exception e) {
+            LOGGER.error("error in publishJobError", e);
+            return response;
+        }
+    }
+
+    @Override
+    public JobExecutionResponse publishJobSuccess(JobExecutionResponse response) {
+        try {
+            LOGGER.debug("publishJobSuccess {}", response);
+            ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionSuccess, response)
+                    .findFirst()
+                    .run()
+                    .thenApply(Optional::isPresent)
+                    .exceptionally(e -> {
+                        LOGGER.error("Error handling error {}", response, e);
+                        return false;
+                    }).toCompletableFuture().get();
+
+            return response;
+        } catch (Exception e) {
+            LOGGER.error("error in publishJobSuccess", e);
+            return response;
+        }
+    }
+
+    @Override
+    public JobDetails publishJobStatusChange(JobDetails jobDetails) {
+        try {
+            LOGGER.debug("publishJobStatusChange {}", jobDetails);
+            if (eventPublishers.isEmpty()) {
+                return jobDetails;
+            }
+
+            bus.fireAsync(new EmbeddedJobServiceEvent(jobDetails)).toCompletableFuture().get();
+            return jobDetails;
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void observe(@ObservesAsync EmbeddedJobServiceEvent serviceEvent) {
+        JobDetails jobDetails = serviceEvent.getJobDetails();
+        LOGGER.debug("Emmit in-vm publishJobStatusChange {}", jobDetails);
+        try {
+            ScheduledJob scheduledJob = ScheduledJobAdapter.of(jobDetails);
+            byte[] jsonContent = objectMapper.writeValueAsBytes(scheduledJob);
+            JobInstanceDataEvent event = new JobInstanceDataEvent(JOB_EVENT_TYPE,
+                    url + RestApiConstants.JOBS_PATH,
+                    jsonContent,
+                    scheduledJob.getProcessInstanceId(),
+                    scheduledJob.getRootProcessInstanceId(),
+                    scheduledJob.getProcessId(),
+                    scheduledJob.getRootProcessId(),
+                    null);
+
+            eventPublishers.forEach(e -> e.publish(event));
+        } catch (Exception e) {
+            LOGGER.error("Job status change propagation has failed at eventPublisher: " + eventPublishers.getClass() + " execution.", e);
+        }
+    }
+}
diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/resources/META-INF/beans.xml b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/resources/META-INF/beans.xml
new file mode 100644
index 000000000..e69de29bb
diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/resources/application.properties b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/resources/application.properties
new file mode 100644
index 000000000..81cb27f1d
--- /dev/null
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/resources/application.properties
@@ -0,0 +1 @@
+quarkus.arc.selected-alternatives=org.kie.kogito.jobs.embedded.*
\ No newline at end of file
diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTests.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTests.java
new file mode 100644
index 000000000..b602a9dff
--- /dev/null
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTests.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.embedded;
+
+import java.util.List;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.kie.kogito.event.DataEvent;
+import org.kie.kogito.jobs.DurationExpirationTime;
+import org.kie.kogito.jobs.JobsService;
+import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+
+import io.quarkus.test.junit.QuarkusTest;
+
+import jakarta.inject.Inject;
+
+@QuarkusTest
+public class EmbeddedJobsServiceTests {
+
+    @Inject
+    JobsService jobService;
+
+    @Inject
+    TestEventPublisher publisher;
+
+    @Test
+    public void testJobService() throws Exception {
+        // testing only we have the full lifecycle
+        publisher.expectedEvents(2);
+
+        ProcessInstanceJobDescription description = ProcessInstanceJobDescription.builder()
+                .generateId()
+                .timerId("-1")
+                .expirationTime(DurationExpirationTime.now())
+                .processInstanceId("1")
+                .rootProcessInstanceId(null)
+                .processId("processId")
+                .rootProcessId(null)
+                .nodeInstanceId("node_1")
+                .build();
+        jobService.scheduleProcessInstanceJob(description);
+
+        List<DataEvent<?>> events = publisher.getEvents();
+        Assertions.assertEquals(2, events.size());
+
+    }
+
+}
diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestApplication.java
similarity index 50%
copy from jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
copy to jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestApplication.java
index cb7f67f27..62be9cfd9 100644
--- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepositoryTest.java
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestApplication.java
@@ -16,32 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.kie.kogito.jobs.service.repository.impl;
+package org.kie.kogito.jobs.embedded;
 
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
+import org.kie.kogito.Application;
+import org.kie.kogito.Config;
+import org.kie.kogito.KogitoEngine;
+import org.kie.kogito.uow.UnitOfWork;
+import org.kie.kogito.uow.UnitOfWorkManager;
+import org.mockito.Mockito;
 
-import io.vertx.core.Vertx;
+import jakarta.enterprise.context.ApplicationScoped;
 
-class InMemoryJobRepositoryTest extends BaseJobRepositoryTest {
+@ApplicationScoped
+public class TestApplication implements Application {
 
-    private InMemoryJobRepository tested;
-    private static Vertx vertx;
-
-    @BeforeAll
-    static void init() {
-        vertx = Vertx.vertx();
+    @Override
+    public Config config() {
+        return Mockito.mock(Config.class);
     }
 
-    @BeforeEach
-    public void setUp() throws Exception {
-        tested = new InMemoryJobRepository(vertx, mockJobStreams());
-        super.setUp();
+    @Override
+    public <T extends KogitoEngine> T get(Class<T> clazz) {
+        return (T) Mockito.mock(KogitoEngine.class);
     }
 
     @Override
-    public ReactiveJobRepository tested() {
-        return tested;
+    public UnitOfWorkManager unitOfWorkManager() {
+        UnitOfWorkManager uowm = Mockito.mock(UnitOfWorkManager.class);
+        Mockito.when(uowm.newUnitOfWork()).thenReturn(Mockito.mock(UnitOfWork.class));
+        return uowm;
     }
+
 }
diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestEventPublisher.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestEventPublisher.java
new file mode 100644
index 000000000..f5b43f7af
--- /dev/null
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestEventPublisher.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.embedded;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.kie.kogito.event.DataEvent;
+import org.kie.kogito.event.EventPublisher;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class TestEventPublisher implements EventPublisher {
+    private List<DataEvent<?>> events;
+
+    private CountDownLatch latch;
+
+    public List<DataEvent<?>> getEvents() {
+        return events;
+    }
+
+    public TestEventPublisher() {
+        events = new ArrayList<>();
+    }
+
+    @Override
+    public void publish(DataEvent<?> event) {
+        events.add(event);
+        latch.countDown();
+    }
+
+    @Override
+    public void publish(Collection<DataEvent<?>> events) {
+        events.addAll(events);
+        events.forEach(e -> latch.countDown());
+    }
+
+    public void expectedEvents(int numOfEvents) {
+        latch = new CountDownLatch(numOfEvents);
+    }
+
+}
diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestProcesses.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestProcesses.java
new file mode 100644
index 000000000..b99aaea71
--- /dev/null
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestProcesses.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.embedded;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+
+import org.kie.kogito.Model;
+import org.kie.kogito.process.Process;
+import org.kie.kogito.process.ProcessInstance;
+import org.kie.kogito.process.ProcessInstances;
+import org.kie.kogito.process.Processes;
+import org.mockito.Mockito;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class TestProcesses implements Processes {
+
+    @Override
+    public Process<? extends Model> processById(String processId) {
+        Process<? extends Model> process = Mockito.mock(Process.class);
+        ProcessInstances instances = Mockito.mock(ProcessInstances.class);
+        Mockito.when(process.instances()).thenReturn(instances);
+
+        Mockito.when(instances.findById(Mockito.any())).thenReturn(Optional.of(Mockito.mock(ProcessInstance.class)));
+        return process;
+    }
+
+    @Override
+    public Collection<String> processIds() {
+        return Collections.emptyList();
+    }
+
+}
diff --git a/jobs-service/kogito-addons-jobs-service/pom.xml b/jobs-service/kogito-addons-jobs-service/pom.xml
index 88130ac2c..45b239713 100644
--- a/jobs-service/kogito-addons-jobs-service/pom.xml
+++ b/jobs-service/kogito-addons-jobs-service/pom.xml
@@ -19,8 +19,9 @@
     under the License.
 
 -->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
-         xmlns="http://maven.apache.org/POM/4.0.0">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+    xmlns="http://maven.apache.org/POM/4.0.0">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.kie.kogito</groupId>
@@ -33,6 +34,7 @@
     <packaging>pom</packaging>
 
     <modules>
+        <module>kogito-addons-quarkus-jobs</module>
         <module>kogito-addons-quarkus-jobs-service-embedded</module>
     </modules>
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to