This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new d6af3b8e986 IGNITE-27735 Use MessageSerializer for GridEventStorageMessage (#12690)
d6af3b8e986 is described below
commit d6af3b8e9862e557a3e137acabb1aa76db1956ae
Author: Dmitry Werner <[email protected]>
AuthorDate: Wed Feb 11 16:54:50 2026 +0500
IGNITE-27735 Use MessageSerializer for GridEventStorageMessage (#12690)
---
.../communication/GridIoMessageFactory.java | 3 +-
.../eventstorage/GridEventStorageManager.java | 28 +-
.../eventstorage/GridEventStorageMessage.java | 316 ++++++++-------------
3 files changed, 122 insertions(+), 225 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6f82ec84023..c0317adec9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.codegen.GridDistributedTxFinishRequestSerializ
import
org.apache.ignite.internal.codegen.GridDistributedTxFinishResponseSerializer;
import
org.apache.ignite.internal.codegen.GridDistributedTxPrepareRequestSerializer;
import
org.apache.ignite.internal.codegen.GridDistributedTxPrepareResponseSerializer;
+import org.apache.ignite.internal.codegen.GridEventStorageMessageSerializer;
import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer;
import org.apache.ignite.internal.codegen.GridJobExecuteRequestSerializer;
import org.apache.ignite.internal.codegen.GridJobExecuteResponseSerializer;
@@ -367,7 +368,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
factory.register((short)10, GridDeploymentInfoBean::new);
factory.register((short)11, GridDeploymentRequest::new);
factory.register((short)12, GridDeploymentResponse::new, new
GridDeploymentResponseSerializer());
- factory.register((short)13, GridEventStorageMessage::new);
+ factory.register((short)13, GridEventStorageMessage::new, new
GridEventStorageMessageSerializer());
factory.register((short)16, GridCacheTxRecoveryRequest::new, new
GridCacheTxRecoveryRequestSerializer());
factory.register((short)17, GridCacheTxRecoveryResponse::new, new
GridCacheTxRecoveryResponseSerializer());
factory.register((short)18, IndexQueryResultMeta::new, new
IndexQueryResultMetaSerializer());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index afedf59bf3f..6783bb13995 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -1024,13 +1024,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
GridEventStorageMessage res = (GridEventStorageMessage)msg;
try {
- if (res.eventsBytes() != null)
- res.events(U.<Collection<Event>>unmarshal(marsh,
res.eventsBytes(),
- U.resolveClassLoader(ctx.config())));
-
- if (res.exceptionBytes() != null)
- res.exception(U.<Throwable>unmarshal(marsh,
res.exceptionBytes(),
- U.resolveClassLoader(ctx.config())));
+ res.finishUnmarshal(marsh,
U.resolveClassLoader(ctx.config()), null);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal events query response: "
+ msg, e);
@@ -1066,8 +1060,6 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
ioMgr.addMessageListener(resTopic, resLsnr);
- byte[] serFilter = U.marshal(marsh, p);
-
GridDeployment dep = ctx.deploy().deploy(p.getClass(),
U.detectClassLoader(p.getClass()));
if (dep == null)
@@ -1075,8 +1067,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
GridEventStorageMessage msg = new GridEventStorageMessage(
resTopic,
- serFilter,
- p.getClass().getName(),
+ p,
dep.classLoaderId(),
dep.deployMode(),
dep.userVersion(),
@@ -1154,7 +1145,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
ctx.io().sendToGridTopic(locNode, topic, msg, plc);
if (!rmtNodes.isEmpty()) {
- msg.responseTopicBytes(U.marshal(marsh, msg.responseTopic()));
+ msg.prepareMarshal(marsh);
ctx.io().sendToGridTopic(rmtNodes, topic, msg, plc);
}
@@ -1209,9 +1200,6 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
Collection<Event> evts;
try {
- if (req.responseTopicBytes() != null)
- req.responseTopic(U.unmarshal(marsh,
req.responseTopicBytes(), U.resolveClassLoader(ctx.config())));
-
GridDeployment dep = ctx.deploy().getGlobalDeployment(
req.deploymentMode(),
req.filterClassName(),
@@ -1226,7 +1214,9 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
throw new IgniteDeploymentCheckedException("Failed to
obtain deployment for event filter " +
"(is peer class loading turned on?): " + req);
- filter = U.unmarshal(marsh, req.filter(),
U.resolveClassLoader(dep.classLoader(), ctx.config()));
+ req.finishUnmarshal(marsh,
U.resolveClassLoader(ctx.config()), U.resolveClassLoader(dep.classLoader(),
ctx.config()));
+
+ filter = (IgnitePredicate<Event>)req.filter();
// Resource injection.
ctx.resource().inject(dep,
dep.deployedClass(req.filterClassName()).get1(), filter);
@@ -1260,10 +1250,8 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
if (log.isDebugEnabled())
log.debug("Sending event query response to node
[nodeId=" + nodeId + "res=" + res + ']');
- if (!ctx.localNodeId().equals(nodeId)) {
- res.eventsBytes(U.marshal(marsh, res.events()));
- res.exceptionBytes(U.marshal(marsh, res.exception()));
- }
+ if (!ctx.localNodeId().equals(nodeId))
+ res.prepareMarshal(marsh);
ctx.io().sendToCustomTopic(node, req.responseTopic(), res,
PUBLIC_POOL);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java
index 937f4381a9d..6bcf91275e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java
@@ -17,22 +17,22 @@
package org.apache.ignite.internal.managers.eventstorage;
-import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.events.Event;
-import org.apache.ignite.internal.GridDirectMap;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -40,44 +40,49 @@ import org.jetbrains.annotations.Nullable;
*/
public class GridEventStorageMessage implements Message {
/** */
- @GridDirectTransient
private Object resTopic;
/** */
+ @Order(value = 0, method = "responseTopicBytes")
private byte[] resTopicBytes;
/** */
- private byte[] filter;
+ private IgnitePredicate<?> filter;
/** */
- @GridDirectTransient
- private Collection<Event> evts;
+ @Order(value = 1, method = "filterBytes")
+ private byte[] filterBytes;
/** */
- private byte[] evtsBytes;
+ private Collection<Event> evts;
/** */
- @GridDirectTransient
- private Throwable ex;
+ @Order(value = 2, method = "eventsBytes")
+ private byte[] evtsBytes;
/** */
- private byte[] exBytes;
+ @Order(value = 3, method = "errorMessage")
+ private ErrorMessage errMsg;
/** */
+ @Order(value = 4, method = "classLoaderId")
private IgniteUuid clsLdrId;
/** */
+ @Order(value = 5, method = "deploymentMode")
private DeploymentMode depMode;
/** */
+ @Order(value = 6, method = "filterClassName")
private String filterClsName;
/** */
+ @Order(value = 7, method = "userVersion")
private String userVer;
/** Node class loader participants. */
@GridToStringInclude
- @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
+ @Order(value = 8, method = "loaderParticipants")
private Map<UUID, IgniteUuid> ldrParties;
/** */
@@ -88,7 +93,6 @@ public class GridEventStorageMessage implements Message {
/**
* @param resTopic Response topic,
* @param filter Query filter.
- * @param filterClsName Filter class name.
* @param clsLdrId Class loader ID.
* @param depMode Deployment mode.
* @param userVer User version.
@@ -96,22 +100,21 @@ public class GridEventStorageMessage implements Message {
*/
GridEventStorageMessage(
Object resTopic,
- byte[] filter,
- String filterClsName,
+ IgnitePredicate<?> filter,
IgniteUuid clsLdrId,
DeploymentMode depMode,
String userVer,
Map<UUID, IgniteUuid> ldrParties) {
this.resTopic = resTopic;
this.filter = filter;
- this.filterClsName = filterClsName;
+ filterClsName = filter.getClass().getName();
this.depMode = depMode;
this.clsLdrId = clsLdrId;
this.userVer = userVer;
this.ldrParties = ldrParties;
evts = null;
- ex = null;
+ errMsg = null;
}
/**
@@ -120,7 +123,9 @@ public class GridEventStorageMessage implements Message {
*/
GridEventStorageMessage(Collection<Event> evts, Throwable ex) {
this.evts = evts;
- this.ex = ex;
+
+ if (ex != null)
+ errMsg = new ErrorMessage(ex);
resTopic = null;
filter = null;
@@ -137,287 +142,190 @@ public class GridEventStorageMessage implements Message {
return resTopic;
}
- /**
- * @param resTopic Response topic.
- */
- void responseTopic(Object resTopic) {
- this.resTopic = resTopic;
- }
-
/**
* @return Serialized response topic.
*/
- byte[] responseTopicBytes() {
+ public byte[] responseTopicBytes() {
return resTopicBytes;
}
/**
* @param resTopicBytes Serialized response topic.
*/
- void responseTopicBytes(byte[] resTopicBytes) {
+ public void responseTopicBytes(byte[] resTopicBytes) {
this.resTopicBytes = resTopicBytes;
}
/**
* @return Filter.
*/
- byte[] filter() {
- return filter;
+ public byte[] filterBytes() {
+ return filterBytes;
}
/**
- * @return Events.
+ * @param filterBytes Filter bytes.
*/
- @Nullable Collection<Event> events() {
- return evts != null ? Collections.unmodifiableCollection(evts) : null;
+ public void filterBytes(byte[] filterBytes) {
+ this.filterBytes = filterBytes;
}
/**
- * @param evts Events.
+ * @return Filter.
*/
- void events(@Nullable Collection<Event> evts) {
- this.evts = evts;
+ public IgnitePredicate<?> filter() {
+ return filter;
+ }
+
+ /**
+ * @return Events.
+ */
+ @Nullable Collection<Event> events() {
+ return evts != null ? Collections.unmodifiableCollection(evts) : null;
}
/**
* @return Serialized events.
*/
- byte[] eventsBytes() {
+ public byte[] eventsBytes() {
return evtsBytes;
}
/**
* @param evtsBytes Serialized events.
*/
- void eventsBytes(byte[] evtsBytes) {
+ public void eventsBytes(byte[] evtsBytes) {
this.evtsBytes = evtsBytes;
}
/**
* @return the Class loader ID.
*/
- IgniteUuid classLoaderId() {
+ public IgniteUuid classLoaderId() {
return clsLdrId;
}
+ /**
+ * @param clsLdrId the Class loader ID.
+ */
+ public void classLoaderId(IgniteUuid clsLdrId) {
+ this.clsLdrId = clsLdrId;
+ }
+
/**
* @return Deployment mode.
*/
- DeploymentMode deploymentMode() {
+ public DeploymentMode deploymentMode() {
return depMode;
}
+ /**
+ * @param depMode Deployment mode.
+ */
+ public void deploymentMode(DeploymentMode depMode) {
+ this.depMode = depMode;
+ }
+
/**
* @return Filter class name.
*/
- String filterClassName() {
+ public String filterClassName() {
return filterClsName;
}
+ /**
+ * @param filterClsName Filter class name.
+ */
+ public void filterClassName(String filterClsName) {
+ this.filterClsName = filterClsName;
+ }
+
/**
* @return User version.
*/
- String userVersion() {
+ public String userVersion() {
return userVer;
}
+ /**
+ * @param userVer User version.
+ */
+ public void userVersion(String userVer) {
+ this.userVer = userVer;
+ }
+
/**
* @return Node class loader participant map.
*/
- @Nullable Map<UUID, IgniteUuid> loaderParticipants() {
+ public @Nullable Map<UUID, IgniteUuid> loaderParticipants() {
return ldrParties != null ? Collections.unmodifiableMap(ldrParties) :
null;
}
/**
* @param ldrParties Node class loader participant map.
*/
- void loaderParticipants(Map<UUID, IgniteUuid> ldrParties) {
+ public void loaderParticipants(@Nullable Map<UUID, IgniteUuid> ldrParties)
{
this.ldrParties = ldrParties;
}
/**
* @return Exception.
*/
- Throwable exception() {
- return ex;
+ @Nullable Throwable exception() {
+ return ErrorMessage.error(errMsg);
}
/**
- * @param ex Exception.
+ * @return Error message.
*/
- void exception(Throwable ex) {
- this.ex = ex;
+ public @Nullable ErrorMessage errorMessage() {
+ return errMsg;
}
/**
- * @return Serialized exception.
+ * @param errMsg Error message.
*/
- byte[] exceptionBytes() {
- return exBytes;
+ public void errorMessage(@Nullable ErrorMessage errMsg) {
+ this.errMsg = errMsg;
}
/**
- * @param exBytes Serialized exception.
+ * @param marsh Marshaller.
*/
- void exceptionBytes(byte[] exBytes) {
- this.exBytes = exBytes;
- }
+ public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException
{
+ if (resTopic != null && resTopicBytes == null)
+ resTopicBytes = U.marshal(marsh, resTopic);
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
+ if (filter != null && filterBytes == null)
+ filterBytes = U.marshal(marsh, filter);
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeIgniteUuid(clsLdrId))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeByte(depMode != null ?
(byte)depMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeByteArray(evtsBytes))
- return false;
-
- writer.incrementState();
-
- case 3:
- if (!writer.writeByteArray(exBytes))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeByteArray(filter))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeString(filterClsName))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeMap(ldrParties,
MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeByteArray(resTopicBytes))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeString(userVer))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
+ if (evts != null && evtsBytes == null)
+ evtsBytes = U.marshal(marsh, evts);
}
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- clsLdrId = reader.readIgniteUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- byte depModeOrd;
-
- depModeOrd = reader.readByte();
-
- if (!reader.isLastRead())
- return false;
-
- depMode = DeploymentMode.fromOrdinal(depModeOrd);
-
- reader.incrementState();
-
- case 2:
- evtsBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 3:
- exBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- filter = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- filterClsName = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- ldrParties = reader.readMap(MessageCollectionItemType.UUID,
MessageCollectionItemType.IGNITE_UUID, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- resTopicBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
+ /**
+ * @param marsh Marshaller.
+ * @param ldr Class loader.
+ * @param filterClsLdr Class loader for filter.
+ */
+ public void finishUnmarshal(Marshaller marsh, ClassLoader ldr, ClassLoader
filterClsLdr) throws IgniteCheckedException {
+ if (resTopicBytes != null && resTopic == null) {
+ resTopic = U.unmarshal(marsh, resTopicBytes, ldr);
- reader.incrementState();
+ resTopicBytes = null;
+ }
- case 8:
- userVer = reader.readString();
+ if (filterBytes != null && filter == null && filterClsLdr != null) {
+ filter = U.unmarshal(marsh, filterBytes, filterClsLdr);
- if (!reader.isLastRead())
- return false;
+ filterBytes = null;
+ }
- reader.incrementState();
+ if (evtsBytes != null && evts == null) {
+ evts = U.unmarshal(marsh, evtsBytes, ldr);
+ evtsBytes = null;
}
-
- return true;
}
/** {@inheritDoc} */