This is an automated email from the ASF dual-hosted git repository.
ricardozanini pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git
The following commit(s) were added to refs/heads/main by this push:
new 6b3d59b67 KOGITO-9849 DataIndex is not processing well the http cloud
events (#1889)
6b3d59b67 is described below
commit 6b3d59b679c49b74e5188701cf600f8a5d7fad3a
Author: Walter Medvedeo <[email protected]>
AuthorDate: Mon Oct 30 14:29:33 2023 +0100
KOGITO-9849 DataIndex is not processing well the http cloud events (#1889)
* KOGITO-9849 DataIndex is not processing well the http cloud events
* Code review suggestions I
* Fix structured mode that was removed in PR in the middle
---
.../data-index-service-common/pom.xml | 5 +-
.../messaging/KogitoIndexEventConverter.java | 128 ++++++++++++-
.../messaging/KogitoIndexEventConverterTest.java | 197 +++++++++++++++++++--
.../src/test/resources/binary_job_event_data.json | 18 ++
.../binary_process_instance_event_data.json | 16 ++
...binary_user_task_instance_state_event_data.json | 14 ++
.../src/test/resources/process_instance_event.json | 2 +-
7 files changed, 360 insertions(+), 20 deletions(-)
diff --git a/data-index/data-index-service/data-index-service-common/pom.xml
b/data-index/data-index-service/data-index-service-common/pom.xml
index fefd18171..af7ac6a13 100644
--- a/data-index/data-index-service/data-index-service-common/pom.xml
+++ b/data-index/data-index-service/data-index-service-common/pom.xml
@@ -95,7 +95,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-reactive</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>io.cloudevents</groupId>
+ <artifactId>cloudevents-http-vertx</artifactId>
+ </dependency>
<dependency>
<groupId>io.quarkiverse.reactivemessaging.http</groupId>
<artifactId>quarkus-reactive-messaging-http</artifactId>
diff --git
a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java
b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java
index e95c8589b..c848e2a5c 100644
---
a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java
+++
b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java
@@ -20,26 +20,57 @@ package org.kie.kogito.index.service.messaging;
import java.io.IOException;
import java.lang.reflect.Type;
+import java.util.function.Supplier;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Message;
+import org.kie.kogito.event.AbstractDataEvent;
+import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceErrorEventBody;
+import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceNodeEventBody;
+import org.kie.kogito.event.process.ProcessInstanceSLADataEvent;
+import org.kie.kogito.event.process.ProcessInstanceSLAEventBody;
+import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceStateEventBody;
+import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceVariableEventBody;
+import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentEventBody;
+import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentEventBody;
+import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceCommentEventBody;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceDeadlineDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceDeadlineEventBody;
+import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceStateEventBody;
+import org.kie.kogito.event.usertask.UserTaskInstanceVariableDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceVariableEventBody;
import org.kie.kogito.index.event.KogitoJobCloudEvent;
+import org.kie.kogito.index.model.Job;
import org.kie.kogito.index.service.DataIndexServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.http.vertx.VertxMessageFactory;
+import io.quarkus.reactivemessaging.http.runtime.IncomingHttpMetadata;
import io.smallrye.reactive.messaging.MessageConverter;
+import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
/**
* Converts the message payload into an indexable object. The conversion takes
into account that the
- * message can be coded in the structured.
+ * message can be coded in the structured or binary format.
*/
@ApplicationScoped
public class KogitoIndexEventConverter implements MessageConverter {
@@ -63,15 +94,24 @@ public class KogitoIndexEventConverter implements
MessageConverter {
@Override
public Message<?> convert(Message<?> message, Type type) {
try {
+ // quarkus-http connector case, let Vertx manage binary and
structured mode.
+ IncomingHttpMetadata httpMetadata =
message.getMetadata(IncomingHttpMetadata.class)
+ .orElseThrow(() -> new IllegalStateException("No
IncomingHttpMetadata metadata was found current message."));
+ CloudEvent cloudEvent;
+ MultiMap httpHeaders = httpMetadata.getHeaders();
+ Buffer buffer = (Buffer) message.getPayload();
+ MessageReader messageReader =
VertxMessageFactory.createReader(httpHeaders, buffer);
+ cloudEvent = messageReader.toEvent();
+
if
(type.getTypeName().equals(ProcessInstanceDataEvent.class.getTypeName())) {
- return
message.withPayload(objectMapper.readValue(message.getPayload().toString(),
ProcessInstanceDataEvent.class));
+ return
message.withPayload(buildProcessInstanceDataEventVariant(cloudEvent));
} else if
(type.getTypeName().equals(KogitoJobCloudEvent.class.getTypeName())) {
- return
message.withPayload(objectMapper.readValue(message.getPayload().toString(),
KogitoJobCloudEvent.class));
+ return
message.withPayload(buildKogitoJobCloudEvent(cloudEvent));
} else if
(type.getTypeName().equals(UserTaskInstanceDataEvent.class.getTypeName())) {
- return
message.withPayload(objectMapper.readValue(message.getPayload().toString(),
UserTaskInstanceDataEvent.class));
- } else {
- return
message.withPayload(objectMapper.readValue(message.getPayload().toString(),
(Class<?>) type));
+ return
message.withPayload(buildUserTaskInstanceDataEvent(cloudEvent));
}
+ // never happens, see isIndexable.
+ throw new IllegalArgumentException("Unknown event type: " + type);
} catch (IOException e) {
LOGGER.error("Error converting message payload to " +
type.getTypeName(), e);
throw new DataIndexServiceException("Error converting message
payload:\n" + message.getPayload() + " \n to" + type.getTypeName(), e);
@@ -82,4 +122,80 @@ public class KogitoIndexEventConverter implements
MessageConverter {
public void setObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
+
+ private DataEvent<?> buildProcessInstanceDataEventVariant(CloudEvent
cloudEvent) throws IOException {
+ switch (cloudEvent.getType()) {
+ case "ProcessInstanceErrorDataEvent":
+ return buildDataEvent(cloudEvent, objectMapper,
ProcessInstanceErrorDataEvent::new, ProcessInstanceErrorEventBody.class);
+ case "ProcessInstanceNodeDataEvent":
+ return buildDataEvent(cloudEvent, objectMapper,
ProcessInstanceNodeDataEvent::new, ProcessInstanceNodeEventBody.class);
+ case "ProcessInstanceSLADataEvent":
+ return buildDataEvent(cloudEvent, objectMapper,
ProcessInstanceSLADataEvent::new, ProcessInstanceSLAEventBody.class);
+ case "ProcessInstanceStateDataEvent":
+ return buildDataEvent(cloudEvent, objectMapper,
ProcessInstanceStateDataEvent::new, ProcessInstanceStateEventBody.class);
+ case "ProcessInstanceVariableDataEvent":
+ return buildDataEvent(cloudEvent, objectMapper,
ProcessInstanceVariableDataEvent::new, ProcessInstanceVariableEventBody.class);
+ default:
+ throw new IllegalArgumentException("Unknown
ProcessInstanceDataEvent variant: " + cloudEvent.getType());
+ }
+ }
+
+ private DataEvent<?> buildUserTaskInstanceDataEvent(CloudEvent cloudEvent)
throws IOException {
+ switch (cloudEvent.getType()) {
+ case "UserTaskInstanceAssignmentDataEvent":
+ return buildDataEvent(cloudEvent, objectMapper,
UserTaskInstanceAssignmentDataEvent::new,
UserTaskInstanceAssignmentEventBody.class);
+ case "UserTaskInstanceAttachmentDataEvent":
+ return buildDataEvent(cloudEvent, objectMapper,
UserTaskInstanceAttachmentDataEvent::new,
UserTaskInstanceAttachmentEventBody.class);
+ case "UserTaskInstanceCommentDataEvent":
+ return buildDataEvent(cloudEvent, objectMapper,
UserTaskInstanceCommentDataEvent::new, UserTaskInstanceCommentEventBody.class);
+ case "UserTaskInstanceDeadlineDataEvent":
+ return buildDataEvent(cloudEvent, objectMapper,
UserTaskInstanceDeadlineDataEvent::new,
UserTaskInstanceDeadlineEventBody.class);
+ case "UserTaskInstanceStateDataEvent":
+ return buildDataEvent(cloudEvent, objectMapper,
UserTaskInstanceStateDataEvent::new, UserTaskInstanceStateEventBody.class);
+ case "UserTaskInstanceVariableDataEvent":
+ return buildDataEvent(cloudEvent, objectMapper,
UserTaskInstanceVariableDataEvent::new,
UserTaskInstanceVariableEventBody.class);
+ default:
+ throw new IllegalArgumentException("Unknown
UserTaskInstanceDataEvent variant: " + cloudEvent.getType());
+ }
+ }
+
+ private KogitoJobCloudEvent buildKogitoJobCloudEvent(CloudEvent
cloudEvent) throws IOException {
+ KogitoJobCloudEvent jobCloudEvent = new KogitoJobCloudEvent();
+ jobCloudEvent.setId(cloudEvent.getId());
+ jobCloudEvent.setType(cloudEvent.getType());
+ jobCloudEvent.setSource(cloudEvent.getSource());
+ jobCloudEvent.setContentType(cloudEvent.getDataContentType());
+ jobCloudEvent.setSchemaURL(cloudEvent.getDataSchema());
+ jobCloudEvent.setSubject(cloudEvent.getSubject());
+ jobCloudEvent.setTime(cloudEvent.getTime() != null ?
cloudEvent.getTime().toZonedDateTime() : null);
+ if (cloudEvent.getData() != null) {
+
jobCloudEvent.setData(objectMapper.readValue(cloudEvent.getData().toBytes(),
Job.class));
+ }
+ return jobCloudEvent;
+ }
+
+ private static <E extends AbstractDataEvent<T>, T> E
buildDataEvent(CloudEvent cloudEvent, ObjectMapper objectMapper, Supplier<E>
supplier, Class<T> clazz) throws IOException {
+ E dataEvent = supplier.get();
+ applyCloudEventAttributes(cloudEvent, dataEvent);
+ applyExtensions(cloudEvent, dataEvent);
+ if (cloudEvent.getData() != null) {
+
dataEvent.setData(objectMapper.readValue(cloudEvent.getData().toBytes(),
clazz));
+ }
+ return dataEvent;
+ }
+
+ private static void applyCloudEventAttributes(CloudEvent cloudEvent,
AbstractDataEvent<?> dataEvent) {
+ dataEvent.setSpecVersion(cloudEvent.getSpecVersion());
+ dataEvent.setId(cloudEvent.getId());
+ dataEvent.setType(cloudEvent.getType());
+ dataEvent.setSource(cloudEvent.getSource());
+ dataEvent.setDataContentType(cloudEvent.getDataContentType());
+ dataEvent.setDataSchema(cloudEvent.getDataSchema());
+ dataEvent.setSubject(cloudEvent.getSubject());
+ dataEvent.setTime(cloudEvent.getTime());
+ }
+
+ private static void applyExtensions(CloudEvent cloudEvent,
AbstractDataEvent<?> dataEvent) {
+ cloudEvent.getExtensionNames().forEach(extensionName ->
dataEvent.addExtensionAttribute(extensionName,
cloudEvent.getExtension(extensionName)));
+ }
}
diff --git
a/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverterTest.java
b/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverterTest.java
index 65e21dd04..764d0efc8 100644
---
a/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverterTest.java
+++
b/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverterTest.java
@@ -18,6 +18,11 @@
*/
package org.kie.kogito.index.service.messaging;
+import java.net.URI;
+import java.time.OffsetDateTime;
+
+import javax.ws.rs.core.HttpHeaders;
+
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.junit.jupiter.api.BeforeEach;
@@ -26,43 +31,65 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
import org.kie.kogito.index.event.KogitoJobCloudEvent;
import org.kie.kogito.index.event.mapper.ProcessInstanceStateDataEventMerger;
+import org.kie.kogito.index.event.mapper.UserTaskInstanceStateEventMerger;
import org.kie.kogito.index.json.ObjectMapperProducer;
+import org.kie.kogito.index.model.Job;
import org.kie.kogito.index.model.ProcessInstance;
+import org.kie.kogito.index.model.UserTaskInstance;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.SpecVersion;
import io.quarkus.reactivemessaging.http.runtime.IncomingHttpMetadata;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static io.cloudevents.core.v1.CloudEventV1.DATACONTENTTYPE;
+import static io.cloudevents.core.v1.CloudEventV1.DATASCHEMA;
+import static io.cloudevents.core.v1.CloudEventV1.ID;
+import static io.cloudevents.core.v1.CloudEventV1.SOURCE;
+import static io.cloudevents.core.v1.CloudEventV1.SPECVERSION;
+import static io.cloudevents.core.v1.CloudEventV1.SUBJECT;
+import static io.cloudevents.core.v1.CloudEventV1.TIME;
+import static io.cloudevents.core.v1.CloudEventV1.TYPE;
+import static org.assertj.core.api.Assertions.*;
import static org.kie.kogito.index.test.TestUtils.readFileContent;
import static org.mockito.Mockito.lenient;
@ExtendWith(MockitoExtension.class)
class KogitoIndexEventConverterTest {
- private static final String BINARY_PROCESS_INSTANCE_CLOUD_EVENT_DATA =
"process_instance_event.json";
+ private static final String PROCESS_INSTANCE_STATE_EVENT_TYPE =
"ProcessInstanceStateDataEvent";
+ private static final String USER_TASK_INSTANCE_STATE_EVENT_TYPE =
"UserTaskInstanceStateDataEvent";
+ private static final String JOB_EVENT_TYPE = "JobEvent";
+ private static final String EVENT_ID = "ID";
+ private static final URI EVENT_SOURCE =
URI.create("http://localhost:8080/travels");
+ private static final OffsetDateTime EVENT_TIME =
OffsetDateTime.parse("2022-03-18T15:33:05.608395+10:00");
+ private static final URI EVENT_DATA_SCHEMA =
URI.create("http://my_event_data_schema/my_schema.json");
+ private static final String EVENT_DATA_CONTENT_TYPE = "application/json;
charset=utf-8";
+ private static final String EVENT_SUBJECT = "SUBJECT";
- @Mock
- IncomingHttpMetadata httpMetadata;
+ private static final String STRUCTURED_PROCESS_INSTANCE_CLOUD_EVENT =
"process_instance_event.json";
+ private static final String BINARY_PROCESS_INSTANCE_CLOUD_EVENT_DATA =
"binary_process_instance_event_data.json";
+ private static final String BINARY_USER_TASK_INSTANCE_CLOUD_EVENT_DATA =
"binary_user_task_instance_state_event_data.json";
+ private static final String BINARY_KOGITO_JOB_CLOUD_EVENT_DATA =
"binary_job_event_data.json";
+ @Mock
+ private IncomingHttpMetadata httpMetadata;
private MultiMap headers;
-
private KogitoIndexEventConverter converter;
- private ObjectMapper objectMapper;
@BeforeEach
void setUp() {
headers = MultiMap.caseInsensitiveMultiMap();
lenient().doReturn(headers).when(httpMetadata).getHeaders();
converter = new KogitoIndexEventConverter();
- objectMapper = new ObjectMapper();
+ ObjectMapper objectMapper = new ObjectMapper();
new ObjectMapperProducer().customize(objectMapper);
converter.setObjectMapper(objectMapper);
}
@@ -87,27 +114,173 @@ class KogitoIndexEventConverterTest {
}
@Test
- void convertBinaryCloudProcessInstanceEvent() throws Exception {
+ void convertBinaryProcessInstanceDataEvent() throws Exception {
Buffer buffer =
Buffer.buffer(readFileContent(BINARY_PROCESS_INSTANCE_CLOUD_EVENT_DATA));
Message<?> message = Message.of(buffer, Metadata.of(httpMetadata));
+
+ // set ce-xxx headers for the binary format.
+ headers.add(ceHeader(SPECVERSION), SpecVersion.V1.toString());
+ headers.add(ceHeader(ID), EVENT_ID);
+ headers.add(ceHeader(SOURCE), EVENT_SOURCE.toString());
+ headers.add(ceHeader(TYPE), PROCESS_INSTANCE_STATE_EVENT_TYPE);
+ headers.add(ceHeader(TIME), EVENT_TIME.toString());
+ headers.add(ceHeader(DATASCHEMA), EVENT_DATA_SCHEMA.toString());
+ headers.add(ceHeader(DATACONTENTTYPE), EVENT_DATA_CONTENT_TYPE);
+ headers.add(ceHeader(SUBJECT), EVENT_SUBJECT);
+
Message<?> result = converter.convert(message,
ProcessInstanceDataEvent.class);
-
assertThat(result.getPayload()).isInstanceOf(ProcessInstanceStateDataEvent.class);
+
assertThat(result.getPayload()).isInstanceOf(ProcessInstanceDataEvent.class);
ProcessInstanceStateDataEvent cloudEvent =
(ProcessInstanceStateDataEvent) result.getPayload();
+ assertThat(cloudEvent.getId()).isEqualTo(EVENT_ID);
+
assertThat(cloudEvent.getSpecVersion().toString()).isEqualTo(SpecVersion.V1.toString());
+
assertThat(cloudEvent.getSource().toString()).isEqualTo(EVENT_SOURCE.toString());
+
assertThat(cloudEvent.getType()).isEqualTo(PROCESS_INSTANCE_STATE_EVENT_TYPE);
+ assertThat(cloudEvent.getTime()).isEqualTo(EVENT_TIME);
+ assertThat(cloudEvent.getDataSchema()).isEqualTo(EVENT_DATA_SCHEMA);
+
assertThat(cloudEvent.getDataContentType()).isEqualTo(EVENT_DATA_CONTENT_TYPE);
+ assertThat(cloudEvent.getSubject()).isEqualTo(EVENT_SUBJECT);
+
+ ProcessInstance pi = new ProcessInstance();
+ new ProcessInstanceStateDataEventMerger().merge(pi, cloudEvent);
+
assertThat(pi.getId()).isEqualTo("5f8b1a48-4d37-4bd2-a1a6-9b8f6097cfdd");
+ assertThat(pi.getVersion()).isEqualTo("1.0");
+ assertThat(pi.getProcessId()).isEqualTo("subscription_flow");
+ assertThat(pi.getProcessName()).isEqualTo("subscription workflow");
+
+
assertThat(pi.getRootProcessInstanceId()).isEqualTo("root_process_instance_id");
+ assertThat(pi.getRootProcessId()).isEqualTo("root_process_id");
+
assertThat(pi.getParentProcessInstanceId()).isEqualTo("parent_instance_id");
+
+ assertThat(pi.getRoles()).containsExactly("admin", "user");
+ assertThat(pi.getState()).isEqualTo(1);
+ assertThat(pi.getEnd()).isNull();
+ assertThat(pi.getBusinessKey()).isEqualTo("business_key");
+ }
+
+ @Test
+ void convertStructuredProcessInstanceDataEvent() throws Exception {
+ Buffer buffer =
Buffer.buffer(readFileContent(STRUCTURED_PROCESS_INSTANCE_CLOUD_EVENT));
+ Message<?> message = Message.of(buffer, Metadata.of(httpMetadata));
+
+ // set ce header for the structured format.
+ headers.add(HttpHeaders.CONTENT_TYPE, "application/cloudevents+json");
+
+ Message<?> result = converter.convert(message,
ProcessInstanceDataEvent.class);
+
assertThat(result.getPayload()).isInstanceOf(ProcessInstanceDataEvent.class);
+ ProcessInstanceDataEvent<?> cloudEvent = (ProcessInstanceDataEvent<?>)
result.getPayload();
+
+
assertThat(cloudEvent.getId()).isEqualTo("867ff7b4-2e49-49b3-882a-76f65a2c4124");
+
assertThat(cloudEvent.getSpecVersion().toString()).isEqualTo(SpecVersion.V1.toString());
+
assertThat(cloudEvent.getSource().toString()).isEqualTo(EVENT_SOURCE.toString());
+
assertThat(cloudEvent.getType()).isEqualTo(PROCESS_INSTANCE_STATE_EVENT_TYPE);
+ assertThat(cloudEvent.getTime()).isEqualTo(EVENT_TIME);
+
ProcessInstance pi = new ProcessInstance();
new ProcessInstanceStateDataEventMerger().merge(pi, cloudEvent);
assertThat(pi.getId()).isEqualTo("2308e23d-9998-47e9-a772-a078cf5b891b");
+ assertThat(pi.getVersion()).isEqualTo("1.0");
assertThat(pi.getProcessId()).isEqualTo("travels");
assertThat(pi.getProcessName()).isEqualTo("travels");
assertThat(pi.getState()).isEqualTo(1);
+ assertThat(pi.getBusinessKey()).isEqualTo("F7RTPS");
assertThat(pi.getStart()).isEqualTo("2022-03-18T05:32:21.887Z");
assertThat(pi.getEnd()).isNull();
}
@Test
- void convertFailureBinaryUnexpectedBufferContent() throws Exception {
+ void convertBinaryKogitoJobCloudEvent() throws Exception {
+ Buffer buffer =
Buffer.buffer(readFileContent(BINARY_KOGITO_JOB_CLOUD_EVENT_DATA));
+ Message<?> message = Message.of(buffer, Metadata.of(httpMetadata));
+
+ // set ce-xxx headers for the binary format.
+ headers.add(ceHeader(SPECVERSION), SpecVersion.V1.toString());
+ headers.add(ceHeader(ID), EVENT_ID);
+ headers.add(ceHeader(SOURCE), EVENT_SOURCE.toString());
+ headers.add(ceHeader(TYPE), JOB_EVENT_TYPE);
+ headers.add(ceHeader(TIME), EVENT_TIME.toString());
+ headers.add(ceHeader(DATASCHEMA), EVENT_DATA_SCHEMA.toString());
+ headers.add(ceHeader(DATACONTENTTYPE), EVENT_DATA_CONTENT_TYPE);
+ headers.add(ceHeader(SUBJECT), EVENT_SUBJECT);
+
+ Message<?> result = converter.convert(message,
KogitoJobCloudEvent.class);
+
assertThat(result.getPayload()).isInstanceOf(KogitoJobCloudEvent.class);
+ KogitoJobCloudEvent cloudEvent = (KogitoJobCloudEvent)
result.getPayload();
+
+ assertThat(cloudEvent.getId()).isEqualTo(EVENT_ID);
+
assertThat(cloudEvent.getSpecVersion()).isEqualTo(SpecVersion.V1.toString());
+
assertThat(cloudEvent.getSource().toString()).isEqualTo(EVENT_SOURCE.toString());
+ assertThat(cloudEvent.getType()).isEqualTo(JOB_EVENT_TYPE);
+
assertThat(cloudEvent.getTime()).isEqualTo(EVENT_TIME.toZonedDateTime());
+ assertThat(cloudEvent.getSchemaURL()).isEqualTo(EVENT_DATA_SCHEMA);
+
assertThat(cloudEvent.getContentType()).isEqualTo(EVENT_DATA_CONTENT_TYPE);
+ assertThat(cloudEvent.getSubject()).isEqualTo(EVENT_SUBJECT);
+
+ Job job = cloudEvent.getData();
+
assertThat(job.getId()).isEqualTo("8350b8b6-c5d9-432d-a339-a9fc85f642d4_0");
+ assertThat(job.getProcessId()).isEqualTo("timerscycle");
+
assertThat(job.getProcessInstanceId()).isEqualTo("7c1d9b38-b462-47c5-8bf2-d9154f54957b");
+ assertThat(job.getRootProcessId()).isEqualTo("root_process_id");
+
assertThat(job.getRootProcessInstanceId()).isEqualTo("root_process_instance_id");
+ assertThat(job.getNodeInstanceId()).isEqualTo("node_instance_id");
+ assertThat(job.getRepeatInterval()).isEqualTo(1000);
+ assertThat(job.getCallbackEndpoint())
+
.isEqualTo("http://localhost:8080/management/jobs/timerscycle/instances/7c1d9b38-b462-47c5-8bf2-d9154f54957b/timers/8350b8b6-c5d9-432d-a339-a9fc85f642d4_0");
+ assertThat(job.getScheduledId()).isEqualTo("1234");
+ assertThat(job.getStatus()).isEqualTo("SCHEDULED");
+ assertThat(job.getRepeatInterval()).isEqualTo(1000);
+ assertThat(job.getRepeatLimit()).isEqualTo(2147483647);
+ assertThat(job.getRetries()).isEqualTo(0);
+ assertThat(job.getExecutionCounter()).isEqualTo(0);
+ }
+
+ @Test
+ void convertBinaryUserTaskInstanceDataEvent() throws Exception {
+ Buffer buffer =
Buffer.buffer(readFileContent(BINARY_USER_TASK_INSTANCE_CLOUD_EVENT_DATA));
+ Message<?> message = Message.of(buffer, Metadata.of(httpMetadata));
+
+ // set ce-xxx headers for the binary format.
+ headers.add(ceHeader(SPECVERSION), SpecVersion.V1.toString());
+ headers.add(ceHeader(ID), EVENT_ID);
+ headers.add(ceHeader(SOURCE), EVENT_SOURCE.toString());
+ headers.add(ceHeader(TYPE), USER_TASK_INSTANCE_STATE_EVENT_TYPE);
+ headers.add(ceHeader(TIME), EVENT_TIME.toString());
+ headers.add(ceHeader(DATASCHEMA), EVENT_DATA_SCHEMA.toString());
+ headers.add(ceHeader(DATACONTENTTYPE), EVENT_DATA_CONTENT_TYPE);
+ headers.add(ceHeader(SUBJECT), EVENT_SUBJECT);
+
+ Message<?> result = converter.convert(message,
UserTaskInstanceDataEvent.class);
+
assertThat(result.getPayload()).isInstanceOf(UserTaskInstanceStateDataEvent.class);
+ UserTaskInstanceStateDataEvent cloudEvent =
(UserTaskInstanceStateDataEvent) result.getPayload();
+
+ assertThat(cloudEvent.getId()).isEqualTo(EVENT_ID);
+ assertThat(cloudEvent.getSpecVersion()).isEqualTo(SpecVersion.V1);
+
assertThat(cloudEvent.getSource().toString()).isEqualTo(EVENT_SOURCE.toString());
+
assertThat(cloudEvent.getType()).isEqualTo(USER_TASK_INSTANCE_STATE_EVENT_TYPE);
+ assertThat(cloudEvent.getTime()).isEqualTo(EVENT_TIME);
+ assertThat(cloudEvent.getDataSchema()).isEqualTo(EVENT_DATA_SCHEMA);
+
assertThat(cloudEvent.getDataContentType()).isEqualTo(EVENT_DATA_CONTENT_TYPE);
+ assertThat(cloudEvent.getSubject()).isEqualTo(EVENT_SUBJECT);
+
+ UserTaskInstance userTaskInstance = new UserTaskInstance();
+ new UserTaskInstanceStateEventMerger().merge(userTaskInstance,
cloudEvent);
+
assertThat(userTaskInstance.getId()).isEqualTo("45fae435-b098-4f27-97cf-a0c107072e8b");
+
assertThat(userTaskInstance.getProcessInstanceId()).isEqualTo("67fb3435-b098-4f27-97cf-a0c107072e8b");
+ assertThat(userTaskInstance.getName()).isEqualTo("VisaApplication");
+ assertThat(userTaskInstance.getDescription()).isEqualTo("This task is
for applying to a visa");
+ assertThat(userTaskInstance.getReferenceName()).isEqualTo("Apply for
visa");
+ assertThat(userTaskInstance.getPriority()).isEqualTo("1");
+ assertThat(userTaskInstance.getState()).isEqualTo("Completed");
+ }
+
+ @Test
+ void convertFailureBinaryUnexpectedBufferContent() {
Buffer buffer = Buffer.buffer("unexpected Content");
Message<?> message = Message.of(buffer, Metadata.of(httpMetadata));
assertThatExceptionOfType(RuntimeException.class).isThrownBy(() ->
converter.convert(message, ProcessInstanceDataEvent.class));
}
-}
+
+ private static String ceHeader(String name) {
+ return "ce-" + name;
+ }
+}
\ No newline at end of file
diff --git
a/data-index/data-index-service/data-index-service-common/src/test/resources/binary_job_event_data.json
b/data-index/data-index-service/data-index-service-common/src/test/resources/binary_job_event_data.json
new file mode 100644
index 000000000..eb75e357e
--- /dev/null
+++
b/data-index/data-index-service/data-index-service-common/src/test/resources/binary_job_event_data.json
@@ -0,0 +1,18 @@
+{
+ "id": "8350b8b6-c5d9-432d-a339-a9fc85f642d4_0",
+ "expirationTime": "2020-01-16T20:40:58.918Z",
+ "priority": 0,
+ "callbackEndpoint":
"http://localhost:8080/management/jobs/timerscycle/instances/7c1d9b38-b462-47c5-8bf2-d9154f54957b/timers/8350b8b6-c5d9-432d-a339-a9fc85f642d4_0",
+ "processInstanceId": "7c1d9b38-b462-47c5-8bf2-d9154f54957b",
+ "rootProcessInstanceId": "root_process_instance_id",
+ "processId": "timerscycle",
+ "rootProcessId": "root_process_id",
+ "nodeInstanceId": "node_instance_id",
+ "repeatInterval": 1000,
+ "repeatLimit": 2147483647,
+ "scheduledId": "1234",
+ "retries": 0,
+ "status": "SCHEDULED",
+ "lastUpdate": "2020-01-16T20:40:58.206Z",
+ "executionCounter": 0
+}
\ No newline at end of file
diff --git
a/data-index/data-index-service/data-index-service-common/src/test/resources/binary_process_instance_event_data.json
b/data-index/data-index-service/data-index-service-common/src/test/resources/binary_process_instance_event_data.json
new file mode 100644
index 000000000..ecfc27f4c
--- /dev/null
+++
b/data-index/data-index-service/data-index-service-common/src/test/resources/binary_process_instance_event_data.json
@@ -0,0 +1,16 @@
+{
+ "eventDate" : 1698332756677,
+ "eventUser" : "admin",
+ "eventType" : 1,
+ "processId": "subscription_flow",
+ "processVersion": "1.0",
+ "processType": null,
+ "processInstanceId": "5f8b1a48-4d37-4bd2-a1a6-9b8f6097cfdd",
+ "businessKey" : "business_key",
+ "processName": "subscription workflow",
+ "parentInstanceId": "parent_instance_id",
+ "rootProcessId" : "root_process_id",
+ "rootProcessInstanceId" : "root_process_instance_id",
+ "state" : 1,
+ "roles": ["admin", "user"]
+}
\ No newline at end of file
diff --git
a/data-index/data-index-service/data-index-service-common/src/test/resources/binary_user_task_instance_state_event_data.json
b/data-index/data-index-service/data-index-service-common/src/test/resources/binary_user_task_instance_state_event_data.json
new file mode 100644
index 000000000..eaae16098
--- /dev/null
+++
b/data-index/data-index-service/data-index-service-common/src/test/resources/binary_user_task_instance_state_event_data.json
@@ -0,0 +1,14 @@
+{
+ "eventDate" : 1698332756677,
+ "eventUser" : "admin",
+ "userTaskDefinitionId" : "task_123",
+ "userTaskInstanceId" : "45fae435-b098-4f27-97cf-a0c107072e8b",
+ "userTaskName": "VisaApplication",
+ "eventType": 1,
+ "userTaskDescription" : "This task is for applying to a visa",
+ "userTaskPriority": "1",
+ "userTaskReferenceName": "Apply for visa",
+ "state" : "Completed",
+ "actualOwner" : "admin",
+ "processInstanceId" : "67fb3435-b098-4f27-97cf-a0c107072e8b"
+}
\ No newline at end of file
diff --git
a/data-index/data-index-service/data-index-service-common/src/test/resources/process_instance_event.json
b/data-index/data-index-service/data-index-service-common/src/test/resources/process_instance_event.json
index 8ec0a4b74..8b9191815 100644
---
a/data-index/data-index-service/data-index-service-common/src/test/resources/process_instance_event.json
+++
b/data-index/data-index-service/data-index-service-common/src/test/resources/process_instance_event.json
@@ -19,5 +19,5 @@
"kogitoprocinstanceid": "2308e23d-9998-47e9-a772-a078cf5b891b",
"kogitoprocid": "travels",
"kogitoaddons":
"cloudevents,process-svg,prometheus-monitoring,monitoring,infinispan-persistence,process-management",
- "kogitoProcessInstanceState" : "1"
+ "kogitoprocist" : "1"
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]