This is an automated email from the ASF dual-hosted git repository.

fjtiradosarti pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git


The following commit(s) were added to refs/heads/main by this push:
     new ce37bdc31a [Fix #3721] Optimize event grouping (#3739)
ce37bdc31a is described below

commit ce37bdc31a97d955a53d0ff57460f62246de1243
Author: Francisco Javier Tirado Sarti 
<[email protected]>
AuthorDate: Fri Oct 25 13:36:14 2024 +0200

    [Fix #3721] Optimize event grouping (#3739)
    
    * [Fix #3721] Optimize event grouping
    
    * [Fix #3721] Adding addons
    
    * [Fix #3721] Refactoring KogitoIndexConverter
---
 api/kogito-events-api/pom.xml                      |   8 +-
 .../kogito/event/process/CloudEventVisitor.java}   |  15 +-
 .../KogitoEventBodySerializationHelper.java        | 296 +++++++++++++++++++++
 .../event/process/KogitoMarshallEventSupport.java} |  16 +-
 .../process/ProcessInstanceErrorEventBody.java     |  32 ++-
 .../process/ProcessInstanceNodeEventBody.java      |  47 +++-
 .../event/process/ProcessInstanceSLAEventBody.java |  41 ++-
 .../process/ProcessInstanceStateEventBody.java     |  43 ++-
 .../process/ProcessInstanceVariableEventBody.java  |  37 ++-
 .../org/kie/kogito/event/DataEventFactory.java     |  17 ++
 .../JacksonTypeCloudEventDataConverter.java}       |  26 +-
 .../process/MultipleProcessInstanceDataEvent.java  |  19 +-
 .../process/ProcessInstanceErrorDataEvent.java     |   5 +-
 .../process/ProcessInstanceNodeDataEvent.java      |   5 +-
 .../event/process/ProcessInstanceSLADataEvent.java |   5 +-
 .../process/ProcessInstanceStateDataEvent.java     |   5 +-
 .../process/ProcessInstanceVariableDataEvent.java  |   5 +-
 .../JsonProcessInstanceDataEventDeserializer.java  |  79 ++++++
 .../JsonUserTaskInstanceDataEventDeserializer.java |  82 ++++++
 .../KogitoDataEventSerializationHelper.java        |  74 ++++++
 .../KogitoSerializationModule.java}                |  21 +-
 ...rocessDataInstanceBeanDeserializerModifier.java |  40 +++
 ...eProcessDataInstanceBeanSerializerModifier.java |  40 +++
 ...ultipleProcessDataInstanceConverterFactory.java |  65 +++++
 ...ltipleProcessInstanceDataEventDeserializer.java | 184 +++++++++++++
 ...MultipleProcessInstanceDataEventSerializer.java | 115 ++++++++
 .../ProcessInstanceDataEventExtensionRecord.java   | 168 ++++++++++++
 .../services/com.fasterxml.jackson.databind.Module |   1 +
 .../kogito/event/process/ProcessEventsTest.java    | 221 +++++++++++++--
 .../config/GlobalObjectMapperQuarkusTemplate.java  |   2 +-
 .../process/GroupingMessagingEventPublisher.java   |  15 +-
 31 files changed, 1650 insertions(+), 79 deletions(-)

diff --git a/api/kogito-events-api/pom.xml b/api/kogito-events-api/pom.xml
index 63726ae825..1bd9097e94 100644
--- a/api/kogito-events-api/pom.xml
+++ b/api/kogito-events-api/pom.xml
@@ -47,10 +47,6 @@
     </dependency>
     
     <!-- CloudEvents -->
-     <dependency>
-      <groupId>io.cloudevents</groupId>
-      <artifactId>cloudevents-core</artifactId>
-    </dependency>
     <!-- Tests -->
     <dependency>
       <groupId>org.junit.jupiter</groupId>
@@ -72,6 +68,10 @@
       <artifactId>slf4j-simple</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+       <groupId>org.kie.kogito</groupId>
+       <artifactId>kogito-jackson-utils</artifactId>
+    </dependency>
   </dependencies>
 
 </project>
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
 
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/CloudEventVisitor.java
similarity index 65%
copy from 
api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
copy to 
api/kogito-events-api/src/main/java/org/kie/kogito/event/process/CloudEventVisitor.java
index 7db8c0e765..1d6f2c1c59 100644
--- 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
+++ 
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/CloudEventVisitor.java
@@ -18,17 +18,8 @@
  */
 package org.kie.kogito.event.process;
 
-import java.net.URI;
-import java.util.Collection;
+import org.kie.kogito.event.DataEvent;
 
-public class MultipleProcessInstanceDataEvent extends 
ProcessInstanceDataEvent<Collection<ProcessInstanceDataEvent<?>>> {
-
-    public static final String TYPE = "MultipleProcessInstanceDataEvent";
-
-    public MultipleProcessInstanceDataEvent() {
-    }
-
-    public MultipleProcessInstanceDataEvent(URI source, 
Collection<ProcessInstanceDataEvent<?>> body) {
-        super(TYPE, source, body);
-    }
+public interface CloudEventVisitor {
+    void visit(DataEvent<?> cloudEvent);
 }
diff --git 
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java
 
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java
new file mode 100644
index 0000000000..d00146eb9c
--- /dev/null
+++ 
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.process;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collection;
+import java.util.Date;
+
+import org.kie.kogito.jackson.utils.ObjectMapperFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class KogitoEventBodySerializationHelper {
+
+    private KogitoEventBodySerializationHelper() {
+    }
+
+    public static String readUTF(DataInput in) throws IOException {
+        boolean isNotNull = in.readBoolean();
+        return isNotNull ? in.readUTF() : null;
+    }
+
+    public static void writeUTF(DataOutput out, String string) throws 
IOException {
+        if (string == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            out.writeUTF(string);
+        }
+    }
+
+    public static void writeDate(DataOutput out, Date date) throws IOException 
{
+        if (date == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            out.writeLong(date.getTime());
+        }
+    }
+
+    public static Date readDate(DataInput in) throws IOException {
+        boolean isNotNull = in.readBoolean();
+        return isNotNull ? new Date(in.readLong()) : null;
+    }
+
+    public static void writeTime(DataOutput out, OffsetDateTime date) throws 
IOException {
+        if (date == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            out.writeLong(date.toInstant().toEpochMilli());
+        }
+    }
+
+    public static OffsetDateTime readTime(DataInput in) throws IOException {
+        boolean isNotNull = in.readBoolean();
+        return isNotNull ? 
Instant.ofEpochMilli(in.readLong()).atOffset(ZoneOffset.UTC) : null;
+    }
+
+    public static void writeUTFCollection(DataOutput out, Collection<String> 
collection) throws IOException {
+        if (collection == null) {
+            writeInt(out, -1);
+        } else {
+            writeInt(out, collection.size());
+            for (String item : collection) {
+                writeUTF(out, item);
+            }
+        }
+    }
+
+    public static <T extends Collection<String>> T readUTFCollection(DataInput 
in, T holder) throws IOException {
+        int size = readInt(in);
+        if (size == -1) {
+            return null;
+        }
+        while (size-- > 0) {
+            holder.add(readUTF(in));
+        }
+        return holder;
+    }
+
+    private enum SerType {
+
+        NULL(KogitoEventBodySerializationHelper::writeNull, 
KogitoEventBodySerializationHelper::readNull),
+        JSON(KogitoEventBodySerializationHelper::writeJson, 
KogitoEventBodySerializationHelper::readJson),
+        DEFAULT(KogitoEventBodySerializationHelper::writeJson, 
KogitoEventBodySerializationHelper::readDefault),
+        STRING(KogitoEventBodySerializationHelper::writeString, 
DataInput::readUTF),
+        INT(KogitoEventBodySerializationHelper::writeInt, DataInput::readInt),
+        SHORT(KogitoEventBodySerializationHelper::writeShort, 
DataInput::readShort),
+        LONG(KogitoEventBodySerializationHelper::writeLong, 
DataInput::readLong),
+        BYTE(KogitoEventBodySerializationHelper::writeByte, 
DataInput::readByte),
+        BOOLEAN(KogitoEventBodySerializationHelper::writeBoolean, 
DataInput::readBoolean),
+        FLOAT(KogitoEventBodySerializationHelper::writeFloat, 
DataInput::readFloat),
+        DOUBLE(KogitoEventBodySerializationHelper::writeDouble, 
DataInput::readDouble);
+
+        final ObjectWriter writer;
+        final ObjectReader reader;
+
+        SerType(ObjectWriter writer, ObjectReader reader) {
+            this.writer = writer;
+            this.reader = reader;
+        }
+
+        ObjectWriter writer() {
+            return writer;
+        }
+
+        ObjectReader reader() {
+            return reader;
+        }
+
+        static SerType fromType(Class<?> type) {
+            if (JsonNode.class.isAssignableFrom(type)) {
+                return JSON;
+            } else if (String.class.isAssignableFrom(type)) {
+                return STRING;
+            } else if (Boolean.class.isAssignableFrom(type)) {
+                return BOOLEAN;
+            } else if (Integer.class.isAssignableFrom(type)) {
+                return INT;
+            } else if (Short.class.isAssignableFrom(type)) {
+                return SHORT;
+            } else if (Byte.class.isAssignableFrom(type)) {
+                return BYTE;
+            } else if (Long.class.isAssignableFrom(type)) {
+                return LONG;
+            } else if (Float.class.isAssignableFrom(type)) {
+                return FLOAT;
+            } else if (Double.class.isAssignableFrom(type)) {
+                return DOUBLE;
+            } else {
+                return DEFAULT;
+            }
+        }
+
+        static SerType fromObject(Object obj) {
+            return obj == null ? NULL : fromType(obj.getClass());
+        }
+    }
+
+    private static void writeType(DataOutput out, SerType type) throws 
IOException {
+        out.writeByte(type.ordinal());
+    }
+
+    private static SerType readType(DataInput in) throws IOException {
+        return SerType.values()[in.readByte()];
+    }
+
+    public static void writeObject(DataOutput out, Object obj) throws 
IOException {
+        SerType type = SerType.fromObject(obj);
+        writeType(out, type);
+        type.writer().accept(out, obj);
+    }
+
+    public static Object readObject(DataInput in) throws IOException {
+        return readType(in).reader().apply(in);
+    }
+
+    @FunctionalInterface
+    private static interface ObjectWriter {
+        void accept(DataOutput out, Object obj) throws IOException;
+    }
+
+    private static interface ObjectReader {
+        Object apply(DataInput out) throws IOException;
+    }
+
+    private static void writeString(DataOutput out, Object obj) throws 
IOException {
+        out.writeUTF((String) obj);
+    }
+
+    private static void writeBoolean(DataOutput out, Object obj) throws 
IOException {
+        out.writeBoolean((Boolean) obj);
+    }
+
+    private static void writeInt(DataOutput out, Object obj) throws 
IOException {
+        out.writeInt((Integer) obj);
+    }
+
+    private static void writeLong(DataOutput out, Object obj) throws 
IOException {
+        out.writeInt((Integer) obj);
+    }
+
+    private static void writeShort(DataOutput out, Object obj) throws 
IOException {
+        out.writeShort((Short) obj);
+    }
+
+    private static void writeByte(DataOutput out, Object obj) throws 
IOException {
+        out.writeByte((Byte) obj);
+    }
+
+    private static void writeFloat(DataOutput out, Object obj) throws 
IOException {
+        out.writeFloat((Float) obj);
+    }
+
+    private static void writeDouble(DataOutput out, Object obj) throws 
IOException {
+        out.writeDouble((Double) obj);
+    }
+
+    private static void writeNull(DataOutput out, Object obj) {
+        // do nothing
+    }
+
+    private static Object readNull(DataInput in) {
+        return null;
+    }
+
+    public static void writeInteger(DataOutput out, Integer integer) throws 
IOException {
+        if (integer == null) {
+            writeType(out, SerType.NULL);
+        } else {
+            writeInt(out, integer.intValue());
+        }
+    }
+
+    public static Integer readInteger(DataInput in) throws IOException {
+        SerType type = readType(in);
+        return type == SerType.NULL ? null : readInt(in, type);
+    }
+
+    public static void writeInt(DataOutput out, int size) throws IOException {
+        if (size < Byte.MAX_VALUE) {
+            writeType(out, SerType.BYTE);
+            out.writeByte((byte) size);
+        } else if (size < Short.MAX_VALUE) {
+            writeType(out, SerType.SHORT);
+            out.writeShort((short) size);
+        } else {
+            writeType(out, SerType.INT);
+            out.writeInt(size);
+        }
+    }
+
+    public static int readInt(DataInput in) throws IOException {
+        SerType type = readType(in);
+        return readInt(in, type);
+    }
+
+    private static int readInt(DataInput in, SerType type) throws IOException {
+        switch (type) {
+            case INT:
+                return in.readInt();
+            case SHORT:
+                return in.readShort();
+            case BYTE:
+                return in.readByte();
+            default:
+                throw new IOException("Stream corrupted. Read unrecognized 
type " + type);
+        }
+    }
+
+    private static void writeJson(DataOutput out, Object obj) throws 
IOException {
+        byte[] bytes = ObjectMapperFactory.get().writeValueAsBytes(obj);
+        out.writeInt(bytes.length);
+        out.write(bytes);
+    }
+
+    private static Object readJson(DataInput in) throws IOException {
+        return readJson(in, JsonNode.class);
+    }
+
+    private static Object readDefault(DataInput in) throws IOException {
+        return readJson(in, Object.class);
+    }
+
+    private static Object readJson(DataInput in, Class<?> type) throws 
IOException {
+        byte[] bytes = new byte[in.readInt()];
+        in.readFully(bytes);
+        return ObjectMapperFactory.get().readValue(bytes, type);
+    }
+
+    public static Date toDate(OffsetDateTime time) {
+        return time == null ? null : Date.from(time.toInstant());
+    }
+}
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
 
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoMarshallEventSupport.java
similarity index 65%
copy from 
api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
copy to 
api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoMarshallEventSupport.java
index 7db8c0e765..76693a64c6 100644
--- 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
+++ 
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoMarshallEventSupport.java
@@ -18,17 +18,13 @@
  */
 package org.kie.kogito.event.process;
 
-import java.net.URI;
-import java.util.Collection;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 
-public class MultipleProcessInstanceDataEvent extends 
ProcessInstanceDataEvent<Collection<ProcessInstanceDataEvent<?>>> {
+public interface KogitoMarshallEventSupport {
 
-    public static final String TYPE = "MultipleProcessInstanceDataEvent";
+    void writeEvent(DataOutput out) throws IOException;
 
-    public MultipleProcessInstanceDataEvent() {
-    }
-
-    public MultipleProcessInstanceDataEvent(URI source, 
Collection<ProcessInstanceDataEvent<?>> body) {
-        super(TYPE, source, body);
-    }
+    void readEvent(DataInput in) throws IOException;
 }
diff --git 
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorEventBody.java
 
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorEventBody.java
index a6ea8e7785..e20807a3d0 100644
--- 
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorEventBody.java
+++ 
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorEventBody.java
@@ -19,9 +19,16 @@
 
 package org.kie.kogito.event.process;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.Date;
 
-public class ProcessInstanceErrorEventBody {
+import org.kie.kogito.event.DataEvent;
+
+import static 
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*;
+
+public class ProcessInstanceErrorEventBody implements 
KogitoMarshallEventSupport, CloudEventVisitor {
 
     // common fields for events
     private Date eventDate;
@@ -138,4 +145,27 @@ public class ProcessInstanceErrorEventBody {
             return instance;
         }
     }
+
+    @Override
+    public void readEvent(DataInput in) throws IOException {
+        nodeDefinitionId = in.readUTF();
+        nodeInstanceId = in.readUTF();
+        errorMessage = in.readUTF();
+    }
+
+    @Override
+    public void writeEvent(DataOutput out) throws IOException {
+        out.writeUTF(nodeDefinitionId);
+        out.writeUTF(nodeInstanceId);
+        out.writeUTF(errorMessage);
+    }
+
+    @Override
+    public void visit(DataEvent<?> dataEvent) {
+        this.processId = dataEvent.getKogitoProcessId();
+        this.processInstanceId = dataEvent.getKogitoProcessInstanceId();
+        this.processVersion = dataEvent.getKogitoProcessInstanceVersion();
+        this.eventDate = toDate(dataEvent.getTime());
+        this.eventUser = dataEvent.getKogitoIdentity();
+    }
 }
diff --git 
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeEventBody.java
 
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeEventBody.java
index ca6621c0b1..349a8280b8 100644
--- 
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeEventBody.java
+++ 
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeEventBody.java
@@ -18,12 +18,19 @@
  */
 package org.kie.kogito.event.process;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
-public class ProcessInstanceNodeEventBody {
+import org.kie.kogito.event.DataEvent;
+
+import static 
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*;
+
+public class ProcessInstanceNodeEventBody implements 
KogitoMarshallEventSupport, CloudEventVisitor {
 
     public static final int EVENT_TYPE_ENTER = 1;
 
@@ -71,7 +78,42 @@ public class ProcessInstanceNodeEventBody {
 
     private Map<String, Object> data;
 
-    private ProcessInstanceNodeEventBody() {
+    @Override
+    public void writeEvent(DataOutput out) throws IOException {
+        writeInteger(out, eventType);
+        writeUTF(out, connectionNodeDefinitionId);
+        out.writeUTF(nodeDefinitionId);
+        writeUTF(out, nodeName);
+        out.writeUTF(nodeType);
+        out.writeUTF(nodeInstanceId);
+        writeUTF(out, workItemId);
+        writeDate(out, slaDueDate);
+        writeObject(out, data);
+    }
+
+    @Override
+    public void readEvent(DataInput in) throws IOException {
+        eventType = readInteger(in);
+        connectionNodeDefinitionId = readUTF(in);
+        nodeDefinitionId = in.readUTF();
+        nodeName = readUTF(in);
+        nodeType = in.readUTF();
+        nodeInstanceId = in.readUTF();
+        workItemId = readUTF(in);
+        slaDueDate = readDate(in);
+        data = (Map<String, Object>) readObject(in);
+    }
+
+    @Override
+    public void visit(DataEvent<?> dataEvent) {
+        this.processId = dataEvent.getKogitoProcessId();
+        this.processInstanceId = dataEvent.getKogitoProcessInstanceId();
+        this.processVersion = dataEvent.getKogitoProcessInstanceVersion();
+        this.eventDate = toDate(dataEvent.getTime());
+        this.eventUser = dataEvent.getKogitoIdentity();
+    }
+
+    public ProcessInstanceNodeEventBody() {
         this.data = new HashMap<>();
     }
 
@@ -246,5 +288,4 @@ public class ProcessInstanceNodeEventBody {
         }
 
     }
-
 }
diff --git 
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLAEventBody.java
 
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLAEventBody.java
index 133c0e5715..d054a490b1 100644
--- 
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLAEventBody.java
+++ 
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLAEventBody.java
@@ -18,9 +18,20 @@
  */
 package org.kie.kogito.event.process;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.Date;
 
-public class ProcessInstanceSLAEventBody {
+import org.kie.kogito.event.DataEvent;
+
+import static 
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.readDate;
+import static 
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.readUTF;
+import static 
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.toDate;
+import static 
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.writeDate;
+import static 
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.writeUTF;
+
+public class ProcessInstanceSLAEventBody implements 
KogitoMarshallEventSupport, CloudEventVisitor {
 
     // common fields for events
     private Date eventDate;
@@ -47,6 +58,34 @@ public class ProcessInstanceSLAEventBody {
 
     private Date slaDueDate;
 
+    @Override
+    public void writeEvent(DataOutput out) throws IOException {
+        out.writeUTF(nodeDefinitionId);
+        writeUTF(out, nodeName);
+        out.writeUTF(nodeType);
+        out.writeUTF(nodeInstanceId);
+        writeDate(out, slaDueDate);
+
+    }
+
+    @Override
+    public void readEvent(DataInput in) throws IOException {
+        nodeDefinitionId = in.readUTF();
+        nodeName = readUTF(in);
+        nodeType = in.readUTF();
+        nodeInstanceId = in.readUTF();
+        slaDueDate = readDate(in);
+    }
+
+    @Override
+    public void visit(DataEvent<?> dataEvent) {
+        this.processId = dataEvent.getKogitoProcessId();
+        this.processInstanceId = dataEvent.getKogitoProcessInstanceId();
+        this.processVersion = dataEvent.getKogitoProcessInstanceVersion();
+        this.eventDate = toDate(dataEvent.getTime());
+        this.eventUser = dataEvent.getKogitoIdentity();
+    }
+
     public Date getSlaDueDate() {
         return slaDueDate;
     }
diff --git 
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateEventBody.java
 
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateEventBody.java
index b9bb4fd9ad..8c7c291110 100644
--- 
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateEventBody.java
+++ 
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateEventBody.java
@@ -18,13 +18,21 @@
  */
 package org.kie.kogito.event.process;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
-public class ProcessInstanceStateEventBody {
+import org.kie.kogito.event.DataEvent;
+
+import static 
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*;
+
+public class ProcessInstanceStateEventBody implements 
KogitoMarshallEventSupport, CloudEventVisitor {
 
     public static final int EVENT_TYPE_STARTED = 1;
     public static final int EVENT_TYPE_ENDED = 2;
@@ -65,6 +73,38 @@ public class ProcessInstanceStateEventBody {
 
     public Date slaDueDate;
 
+    @Override
+    public void writeEvent(DataOutput out) throws IOException {
+        writeInteger(out, eventType);
+        writeUTF(out, processName);
+        writeInteger(out, state);
+        writeUTFCollection(out, roles);
+        writeDate(out, slaDueDate);
+    }
+
+    @Override
+    public void readEvent(DataInput in) throws IOException {
+        eventType = readInteger(in);
+        processName = readUTF(in);
+        state = readInteger(in);
+        roles = readUTFCollection(in, new LinkedHashSet<>());
+        slaDueDate = readDate(in);
+    }
+
+    @Override
+    public void visit(DataEvent<?> dataEvent) {
+        this.processId = dataEvent.getKogitoProcessId();
+        this.processInstanceId = dataEvent.getKogitoProcessInstanceId();
+        this.processVersion = dataEvent.getKogitoProcessInstanceVersion();
+        this.eventDate = toDate(dataEvent.getTime());
+        this.eventUser = dataEvent.getKogitoIdentity();
+        this.parentInstanceId = dataEvent.getKogitoParentProcessInstanceId();
+        this.rootProcessId = dataEvent.getKogitoRootProcessId();
+        this.rootProcessInstanceId = 
dataEvent.getKogitoRootProcessInstanceId();
+        this.processType = dataEvent.getKogitoProcessType();
+        this.businessKey = dataEvent.getKogitoBusinessKey();
+    }
+
     public Date getEventDate() {
         return eventDate;
     }
@@ -262,4 +302,5 @@ public class ProcessInstanceStateEventBody {
         }
 
     }
+
 }
diff --git 
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableEventBody.java
 
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableEventBody.java
index 00a55b9cd8..2008f264f6 100644
--- 
a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableEventBody.java
+++ 
b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableEventBody.java
@@ -18,12 +18,19 @@
  */
 package org.kie.kogito.event.process;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
-public class ProcessInstanceVariableEventBody {
+import org.kie.kogito.event.DataEvent;
+
+import static 
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*;
+
+public class ProcessInstanceVariableEventBody implements 
KogitoMarshallEventSupport, CloudEventVisitor {
 
     // common fields for events
     private Date eventDate;
@@ -46,6 +53,33 @@ public class ProcessInstanceVariableEventBody {
     private String variableName;
     private Object variableValue;
 
+    @Override
+    public void writeEvent(DataOutput out) throws IOException {
+        writeUTF(out, nodeContainerDefinitionId);
+        writeUTF(out, nodeContainerInstanceId);
+        writeUTF(out, variableId);
+        out.writeUTF(variableName);
+        writeObject(out, variableValue);
+    }
+
+    @Override
+    public void readEvent(DataInput in) throws IOException {
+        nodeContainerDefinitionId = readUTF(in);
+        nodeContainerInstanceId = readUTF(in);
+        variableId = readUTF(in);
+        variableName = in.readUTF();
+        variableValue = readObject(in);
+    }
+
+    @Override
+    public void visit(DataEvent<?> dataEvent) {
+        this.processId = dataEvent.getKogitoProcessId();
+        this.processInstanceId = dataEvent.getKogitoProcessInstanceId();
+        this.processVersion = dataEvent.getKogitoProcessInstanceVersion();
+        this.eventDate = toDate(dataEvent.getTime());
+        this.eventUser = dataEvent.getKogitoIdentity();
+    }
+
     public Date getEventDate() {
         return eventDate;
     }
@@ -184,4 +218,5 @@ public class ProcessInstanceVariableEventBody {
             return instance;
         }
     }
+
 }
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/DataEventFactory.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/DataEventFactory.java
index a52df39f9d..f2317a5d66 100644
--- 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/DataEventFactory.java
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/DataEventFactory.java
@@ -18,6 +18,7 @@
  */
 package org.kie.kogito.event;
 
+import java.io.IOException;
 import java.net.URI;
 import java.time.OffsetDateTime;
 import java.util.Optional;
@@ -43,6 +44,22 @@ public class DataEventFactory {
         return new CloudEventWrapDataEvent<>(event, dataUnmarshaller);
     }
 
+    public static <T extends AbstractDataEvent<V>, V> T from(T dataEvent, 
CloudEvent cloudEvent, Converter<CloudEventData, V> dataUnmarshaller) throws 
IOException {
+        dataEvent.setSpecVersion(cloudEvent.getSpecVersion());
+        dataEvent.setId(cloudEvent.getId());
+        dataEvent.setType(cloudEvent.getType());
+        dataEvent.setSource(cloudEvent.getSource());
+        dataEvent.setDataContentType(cloudEvent.getDataContentType());
+        dataEvent.setDataSchema(cloudEvent.getDataSchema());
+        dataEvent.setSubject(cloudEvent.getSubject());
+        dataEvent.setTime(cloudEvent.getTime());
+        cloudEvent.getExtensionNames().forEach(extensionName -> 
dataEvent.addExtensionAttribute(extensionName, 
cloudEvent.getExtension(extensionName)));
+        if (cloudEvent.getData() != null) {
+            dataEvent.setData(dataUnmarshaller.convert(cloudEvent.getData()));
+        }
+        return dataEvent;
+    }
+
     public static <T> DataEvent<T> from(T eventData, String trigger, 
KogitoProcessInstance pi) {
         return from(eventData, trigger, URI.create("/process/" + 
pi.getProcessId()), Optional.empty(), 
ProcessMeta.fromKogitoProcessInstance(pi));
     }
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/JacksonTypeCloudEventDataConverter.java
similarity index 52%
copy from 
api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
copy to 
api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/JacksonTypeCloudEventDataConverter.java
index 7db8c0e765..3e64b44a5d 100644
--- 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/JacksonTypeCloudEventDataConverter.java
@@ -16,19 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.kie.kogito.event.process;
+package org.kie.kogito.event.impl;
 
-import java.net.URI;
-import java.util.Collection;
+import java.io.IOException;
 
-public class MultipleProcessInstanceDataEvent extends 
ProcessInstanceDataEvent<Collection<ProcessInstanceDataEvent<?>>> {
+import org.kie.kogito.event.Converter;
 
-    public static final String TYPE = "MultipleProcessInstanceDataEvent";
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
-    public MultipleProcessInstanceDataEvent() {
+import io.cloudevents.CloudEventData;
+
+public class JacksonTypeCloudEventDataConverter<O> implements 
Converter<CloudEventData, O> {
+
+    private ObjectMapper objectMapper;
+    private TypeReference<O> outputType;
+
+    public JacksonTypeCloudEventDataConverter(ObjectMapper objectMapper, 
TypeReference<O> outputType) {
+        this.objectMapper = objectMapper;
+        this.outputType = outputType;
     }
 
-    public MultipleProcessInstanceDataEvent(URI source, 
Collection<ProcessInstanceDataEvent<?>> body) {
-        super(TYPE, source, body);
+    @Override
+    public O convert(CloudEventData value) throws IOException {
+        return objectMapper.readValue(value.toBytes(), outputType);
     }
 }
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
index 7db8c0e765..f29a920c13 100644
--- 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
@@ -21,14 +21,25 @@ package org.kie.kogito.event.process;
 import java.net.URI;
 import java.util.Collection;
 
-public class MultipleProcessInstanceDataEvent extends 
ProcessInstanceDataEvent<Collection<ProcessInstanceDataEvent<?>>> {
+public class MultipleProcessInstanceDataEvent extends 
ProcessInstanceDataEvent<Collection<ProcessInstanceDataEvent<? extends 
KogitoMarshallEventSupport>>> {
 
-    public static final String TYPE = "MultipleProcessInstanceDataEvent";
+    public static final String MULTIPLE_TYPE = 
"MultipleProcessInstanceDataEvent";
+    public static final String BINARY_CONTENT_TYPE = 
"application/octet-stream";
+    public static final String COMPRESS_DATA = "compressdata";
 
     public MultipleProcessInstanceDataEvent() {
     }
 
-    public MultipleProcessInstanceDataEvent(URI source, 
Collection<ProcessInstanceDataEvent<?>> body) {
-        super(TYPE, source, body);
+    public MultipleProcessInstanceDataEvent(URI source, 
Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> 
body) {
+        super(MULTIPLE_TYPE, source, body);
+    }
+
+    public boolean isCompressed() {
+        Object extension = 
getExtension(MultipleProcessInstanceDataEvent.COMPRESS_DATA);
+        return extension instanceof Boolean ? ((Boolean) 
extension).booleanValue() : false;
+    }
+
+    public void setCompressed(boolean compressed) {
+        addExtensionAttribute(COMPRESS_DATA, compressed);
     }
 }
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorDataEvent.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorDataEvent.java
index 1427e917f2..2c50f8e226 100644
--- 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorDataEvent.java
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorDataEvent.java
@@ -22,11 +22,14 @@ import java.util.Map;
 
 public class ProcessInstanceErrorDataEvent extends 
ProcessInstanceDataEvent<ProcessInstanceErrorEventBody> {
 
+    public static final String ERROR_TYPE = "ProcessInstanceErrorDataEvent";
+
     public ProcessInstanceErrorDataEvent() {
+        this.setType(ERROR_TYPE);
     }
 
     public ProcessInstanceErrorDataEvent(String source, String addons, String 
identity, Map<String, Object> metaData, ProcessInstanceErrorEventBody body) {
-        super("ProcessInstanceErrorDataEvent",
+        super(ERROR_TYPE,
                 source,
                 body,
                 (String) 
metaData.get(ProcessInstanceEventMetadata.PROCESS_INSTANCE_ID_META_DATA),
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeDataEvent.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeDataEvent.java
index db118aa009..e1e6a74e76 100644
--- 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeDataEvent.java
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeDataEvent.java
@@ -22,11 +22,14 @@ import java.util.Map;
 
 public class ProcessInstanceNodeDataEvent extends 
ProcessInstanceDataEvent<ProcessInstanceNodeEventBody> {
 
+    public static final String NODE_TYPE = "ProcessInstanceNodeDataEvent";
+
     public ProcessInstanceNodeDataEvent() {
+        this.setType(NODE_TYPE);
     }
 
     public ProcessInstanceNodeDataEvent(String source, String addons, String 
identity, Map<String, Object> metaData, ProcessInstanceNodeEventBody body) {
-        super("ProcessInstanceNodeDataEvent",
+        super(NODE_TYPE,
                 source,
                 body,
                 (String) 
metaData.get(ProcessInstanceEventMetadata.PROCESS_INSTANCE_ID_META_DATA),
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLADataEvent.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLADataEvent.java
index 90139ce002..e5b743aeac 100644
--- 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLADataEvent.java
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLADataEvent.java
@@ -22,11 +22,14 @@ import java.util.Map;
 
 public class ProcessInstanceSLADataEvent extends 
ProcessInstanceDataEvent<ProcessInstanceSLAEventBody> {
 
+    public static final String SLA_TYPE = "ProcessInstanceSLADataEvent";
+
     public ProcessInstanceSLADataEvent() {
+        this.setType(SLA_TYPE);
     }
 
     public ProcessInstanceSLADataEvent(String source, String addons, String 
identity, Map<String, Object> metaData, ProcessInstanceSLAEventBody body) {
-        super("ProcessInstanceSLADataEvent",
+        super(SLA_TYPE,
                 source,
                 body,
                 (String) 
metaData.get(ProcessInstanceEventMetadata.PROCESS_INSTANCE_ID_META_DATA),
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateDataEvent.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateDataEvent.java
index 38d30defda..a0ce3e1003 100644
--- 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateDataEvent.java
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateDataEvent.java
@@ -22,11 +22,14 @@ import java.util.Map;
 
 public class ProcessInstanceStateDataEvent extends 
ProcessInstanceDataEvent<ProcessInstanceStateEventBody> {
 
+    public static final String STATE_TYPE = "ProcessInstanceStateDataEvent";
+
     public ProcessInstanceStateDataEvent() {
+        this.setType(STATE_TYPE);
     }
 
     public ProcessInstanceStateDataEvent(String source, String addons, String 
identity, Map<String, Object> metaData, ProcessInstanceStateEventBody body) {
-        super("ProcessInstanceStateDataEvent",
+        super(STATE_TYPE,
                 source,
                 body,
                 (String) 
metaData.get(ProcessInstanceEventMetadata.PROCESS_INSTANCE_ID_META_DATA),
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableDataEvent.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableDataEvent.java
index b7c83d367f..1acf471d5b 100644
--- 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableDataEvent.java
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableDataEvent.java
@@ -32,15 +32,18 @@ public class ProcessInstanceVariableDataEvent extends 
ProcessInstanceDataEvent<P
 
     private static final Set<String> INTERNAL_EXTENSION_ATTRIBUTES = 
Collections.singleton(CloudEventExtensionConstants.KOGITO_VARIABLE_NAME);
 
+    public static final String VAR_TYPE = "ProcessInstanceVariableDataEvent";
+
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     @JsonProperty(CloudEventExtensionConstants.KOGITO_VARIABLE_NAME)
     private String kogitoVariableName;
 
     public ProcessInstanceVariableDataEvent() {
+        this.setType(VAR_TYPE);
     }
 
     public ProcessInstanceVariableDataEvent(String source, String addons, 
String identity, Map<String, Object> metaData, ProcessInstanceVariableEventBody 
body) {
-        super("ProcessInstanceVariableDataEvent",
+        super(VAR_TYPE,
                 source,
                 body,
                 (String) 
metaData.get(ProcessInstanceEventMetadata.PROCESS_INSTANCE_ID_META_DATA),
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonProcessInstanceDataEventDeserializer.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonProcessInstanceDataEventDeserializer.java
new file mode 100644
index 0000000000..d97b251250
--- /dev/null
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonProcessInstanceDataEventDeserializer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import java.io.IOException;
+
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceSLADataEvent;
+import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+public class JsonProcessInstanceDataEventDeserializer extends 
StdDeserializer<ProcessInstanceDataEvent<?>> {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JsonProcessInstanceDataEventDeserializer.class);
+
+    private static final long serialVersionUID = 6152014726577574241L;
+
+    public JsonProcessInstanceDataEventDeserializer() {
+        this(JsonProcessInstanceDataEventDeserializer.class);
+    }
+
+    public JsonProcessInstanceDataEventDeserializer(Class<?> vc) {
+        super(vc);
+    }
+
+    @Override
+    public ProcessInstanceDataEvent<?> deserialize(JsonParser jp, 
DeserializationContext ctxt)
+            throws IOException, JsonProcessingException {
+        JsonNode node = jp.getCodec().readTree(jp);
+        LOGGER.debug("Deserialize process instance data event: {}", node);
+        String type = node.get("type").asText();
+
+        switch (type) {
+            case MultipleProcessInstanceDataEvent.MULTIPLE_TYPE:
+                return jp.getCodec().treeToValue(node, 
MultipleProcessInstanceDataEvent.class);
+            case ProcessInstanceErrorDataEvent.ERROR_TYPE:
+                return (ProcessInstanceDataEvent<?>) 
jp.getCodec().treeToValue(node, ProcessInstanceErrorDataEvent.class);
+            case ProcessInstanceNodeDataEvent.NODE_TYPE:
+                return (ProcessInstanceDataEvent<?>) 
jp.getCodec().treeToValue(node, ProcessInstanceNodeDataEvent.class);
+            case ProcessInstanceSLADataEvent.SLA_TYPE:
+                return (ProcessInstanceDataEvent<?>) 
jp.getCodec().treeToValue(node, ProcessInstanceSLADataEvent.class);
+            case ProcessInstanceStateDataEvent.STATE_TYPE:
+                return (ProcessInstanceDataEvent<?>) 
jp.getCodec().treeToValue(node, ProcessInstanceStateDataEvent.class);
+            case ProcessInstanceVariableDataEvent.VAR_TYPE:
+                return (ProcessInstanceDataEvent<?>) 
jp.getCodec().treeToValue(node, ProcessInstanceVariableDataEvent.class);
+            default:
+                LOGGER.warn("Unknown type {} in json data {}", type, node);
+                return (ProcessInstanceDataEvent<?>) 
jp.getCodec().treeToValue(node, ProcessInstanceDataEvent.class);
+
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonUserTaskInstanceDataEventDeserializer.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonUserTaskInstanceDataEventDeserializer.java
new file mode 100644
index 0000000000..7e8bf5458a
--- /dev/null
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonUserTaskInstanceDataEventDeserializer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import java.io.IOException;
+
+import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceDeadlineDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceVariableDataEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+public class JsonUserTaskInstanceDataEventDeserializer extends 
StdDeserializer<UserTaskInstanceDataEvent<?>> {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JsonUserTaskInstanceDataEventDeserializer.class);
+
+    private static final long serialVersionUID = -6626663191296012306L;
+
+    public JsonUserTaskInstanceDataEventDeserializer() {
+        this(null);
+    }
+
+    public JsonUserTaskInstanceDataEventDeserializer(Class<?> vc) {
+        super(vc);
+    }
+
+    @Override
+    public UserTaskInstanceDataEvent<?> deserialize(JsonParser jp, 
DeserializationContext ctxt)
+            throws IOException, JsonProcessingException {
+        JsonNode node = jp.getCodec().readTree(jp);
+        LOGGER.debug("Deserialize user task instance data event: {}", node);
+        String type = node.get("type").asText();
+
+        switch (type) {
+            case MultipleUserTaskInstanceDataEvent.TYPE:
+                return jp.getCodec().treeToValue(node, 
MultipleUserTaskInstanceDataEvent.class);
+            case "UserTaskInstanceAssignmentDataEvent":
+                return (UserTaskInstanceDataEvent<?>) 
jp.getCodec().treeToValue(node, UserTaskInstanceAssignmentDataEvent.class);
+            case "UserTaskInstanceAttachmentDataEvent":
+                return (UserTaskInstanceDataEvent<?>) 
jp.getCodec().treeToValue(node, UserTaskInstanceAttachmentDataEvent.class);
+            case "UserTaskInstanceCommentDataEvent":
+                return (UserTaskInstanceDataEvent<?>) 
jp.getCodec().treeToValue(node, UserTaskInstanceCommentDataEvent.class);
+            case "UserTaskInstanceDeadlineDataEvent":
+                return (UserTaskInstanceDataEvent<?>) 
jp.getCodec().treeToValue(node, UserTaskInstanceDeadlineDataEvent.class);
+            case "UserTaskInstanceStateDataEvent":
+                return (UserTaskInstanceDataEvent<?>) 
jp.getCodec().treeToValue(node, UserTaskInstanceStateDataEvent.class);
+            case "UserTaskInstanceVariableDataEvent":
+                return (UserTaskInstanceDataEvent<?>) 
jp.getCodec().treeToValue(node, UserTaskInstanceVariableDataEvent.class);
+            default:
+                LOGGER.warn("Unknown type {} in json data {}", type, node);
+                return (UserTaskInstanceDataEvent<?>) 
jp.getCodec().treeToValue(node, UserTaskInstanceDataEvent.class);
+
+        }
+    }
+}
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoDataEventSerializationHelper.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoDataEventSerializationHelper.java
new file mode 100644
index 0000000000..f4e512eb22
--- /dev/null
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoDataEventSerializationHelper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.URI;
+
+import org.kie.kogito.event.AbstractDataEvent;
+import org.kie.kogito.event.DataEvent;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+
+import io.cloudevents.SpecVersion;
+
+import static 
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*;
+
+class KogitoDataEventSerializationHelper {
+
+    private KogitoDataEventSerializationHelper() {
+    }
+
+    static void writeCloudEventAttrs(DataOutput out, DataEvent<?> data) throws 
IOException {
+        out.writeUTF(data.getSpecVersion().toString());
+        out.writeUTF(data.getId());
+        writeUTF(out, data.getSubject());
+        writeUTF(out, data.getDataContentType());
+        writeUTF(out, data.getDataSchema() != null ? 
data.getDataSchema().toString() : null);
+    }
+
+    static <T extends AbstractDataEvent<?>> T readCloudEventAttrs(DataInput 
in, T data) throws IOException {
+        data.setSpecVersion(SpecVersion.parse(in.readUTF()));
+        data.setId(in.readUTF());
+        data.setSubject(readUTF(in));
+        data.setDataContentType(readUTF(in));
+        String dataSchema = readUTF(in);
+        if (dataSchema != null) {
+            data.setDataSchema(URI.create(dataSchema));
+        }
+        return data;
+    }
+
+    static void populateCloudEvent(ProcessInstanceDataEvent<?> event, 
ProcessInstanceDataEventExtensionRecord info) {
+        event.setKogitoBusinessKey(info.getBusinessKey());
+        event.setKogitoProcessId(info.getId());
+        event.setKogitoProcessInstanceId(info.getInstanceId());
+        event.setKogitoParentProcessInstanceId(info.getParentInstanceId());
+        event.setKogitoProcessInstanceState(info.getState());
+        event.setKogitoProcessInstanceVersion(info.getVersion());
+        event.setKogitoProcessType(info.getType());
+        event.setKogitoRootProcessId(info.getRootId());
+        event.setKogitoRootProcessInstanceId(info.getRootInstanceId());
+        event.setKogitoIdentity(info.getIdentity());
+        event.setSource(info.getSource());
+        event.setKogitoAddons(info.getAddons());
+    }
+
+}
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoSerializationModule.java
similarity index 50%
copy from 
api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
copy to 
api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoSerializationModule.java
index 7db8c0e765..381e0b93d2 100644
--- 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoSerializationModule.java
@@ -16,19 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.kie.kogito.event.process;
+package org.kie.kogito.event.serializer;
 
-import java.net.URI;
-import java.util.Collection;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
 
-public class MultipleProcessInstanceDataEvent extends 
ProcessInstanceDataEvent<Collection<ProcessInstanceDataEvent<?>>> {
+import com.fasterxml.jackson.databind.module.SimpleModule;
 
-    public static final String TYPE = "MultipleProcessInstanceDataEvent";
+public class KogitoSerializationModule extends SimpleModule {
 
-    public MultipleProcessInstanceDataEvent() {
-    }
+    private static final long serialVersionUID = 1L;
 
-    public MultipleProcessInstanceDataEvent(URI source, 
Collection<ProcessInstanceDataEvent<?>> body) {
-        super(TYPE, source, body);
+    public KogitoSerializationModule() {
+        super("KogitoSerialization");
+        setSerializerModifier(new 
MultipleProcessDataInstanceBeanSerializerModifier());
+        setDeserializerModifier(new 
MultipleProcessDataInstanceBeanDeserializerModifier());
+        addDeserializer(ProcessInstanceDataEvent.class, new 
JsonProcessInstanceDataEventDeserializer());
+        addDeserializer(UserTaskInstanceDataEvent.class, new 
JsonUserTaskInstanceDataEventDeserializer());
     }
 }
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanDeserializerModifier.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanDeserializerModifier.java
new file mode 100644
index 0000000000..d72be4e5b5
--- /dev/null
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanDeserializerModifier.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
+
+import com.fasterxml.jackson.databind.BeanDescription;
+import com.fasterxml.jackson.databind.DeserializationConfig;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
+
+public class MultipleProcessDataInstanceBeanDeserializerModifier extends 
BeanDeserializerModifier {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public JsonDeserializer<?> modifyDeserializer(
+            DeserializationConfig config, BeanDescription beanDesc, 
JsonDeserializer<?> deserializer) {
+        if 
(beanDesc.getBeanClass().equals(MultipleProcessInstanceDataEvent.class)) {
+            return new 
MultipleProcessInstanceDataEventDeserializer((JsonDeserializer<Object>) 
deserializer);
+        }
+        return deserializer;
+    }
+}
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanSerializerModifier.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanSerializerModifier.java
new file mode 100644
index 0000000000..3c62d932a3
--- /dev/null
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanSerializerModifier.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
+
+import com.fasterxml.jackson.databind.BeanDescription;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializationConfig;
+import com.fasterxml.jackson.databind.ser.BeanSerializerModifier;
+
+public class MultipleProcessDataInstanceBeanSerializerModifier extends 
BeanSerializerModifier {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public JsonSerializer<?> modifySerializer(
+            SerializationConfig config, BeanDescription beanDesc, 
JsonSerializer<?> serializer) {
+        if 
(beanDesc.getBeanClass().equals(MultipleProcessInstanceDataEvent.class)) {
+            return new 
MultipleProcessInstanceDataEventSerializer((JsonSerializer<Object>) serializer);
+        }
+        return serializer;
+    }
+}
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceConverterFactory.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceConverterFactory.java
new file mode 100644
index 0000000000..4f8a6205c2
--- /dev/null
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceConverterFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import java.io.IOException;
+import java.util.Base64;
+import java.util.Collection;
+
+import org.kie.kogito.event.Converter;
+import org.kie.kogito.event.impl.JacksonTypeCloudEventDataConverter;
+import org.kie.kogito.event.process.KogitoMarshallEventSupport;
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.CloudEventData;
+
+public class MultipleProcessDataInstanceConverterFactory {
+
+    private MultipleProcessDataInstanceConverterFactory() {
+    }
+
+    public static Converter<CloudEventData, 
Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> 
fromCloudEvent(CloudEvent cloudEvent, ObjectMapper objectMapper) {
+        if 
(MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE.equals(cloudEvent.getDataContentType()))
 {
+            return isCompressed(cloudEvent) ? compressedConverter : 
binaryConverter;
+        } else {
+            return new JacksonTypeCloudEventDataConverter<>(objectMapper, new 
TypeReference<Collection<ProcessInstanceDataEvent<? extends 
KogitoMarshallEventSupport>>>() {
+            });
+        }
+    }
+
+    private static boolean isCompressed(CloudEvent event) {
+        Object value = 
event.getExtension(MultipleProcessInstanceDataEvent.COMPRESS_DATA);
+        return value instanceof Boolean ? ((Boolean) value).booleanValue() : 
false;
+    }
+
+    private static Converter<CloudEventData, 
Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> 
binaryConverter =
+            data -> deserialize(data, false);
+
+    private static Converter<CloudEventData, 
Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> 
compressedConverter =
+            data -> deserialize(data, true);
+
+    private static Collection<ProcessInstanceDataEvent<? extends 
KogitoMarshallEventSupport>> deserialize(CloudEventData data, boolean compress) 
throws IOException {
+        return 
MultipleProcessInstanceDataEventDeserializer.readFromBytes(Base64.getDecoder().decode(data.toBytes()),
 compress);
+    }
+}
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java
new file mode 100644
index 0000000000..6e0b0f262c
--- /dev/null
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.zip.GZIPInputStream;
+
+import org.kie.kogito.event.process.CloudEventVisitor;
+import org.kie.kogito.event.process.KogitoMarshallEventSupport;
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceErrorEventBody;
+import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceNodeEventBody;
+import org.kie.kogito.event.process.ProcessInstanceSLADataEvent;
+import org.kie.kogito.event.process.ProcessInstanceSLAEventBody;
+import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceStateEventBody;
+import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceVariableEventBody;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.ResolvableDeserializer;
+
+import io.cloudevents.SpecVersion;
+
+import static 
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.readInt;
+
+public class MultipleProcessInstanceDataEventDeserializer extends 
JsonDeserializer<MultipleProcessInstanceDataEvent> implements 
ResolvableDeserializer {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(MultipleProcessInstanceDataEventDeserializer.class);
+
+    private JsonDeserializer<Object> defaultDeserializer;
+
+    public 
MultipleProcessInstanceDataEventDeserializer(JsonDeserializer<Object> 
deserializer) {
+        this.defaultDeserializer = deserializer;
+    }
+
+    @Override
+    public void resolve(DeserializationContext ctxt) throws 
JsonMappingException {
+        ((ResolvableDeserializer) defaultDeserializer).resolve(ctxt);
+    }
+
+    @Override
+    public MultipleProcessInstanceDataEvent deserialize(JsonParser p, 
DeserializationContext ctxt)
+            throws IOException, JacksonException {
+        JsonNode node = p.getCodec().readTree(p);
+        JsonNode dataContentType = node.get("datacontenttype");
+        if (dataContentType != null && 
MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE.equals(dataContentType.asText()))
 {
+            MultipleProcessInstanceDataEvent event = new 
MultipleProcessInstanceDataEvent();
+            event.setDataContentType(dataContentType.asText());
+            event.setSource(URI.create(node.get("source").asText()));
+            event.setType(node.get("type").asText());
+            
event.setSpecVersion(SpecVersion.parse(node.get("specversion").asText()));
+            event.setId(node.get("id").asText());
+            JsonNode data = node.get("data");
+            if (data != null) {
+                event.setData(readFromBytes(data.binaryValue(), 
isCompressed(node)));
+            }
+            return event;
+        } else {
+            JsonParser newParser = node.traverse(p.getCodec());
+            newParser.nextToken();
+            return (MultipleProcessInstanceDataEvent) 
defaultDeserializer.deserialize(newParser, ctxt);
+        }
+    }
+
+    private static boolean isCompressed(JsonNode node) {
+        JsonNode compress = 
node.get(MultipleProcessInstanceDataEvent.COMPRESS_DATA);
+        return compress != null && compress.isBoolean() ? compress.asBoolean() 
: false;
+    }
+
+    static Collection<ProcessInstanceDataEvent<? extends 
KogitoMarshallEventSupport>> readFromBytes(byte[] binaryValue, boolean 
compressed) throws IOException {
+        InputStream wrappedIn = new ByteArrayInputStream(binaryValue);
+        if (compressed) {
+            logger.trace("Gzip compressed byte array");
+            wrappedIn = new GZIPInputStream(wrappedIn);
+        }
+        try (DataInputStream in = new DataInputStream(wrappedIn)) {
+            int size = readInt(in);
+            logger.trace("Reading collection of size {}", size);
+            Collection<ProcessInstanceDataEvent<? extends 
KogitoMarshallEventSupport>> result = new ArrayList<>(size);
+            List<ProcessInstanceDataEventExtensionRecord> infos = new 
ArrayList<>();
+            while (size-- > 0) {
+                byte readInfo = in.readByte();
+                logger.trace("Info ordinal is {}", readInfo);
+                ProcessInstanceDataEventExtensionRecord info;
+                if (readInfo == -1) {
+                    info = new ProcessInstanceDataEventExtensionRecord();
+                    info.readEvent(in);
+                    logger.trace("Info readed is {}", info);
+                    infos.add(info);
+                } else {
+                    info = infos.get(readInfo);
+                    logger.trace("Info cached is {}", info);
+                }
+                String type = in.readUTF();
+                logger.trace("Type is {}", info);
+                result.add(getCloudEvent(in, type, info));
+                logger.trace("{} events remaining", size);
+            }
+            return result;
+        }
+    }
+
+    private static ProcessInstanceDataEvent<? extends 
KogitoMarshallEventSupport> getCloudEvent(DataInputStream in, String type, 
ProcessInstanceDataEventExtensionRecord info) throws IOException {
+        switch (type) {
+            case ProcessInstanceVariableDataEvent.VAR_TYPE:
+                ProcessInstanceVariableDataEvent item = buildDataEvent(in, new 
ProcessInstanceVariableDataEvent(), ProcessInstanceVariableEventBody::new, 
info);
+                item.setKogitoVariableName(item.getData().getVariableName());
+                return item;
+            case ProcessInstanceStateDataEvent.STATE_TYPE:
+                return buildDataEvent(in, new ProcessInstanceStateDataEvent(), 
ProcessInstanceStateEventBody::new, info);
+            case ProcessInstanceNodeDataEvent.NODE_TYPE:
+                return buildDataEvent(in, new ProcessInstanceNodeDataEvent(), 
ProcessInstanceNodeEventBody::new, info);
+            case ProcessInstanceErrorDataEvent.ERROR_TYPE:
+                return buildDataEvent(in, new ProcessInstanceErrorDataEvent(), 
ProcessInstanceErrorEventBody::new, info);
+            case ProcessInstanceSLADataEvent.SLA_TYPE:
+                return buildDataEvent(in, new ProcessInstanceSLADataEvent(), 
ProcessInstanceSLAEventBody::new, info);
+            default:
+                throw new UnsupportedOperationException("Unrecognized event 
type " + type);
+        }
+    }
+
+    private static <T extends ProcessInstanceDataEvent<V>, V extends 
KogitoMarshallEventSupport & CloudEventVisitor> T buildDataEvent(DataInput in, 
T cloudEvent, Supplier<V> bodySupplier,
+            ProcessInstanceDataEventExtensionRecord info) throws IOException {
+        int delta = readInt(in);
+        logger.trace("Time delta is {}", delta);
+        cloudEvent.setTime(info.getTime().plus(delta, ChronoUnit.MILLIS));
+        KogitoDataEventSerializationHelper.readCloudEventAttrs(in, cloudEvent);
+        logger.trace("Cloud event before population {}", cloudEvent);
+        KogitoDataEventSerializationHelper.populateCloudEvent(cloudEvent, 
info);
+        logger.trace("Cloud event after population {}", cloudEvent);
+
+        boolean isNotNull = in.readBoolean();
+        if (isNotNull) {
+            logger.trace("Data is not null");
+            V body = bodySupplier.get();
+            body.readEvent(in);
+            logger.trace("Event body before population {}", body);
+            body.visit(cloudEvent);
+            logger.trace("Event body after population {}", body);
+            cloudEvent.setData(body);
+        } else {
+            logger.trace("Data is null");
+        }
+        return cloudEvent;
+    }
+
+}
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java
new file mode 100644
index 0000000000..42825e9679
--- /dev/null
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.zip.GZIPOutputStream;
+
+import org.kie.kogito.event.process.KogitoMarshallEventSupport;
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import static 
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.writeInt;
+
+public class MultipleProcessInstanceDataEventSerializer extends 
JsonSerializer<MultipleProcessInstanceDataEvent> {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(MultipleProcessInstanceDataEventDeserializer.class);
+
+    private JsonSerializer<Object> defaultSerializer;
+
+    public MultipleProcessInstanceDataEventSerializer(JsonSerializer<Object> 
serializer) {
+        this.defaultSerializer = serializer;
+    }
+
+    @Override
+    public void serialize(MultipleProcessInstanceDataEvent value, 
JsonGenerator gen, SerializerProvider serializers)
+            throws IOException {
+        if 
(MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE.equals(value.getDataContentType()))
 {
+            gen.writeStartObject();
+            gen.writeStringField("datacontenttype", 
value.getDataContentType());
+            gen.writeStringField("source", value.getSource().toString());
+            gen.writeStringField("id", value.getId());
+            gen.writeStringField("specversion", 
value.getSpecVersion().toString());
+            gen.writeStringField("type", value.getType());
+            boolean compress = value.isCompressed();
+            if (compress) {
+                
gen.writeBooleanField(MultipleProcessInstanceDataEvent.COMPRESS_DATA, true);
+            }
+            gen.writeBinaryField("data", dataAsBytes(gen, value.getData(), 
compress));
+            gen.writeEndObject();
+        } else {
+            defaultSerializer.serialize(value, gen, serializers);
+        }
+    }
+
+    private byte[] dataAsBytes(JsonGenerator gen, 
Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> 
data, boolean compress) throws IOException {
+        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(compress ? new 
GZIPOutputStream(bytesOut) : bytesOut)) {
+            logger.trace("Writing size {}", data.size());
+            writeInt(out, data.size());
+            Map<String, ProcessInstanceDataEventExtensionRecord> infos = new 
HashMap<>();
+            for (ProcessInstanceDataEvent<? extends 
KogitoMarshallEventSupport> cloudEvent : data) {
+                String key = cloudEvent.getKogitoProcessInstanceId();
+                ProcessInstanceDataEventExtensionRecord info = infos.get(key);
+                if (info == null) {
+                    logger.trace("Writing marker byte -1");
+                    out.writeByte((byte) -1);
+                    info = new 
ProcessInstanceDataEventExtensionRecord(infos.size(), cloudEvent);
+                    logger.trace("Writing info", info);
+                    info.writeEvent(out);
+                    infos.put(key, info);
+                } else {
+                    logger.trace("Writing marker byte {}", info.getOrdinal());
+                    out.writeByte((byte) info.getOrdinal());
+                }
+                logger.trace("Writing type {}", cloudEvent.getType());
+                out.writeUTF(cloudEvent.getType());
+                int timeDelta = cloudEvent.getTime().compareTo(info.getTime());
+                logger.trace("Writing time delta {}", timeDelta);
+                writeInt(out, timeDelta);
+                logger.trace("Writing cloud event attrs {}", cloudEvent);
+                KogitoDataEventSerializationHelper.writeCloudEventAttrs(out, 
cloudEvent);
+                KogitoMarshallEventSupport itemData = cloudEvent.getData();
+                if (itemData != null) {
+                    logger.trace("Writing data not null boolean");
+                    out.writeBoolean(true);
+                    logger.trace("Writing cloud event body {}", itemData);
+                    itemData.writeEvent(out);
+                } else {
+                    logger.trace("Writing data null boolean");
+                    out.writeBoolean(false);
+                }
+                logger.trace("individual event writing completed");
+            }
+        }
+        return bytesOut.toByteArray();
+    }
+
+}
diff --git 
a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java
 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java
new file mode 100644
index 0000000000..28abac1a28
--- /dev/null
+++ 
b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.URI;
+import java.time.OffsetDateTime;
+
+import org.kie.kogito.event.process.KogitoMarshallEventSupport;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+
+import static 
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*;
+
+class ProcessInstanceDataEventExtensionRecord implements 
KogitoMarshallEventSupport {
+
+    // referenceId and startFromNode are not used by process instance events
+    private String id;
+    private String instanceId;
+    private String version;
+    private String state;
+    private String type;
+    private String parentInstanceId;
+    private String rootId;
+    private String rootInstanceId;
+    private String businessKey;
+    private String identity;
+    private URI source;
+    private OffsetDateTime time;
+    private String addons;
+    private transient int ordinal;
+
+    public ProcessInstanceDataEventExtensionRecord() {
+    }
+
+    public ProcessInstanceDataEventExtensionRecord(int ordinal, 
ProcessInstanceDataEvent<?> dataEvent) {
+        this.ordinal = ordinal;
+        id = dataEvent.getKogitoProcessId();
+        instanceId = dataEvent.getKogitoProcessInstanceId();
+        version = dataEvent.getKogitoProcessInstanceVersion();
+        state = dataEvent.getKogitoProcessInstanceState();
+        type = dataEvent.getKogitoProcessType();
+        parentInstanceId = dataEvent.getKogitoParentProcessInstanceId();
+        rootId = dataEvent.getKogitoRootProcessId();
+        rootInstanceId = dataEvent.getKogitoRootProcessInstanceId();
+        businessKey = dataEvent.getKogitoBusinessKey();
+        identity = dataEvent.getKogitoIdentity();
+        time = dataEvent.getTime();
+        source = dataEvent.getSource();
+        addons = dataEvent.getKogitoAddons();
+    }
+
+    public int getOrdinal() {
+        return ordinal;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public String getBusinessKey() {
+        return businessKey;
+    }
+
+    public String getInstanceId() {
+        return instanceId;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public String getState() {
+        return state;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public String getParentInstanceId() {
+        return parentInstanceId;
+    }
+
+    public String getRootId() {
+        return rootId;
+    }
+
+    public String getRootInstanceId() {
+        return rootInstanceId;
+    }
+
+    public String getIdentity() {
+        return identity;
+    }
+
+    public OffsetDateTime getTime() {
+        return time;
+    }
+
+    public URI getSource() {
+        return source;
+    }
+
+    public String getAddons() {
+        return addons;
+    }
+
+    @Override
+    public void writeEvent(DataOutput out) throws IOException {
+        out.writeUTF(id);
+        out.writeUTF(instanceId);
+        out.writeUTF(version);
+        out.writeUTF(state);
+        writeUTF(out, type);
+        writeUTF(out, parentInstanceId);
+        writeUTF(out, rootId);
+        writeUTF(out, rootInstanceId);
+        writeUTF(out, businessKey);
+        writeUTF(out, identity);
+        writeTime(out, time);
+        out.writeUTF(source.toString());
+        writeUTF(out, addons);
+    }
+
+    @Override
+    public void readEvent(DataInput in) throws IOException {
+        id = in.readUTF();
+        instanceId = in.readUTF();
+        version = in.readUTF();
+        state = in.readUTF();
+        type = readUTF(in);
+        parentInstanceId = readUTF(in);
+        rootId = readUTF(in);
+        rootInstanceId = readUTF(in);
+        businessKey = readUTF(in);
+        identity = readUTF(in);
+        time = readTime(in);
+        source = URI.create(in.readUTF());
+        addons = readUTF(in);
+    }
+
+    @Override
+    public String toString() {
+        return "ProcessInstanceDataEventExtensionRecord [id=" + id + ", 
instanceId=" + instanceId + ", version="
+                + version + ", state=" + state + ", type=" + type + ", 
parentInstanceId=" + parentInstanceId
+                + ", rootId=" + rootId + ", rootInstanceId=" + rootInstanceId 
+ ", businessKey=" + businessKey
+                + ", identity=" + identity + ", source=" + source + ", time=" 
+ time + ", addons=" + addons + "]";
+    }
+
+}
diff --git 
a/api/kogito-events-core/src/main/resources/META-INF/services/com.fasterxml.jackson.databind.Module
 
b/api/kogito-events-core/src/main/resources/META-INF/services/com.fasterxml.jackson.databind.Module
new file mode 100644
index 0000000000..85fd599eb4
--- /dev/null
+++ 
b/api/kogito-events-core/src/main/resources/META-INF/services/com.fasterxml.jackson.databind.Module
@@ -0,0 +1 @@
+org.kie.kogito.event.serializer.KogitoSerializationModule
\ No newline at end of file
diff --git 
a/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java
 
b/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java
index 0d4fb4fafa..d278e2c427 100644
--- 
a/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java
+++ 
b/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java
@@ -18,31 +18,41 @@
  */
 package org.kie.kogito.event.process;
 
+import java.io.IOException;
 import java.net.URI;
 import java.time.OffsetDateTime;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.junit.jupiter.api.Test;
 import org.kie.kogito.event.AbstractDataEvent;
+import org.kie.kogito.event.DataEventFactory;
 import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;
+import 
org.kie.kogito.event.serializer.MultipleProcessDataInstanceConverterFactory;
 import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
+import org.kie.kogito.jackson.utils.JsonObjectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 
+import io.cloudevents.CloudEvent;
 import io.cloudevents.SpecVersion;
 import io.cloudevents.jackson.JsonFormat;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static 
org.kie.kogito.event.process.KogitoEventBodySerializationHelper.toDate;
 
 class ProcessEventsTest {
-
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
-            .registerModule(new JavaTimeModule())
             .registerModule(JsonFormat.getCloudEventJacksonModule())
-            
.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+            
.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
+            .findAndRegisterModules();
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ProcessEventsTest.class);
 
     private static final Set<String> BASE_EXTENSION_NAMES = Arrays.stream(new 
String[] {
             CloudEventExtensionConstants.PROCESS_INSTANCE_ID,
@@ -53,10 +63,9 @@ class ProcessEventsTest {
             CloudEventExtensionConstants.PROCESS_INSTANCE_VERSION,
             CloudEventExtensionConstants.PROCESS_PARENT_PROCESS_INSTANCE_ID,
             CloudEventExtensionConstants.PROCESS_INSTANCE_STATE,
-            CloudEventExtensionConstants.PROCESS_REFERENCE_ID,
-            CloudEventExtensionConstants.PROCESS_START_FROM_NODE,
             CloudEventExtensionConstants.BUSINESS_KEY,
-            CloudEventExtensionConstants.PROCESS_TYPE 
}).collect(Collectors.toSet());
+            CloudEventExtensionConstants.PROCESS_TYPE,
+            CloudEventExtensionConstants.IDENTITY 
}).collect(Collectors.toSet());
 
     private static final String PROCESS_INSTANCE_EVENT_TYPE = 
"ProcessInstanceEvent";
     private static final String USER_TASK_INSTANCE_EVENT_TYPE = 
"UserTaskInstanceEvent";
@@ -77,16 +86,22 @@ class ProcessEventsTest {
     private static final String ROOT_PROCESS_ID = "ROOT_PROCESS_ID";
     private static final String PROCESS_PARENT_PROCESS_INSTANCE_ID = 
"PROCESS_PARENT_PROCESS_INSTANCE_ID";
     private static final String PROCESS_INSTANCE_STATE = 
"PROCESS_INSTANCE_STATE";
-    private static final String PROCESS_REFERENCE_ID = "PROCESS_REFERENCE_ID";
-    private static final String PROCESS_START_FROM_NODE = 
"PROCESS_START_FROM_NODE";
     private static final String BUSINESS_KEY = "BUSINESS_KEY";
     private static final String PROCESS_TYPE = "PROCESS_TYPE";
     private static final String ADDONS = "ADDONS";
-
+    private static final int PROCESS_STATE = 1;
+    private static final String NODE_CONTAINER_ID = "323";
+    private static final String NODE_CONTAINER_INSTANCEID = "323-3232-3232";
     private static final String EXTENSION_1 = "EXTENSION_1";
     private static final String EXTENSION_1_VALUE = "EXTENSION_1_VALUE";
     private static final String EXTENSION_2 = "EXTENSION_2";
     private static final String EXTENSION_2_VALUE = "EXTENSION_2_VALUE";
+    private static final String ERROR_MESSAGE = "AAAAAAHHHHH!!!!!";
+
+    private static final int EVENT_TYPE = 1;
+
+    private static final String NODE_NAME = "NODE_NAME";
+    private static final String NODE_TYPE = "NODE_TYPE";
 
     private static final String VARIABLE_NAME = "VARIABLE_NAME";
 
@@ -112,6 +127,178 @@ class ProcessEventsTest {
         assertExtensionNames(deserializedEvent, BASE_EXTENSION_NAMES, 
EXTENSION_1, EXTENSION_2);
     }
 
+    @Test
+    void multipleInstanceDataEvent() throws IOException {
+        JsonNode expectedVarValue = 
OBJECT_MAPPER.createObjectNode().put("name", "John Doe");
+        int standard = processMultipleInstanceDataEvent(expectedVarValue, 
false, false);
+        int binary = processMultipleInstanceDataEvent(expectedVarValue, true, 
false);
+        int binaryCompressed = 
processMultipleInstanceDataEvent(expectedVarValue, true, true);
+        assertThat(standard).isGreaterThan(binary);
+        assertThat(binary).isGreaterThan(binaryCompressed);
+    }
+
+    private int processMultipleInstanceDataEvent(JsonNode expectedVarValue, 
boolean binary, boolean compress) throws IOException {
+        ProcessInstanceStateDataEvent stateEvent = new 
ProcessInstanceStateDataEvent();
+        setBaseEventValues(stateEvent, 
ProcessInstanceStateDataEvent.STATE_TYPE);
+        
stateEvent.setData(ProcessInstanceStateEventBody.create().eventDate(toDate(TIME)).eventType(EVENT_TYPE).eventUser(SUBJECT)
+                
.businessKey(BUSINESS_KEY).processId(PROCESS_ID).processInstanceId(PROCESS_INSTANCE_ID).state(PROCESS_STATE)
+                
.processVersion(PROCESS_INSTANCE_VERSION).parentInstanceId(PROCESS_PARENT_PROCESS_INSTANCE_ID).processName(PROCESS_ID)
+                
.processType(PROCESS_TYPE).rootProcessId(ROOT_PROCESS_ID).rootProcessInstanceId(ROOT_PROCESS_INSTANCE_ID).build());
+
+        ProcessInstanceVariableDataEvent varEvent = new 
ProcessInstanceVariableDataEvent();
+        setBaseEventValues(varEvent, 
ProcessInstanceVariableDataEvent.VAR_TYPE);
+        
varEvent.addExtensionAttribute(CloudEventExtensionConstants.KOGITO_VARIABLE_NAME,
 VARIABLE_NAME);
+        
varEvent.setData(ProcessInstanceVariableEventBody.create().eventDate(toDate(TIME)).eventUser(SUBJECT)
+                
.processId(PROCESS_ID).processInstanceId(PROCESS_INSTANCE_ID).processVersion(PROCESS_INSTANCE_VERSION)
+                
.nodeContainerDefinitionId(NODE_CONTAINER_ID).nodeContainerInstanceId(NODE_CONTAINER_INSTANCEID)
+                .variableName(VARIABLE_NAME)
+                .variableId(VARIABLE_NAME)
+                .variableValue(expectedVarValue)
+                .build());
+
+        ProcessInstanceErrorDataEvent errorEvent = new 
ProcessInstanceErrorDataEvent();
+        setBaseEventValues(errorEvent, 
ProcessInstanceErrorDataEvent.ERROR_TYPE);
+        
errorEvent.setData(ProcessInstanceErrorEventBody.create().errorMessage(ERROR_MESSAGE).eventDate(toDate(TIME)).eventUser(SUBJECT)
+                
.processId(PROCESS_ID).processInstanceId(PROCESS_INSTANCE_ID).processVersion(PROCESS_INSTANCE_VERSION).nodeDefinitionId(NODE_CONTAINER_ID)
+                .nodeInstanceId(NODE_CONTAINER_INSTANCEID).build());
+
+        ProcessInstanceNodeDataEvent nodeEvent = new 
ProcessInstanceNodeDataEvent();
+        setBaseEventValues(nodeEvent, ProcessInstanceNodeDataEvent.NODE_TYPE);
+        nodeEvent
+                
.setData(ProcessInstanceNodeEventBody.create().processId(PROCESS_ID).processInstanceId(PROCESS_INSTANCE_ID).processVersion(PROCESS_INSTANCE_VERSION).nodeDefinitionId(NODE_CONTAINER_ID)
+                        
.nodeInstanceId(NODE_CONTAINER_INSTANCEID).eventDate(toDate(TIME)).eventUser(SUBJECT).connectionNodeDefinitionId(NODE_CONTAINER_ID).workItemId(NODE_CONTAINER_ID)
+                        .nodeType(NODE_TYPE).nodeName(NODE_NAME)
+                        
.eventType(EVENT_TYPE).slaDueDate(toDate(TIME)).build());
+
+        ProcessInstanceSLADataEvent slaEvent = new 
ProcessInstanceSLADataEvent();
+        setBaseEventValues(slaEvent, ProcessInstanceSLADataEvent.SLA_TYPE);
+        slaEvent
+                
.setData(ProcessInstanceSLAEventBody.create().processId(PROCESS_ID).processInstanceId(PROCESS_INSTANCE_ID).processVersion(PROCESS_INSTANCE_VERSION).nodeDefinitionId(NODE_CONTAINER_ID)
+                        
.nodeInstanceId(NODE_CONTAINER_INSTANCEID).eventDate(toDate(TIME)).eventUser(SUBJECT)
+                        
.nodeType(NODE_TYPE).nodeName(NODE_NAME).slaDueDate(toDate(TIME)).build());
+
+        MultipleProcessInstanceDataEvent event = new 
MultipleProcessInstanceDataEvent(SOURCE, Arrays.asList(stateEvent, varEvent, 
errorEvent, nodeEvent, slaEvent));
+        if (binary) {
+            
event.setDataContentType(MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE);
+        }
+        if (compress) {
+            event.setCompressed(compress);
+        }
+
+        byte[] json = OBJECT_MAPPER.writeValueAsBytes(event);
+        logger.info("Serialized chunk size is {}", json.length);
+
+        // cloud event structured mode check
+        MultipleProcessInstanceDataEvent deserializedEvent = 
OBJECT_MAPPER.readValue(json, MultipleProcessInstanceDataEvent.class);
+        
assertThat(deserializedEvent.getData()).hasSize(event.getData().size());
+        assertMultipleIntance(deserializedEvent, expectedVarValue);
+
+        // cloud event binary mode check
+        CloudEvent cloudEvent = OBJECT_MAPPER.readValue(json, 
CloudEvent.class);
+        deserializedEvent = DataEventFactory.from(new 
MultipleProcessInstanceDataEvent(), cloudEvent, 
MultipleProcessDataInstanceConverterFactory.fromCloudEvent(cloudEvent, 
OBJECT_MAPPER));
+        
assertThat(deserializedEvent.getData()).hasSize(event.getData().size());
+        assertMultipleIntance(deserializedEvent, expectedVarValue);
+        return json.length;
+    }
+
+    private void assertMultipleIntance(MultipleProcessInstanceDataEvent 
deserializedEvent, JsonNode expectedVarValue) {
+
+        Iterator<ProcessInstanceDataEvent<? extends 
KogitoMarshallEventSupport>> iter = deserializedEvent.getData().iterator();
+        ProcessInstanceStateDataEvent deserializedStateEvent = 
(ProcessInstanceStateDataEvent) iter.next();
+        assertBaseEventValues(deserializedStateEvent, 
ProcessInstanceStateDataEvent.STATE_TYPE);
+        assertExtensionNames(deserializedStateEvent, BASE_EXTENSION_NAMES);
+        assertStateBody(deserializedStateEvent.getData());
+
+        ProcessInstanceVariableDataEvent deserializedVariableEvent = 
(ProcessInstanceVariableDataEvent) iter.next();
+        assertBaseEventValues(deserializedVariableEvent, 
ProcessInstanceVariableDataEvent.VAR_TYPE);
+        assertExtensionNames(deserializedVariableEvent, BASE_EXTENSION_NAMES, 
CloudEventExtensionConstants.KOGITO_VARIABLE_NAME);
+        
assertThat(deserializedVariableEvent.getExtension(CloudEventExtensionConstants.KOGITO_VARIABLE_NAME)).isEqualTo(VARIABLE_NAME);
+        assertVarBody(deserializedVariableEvent.getData(), expectedVarValue);
+
+        ProcessInstanceErrorDataEvent deserializedErrorEvent = 
(ProcessInstanceErrorDataEvent) iter.next();
+        assertBaseEventValues(deserializedErrorEvent, 
ProcessInstanceErrorDataEvent.ERROR_TYPE);
+        assertExtensionNames(deserializedErrorEvent, BASE_EXTENSION_NAMES);
+        assertErrorBody(deserializedErrorEvent.getData());
+
+        ProcessInstanceNodeDataEvent deserializedNodeEvent = 
(ProcessInstanceNodeDataEvent) iter.next();
+        assertBaseEventValues(deserializedNodeEvent, 
ProcessInstanceNodeDataEvent.NODE_TYPE);
+        assertExtensionNames(deserializedNodeEvent, BASE_EXTENSION_NAMES);
+        assertNodeBody(deserializedNodeEvent.getData());
+
+        ProcessInstanceSLADataEvent deserializedSLAEvent = 
(ProcessInstanceSLADataEvent) iter.next();
+        assertBaseEventValues(deserializedSLAEvent, 
ProcessInstanceSLADataEvent.SLA_TYPE);
+        assertExtensionNames(deserializedSLAEvent, BASE_EXTENSION_NAMES);
+        assertSLABody(deserializedSLAEvent.getData());
+    }
+
+    private void assertSLABody(ProcessInstanceSLAEventBody data) {
+        assertThat(data.getNodeDefinitionId()).isEqualTo(NODE_CONTAINER_ID);
+        
assertThat(data.getNodeInstanceId()).isEqualTo(NODE_CONTAINER_INSTANCEID);
+        assertThat(data.getProcessId()).isEqualTo(PROCESS_ID);
+        assertThat(data.getProcessInstanceId()).isEqualTo(PROCESS_INSTANCE_ID);
+        
assertThat(data.getProcessVersion()).isEqualTo(PROCESS_INSTANCE_VERSION);
+        assertThat(data.getEventUser()).isEqualTo(SUBJECT);
+        assertThat(data.getEventDate()).isEqualTo(toDate(TIME));
+        assertThat(data.getSlaDueDate()).isEqualTo(toDate(TIME));
+        assertThat(data.getNodeName()).isEqualTo(NODE_NAME);
+        assertThat(data.getNodeType()).isEqualTo(NODE_TYPE);
+    }
+
+    private void assertNodeBody(ProcessInstanceNodeEventBody data) {
+        assertThat(data.getNodeDefinitionId()).isEqualTo(NODE_CONTAINER_ID);
+        
assertThat(data.getNodeInstanceId()).isEqualTo(NODE_CONTAINER_INSTANCEID);
+        assertThat(data.getProcessId()).isEqualTo(PROCESS_ID);
+        assertThat(data.getProcessInstanceId()).isEqualTo(PROCESS_INSTANCE_ID);
+        
assertThat(data.getProcessVersion()).isEqualTo(PROCESS_INSTANCE_VERSION);
+        assertThat(data.getEventUser()).isEqualTo(SUBJECT);
+        assertThat(data.getEventDate()).isEqualTo(toDate(TIME));
+        assertThat(data.getEventType()).isEqualTo(EVENT_TYPE);
+        
assertThat(data.getConnectionNodeDefinitionId()).isEqualTo(NODE_CONTAINER_ID);
+        assertThat(data.getWorkItemId()).isEqualTo(NODE_CONTAINER_ID);
+        assertThat(data.getSlaDueDate()).isEqualTo(toDate(TIME));
+        assertThat(data.getNodeName()).isEqualTo(NODE_NAME);
+        assertThat(data.getNodeType()).isEqualTo(NODE_TYPE);
+    }
+
+    private void assertErrorBody(ProcessInstanceErrorEventBody data) {
+        assertThat(data.getNodeDefinitionId()).isEqualTo(NODE_CONTAINER_ID);
+        
assertThat(data.getNodeInstanceId()).isEqualTo(NODE_CONTAINER_INSTANCEID);
+        assertThat(data.getErrorMessage()).isEqualTo(ERROR_MESSAGE);
+        assertThat(data.getProcessId()).isEqualTo(PROCESS_ID);
+        assertThat(data.getProcessInstanceId()).isEqualTo(PROCESS_INSTANCE_ID);
+        
assertThat(data.getProcessVersion()).isEqualTo(PROCESS_INSTANCE_VERSION);
+        assertThat(data.getEventUser()).isEqualTo(SUBJECT);
+        assertThat(data.getEventDate()).isEqualTo(toDate(TIME));
+    }
+
+    private static void assertVarBody(ProcessInstanceVariableEventBody data, 
JsonNode expectedVarValue) {
+        assertThat(data.getVariableId()).isEqualTo(VARIABLE_NAME);
+        assertThat(data.getVariableName()).isEqualTo(VARIABLE_NAME);
+        
assertThat(JsonObjectUtils.fromValue(data.getVariableValue())).isEqualTo(expectedVarValue);
+        
assertThat(data.getNodeContainerDefinitionId()).isEqualTo(NODE_CONTAINER_ID);
+        
assertThat(data.getNodeContainerInstanceId()).isEqualTo(NODE_CONTAINER_INSTANCEID);
+        assertThat(data.getProcessId()).isEqualTo(PROCESS_ID);
+        assertThat(data.getProcessInstanceId()).isEqualTo(PROCESS_INSTANCE_ID);
+        
assertThat(data.getProcessVersion()).isEqualTo(PROCESS_INSTANCE_VERSION);
+        assertThat(data.getEventUser()).isEqualTo(SUBJECT);
+        assertThat(data.getEventDate()).isEqualTo(toDate(TIME));
+    }
+
+    private static void assertStateBody(ProcessInstanceStateEventBody data) {
+        assertThat(data.getBusinessKey()).isEqualTo(BUSINESS_KEY);
+        
assertThat(data.getParentInstanceId()).isEqualTo(PROCESS_PARENT_PROCESS_INSTANCE_ID);
+        assertThat(data.getRootProcessId()).isEqualTo(ROOT_PROCESS_ID);
+        assertThat(data.getProcessType()).isEqualTo(PROCESS_TYPE);
+        assertThat(data.getState()).isEqualTo(PROCESS_STATE);
+        
assertThat(data.getRootProcessInstanceId()).isEqualTo(ROOT_PROCESS_INSTANCE_ID);
+        assertThat(data.getEventType()).isEqualTo(EVENT_TYPE);
+        assertThat(data.getProcessId()).isEqualTo(PROCESS_ID);
+        assertThat(data.getProcessInstanceId()).isEqualTo(PROCESS_INSTANCE_ID);
+        
assertThat(data.getProcessVersion()).isEqualTo(PROCESS_INSTANCE_VERSION);
+        assertThat(data.getEventUser()).isEqualTo(SUBJECT);
+        assertThat(data.getEventDate()).isEqualTo(toDate(TIME));
+    }
+
     @Test
     void userTaskInstanceDataEvent() throws Exception {
         UserTaskInstanceStateDataEvent event = new 
UserTaskInstanceStateDataEvent();
@@ -136,7 +323,6 @@ class ProcessEventsTest {
         assertExtensionNames(deserializedEvent, BASE_EXTENSION_NAMES,
                 CloudEventExtensionConstants.PROCESS_USER_TASK_INSTANCE_ID, 
CloudEventExtensionConstants.PROCESS_USER_TASK_INSTANCE_STATE,
                 EXTENSION_1, EXTENSION_2);
-
     }
 
     @Test
@@ -175,12 +361,11 @@ class ProcessEventsTest {
         event.setKogitoRootProcessInstanceId(ROOT_PROCESS_INSTANCE_ID);
         event.setKogitoRootProcessId(ROOT_PROCESS_ID);
         
event.setKogitoParentProcessInstanceId(PROCESS_PARENT_PROCESS_INSTANCE_ID);
-        event.setKogitoReferenceId(PROCESS_REFERENCE_ID);
         event.setKogitoProcessInstanceState(PROCESS_INSTANCE_STATE);
-        event.setKogitoStartFromNode(PROCESS_START_FROM_NODE);
         event.setKogitoBusinessKey(BUSINESS_KEY);
         event.setKogitoProcessType(PROCESS_TYPE);
         event.setKogitoAddons(ADDONS);
+        event.setKogitoIdentity(SUBJECT);
     }
 
     private static void setAdditionalExtensions(AbstractDataEvent<?> event) {
@@ -197,25 +382,25 @@ class ProcessEventsTest {
         assertThat(deserializedEvent.getSubject()).isEqualTo(SUBJECT);
         
assertThat(deserializedEvent.getDataContentType()).isEqualTo(DATA_CONTENT_TYPE);
         assertThat(deserializedEvent.getDataSchema()).isEqualTo(DATA_SCHEMA);
-
         
assertThat(deserializedEvent.getKogitoProcessInstanceId()).isEqualTo(PROCESS_INSTANCE_ID);
         
assertThat(deserializedEvent.getKogitoProcessId()).isEqualTo(PROCESS_ID);
         
assertThat(deserializedEvent.getKogitoRootProcessInstanceId()).isEqualTo(ROOT_PROCESS_INSTANCE_ID);
         
assertThat(deserializedEvent.getKogitoRootProcessId()).isEqualTo(ROOT_PROCESS_ID);
         
assertThat(deserializedEvent.getKogitoParentProcessInstanceId()).isEqualTo(PROCESS_PARENT_PROCESS_INSTANCE_ID);
-        
assertThat(deserializedEvent.getKogitoReferenceId()).isEqualTo(PROCESS_REFERENCE_ID);
         
assertThat(deserializedEvent.getKogitoProcessInstanceState()).isEqualTo(PROCESS_INSTANCE_STATE);
-        
assertThat(deserializedEvent.getKogitoStartFromNode()).isEqualTo(PROCESS_START_FROM_NODE);
         
assertThat(deserializedEvent.getKogitoBusinessKey()).isEqualTo(BUSINESS_KEY);
         
assertThat(deserializedEvent.getKogitoProcessType()).isEqualTo(PROCESS_TYPE);
+        assertThat(deserializedEvent.getKogitoIdentity()).isEqualTo(SUBJECT);
         assertThat(deserializedEvent.getKogitoAddons()).isEqualTo(ADDONS);
     }
 
     private static void assertExtensionNames(AbstractDataEvent<?> event, 
Set<String> baseNames, String... names) {
         Set<String> extensionNames = event.getExtensionNames();
         assertThat(extensionNames).hasSize(baseNames.size() + names.length)
-                .containsAll(baseNames)
-                .contains(names);
+                .containsAll(baseNames);
+        if (names.length > 0) {
+            assertThat(extensionNames).contains(names);
+        }
     }
 
     private static void assertExtensionsNotDuplicated(String json, Set<String> 
extensionNames) {
diff --git 
a/kogito-codegen-modules/kogito-codegen-core/src/main/resources/class-templates/config/GlobalObjectMapperQuarkusTemplate.java
 
b/kogito-codegen-modules/kogito-codegen-core/src/main/resources/class-templates/config/GlobalObjectMapperQuarkusTemplate.java
index 050097b1d1..c670e699f9 100644
--- 
a/kogito-codegen-modules/kogito-codegen-core/src/main/resources/class-templates/config/GlobalObjectMapperQuarkusTemplate.java
+++ 
b/kogito-codegen-modules/kogito-codegen-core/src/main/resources/class-templates/config/GlobalObjectMapperQuarkusTemplate.java
@@ -43,6 +43,6 @@ public class GlobalObjectMapper implements 
ObjectMapperCustomizer {
             mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
         }
         mapper.setDateFormat(new 
StdDateFormat().withColonInTimeZone(true).withTimeZone(TimeZone.getDefault()));
-        mapper.registerModule(new 
JavaTimeModule()).registerModule(JsonFormat.getCloudEventJacksonModule());
+        
mapper.registerModule(JsonFormat.getCloudEventJacksonModule()).findAndRegisterModules();
     }
 }
\ No newline at end of file
diff --git 
a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java
 
b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java
index 66e007d039..ee30e0dc36 100644
--- 
a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java
+++ 
b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java
@@ -26,7 +26,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.kie.kogito.event.DataEvent;
+import org.kie.kogito.event.process.KogitoMarshallEventSupport;
 import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
 import org.kie.kogito.event.process.ProcessInstanceDataEvent;
 import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
@@ -45,6 +47,12 @@ public class GroupingMessagingEventPublisher extends 
AbstractMessagingEventPubli
         publish(Collections.singletonList(event));
     }
 
+    @ConfigProperty(name = "kogito.events.grouping.binary", defaultValue = 
"false")
+    private boolean binary;
+
+    @ConfigProperty(name = "kogito.events.grouping.compress", defaultValue = 
"false")
+    private boolean compress;
+
     @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     public void publish(Collection<DataEvent<?>> events) {
@@ -62,7 +70,12 @@ public class GroupingMessagingEventPublisher extends 
AbstractMessagingEventPubli
         if (firstEvent instanceof UserTaskInstanceDataEvent) {
             publishToTopic(entry.getKey(), new 
MultipleUserTaskInstanceDataEvent(source, 
(Collection<UserTaskInstanceDataEvent<?>>) entry.getValue()));
         } else if (firstEvent instanceof ProcessInstanceDataEvent) {
-            publishToTopic(entry.getKey(), new 
MultipleProcessInstanceDataEvent(source, 
(Collection<ProcessInstanceDataEvent<?>>) entry.getValue()));
+            MultipleProcessInstanceDataEvent sent = new 
MultipleProcessInstanceDataEvent(source, (Collection<ProcessInstanceDataEvent<? 
extends KogitoMarshallEventSupport>>) entry.getValue());
+            if (binary) {
+                
sent.setDataContentType(MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE);
+                sent.setCompressed(compress);
+            }
+            publishToTopic(entry.getKey(), sent);
         } else {
             for (DataEvent<?> event : (Collection<DataEvent<?>>) 
entry.getValue()) {
                 publishToTopic(entry.getKey(), event);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to