This is an automated email from the ASF dual-hosted git repository.
fjtiradosarti pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git
The following commit(s) were added to refs/heads/main by this push:
new ce37bdc31a [Fix #3721] Optimize event grouping (#3739)
ce37bdc31a is described below
commit ce37bdc31a97d955a53d0ff57460f62246de1243
Author: Francisco Javier Tirado Sarti
<[email protected]>
AuthorDate: Fri Oct 25 13:36:14 2024 +0200
[Fix #3721] Optimize event grouping (#3739)
* [Fix #3721] Optimize event grouping
* [Fix #3721] Adding addons
* [Fix #3721] Refactoring KogitoIndexConverter
---
api/kogito-events-api/pom.xml | 8 +-
.../kogito/event/process/CloudEventVisitor.java} | 15 +-
.../KogitoEventBodySerializationHelper.java | 296 +++++++++++++++++++++
.../event/process/KogitoMarshallEventSupport.java} | 16 +-
.../process/ProcessInstanceErrorEventBody.java | 32 ++-
.../process/ProcessInstanceNodeEventBody.java | 47 +++-
.../event/process/ProcessInstanceSLAEventBody.java | 41 ++-
.../process/ProcessInstanceStateEventBody.java | 43 ++-
.../process/ProcessInstanceVariableEventBody.java | 37 ++-
.../org/kie/kogito/event/DataEventFactory.java | 17 ++
.../JacksonTypeCloudEventDataConverter.java} | 26 +-
.../process/MultipleProcessInstanceDataEvent.java | 19 +-
.../process/ProcessInstanceErrorDataEvent.java | 5 +-
.../process/ProcessInstanceNodeDataEvent.java | 5 +-
.../event/process/ProcessInstanceSLADataEvent.java | 5 +-
.../process/ProcessInstanceStateDataEvent.java | 5 +-
.../process/ProcessInstanceVariableDataEvent.java | 5 +-
.../JsonProcessInstanceDataEventDeserializer.java | 79 ++++++
.../JsonUserTaskInstanceDataEventDeserializer.java | 82 ++++++
.../KogitoDataEventSerializationHelper.java | 74 ++++++
.../KogitoSerializationModule.java} | 21 +-
...rocessDataInstanceBeanDeserializerModifier.java | 40 +++
...eProcessDataInstanceBeanSerializerModifier.java | 40 +++
...ultipleProcessDataInstanceConverterFactory.java | 65 +++++
...ltipleProcessInstanceDataEventDeserializer.java | 184 +++++++++++++
...MultipleProcessInstanceDataEventSerializer.java | 115 ++++++++
.../ProcessInstanceDataEventExtensionRecord.java | 168 ++++++++++++
.../services/com.fasterxml.jackson.databind.Module | 1 +
.../kogito/event/process/ProcessEventsTest.java | 221 +++++++++++++--
.../config/GlobalObjectMapperQuarkusTemplate.java | 2 +-
.../process/GroupingMessagingEventPublisher.java | 15 +-
31 files changed, 1650 insertions(+), 79 deletions(-)
diff --git a/api/kogito-events-api/pom.xml b/api/kogito-events-api/pom.xml
index 63726ae825..1bd9097e94 100644
--- a/api/kogito-events-api/pom.xml
+++ b/api/kogito-events-api/pom.xml
@@ -47,10 +47,6 @@
</dependency>
<!-- CloudEvents -->
- <dependency>
- <groupId>io.cloudevents</groupId>
- <artifactId>cloudevents-core</artifactId>
- </dependency>
<!-- Tests -->
<dependency>
<groupId>org.junit.jupiter</groupId>
@@ -72,6 +68,10 @@
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.kie.kogito</groupId>
+ <artifactId>kogito-jackson-utils</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/CloudEventVisitor.java
similarity index 65%
copy from
api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
copy to
api/kogito-events-api/src/main/java/org/kie/kogito/event/process/CloudEventVisitor.java
index 7db8c0e765..1d6f2c1c59 100644
---
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
+++
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/CloudEventVisitor.java
@@ -18,17 +18,8 @@
*/
package org.kie.kogito.event.process;
-import java.net.URI;
-import java.util.Collection;
+import org.kie.kogito.event.DataEvent;
-public class MultipleProcessInstanceDataEvent extends
ProcessInstanceDataEvent<Collection<ProcessInstanceDataEvent<?>>> {
-
- public static final String TYPE = "MultipleProcessInstanceDataEvent";
-
- public MultipleProcessInstanceDataEvent() {
- }
-
- public MultipleProcessInstanceDataEvent(URI source,
Collection<ProcessInstanceDataEvent<?>> body) {
- super(TYPE, source, body);
- }
+public interface CloudEventVisitor {
+ void visit(DataEvent<?> cloudEvent);
}
diff --git
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java
new file mode 100644
index 0000000000..d00146eb9c
--- /dev/null
+++
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.process;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collection;
+import java.util.Date;
+
+import org.kie.kogito.jackson.utils.ObjectMapperFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class KogitoEventBodySerializationHelper {
+
+ private KogitoEventBodySerializationHelper() {
+ }
+
+    /**
+     * Reads a string that may be absent.
+     * A leading boolean marker is consumed first; the string itself is only
+     * read when that marker is {@code true}.
+     *
+     * @param in stream positioned at the marker
+     * @return the decoded string, or {@code null} when the marker is {@code false}
+     * @throws IOException if the underlying stream fails
+     */
+    public static String readUTF(DataInput in) throws IOException {
+        if (!in.readBoolean()) {
+            return null;
+        }
+        return in.readUTF();
+    }
+
+    /**
+     * Writes a string that may be {@code null}.
+     * A boolean presence marker is always emitted; the string follows only
+     * when it is not {@code null}.
+     *
+     * @param out destination stream
+     * @param string value to write, may be {@code null}
+     * @throws IOException if the underlying stream fails
+     */
+    public static void writeUTF(DataOutput out, String string) throws IOException {
+        boolean present = string != null;
+        out.writeBoolean(present);
+        if (present) {
+            out.writeUTF(string);
+        }
+    }
+
+    /**
+     * Writes a nullable {@link Date} as a presence marker followed, when
+     * present, by its epoch-millisecond value.
+     *
+     * @param out destination stream
+     * @param date value to write, may be {@code null}
+     * @throws IOException if the underlying stream fails
+     */
+    public static void writeDate(DataOutput out, Date date) throws IOException {
+        boolean present = date != null;
+        out.writeBoolean(present);
+        if (present) {
+            out.writeLong(date.getTime());
+        }
+    }
+
+    /**
+     * Reads a nullable {@link Date}: a presence marker, then an
+     * epoch-millisecond long when the marker is {@code true}.
+     *
+     * @param in stream positioned at the marker
+     * @return the decoded date, or {@code null} when the marker is {@code false}
+     * @throws IOException if the underlying stream fails
+     */
+    public static Date readDate(DataInput in) throws IOException {
+        if (!in.readBoolean()) {
+            return null;
+        }
+        long epochMillis = in.readLong();
+        return new Date(epochMillis);
+    }
+
+    /**
+     * Writes a nullable {@link OffsetDateTime} as a presence marker followed,
+     * when present, by the epoch-millisecond value of its instant.
+     * Only the instant is stored; the original offset and any sub-millisecond
+     * precision are not part of the encoding.
+     *
+     * @param out destination stream
+     * @param date value to write, may be {@code null}
+     * @throws IOException if the underlying stream fails
+     */
+    public static void writeTime(DataOutput out, OffsetDateTime date) throws IOException {
+        if (date == null) {
+            out.writeBoolean(false);
+            return;
+        }
+        out.writeBoolean(true);
+        long epochMillis = date.toInstant().toEpochMilli();
+        out.writeLong(epochMillis);
+    }
+
+    /**
+     * Reads a nullable timestamp: a presence marker, then an epoch-millisecond
+     * long when the marker is {@code true}. The result is always expressed at
+     * the UTC offset.
+     *
+     * @param in stream positioned at the marker
+     * @return the decoded time at UTC, or {@code null} when the marker is {@code false}
+     * @throws IOException if the underlying stream fails
+     */
+    public static OffsetDateTime readTime(DataInput in) throws IOException {
+        if (!in.readBoolean()) {
+            return null;
+        }
+        Instant instant = Instant.ofEpochMilli(in.readLong());
+        return instant.atOffset(ZoneOffset.UTC);
+    }
+
+    /**
+     * Writes a nullable collection of nullable strings.
+     * The element count is written first ({@code -1} stands for a {@code null}
+     * collection), followed by each element in iteration order.
+     *
+     * @param out destination stream
+     * @param collection strings to write, may be {@code null}
+     * @throws IOException if the underlying stream fails
+     */
+    public static void writeUTFCollection(DataOutput out, Collection<String> collection) throws IOException {
+        if (collection == null) {
+            writeInt(out, -1);
+            return;
+        }
+        writeInt(out, collection.size());
+        for (String element : collection) {
+            writeUTF(out, element);
+        }
+    }
+
+    /**
+     * Reads a collection of nullable strings into the supplied holder.
+     *
+     * @param in stream positioned at the element count
+     * @param holder collection that receives the decoded elements
+     * @return {@code holder} after filling it, or {@code null} when the stored count is {@code -1}
+     * @throws IOException if the underlying stream fails
+     */
+    public static <T extends Collection<String>> T readUTFCollection(DataInput in, T holder) throws IOException {
+        int count = readInt(in);
+        if (count == -1) {
+            return null;
+        }
+        for (int i = 0; i < count; i++) {
+            holder.add(readUTF(in));
+        }
+        return holder;
+    }
+
+ /**
+  * Type tag that precedes a serialized value, paired with the writer and
+  * reader used for that kind of value.
+  * writeType emits the constant's ordinal() as a single byte and readType
+  * indexes values() with it, so the declaration order of the constants is
+  * part of the serialized format: reordering or inserting constants changes
+  * what previously written bytes decode to.
+  */
+ private enum SerType {
+
+ NULL(KogitoEventBodySerializationHelper::writeNull,
KogitoEventBodySerializationHelper::readNull),
+ JSON(KogitoEventBodySerializationHelper::writeJson,
KogitoEventBodySerializationHelper::readJson),
+ DEFAULT(KogitoEventBodySerializationHelper::writeJson,
KogitoEventBodySerializationHelper::readDefault),
+ STRING(KogitoEventBodySerializationHelper::writeString,
DataInput::readUTF),
+ INT(KogitoEventBodySerializationHelper::writeInt, DataInput::readInt),
+ SHORT(KogitoEventBodySerializationHelper::writeShort,
DataInput::readShort),
+ LONG(KogitoEventBodySerializationHelper::writeLong,
DataInput::readLong),
+ BYTE(KogitoEventBodySerializationHelper::writeByte,
DataInput::readByte),
+ BOOLEAN(KogitoEventBodySerializationHelper::writeBoolean,
DataInput::readBoolean),
+ FLOAT(KogitoEventBodySerializationHelper::writeFloat,
DataInput::readFloat),
+ DOUBLE(KogitoEventBodySerializationHelper::writeDouble,
DataInput::readDouble);
+
+ // Serializes the payload that follows the tag.
+ final ObjectWriter writer;
+ // Deserializes the payload that follows the tag.
+ final ObjectReader reader;
+
+ SerType(ObjectWriter writer, ObjectReader reader) {
+ this.writer = writer;
+ this.reader = reader;
+ }
+
+ ObjectWriter writer() {
+ return writer;
+ }
+
+ ObjectReader reader() {
+ return reader;
+ }
+
+ /**
+  * Picks the tag for a runtime class. Any class that is neither a JsonNode,
+  * a String nor one of the listed boxed types maps to DEFAULT, which is
+  * written as JSON and read back through readDefault.
+  */
+ static SerType fromType(Class<?> type) {
+ if (JsonNode.class.isAssignableFrom(type)) {
+ return JSON;
+ } else if (String.class.isAssignableFrom(type)) {
+ return STRING;
+ } else if (Boolean.class.isAssignableFrom(type)) {
+ return BOOLEAN;
+ } else if (Integer.class.isAssignableFrom(type)) {
+ return INT;
+ } else if (Short.class.isAssignableFrom(type)) {
+ return SHORT;
+ } else if (Byte.class.isAssignableFrom(type)) {
+ return BYTE;
+ } else if (Long.class.isAssignableFrom(type)) {
+ return LONG;
+ } else if (Float.class.isAssignableFrom(type)) {
+ return FLOAT;
+ } else if (Double.class.isAssignableFrom(type)) {
+ return DOUBLE;
+ } else {
+ return DEFAULT;
+ }
+ }
+
+ /** Picks the tag for a value; a null value maps to NULL. */
+ static SerType fromObject(Object obj) {
+ return obj == null ? NULL : fromType(obj.getClass());
+ }
+ }
+
+    /**
+     * Emits the one-byte tag for the given type; the tag is the enum ordinal.
+     *
+     * @param out destination stream
+     * @param type tag to write
+     * @throws IOException if the underlying stream fails
+     */
+    private static void writeType(DataOutput out, SerType type) throws IOException {
+        int tag = type.ordinal();
+        out.writeByte(tag);
+    }
+
+    /**
+     * Reads the one-byte type tag and maps it back to its {@code SerType}.
+     *
+     * @param in stream positioned at a tag byte
+     * @return the type whose ordinal equals the tag
+     * @throws IOException if the stream fails or the tag does not correspond to any known type
+     */
+    private static SerType readType(DataInput in) throws IOException {
+        SerType[] types = SerType.values();
+        // readByte() is signed, so a corrupted tag can be negative as well as too large.
+        int tag = in.readByte();
+        if (tag < 0 || tag >= types.length) {
+            // Report corruption as IOException, like readInt does, instead of leaking an
+            // ArrayIndexOutOfBoundsException to callers that only handle IOException.
+            throw new IOException("Stream corrupted. Read unrecognized type tag " + tag);
+        }
+        return types[tag];
+    }
+
+    /**
+     * Writes an arbitrary value as a type tag followed by the payload produced
+     * by the writer registered for that tag. A {@code null} value is written as
+     * the NULL tag alone.
+     *
+     * @param out destination stream
+     * @param obj value to write, may be {@code null}
+     * @throws IOException if the underlying stream or the JSON conversion fails
+     */
+    public static void writeObject(DataOutput out, Object obj) throws IOException {
+        final SerType tag = SerType.fromObject(obj);
+        writeType(out, tag);
+        ObjectWriter payloadWriter = tag.writer();
+        payloadWriter.accept(out, obj);
+    }
+
+    /**
+     * Reads a value written by {@code writeObject}: the type tag first, then
+     * the payload through the reader registered for that tag.
+     *
+     * @param in stream positioned at a type tag
+     * @return the decoded value, possibly {@code null}
+     * @throws IOException if the underlying stream or the JSON conversion fails
+     */
+    public static Object readObject(DataInput in) throws IOException {
+        SerType tag = readType(in);
+        ObjectReader payloadReader = tag.reader();
+        return payloadReader.apply(in);
+    }
+
+    /** Strategy that serializes one value to the stream and may fail with an I/O error. */
+    @FunctionalInterface
+    private interface ObjectWriter {
+        void accept(DataOutput out, Object obj) throws IOException;
+    }
+
+    /**
+     * Strategy that deserializes one value from the stream and may fail with
+     * an I/O error. It is the reading counterpart of {@code ObjectWriter} and,
+     * like it, is used as a lambda / method-reference target.
+     */
+    @FunctionalInterface
+    private interface ObjectReader {
+        Object apply(DataInput in) throws IOException;
+    }
+
+    /** Payload writer for the STRING tag; {@code obj} must be a non-null {@link String}. */
+    private static void writeString(DataOutput out, Object obj) throws IOException {
+        String text = (String) obj;
+        out.writeUTF(text);
+    }
+
+    /** Payload writer for the BOOLEAN tag; {@code obj} must be a non-null {@link Boolean}. */
+    private static void writeBoolean(DataOutput out, Object obj) throws IOException {
+        Boolean flag = (Boolean) obj;
+        out.writeBoolean(flag);
+    }
+
+    /** Payload writer for the INT tag; {@code obj} must be a non-null {@link Integer}. Always writes four bytes. */
+    private static void writeInt(DataOutput out, Object obj) throws IOException {
+        Integer number = (Integer) obj;
+        out.writeInt(number);
+    }
+
+    /**
+     * Payload writer for the LONG tag.
+     *
+     * @param out destination stream
+     * @param obj value to write; must be a non-null {@link Long}
+     * @throws IOException if the underlying stream fails
+     */
+    private static void writeLong(DataOutput out, Object obj) throws IOException {
+        // The LONG tag is read back with DataInput::readLong (8 bytes), so the value must be
+        // written with writeLong. Casting to Integer here threw ClassCastException for every Long.
+        out.writeLong((Long) obj);
+    }
+
+    /** Payload writer for the SHORT tag; {@code obj} must be a non-null {@link Short}. */
+    private static void writeShort(DataOutput out, Object obj) throws IOException {
+        Short number = (Short) obj;
+        out.writeShort(number);
+    }
+
+    /** Payload writer for the BYTE tag; {@code obj} must be a non-null {@link Byte}. */
+    private static void writeByte(DataOutput out, Object obj) throws IOException {
+        Byte number = (Byte) obj;
+        out.writeByte(number);
+    }
+
+    /** Payload writer for the FLOAT tag; {@code obj} must be a non-null {@link Float}. */
+    private static void writeFloat(DataOutput out, Object obj) throws IOException {
+        Float number = (Float) obj;
+        out.writeFloat(number);
+    }
+
+    /** Payload writer for the DOUBLE tag; {@code obj} must be a non-null {@link Double}. */
+    private static void writeDouble(DataOutput out, Object obj) throws IOException {
+        Double number = (Double) obj;
+        out.writeDouble(number);
+    }
+
+ /** Payload writer for the NULL tag: the tag alone represents the value, so no payload is written. */
+ private static void writeNull(DataOutput out, Object obj) {
+ // do nothing
+ }
+
+ /** Payload reader for the NULL tag: consumes nothing from the stream and yields null. */
+ private static Object readNull(DataInput in) {
+ return null;
+ }
+
+    /**
+     * Writes a nullable {@link Integer}. A {@code null} is encoded as the NULL
+     * type tag alone; any other value goes through the compact tagged int
+     * encoding of {@code writeInt(DataOutput, int)}.
+     *
+     * @param out destination stream
+     * @param integer value to write, may be {@code null}
+     * @throws IOException if the underlying stream fails
+     */
+    public static void writeInteger(DataOutput out, Integer integer) throws IOException {
+        if (integer != null) {
+            writeInt(out, integer.intValue());
+        } else {
+            writeType(out, SerType.NULL);
+        }
+    }
+
+    /**
+     * Reads a nullable {@link Integer} written by {@code writeInteger}.
+     *
+     * @param in stream positioned at a type tag
+     * @return the decoded value, or {@code null} when the tag is NULL
+     * @throws IOException if the stream fails or the tag is not NULL or an integer tag
+     */
+    public static Integer readInteger(DataInput in) throws IOException {
+        SerType tag = readType(in);
+        if (tag == SerType.NULL) {
+            return null;
+        }
+        return readInt(in, tag);
+    }
+
+    /**
+     * Writes an int using the narrowest tagged encoding that can hold it:
+     * a BYTE, SHORT or INT type tag followed by one, two or four bytes.
+     * Values are read back with {@code readInt(DataInput)}, which decodes
+     * according to the tag.
+     *
+     * @param out destination stream
+     * @param size value to write (any int, including negative values)
+     * @throws IOException if the underlying stream fails
+     */
+    public static void writeInt(DataOutput out, int size) throws IOException {
+        // Both ends of each range are checked. With only an upper-bound test, a value such as
+        // -200 was classified as BYTE and silently truncated by the narrowing conversion.
+        if (size >= Byte.MIN_VALUE && size <= Byte.MAX_VALUE) {
+            writeType(out, SerType.BYTE);
+            out.writeByte(size);
+        } else if (size >= Short.MIN_VALUE && size <= Short.MAX_VALUE) {
+            writeType(out, SerType.SHORT);
+            out.writeShort(size);
+        } else {
+            writeType(out, SerType.INT);
+            out.writeInt(size);
+        }
+    }
+
+    /**
+     * Reads an int written by {@code writeInt(DataOutput, int)}: a type tag
+     * followed by a payload whose width depends on that tag.
+     *
+     * @param in stream positioned at a type tag
+     * @return the decoded value
+     * @throws IOException if the stream fails or the tag is not an integer tag
+     */
+    public static int readInt(DataInput in) throws IOException {
+        return readInt(in, readType(in));
+    }
+
+    /**
+     * Reads the payload of an integer value whose tag has already been consumed.
+     *
+     * @param in stream positioned just after the tag
+     * @param type tag that was read; must be INT, SHORT or BYTE
+     * @return the decoded value widened to int
+     * @throws IOException if the stream fails or {@code type} is not an integer tag
+     */
+    private static int readInt(DataInput in, SerType type) throws IOException {
+        if (type == SerType.INT) {
+            return in.readInt();
+        }
+        if (type == SerType.SHORT) {
+            return in.readShort();
+        }
+        if (type == SerType.BYTE) {
+            return in.readByte();
+        }
+        throw new IOException("Stream corrupted. Read unrecognized type " + type);
+    }
+
+    /**
+     * Payload writer for the JSON and DEFAULT tags: serializes the value to
+     * JSON bytes with the shared object mapper and writes them prefixed by
+     * their length as a four-byte int.
+     */
+    private static void writeJson(DataOutput out, Object obj) throws IOException {
+        byte[] payload = ObjectMapperFactory.get().writeValueAsBytes(obj);
+        int length = payload.length;
+        out.writeInt(length);
+        out.write(payload);
+    }
+
+ /** Payload reader for the JSON tag: decodes the length-prefixed JSON bytes as a JsonNode. */
+ private static Object readJson(DataInput in) throws IOException {
+ return readJson(in, JsonNode.class);
+ }
+
+ /**
+  * Payload reader for the DEFAULT tag: decodes the length-prefixed JSON bytes
+  * with Object.class as the target type, so the original class of the written
+  * value is not restored.
+  */
+ private static Object readDefault(DataInput in) throws IOException {
+ return readJson(in, Object.class);
+ }
+
+    /**
+     * Reads a four-byte length, then exactly that many bytes, and parses them
+     * as JSON into the requested type with the shared object mapper.
+     *
+     * @param in stream positioned at the length prefix
+     * @param type target class for the JSON binding
+     * @return the decoded value
+     * @throws IOException if the stream ends early or the bytes are not valid JSON for {@code type}
+     */
+    private static Object readJson(DataInput in, Class<?> type) throws IOException {
+        int length = in.readInt();
+        byte[] payload = new byte[length];
+        in.readFully(payload);
+        return ObjectMapperFactory.get().readValue(payload, type);
+    }
+
+    /**
+     * Converts an {@link OffsetDateTime} to the {@link Date} of the same instant.
+     *
+     * @param time value to convert, may be {@code null}
+     * @return the equivalent date, or {@code null} when {@code time} is {@code null}
+     */
+    public static Date toDate(OffsetDateTime time) {
+        if (time == null) {
+            return null;
+        }
+        return Date.from(time.toInstant());
+    }
+}
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoMarshallEventSupport.java
similarity index 65%
copy from
api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
copy to
api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoMarshallEventSupport.java
index 7db8c0e765..76693a64c6 100644
---
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
+++
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoMarshallEventSupport.java
@@ -18,17 +18,13 @@
*/
package org.kie.kogito.event.process;
-import java.net.URI;
-import java.util.Collection;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
-public class MultipleProcessInstanceDataEvent extends
ProcessInstanceDataEvent<Collection<ProcessInstanceDataEvent<?>>> {
+public interface KogitoMarshallEventSupport {
- public static final String TYPE = "MultipleProcessInstanceDataEvent";
+ void writeEvent(DataOutput out) throws IOException;
- public MultipleProcessInstanceDataEvent() {
- }
-
- public MultipleProcessInstanceDataEvent(URI source,
Collection<ProcessInstanceDataEvent<?>> body) {
- super(TYPE, source, body);
- }
+ void readEvent(DataInput in) throws IOException;
}
diff --git
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorEventBody.java
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorEventBody.java
index a6ea8e7785..e20807a3d0 100644
---
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorEventBody.java
+++
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorEventBody.java
@@ -19,9 +19,16 @@
package org.kie.kogito.event.process;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.Date;
-public class ProcessInstanceErrorEventBody {
+import org.kie.kogito.event.DataEvent;
+
+import static
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*;
+
+public class ProcessInstanceErrorEventBody implements
KogitoMarshallEventSupport, CloudEventVisitor {
// common fields for events
private Date eventDate;
@@ -138,4 +145,27 @@ public class ProcessInstanceErrorEventBody {
return instance;
}
}
+
+    /**
+     * Restores the error-specific fields in exactly the order
+     * {@link #writeEvent(DataOutput)} emitted them.
+     *
+     * @param in stream positioned at the start of this body
+     * @throws IOException if the underlying stream fails
+     */
+    @Override
+    public void readEvent(DataInput in) throws IOException {
+        nodeDefinitionId = in.readUTF();
+        nodeInstanceId = in.readUTF();
+        // Nullable encoding; must stay in step with writeEvent below.
+        errorMessage = readUTF(in);
+    }
+
+    /**
+     * Writes the error-specific fields. The fields populated by
+     * {@code visit} from the enclosing event are not written here.
+     *
+     * @param out destination stream
+     * @throws IOException if the underlying stream fails
+     */
+    @Override
+    public void writeEvent(DataOutput out) throws IOException {
+        out.writeUTF(nodeDefinitionId);
+        out.writeUTF(nodeInstanceId);
+        // NOTE(review): presumably an error can carry no message (e.g. an exception whose
+        // getMessage() is null). DataOutput#writeUTF rejects null, so use the null-aware
+        // helper, as the other bodies do for their optional strings.
+        writeUTF(out, errorMessage);
+    }
+
+    /**
+     * Copies the fields shared by all process events from the enclosing data
+     * event into this body: process id, instance id, version, date and user.
+     *
+     * @param dataEvent event that carries this body
+     */
+    @Override
+    public void visit(DataEvent<?> dataEvent) {
+        processId = dataEvent.getKogitoProcessId();
+        processInstanceId = dataEvent.getKogitoProcessInstanceId();
+        processVersion = dataEvent.getKogitoProcessInstanceVersion();
+        eventDate = toDate(dataEvent.getTime());
+        eventUser = dataEvent.getKogitoIdentity();
+    }
}
diff --git
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeEventBody.java
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeEventBody.java
index ca6621c0b1..349a8280b8 100644
---
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeEventBody.java
+++
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeEventBody.java
@@ -18,12 +18,19 @@
*/
package org.kie.kogito.event.process;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-public class ProcessInstanceNodeEventBody {
+import org.kie.kogito.event.DataEvent;
+
+import static
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*;
+
+public class ProcessInstanceNodeEventBody implements
KogitoMarshallEventSupport, CloudEventVisitor {
public static final int EVENT_TYPE_ENTER = 1;
@@ -71,7 +78,42 @@ public class ProcessInstanceNodeEventBody {
private Map<String, Object> data;
- private ProcessInstanceNodeEventBody() {
+ /**
+  * Writes the node-specific fields. The statement order is the serialized
+  * layout and must match readEvent line for line. Fields written with the
+  * static helpers (writeInteger, writeUTF, writeDate, writeObject) carry a
+  * null marker or type tag; fields written with out.writeUTF directly have
+  * no such marker and therefore must not be null.
+  */
+ @Override
+ public void writeEvent(DataOutput out) throws IOException {
+ writeInteger(out, eventType);
+ writeUTF(out, connectionNodeDefinitionId);
+ out.writeUTF(nodeDefinitionId);
+ writeUTF(out, nodeName);
+ out.writeUTF(nodeType);
+ out.writeUTF(nodeInstanceId);
+ writeUTF(out, workItemId);
+ writeDate(out, slaDueDate);
+ writeObject(out, data);
+ }
+
+ /**
+  * Restores the node-specific fields in the same order writeEvent wrote them.
+  */
+ @Override
+ public void readEvent(DataInput in) throws IOException {
+ eventType = readInteger(in);
+ connectionNodeDefinitionId = readUTF(in);
+ nodeDefinitionId = in.readUTF();
+ nodeName = readUTF(in);
+ nodeType = in.readUTF();
+ nodeInstanceId = in.readUTF();
+ workItemId = readUTF(in);
+ slaDueDate = readDate(in);
+ // NOTE(review): unchecked cast; relies on the stream holding what writeEvent
+ // wrote for data (a map, or null) at this position.
+ data = (Map<String, Object>) readObject(in);
+ }
+
+    /**
+     * Copies the fields shared by all process events from the enclosing data
+     * event into this body: process id, instance id, version, date and user.
+     *
+     * @param dataEvent event that carries this body
+     */
+    @Override
+    public void visit(DataEvent<?> dataEvent) {
+        processId = dataEvent.getKogitoProcessId();
+        processInstanceId = dataEvent.getKogitoProcessInstanceId();
+        processVersion = dataEvent.getKogitoProcessInstanceVersion();
+        eventDate = toDate(dataEvent.getTime());
+        eventUser = dataEvent.getKogitoIdentity();
+    }
+
+ public ProcessInstanceNodeEventBody() {
this.data = new HashMap<>();
}
@@ -246,5 +288,4 @@ public class ProcessInstanceNodeEventBody {
}
}
-
}
diff --git
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLAEventBody.java
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLAEventBody.java
index 133c0e5715..d054a490b1 100644
---
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLAEventBody.java
+++
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLAEventBody.java
@@ -18,9 +18,20 @@
*/
package org.kie.kogito.event.process;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.Date;
-public class ProcessInstanceSLAEventBody {
+import org.kie.kogito.event.DataEvent;
+
+import static
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.readDate;
+import static
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.readUTF;
+import static
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.toDate;
+import static
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.writeDate;
+import static
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.writeUTF;
+
+public class ProcessInstanceSLAEventBody implements
KogitoMarshallEventSupport, CloudEventVisitor {
// common fields for events
private Date eventDate;
@@ -47,6 +58,34 @@ public class ProcessInstanceSLAEventBody {
private Date slaDueDate;
+ /**
+  * Writes the SLA-specific fields. The statement order is the serialized
+  * layout and must match readEvent. nodeName and slaDueDate go through the
+  * null-aware helpers; the fields written with out.writeUTF directly have no
+  * null marker and therefore must not be null.
+  */
+ @Override
+ public void writeEvent(DataOutput out) throws IOException {
+ out.writeUTF(nodeDefinitionId);
+ writeUTF(out, nodeName);
+ out.writeUTF(nodeType);
+ out.writeUTF(nodeInstanceId);
+ writeDate(out, slaDueDate);
+
+ }
+
+ /**
+  * Restores the SLA-specific fields in the same order writeEvent wrote them.
+  */
+ @Override
+ public void readEvent(DataInput in) throws IOException {
+ nodeDefinitionId = in.readUTF();
+ nodeName = readUTF(in);
+ nodeType = in.readUTF();
+ nodeInstanceId = in.readUTF();
+ slaDueDate = readDate(in);
+ }
+
+    /**
+     * Copies the fields shared by all process events from the enclosing data
+     * event into this body: process id, instance id, version, date and user.
+     *
+     * @param dataEvent event that carries this body
+     */
+    @Override
+    public void visit(DataEvent<?> dataEvent) {
+        processId = dataEvent.getKogitoProcessId();
+        processInstanceId = dataEvent.getKogitoProcessInstanceId();
+        processVersion = dataEvent.getKogitoProcessInstanceVersion();
+        eventDate = toDate(dataEvent.getTime());
+        eventUser = dataEvent.getKogitoIdentity();
+    }
+
public Date getSlaDueDate() {
return slaDueDate;
}
diff --git
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateEventBody.java
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateEventBody.java
index b9bb4fd9ad..8c7c291110 100644
---
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateEventBody.java
+++
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateEventBody.java
@@ -18,13 +18,21 @@
*/
package org.kie.kogito.event.process;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
+import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-public class ProcessInstanceStateEventBody {
+import org.kie.kogito.event.DataEvent;
+
+import static
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*;
+
+public class ProcessInstanceStateEventBody implements
KogitoMarshallEventSupport, CloudEventVisitor {
public static final int EVENT_TYPE_STARTED = 1;
public static final int EVENT_TYPE_ENDED = 2;
@@ -65,6 +73,38 @@ public class ProcessInstanceStateEventBody {
public Date slaDueDate;
+ /**
+  * Writes the state-specific fields. The statement order is the serialized
+  * layout and must match readEvent. Every field here goes through a helper
+  * that encodes null explicitly.
+  */
+ @Override
+ public void writeEvent(DataOutput out) throws IOException {
+ writeInteger(out, eventType);
+ writeUTF(out, processName);
+ writeInteger(out, state);
+ writeUTFCollection(out, roles);
+ writeDate(out, slaDueDate);
+ }
+
+ /**
+  * Restores the state-specific fields in the same order writeEvent wrote them.
+  */
+ @Override
+ public void readEvent(DataInput in) throws IOException {
+ eventType = readInteger(in);
+ processName = readUTF(in);
+ state = readInteger(in);
+ // Roles are collected into a LinkedHashSet; readUTFCollection returns null
+ // instead when the stored collection was null.
+ roles = readUTFCollection(in, new LinkedHashSet<>());
+ slaDueDate = readDate(in);
+ }
+
+    /**
+     * Copies the fields carried by the enclosing data event into this body:
+     * the common process id, instance id, version, date and user, plus the
+     * parent/root identifiers, process type and business key.
+     *
+     * @param dataEvent event that carries this body
+     */
+    @Override
+    public void visit(DataEvent<?> dataEvent) {
+        processId = dataEvent.getKogitoProcessId();
+        processInstanceId = dataEvent.getKogitoProcessInstanceId();
+        processVersion = dataEvent.getKogitoProcessInstanceVersion();
+        eventDate = toDate(dataEvent.getTime());
+        eventUser = dataEvent.getKogitoIdentity();
+        parentInstanceId = dataEvent.getKogitoParentProcessInstanceId();
+        rootProcessId = dataEvent.getKogitoRootProcessId();
+        rootProcessInstanceId = dataEvent.getKogitoRootProcessInstanceId();
+        processType = dataEvent.getKogitoProcessType();
+        businessKey = dataEvent.getKogitoBusinessKey();
+    }
+
public Date getEventDate() {
return eventDate;
}
@@ -262,4 +302,5 @@ public class ProcessInstanceStateEventBody {
}
}
+
}
diff --git
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableEventBody.java
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableEventBody.java
index 00a55b9cd8..2008f264f6 100644
---
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableEventBody.java
+++
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableEventBody.java
@@ -18,12 +18,19 @@
*/
package org.kie.kogito.event.process;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-public class ProcessInstanceVariableEventBody {
+import org.kie.kogito.event.DataEvent;
+
+import static
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*;
+
+public class ProcessInstanceVariableEventBody implements
KogitoMarshallEventSupport, CloudEventVisitor {
// common fields for events
private Date eventDate;
@@ -46,6 +53,33 @@ public class ProcessInstanceVariableEventBody {
private String variableName;
private Object variableValue;
+ /**
+  * Writes the variable-specific fields. The statement order is the serialized
+  * layout and must match readEvent. variableName is written with
+  * out.writeUTF directly, without a null marker, and therefore must not be
+  * null; the other fields go through helpers that encode null explicitly.
+  */
+ @Override
+ public void writeEvent(DataOutput out) throws IOException {
+ writeUTF(out, nodeContainerDefinitionId);
+ writeUTF(out, nodeContainerInstanceId);
+ writeUTF(out, variableId);
+ out.writeUTF(variableName);
+ writeObject(out, variableValue);
+ }
+
+ /**
+  * Restores the variable-specific fields in the same order writeEvent wrote
+  * them. variableValue comes back as whatever readObject produces for the
+  * stored type tag.
+  */
+ @Override
+ public void readEvent(DataInput in) throws IOException {
+ nodeContainerDefinitionId = readUTF(in);
+ nodeContainerInstanceId = readUTF(in);
+ variableId = readUTF(in);
+ variableName = in.readUTF();
+ variableValue = readObject(in);
+ }
+
+    /**
+     * Copies the fields shared by all process events from the enclosing data
+     * event into this body: process id, instance id, version, date and user.
+     *
+     * @param dataEvent event that carries this body
+     */
+    @Override
+    public void visit(DataEvent<?> dataEvent) {
+        processId = dataEvent.getKogitoProcessId();
+        processInstanceId = dataEvent.getKogitoProcessInstanceId();
+        processVersion = dataEvent.getKogitoProcessInstanceVersion();
+        eventDate = toDate(dataEvent.getTime());
+        eventUser = dataEvent.getKogitoIdentity();
+    }
+
public Date getEventDate() {
return eventDate;
}
@@ -184,4 +218,5 @@ public class ProcessInstanceVariableEventBody {
return instance;
}
}
+
}
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/DataEventFactory.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/DataEventFactory.java
index a52df39f9d..f2317a5d66 100644
---
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/DataEventFactory.java
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/DataEventFactory.java
@@ -18,6 +18,7 @@
*/
package org.kie.kogito.event;
+import java.io.IOException;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.Optional;
@@ -43,6 +44,22 @@ public class DataEventFactory {
return new CloudEventWrapDataEvent<>(event, dataUnmarshaller);
}
+    /**
+     * Populates an existing data event from a cloud event: the context
+     * attributes and every extension attribute are copied over, and the
+     * payload, when present, is converted with the given unmarshaller.
+     *
+     * @param dataEvent target instance to fill; it is also the return value
+     * @param cloudEvent source of attributes, extensions and data
+     * @param dataUnmarshaller converter applied to the cloud event data when it is not {@code null}
+     * @return {@code dataEvent}
+     * @throws IOException if the data conversion fails
+     */
+    public static <T extends AbstractDataEvent<V>, V> T from(T dataEvent, CloudEvent cloudEvent, Converter<CloudEventData, V> dataUnmarshaller) throws IOException {
+        dataEvent.setSpecVersion(cloudEvent.getSpecVersion());
+        dataEvent.setId(cloudEvent.getId());
+        dataEvent.setType(cloudEvent.getType());
+        dataEvent.setSource(cloudEvent.getSource());
+        dataEvent.setDataContentType(cloudEvent.getDataContentType());
+        dataEvent.setDataSchema(cloudEvent.getDataSchema());
+        dataEvent.setSubject(cloudEvent.getSubject());
+        dataEvent.setTime(cloudEvent.getTime());
+        for (String name : cloudEvent.getExtensionNames()) {
+            dataEvent.addExtensionAttribute(name, cloudEvent.getExtension(name));
+        }
+        if (cloudEvent.getData() == null) {
+            return dataEvent;
+        }
+        dataEvent.setData(dataUnmarshaller.convert(cloudEvent.getData()));
+        return dataEvent;
+    }
+
public static <T> DataEvent<T> from(T eventData, String trigger,
KogitoProcessInstance pi) {
return from(eventData, trigger, URI.create("/process/" +
pi.getProcessId()), Optional.empty(),
ProcessMeta.fromKogitoProcessInstance(pi));
}
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/JacksonTypeCloudEventDataConverter.java
similarity index 52%
copy from
api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
copy to
api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/JacksonTypeCloudEventDataConverter.java
index 7db8c0e765..3e64b44a5d 100644
---
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/JacksonTypeCloudEventDataConverter.java
@@ -16,19 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.event.process;
+package org.kie.kogito.event.impl;
-import java.net.URI;
-import java.util.Collection;
+import java.io.IOException;
-public class MultipleProcessInstanceDataEvent extends
ProcessInstanceDataEvent<Collection<ProcessInstanceDataEvent<?>>> {
+import org.kie.kogito.event.Converter;
- public static final String TYPE = "MultipleProcessInstanceDataEvent";
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
- public MultipleProcessInstanceDataEvent() {
+import io.cloudevents.CloudEventData;
+
+public class JacksonTypeCloudEventDataConverter<O> implements
Converter<CloudEventData, O> {
+
+ private ObjectMapper objectMapper;
+ private TypeReference<O> outputType;
+
+ public JacksonTypeCloudEventDataConverter(ObjectMapper objectMapper,
TypeReference<O> outputType) {
+ this.objectMapper = objectMapper;
+ this.outputType = outputType;
}
- public MultipleProcessInstanceDataEvent(URI source,
Collection<ProcessInstanceDataEvent<?>> body) {
- super(TYPE, source, body);
+ @Override
+ public O convert(CloudEventData value) throws IOException {
+ return objectMapper.readValue(value.toBytes(), outputType);
}
}
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
index 7db8c0e765..f29a920c13 100644
---
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
@@ -21,14 +21,25 @@ package org.kie.kogito.event.process;
import java.net.URI;
import java.util.Collection;
-public class MultipleProcessInstanceDataEvent extends
ProcessInstanceDataEvent<Collection<ProcessInstanceDataEvent<?>>> {
+public class MultipleProcessInstanceDataEvent extends
ProcessInstanceDataEvent<Collection<ProcessInstanceDataEvent<? extends
KogitoMarshallEventSupport>>> {
- public static final String TYPE = "MultipleProcessInstanceDataEvent";
+ public static final String MULTIPLE_TYPE =
"MultipleProcessInstanceDataEvent";
+ public static final String BINARY_CONTENT_TYPE =
"application/octet-stream";
+ public static final String COMPRESS_DATA = "compressdata";
public MultipleProcessInstanceDataEvent() {
}
- public MultipleProcessInstanceDataEvent(URI source,
Collection<ProcessInstanceDataEvent<?>> body) {
- super(TYPE, source, body);
+ public MultipleProcessInstanceDataEvent(URI source,
Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>
body) {
+ super(MULTIPLE_TYPE, source, body);
+ }
+
+    /**
+     * Tells whether the {@code compressdata} extension is set to a Boolean
+     * {@code true}. A missing extension, or one holding any non-Boolean value,
+     * counts as not compressed.
+     *
+     * @return {@code true} only when the extension is {@link Boolean#TRUE}
+     */
+    public boolean isCompressed() {
+        Object flag = getExtension(COMPRESS_DATA);
+        return flag instanceof Boolean && ((Boolean) flag).booleanValue();
+    }
+
+ /** Records the compression flag as the "compressdata" extension attribute of this event. */
+ public void setCompressed(boolean compressed) {
+ addExtensionAttribute(COMPRESS_DATA, compressed);
+ }
}
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorDataEvent.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorDataEvent.java
index 1427e917f2..2c50f8e226 100644
---
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorDataEvent.java
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorDataEvent.java
@@ -22,11 +22,14 @@ import java.util.Map;
public class ProcessInstanceErrorDataEvent extends
ProcessInstanceDataEvent<ProcessInstanceErrorEventBody> {
+ public static final String ERROR_TYPE = "ProcessInstanceErrorDataEvent";
+
public ProcessInstanceErrorDataEvent() {
+ this.setType(ERROR_TYPE);
}
public ProcessInstanceErrorDataEvent(String source, String addons, String
identity, Map<String, Object> metaData, ProcessInstanceErrorEventBody body) {
- super("ProcessInstanceErrorDataEvent",
+ super(ERROR_TYPE,
source,
body,
(String)
metaData.get(ProcessInstanceEventMetadata.PROCESS_INSTANCE_ID_META_DATA),
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeDataEvent.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeDataEvent.java
index db118aa009..e1e6a74e76 100644
---
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeDataEvent.java
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeDataEvent.java
@@ -22,11 +22,14 @@ import java.util.Map;
public class ProcessInstanceNodeDataEvent extends
ProcessInstanceDataEvent<ProcessInstanceNodeEventBody> {
+ public static final String NODE_TYPE = "ProcessInstanceNodeDataEvent";
+
public ProcessInstanceNodeDataEvent() {
+ this.setType(NODE_TYPE);
}
public ProcessInstanceNodeDataEvent(String source, String addons, String
identity, Map<String, Object> metaData, ProcessInstanceNodeEventBody body) {
- super("ProcessInstanceNodeDataEvent",
+ super(NODE_TYPE,
source,
body,
(String)
metaData.get(ProcessInstanceEventMetadata.PROCESS_INSTANCE_ID_META_DATA),
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLADataEvent.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLADataEvent.java
index 90139ce002..e5b743aeac 100644
---
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLADataEvent.java
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLADataEvent.java
@@ -22,11 +22,14 @@ import java.util.Map;
public class ProcessInstanceSLADataEvent extends
ProcessInstanceDataEvent<ProcessInstanceSLAEventBody> {
+ public static final String SLA_TYPE = "ProcessInstanceSLADataEvent";
+
public ProcessInstanceSLADataEvent() {
+ this.setType(SLA_TYPE);
}
public ProcessInstanceSLADataEvent(String source, String addons, String
identity, Map<String, Object> metaData, ProcessInstanceSLAEventBody body) {
- super("ProcessInstanceSLADataEvent",
+ super(SLA_TYPE,
source,
body,
(String)
metaData.get(ProcessInstanceEventMetadata.PROCESS_INSTANCE_ID_META_DATA),
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateDataEvent.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateDataEvent.java
index 38d30defda..a0ce3e1003 100644
---
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateDataEvent.java
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateDataEvent.java
@@ -22,11 +22,14 @@ import java.util.Map;
public class ProcessInstanceStateDataEvent extends
ProcessInstanceDataEvent<ProcessInstanceStateEventBody> {
+ public static final String STATE_TYPE = "ProcessInstanceStateDataEvent";
+
public ProcessInstanceStateDataEvent() {
+ this.setType(STATE_TYPE);
}
public ProcessInstanceStateDataEvent(String source, String addons, String
identity, Map<String, Object> metaData, ProcessInstanceStateEventBody body) {
- super("ProcessInstanceStateDataEvent",
+ super(STATE_TYPE,
source,
body,
(String)
metaData.get(ProcessInstanceEventMetadata.PROCESS_INSTANCE_ID_META_DATA),
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableDataEvent.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableDataEvent.java
index b7c83d367f..1acf471d5b 100644
---
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableDataEvent.java
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableDataEvent.java
@@ -32,15 +32,18 @@ public class ProcessInstanceVariableDataEvent extends
ProcessInstanceDataEvent<P
private static final Set<String> INTERNAL_EXTENSION_ATTRIBUTES =
Collections.singleton(CloudEventExtensionConstants.KOGITO_VARIABLE_NAME);
+ public static final String VAR_TYPE = "ProcessInstanceVariableDataEvent";
+
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonProperty(CloudEventExtensionConstants.KOGITO_VARIABLE_NAME)
private String kogitoVariableName;
public ProcessInstanceVariableDataEvent() {
+ this.setType(VAR_TYPE);
}
public ProcessInstanceVariableDataEvent(String source, String addons,
String identity, Map<String, Object> metaData, ProcessInstanceVariableEventBody
body) {
- super("ProcessInstanceVariableDataEvent",
+ super(VAR_TYPE,
source,
body,
(String)
metaData.get(ProcessInstanceEventMetadata.PROCESS_INSTANCE_ID_META_DATA),
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonProcessInstanceDataEventDeserializer.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonProcessInstanceDataEventDeserializer.java
new file mode 100644
index 0000000000..d97b251250
--- /dev/null
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonProcessInstanceDataEventDeserializer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import java.io.IOException;
+
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceSLADataEvent;
+import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+public class JsonProcessInstanceDataEventDeserializer extends
StdDeserializer<ProcessInstanceDataEvent<?>> {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(JsonProcessInstanceDataEventDeserializer.class);
+
+ private static final long serialVersionUID = 6152014726577574241L;
+
+ public JsonProcessInstanceDataEventDeserializer() {
+ this(JsonProcessInstanceDataEventDeserializer.class);
+ }
+
+ public JsonProcessInstanceDataEventDeserializer(Class<?> vc) {
+ super(vc);
+ }
+
+ @Override
+ public ProcessInstanceDataEvent<?> deserialize(JsonParser jp,
DeserializationContext ctxt)
+ throws IOException, JsonProcessingException {
+ JsonNode node = jp.getCodec().readTree(jp);
+ LOGGER.debug("Deserialize process instance data event: {}", node);
+ String type = node.get("type").asText();
+
+ switch (type) {
+ case MultipleProcessInstanceDataEvent.MULTIPLE_TYPE:
+ return jp.getCodec().treeToValue(node,
MultipleProcessInstanceDataEvent.class);
+ case ProcessInstanceErrorDataEvent.ERROR_TYPE:
+ return (ProcessInstanceDataEvent<?>)
jp.getCodec().treeToValue(node, ProcessInstanceErrorDataEvent.class);
+ case ProcessInstanceNodeDataEvent.NODE_TYPE:
+ return (ProcessInstanceDataEvent<?>)
jp.getCodec().treeToValue(node, ProcessInstanceNodeDataEvent.class);
+ case ProcessInstanceSLADataEvent.SLA_TYPE:
+ return (ProcessInstanceDataEvent<?>)
jp.getCodec().treeToValue(node, ProcessInstanceSLADataEvent.class);
+ case ProcessInstanceStateDataEvent.STATE_TYPE:
+ return (ProcessInstanceDataEvent<?>)
jp.getCodec().treeToValue(node, ProcessInstanceStateDataEvent.class);
+ case ProcessInstanceVariableDataEvent.VAR_TYPE:
+ return (ProcessInstanceDataEvent<?>)
jp.getCodec().treeToValue(node, ProcessInstanceVariableDataEvent.class);
+ default:
+ LOGGER.warn("Unknown type {} in json data {}", type, node);
+ return (ProcessInstanceDataEvent<?>)
jp.getCodec().treeToValue(node, ProcessInstanceDataEvent.class);
+
+ }
+ }
+}
\ No newline at end of file
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonUserTaskInstanceDataEventDeserializer.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonUserTaskInstanceDataEventDeserializer.java
new file mode 100644
index 0000000000..7e8bf5458a
--- /dev/null
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonUserTaskInstanceDataEventDeserializer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import java.io.IOException;
+
+import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceDeadlineDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceVariableDataEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+public class JsonUserTaskInstanceDataEventDeserializer extends
StdDeserializer<UserTaskInstanceDataEvent<?>> {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(JsonUserTaskInstanceDataEventDeserializer.class);
+
+ private static final long serialVersionUID = -6626663191296012306L;
+
+ public JsonUserTaskInstanceDataEventDeserializer() {
+ this(null);
+ }
+
+ public JsonUserTaskInstanceDataEventDeserializer(Class<?> vc) {
+ super(vc);
+ }
+
+ @Override
+ public UserTaskInstanceDataEvent<?> deserialize(JsonParser jp,
DeserializationContext ctxt)
+ throws IOException, JsonProcessingException {
+ JsonNode node = jp.getCodec().readTree(jp);
+ LOGGER.debug("Deserialize user task instance data event: {}", node);
+ String type = node.get("type").asText();
+
+ switch (type) {
+ case MultipleUserTaskInstanceDataEvent.TYPE:
+ return jp.getCodec().treeToValue(node,
MultipleUserTaskInstanceDataEvent.class);
+ case "UserTaskInstanceAssignmentDataEvent":
+ return (UserTaskInstanceDataEvent<?>)
jp.getCodec().treeToValue(node, UserTaskInstanceAssignmentDataEvent.class);
+ case "UserTaskInstanceAttachmentDataEvent":
+ return (UserTaskInstanceDataEvent<?>)
jp.getCodec().treeToValue(node, UserTaskInstanceAttachmentDataEvent.class);
+ case "UserTaskInstanceCommentDataEvent":
+ return (UserTaskInstanceDataEvent<?>)
jp.getCodec().treeToValue(node, UserTaskInstanceCommentDataEvent.class);
+ case "UserTaskInstanceDeadlineDataEvent":
+ return (UserTaskInstanceDataEvent<?>)
jp.getCodec().treeToValue(node, UserTaskInstanceDeadlineDataEvent.class);
+ case "UserTaskInstanceStateDataEvent":
+ return (UserTaskInstanceDataEvent<?>)
jp.getCodec().treeToValue(node, UserTaskInstanceStateDataEvent.class);
+ case "UserTaskInstanceVariableDataEvent":
+ return (UserTaskInstanceDataEvent<?>)
jp.getCodec().treeToValue(node, UserTaskInstanceVariableDataEvent.class);
+ default:
+ LOGGER.warn("Unknown type {} in json data {}", type, node);
+ return (UserTaskInstanceDataEvent<?>)
jp.getCodec().treeToValue(node, UserTaskInstanceDataEvent.class);
+
+ }
+ }
+}
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoDataEventSerializationHelper.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoDataEventSerializationHelper.java
new file mode 100644
index 0000000000..f4e512eb22
--- /dev/null
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoDataEventSerializationHelper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.URI;
+
+import org.kie.kogito.event.AbstractDataEvent;
+import org.kie.kogito.event.DataEvent;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+
+import io.cloudevents.SpecVersion;
+
+import static
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*;
+
+/**
+ * Package-private static helpers shared by the binary serializer and deserializer of Kogito data events.
+ */
+class KogitoDataEventSerializationHelper {
+
+    private KogitoDataEventSerializationHelper() {
+        // static helpers only, never instantiated
+    }
+
+    /**
+     * Writes, in this order: spec version, id, subject, data content type and data schema.
+     * Spec version and id go through {@link DataOutput#writeUTF(String)}; the other three go through
+     * the statically imported {@code writeUTF} helper, which receives {@code null} when the schema is unset.
+     *
+     * @param out destination of the encoded attributes
+     * @param data event whose attributes are written
+     * @throws IOException if writing to {@code out} fails
+     */
+    static void writeCloudEventAttrs(DataOutput out, DataEvent<?> data) throws IOException {
+        out.writeUTF(data.getSpecVersion().toString());
+        out.writeUTF(data.getId());
+        writeUTF(out, data.getSubject());
+        writeUTF(out, data.getDataContentType());
+        final URI schema = data.getDataSchema();
+        writeUTF(out, schema == null ? null : schema.toString());
+    }
+
+    /**
+     * Reads the attributes in the order {@link #writeCloudEventAttrs(DataOutput, DataEvent)} wrote them
+     * and sets them on the given event. The data schema is only set when a non-null value was read.
+     *
+     * @param in source of the encoded attributes
+     * @param data event to fill
+     * @return the same {@code data} instance, for chaining
+     * @throws IOException if reading from {@code in} fails
+     */
+    static <T extends AbstractDataEvent<?>> T readCloudEventAttrs(DataInput in, T data) throws IOException {
+        data.setSpecVersion(SpecVersion.parse(in.readUTF()));
+        data.setId(in.readUTF());
+        data.setSubject(readUTF(in));
+        data.setDataContentType(readUTF(in));
+        final String schemaText = readUTF(in);
+        if (schemaText != null) {
+            data.setDataSchema(URI.create(schemaText));
+        }
+        return data;
+    }
+
+    /**
+     * Copies the values held by the extension record (business key, process and instance ids, state,
+     * version, type, root ids, identity, source and addons) into the event.
+     *
+     * @param event event to fill
+     * @param info record holding the values to copy
+     */
+    static void populateCloudEvent(ProcessInstanceDataEvent<?> event, ProcessInstanceDataEventExtensionRecord info) {
+        event.setKogitoBusinessKey(info.getBusinessKey());
+        event.setKogitoProcessId(info.getId());
+        event.setKogitoProcessInstanceId(info.getInstanceId());
+        event.setKogitoParentProcessInstanceId(info.getParentInstanceId());
+        event.setKogitoProcessInstanceState(info.getState());
+        event.setKogitoProcessInstanceVersion(info.getVersion());
+        event.setKogitoProcessType(info.getType());
+        event.setKogitoRootProcessId(info.getRootId());
+        event.setKogitoRootProcessInstanceId(info.getRootInstanceId());
+        event.setKogitoIdentity(info.getIdentity());
+        event.setSource(info.getSource());
+        event.setKogitoAddons(info.getAddons());
+    }
+
+}
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoSerializationModule.java
similarity index 50%
copy from
api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
copy to
api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoSerializationModule.java
index 7db8c0e765..381e0b93d2 100644
---
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoSerializationModule.java
@@ -16,19 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.event.process;
+package org.kie.kogito.event.serializer;
-import java.net.URI;
-import java.util.Collection;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
-public class MultipleProcessInstanceDataEvent extends
ProcessInstanceDataEvent<Collection<ProcessInstanceDataEvent<?>>> {
+import com.fasterxml.jackson.databind.module.SimpleModule;
- public static final String TYPE = "MultipleProcessInstanceDataEvent";
+public class KogitoSerializationModule extends SimpleModule {
- public MultipleProcessInstanceDataEvent() {
- }
+ private static final long serialVersionUID = 1L;
- public MultipleProcessInstanceDataEvent(URI source,
Collection<ProcessInstanceDataEvent<?>> body) {
- super(TYPE, source, body);
+ /**
+  * Builds the module named "KogitoSerialization": installs the serializer and deserializer modifiers
+  * for multiple process instance data events and registers the type-dispatching deserializers for
+  * {@code ProcessInstanceDataEvent} and {@code UserTaskInstanceDataEvent}.
+  */
+ public KogitoSerializationModule() {
+ super("KogitoSerialization");
+ setSerializerModifier(new
MultipleProcessDataInstanceBeanSerializerModifier());
+ setDeserializerModifier(new
MultipleProcessDataInstanceBeanDeserializerModifier());
+ addDeserializer(ProcessInstanceDataEvent.class, new
JsonProcessInstanceDataEventDeserializer());
+ addDeserializer(UserTaskInstanceDataEvent.class, new
JsonUserTaskInstanceDataEventDeserializer());
}
}
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanDeserializerModifier.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanDeserializerModifier.java
new file mode 100644
index 0000000000..d72be4e5b5
--- /dev/null
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanDeserializerModifier.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
+
+import com.fasterxml.jackson.databind.BeanDescription;
+import com.fasterxml.jackson.databind.DeserializationConfig;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
+
+/**
+ * {@link BeanDeserializerModifier} that wraps the deserializer built for
+ * {@link MultipleProcessInstanceDataEvent} into a {@link MultipleProcessInstanceDataEventDeserializer}.
+ * Deserializers of every other bean class are returned untouched.
+ */
+public class MultipleProcessDataInstanceBeanDeserializerModifier extends BeanDeserializerModifier {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param config deserialization configuration (not used here)
+     * @param beanDesc description of the bean the deserializer was built for
+     * @param deserializer deserializer built so far
+     * @return the wrapping deserializer for {@link MultipleProcessInstanceDataEvent}, otherwise {@code deserializer}
+     */
+    @Override
+    public JsonDeserializer<?> modifyDeserializer(
+            DeserializationConfig config, BeanDescription beanDesc, JsonDeserializer<?> deserializer) {
+        final boolean isMultipleEvent = beanDesc.getBeanClass().equals(MultipleProcessInstanceDataEvent.class);
+        if (!isMultipleEvent) {
+            return deserializer;
+        }
+        return new MultipleProcessInstanceDataEventDeserializer((JsonDeserializer<Object>) deserializer);
+    }
+}
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanSerializerModifier.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanSerializerModifier.java
new file mode 100644
index 0000000000..3c62d932a3
--- /dev/null
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanSerializerModifier.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
+
+import com.fasterxml.jackson.databind.BeanDescription;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializationConfig;
+import com.fasterxml.jackson.databind.ser.BeanSerializerModifier;
+
+/**
+ * {@link BeanSerializerModifier} that wraps the serializer built for
+ * {@link MultipleProcessInstanceDataEvent} into a {@link MultipleProcessInstanceDataEventSerializer}.
+ * Serializers of every other bean class are returned untouched.
+ */
+public class MultipleProcessDataInstanceBeanSerializerModifier extends BeanSerializerModifier {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param config serialization configuration (not used here)
+     * @param beanDesc description of the bean the serializer was built for
+     * @param serializer serializer built so far
+     * @return the wrapping serializer for {@link MultipleProcessInstanceDataEvent}, otherwise {@code serializer}
+     */
+    @Override
+    public JsonSerializer<?> modifySerializer(
+            SerializationConfig config, BeanDescription beanDesc, JsonSerializer<?> serializer) {
+        final boolean isMultipleEvent = beanDesc.getBeanClass().equals(MultipleProcessInstanceDataEvent.class);
+        if (!isMultipleEvent) {
+            return serializer;
+        }
+        return new MultipleProcessInstanceDataEventSerializer((JsonSerializer<Object>) serializer);
+    }
+}
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceConverterFactory.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceConverterFactory.java
new file mode 100644
index 0000000000..4f8a6205c2
--- /dev/null
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceConverterFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import java.io.IOException;
+import java.util.Base64;
+import java.util.Collection;
+
+import org.kie.kogito.event.Converter;
+import org.kie.kogito.event.impl.JacksonTypeCloudEventDataConverter;
+import org.kie.kogito.event.process.KogitoMarshallEventSupport;
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.CloudEventData;
+
+/**
+ * Chooses the converter that turns the data of a multiple process instance cloud event into the
+ * collection of contained process instance events.
+ * <p>
+ * Events whose data content type is {@link MultipleProcessInstanceDataEvent#BINARY_CONTENT_TYPE} are
+ * decoded with the binary format (optionally gzip-compressed); any other content type is decoded
+ * with Jackson.
+ */
+public final class MultipleProcessDataInstanceConverterFactory {
+
+    /** Target type of the Jackson conversion; built once since it never changes. */
+    private static final TypeReference<Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> JSON_COLLECTION_TYPE =
+            new TypeReference<Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>>() {
+            };
+
+    /** Decodes uncompressed binary data. */
+    private static final Converter<CloudEventData, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> BINARY_CONVERTER =
+            data -> deserialize(data, false);
+
+    /** Decodes gzip-compressed binary data. */
+    private static final Converter<CloudEventData, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> COMPRESSED_CONVERTER =
+            data -> deserialize(data, true);
+
+    private MultipleProcessDataInstanceConverterFactory() {
+    }
+
+    /**
+     * Returns the converter matching the content type and compression extension of the given event.
+     *
+     * @param cloudEvent event whose data content type and {@code compressdata} extension are inspected
+     * @param objectMapper mapper used when the data is not binary
+     * @return a binary converter (compressed or not) for binary content, a Jackson based one otherwise
+     */
+    public static Converter<CloudEventData, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> fromCloudEvent(CloudEvent cloudEvent, ObjectMapper objectMapper) {
+        if (MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE.equals(cloudEvent.getDataContentType())) {
+            return isCompressed(cloudEvent) ? COMPRESSED_CONVERTER : BINARY_CONVERTER;
+        }
+        return new JacksonTypeCloudEventDataConverter<>(objectMapper, JSON_COLLECTION_TYPE);
+    }
+
+    /**
+     * Reads the {@code compressdata} extension. Besides a {@link Boolean}, the textual form is
+     * accepted as well, so that a flag that reached this point as the string "true" is still honoured.
+     */
+    private static boolean isCompressed(CloudEvent event) {
+        Object value = event.getExtension(MultipleProcessInstanceDataEvent.COMPRESS_DATA);
+        if (value instanceof Boolean) {
+            return ((Boolean) value).booleanValue();
+        }
+        return value instanceof String && Boolean.parseBoolean((String) value);
+    }
+
+    /**
+     * Base64-decodes the event data bytes and hands them to the binary reader.
+     *
+     * @param data cloud event data holding the Base64 text
+     * @param compress whether the decoded bytes are gzip-compressed
+     * @throws IOException if the binary content cannot be read
+     */
+    private static Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> deserialize(CloudEventData data, boolean compress) throws IOException {
+        return MultipleProcessInstanceDataEventDeserializer.readFromBytes(Base64.getDecoder().decode(data.toBytes()), compress);
+    }
+}
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java
new file mode 100644
index 0000000000..6e0b0f262c
--- /dev/null
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.zip.GZIPInputStream;
+
+import org.kie.kogito.event.process.CloudEventVisitor;
+import org.kie.kogito.event.process.KogitoMarshallEventSupport;
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceErrorEventBody;
+import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceNodeEventBody;
+import org.kie.kogito.event.process.ProcessInstanceSLADataEvent;
+import org.kie.kogito.event.process.ProcessInstanceSLAEventBody;
+import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceStateEventBody;
+import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceVariableEventBody;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.ResolvableDeserializer;
+
+import io.cloudevents.SpecVersion;
+
+import static
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.readInt;
+
+/**
+ * Deserializer for {@link MultipleProcessInstanceDataEvent}.
+ * <p>
+ * When the JSON attribute {@code datacontenttype} equals
+ * {@link MultipleProcessInstanceDataEvent#BINARY_CONTENT_TYPE}, the {@code data} attribute is read as
+ * binary content and decoded with {@link #readFromBytes(byte[], boolean)}; otherwise the work is
+ * delegated to the deserializer this instance wraps.
+ */
+public class MultipleProcessInstanceDataEventDeserializer extends JsonDeserializer<MultipleProcessInstanceDataEvent> implements ResolvableDeserializer {
+
+    private static final Logger logger = LoggerFactory.getLogger(MultipleProcessInstanceDataEventDeserializer.class);
+
+    private final JsonDeserializer<Object> defaultDeserializer;
+
+    /**
+     * @param deserializer deserializer used for events whose data is not binary
+     */
+    public MultipleProcessInstanceDataEventDeserializer(JsonDeserializer<Object> deserializer) {
+        this.defaultDeserializer = deserializer;
+    }
+
+    /**
+     * Forwards resolution to the wrapped deserializer when it is resolvable.
+     */
+    @Override
+    public void resolve(DeserializationContext ctxt) throws JsonMappingException {
+        // Guarded so that a non-resolvable delegate does not cause a ClassCastException
+        if (defaultDeserializer instanceof ResolvableDeserializer) {
+            ((ResolvableDeserializer) defaultDeserializer).resolve(ctxt);
+        }
+    }
+
+    /**
+     * Reads the event as a tree and decodes it either from its binary {@code data} or through the
+     * wrapped deserializer, depending on {@code datacontenttype}.
+     *
+     * @param p parser positioned on the event JSON
+     * @param ctxt deserialization context passed on to the wrapped deserializer
+     * @return the decoded event
+     * @throws IOException if the JSON or the binary content cannot be read
+     */
+    @Override
+    public MultipleProcessInstanceDataEvent deserialize(JsonParser p, DeserializationContext ctxt)
+            throws IOException, JacksonException {
+        JsonNode node = p.getCodec().readTree(p);
+        JsonNode dataContentType = node.get("datacontenttype");
+        if (dataContentType != null && MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE.equals(dataContentType.asText())) {
+            MultipleProcessInstanceDataEvent event = new MultipleProcessInstanceDataEvent();
+            event.setDataContentType(dataContentType.asText());
+            event.setSource(URI.create(node.get("source").asText()));
+            event.setType(node.get("type").asText());
+            event.setSpecVersion(SpecVersion.parse(node.get("specversion").asText()));
+            event.setId(node.get("id").asText());
+            JsonNode data = node.get("data");
+            if (data != null) {
+                event.setData(readFromBytes(data.binaryValue(), isCompressed(node)));
+            }
+            return event;
+        }
+        // Not binary: replay the already consumed tree through the wrapped deserializer
+        try (JsonParser newParser = node.traverse(p.getCodec())) {
+            newParser.nextToken();
+            return (MultipleProcessInstanceDataEvent) defaultDeserializer.deserialize(newParser, ctxt);
+        }
+    }
+
+    /**
+     * Returns the value of the {@code compressdata} attribute when it is a JSON boolean, false otherwise.
+     */
+    private static boolean isCompressed(JsonNode node) {
+        JsonNode compress = node.get(MultipleProcessInstanceDataEvent.COMPRESS_DATA);
+        return compress != null && compress.isBoolean() && compress.asBoolean();
+    }
+
+    /**
+     * Decodes a collection of process instance events from its binary form.
+     * <p>
+     * The stream starts with the number of events. Each event starts with one byte: {@code -1} means a
+     * new extension record follows and is remembered; any other value is the index of a record read
+     * earlier. Then come the event type and the event content.
+     *
+     * @param binaryValue encoded events
+     * @param compressed whether {@code binaryValue} is gzip-compressed
+     * @return the decoded events, in stream order
+     * @throws IOException if the content cannot be read
+     * @throws UnsupportedOperationException if an event type is not recognized
+     */
+    static Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> readFromBytes(byte[] binaryValue, boolean compressed) throws IOException {
+        InputStream wrappedIn = new ByteArrayInputStream(binaryValue);
+        if (compressed) {
+            logger.trace("Gzip compressed byte array");
+            wrappedIn = new GZIPInputStream(wrappedIn);
+        }
+        try (DataInputStream in = new DataInputStream(wrappedIn)) {
+            int size = readInt(in);
+            logger.trace("Reading collection of size {}", size);
+            Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> result = new ArrayList<>(size);
+            List<ProcessInstanceDataEventExtensionRecord> infos = new ArrayList<>();
+            while (size-- > 0) {
+                byte readInfo = in.readByte();
+                logger.trace("Info ordinal is {}", readInfo);
+                ProcessInstanceDataEventExtensionRecord info;
+                if (readInfo == -1) {
+                    info = new ProcessInstanceDataEventExtensionRecord();
+                    info.readEvent(in);
+                    logger.trace("Info readed is {}", info);
+                    infos.add(info);
+                } else {
+                    info = infos.get(readInfo);
+                    logger.trace("Info cached is {}", info);
+                }
+                String type = in.readUTF();
+                // Log the type that was just read (the extension record is already logged above)
+                logger.trace("Type is {}", type);
+                result.add(getCloudEvent(in, type, info));
+                logger.trace("{} events remaining", size);
+            }
+            return result;
+        }
+    }
+
+    /**
+     * Builds the event of the given type from the stream.
+     *
+     * @throws UnsupportedOperationException if {@code type} is none of the known event types
+     */
+    private static ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport> getCloudEvent(DataInputStream in, String type, ProcessInstanceDataEventExtensionRecord info) throws IOException {
+        switch (type) {
+            case ProcessInstanceVariableDataEvent.VAR_TYPE:
+                ProcessInstanceVariableDataEvent item = buildDataEvent(in, new ProcessInstanceVariableDataEvent(), ProcessInstanceVariableEventBody::new, info);
+                // buildDataEvent leaves the data unset when the stream flags it as null
+                ProcessInstanceVariableEventBody variableBody = item.getData();
+                if (variableBody != null) {
+                    item.setKogitoVariableName(variableBody.getVariableName());
+                }
+                return item;
+            case ProcessInstanceStateDataEvent.STATE_TYPE:
+                return buildDataEvent(in, new ProcessInstanceStateDataEvent(), ProcessInstanceStateEventBody::new, info);
+            case ProcessInstanceNodeDataEvent.NODE_TYPE:
+                return buildDataEvent(in, new ProcessInstanceNodeDataEvent(), ProcessInstanceNodeEventBody::new, info);
+            case ProcessInstanceErrorDataEvent.ERROR_TYPE:
+                return buildDataEvent(in, new ProcessInstanceErrorDataEvent(), ProcessInstanceErrorEventBody::new, info);
+            case ProcessInstanceSLADataEvent.SLA_TYPE:
+                return buildDataEvent(in, new ProcessInstanceSLADataEvent(), ProcessInstanceSLAEventBody::new, info);
+            default:
+                throw new UnsupportedOperationException("Unrecognized event type " + type);
+        }
+    }
+
+    /**
+     * Fills {@code cloudEvent} from the stream: a time delta in milliseconds added to the time of the
+     * extension record, the cloud event attributes, the values of the extension record and, when the
+     * following boolean is true, a body created by {@code bodySupplier} and read from the stream.
+     *
+     * @return the same {@code cloudEvent} instance
+     */
+    private static <T extends ProcessInstanceDataEvent<V>, V extends KogitoMarshallEventSupport & CloudEventVisitor> T buildDataEvent(DataInput in, T cloudEvent, Supplier<V> bodySupplier,
+            ProcessInstanceDataEventExtensionRecord info) throws IOException {
+        int delta = readInt(in);
+        logger.trace("Time delta is {}", delta);
+        cloudEvent.setTime(info.getTime().plus(delta, ChronoUnit.MILLIS));
+        KogitoDataEventSerializationHelper.readCloudEventAttrs(in, cloudEvent);
+        logger.trace("Cloud event before population {}", cloudEvent);
+        KogitoDataEventSerializationHelper.populateCloudEvent(cloudEvent, info);
+        logger.trace("Cloud event after population {}", cloudEvent);
+
+        boolean isNotNull = in.readBoolean();
+        if (isNotNull) {
+            logger.trace("Data is not null");
+            V body = bodySupplier.get();
+            body.readEvent(in);
+            logger.trace("Event body before population {}", body);
+            body.visit(cloudEvent);
+            logger.trace("Event body after population {}", body);
+            cloudEvent.setData(body);
+        } else {
+            logger.trace("Data is null");
+        }
+        return cloudEvent;
+    }
+
+}
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java
new file mode 100644
index 0000000000..42825e9679
--- /dev/null
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.zip.GZIPOutputStream;
+
+import org.kie.kogito.event.process.KogitoMarshallEventSupport;
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import static
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.writeInt;
+
+/**
+ * Jackson serializer for {@link MultipleProcessInstanceDataEvent}.
+ * <p>
+ * When the event declares {@link MultipleProcessInstanceDataEvent#BINARY_CONTENT_TYPE}
+ * the grouped events are written into a compact binary payload (optionally
+ * gzip-compressed) stored in the {@code data} field; any other content type is
+ * handed to the default serializer supplied at construction time.
+ */
+public class MultipleProcessInstanceDataEventSerializer extends JsonSerializer<MultipleProcessInstanceDataEvent> {
+
+    private static final Logger logger = LoggerFactory.getLogger(MultipleProcessInstanceDataEventSerializer.class);
+
+    private final JsonSerializer<Object> defaultSerializer;
+
+    /**
+     * @param serializer serializer used when the event is not flagged as binary
+     */
+    public MultipleProcessInstanceDataEventSerializer(JsonSerializer<Object> serializer) {
+        this.defaultSerializer = serializer;
+    }
+
+    /**
+     * Writes the event either as a JSON object wrapping the binary payload or,
+     * for non-binary content types, through the default serializer.
+     *
+     * @throws IOException if the generator or the binary encoding fails
+     */
+    @Override
+    public void serialize(MultipleProcessInstanceDataEvent value, JsonGenerator gen, SerializerProvider serializers)
+            throws IOException {
+        if (MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE.equals(value.getDataContentType())) {
+            gen.writeStartObject();
+            gen.writeStringField("datacontenttype", value.getDataContentType());
+            gen.writeStringField("source", value.getSource().toString());
+            gen.writeStringField("id", value.getId());
+            gen.writeStringField("specversion", value.getSpecVersion().toString());
+            gen.writeStringField("type", value.getType());
+            boolean compress = value.isCompressed();
+            // the compression flag is only emitted when set, so its absence means "not compressed"
+            if (compress) {
+                gen.writeBooleanField(MultipleProcessInstanceDataEvent.COMPRESS_DATA, true);
+            }
+            gen.writeBinaryField("data", dataAsBytes(gen, value.getData(), compress));
+            gen.writeEndObject();
+        } else {
+            defaultSerializer.serialize(value, gen, serializers);
+        }
+    }
+
+    /**
+     * Encodes the grouped events into the binary layout: event count, then per
+     * event a marker byte ({@code -1} followed by a full attribute record, or
+     * the ordinal of a record already written), the event type, the time delta
+     * in milliseconds relative to the record time, the cloud event attributes
+     * and the optional body.
+     *
+     * @param gen unused, kept for signature compatibility
+     * @param data events to encode
+     * @param compress whether the payload is wrapped in a gzip stream
+     * @return the encoded (and possibly compressed) bytes
+     * @throws IOException if writing fails
+     * @throws ArithmeticException if an event is further from its record time than an int of milliseconds can hold
+     */
+    private byte[] dataAsBytes(JsonGenerator gen, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> data, boolean compress) throws IOException {
+        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(compress ? new GZIPOutputStream(bytesOut) : bytesOut)) {
+            logger.trace("Writing size {}", data.size());
+            writeInt(out, data.size());
+            Map<String, ProcessInstanceDataEventExtensionRecord> infos = new HashMap<>();
+            for (ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport> cloudEvent : data) {
+                String key = cloudEvent.getKogitoProcessInstanceId();
+                ProcessInstanceDataEventExtensionRecord info = infos.get(key);
+                if (info == null) {
+                    logger.trace("Writing marker byte -1");
+                    out.writeByte((byte) -1);
+                    info = new ProcessInstanceDataEventExtensionRecord(infos.size(), cloudEvent);
+                    logger.trace("Writing info {}", info);
+                    info.writeEvent(out);
+                    // The ordinal travels as a single signed byte and -1 is the "new record" marker,
+                    // so only ordinals 0..127 can be referenced later. Records beyond that are not
+                    // cached: they are simply written again in full for every event that needs them,
+                    // which the reader handles as yet another new record.
+                    if (info.getOrdinal() <= Byte.MAX_VALUE) {
+                        infos.put(key, info);
+                    }
+                } else {
+                    logger.trace("Writing marker byte {}", info.getOrdinal());
+                    out.writeByte((byte) info.getOrdinal());
+                }
+                logger.trace("Writing type {}", cloudEvent.getType());
+                out.writeUTF(cloudEvent.getType());
+                // The reader adds this value to the record time as milliseconds, so it must be the
+                // elapsed milliseconds, not the ordering result returned by compareTo.
+                int timeDelta = Math.toIntExact(java.time.temporal.ChronoUnit.MILLIS.between(info.getTime(), cloudEvent.getTime()));
+                logger.trace("Writing time delta {}", timeDelta);
+                writeInt(out, timeDelta);
+                logger.trace("Writing cloud event attrs {}", cloudEvent);
+                KogitoDataEventSerializationHelper.writeCloudEventAttrs(out, cloudEvent);
+                KogitoMarshallEventSupport itemData = cloudEvent.getData();
+                if (itemData != null) {
+                    logger.trace("Writing data not null boolean");
+                    out.writeBoolean(true);
+                    logger.trace("Writing cloud event body {}", itemData);
+                    itemData.writeEvent(out);
+                } else {
+                    logger.trace("Writing data null boolean");
+                    out.writeBoolean(false);
+                }
+                logger.trace("individual event writing completed");
+            }
+        }
+        // read the buffer only after the stream is closed so that a gzip trailer, if any, has been flushed
+        return bytesOut.toByteArray();
+    }
+
+}
diff --git
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java
new file mode 100644
index 0000000000..28abac1a28
--- /dev/null
+++
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.URI;
+import java.time.OffsetDateTime;
+
+import org.kie.kogito.event.process.KogitoMarshallEventSupport;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+
+import static
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*;
+
+/**
+ * Holds the process instance attributes of a data event so they can be
+ * written to, and read back from, a binary stream as one record. The ordinal
+ * is kept in memory only; it is neither written nor read.
+ */
+class ProcessInstanceDataEventExtensionRecord implements KogitoMarshallEventSupport {
+
+    // referenceId and startFromNode are not used by process instance events
+    private String id;
+    private String instanceId;
+    private String version;
+    private String state;
+    private String type;
+    private String parentInstanceId;
+    private String rootId;
+    private String rootInstanceId;
+    private String businessKey;
+    private String identity;
+    private URI source;
+    private OffsetDateTime time;
+    private String addons;
+    private transient int ordinal;
+
+    /** Creates an empty record, to be filled by {@link #readEvent(DataInput)}. */
+    public ProcessInstanceDataEventExtensionRecord() {
+    }
+
+    /**
+     * Copies the process instance attributes out of the given event.
+     *
+     * @param ordinal position assigned to this record by the caller
+     * @param dataEvent event the attributes are taken from
+     */
+    public ProcessInstanceDataEventExtensionRecord(int ordinal, ProcessInstanceDataEvent<?> dataEvent) {
+        this.ordinal = ordinal;
+        this.id = dataEvent.getKogitoProcessId();
+        this.instanceId = dataEvent.getKogitoProcessInstanceId();
+        this.version = dataEvent.getKogitoProcessInstanceVersion();
+        this.state = dataEvent.getKogitoProcessInstanceState();
+        this.type = dataEvent.getKogitoProcessType();
+        this.parentInstanceId = dataEvent.getKogitoParentProcessInstanceId();
+        this.rootId = dataEvent.getKogitoRootProcessId();
+        this.rootInstanceId = dataEvent.getKogitoRootProcessInstanceId();
+        this.businessKey = dataEvent.getKogitoBusinessKey();
+        this.identity = dataEvent.getKogitoIdentity();
+        this.time = dataEvent.getTime();
+        this.source = dataEvent.getSource();
+        this.addons = dataEvent.getKogitoAddons();
+    }
+
+    public int getOrdinal() {
+        return ordinal;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public String getInstanceId() {
+        return instanceId;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public String getState() {
+        return state;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public String getParentInstanceId() {
+        return parentInstanceId;
+    }
+
+    public String getRootId() {
+        return rootId;
+    }
+
+    public String getRootInstanceId() {
+        return rootInstanceId;
+    }
+
+    public String getBusinessKey() {
+        return businessKey;
+    }
+
+    public String getIdentity() {
+        return identity;
+    }
+
+    public URI getSource() {
+        return source;
+    }
+
+    public OffsetDateTime getTime() {
+        return time;
+    }
+
+    public String getAddons() {
+        return addons;
+    }
+
+    /**
+     * Writes the record fields in a fixed order that {@link #readEvent(DataInput)} mirrors.
+     * The first four strings use {@link DataOutput#writeUTF(String)} directly; the
+     * remaining ones go through the statically imported helpers (presumably
+     * null-tolerant; confirm in KogitoEventBodySerializationHelper).
+     */
+    @Override
+    public void writeEvent(DataOutput out) throws IOException {
+        out.writeUTF(this.id);
+        out.writeUTF(this.instanceId);
+        out.writeUTF(this.version);
+        out.writeUTF(this.state);
+        writeUTF(out, this.type);
+        writeUTF(out, this.parentInstanceId);
+        writeUTF(out, this.rootId);
+        writeUTF(out, this.rootInstanceId);
+        writeUTF(out, this.businessKey);
+        writeUTF(out, this.identity);
+        writeTime(out, this.time);
+        out.writeUTF(this.source.toString());
+        writeUTF(out, this.addons);
+    }
+
+    /** Reads the record fields in the same order {@link #writeEvent(DataOutput)} wrote them. */
+    @Override
+    public void readEvent(DataInput in) throws IOException {
+        this.id = in.readUTF();
+        this.instanceId = in.readUTF();
+        this.version = in.readUTF();
+        this.state = in.readUTF();
+        this.type = readUTF(in);
+        this.parentInstanceId = readUTF(in);
+        this.rootId = readUTF(in);
+        this.rootInstanceId = readUTF(in);
+        this.businessKey = readUTF(in);
+        this.identity = readUTF(in);
+        this.time = readTime(in);
+        this.source = URI.create(in.readUTF());
+        this.addons = readUTF(in);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("ProcessInstanceDataEventExtensionRecord [id=");
+        text.append(id);
+        text.append(", instanceId=").append(instanceId);
+        text.append(", version=").append(version);
+        text.append(", state=").append(state);
+        text.append(", type=").append(type);
+        text.append(", parentInstanceId=").append(parentInstanceId);
+        text.append(", rootId=").append(rootId);
+        text.append(", rootInstanceId=").append(rootInstanceId);
+        text.append(", businessKey=").append(businessKey);
+        text.append(", identity=").append(identity);
+        text.append(", source=").append(source);
+        text.append(", time=").append(time);
+        text.append(", addons=").append(addons);
+        text.append("]");
+        return text.toString();
+    }
+
+}
diff --git
a/api/kogito-events-core/src/main/resources/META-INF/services/com.fasterxml.jackson.databind.Module
b/api/kogito-events-core/src/main/resources/META-INF/services/com.fasterxml.jackson.databind.Module
new file mode 100644
index 0000000000..85fd599eb4
--- /dev/null
+++
b/api/kogito-events-core/src/main/resources/META-INF/services/com.fasterxml.jackson.databind.Module
@@ -0,0 +1 @@
+org.kie.kogito.event.serializer.KogitoSerializationModule
\ No newline at end of file
diff --git
a/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java
b/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java
index 0d4fb4fafa..d278e2c427 100644
---
a/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java
+++
b/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java
@@ -18,31 +18,41 @@
*/
package org.kie.kogito.event.process;
+import java.io.IOException;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import org.kie.kogito.event.AbstractDataEvent;
+import org.kie.kogito.event.DataEventFactory;
import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;
+import
org.kie.kogito.event.serializer.MultipleProcessDataInstanceConverterFactory;
import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
+import org.kie.kogito.jackson.utils.JsonObjectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.jackson.JsonFormat;
import static org.assertj.core.api.Assertions.assertThat;
+import static
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.toDate;
class ProcessEventsTest {
-
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
- .registerModule(new JavaTimeModule())
.registerModule(JsonFormat.getCloudEventJacksonModule())
-
.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+
.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
+ .findAndRegisterModules();
+
+ private static final Logger logger =
LoggerFactory.getLogger(ProcessEventsTest.class);
private static final Set<String> BASE_EXTENSION_NAMES = Arrays.stream(new
String[] {
CloudEventExtensionConstants.PROCESS_INSTANCE_ID,
@@ -53,10 +63,9 @@ class ProcessEventsTest {
CloudEventExtensionConstants.PROCESS_INSTANCE_VERSION,
CloudEventExtensionConstants.PROCESS_PARENT_PROCESS_INSTANCE_ID,
CloudEventExtensionConstants.PROCESS_INSTANCE_STATE,
- CloudEventExtensionConstants.PROCESS_REFERENCE_ID,
- CloudEventExtensionConstants.PROCESS_START_FROM_NODE,
CloudEventExtensionConstants.BUSINESS_KEY,
- CloudEventExtensionConstants.PROCESS_TYPE
}).collect(Collectors.toSet());
+ CloudEventExtensionConstants.PROCESS_TYPE,
+ CloudEventExtensionConstants.IDENTITY
}).collect(Collectors.toSet());
private static final String PROCESS_INSTANCE_EVENT_TYPE =
"ProcessInstanceEvent";
private static final String USER_TASK_INSTANCE_EVENT_TYPE =
"UserTaskInstanceEvent";
@@ -77,16 +86,22 @@ class ProcessEventsTest {
private static final String ROOT_PROCESS_ID = "ROOT_PROCESS_ID";
private static final String PROCESS_PARENT_PROCESS_INSTANCE_ID =
"PROCESS_PARENT_PROCESS_INSTANCE_ID";
private static final String PROCESS_INSTANCE_STATE =
"PROCESS_INSTANCE_STATE";
- private static final String PROCESS_REFERENCE_ID = "PROCESS_REFERENCE_ID";
- private static final String PROCESS_START_FROM_NODE =
"PROCESS_START_FROM_NODE";
private static final String BUSINESS_KEY = "BUSINESS_KEY";
private static final String PROCESS_TYPE = "PROCESS_TYPE";
private static final String ADDONS = "ADDONS";
-
+ private static final int PROCESS_STATE = 1;
+ private static final String NODE_CONTAINER_ID = "323";
+ private static final String NODE_CONTAINER_INSTANCEID = "323-3232-3232";
private static final String EXTENSION_1 = "EXTENSION_1";
private static final String EXTENSION_1_VALUE = "EXTENSION_1_VALUE";
private static final String EXTENSION_2 = "EXTENSION_2";
private static final String EXTENSION_2_VALUE = "EXTENSION_2_VALUE";
+ private static final String ERROR_MESSAGE = "AAAAAAHHHHH!!!!!";
+
+ private static final int EVENT_TYPE = 1;
+
+ private static final String NODE_NAME = "NODE_NAME";
+ private static final String NODE_TYPE = "NODE_TYPE";
private static final String VARIABLE_NAME = "VARIABLE_NAME";
@@ -112,6 +127,178 @@ class ProcessEventsTest {
assertExtensionNames(deserializedEvent, BASE_EXTENSION_NAMES,
EXTENSION_1, EXTENSION_2);
}
+    /**
+     * Round-trips the same batch as plain JSON, binary and compressed binary,
+     * and checks that each encoding is smaller than the previous one.
+     */
+    @Test
+    void multipleInstanceDataEvent() throws IOException {
+        JsonNode expectedVarValue = OBJECT_MAPPER.createObjectNode().put("name", "John Doe");
+
+        int jsonSize = processMultipleInstanceDataEvent(expectedVarValue, false, false);
+        int binarySize = processMultipleInstanceDataEvent(expectedVarValue, true, false);
+        int compressedSize = processMultipleInstanceDataEvent(expectedVarValue, true, true);
+
+        assertThat(jsonSize).isGreaterThan(binarySize);
+        assertThat(binarySize).isGreaterThan(compressedSize);
+    }
+
+ /**
+  * Builds one event of each process instance kind (state, variable, error,
+  * node, SLA), groups them in a MultipleProcessInstanceDataEvent, serializes
+  * it with OBJECT_MAPPER and verifies that both deserialization paths give
+  * back the same content.
+  *
+  * @param expectedVarValue value stored in the variable event and expected back
+  * @param binary whether the binary content type is set on the grouped event
+  * @param compress whether the grouped event is flagged as compressed
+  * @return the length in bytes of the serialized grouped event
+  */
+ private int processMultipleInstanceDataEvent(JsonNode expectedVarValue,
boolean binary, boolean compress) throws IOException {
+ // state event
+ ProcessInstanceStateDataEvent stateEvent = new
ProcessInstanceStateDataEvent();
+ setBaseEventValues(stateEvent,
ProcessInstanceStateDataEvent.STATE_TYPE);
+
stateEvent.setData(ProcessInstanceStateEventBody.create().eventDate(toDate(TIME)).eventType(EVENT_TYPE).eventUser(SUBJECT)
+
.businessKey(BUSINESS_KEY).processId(PROCESS_ID).processInstanceId(PROCESS_INSTANCE_ID).state(PROCESS_STATE)
+
.processVersion(PROCESS_INSTANCE_VERSION).parentInstanceId(PROCESS_PARENT_PROCESS_INSTANCE_ID).processName(PROCESS_ID)
+
.processType(PROCESS_TYPE).rootProcessId(ROOT_PROCESS_ID).rootProcessInstanceId(ROOT_PROCESS_INSTANCE_ID).build());
+
+ // variable event, carrying the variable name as an extension attribute too
+ ProcessInstanceVariableDataEvent varEvent = new
ProcessInstanceVariableDataEvent();
+ setBaseEventValues(varEvent,
ProcessInstanceVariableDataEvent.VAR_TYPE);
+
varEvent.addExtensionAttribute(CloudEventExtensionConstants.KOGITO_VARIABLE_NAME,
VARIABLE_NAME);
+
varEvent.setData(ProcessInstanceVariableEventBody.create().eventDate(toDate(TIME)).eventUser(SUBJECT)
+
.processId(PROCESS_ID).processInstanceId(PROCESS_INSTANCE_ID).processVersion(PROCESS_INSTANCE_VERSION)
+
.nodeContainerDefinitionId(NODE_CONTAINER_ID).nodeContainerInstanceId(NODE_CONTAINER_INSTANCEID)
+ .variableName(VARIABLE_NAME)
+ .variableId(VARIABLE_NAME)
+ .variableValue(expectedVarValue)
+ .build());
+
+ // error event
+ ProcessInstanceErrorDataEvent errorEvent = new
ProcessInstanceErrorDataEvent();
+ setBaseEventValues(errorEvent,
ProcessInstanceErrorDataEvent.ERROR_TYPE);
+
errorEvent.setData(ProcessInstanceErrorEventBody.create().errorMessage(ERROR_MESSAGE).eventDate(toDate(TIME)).eventUser(SUBJECT)
+
.processId(PROCESS_ID).processInstanceId(PROCESS_INSTANCE_ID).processVersion(PROCESS_INSTANCE_VERSION).nodeDefinitionId(NODE_CONTAINER_ID)
+ .nodeInstanceId(NODE_CONTAINER_INSTANCEID).build());
+
+ // node event
+ ProcessInstanceNodeDataEvent nodeEvent = new
ProcessInstanceNodeDataEvent();
+ setBaseEventValues(nodeEvent, ProcessInstanceNodeDataEvent.NODE_TYPE);
+ nodeEvent
+
.setData(ProcessInstanceNodeEventBody.create().processId(PROCESS_ID).processInstanceId(PROCESS_INSTANCE_ID).processVersion(PROCESS_INSTANCE_VERSION).nodeDefinitionId(NODE_CONTAINER_ID)
+
.nodeInstanceId(NODE_CONTAINER_INSTANCEID).eventDate(toDate(TIME)).eventUser(SUBJECT).connectionNodeDefinitionId(NODE_CONTAINER_ID).workItemId(NODE_CONTAINER_ID)
+ .nodeType(NODE_TYPE).nodeName(NODE_NAME)
+
.eventType(EVENT_TYPE).slaDueDate(toDate(TIME)).build());
+
+ // SLA event
+ ProcessInstanceSLADataEvent slaEvent = new
ProcessInstanceSLADataEvent();
+ setBaseEventValues(slaEvent, ProcessInstanceSLADataEvent.SLA_TYPE);
+ slaEvent
+
.setData(ProcessInstanceSLAEventBody.create().processId(PROCESS_ID).processInstanceId(PROCESS_INSTANCE_ID).processVersion(PROCESS_INSTANCE_VERSION).nodeDefinitionId(NODE_CONTAINER_ID)
+
.nodeInstanceId(NODE_CONTAINER_INSTANCEID).eventDate(toDate(TIME)).eventUser(SUBJECT)
+
.nodeType(NODE_TYPE).nodeName(NODE_NAME).slaDueDate(toDate(TIME)).build());
+
+ // the order used here is the order assertMultipleIntance expects when iterating
+ MultipleProcessInstanceDataEvent event = new
MultipleProcessInstanceDataEvent(SOURCE, Arrays.asList(stateEvent, varEvent,
errorEvent, nodeEvent, slaEvent));
+ if (binary) {
+
event.setDataContentType(MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE);
+ }
+ if (compress) {
+ event.setCompressed(compress);
+ }
+
+ byte[] json = OBJECT_MAPPER.writeValueAsBytes(event);
+ logger.info("Serialized chunk size is {}", json.length);
+
+ // cloud event structured mode check
+ MultipleProcessInstanceDataEvent deserializedEvent =
OBJECT_MAPPER.readValue(json, MultipleProcessInstanceDataEvent.class);
+
assertThat(deserializedEvent.getData()).hasSize(event.getData().size());
+ assertMultipleIntance(deserializedEvent, expectedVarValue);
+
+ // cloud event binary mode check
+ CloudEvent cloudEvent = OBJECT_MAPPER.readValue(json,
CloudEvent.class);
+ deserializedEvent = DataEventFactory.from(new
MultipleProcessInstanceDataEvent(), cloudEvent,
MultipleProcessDataInstanceConverterFactory.fromCloudEvent(cloudEvent,
OBJECT_MAPPER));
+
assertThat(deserializedEvent.getData()).hasSize(event.getData().size());
+ assertMultipleIntance(deserializedEvent, expectedVarValue);
+ return json.length;
+ }
+
+ private void assertMultipleIntance(MultipleProcessInstanceDataEvent
deserializedEvent, JsonNode expectedVarValue) {
+
+ Iterator<ProcessInstanceDataEvent<? extends
KogitoMarshallEventSupport>> iter = deserializedEvent.getData().iterator();
+ ProcessInstanceStateDataEvent deserializedStateEvent =
(ProcessInstanceStateDataEvent) iter.next();
+ assertBaseEventValues(deserializedStateEvent,
ProcessInstanceStateDataEvent.STATE_TYPE);
+ assertExtensionNames(deserializedStateEvent, BASE_EXTENSION_NAMES);
+ assertStateBody(deserializedStateEvent.getData());
+
+ ProcessInstanceVariableDataEvent deserializedVariableEvent =
(ProcessInstanceVariableDataEvent) iter.next();
+ assertBaseEventValues(deserializedVariableEvent,
ProcessInstanceVariableDataEvent.VAR_TYPE);
+ assertExtensionNames(deserializedVariableEvent, BASE_EXTENSION_NAMES,
CloudEventExtensionConstants.KOGITO_VARIABLE_NAME);
+
assertThat(deserializedVariableEvent.getExtension(CloudEventExtensionConstants.KOGITO_VARIABLE_NAME)).isEqualTo(VARIABLE_NAME);
+ assertVarBody(deserializedVariableEvent.getData(), expectedVarValue);
+
+ ProcessInstanceErrorDataEvent deserializedErrorEvent =
(ProcessInstanceErrorDataEvent) iter.next();
+ assertBaseEventValues(deserializedErrorEvent,
ProcessInstanceErrorDataEvent.ERROR_TYPE);
+ assertExtensionNames(deserializedErrorEvent, BASE_EXTENSION_NAMES);
+ assertErrorBody(deserializedErrorEvent.getData());
+
+ ProcessInstanceNodeDataEvent deserializedNodeEvent =
(ProcessInstanceNodeDataEvent) iter.next();
+ assertBaseEventValues(deserializedNodeEvent,
ProcessInstanceNodeDataEvent.NODE_TYPE);
+ assertExtensionNames(deserializedNodeEvent, BASE_EXTENSION_NAMES);
+ assertNodeBody(deserializedNodeEvent.getData());
+
+ ProcessInstanceSLADataEvent deserializedSLAEvent =
(ProcessInstanceSLADataEvent) iter.next();
+ assertBaseEventValues(deserializedSLAEvent,
ProcessInstanceSLADataEvent.SLA_TYPE);
+ assertExtensionNames(deserializedSLAEvent, BASE_EXTENSION_NAMES);
+ assertSLABody(deserializedSLAEvent.getData());
+ }
+
+ /**
+  * Checks every field of a deserialized SLA body against the constants the
+  * test used when building the event.
+  */
+ private void assertSLABody(ProcessInstanceSLAEventBody data) {
+ assertThat(data.getNodeDefinitionId()).isEqualTo(NODE_CONTAINER_ID);
+
assertThat(data.getNodeInstanceId()).isEqualTo(NODE_CONTAINER_INSTANCEID);
+ assertThat(data.getProcessId()).isEqualTo(PROCESS_ID);
+ assertThat(data.getProcessInstanceId()).isEqualTo(PROCESS_INSTANCE_ID);
+
assertThat(data.getProcessVersion()).isEqualTo(PROCESS_INSTANCE_VERSION);
+ assertThat(data.getEventUser()).isEqualTo(SUBJECT);
+ assertThat(data.getEventDate()).isEqualTo(toDate(TIME));
+ assertThat(data.getSlaDueDate()).isEqualTo(toDate(TIME));
+ assertThat(data.getNodeName()).isEqualTo(NODE_NAME);
+ assertThat(data.getNodeType()).isEqualTo(NODE_TYPE);
+ }
+
+ /**
+  * Checks every field of a deserialized node body against the constants the
+  * test used when building the event.
+  */
+ private void assertNodeBody(ProcessInstanceNodeEventBody data) {
+ assertThat(data.getNodeDefinitionId()).isEqualTo(NODE_CONTAINER_ID);
+
assertThat(data.getNodeInstanceId()).isEqualTo(NODE_CONTAINER_INSTANCEID);
+ assertThat(data.getProcessId()).isEqualTo(PROCESS_ID);
+ assertThat(data.getProcessInstanceId()).isEqualTo(PROCESS_INSTANCE_ID);
+
assertThat(data.getProcessVersion()).isEqualTo(PROCESS_INSTANCE_VERSION);
+ assertThat(data.getEventUser()).isEqualTo(SUBJECT);
+ assertThat(data.getEventDate()).isEqualTo(toDate(TIME));
+ assertThat(data.getEventType()).isEqualTo(EVENT_TYPE);
+
assertThat(data.getConnectionNodeDefinitionId()).isEqualTo(NODE_CONTAINER_ID);
+ assertThat(data.getWorkItemId()).isEqualTo(NODE_CONTAINER_ID);
+ assertThat(data.getSlaDueDate()).isEqualTo(toDate(TIME));
+ assertThat(data.getNodeName()).isEqualTo(NODE_NAME);
+ assertThat(data.getNodeType()).isEqualTo(NODE_TYPE);
+ }
+
+ /**
+  * Checks every field of a deserialized error body against the constants the
+  * test used when building the event.
+  */
+ private void assertErrorBody(ProcessInstanceErrorEventBody data) {
+ assertThat(data.getNodeDefinitionId()).isEqualTo(NODE_CONTAINER_ID);
+
assertThat(data.getNodeInstanceId()).isEqualTo(NODE_CONTAINER_INSTANCEID);
+ assertThat(data.getErrorMessage()).isEqualTo(ERROR_MESSAGE);
+ assertThat(data.getProcessId()).isEqualTo(PROCESS_ID);
+ assertThat(data.getProcessInstanceId()).isEqualTo(PROCESS_INSTANCE_ID);
+
assertThat(data.getProcessVersion()).isEqualTo(PROCESS_INSTANCE_VERSION);
+ assertThat(data.getEventUser()).isEqualTo(SUBJECT);
+ assertThat(data.getEventDate()).isEqualTo(toDate(TIME));
+ }
+
+ /**
+  * Checks every field of a deserialized variable body against the constants
+  * the test used when building the event; the variable value is converted
+  * with JsonObjectUtils.fromValue before being compared to the expected node.
+  */
+ private static void assertVarBody(ProcessInstanceVariableEventBody data,
JsonNode expectedVarValue) {
+ assertThat(data.getVariableId()).isEqualTo(VARIABLE_NAME);
+ assertThat(data.getVariableName()).isEqualTo(VARIABLE_NAME);
+
assertThat(JsonObjectUtils.fromValue(data.getVariableValue())).isEqualTo(expectedVarValue);
+
assertThat(data.getNodeContainerDefinitionId()).isEqualTo(NODE_CONTAINER_ID);
+
assertThat(data.getNodeContainerInstanceId()).isEqualTo(NODE_CONTAINER_INSTANCEID);
+ assertThat(data.getProcessId()).isEqualTo(PROCESS_ID);
+ assertThat(data.getProcessInstanceId()).isEqualTo(PROCESS_INSTANCE_ID);
+
assertThat(data.getProcessVersion()).isEqualTo(PROCESS_INSTANCE_VERSION);
+ assertThat(data.getEventUser()).isEqualTo(SUBJECT);
+ assertThat(data.getEventDate()).isEqualTo(toDate(TIME));
+ }
+
+ /**
+  * Checks every field of a deserialized state body against the constants the
+  * test used when building the event.
+  */
+ private static void assertStateBody(ProcessInstanceStateEventBody data) {
+ assertThat(data.getBusinessKey()).isEqualTo(BUSINESS_KEY);
+
assertThat(data.getParentInstanceId()).isEqualTo(PROCESS_PARENT_PROCESS_INSTANCE_ID);
+ assertThat(data.getRootProcessId()).isEqualTo(ROOT_PROCESS_ID);
+ assertThat(data.getProcessType()).isEqualTo(PROCESS_TYPE);
+ assertThat(data.getState()).isEqualTo(PROCESS_STATE);
+
assertThat(data.getRootProcessInstanceId()).isEqualTo(ROOT_PROCESS_INSTANCE_ID);
+ assertThat(data.getEventType()).isEqualTo(EVENT_TYPE);
+ assertThat(data.getProcessId()).isEqualTo(PROCESS_ID);
+ assertThat(data.getProcessInstanceId()).isEqualTo(PROCESS_INSTANCE_ID);
+
assertThat(data.getProcessVersion()).isEqualTo(PROCESS_INSTANCE_VERSION);
+ assertThat(data.getEventUser()).isEqualTo(SUBJECT);
+ assertThat(data.getEventDate()).isEqualTo(toDate(TIME));
+ }
+
@Test
void userTaskInstanceDataEvent() throws Exception {
UserTaskInstanceStateDataEvent event = new
UserTaskInstanceStateDataEvent();
@@ -136,7 +323,6 @@ class ProcessEventsTest {
assertExtensionNames(deserializedEvent, BASE_EXTENSION_NAMES,
CloudEventExtensionConstants.PROCESS_USER_TASK_INSTANCE_ID,
CloudEventExtensionConstants.PROCESS_USER_TASK_INSTANCE_STATE,
EXTENSION_1, EXTENSION_2);
-
}
@Test
@@ -175,12 +361,11 @@ class ProcessEventsTest {
event.setKogitoRootProcessInstanceId(ROOT_PROCESS_INSTANCE_ID);
event.setKogitoRootProcessId(ROOT_PROCESS_ID);
event.setKogitoParentProcessInstanceId(PROCESS_PARENT_PROCESS_INSTANCE_ID);
- event.setKogitoReferenceId(PROCESS_REFERENCE_ID);
event.setKogitoProcessInstanceState(PROCESS_INSTANCE_STATE);
- event.setKogitoStartFromNode(PROCESS_START_FROM_NODE);
event.setKogitoBusinessKey(BUSINESS_KEY);
event.setKogitoProcessType(PROCESS_TYPE);
event.setKogitoAddons(ADDONS);
+ event.setKogitoIdentity(SUBJECT);
}
private static void setAdditionalExtensions(AbstractDataEvent<?> event) {
@@ -197,25 +382,25 @@ class ProcessEventsTest {
assertThat(deserializedEvent.getSubject()).isEqualTo(SUBJECT);
assertThat(deserializedEvent.getDataContentType()).isEqualTo(DATA_CONTENT_TYPE);
assertThat(deserializedEvent.getDataSchema()).isEqualTo(DATA_SCHEMA);
-
assertThat(deserializedEvent.getKogitoProcessInstanceId()).isEqualTo(PROCESS_INSTANCE_ID);
assertThat(deserializedEvent.getKogitoProcessId()).isEqualTo(PROCESS_ID);
assertThat(deserializedEvent.getKogitoRootProcessInstanceId()).isEqualTo(ROOT_PROCESS_INSTANCE_ID);
assertThat(deserializedEvent.getKogitoRootProcessId()).isEqualTo(ROOT_PROCESS_ID);
assertThat(deserializedEvent.getKogitoParentProcessInstanceId()).isEqualTo(PROCESS_PARENT_PROCESS_INSTANCE_ID);
-
assertThat(deserializedEvent.getKogitoReferenceId()).isEqualTo(PROCESS_REFERENCE_ID);
assertThat(deserializedEvent.getKogitoProcessInstanceState()).isEqualTo(PROCESS_INSTANCE_STATE);
-
assertThat(deserializedEvent.getKogitoStartFromNode()).isEqualTo(PROCESS_START_FROM_NODE);
assertThat(deserializedEvent.getKogitoBusinessKey()).isEqualTo(BUSINESS_KEY);
assertThat(deserializedEvent.getKogitoProcessType()).isEqualTo(PROCESS_TYPE);
+ assertThat(deserializedEvent.getKogitoIdentity()).isEqualTo(SUBJECT);
assertThat(deserializedEvent.getKogitoAddons()).isEqualTo(ADDONS);
}
private static void assertExtensionNames(AbstractDataEvent<?> event,
Set<String> baseNames, String... names) {
Set<String> extensionNames = event.getExtensionNames();
assertThat(extensionNames).hasSize(baseNames.size() + names.length)
- .containsAll(baseNames)
- .contains(names);
+ .containsAll(baseNames);
+ if (names.length > 0) {
+ assertThat(extensionNames).contains(names);
+ }
}
private static void assertExtensionsNotDuplicated(String json, Set<String>
extensionNames) {
diff --git
a/kogito-codegen-modules/kogito-codegen-core/src/main/resources/class-templates/config/GlobalObjectMapperQuarkusTemplate.java
b/kogito-codegen-modules/kogito-codegen-core/src/main/resources/class-templates/config/GlobalObjectMapperQuarkusTemplate.java
index 050097b1d1..c670e699f9 100644
---
a/kogito-codegen-modules/kogito-codegen-core/src/main/resources/class-templates/config/GlobalObjectMapperQuarkusTemplate.java
+++
b/kogito-codegen-modules/kogito-codegen-core/src/main/resources/class-templates/config/GlobalObjectMapperQuarkusTemplate.java
@@ -43,6 +43,6 @@ public class GlobalObjectMapper implements
ObjectMapperCustomizer {
mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
}
mapper.setDateFormat(new
StdDateFormat().withColonInTimeZone(true).withTimeZone(TimeZone.getDefault()));
- mapper.registerModule(new
JavaTimeModule()).registerModule(JsonFormat.getCloudEventJacksonModule());
+
mapper.registerModule(JsonFormat.getCloudEventJacksonModule()).findAndRegisterModules();
}
}
\ No newline at end of file
diff --git
a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java
b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java
index 66e007d039..ee30e0dc36 100644
---
a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java
+++
b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java
@@ -26,7 +26,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.kie.kogito.event.DataEvent;
+import org.kie.kogito.event.process.KogitoMarshallEventSupport;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
@@ -45,6 +47,12 @@ public class GroupingMessagingEventPublisher extends
AbstractMessagingEventPubli
publish(Collections.singletonList(event));
}
+ @ConfigProperty(name = "kogito.events.grouping.binary", defaultValue =
"false")
+ private boolean binary;
+
+ @ConfigProperty(name = "kogito.events.grouping.compress", defaultValue =
"false")
+ private boolean compress;
+
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void publish(Collection<DataEvent<?>> events) {
@@ -62,7 +70,12 @@ public class GroupingMessagingEventPublisher extends
AbstractMessagingEventPubli
if (firstEvent instanceof UserTaskInstanceDataEvent) {
publishToTopic(entry.getKey(), new
MultipleUserTaskInstanceDataEvent(source,
(Collection<UserTaskInstanceDataEvent<?>>) entry.getValue()));
} else if (firstEvent instanceof ProcessInstanceDataEvent) {
- publishToTopic(entry.getKey(), new
MultipleProcessInstanceDataEvent(source,
(Collection<ProcessInstanceDataEvent<?>>) entry.getValue()));
+ MultipleProcessInstanceDataEvent sent = new
MultipleProcessInstanceDataEvent(source, (Collection<ProcessInstanceDataEvent<?
extends KogitoMarshallEventSupport>>) entry.getValue());
+ if (binary) {
+
sent.setDataContentType(MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE);
+ sent.setCompressed(compress);
+ }
+ publishToTopic(entry.getKey(), sent);
} else {
for (DataEvent<?> event : (Collection<DataEvent<?>>)
entry.getValue()) {
publishToTopic(entry.getKey(), event);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]