This is an automated email from the ASF dual-hosted git repository.
pefernan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git
The following commit(s) were added to refs/heads/main by this push:
new 85ebcbfff incubator-kie-issues#844: `JobInstanceDataEvent` are loosing
some values when published by `JobInVMEventPublisher`. (#1963)
85ebcbfff is described below
commit 85ebcbfff8dda1286cb73c2ee1e11f79752bbaeb
Author: Pere Fernández <[email protected]>
AuthorDate: Mon Jan 22 15:23:47 2024 +0100
incubator-kie-issues#844: `JobInstanceDataEvent` are loosing some values
when published by `JobInVMEventPublisher`. (#1963)
---
.../ProcessInstanceJobDescriptionDeserializer.java | 14 +--
.../jobs/embedded/JobInVMEventPublisher.java | 11 +++
.../jobs/embedded/EmbeddedJobsServiceTest.java | 108 +++++++++++++++++++++
.../jobs/embedded/EmbeddedJobsServiceTests.java | 65 -------------
.../kogito/jobs/embedded/TestEventPublisher.java | 4 +-
5 files changed, 128 insertions(+), 74 deletions(-)
diff --git
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java
index 8b60fd644..6ccfb99f4 100644
---
a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java
+++
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java
@@ -45,14 +45,14 @@ public class ProcessInstanceJobDescriptionDeserializer
extends StdDeserializer<P
ProcessInstanceJobDescriptionBuilder builder =
ProcessInstanceJobDescription.builder();
JsonNode node = jp.getCodec().readTree(jp);
- ofNullable(node.get("id")).ifPresent(e -> builder.id(e.asText()));
- ofNullable(node.get("timerId")).ifPresent(e ->
builder.timerId(e.asText()));
+ ofNullable(node.get("id")).ifPresent(e -> builder.id(e.textValue()));
+ ofNullable(node.get("timerId")).ifPresent(e ->
builder.timerId(e.textValue()));
ofNullable(node.get("priority")).ifPresent(e ->
builder.priority(e.asInt()));
- ofNullable(node.get("processInstanceId")).ifPresent(e ->
builder.processInstanceId(e.asText()));
- ofNullable(node.get("rootProcessInstanceId")).ifPresent(e ->
builder.rootProcessInstanceId(e.asText()));
- ofNullable(node.get("processId")).ifPresent(e ->
builder.processId(e.asText()));
- ofNullable(node.get("rootProcessId")).ifPresent(e ->
builder.rootProcessId(e.asText()));
- ofNullable(node.get("nodeInstanceId")).ifPresent(e ->
builder.nodeInstanceId(e.asText()));
+ ofNullable(node.get("processInstanceId")).ifPresent(e ->
builder.processInstanceId(e.textValue()));
+ ofNullable(node.get("rootProcessInstanceId")).ifPresent(e ->
builder.rootProcessInstanceId(e.textValue()));
+ ofNullable(node.get("processId")).ifPresent(e ->
builder.processId(e.textValue()));
+ ofNullable(node.get("rootProcessId")).ifPresent(e ->
builder.rootProcessId(e.textValue()));
+ ofNullable(node.get("nodeInstanceId")).ifPresent(e ->
builder.nodeInstanceId(e.textValue()));
String type = node.get("expirationTime").get("@type").asText();
try {
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java
index 814db7e91..aa8f331a8 100644
---
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java
@@ -26,7 +26,9 @@ import java.util.concurrent.ExecutionException;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.event.job.JobInstanceDataEvent;
+import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.service.adapter.ScheduledJobAdapter;
+import org.kie.kogito.jobs.service.api.Recipient;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.model.ScheduledJob;
@@ -138,6 +140,15 @@ public class JobInVMEventPublisher implements
JobEventPublisher {
LOGGER.debug("Emmit in-vm publishJobStatusChange {}", jobDetails);
try {
ScheduledJob scheduledJob = ScheduledJobAdapter.of(jobDetails);
+ Recipient<InVMPayloadData> recipient =
jobDetails.getRecipient().getRecipient();
+ ProcessInstanceJobDescription jobDescription =
recipient.getPayload().getJobDescription();
+
+
scheduledJob.setProcessInstanceId(jobDescription.processInstanceId());
+ scheduledJob.setProcessId(jobDescription.processId());
+
scheduledJob.setRootProcessInstanceId(jobDescription.rootProcessInstanceId());
+ scheduledJob.setRootProcessId(jobDescription.rootProcessId());
+ scheduledJob.setNodeInstanceId(jobDescription.nodeInstanceId());
+
byte[] jsonContent = objectMapper.writeValueAsBytes(scheduledJob);
JobInstanceDataEvent event = new
JobInstanceDataEvent(JOB_EVENT_TYPE,
url + RestApiConstants.JOBS_PATH,
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java
new file mode 100644
index 000000000..c854f86f7
--- /dev/null
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.embedded;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.kie.kogito.event.DataEvent;
+import org.kie.kogito.event.job.JobInstanceDataEvent;
+import org.kie.kogito.jobs.DurationExpirationTime;
+import org.kie.kogito.jobs.JobsService;
+import org.kie.kogito.jobs.ProcessInstanceJobDescription;
+
+import io.quarkus.test.junit.QuarkusTest;
+
+import jakarta.inject.Inject;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@QuarkusTest
+public class EmbeddedJobsServiceTest {
+
+ private static final String PROCESS_ID = "processId";
+ private static final String PROCESS_INSTANCE_ID = "1";
+ private static final String NODE_INSTANCE_ID = "node_1";
+ private static final String ROOT_PROCESS_ID = "rootProcess";
+ private static final String ROOT_PROCESS_INSTANCE_ID = "0";
+
+ @Inject
+ JobsService jobService;
+
+ @Inject
+ TestEventPublisher publisher;
+
+ @Test
+ public void testJobService() throws Exception {
+
+ // testing only when we have the full lifecycle
+ CountDownLatch latch = new CountDownLatch(8);
+ publisher.setLatch(latch);
+
+ ProcessInstanceJobDescription description =
ProcessInstanceJobDescription.builder()
+ .generateId()
+ .timerId("-1")
+ .expirationTime(DurationExpirationTime.now())
+ .processInstanceId(PROCESS_INSTANCE_ID)
+ .rootProcessInstanceId(null)
+ .processId(PROCESS_ID)
+ .rootProcessId(null)
+ .nodeInstanceId(NODE_INSTANCE_ID)
+ .build();
+ jobService.scheduleProcessInstanceJob(description);
+
+ ProcessInstanceJobDescription descriptionWRootProcess =
ProcessInstanceJobDescription.builder()
+ .generateId()
+ .timerId("-1")
+ .expirationTime(DurationExpirationTime.now())
+ .processInstanceId(PROCESS_INSTANCE_ID)
+ .rootProcessInstanceId(ROOT_PROCESS_INSTANCE_ID)
+ .processId(PROCESS_ID)
+ .rootProcessId(ROOT_PROCESS_ID)
+ .nodeInstanceId(NODE_INSTANCE_ID)
+ .build();
+ jobService.scheduleProcessInstanceJob(descriptionWRootProcess);
+
+ latch.await();
+
+ List<DataEvent<?>> events = publisher.getEvents();
+
+ Assertions.assertEquals(8, events.size());
+
+ Consumer<DataEvent<?>> noRootProcess = e -> assertThat(e)
+ .hasFieldOrPropertyWithValue("kogitoRootProcessInstanceId",
null)
+ .hasFieldOrPropertyWithValue("kogitoRootProcessId", null);
+
+ Consumer<DataEvent<?>> withRootProcess = e -> assertThat(e)
+ .hasFieldOrPropertyWithValue("kogitoRootProcessInstanceId",
ROOT_PROCESS_INSTANCE_ID)
+ .hasFieldOrPropertyWithValue("kogitoRootProcessId",
ROOT_PROCESS_ID);
+
+ events.forEach(event -> {
+ assertThat(event)
+ .isInstanceOf(JobInstanceDataEvent.class)
+ .hasFieldOrPropertyWithValue("kogitoProcessId", PROCESS_ID)
+ .hasFieldOrPropertyWithValue("kogitoProcessInstanceId",
PROCESS_INSTANCE_ID)
+ .satisfiesAnyOf(noRootProcess, withRootProcess);
+ });
+ }
+
+}
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTests.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTests.java
deleted file mode 100644
index b602a9dff..000000000
---
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTests.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.kie.kogito.jobs.embedded;
-
-import java.util.List;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.kie.kogito.event.DataEvent;
-import org.kie.kogito.jobs.DurationExpirationTime;
-import org.kie.kogito.jobs.JobsService;
-import org.kie.kogito.jobs.ProcessInstanceJobDescription;
-
-import io.quarkus.test.junit.QuarkusTest;
-
-import jakarta.inject.Inject;
-
-@QuarkusTest
-public class EmbeddedJobsServiceTests {
-
- @Inject
- JobsService jobService;
-
- @Inject
- TestEventPublisher publisher;
-
- @Test
- public void testJobService() throws Exception {
- // testing only we have the full lifecycle
- publisher.expectedEvents(2);
-
- ProcessInstanceJobDescription description =
ProcessInstanceJobDescription.builder()
- .generateId()
- .timerId("-1")
- .expirationTime(DurationExpirationTime.now())
- .processInstanceId("1")
- .rootProcessInstanceId(null)
- .processId("processId")
- .rootProcessId(null)
- .nodeInstanceId("node_1")
- .build();
- jobService.scheduleProcessInstanceJob(description);
-
- List<DataEvent<?>> events = publisher.getEvents();
- Assertions.assertEquals(2, events.size());
-
- }
-
-}
diff --git
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestEventPublisher.java
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestEventPublisher.java
index f5b43f7af..f077205d5 100644
---
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestEventPublisher.java
+++
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestEventPublisher.java
@@ -54,8 +54,8 @@ public class TestEventPublisher implements EventPublisher {
events.forEach(e -> latch.countDown());
}
- public void expectedEvents(int numOfEvents) {
- latch = new CountDownLatch(numOfEvents);
+ public void setLatch(CountDownLatch latch) {
+ this.latch = latch;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]