This is an automated email from the ASF dual-hosted git repository.

egonzalez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git


The following commit(s) were added to refs/heads/main by this push:
     new f7856466d [incubator-kie-issues-1551] Deadlines for Human Task (#2128)
f7856466d is described below

commit f7856466d98cce8c7e69896c623bc1e79c90326d
Author: Enrique <[email protected]>
AuthorDate: Mon Nov 18 14:09:58 2024 +0100

    [incubator-kie-issues-1551] Deadlines for Human Task (#2128)
---
 .../jobs/service/json/JacksonConfiguration.java    |  6 +-
 .../service/json/JobDescriptionDeserializer.java   | 85 ++++++++++++++++++++++
 ...rializer.java => JobDescriptionSerializer.java} | 32 +++++---
 .../ProcessInstanceJobDescriptionDeserializer.java | 67 -----------------
 .../kogito/jobs/embedded/EmbeddedJobExecutor.java  | 79 ++++++++++++++++----
 .../kogito/jobs/embedded/EmbeddedJobsService.java  | 11 +--
 .../kie/kogito/jobs/embedded/InVMPayloadData.java  | 14 ++--
 .../jobs/embedded/JobInVMEventPublisher.java       | 19 +++--
 .../jobs/embedded/EmbeddedJobsServiceTest.java     | 10 +--
 9 files changed, 197 insertions(+), 126 deletions(-)

diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
index cfbbd024a..f1603c1ef 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
+++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
@@ -20,7 +20,7 @@ package org.kie.kogito.jobs.service.json;
 
 import org.kie.kogito.jobs.DurationExpirationTime;
 import org.kie.kogito.jobs.ExactExpirationTime;
-import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.JobDescription;
 import org.kie.kogito.jobs.service.api.serlialization.SerializationUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,8 +46,8 @@ public class JacksonConfiguration {
         return objectMapper -> {
             LOGGER.debug("Jackson customization initialized.");
             SimpleModule kogitoCustomModule = new SimpleModule();
-            
kogitoCustomModule.addSerializer(ProcessInstanceJobDescription.class, new 
ProcessInstanceJobDescriptionSerializer());
-            
kogitoCustomModule.addDeserializer(ProcessInstanceJobDescription.class, new 
ProcessInstanceJobDescriptionDeserializer());
+            kogitoCustomModule.addSerializer(JobDescription.class, new 
JobDescriptionSerializer());
+            kogitoCustomModule.addDeserializer(JobDescription.class, new 
JobDescriptionDeserializer());
             kogitoCustomModule.addSerializer(DurationExpirationTime.class, new 
DurationExpirationTimeSerializer());
             kogitoCustomModule.addDeserializer(DurationExpirationTime.class, 
new DurationExpirationTimeDeserializer());
             kogitoCustomModule.addSerializer(ExactExpirationTime.class, new 
ExactExpirationTimeSerializer());
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java
new file mode 100644
index 000000000..f0a72db2b
--- /dev/null
+++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.json;
+
+import java.io.IOException;
+
+import org.kie.kogito.jobs.ExpirationTime;
+import org.kie.kogito.jobs.JobDescription;
+import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescriptionBuilder;
+import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription;
+import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescriptionBuilder;
+
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import static java.util.Optional.ofNullable;
+
+public class JobDescriptionDeserializer extends 
StdDeserializer<JobDescription> {
+
+    private static final long serialVersionUID = -8307549297456060422L;
+
+    public JobDescriptionDeserializer() {
+        super(ProcessInstanceJobDescription.class);
+    }
+
+    @Override
+    public JobDescription deserialize(JsonParser jp, DeserializationContext 
ctxt) throws IOException, JacksonException {
+        try {
+            JsonNode node = jp.getCodec().readTree(jp);
+            String jobDescriptionType = node.get("@type").asText();
+            switch (jobDescriptionType) {
+                case "ProcessInstanceJobDescription": {
+                    ProcessInstanceJobDescriptionBuilder builder = 
ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder();
+                    ofNullable(node.get("id")).ifPresent(e -> 
builder.id(e.textValue()));
+                    ofNullable(node.get("priority")).ifPresent(e -> 
builder.priority(e.asInt()));
+                    String expirationTimeType = 
node.get("expirationTime").get("@type").asText();
+                    builder.expirationTime((ExpirationTime) 
ctxt.readTreeAsValue(node.get("expirationTime"), 
Class.forName(expirationTimeType)));
+
+                    ofNullable(node.get("timerId")).ifPresent(e -> 
builder.timerId(e.textValue()));
+                    ofNullable(node.get("processInstanceId")).ifPresent(e -> 
builder.processInstanceId(e.textValue()));
+                    ofNullable(node.get("rootProcessInstanceId")).ifPresent(e 
-> builder.rootProcessInstanceId(e.textValue()));
+                    ofNullable(node.get("processId")).ifPresent(e -> 
builder.processId(e.textValue()));
+                    ofNullable(node.get("rootProcessId")).ifPresent(e -> 
builder.rootProcessId(e.textValue()));
+                    ofNullable(node.get("nodeInstanceId")).ifPresent(e -> 
builder.nodeInstanceId(e.textValue()));
+
+                    return builder.build();
+                }
+                case "UserTaskInstanceJobDescription": {
+                    UserTaskInstanceJobDescriptionBuilder builder = 
UserTaskInstanceJobDescription.newUserTaskInstanceJobDescriptionBuilder();
+                    ofNullable(node.get("id")).ifPresent(e -> 
builder.id(e.textValue()));
+                    ofNullable(node.get("priority")).ifPresent(e -> 
builder.priority(e.asInt()));
+                    String expirationTimeType = 
node.get("expirationTime").get("@type").asText();
+                    builder.expirationTime((ExpirationTime) 
ctxt.readTreeAsValue(node.get("expirationTime"), 
Class.forName(expirationTimeType)));
+
+                    ofNullable(node.get("userTaskInstanceId")).ifPresent(e -> 
builder.userTaskInstanceId(e.textValue()));
+                    return builder.build();
+                }
+            }
+        } catch (ClassNotFoundException e1) {
+            throw new IllegalArgumentException("expiration time class not 
found", e1);
+        }
+        return null;
+    }
+
+}
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java
similarity index 50%
rename from jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java
rename to jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java
index 379da2a0e..22935e0e3 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java
+++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java
@@ -20,32 +20,40 @@ package org.kie.kogito.jobs.service.json;
 
 import java.io.IOException;
 
-import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.JobDescription;
+import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.SerializerProvider;
 import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 
-public class ProcessInstanceJobDescriptionSerializer extends 
StdSerializer<ProcessInstanceJobDescription> {
+public class JobDescriptionSerializer extends StdSerializer<JobDescription> {
 
     private static final long serialVersionUID = -8307549297456060422L;
 
-    public ProcessInstanceJobDescriptionSerializer() {
-        super(ProcessInstanceJobDescription.class);
+    public JobDescriptionSerializer() {
+        super(JobDescription.class);
     }
 
     @Override
-    public void serialize(ProcessInstanceJobDescription value, JsonGenerator 
jgen, SerializerProvider provider) throws IOException {
+    public void serialize(JobDescription value, JsonGenerator jgen, 
SerializerProvider provider) throws IOException {
         jgen.writeStartObject();
+        jgen.writeStringField("@type", value.getClass().getSimpleName());
         jgen.writeStringField("id", value.id());
-        jgen.writeStringField("timerId", value.timerId());
-        jgen.writeObjectField("expirationTime", value.expirationTime());
         jgen.writeNumberField("priority", value.priority());
-        jgen.writeStringField("processInstanceId", value.processInstanceId());
-        jgen.writeStringField("rootProcessInstanceId", 
value.rootProcessInstanceId());
-        jgen.writeStringField("processId", value.processId());
-        jgen.writeStringField("rootProcessId", value.rootProcessId());
-        jgen.writeStringField("nodeInstanceId", value.nodeInstanceId());
+        jgen.writeObjectField("expirationTime", value.expirationTime());
+        if (value instanceof ProcessInstanceJobDescription jobDescription) {
+            jgen.writeStringField("timerId", jobDescription.timerId());
+            jgen.writeStringField("processInstanceId", 
jobDescription.processInstanceId());
+            jgen.writeStringField("rootProcessInstanceId", 
jobDescription.rootProcessInstanceId());
+            jgen.writeStringField("processId", jobDescription.processId());
+            jgen.writeStringField("rootProcessId", 
jobDescription.rootProcessId());
+            jgen.writeStringField("nodeInstanceId", 
jobDescription.nodeInstanceId());
+            jgen.writeEndObject();
+        } else if (value instanceof UserTaskInstanceJobDescription 
userTaskInstanceJobDescription) {
+            jgen.writeStringField("userTaskInstanceId", 
userTaskInstanceJobDescription.getUserTaskInstanceId());
+        }
         jgen.writeEndObject();
     }
 
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java
deleted file mode 100644
index 6ccfb99f4..000000000
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.kie.kogito.jobs.service.json;
-
-import java.io.IOException;
-
-import org.kie.kogito.jobs.ExpirationTime;
-import org.kie.kogito.jobs.ProcessInstanceJobDescription;
-import org.kie.kogito.jobs.ProcessInstanceJobDescriptionBuilder;
-
-import com.fasterxml.jackson.core.JacksonException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
-
-import static java.util.Optional.ofNullable;
-
-public class ProcessInstanceJobDescriptionDeserializer extends 
StdDeserializer<ProcessInstanceJobDescription> {
-
-    private static final long serialVersionUID = -8307549297456060422L;
-
-    public ProcessInstanceJobDescriptionDeserializer() {
-        super(ProcessInstanceJobDescription.class);
-    }
-
-    @Override
-    public ProcessInstanceJobDescription deserialize(JsonParser jp, 
DeserializationContext ctxt) throws IOException, JacksonException {
-        ProcessInstanceJobDescriptionBuilder builder = 
ProcessInstanceJobDescription.builder();
-
-        JsonNode node = jp.getCodec().readTree(jp);
-        ofNullable(node.get("id")).ifPresent(e -> builder.id(e.textValue()));
-        ofNullable(node.get("timerId")).ifPresent(e -> 
builder.timerId(e.textValue()));
-        ofNullable(node.get("priority")).ifPresent(e -> 
builder.priority(e.asInt()));
-        ofNullable(node.get("processInstanceId")).ifPresent(e -> 
builder.processInstanceId(e.textValue()));
-        ofNullable(node.get("rootProcessInstanceId")).ifPresent(e -> 
builder.rootProcessInstanceId(e.textValue()));
-        ofNullable(node.get("processId")).ifPresent(e -> 
builder.processId(e.textValue()));
-        ofNullable(node.get("rootProcessId")).ifPresent(e -> 
builder.rootProcessId(e.textValue()));
-        ofNullable(node.get("nodeInstanceId")).ifPresent(e -> 
builder.nodeInstanceId(e.textValue()));
-
-        String type = node.get("expirationTime").get("@type").asText();
-        try {
-            builder.expirationTime((ExpirationTime) 
ctxt.readTreeAsValue(node.get("expirationTime"), Class.forName(type)));
-        } catch (ClassNotFoundException | IOException e1) {
-            e1.printStackTrace();
-        }
-
-        return builder.build();
-    }
-
-}
diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
index ead3bbf2a..961e0485a 100644
--- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
@@ -19,9 +19,13 @@
 package org.kie.kogito.jobs.embedded;
 
 import java.util.Optional;
+import java.util.function.Supplier;
 
 import org.kie.kogito.Application;
 import org.kie.kogito.Model;
+import org.kie.kogito.jobs.JobDescription;
+import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription;
 import org.kie.kogito.jobs.service.api.Recipient;
 import org.kie.kogito.jobs.service.exception.JobExecutionException;
 import org.kie.kogito.jobs.service.executor.JobExecutor;
@@ -31,40 +35,81 @@ import org.kie.kogito.jobs.service.model.RecipientInstance;
 import org.kie.kogito.process.Process;
 import org.kie.kogito.process.Processes;
 import org.kie.kogito.services.jobs.impl.TriggerJobCommand;
+import org.kie.kogito.usertask.UserTaskInstance;
+import org.kie.kogito.usertask.UserTasks;
 
 import io.smallrye.mutiny.Uni;
 
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.enterprise.inject.Alternative;
+import jakarta.enterprise.inject.Instance;
 import jakarta.inject.Inject;
 
+import static 
org.kie.kogito.services.uow.UnitOfWorkExecutor.executeInUnitOfWork;
+
 @ApplicationScoped
 @Alternative
 public class EmbeddedJobExecutor implements JobExecutor {
 
     @Inject
-    Processes processes;
+    Instance<Processes> processes;
+
+    @Inject
+    Instance<UserTasks> userTasks;
 
     @Inject
     Application application;
 
     @Override
     public Uni<JobExecutionResponse> execute(JobDetails jobDetails) {
-
-        String correlationId = jobDetails.getCorrelationId();
         RecipientInstance recipientModel = (RecipientInstance) 
jobDetails.getRecipient();
         InVMRecipient recipient = (InVMRecipient) 
recipientModel.getRecipient();
-        String timerId = recipient.getPayload().getData().timerId();
-        String processInstanceId = 
recipient.getPayload().getData().processInstanceId();
-        Optional<Process<? extends Model>> process;
-        try {
-            process = processes.processByProcessInstanceId(processInstanceId);
-        } catch (Exception ex) {
-            return Uni.createFrom().failure(
-                    new JobExecutionException(jobDetails.getId(),
-                            "Unexpected error when executing Embedded request 
for job: " + jobDetails.getId() + ". " + ex.getMessage(),
-                            ex));
+        JobDescription jobDescription = recipient.getPayload().getData();
+        if (jobDescription instanceof ProcessInstanceJobDescription 
processInstanceJobDescription && processes.isResolvable()) {
+            return processJobDescription(jobDetails, 
processInstanceJobDescription);
+        } else if (jobDescription instanceof UserTaskInstanceJobDescription 
userTaskInstanceJobDescription && userTasks.isResolvable()) {
+            return processJobDescription(jobDetails, 
userTaskInstanceJobDescription);
         }
+
+        return Uni.createFrom().item(
+                JobExecutionResponse.builder()
+                        .code("401")
+                        .jobId(jobDetails.getId())
+                        .now()
+                        .message("job cannot be processed")
+                        .build());
+    }
+
+    private Uni<JobExecutionResponse> processJobDescription(JobDetails 
jobDetails, UserTaskInstanceJobDescription userTaskInstanceJobDescription) {
+        Supplier<Void> execute = () -> 
executeInUnitOfWork(application.unitOfWorkManager(), () -> {
+            Optional<UserTaskInstance> userTaskInstance = 
userTasks.get().instances().findById(userTaskInstanceJobDescription.getUserTaskInstanceId());
+            if (userTaskInstance.isEmpty()) {
+                return null;
+            }
+            UserTaskInstance instance = userTaskInstance.get();
+            instance.trigger(userTaskInstanceJobDescription);
+            return null;
+        });
+
+        return Uni.createFrom().item(execute)
+                .onFailure()
+                .transform(
+                        unexpected -> new 
JobExecutionException(jobDetails.getId(), "Unexpected error when executing 
Embedded request for job: " + jobDetails.getId() + ". " + 
unexpected.getMessage(),
+                                unexpected))
+                .onItem()
+                .transform(res -> JobExecutionResponse.builder()
+                        .message("Embedded job executed")
+                        .code(String.valueOf(200))
+                        .now()
+                        .jobId(jobDetails.getId())
+                        .build());
+
+    }
+
+    private Uni<JobExecutionResponse> processJobDescription(JobDetails 
jobDetails, ProcessInstanceJobDescription processInstanceJobDescription) {
+        String timerId = processInstanceJobDescription.timerId();
+        String processInstanceId = 
processInstanceJobDescription.processInstanceId();
+        Optional<Process<? extends Model>> process = 
processes.get().processByProcessInstanceId(processInstanceId);
         if (process.isEmpty()) {
             return Uni.createFrom().item(
                     JobExecutionResponse.builder()
@@ -77,9 +122,13 @@ public class EmbeddedJobExecutor implements JobExecutor {
 
         Integer limit = jobDetails.getRetries();
 
-        TriggerJobCommand command = new TriggerJobCommand(processInstanceId, 
correlationId, timerId, limit, process.get(), application.unitOfWorkManager());
+        Supplier<Boolean> execute = () -> 
executeInUnitOfWork(application.unitOfWorkManager(), () -> {
+            TriggerJobCommand command = new 
TriggerJobCommand(processInstanceId, jobDetails.getCorrelationId(), timerId, 
limit, process.get(), application.unitOfWorkManager());
+            return command.execute();
+        });
 
-        return Uni.createFrom().item(command::execute)
+        return Uni.createFrom()
+                .item(execute)
                 .onFailure()
                 .transform(
                         unexpected -> new 
JobExecutionException(jobDetails.getId(), "Unexpected error when executing 
Embedded request for job: " + jobDetails.getId() + ". " + 
unexpected.getMessage(),
diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java
index 95b9b0d92..84754c6dc 100644
--- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java
@@ -20,9 +20,8 @@ package org.kie.kogito.jobs.embedded;
 
 import java.util.concurrent.ExecutionException;
 
+import org.kie.kogito.jobs.JobDescription;
 import org.kie.kogito.jobs.JobsService;
-import org.kie.kogito.jobs.ProcessInstanceJobDescription;
-import org.kie.kogito.jobs.ProcessJobDescription;
 import org.kie.kogito.jobs.api.JobCallbackResourceDef;
 import org.kie.kogito.jobs.service.adapter.JobDetailsAdapter;
 import org.kie.kogito.jobs.service.api.Job;
@@ -54,13 +53,7 @@ public class EmbeddedJobsService implements JobsService {
     }
 
     @Override
-    public String scheduleProcessJob(ProcessJobDescription description) {
-        LOGGER.debug("ScheduleProcessJob: {} not supported", description);
-        return null;
-    }
-
-    @Override
-    public String scheduleProcessInstanceJob(ProcessInstanceJobDescription 
description) {
+    public String scheduleJob(JobDescription description) {
         try {
             Job job = Job.builder()
                     .id(description.id())
diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java
index cb49a135b..3a9cdf928 100644
--- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java
@@ -18,31 +18,31 @@
  */
 package org.kie.kogito.jobs.embedded;
 
-import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.JobDescription;
 import org.kie.kogito.jobs.service.api.PayloadData;
 
-public class InVMPayloadData extends 
PayloadData<ProcessInstanceJobDescription> {
+public class InVMPayloadData extends PayloadData<JobDescription> {
 
-    private ProcessInstanceJobDescription jobDescription;
+    private JobDescription jobDescription;
 
     public InVMPayloadData() {
         // do nothing
     }
 
-    public void setJobDescription(ProcessInstanceJobDescription 
jobDescription) {
+    public void setJobDescription(JobDescription jobDescription) {
         this.jobDescription = jobDescription;
     }
 
-    public ProcessInstanceJobDescription getJobDescription() {
+    public JobDescription getJobDescription() {
         return jobDescription;
     }
 
     @Override
-    public ProcessInstanceJobDescription getData() {
+    public JobDescription getData() {
         return jobDescription;
     }
 
-    public InVMPayloadData(ProcessInstanceJobDescription data) {
+    public InVMPayloadData(JobDescription data) {
         this.jobDescription = data;
     }
 
diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java
index dae4ac4ec..78434dc84 100644
--- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java
@@ -25,7 +25,8 @@ import java.util.concurrent.ExecutionException;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.kie.kogito.event.EventPublisher;
 import org.kie.kogito.event.job.JobInstanceDataEvent;
-import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.JobDescription;
+import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
 import org.kie.kogito.jobs.service.adapter.ScheduledJobAdapter;
 import org.kie.kogito.jobs.service.api.Recipient;
 import org.kie.kogito.jobs.service.model.JobDetails;
@@ -93,13 +94,15 @@ public class JobInVMEventPublisher implements JobEventPublisher {
         try {
             ScheduledJob scheduledJob = ScheduledJobAdapter.of(jobDetails);
             Recipient<InVMPayloadData> recipient = 
jobDetails.getRecipient().getRecipient();
-            ProcessInstanceJobDescription jobDescription = 
recipient.getPayload().getJobDescription();
-
-            
scheduledJob.setProcessInstanceId(jobDescription.processInstanceId());
-            scheduledJob.setProcessId(jobDescription.processId());
-            
scheduledJob.setRootProcessInstanceId(jobDescription.rootProcessInstanceId());
-            scheduledJob.setRootProcessId(jobDescription.rootProcessId());
-            scheduledJob.setNodeInstanceId(jobDescription.nodeInstanceId());
+            JobDescription jobDescription = 
recipient.getPayload().getJobDescription();
+
+            if (jobDescription instanceof ProcessInstanceJobDescription 
processInstanceJobDescription) {
+                
scheduledJob.setProcessInstanceId(processInstanceJobDescription.processInstanceId());
+                
scheduledJob.setProcessId(processInstanceJobDescription.processId());
+                
scheduledJob.setRootProcessInstanceId(processInstanceJobDescription.rootProcessInstanceId());
+                
scheduledJob.setRootProcessId(processInstanceJobDescription.rootProcessId());
+                
scheduledJob.setNodeInstanceId(processInstanceJobDescription.nodeInstanceId());
+            }
 
             byte[] jsonContent = objectMapper.writeValueAsBytes(scheduledJob);
             JobInstanceDataEvent event = new 
JobInstanceDataEvent(JOB_EVENT_TYPE,
diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java
index c854f86f7..33e0c8be4 100644
--- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java
@@ -28,7 +28,7 @@ import org.kie.kogito.event.DataEvent;
 import org.kie.kogito.event.job.JobInstanceDataEvent;
 import org.kie.kogito.jobs.DurationExpirationTime;
 import org.kie.kogito.jobs.JobsService;
-import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
 
 import io.quarkus.test.junit.QuarkusTest;
 
@@ -58,7 +58,7 @@ public class EmbeddedJobsServiceTest {
         CountDownLatch latch = new CountDownLatch(8);
         publisher.setLatch(latch);
 
-        ProcessInstanceJobDescription description = 
ProcessInstanceJobDescription.builder()
+        ProcessInstanceJobDescription description = 
ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder()
                 .generateId()
                 .timerId("-1")
                 .expirationTime(DurationExpirationTime.now())
@@ -68,9 +68,9 @@ public class EmbeddedJobsServiceTest {
                 .rootProcessId(null)
                 .nodeInstanceId(NODE_INSTANCE_ID)
                 .build();
-        jobService.scheduleProcessInstanceJob(description);
+        jobService.scheduleJob(description);
 
-        ProcessInstanceJobDescription descriptionWRootProcess = 
ProcessInstanceJobDescription.builder()
+        ProcessInstanceJobDescription descriptionWRootProcess = 
ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder()
                 .generateId()
                 .timerId("-1")
                 .expirationTime(DurationExpirationTime.now())
@@ -80,7 +80,7 @@ public class EmbeddedJobsServiceTest {
                 .rootProcessId(ROOT_PROCESS_ID)
                 .nodeInstanceId(NODE_INSTANCE_ID)
                 .build();
-        jobService.scheduleProcessInstanceJob(descriptionWRootProcess);
+        jobService.scheduleJob(descriptionWRootProcess);
 
         latch.await();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to