This is an automated email from the ASF dual-hosted git repository.

ricardozanini pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b3d59b67  KOGITO-9849 DataIndex is not processing well the http cloud 
events (#1889)
6b3d59b67 is described below

commit 6b3d59b679c49b74e5188701cf600f8a5d7fad3a
Author: Walter Medvedeo <[email protected]>
AuthorDate: Mon Oct 30 14:29:33 2023 +0100

     KOGITO-9849 DataIndex is not processing well the http cloud events (#1889)
    
    * KOGITO-9849 DataIndex is not processing well the http cloud events
    
    * Code review suggestions I
    
    * Fix structured mode that was removed in PR in the middle
---
 .../data-index-service-common/pom.xml              |   5 +-
 .../messaging/KogitoIndexEventConverter.java       | 128 ++++++++++++-
 .../messaging/KogitoIndexEventConverterTest.java   | 197 +++++++++++++++++++--
 .../src/test/resources/binary_job_event_data.json  |  18 ++
 .../binary_process_instance_event_data.json        |  16 ++
 ...binary_user_task_instance_state_event_data.json |  14 ++
 .../src/test/resources/process_instance_event.json |   2 +-
 7 files changed, 360 insertions(+), 20 deletions(-)

diff --git a/data-index/data-index-service/data-index-service-common/pom.xml 
b/data-index/data-index-service/data-index-service-common/pom.xml
index fefd18171..af7ac6a13 100644
--- a/data-index/data-index-service/data-index-service-common/pom.xml
+++ b/data-index/data-index-service/data-index-service-common/pom.xml
@@ -95,7 +95,10 @@
       <groupId>io.quarkus</groupId>
       <artifactId>quarkus-rest-client-reactive</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>io.cloudevents</groupId>
+      <artifactId>cloudevents-http-vertx</artifactId>
+    </dependency>
     <dependency>
       <groupId>io.quarkiverse.reactivemessaging.http</groupId>
       <artifactId>quarkus-reactive-messaging-http</artifactId>
diff --git 
a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java
 
b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java
index e95c8589b..c848e2a5c 100644
--- 
a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java
+++ 
b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java
@@ -20,26 +20,57 @@ package org.kie.kogito.index.service.messaging;
 
 import java.io.IOException;
 import java.lang.reflect.Type;
+import java.util.function.Supplier;
 
 import javax.enterprise.context.ApplicationScoped;
 import javax.inject.Inject;
 
 import org.eclipse.microprofile.reactive.messaging.Message;
+import org.kie.kogito.event.AbstractDataEvent;
+import org.kie.kogito.event.DataEvent;
 import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceErrorEventBody;
+import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceNodeEventBody;
+import org.kie.kogito.event.process.ProcessInstanceSLADataEvent;
+import org.kie.kogito.event.process.ProcessInstanceSLAEventBody;
+import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceStateEventBody;
+import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceVariableEventBody;
+import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentEventBody;
+import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentEventBody;
+import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceCommentEventBody;
 import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceDeadlineDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceDeadlineEventBody;
+import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceStateEventBody;
+import org.kie.kogito.event.usertask.UserTaskInstanceVariableDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceVariableEventBody;
 import org.kie.kogito.index.event.KogitoJobCloudEvent;
+import org.kie.kogito.index.model.Job;
 import org.kie.kogito.index.service.DataIndexServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.http.vertx.VertxMessageFactory;
+import io.quarkus.reactivemessaging.http.runtime.IncomingHttpMetadata;
 import io.smallrye.reactive.messaging.MessageConverter;
+import io.vertx.core.MultiMap;
 import io.vertx.core.buffer.Buffer;
 
 /**
  * Converts the message payload into an indexable object. The conversion takes 
into account that the
- * message can be coded in the structured.
+ * message can be coded in the structured or binary format.
  */
 @ApplicationScoped
 public class KogitoIndexEventConverter implements MessageConverter {
@@ -63,15 +94,24 @@ public class KogitoIndexEventConverter implements 
MessageConverter {
     @Override
     public Message<?> convert(Message<?> message, Type type) {
         try {
+            // quarkus-http connector case, let Vertx manage binary and 
structured mode.
+            IncomingHttpMetadata httpMetadata = 
message.getMetadata(IncomingHttpMetadata.class)
+                    .orElseThrow(() -> new IllegalStateException("No 
IncomingHttpMetadata metadata was found current message."));
+            CloudEvent cloudEvent;
+            MultiMap httpHeaders = httpMetadata.getHeaders();
+            Buffer buffer = (Buffer) message.getPayload();
+            MessageReader messageReader = 
VertxMessageFactory.createReader(httpHeaders, buffer);
+            cloudEvent = messageReader.toEvent();
+
             if 
(type.getTypeName().equals(ProcessInstanceDataEvent.class.getTypeName())) {
-                return 
message.withPayload(objectMapper.readValue(message.getPayload().toString(), 
ProcessInstanceDataEvent.class));
+                return 
message.withPayload(buildProcessInstanceDataEventVariant(cloudEvent));
             } else if 
(type.getTypeName().equals(KogitoJobCloudEvent.class.getTypeName())) {
-                return 
message.withPayload(objectMapper.readValue(message.getPayload().toString(), 
KogitoJobCloudEvent.class));
+                return 
message.withPayload(buildKogitoJobCloudEvent(cloudEvent));
             } else if 
(type.getTypeName().equals(UserTaskInstanceDataEvent.class.getTypeName())) {
-                return 
message.withPayload(objectMapper.readValue(message.getPayload().toString(), 
UserTaskInstanceDataEvent.class));
-            } else {
-                return 
message.withPayload(objectMapper.readValue(message.getPayload().toString(), 
(Class<?>) type));
+                return 
message.withPayload(buildUserTaskInstanceDataEvent(cloudEvent));
             }
+            // never happens, see isIndexable.
+            throw new IllegalArgumentException("Unknown event type: " + type);
         } catch (IOException e) {
             LOGGER.error("Error converting message payload to " + 
type.getTypeName(), e);
             throw new DataIndexServiceException("Error converting message 
payload:\n" + message.getPayload() + " \n to" + type.getTypeName(), e);
@@ -82,4 +122,80 @@ public class KogitoIndexEventConverter implements 
MessageConverter {
     public void setObjectMapper(ObjectMapper objectMapper) {
         this.objectMapper = objectMapper;
     }
+
+    private DataEvent<?> buildProcessInstanceDataEventVariant(CloudEvent 
cloudEvent) throws IOException {
+        switch (cloudEvent.getType()) {
+            case "ProcessInstanceErrorDataEvent":
+                return buildDataEvent(cloudEvent, objectMapper, 
ProcessInstanceErrorDataEvent::new, ProcessInstanceErrorEventBody.class);
+            case "ProcessInstanceNodeDataEvent":
+                return buildDataEvent(cloudEvent, objectMapper, 
ProcessInstanceNodeDataEvent::new, ProcessInstanceNodeEventBody.class);
+            case "ProcessInstanceSLADataEvent":
+                return buildDataEvent(cloudEvent, objectMapper, 
ProcessInstanceSLADataEvent::new, ProcessInstanceSLAEventBody.class);
+            case "ProcessInstanceStateDataEvent":
+                return buildDataEvent(cloudEvent, objectMapper, 
ProcessInstanceStateDataEvent::new, ProcessInstanceStateEventBody.class);
+            case "ProcessInstanceVariableDataEvent":
+                return buildDataEvent(cloudEvent, objectMapper, 
ProcessInstanceVariableDataEvent::new, ProcessInstanceVariableEventBody.class);
+            default:
+                throw new IllegalArgumentException("Unknown 
ProcessInstanceDataEvent variant: " + cloudEvent.getType());
+        }
+    }
+
+    private DataEvent<?> buildUserTaskInstanceDataEvent(CloudEvent cloudEvent) 
throws IOException {
+        switch (cloudEvent.getType()) {
+            case "UserTaskInstanceAssignmentDataEvent":
+                return buildDataEvent(cloudEvent, objectMapper, 
UserTaskInstanceAssignmentDataEvent::new, 
UserTaskInstanceAssignmentEventBody.class);
+            case "UserTaskInstanceAttachmentDataEvent":
+                return buildDataEvent(cloudEvent, objectMapper, 
UserTaskInstanceAttachmentDataEvent::new, 
UserTaskInstanceAttachmentEventBody.class);
+            case "UserTaskInstanceCommentDataEvent":
+                return buildDataEvent(cloudEvent, objectMapper, 
UserTaskInstanceCommentDataEvent::new, UserTaskInstanceCommentEventBody.class);
+            case "UserTaskInstanceDeadlineDataEvent":
+                return buildDataEvent(cloudEvent, objectMapper, 
UserTaskInstanceDeadlineDataEvent::new, 
UserTaskInstanceDeadlineEventBody.class);
+            case "UserTaskInstanceStateDataEvent":
+                return buildDataEvent(cloudEvent, objectMapper, 
UserTaskInstanceStateDataEvent::new, UserTaskInstanceStateEventBody.class);
+            case "UserTaskInstanceVariableDataEvent":
+                return buildDataEvent(cloudEvent, objectMapper, 
UserTaskInstanceVariableDataEvent::new, 
UserTaskInstanceVariableEventBody.class);
+            default:
+                throw new IllegalArgumentException("Unknown 
UserTaskInstanceDataEvent variant: " + cloudEvent.getType());
+        }
+    }
+
+    private KogitoJobCloudEvent buildKogitoJobCloudEvent(CloudEvent 
cloudEvent) throws IOException {
+        KogitoJobCloudEvent jobCloudEvent = new KogitoJobCloudEvent();
+        jobCloudEvent.setId(cloudEvent.getId());
+        jobCloudEvent.setType(cloudEvent.getType());
+        jobCloudEvent.setSource(cloudEvent.getSource());
+        jobCloudEvent.setContentType(cloudEvent.getDataContentType());
+        jobCloudEvent.setSchemaURL(cloudEvent.getDataSchema());
+        jobCloudEvent.setSubject(cloudEvent.getSubject());
+        jobCloudEvent.setTime(cloudEvent.getTime() != null ? 
cloudEvent.getTime().toZonedDateTime() : null);
+        if (cloudEvent.getData() != null) {
+            
jobCloudEvent.setData(objectMapper.readValue(cloudEvent.getData().toBytes(), 
Job.class));
+        }
+        return jobCloudEvent;
+    }
+
+    private static <E extends AbstractDataEvent<T>, T> E 
buildDataEvent(CloudEvent cloudEvent, ObjectMapper objectMapper, Supplier<E> 
supplier, Class<T> clazz) throws IOException {
+        E dataEvent = supplier.get();
+        applyCloudEventAttributes(cloudEvent, dataEvent);
+        applyExtensions(cloudEvent, dataEvent);
+        if (cloudEvent.getData() != null) {
+            
dataEvent.setData(objectMapper.readValue(cloudEvent.getData().toBytes(), 
clazz));
+        }
+        return dataEvent;
+    }
+
+    private static void applyCloudEventAttributes(CloudEvent cloudEvent, 
AbstractDataEvent<?> dataEvent) {
+        dataEvent.setSpecVersion(cloudEvent.getSpecVersion());
+        dataEvent.setId(cloudEvent.getId());
+        dataEvent.setType(cloudEvent.getType());
+        dataEvent.setSource(cloudEvent.getSource());
+        dataEvent.setDataContentType(cloudEvent.getDataContentType());
+        dataEvent.setDataSchema(cloudEvent.getDataSchema());
+        dataEvent.setSubject(cloudEvent.getSubject());
+        dataEvent.setTime(cloudEvent.getTime());
+    }
+
+    private static void applyExtensions(CloudEvent cloudEvent, 
AbstractDataEvent<?> dataEvent) {
+        cloudEvent.getExtensionNames().forEach(extensionName -> 
dataEvent.addExtensionAttribute(extensionName, 
cloudEvent.getExtension(extensionName)));
+    }
 }
diff --git 
a/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverterTest.java
 
b/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverterTest.java
index 65e21dd04..764d0efc8 100644
--- 
a/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverterTest.java
+++ 
b/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverterTest.java
@@ -18,6 +18,11 @@
  */
 package org.kie.kogito.index.service.messaging;
 
+import java.net.URI;
+import java.time.OffsetDateTime;
+
+import javax.ws.rs.core.HttpHeaders;
+
 import org.eclipse.microprofile.reactive.messaging.Message;
 import org.eclipse.microprofile.reactive.messaging.Metadata;
 import org.junit.jupiter.api.BeforeEach;
@@ -26,43 +31,65 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.kie.kogito.event.process.ProcessInstanceDataEvent;
 import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
 import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
 import org.kie.kogito.index.event.KogitoJobCloudEvent;
 import org.kie.kogito.index.event.mapper.ProcessInstanceStateDataEventMerger;
+import org.kie.kogito.index.event.mapper.UserTaskInstanceStateEventMerger;
 import org.kie.kogito.index.json.ObjectMapperProducer;
+import org.kie.kogito.index.model.Job;
 import org.kie.kogito.index.model.ProcessInstance;
+import org.kie.kogito.index.model.UserTaskInstance;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import io.cloudevents.SpecVersion;
 import io.quarkus.reactivemessaging.http.runtime.IncomingHttpMetadata;
 import io.vertx.core.MultiMap;
 import io.vertx.core.buffer.Buffer;
 
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static io.cloudevents.core.v1.CloudEventV1.DATACONTENTTYPE;
+import static io.cloudevents.core.v1.CloudEventV1.DATASCHEMA;
+import static io.cloudevents.core.v1.CloudEventV1.ID;
+import static io.cloudevents.core.v1.CloudEventV1.SOURCE;
+import static io.cloudevents.core.v1.CloudEventV1.SPECVERSION;
+import static io.cloudevents.core.v1.CloudEventV1.SUBJECT;
+import static io.cloudevents.core.v1.CloudEventV1.TIME;
+import static io.cloudevents.core.v1.CloudEventV1.TYPE;
+import static org.assertj.core.api.Assertions.*;
 import static org.kie.kogito.index.test.TestUtils.readFileContent;
 import static org.mockito.Mockito.lenient;
 
 @ExtendWith(MockitoExtension.class)
 class KogitoIndexEventConverterTest {
 
-    private static final String BINARY_PROCESS_INSTANCE_CLOUD_EVENT_DATA = 
"process_instance_event.json";
+    private static final String PROCESS_INSTANCE_STATE_EVENT_TYPE = 
"ProcessInstanceStateDataEvent";
+    private static final String USER_TASK_INSTANCE_STATE_EVENT_TYPE = 
"UserTaskInstanceStateDataEvent";
+    private static final String JOB_EVENT_TYPE = "JobEvent";
+    private static final String EVENT_ID = "ID";
+    private static final URI EVENT_SOURCE = 
URI.create("http://localhost:8080/travels";);
+    private static final OffsetDateTime EVENT_TIME = 
OffsetDateTime.parse("2022-03-18T15:33:05.608395+10:00");
+    private static final URI EVENT_DATA_SCHEMA = 
URI.create("http://my_event_data_schema/my_schema.json";);
+    private static final String EVENT_DATA_CONTENT_TYPE = "application/json; 
charset=utf-8";
+    private static final String EVENT_SUBJECT = "SUBJECT";
 
-    @Mock
-    IncomingHttpMetadata httpMetadata;
+    private static final String STRUCTURED_PROCESS_INSTANCE_CLOUD_EVENT = 
"process_instance_event.json";
+    private static final String BINARY_PROCESS_INSTANCE_CLOUD_EVENT_DATA = 
"binary_process_instance_event_data.json";
+    private static final String BINARY_USER_TASK_INSTANCE_CLOUD_EVENT_DATA = 
"binary_user_task_instance_state_event_data.json";
+    private static final String BINARY_KOGITO_JOB_CLOUD_EVENT_DATA = 
"binary_job_event_data.json";
 
+    @Mock
+    private IncomingHttpMetadata httpMetadata;
     private MultiMap headers;
-
     private KogitoIndexEventConverter converter;
-    private ObjectMapper objectMapper;
 
     @BeforeEach
     void setUp() {
         headers = MultiMap.caseInsensitiveMultiMap();
         lenient().doReturn(headers).when(httpMetadata).getHeaders();
         converter = new KogitoIndexEventConverter();
-        objectMapper = new ObjectMapper();
+        ObjectMapper objectMapper = new ObjectMapper();
         new ObjectMapperProducer().customize(objectMapper);
         converter.setObjectMapper(objectMapper);
     }
@@ -87,27 +114,173 @@ class KogitoIndexEventConverterTest {
     }
 
     @Test
-    void convertBinaryCloudProcessInstanceEvent() throws Exception {
+    void convertBinaryProcessInstanceDataEvent() throws Exception {
         Buffer buffer = 
Buffer.buffer(readFileContent(BINARY_PROCESS_INSTANCE_CLOUD_EVENT_DATA));
         Message<?> message = Message.of(buffer, Metadata.of(httpMetadata));
+
+        // set ce-xxx headers for the binary format.
+        headers.add(ceHeader(SPECVERSION), SpecVersion.V1.toString());
+        headers.add(ceHeader(ID), EVENT_ID);
+        headers.add(ceHeader(SOURCE), EVENT_SOURCE.toString());
+        headers.add(ceHeader(TYPE), PROCESS_INSTANCE_STATE_EVENT_TYPE);
+        headers.add(ceHeader(TIME), EVENT_TIME.toString());
+        headers.add(ceHeader(DATASCHEMA), EVENT_DATA_SCHEMA.toString());
+        headers.add(ceHeader(DATACONTENTTYPE), EVENT_DATA_CONTENT_TYPE);
+        headers.add(ceHeader(SUBJECT), EVENT_SUBJECT);
+
         Message<?> result = converter.convert(message, 
ProcessInstanceDataEvent.class);
-        
assertThat(result.getPayload()).isInstanceOf(ProcessInstanceStateDataEvent.class);
+        
assertThat(result.getPayload()).isInstanceOf(ProcessInstanceDataEvent.class);
         ProcessInstanceStateDataEvent cloudEvent = 
(ProcessInstanceStateDataEvent) result.getPayload();
 
+        assertThat(cloudEvent.getId()).isEqualTo(EVENT_ID);
+        
assertThat(cloudEvent.getSpecVersion().toString()).isEqualTo(SpecVersion.V1.toString());
+        
assertThat(cloudEvent.getSource().toString()).isEqualTo(EVENT_SOURCE.toString());
+        
assertThat(cloudEvent.getType()).isEqualTo(PROCESS_INSTANCE_STATE_EVENT_TYPE);
+        assertThat(cloudEvent.getTime()).isEqualTo(EVENT_TIME);
+        assertThat(cloudEvent.getDataSchema()).isEqualTo(EVENT_DATA_SCHEMA);
+        
assertThat(cloudEvent.getDataContentType()).isEqualTo(EVENT_DATA_CONTENT_TYPE);
+        assertThat(cloudEvent.getSubject()).isEqualTo(EVENT_SUBJECT);
+
+        ProcessInstance pi = new ProcessInstance();
+        new ProcessInstanceStateDataEventMerger().merge(pi, cloudEvent);
+        
assertThat(pi.getId()).isEqualTo("5f8b1a48-4d37-4bd2-a1a6-9b8f6097cfdd");
+        assertThat(pi.getVersion()).isEqualTo("1.0");
+        assertThat(pi.getProcessId()).isEqualTo("subscription_flow");
+        assertThat(pi.getProcessName()).isEqualTo("subscription workflow");
+
+        
assertThat(pi.getRootProcessInstanceId()).isEqualTo("root_process_instance_id");
+        assertThat(pi.getRootProcessId()).isEqualTo("root_process_id");
+        
assertThat(pi.getParentProcessInstanceId()).isEqualTo("parent_instance_id");
+
+        assertThat(pi.getRoles()).containsExactly("admin", "user");
+        assertThat(pi.getState()).isEqualTo(1);
+        assertThat(pi.getEnd()).isNull();
+        assertThat(pi.getBusinessKey()).isEqualTo("business_key");
+    }
+
+    @Test
+    void convertStructuredProcessInstanceDataEvent() throws Exception {
+        Buffer buffer = 
Buffer.buffer(readFileContent(STRUCTURED_PROCESS_INSTANCE_CLOUD_EVENT));
+        Message<?> message = Message.of(buffer, Metadata.of(httpMetadata));
+
+        // set ce header for the structured format.
+        headers.add(HttpHeaders.CONTENT_TYPE, "application/cloudevents+json");
+
+        Message<?> result = converter.convert(message, 
ProcessInstanceDataEvent.class);
+        
assertThat(result.getPayload()).isInstanceOf(ProcessInstanceDataEvent.class);
+        ProcessInstanceDataEvent<?> cloudEvent = (ProcessInstanceDataEvent<?>) 
result.getPayload();
+
+        
assertThat(cloudEvent.getId()).isEqualTo("867ff7b4-2e49-49b3-882a-76f65a2c4124");
+        
assertThat(cloudEvent.getSpecVersion().toString()).isEqualTo(SpecVersion.V1.toString());
+        
assertThat(cloudEvent.getSource().toString()).isEqualTo(EVENT_SOURCE.toString());
+        
assertThat(cloudEvent.getType()).isEqualTo(PROCESS_INSTANCE_STATE_EVENT_TYPE);
+        assertThat(cloudEvent.getTime()).isEqualTo(EVENT_TIME);
+
         ProcessInstance pi = new ProcessInstance();
         new ProcessInstanceStateDataEventMerger().merge(pi, cloudEvent);
         
assertThat(pi.getId()).isEqualTo("2308e23d-9998-47e9-a772-a078cf5b891b");
+        assertThat(pi.getVersion()).isEqualTo("1.0");
         assertThat(pi.getProcessId()).isEqualTo("travels");
         assertThat(pi.getProcessName()).isEqualTo("travels");
         assertThat(pi.getState()).isEqualTo(1);
+        assertThat(pi.getBusinessKey()).isEqualTo("F7RTPS");
         assertThat(pi.getStart()).isEqualTo("2022-03-18T05:32:21.887Z");
         assertThat(pi.getEnd()).isNull();
     }
 
     @Test
-    void convertFailureBinaryUnexpectedBufferContent() throws Exception {
+    void convertBinaryKogitoJobCloudEvent() throws Exception {
+        Buffer buffer = 
Buffer.buffer(readFileContent(BINARY_KOGITO_JOB_CLOUD_EVENT_DATA));
+        Message<?> message = Message.of(buffer, Metadata.of(httpMetadata));
+
+        // set ce-xxx headers for the binary format.
+        headers.add(ceHeader(SPECVERSION), SpecVersion.V1.toString());
+        headers.add(ceHeader(ID), EVENT_ID);
+        headers.add(ceHeader(SOURCE), EVENT_SOURCE.toString());
+        headers.add(ceHeader(TYPE), JOB_EVENT_TYPE);
+        headers.add(ceHeader(TIME), EVENT_TIME.toString());
+        headers.add(ceHeader(DATASCHEMA), EVENT_DATA_SCHEMA.toString());
+        headers.add(ceHeader(DATACONTENTTYPE), EVENT_DATA_CONTENT_TYPE);
+        headers.add(ceHeader(SUBJECT), EVENT_SUBJECT);
+
+        Message<?> result = converter.convert(message, 
KogitoJobCloudEvent.class);
+        
assertThat(result.getPayload()).isInstanceOf(KogitoJobCloudEvent.class);
+        KogitoJobCloudEvent cloudEvent = (KogitoJobCloudEvent) 
result.getPayload();
+
+        assertThat(cloudEvent.getId()).isEqualTo(EVENT_ID);
+        
assertThat(cloudEvent.getSpecVersion()).isEqualTo(SpecVersion.V1.toString());
+        
assertThat(cloudEvent.getSource().toString()).isEqualTo(EVENT_SOURCE.toString());
+        assertThat(cloudEvent.getType()).isEqualTo(JOB_EVENT_TYPE);
+        
assertThat(cloudEvent.getTime()).isEqualTo(EVENT_TIME.toZonedDateTime());
+        assertThat(cloudEvent.getSchemaURL()).isEqualTo(EVENT_DATA_SCHEMA);
+        
assertThat(cloudEvent.getContentType()).isEqualTo(EVENT_DATA_CONTENT_TYPE);
+        assertThat(cloudEvent.getSubject()).isEqualTo(EVENT_SUBJECT);
+
+        Job job = cloudEvent.getData();
+        
assertThat(job.getId()).isEqualTo("8350b8b6-c5d9-432d-a339-a9fc85f642d4_0");
+        assertThat(job.getProcessId()).isEqualTo("timerscycle");
+        
assertThat(job.getProcessInstanceId()).isEqualTo("7c1d9b38-b462-47c5-8bf2-d9154f54957b");
+        assertThat(job.getRootProcessId()).isEqualTo("root_process_id");
+        
assertThat(job.getRootProcessInstanceId()).isEqualTo("root_process_instance_id");
+        assertThat(job.getNodeInstanceId()).isEqualTo("node_instance_id");
+        assertThat(job.getRepeatInterval()).isEqualTo(1000);
+        assertThat(job.getCallbackEndpoint())
+                
.isEqualTo("http://localhost:8080/management/jobs/timerscycle/instances/7c1d9b38-b462-47c5-8bf2-d9154f54957b/timers/8350b8b6-c5d9-432d-a339-a9fc85f642d4_0";);
+        assertThat(job.getScheduledId()).isEqualTo("1234");
+        assertThat(job.getStatus()).isEqualTo("SCHEDULED");
+        assertThat(job.getRepeatInterval()).isEqualTo(1000);
+        assertThat(job.getRepeatLimit()).isEqualTo(2147483647);
+        assertThat(job.getRetries()).isEqualTo(0);
+        assertThat(job.getExecutionCounter()).isEqualTo(0);
+    }
+
+    @Test
+    void convertBinaryUserTaskInstanceDataEvent() throws Exception {
+        Buffer buffer = 
Buffer.buffer(readFileContent(BINARY_USER_TASK_INSTANCE_CLOUD_EVENT_DATA));
+        Message<?> message = Message.of(buffer, Metadata.of(httpMetadata));
+
+        // set ce-xxx headers for the binary format.
+        headers.add(ceHeader(SPECVERSION), SpecVersion.V1.toString());
+        headers.add(ceHeader(ID), EVENT_ID);
+        headers.add(ceHeader(SOURCE), EVENT_SOURCE.toString());
+        headers.add(ceHeader(TYPE), USER_TASK_INSTANCE_STATE_EVENT_TYPE);
+        headers.add(ceHeader(TIME), EVENT_TIME.toString());
+        headers.add(ceHeader(DATASCHEMA), EVENT_DATA_SCHEMA.toString());
+        headers.add(ceHeader(DATACONTENTTYPE), EVENT_DATA_CONTENT_TYPE);
+        headers.add(ceHeader(SUBJECT), EVENT_SUBJECT);
+
+        Message<?> result = converter.convert(message, 
UserTaskInstanceDataEvent.class);
+        
assertThat(result.getPayload()).isInstanceOf(UserTaskInstanceStateDataEvent.class);
+        UserTaskInstanceStateDataEvent cloudEvent = 
(UserTaskInstanceStateDataEvent) result.getPayload();
+
+        assertThat(cloudEvent.getId()).isEqualTo(EVENT_ID);
+        assertThat(cloudEvent.getSpecVersion()).isEqualTo(SpecVersion.V1);
+        
assertThat(cloudEvent.getSource().toString()).isEqualTo(EVENT_SOURCE.toString());
+        
assertThat(cloudEvent.getType()).isEqualTo(USER_TASK_INSTANCE_STATE_EVENT_TYPE);
+        assertThat(cloudEvent.getTime()).isEqualTo(EVENT_TIME);
+        assertThat(cloudEvent.getDataSchema()).isEqualTo(EVENT_DATA_SCHEMA);
+        
assertThat(cloudEvent.getDataContentType()).isEqualTo(EVENT_DATA_CONTENT_TYPE);
+        assertThat(cloudEvent.getSubject()).isEqualTo(EVENT_SUBJECT);
+
+        UserTaskInstance userTaskInstance = new UserTaskInstance();
+        new UserTaskInstanceStateEventMerger().merge(userTaskInstance, 
cloudEvent);
+        
assertThat(userTaskInstance.getId()).isEqualTo("45fae435-b098-4f27-97cf-a0c107072e8b");
+        
assertThat(userTaskInstance.getProcessInstanceId()).isEqualTo("67fb3435-b098-4f27-97cf-a0c107072e8b");
+        assertThat(userTaskInstance.getName()).isEqualTo("VisaApplication");
+        assertThat(userTaskInstance.getDescription()).isEqualTo("This task is 
for applying to a visa");
+        assertThat(userTaskInstance.getReferenceName()).isEqualTo("Apply for 
visa");
+        assertThat(userTaskInstance.getPriority()).isEqualTo("1");
+        assertThat(userTaskInstance.getState()).isEqualTo("Completed");
+    }
+
+    @Test
+    void convertFailureBinaryUnexpectedBufferContent() {
         Buffer buffer = Buffer.buffer("unexpected Content");
         Message<?> message = Message.of(buffer, Metadata.of(httpMetadata));
         assertThatExceptionOfType(RuntimeException.class).isThrownBy(() -> 
converter.convert(message, ProcessInstanceDataEvent.class));
     }
-}
+
+    private static String ceHeader(String name) {
+        return "ce-" + name;
+    }
+}
\ No newline at end of file
diff --git 
a/data-index/data-index-service/data-index-service-common/src/test/resources/binary_job_event_data.json
 
b/data-index/data-index-service/data-index-service-common/src/test/resources/binary_job_event_data.json
new file mode 100644
index 000000000..eb75e357e
--- /dev/null
+++ 
b/data-index/data-index-service/data-index-service-common/src/test/resources/binary_job_event_data.json
@@ -0,0 +1,18 @@
+{
+  "id": "8350b8b6-c5d9-432d-a339-a9fc85f642d4_0",
+  "expirationTime": "2020-01-16T20:40:58.918Z",
+  "priority": 0,
+  "callbackEndpoint": 
"http://localhost:8080/management/jobs/timerscycle/instances/7c1d9b38-b462-47c5-8bf2-d9154f54957b/timers/8350b8b6-c5d9-432d-a339-a9fc85f642d4_0";,
+  "processInstanceId": "7c1d9b38-b462-47c5-8bf2-d9154f54957b",
+  "rootProcessInstanceId": "root_process_instance_id",
+  "processId": "timerscycle",
+  "rootProcessId": "root_process_id",
+  "nodeInstanceId": "node_instance_id",
+  "repeatInterval": 1000,
+  "repeatLimit": 2147483647,
+  "scheduledId": "1234",
+  "retries": 0,
+  "status": "SCHEDULED",
+  "lastUpdate": "2020-01-16T20:40:58.206Z",
+  "executionCounter": 0
+}
\ No newline at end of file
diff --git 
a/data-index/data-index-service/data-index-service-common/src/test/resources/binary_process_instance_event_data.json
 
b/data-index/data-index-service/data-index-service-common/src/test/resources/binary_process_instance_event_data.json
new file mode 100644
index 000000000..ecfc27f4c
--- /dev/null
+++ 
b/data-index/data-index-service/data-index-service-common/src/test/resources/binary_process_instance_event_data.json
@@ -0,0 +1,16 @@
+{
+  "eventDate" :  1698332756677,
+  "eventUser" : "admin",
+  "eventType" : 1,
+  "processId": "subscription_flow",
+  "processVersion": "1.0",
+  "processType": null,
+  "processInstanceId":  "5f8b1a48-4d37-4bd2-a1a6-9b8f6097cfdd",
+  "businessKey" :  "business_key",
+  "processName": "subscription workflow",
+  "parentInstanceId": "parent_instance_id",
+  "rootProcessId" : "root_process_id",
+  "rootProcessInstanceId" :  "root_process_instance_id",
+  "state" :  1,
+  "roles":  ["admin", "user"]
+}
\ No newline at end of file
diff --git 
a/data-index/data-index-service/data-index-service-common/src/test/resources/binary_user_task_instance_state_event_data.json
 
b/data-index/data-index-service/data-index-service-common/src/test/resources/binary_user_task_instance_state_event_data.json
new file mode 100644
index 000000000..eaae16098
--- /dev/null
+++ 
b/data-index/data-index-service/data-index-service-common/src/test/resources/binary_user_task_instance_state_event_data.json
@@ -0,0 +1,14 @@
+{
+  "eventDate" : 1698332756677,
+  "eventUser" : "admin",
+  "userTaskDefinitionId" : "task_123",
+  "userTaskInstanceId" : "45fae435-b098-4f27-97cf-a0c107072e8b",
+  "userTaskName": "VisaApplication",
+  "eventType": 1,
+  "userTaskDescription" : "This task is for applying to a visa",
+  "userTaskPriority": "1",
+  "userTaskReferenceName": "Apply for visa",
+  "state" : "Completed",
+  "actualOwner" : "admin",
+  "processInstanceId" : "67fb3435-b098-4f27-97cf-a0c107072e8b"
+}
\ No newline at end of file
diff --git 
a/data-index/data-index-service/data-index-service-common/src/test/resources/process_instance_event.json
 
b/data-index/data-index-service/data-index-service-common/src/test/resources/process_instance_event.json
index 8ec0a4b74..8b9191815 100644
--- 
a/data-index/data-index-service/data-index-service-common/src/test/resources/process_instance_event.json
+++ 
b/data-index/data-index-service/data-index-service-common/src/test/resources/process_instance_event.json
@@ -19,5 +19,5 @@
   "kogitoprocinstanceid": "2308e23d-9998-47e9-a772-a078cf5b891b",
   "kogitoprocid": "travels",
   "kogitoaddons": 
"cloudevents,process-svg,prometheus-monitoring,monitoring,infinispan-persistence,process-management",
-  "kogitoProcessInstanceState" : "1"
+  "kogitoprocist" : "1"
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to