This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 737c550270f IGNITE-28286 : Use MarshallableMessage for GridIoMessage
and GridEventStorageMessage v2 (#12891)
737c550270f is described below
commit 737c550270f62ee791423a81aa433f3ad2d4cd9d
Author: Vladimir Steshin <[email protected]>
AuthorDate: Thu Mar 19 20:38:57 2026 +0300
IGNITE-28286 : Use MarshallableMessage for GridIoMessage and
GridEventStorageMessage v2 (#12891)
---
.../managers/communication/GridIoManager.java | 24 ------------
.../managers/communication/GridIoMessage.java | 23 ++++++-----
.../communication/GridIoMessageFactory.java | 11 ++++--
.../deployment/GridDeploymentCommunication.java | 6 ---
.../eventstorage/GridEventStorageManager.java | 19 +---------
.../eventstorage/GridEventStorageMessage.java | 44 +++++++++++-----------
.../ignite/p2p/GridP2PHotRedeploymentSelfTest.java | 16 --------
7 files changed, 42 insertions(+), 101 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 446546b8962..7a164d1b6d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1199,15 +1199,6 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
return;
}
- if (initMsg.topic() == null) {
- int topicOrd = initMsg.topicOrdinal();
-
- if (topicOrd >= 0)
- initMsg.topic(GridTopic.fromOrdinal(topicOrd));
- else
- initMsg.finishUnmarshal(marsh,
U.resolveClassLoader(ctx.config()));
- }
-
byte plc = initMsg.policy();
pools.poolForPolicy(plc).execute(new Runnable() {
@@ -1248,15 +1239,6 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
return;
}
- if (msg.topic() == null) {
- int topicOrd = msg.topicOrdinal();
-
- if (topicOrd >= 0)
- msg.topic(GridTopic.fromOrdinal(topicOrd));
- else
- msg.finishUnmarshal(marsh,
U.resolveClassLoader(ctx.config()));
- }
-
if (!started) {
lock.readLock().lock();
@@ -1980,9 +1962,6 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
);
try {
- if (topicOrd < 0)
- ioMsg.prepareMarshal(marsh);
-
return
((TcpCommunicationSpi)(CommunicationSpi)getSpi()).openChannel(node, ioMsg);
}
catch (IgniteSpiException e) {
@@ -2054,9 +2033,6 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
ackC.apply(null);
}
else {
- if (topicOrd < 0)
- ioMsg.prepareMarshal(marsh);
-
try {
if ((CommunicationSpi<?>)getSpi() instanceof
TcpCommunicationSpi)
getTcpCommunicationSpi().sendMessage(node, ioMsg,
ackC);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index b42b8df962a..906df7c9ff6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.managers.communication;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.ExecutorAwareMessage;
+import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
@@ -27,13 +28,14 @@ import
org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
/**
* Wrapper for all grid messages.
*/
-public class GridIoMessage implements Message, SpanTransport {
+public class GridIoMessage implements MarshallableMessage, SpanTransport {
/** */
public static final Integer STRIPE_DISABLED_PART = Integer.MIN_VALUE;
@@ -216,24 +218,21 @@ public class GridIoMessage implements Message,
SpanTransport {
return null;
}
- /**
- * @param marsh Marshaller.
- */
- public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException
{
- if (topic != null && topicBytes == null)
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
+ if (topicOrd < 0 && topic != null)
topicBytes = U.marshal(marsh, topic);
}
- /**
- * @param marsh Marshaller.
- * @param ldr Class loader.
- */
- public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws
IgniteCheckedException {
- if (topicBytes != null && topic == null) {
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr)
throws IgniteCheckedException {
+ if (topicOrd < 0 && topicBytes != null) {
topic = U.unmarshal(marsh, topicBytes, ldr);
topicBytes = null;
}
+ else if (topicOrd >= 0)
+ topic = GridTopic.fromOrdinal(topicOrd);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index a3379034634..65480e0b48e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -62,7 +62,7 @@ import
org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyReque
import
org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse;
import
org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponseSerializer;
import
org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage;
-import
org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessageSerializer;
+import
org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessageMarshallableSerializer;
import
org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessage;
import
org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessageSerializer;
import
org.apache.ignite.internal.processors.authentication.UserAuthenticateResponseMessage;
@@ -405,12 +405,14 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)5, GridTaskCancelRequest::new, new
GridTaskCancelRequestSerializer());
factory.register((short)6, GridTaskSessionRequest::new, new
GridTaskSessionRequestSerializer());
factory.register((short)7, GridCheckpointRequest::new, new
GridCheckpointRequestSerializer());
- factory.register((short)8, GridIoMessage::new, new
GridIoMessageSerializer());
+ factory.register((short)8, GridIoMessage::new,
+ new GridIoMessageMarshallableSerializer(cstDataMarshall,
cstDataMarshallClsLdr));
factory.register((short)9, GridIoUserMessage::new, new
GridIoUserMessageSerializer());
factory.register((short)10, GridDeploymentInfoBean::new, new
GridDeploymentInfoBeanSerializer());
factory.register((short)11, GridDeploymentRequest::new, new
GridDeploymentRequestSerializer());
factory.register((short)12, GridDeploymentResponse::new, new
GridDeploymentResponseSerializer());
- factory.register((short)13, GridEventStorageMessage::new, new
GridEventStorageMessageSerializer());
+ factory.register((short)13, GridEventStorageMessage::new,
+ new GridEventStorageMessageMarshallableSerializer(cstDataMarshall,
cstDataMarshallClsLdr));
factory.register((short)16, GridCacheTxRecoveryRequest::new, new
GridCacheTxRecoveryRequestSerializer());
factory.register((short)17, GridCacheTxRecoveryResponse::new, new
GridCacheTxRecoveryResponseSerializer());
factory.register((short)18, IndexQueryResultMeta::new, new
IndexQueryResultMetaSerializer());
@@ -517,7 +519,8 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)169, ServiceSingleNodeDeploymentResult::new,
new ServiceSingleNodeDeploymentResultSerializer());
factory.register(GridQueryKillRequest.TYPE_CODE,
GridQueryKillRequest::new, new GridQueryKillRequestSerializer());
factory.register(GridQueryKillResponse.TYPE_CODE,
GridQueryKillResponse::new, new GridQueryKillResponseSerializer());
- factory.register(GridIoSecurityAwareMessage.TYPE_CODE,
GridIoSecurityAwareMessage::new, new GridIoSecurityAwareMessageSerializer());
+ factory.register(GridIoSecurityAwareMessage.TYPE_CODE,
GridIoSecurityAwareMessage::new,
+ new
GridIoSecurityAwareMessageMarshallableSerializer(cstDataMarshall,
cstDataMarshallClsLdr));
factory.register(SessionChannelMessage.TYPE_CODE,
SessionChannelMessage::new, new SessionChannelMessageSerializer());
factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new);
factory.register((short)177, TcpInverseConnectionResponseMessage::new,
new TcpInverseConnectionResponseMessageSerializer());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
index c83d39e4bf9..f6c8b0c6117 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
@@ -42,7 +42,6 @@ import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteNotPeerDeployable;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -73,9 +72,6 @@ class GridDeploymentCommunication {
/** */
private final GridBusyLock busyLock = new GridBusyLock();
- /** */
- private final Marshaller marsh;
-
/**
* Creates new instance of deployment communication.
*
@@ -93,8 +89,6 @@ class GridDeploymentCommunication {
processDeploymentRequest(nodeId, msg);
}
};
-
- marsh = ctx.marshaller();
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index fcfa3e5b37d..38a773de2bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -1025,15 +1025,6 @@ public class GridEventStorageManager extends
GridManagerAdapter<EventStorageSpi>
GridEventStorageMessage res = (GridEventStorageMessage)msg;
- try {
- res.finishUnmarshal(marsh,
U.resolveClassLoader(ctx.config()), null);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal events query response: "
+ msg, e);
-
- return;
- }
-
synchronized (qryMux) {
if (uids.remove(nodeId)) {
if (res.events() != null)
@@ -1146,11 +1137,8 @@ public class GridEventStorageManager extends
GridManagerAdapter<EventStorageSpi>
if (locNode != null)
ctx.io().sendToGridTopic(locNode, topic, msg, plc);
- if (!rmtNodes.isEmpty()) {
- msg.prepareMarshal(marsh);
-
+ if (!rmtNodes.isEmpty())
ctx.io().sendToGridTopic(rmtNodes, topic, msg, plc);
- }
}
/**
@@ -1216,7 +1204,7 @@ public class GridEventStorageManager extends
GridManagerAdapter<EventStorageSpi>
throw new IgniteDeploymentCheckedException("Failed to
obtain deployment for event filter " +
"(is peer class loading turned on?): " + req);
- req.finishUnmarshal(marsh,
U.resolveClassLoader(ctx.config()), U.resolveClassLoader(dep.classLoader(),
ctx.config()));
+ req.finishUnmarshalFilters(marsh,
U.resolveClassLoader(dep.classLoader(), ctx.config()));
filter = (IgnitePredicate<Event>)req.filter();
@@ -1252,9 +1240,6 @@ public class GridEventStorageManager extends
GridManagerAdapter<EventStorageSpi>
if (log.isDebugEnabled())
log.debug("Sending event query response to node
[nodeId=" + nodeId + "res=" + res + ']');
- if (!ctx.localNodeId().equals(nodeId))
- res.prepareMarshal(marsh);
-
ctx.io().sendToCustomTopic(node, req.responseTopic(), res,
PUBLIC_POOL);
}
catch (ClusterTopologyCheckedException e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java
index dade283bc0d..263451596e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java
@@ -32,13 +32,13 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.jetbrains.annotations.Nullable;
/**
* Event storage message.
*/
-public class GridEventStorageMessage implements Message {
+public class GridEventStorageMessage implements MarshallableMessage {
/** */
private Object resTopic;
@@ -205,42 +205,42 @@ public class GridEventStorageMessage implements Message {
return ErrorMessage.error(errMsg);
}
- /**
- * @param marsh Marshaller.
- */
- public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException
{
- if (resTopic != null && resTopicBytes == null)
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
+ if (resTopic != null)
resTopicBytes = U.marshal(marsh, resTopic);
- if (filter != null && filterBytes == null)
+ if (filter != null)
filterBytes = U.marshal(marsh, filter);
- if (evts != null && evtsBytes == null)
+ if (evts != null)
evtsBytes = U.marshal(marsh, evts);
}
- /**
- * @param marsh Marshaller.
- * @param ldr Class loader.
- * @param filterClsLdr Class loader for filter.
- */
- public void finishUnmarshal(Marshaller marsh, ClassLoader ldr, ClassLoader
filterClsLdr) throws IgniteCheckedException {
- if (resTopicBytes != null && resTopic == null) {
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr)
throws IgniteCheckedException {
+ if (resTopicBytes != null) {
resTopic = U.unmarshal(marsh, resTopicBytes, ldr);
resTopicBytes = null;
}
- if (filterBytes != null && filter == null && filterClsLdr != null) {
- filter = U.unmarshal(marsh, filterBytes, filterClsLdr);
+ if (evtsBytes != null) {
+ evts = U.unmarshal(marsh, evtsBytes, ldr);
- filterBytes = null;
+ evtsBytes = null;
}
+ }
- if (evtsBytes != null && evts == null) {
- evts = U.unmarshal(marsh, evtsBytes, ldr);
+ /**
+ * @param marsh Marshaller.
+ * @param filterClsLdr Class loader for filter.
+ */
+ public void finishUnmarshalFilters(Marshaller marsh, ClassLoader
filterClsLdr) throws IgniteCheckedException {
+ if (filterBytes != null && filter == null) {
+ filter = U.unmarshal(marsh, filterBytes, filterClsLdr);
- evtsBytes = null;
+ filterBytes = null;
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PHotRedeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PHotRedeploymentSelfTest.java
index 70db1390a16..2cc6e5b9dee 100644
--- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PHotRedeploymentSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PHotRedeploymentSelfTest.java
@@ -122,18 +122,6 @@ public class GridP2PHotRedeploymentSelfTest extends
GridCommonAbstractTest {
assert taskCls1.getClassLoader() != taskCls2.getClassLoader();
assert taskCls1 != taskCls2;
-// final AtomicBoolean undeployed = new AtomicBoolean(false);
-//
-// grid2.events().localListen(new GridLocalEventListener() {
-// @Override public void onEvent(GridEvent evt) {
-// if (evt.type() == EVT_TASK_UNDEPLOYED) {
-// assert
((GridDeploymentEvent)evt).alias().equals(TASK_NAME);
-//
-// undeployed.set(true);
-// }
-// }
-// }, EVT_TASK_UNDEPLOYED);
-
ignite2.compute().localDeployTask(taskCls1,
taskCls1.getClassLoader());
Integer res1 = ignite1.compute().execute(taskCls1,
Collections.singletonList(ignite2.cluster().localNode().id()));
@@ -149,10 +137,6 @@ public class GridP2PHotRedeploymentSelfTest extends
GridCommonAbstractTest {
info("Result2: " + res2);
assert !res1.equals(res2);
-
-// Thread.sleep(P2P_TIMEOUT * 2);
-//
-// assert undeployed.get();
}
finally {
stopGrid(2);