This is an automated email from the ASF dual-hosted git repository.
egonzalez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git
The following commit(s) were added to refs/heads/main by this push:
new f7856466d [incubator-kie-issues-1551] Deadlines for Human Task (#2128)
f7856466d is described below
commit f7856466d98cce8c7e69896c623bc1e79c90326d
Author: Enrique <[email protected]>
AuthorDate: Mon Nov 18 14:09:58 2024 +0100
[incubator-kie-issues-1551] Deadlines for Human Task (#2128)
---
.../jobs/service/json/JacksonConfiguration.java | 6 +-
.../service/json/JobDescriptionDeserializer.java | 85 ++++++++++++++++++++++
...rializer.java => JobDescriptionSerializer.java} | 32 +++++---
.../ProcessInstanceJobDescriptionDeserializer.java | 67 -----------------
.../kogito/jobs/embedded/EmbeddedJobExecutor.java | 79 ++++++++++++++++----
.../kogito/jobs/embedded/EmbeddedJobsService.java | 11 +--
.../kie/kogito/jobs/embedded/InVMPayloadData.java | 14 ++--
.../jobs/embedded/JobInVMEventPublisher.java | 19 +++--
.../jobs/embedded/EmbeddedJobsServiceTest.java | 10 +--
9 files changed, 197 insertions(+), 126 deletions(-)
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
index cfbbd024a..f1603c1ef 100644
---
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java
@@ -20,7 +20,7 @@ package org.kie.kogito.jobs.service.json;
import org.kie.kogito.jobs.DurationExpirationTime;
import org.kie.kogito.jobs.ExactExpirationTime;
-import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.service.api.serlialization.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,8 +46,8 @@ public class JacksonConfiguration {
return objectMapper -> {
LOGGER.debug("Jackson customization initialized.");
SimpleModule kogitoCustomModule = new SimpleModule();
-
kogitoCustomModule.addSerializer(ProcessInstanceJobDescription.class, new
ProcessInstanceJobDescriptionSerializer());
-
kogitoCustomModule.addDeserializer(ProcessInstanceJobDescription.class, new
ProcessInstanceJobDescriptionDeserializer());
+ kogitoCustomModule.addSerializer(JobDescription.class, new
JobDescriptionSerializer());
+ kogitoCustomModule.addDeserializer(JobDescription.class, new
JobDescriptionDeserializer());
kogitoCustomModule.addSerializer(DurationExpirationTime.class, new
DurationExpirationTimeSerializer());
kogitoCustomModule.addDeserializer(DurationExpirationTime.class,
new DurationExpirationTimeDeserializer());
kogitoCustomModule.addSerializer(ExactExpirationTime.class, new
ExactExpirationTimeSerializer());
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java
new file mode 100644
index 000000000..f0a72db2b
--- /dev/null
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.json;
+
+import java.io.IOException;
+
+import org.kie.kogito.jobs.ExpirationTime;
+import org.kie.kogito.jobs.JobDescription;
+import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescriptionBuilder;
+import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription;
+import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescriptionBuilder;
+
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import static java.util.Optional.ofNullable;
+
+public class JobDescriptionDeserializer extends
StdDeserializer<JobDescription> {
+
+ private static final long serialVersionUID = -8307549297456060422L;
+
+ public JobDescriptionDeserializer() {
+ super(ProcessInstanceJobDescription.class);
+ }
+
+ @Override
+ public JobDescription deserialize(JsonParser jp, DeserializationContext
ctxt) throws IOException, JacksonException {
+ try {
+ JsonNode node = jp.getCodec().readTree(jp);
+ String jobDescriptionType = node.get("@type").asText();
+ switch (jobDescriptionType) {
+ case "ProcessInstanceJobDescription": {
+ ProcessInstanceJobDescriptionBuilder builder =
ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder();
+ ofNullable(node.get("id")).ifPresent(e ->
builder.id(e.textValue()));
+ ofNullable(node.get("priority")).ifPresent(e ->
builder.priority(e.asInt()));
+ String expirationTimeType =
node.get("expirationTime").get("@type").asText();
+ builder.expirationTime((ExpirationTime)
ctxt.readTreeAsValue(node.get("expirationTime"),
Class.forName(expirationTimeType)));
+
+ ofNullable(node.get("timerId")).ifPresent(e ->
builder.timerId(e.textValue()));
+ ofNullable(node.get("processInstanceId")).ifPresent(e ->
builder.processInstanceId(e.textValue()));
+ ofNullable(node.get("rootProcessInstanceId")).ifPresent(e
-> builder.rootProcessInstanceId(e.textValue()));
+ ofNullable(node.get("processId")).ifPresent(e ->
builder.processId(e.textValue()));
+ ofNullable(node.get("rootProcessId")).ifPresent(e ->
builder.rootProcessId(e.textValue()));
+ ofNullable(node.get("nodeInstanceId")).ifPresent(e ->
builder.nodeInstanceId(e.textValue()));
+
+ return builder.build();
+ }
+ case "UserTaskInstanceJobDescription": {
+ UserTaskInstanceJobDescriptionBuilder builder =
UserTaskInstanceJobDescription.newUserTaskInstanceJobDescriptionBuilder();
+ ofNullable(node.get("id")).ifPresent(e ->
builder.id(e.textValue()));
+ ofNullable(node.get("priority")).ifPresent(e ->
builder.priority(e.asInt()));
+ String expirationTimeType =
node.get("expirationTime").get("@type").asText();
+ builder.expirationTime((ExpirationTime)
ctxt.readTreeAsValue(node.get("expirationTime"),
Class.forName(expirationTimeType)));
+
+ ofNullable(node.get("userTaskInstanceId")).ifPresent(e ->
builder.userTaskInstanceId(e.textValue()));
+ return builder.build();
+ }
+ }
+ } catch (ClassNotFoundException e1) {
+ throw new IllegalArgumentException("expiration time class not
found", e1);
+ }
+ return null;
+ }
+
+}
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java
similarity index 50%
rename from
jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java
rename to
jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java
index 379da2a0e..22935e0e3 100644
---
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java
@@ -20,32 +20,40 @@ package org.kie.kogito.jobs.service.json;
import java.io.IOException;
-import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.JobDescription;
+import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
-public class ProcessInstanceJobDescriptionSerializer extends
StdSerializer<ProcessInstanceJobDescription> {
+public class JobDescriptionSerializer extends StdSerializer<JobDescription> {
private static final long serialVersionUID = -8307549297456060422L;
- public ProcessInstanceJobDescriptionSerializer() {
- super(ProcessInstanceJobDescription.class);
+ public JobDescriptionSerializer() {
+ super(JobDescription.class);
}
@Override
- public void serialize(ProcessInstanceJobDescription value, JsonGenerator
jgen, SerializerProvider provider) throws IOException {
+ public void serialize(JobDescription value, JsonGenerator jgen,
SerializerProvider provider) throws IOException {
jgen.writeStartObject();
+ jgen.writeStringField("@type", value.getClass().getSimpleName());
jgen.writeStringField("id", value.id());
- jgen.writeStringField("timerId", value.timerId());
- jgen.writeObjectField("expirationTime", value.expirationTime());
jgen.writeNumberField("priority", value.priority());
- jgen.writeStringField("processInstanceId", value.processInstanceId());
- jgen.writeStringField("rootProcessInstanceId",
value.rootProcessInstanceId());
- jgen.writeStringField("processId", value.processId());
- jgen.writeStringField("rootProcessId", value.rootProcessId());
- jgen.writeStringField("nodeInstanceId", value.nodeInstanceId());
+ jgen.writeObjectField("expirationTime", value.expirationTime());
+ if (value instanceof ProcessInstanceJobDescription jobDescription) {
+ jgen.writeStringField("timerId", jobDescription.timerId());
+ jgen.writeStringField("processInstanceId",
jobDescription.processInstanceId());
+ jgen.writeStringField("rootProcessInstanceId",
jobDescription.rootProcessInstanceId());
+ jgen.writeStringField("processId", jobDescription.processId());
+ jgen.writeStringField("rootProcessId",
jobDescription.rootProcessId());
+ jgen.writeStringField("nodeInstanceId",
jobDescription.nodeInstanceId());
+ jgen.writeEndObject();
+ } else if (value instanceof UserTaskInstanceJobDescription
userTaskInstanceJobDescription) {
+ jgen.writeStringField("userTaskInstanceId",
userTaskInstanceJobDescription.getUserTaskInstanceId());
+ }
jgen.writeEndObject();
}
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java
deleted file mode 100644
index 6ccfb99f4..000000000
---
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.kie.kogito.jobs.service.json;
-
-import java.io.IOException;
-
-import org.kie.kogito.jobs.ExpirationTime;
-import org.kie.kogito.jobs.ProcessInstanceJobDescription;
-import org.kie.kogito.jobs.ProcessInstanceJobDescriptionBuilder;
-
-import com.fasterxml.jackson.core.JacksonException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
-
-import static java.util.Optional.ofNullable;
-
-public class ProcessInstanceJobDescriptionDeserializer extends
StdDeserializer<ProcessInstanceJobDescription> {
-
- private static final long serialVersionUID = -8307549297456060422L;
-
- public ProcessInstanceJobDescriptionDeserializer() {
- super(ProcessInstanceJobDescription.class);
- }
-
- @Override
- public ProcessInstanceJobDescription deserialize(JsonParser jp,
DeserializationContext ctxt) throws IOException, JacksonException {
- ProcessInstanceJobDescriptionBuilder builder =
ProcessInstanceJobDescription.builder();
-
- JsonNode node = jp.getCodec().readTree(jp);
- ofNullable(node.get("id")).ifPresent(e -> builder.id(e.textValue()));
- ofNullable(node.get("timerId")).ifPresent(e ->
builder.timerId(e.textValue()));
- ofNullable(node.get("priority")).ifPresent(e ->
builder.priority(e.asInt()));
- ofNullable(node.get("processInstanceId")).ifPresent(e ->
builder.processInstanceId(e.textValue()));
- ofNullable(node.get("rootProcessInstanceId")).ifPresent(e ->
builder.rootProcessInstanceId(e.textValue()));
- ofNullable(node.get("processId")).ifPresent(e ->
builder.processId(e.textValue()));
- ofNullable(node.get("rootProcessId")).ifPresent(e ->
builder.rootProcessId(e.textValue()));
- ofNullable(node.get("nodeInstanceId")).ifPresent(e ->
builder.nodeInstanceId(e.textValue()));
-
- String type = node.get("expirationTime").get("@type").asText();
- try {
- builder.expirationTime((ExpirationTime)
ctxt.readTreeAsValue(node.get("expirationTime"), Class.forName(type)));
- } catch (ClassNotFoundException | IOException e1) {
- e1.printStackTrace();
- }
-
- return builder.build();
- }
-
-}
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
index ead3bbf2a..961e0485a 100644
---
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java
@@ -19,9 +19,13 @@
package org.kie.kogito.jobs.embedded;
import java.util.Optional;
+import java.util.function.Supplier;
import org.kie.kogito.Application;
import org.kie.kogito.Model;
+import org.kie.kogito.jobs.JobDescription;
+import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription;
import org.kie.kogito.jobs.service.api.Recipient;
import org.kie.kogito.jobs.service.exception.JobExecutionException;
import org.kie.kogito.jobs.service.executor.JobExecutor;
@@ -31,40 +35,81 @@ import org.kie.kogito.jobs.service.model.RecipientInstance;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.Processes;
import org.kie.kogito.services.jobs.impl.TriggerJobCommand;
+import org.kie.kogito.usertask.UserTaskInstance;
+import org.kie.kogito.usertask.UserTasks;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Alternative;
+import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
+import static
org.kie.kogito.services.uow.UnitOfWorkExecutor.executeInUnitOfWork;
+
@ApplicationScoped
@Alternative
public class EmbeddedJobExecutor implements JobExecutor {
@Inject
- Processes processes;
+ Instance<Processes> processes;
+
+ @Inject
+ Instance<UserTasks> userTasks;
@Inject
Application application;
@Override
public Uni<JobExecutionResponse> execute(JobDetails jobDetails) {
-
- String correlationId = jobDetails.getCorrelationId();
RecipientInstance recipientModel = (RecipientInstance)
jobDetails.getRecipient();
InVMRecipient recipient = (InVMRecipient)
recipientModel.getRecipient();
- String timerId = recipient.getPayload().getData().timerId();
- String processInstanceId =
recipient.getPayload().getData().processInstanceId();
- Optional<Process<? extends Model>> process;
- try {
- process = processes.processByProcessInstanceId(processInstanceId);
- } catch (Exception ex) {
- return Uni.createFrom().failure(
- new JobExecutionException(jobDetails.getId(),
- "Unexpected error when executing Embedded request
for job: " + jobDetails.getId() + ". " + ex.getMessage(),
- ex));
+ JobDescription jobDescription = recipient.getPayload().getData();
+ if (jobDescription instanceof ProcessInstanceJobDescription
processInstanceJobDescription && processes.isResolvable()) {
+ return processJobDescription(jobDetails,
processInstanceJobDescription);
+ } else if (jobDescription instanceof UserTaskInstanceJobDescription
userTaskInstanceJobDescription && userTasks.isResolvable()) {
+ return processJobDescription(jobDetails,
userTaskInstanceJobDescription);
}
+
+ return Uni.createFrom().item(
+ JobExecutionResponse.builder()
+ .code("401")
+ .jobId(jobDetails.getId())
+ .now()
+ .message("job cannot be processed")
+ .build());
+ }
+
+ private Uni<JobExecutionResponse> processJobDescription(JobDetails
jobDetails, UserTaskInstanceJobDescription userTaskInstanceJobDescription) {
+ Supplier<Void> execute = () ->
executeInUnitOfWork(application.unitOfWorkManager(), () -> {
+ Optional<UserTaskInstance> userTaskInstance =
userTasks.get().instances().findById(userTaskInstanceJobDescription.getUserTaskInstanceId());
+ if (userTaskInstance.isEmpty()) {
+ return null;
+ }
+ UserTaskInstance instance = userTaskInstance.get();
+ instance.trigger(userTaskInstanceJobDescription);
+ return null;
+ });
+
+ return Uni.createFrom().item(execute)
+ .onFailure()
+ .transform(
+ unexpected -> new
JobExecutionException(jobDetails.getId(), "Unexpected error when executing
Embedded request for job: " + jobDetails.getId() + ". " +
unexpected.getMessage(),
+ unexpected))
+ .onItem()
+ .transform(res -> JobExecutionResponse.builder()
+ .message("Embedded job executed")
+ .code(String.valueOf(200))
+ .now()
+ .jobId(jobDetails.getId())
+ .build());
+
+ }
+
+ private Uni<JobExecutionResponse> processJobDescription(JobDetails
jobDetails, ProcessInstanceJobDescription processInstanceJobDescription) {
+ String timerId = processInstanceJobDescription.timerId();
+ String processInstanceId =
processInstanceJobDescription.processInstanceId();
+ Optional<Process<? extends Model>> process =
processes.get().processByProcessInstanceId(processInstanceId);
if (process.isEmpty()) {
return Uni.createFrom().item(
JobExecutionResponse.builder()
@@ -77,9 +122,13 @@ public class EmbeddedJobExecutor implements JobExecutor {
Integer limit = jobDetails.getRetries();
- TriggerJobCommand command = new TriggerJobCommand(processInstanceId,
correlationId, timerId, limit, process.get(), application.unitOfWorkManager());
+ Supplier<Boolean> execute = () ->
executeInUnitOfWork(application.unitOfWorkManager(), () -> {
+ TriggerJobCommand command = new
TriggerJobCommand(processInstanceId, jobDetails.getCorrelationId(), timerId,
limit, process.get(), application.unitOfWorkManager());
+ return command.execute();
+ });
- return Uni.createFrom().item(command::execute)
+ return Uni.createFrom()
+ .item(execute)
.onFailure()
.transform(
unexpected -> new
JobExecutionException(jobDetails.getId(), "Unexpected error when executing
Embedded request for job: " + jobDetails.getId() + ". " +
unexpected.getMessage(),
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java
index 95b9b0d92..84754c6dc 100644
---
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java
@@ -20,9 +20,8 @@ package org.kie.kogito.jobs.embedded;
import java.util.concurrent.ExecutionException;
+import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.JobsService;
-import org.kie.kogito.jobs.ProcessInstanceJobDescription;
-import org.kie.kogito.jobs.ProcessJobDescription;
import org.kie.kogito.jobs.api.JobCallbackResourceDef;
import org.kie.kogito.jobs.service.adapter.JobDetailsAdapter;
import org.kie.kogito.jobs.service.api.Job;
@@ -54,13 +53,7 @@ public class EmbeddedJobsService implements JobsService {
}
@Override
- public String scheduleProcessJob(ProcessJobDescription description) {
- LOGGER.debug("ScheduleProcessJob: {} not supported", description);
- return null;
- }
-
- @Override
- public String scheduleProcessInstanceJob(ProcessInstanceJobDescription
description) {
+ public String scheduleJob(JobDescription description) {
try {
Job job = Job.builder()
.id(description.id())
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java
index cb49a135b..3a9cdf928 100644
---
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java
@@ -18,31 +18,31 @@
*/
package org.kie.kogito.jobs.embedded;
-import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.service.api.PayloadData;
-public class InVMPayloadData extends
PayloadData<ProcessInstanceJobDescription> {
+public class InVMPayloadData extends PayloadData<JobDescription> {
- private ProcessInstanceJobDescription jobDescription;
+ private JobDescription jobDescription;
public InVMPayloadData() {
// do nothing
}
- public void setJobDescription(ProcessInstanceJobDescription
jobDescription) {
+ public void setJobDescription(JobDescription jobDescription) {
this.jobDescription = jobDescription;
}
- public ProcessInstanceJobDescription getJobDescription() {
+ public JobDescription getJobDescription() {
return jobDescription;
}
@Override
- public ProcessInstanceJobDescription getData() {
+ public JobDescription getData() {
return jobDescription;
}
- public InVMPayloadData(ProcessInstanceJobDescription data) {
+ public InVMPayloadData(JobDescription data) {
this.jobDescription = data;
}
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java
index dae4ac4ec..78434dc84 100644
---
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java
@@ -25,7 +25,8 @@ import java.util.concurrent.ExecutionException;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.event.job.JobInstanceDataEvent;
-import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.JobDescription;
+import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.service.adapter.ScheduledJobAdapter;
import org.kie.kogito.jobs.service.api.Recipient;
import org.kie.kogito.jobs.service.model.JobDetails;
@@ -93,13 +94,15 @@ public class JobInVMEventPublisher implements
JobEventPublisher {
try {
ScheduledJob scheduledJob = ScheduledJobAdapter.of(jobDetails);
Recipient<InVMPayloadData> recipient =
jobDetails.getRecipient().getRecipient();
- ProcessInstanceJobDescription jobDescription =
recipient.getPayload().getJobDescription();
-
-
scheduledJob.setProcessInstanceId(jobDescription.processInstanceId());
- scheduledJob.setProcessId(jobDescription.processId());
-
scheduledJob.setRootProcessInstanceId(jobDescription.rootProcessInstanceId());
- scheduledJob.setRootProcessId(jobDescription.rootProcessId());
- scheduledJob.setNodeInstanceId(jobDescription.nodeInstanceId());
+ JobDescription jobDescription =
recipient.getPayload().getJobDescription();
+
+ if (jobDescription instanceof ProcessInstanceJobDescription
processInstanceJobDescription) {
+
scheduledJob.setProcessInstanceId(processInstanceJobDescription.processInstanceId());
+
scheduledJob.setProcessId(processInstanceJobDescription.processId());
+
scheduledJob.setRootProcessInstanceId(processInstanceJobDescription.rootProcessInstanceId());
+
scheduledJob.setRootProcessId(processInstanceJobDescription.rootProcessId());
+
scheduledJob.setNodeInstanceId(processInstanceJobDescription.nodeInstanceId());
+ }
byte[] jsonContent = objectMapper.writeValueAsBytes(scheduledJob);
JobInstanceDataEvent event = new
JobInstanceDataEvent(JOB_EVENT_TYPE,
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java
index c854f86f7..33e0c8be4 100644
---
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java
@@ -28,7 +28,7 @@ import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.job.JobInstanceDataEvent;
import org.kie.kogito.jobs.DurationExpirationTime;
import org.kie.kogito.jobs.JobsService;
-import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
import io.quarkus.test.junit.QuarkusTest;
@@ -58,7 +58,7 @@ public class EmbeddedJobsServiceTest {
CountDownLatch latch = new CountDownLatch(8);
publisher.setLatch(latch);
- ProcessInstanceJobDescription description =
ProcessInstanceJobDescription.builder()
+ ProcessInstanceJobDescription description =
ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder()
.generateId()
.timerId("-1")
.expirationTime(DurationExpirationTime.now())
@@ -68,9 +68,9 @@ public class EmbeddedJobsServiceTest {
.rootProcessId(null)
.nodeInstanceId(NODE_INSTANCE_ID)
.build();
- jobService.scheduleProcessInstanceJob(description);
+ jobService.scheduleJob(description);
- ProcessInstanceJobDescription descriptionWRootProcess =
ProcessInstanceJobDescription.builder()
+ ProcessInstanceJobDescription descriptionWRootProcess =
ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder()
.generateId()
.timerId("-1")
.expirationTime(DurationExpirationTime.now())
@@ -80,7 +80,7 @@ public class EmbeddedJobsServiceTest {
.rootProcessId(ROOT_PROCESS_ID)
.nodeInstanceId(NODE_INSTANCE_ID)
.build();
- jobService.scheduleProcessInstanceJob(descriptionWRootProcess);
+ jobService.scheduleJob(descriptionWRootProcess);
latch.await();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]