This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new d6af3b8e986 IGNITE-27735 Use MessageSerializer for 
GridEventStorageMessage (#12690)
d6af3b8e986 is described below

commit d6af3b8e9862e557a3e137acabb1aa76db1956ae
Author: Dmitry Werner <[email protected]>
AuthorDate: Wed Feb 11 16:54:50 2026 +0500

    IGNITE-27735 Use MessageSerializer for GridEventStorageMessage (#12690)
---
 .../communication/GridIoMessageFactory.java        |   3 +-
 .../eventstorage/GridEventStorageManager.java      |  28 +-
 .../eventstorage/GridEventStorageMessage.java      | 316 ++++++++-------------
 3 files changed, 122 insertions(+), 225 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6f82ec84023..c0317adec9f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -91,6 +91,7 @@ import 
org.apache.ignite.internal.codegen.GridDistributedTxFinishRequestSerializ
 import 
org.apache.ignite.internal.codegen.GridDistributedTxFinishResponseSerializer;
 import 
org.apache.ignite.internal.codegen.GridDistributedTxPrepareRequestSerializer;
 import 
org.apache.ignite.internal.codegen.GridDistributedTxPrepareResponseSerializer;
+import org.apache.ignite.internal.codegen.GridEventStorageMessageSerializer;
 import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer;
 import org.apache.ignite.internal.codegen.GridJobExecuteRequestSerializer;
 import org.apache.ignite.internal.codegen.GridJobExecuteResponseSerializer;
@@ -367,7 +368,7 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)10, GridDeploymentInfoBean::new);
         factory.register((short)11, GridDeploymentRequest::new);
         factory.register((short)12, GridDeploymentResponse::new, new 
GridDeploymentResponseSerializer());
-        factory.register((short)13, GridEventStorageMessage::new);
+        factory.register((short)13, GridEventStorageMessage::new, new 
GridEventStorageMessageSerializer());
         factory.register((short)16, GridCacheTxRecoveryRequest::new, new 
GridCacheTxRecoveryRequestSerializer());
         factory.register((short)17, GridCacheTxRecoveryResponse::new, new 
GridCacheTxRecoveryResponseSerializer());
         factory.register((short)18, IndexQueryResultMeta::new, new 
IndexQueryResultMetaSerializer());
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index afedf59bf3f..6783bb13995 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -1024,13 +1024,7 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
                 GridEventStorageMessage res = (GridEventStorageMessage)msg;
 
                 try {
-                    if (res.eventsBytes() != null)
-                        res.events(U.<Collection<Event>>unmarshal(marsh, 
res.eventsBytes(),
-                            U.resolveClassLoader(ctx.config())));
-
-                    if (res.exceptionBytes() != null)
-                        res.exception(U.<Throwable>unmarshal(marsh, 
res.exceptionBytes(),
-                            U.resolveClassLoader(ctx.config())));
+                    res.finishUnmarshal(marsh, 
U.resolveClassLoader(ctx.config()), null);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to unmarshal events query response: " 
+ msg, e);
@@ -1066,8 +1060,6 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
 
             ioMgr.addMessageListener(resTopic, resLsnr);
 
-            byte[] serFilter = U.marshal(marsh, p);
-
             GridDeployment dep = ctx.deploy().deploy(p.getClass(), 
U.detectClassLoader(p.getClass()));
 
             if (dep == null)
@@ -1075,8 +1067,7 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
 
             GridEventStorageMessage msg = new GridEventStorageMessage(
                 resTopic,
-                serFilter,
-                p.getClass().getName(),
+                p,
                 dep.classLoaderId(),
                 dep.deployMode(),
                 dep.userVersion(),
@@ -1154,7 +1145,7 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
             ctx.io().sendToGridTopic(locNode, topic, msg, plc);
 
         if (!rmtNodes.isEmpty()) {
-            msg.responseTopicBytes(U.marshal(marsh, msg.responseTopic()));
+            msg.prepareMarshal(marsh);
 
             ctx.io().sendToGridTopic(rmtNodes, topic, msg, plc);
         }
@@ -1209,9 +1200,6 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
                 Collection<Event> evts;
 
                 try {
-                    if (req.responseTopicBytes() != null)
-                        req.responseTopic(U.unmarshal(marsh, 
req.responseTopicBytes(), U.resolveClassLoader(ctx.config())));
-
                     GridDeployment dep = ctx.deploy().getGlobalDeployment(
                         req.deploymentMode(),
                         req.filterClassName(),
@@ -1226,7 +1214,9 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
                         throw new IgniteDeploymentCheckedException("Failed to 
obtain deployment for event filter " +
                             "(is peer class loading turned on?): " + req);
 
-                    filter = U.unmarshal(marsh, req.filter(), 
U.resolveClassLoader(dep.classLoader(), ctx.config()));
+                    req.finishUnmarshal(marsh, 
U.resolveClassLoader(ctx.config()), U.resolveClassLoader(dep.classLoader(), 
ctx.config()));
+
+                    filter = (IgnitePredicate<Event>)req.filter();
 
                     // Resource injection.
                     ctx.resource().inject(dep, 
dep.deployedClass(req.filterClassName()).get1(), filter);
@@ -1260,10 +1250,8 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
                     if (log.isDebugEnabled())
                         log.debug("Sending event query response to node 
[nodeId=" + nodeId + "res=" + res + ']');
 
-                    if (!ctx.localNodeId().equals(nodeId)) {
-                        res.eventsBytes(U.marshal(marsh, res.events()));
-                        res.exceptionBytes(U.marshal(marsh, res.exception()));
-                    }
+                    if (!ctx.localNodeId().equals(nodeId))
+                        res.prepareMarshal(marsh);
 
                     ctx.io().sendToCustomTopic(node, req.responseTopic(), res, 
PUBLIC_POOL);
                 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java
index 937f4381a9d..6bcf91275e7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java
@@ -17,22 +17,22 @@
 
 package org.apache.ignite.internal.managers.eventstorage;
 
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.DeploymentMode;
 import org.apache.ignite.events.Event;
-import org.apache.ignite.internal.GridDirectMap;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -40,44 +40,49 @@ import org.jetbrains.annotations.Nullable;
  */
 public class GridEventStorageMessage implements Message {
     /** */
-    @GridDirectTransient
     private Object resTopic;
 
     /** */
+    @Order(value = 0, method = "responseTopicBytes")
     private byte[] resTopicBytes;
 
     /** */
-    private byte[] filter;
+    private IgnitePredicate<?> filter;
 
     /** */
-    @GridDirectTransient
-    private Collection<Event> evts;
+    @Order(value = 1, method = "filterBytes")
+    private byte[] filterBytes;
 
     /** */
-    private byte[] evtsBytes;
+    private Collection<Event> evts;
 
     /** */
-    @GridDirectTransient
-    private Throwable ex;
+    @Order(value = 2, method = "eventsBytes")
+    private byte[] evtsBytes;
 
     /** */
-    private byte[] exBytes;
+    @Order(value = 3, method = "errorMessage")
+    private ErrorMessage errMsg;
 
     /** */
+    @Order(value = 4, method = "classLoaderId")
     private IgniteUuid clsLdrId;
 
     /** */
+    @Order(value = 5, method = "deploymentMode")
     private DeploymentMode depMode;
 
     /** */
+    @Order(value = 6, method = "filterClassName")
     private String filterClsName;
 
     /** */
+    @Order(value = 7, method = "userVersion")
     private String userVer;
 
     /** Node class loader participants. */
     @GridToStringInclude
-    @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
+    @Order(value = 8, method = "loaderParticipants")
     private Map<UUID, IgniteUuid> ldrParties;
 
     /** */
@@ -88,7 +93,6 @@ public class GridEventStorageMessage implements Message {
     /**
      * @param resTopic Response topic,
      * @param filter Query filter.
-     * @param filterClsName Filter class name.
      * @param clsLdrId Class loader ID.
      * @param depMode Deployment mode.
      * @param userVer User version.
@@ -96,22 +100,21 @@ public class GridEventStorageMessage implements Message {
      */
     GridEventStorageMessage(
         Object resTopic,
-        byte[] filter,
-        String filterClsName,
+        IgnitePredicate<?> filter,
         IgniteUuid clsLdrId,
         DeploymentMode depMode,
         String userVer,
         Map<UUID, IgniteUuid> ldrParties) {
         this.resTopic = resTopic;
         this.filter = filter;
-        this.filterClsName = filterClsName;
+        filterClsName = filter.getClass().getName();
         this.depMode = depMode;
         this.clsLdrId = clsLdrId;
         this.userVer = userVer;
         this.ldrParties = ldrParties;
 
         evts = null;
-        ex = null;
+        errMsg = null;
     }
 
     /**
@@ -120,7 +123,9 @@ public class GridEventStorageMessage implements Message {
      */
     GridEventStorageMessage(Collection<Event> evts, Throwable ex) {
         this.evts = evts;
-        this.ex = ex;
+
+        if (ex != null)
+            errMsg = new ErrorMessage(ex);
 
         resTopic = null;
         filter = null;
@@ -137,287 +142,190 @@ public class GridEventStorageMessage implements Message 
{
         return resTopic;
     }
 
-    /**
-     * @param resTopic Response topic.
-     */
-    void responseTopic(Object resTopic) {
-        this.resTopic = resTopic;
-    }
-
     /**
      * @return Serialized response topic.
      */
-    byte[] responseTopicBytes() {
+    public byte[] responseTopicBytes() {
         return resTopicBytes;
     }
 
     /**
      * @param resTopicBytes Serialized response topic.
      */
-    void responseTopicBytes(byte[] resTopicBytes) {
+    public void responseTopicBytes(byte[] resTopicBytes) {
         this.resTopicBytes = resTopicBytes;
     }
 
     /**
      * @return Filter.
      */
-    byte[] filter() {
-        return filter;
+    public byte[] filterBytes() {
+        return filterBytes;
     }
 
     /**
-     * @return Events.
+     * @param filterBytes Filter bytes.
      */
-    @Nullable Collection<Event> events() {
-        return evts != null ? Collections.unmodifiableCollection(evts) : null;
+    public void filterBytes(byte[] filterBytes) {
+        this.filterBytes = filterBytes;
     }
 
     /**
-     * @param evts Events.
+     * @return Filter.
      */
-    void events(@Nullable Collection<Event> evts) {
-        this.evts = evts;
+    public IgnitePredicate<?> filter() {
+        return filter;
+    }
+
+    /**
+     * @return Events.
+     */
+    @Nullable Collection<Event> events() {
+        return evts != null ? Collections.unmodifiableCollection(evts) : null;
     }
 
     /**
      * @return Serialized events.
      */
-    byte[] eventsBytes() {
+    public byte[] eventsBytes() {
         return evtsBytes;
     }
 
     /**
      * @param evtsBytes Serialized events.
      */
-    void eventsBytes(byte[] evtsBytes) {
+    public void eventsBytes(byte[] evtsBytes) {
         this.evtsBytes = evtsBytes;
     }
 
     /**
      * @return the Class loader ID.
      */
-    IgniteUuid classLoaderId() {
+    public IgniteUuid classLoaderId() {
         return clsLdrId;
     }
 
+    /**
+     * @param clsLdrId the Class loader ID.
+     */
+    public void classLoaderId(IgniteUuid clsLdrId) {
+        this.clsLdrId = clsLdrId;
+    }
+
     /**
      * @return Deployment mode.
      */
-    DeploymentMode deploymentMode() {
+    public DeploymentMode deploymentMode() {
         return depMode;
     }
 
+    /**
+     * @param depMode Deployment mode.
+     */
+    public void deploymentMode(DeploymentMode depMode) {
+        this.depMode = depMode;
+    }
+
     /**
      * @return Filter class name.
      */
-    String filterClassName() {
+    public String filterClassName() {
         return filterClsName;
     }
 
+    /**
+     * @param filterClsName Filter class name.
+     */
+    public void filterClassName(String filterClsName) {
+        this.filterClsName = filterClsName;
+    }
+
     /**
      * @return User version.
      */
-    String userVersion() {
+    public String userVersion() {
         return userVer;
     }
 
+    /**
+     * @param userVer User version.
+     */
+    public void userVersion(String userVer) {
+        this.userVer = userVer;
+    }
+
     /**
      * @return Node class loader participant map.
      */
-    @Nullable Map<UUID, IgniteUuid> loaderParticipants() {
+    public @Nullable Map<UUID, IgniteUuid> loaderParticipants() {
         return ldrParties != null ? Collections.unmodifiableMap(ldrParties) : 
null;
     }
 
     /**
      * @param ldrParties Node class loader participant map.
      */
-    void loaderParticipants(Map<UUID, IgniteUuid> ldrParties) {
+    public void loaderParticipants(@Nullable Map<UUID, IgniteUuid> ldrParties) 
{
         this.ldrParties = ldrParties;
     }
 
     /**
      * @return Exception.
      */
-    Throwable exception() {
-        return ex;
+    @Nullable Throwable exception() {
+        return ErrorMessage.error(errMsg);
     }
 
     /**
-     * @param ex Exception.
+     * @return Error message.
      */
-    void exception(Throwable ex) {
-        this.ex = ex;
+    public @Nullable ErrorMessage errorMessage() {
+        return errMsg;
     }
 
     /**
-     * @return Serialized exception.
+     * @param errMsg Error message.
      */
-    byte[] exceptionBytes() {
-        return exBytes;
+    public void errorMessage(@Nullable ErrorMessage errMsg) {
+        this.errMsg = errMsg;
     }
 
     /**
-     * @param exBytes Serialized exception.
+     * @param marsh Marshaller.
      */
-    void exceptionBytes(byte[] exBytes) {
-        this.exBytes = exBytes;
-    }
+    public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException 
{
+        if (resTopic != null && resTopicBytes == null)
+            resTopicBytes = U.marshal(marsh, resTopic);
 
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
+        if (filter != null && filterBytes == null)
+            filterBytes = U.marshal(marsh, filter);
 
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeIgniteUuid(clsLdrId))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeByte(depMode != null ? 
(byte)depMode.ordinal() : -1))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeByteArray(evtsBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 3:
-                if (!writer.writeByteArray(exBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 4:
-                if (!writer.writeByteArray(filter))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
-                if (!writer.writeString(filterClsName))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
-                if (!writer.writeMap(ldrParties, 
MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
-                    return false;
-
-                writer.incrementState();
-
-            case 7:
-                if (!writer.writeByteArray(resTopicBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 8:
-                if (!writer.writeString(userVer))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
+        if (evts != null && evtsBytes == null)
+            evtsBytes = U.marshal(marsh, evts);
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        switch (reader.state()) {
-            case 0:
-                clsLdrId = reader.readIgniteUuid();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                byte depModeOrd;
-
-                depModeOrd = reader.readByte();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                depMode = DeploymentMode.fromOrdinal(depModeOrd);
-
-                reader.incrementState();
-
-            case 2:
-                evtsBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 3:
-                exBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 4:
-                filter = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
-                filterClsName = reader.readString();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 6:
-                ldrParties = reader.readMap(MessageCollectionItemType.UUID, 
MessageCollectionItemType.IGNITE_UUID, false);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 7:
-                resTopicBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
+    /**
+     * @param marsh Marshaller.
+     * @param ldr Class loader.
+     * @param filterClsLdr Class loader for filter.
+     */
+    public void finishUnmarshal(Marshaller marsh, ClassLoader ldr, ClassLoader 
filterClsLdr) throws IgniteCheckedException {
+        if (resTopicBytes != null && resTopic == null) {
+            resTopic = U.unmarshal(marsh, resTopicBytes, ldr);
 
-                reader.incrementState();
+            resTopicBytes = null;
+        }
 
-            case 8:
-                userVer = reader.readString();
+        if (filterBytes != null && filter == null && filterClsLdr != null) {
+            filter = U.unmarshal(marsh, filterBytes, filterClsLdr);
 
-                if (!reader.isLastRead())
-                    return false;
+            filterBytes = null;
+        }
 
-                reader.incrementState();
+        if (evtsBytes != null && evts == null) {
+            evts = U.unmarshal(marsh, evtsBytes, ldr);
 
+            evtsBytes = null;
         }
-
-        return true;
     }
 
     /** {@inheritDoc} */

Reply via email to