This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new d048c0aa4bd IGNITE-27800 Use MessageSerializer for GridIoMessage and
GridIoSecurityAwareMessage (#12720)
d048c0aa4bd is described below
commit d048c0aa4bdb8a2aa6a324ba9004d1a7d8720f19
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Feb 12 17:27:58 2026 +0500
IGNITE-27800 Use MessageSerializer for GridIoMessage and
GridIoSecurityAwareMessage (#12720)
---
.../managers/communication/GridIoManager.java | 18 +-
.../managers/communication/GridIoMessage.java | 228 +++++++--------------
.../communication/GridIoMessageFactory.java | 6 +-
.../communication/GridIoSecurityAwareMessage.java | 67 ++----
4 files changed, 107 insertions(+), 212 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 661e023febe..633ebef235b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1202,8 +1202,10 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
if (initMsg.topic() == null) {
int topicOrd = initMsg.topicOrdinal();
- initMsg.topic(topicOrd >= 0 ? GridTopic.fromOrdinal(topicOrd) :
- U.unmarshal(marsh, initMsg.topicBytes(), U.resolveClassLoader(ctx.config())));
+ if (topicOrd >= 0)
+ initMsg.topic(GridTopic.fromOrdinal(topicOrd));
+ else
+ initMsg.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config()));
}
byte plc = initMsg.policy();
@@ -1249,8 +1251,10 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
if (msg.topic() == null) {
int topicOrd = msg.topicOrdinal();
- msg.topic(topicOrd >= 0 ? GridTopic.fromOrdinal(topicOrd) :
- U.unmarshal(marsh, msg.topicBytes(), U.resolveClassLoader(ctx.config())));
+ if (topicOrd >= 0)
+ msg.topic(GridTopic.fromOrdinal(topicOrd));
+ else
+ msg.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config()));
}
if (!started) {
@@ -1977,7 +1981,7 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
try {
if (topicOrd < 0)
- ioMsg.topicBytes(U.marshal(marsh, topic));
+ ioMsg.prepareMarshal(marsh);
return
((TcpCommunicationSpi)(CommunicationSpi)getSpi()).openChannel(node, ioMsg);
}
@@ -2051,7 +2055,7 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
}
else {
if (topicOrd < 0)
- ioMsg.topicBytes(U.marshal(marsh, topic));
+ ioMsg.prepareMarshal(marsh);
try {
if ((CommunicationSpi<?>)getSpi() instanceof
TcpCommunicationSpi)
@@ -4311,7 +4315,7 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
if (ctx.security().enabled()) {
assert msg instanceof GridIoSecurityAwareMessage;
- return ((GridIoSecurityAwareMessage)msg).secSubjId();
+ return ((GridIoSecurityAwareMessage)msg).securitySubjectId();
}
return null;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index fe967f62b0a..f0e2f66bebb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -17,17 +17,17 @@
package org.apache.ignite.internal.managers.communication;
-import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.ExecutorAwareMessage;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import org.apache.ignite.internal.processors.tracing.messages.SpanTransport;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -38,32 +38,39 @@ public class GridIoMessage implements Message,
SpanTransport {
public static final Integer STRIPE_DISABLED_PART = Integer.MIN_VALUE;
/** Policy. */
+ @Order(value = 0, method = "policy")
private byte plc;
/** Message topic. */
@GridToStringInclude
- @GridDirectTransient
private Object topic;
/** Topic bytes. */
+ @Order(1)
private byte[] topicBytes;
/** Topic ordinal. */
+ @Order(value = 2, method = "topicOrdinal")
private int topicOrd = -1;
/** Message ordered flag. */
+ @Order(value = 3, method = "isOrdered")
private boolean ordered;
/** Message timeout. */
+ @Order(4)
private long timeout;
/** Whether message can be skipped on timeout. */
+ @Order(5)
private boolean skipOnTimeout;
/** Message. */
+ @Order(value = 6, method = "message")
private Message msg;
/** Serialized span */
+ @Order(7)
private byte[] span;
/**
@@ -107,10 +114,17 @@ public class GridIoMessage implements Message,
SpanTransport {
/**
* @return Policy.
*/
- byte policy() {
+ public byte policy() {
return plc;
}
+ /**
+ * @param plc Policy.
+ */
+ public void policy(byte plc) {
+ this.plc = plc;
+ }
+
/**
* @return Topic.
*/
@@ -128,24 +142,31 @@ public class GridIoMessage implements Message,
SpanTransport {
/**
* @return Topic bytes.
*/
- byte[] topicBytes() {
+ public byte[] topicBytes() {
return topicBytes;
}
/**
* @param topicBytes Topic bytes.
*/
- void topicBytes(byte[] topicBytes) {
+ public void topicBytes(byte[] topicBytes) {
this.topicBytes = topicBytes;
}
/**
* @return Topic ordinal.
*/
- int topicOrdinal() {
+ public int topicOrdinal() {
return topicOrd;
}
+ /**
+ * @param topicOrd Topic ordinal.
+ */
+ public void topicOrdinal(int topicOrd) {
+ this.topicOrd = topicOrd;
+ }
+
/**
* @return Message.
*/
@@ -153,6 +174,13 @@ public class GridIoMessage implements Message,
SpanTransport {
return msg;
}
+ /**
+ * @param msg Message.
+ */
+ public void message(Message msg) {
+ this.msg = msg;
+ }
+
/**
* @return Message timeout.
*/
@@ -160,6 +188,13 @@ public class GridIoMessage implements Message,
SpanTransport {
return timeout;
}
+ /**
+ * @param timeout Message timeout.
+ */
+ public void timeout(long timeout) {
+ this.timeout = timeout;
+ }
+
/**
* @return Whether message can be skipped on timeout.
*/
@@ -167,13 +202,27 @@ public class GridIoMessage implements Message,
SpanTransport {
return skipOnTimeout;
}
+ /**
+ * @param skipOnTimeout Whether message can be skipped on timeout.
+ */
+ public void skipOnTimeout(boolean skipOnTimeout) {
+ this.skipOnTimeout = skipOnTimeout;
+ }
+
/**
* @return {@code True} if message is ordered, {@code false} otherwise.
*/
- boolean isOrdered() {
+ public boolean isOrdered() {
return ordered;
}
+ /**
+ * @param ordered {@code True} if message is ordered, {@code false} otherwise.
+ */
+ public void isOrdered(boolean ordered) {
+ this.ordered = ordered;
+ }
+
/** {@inheritDoc} */
@Override public boolean equals(Object obj) {
throw new AssertionError();
@@ -184,145 +233,6 @@ public class GridIoMessage implements Message,
SpanTransport {
throw new AssertionError();
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeMessage(msg))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeBoolean(ordered))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeByte(plc))
- return false;
-
- writer.incrementState();
-
- case 3:
- if (!writer.writeBoolean(skipOnTimeout))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeByteArray(span))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeLong(timeout))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeByteArray(topicBytes))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeInt(topicOrd))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- msg = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- ordered = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- plc = reader.readByte();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 3:
- skipOnTimeout = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- span = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- timeout = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- topicBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- topicOrd = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 8;
@@ -362,6 +272,26 @@ public class GridIoMessage implements Message,
SpanTransport {
return null;
}
+ /**
+ * @param marsh Marshaller.
+ */
+ public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
+ if (topic != null && topicBytes == null)
+ topicBytes = U.marshal(marsh, topic);
+ }
+
+ /**
+ * @param marsh Marshaller.
+ * @param ldr Class loader.
+ */
+ public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException {
+ if (topicBytes != null && topic == null) {
+ topic = U.unmarshal(marsh, topicBytes, ldr);
+
+ topicBytes = null;
+ }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridIoMessage.class, this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index de56befa844..8f77d741f8b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -93,6 +93,8 @@ import
org.apache.ignite.internal.codegen.GridDistributedTxFinishResponseSeriali
import
org.apache.ignite.internal.codegen.GridDistributedTxPrepareRequestSerializer;
import
org.apache.ignite.internal.codegen.GridDistributedTxPrepareResponseSerializer;
import org.apache.ignite.internal.codegen.GridEventStorageMessageSerializer;
+import org.apache.ignite.internal.codegen.GridIoMessageSerializer;
+import org.apache.ignite.internal.codegen.GridIoSecurityAwareMessageSerializer;
import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer;
import org.apache.ignite.internal.codegen.GridJobExecuteRequestSerializer;
import org.apache.ignite.internal.codegen.GridJobExecuteResponseSerializer;
@@ -364,7 +366,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)5, GridTaskCancelRequest::new, new
GridTaskCancelRequestSerializer());
factory.register((short)6, GridTaskSessionRequest::new, new
GridTaskSessionRequestSerializer());
factory.register((short)7, GridCheckpointRequest::new, new
GridCheckpointRequestSerializer());
- factory.register((short)8, GridIoMessage::new);
+ factory.register((short)8, GridIoMessage::new, new GridIoMessageSerializer());
factory.register((short)9, GridIoUserMessage::new);
factory.register((short)10, GridDeploymentInfoBean::new);
factory.register((short)11, GridDeploymentRequest::new);
@@ -476,7 +478,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)169, ServiceSingleNodeDeploymentResult::new);
factory.register(GridQueryKillRequest.TYPE_CODE,
GridQueryKillRequest::new, new GridQueryKillRequestSerializer());
factory.register(GridQueryKillResponse.TYPE_CODE,
GridQueryKillResponse::new, new GridQueryKillResponseSerializer());
- factory.register(GridIoSecurityAwareMessage.TYPE_CODE, GridIoSecurityAwareMessage::new);
+ factory.register(GridIoSecurityAwareMessage.TYPE_CODE, GridIoSecurityAwareMessage::new, new GridIoSecurityAwareMessageSerializer());
factory.register(SessionChannelMessage.TYPE_CODE,
SessionChannelMessage::new, new SessionChannelMessageSerializer());
factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new);
factory.register((short)177, TcpInverseConnectionResponseMessage::new,
new TcpInverseConnectionResponseMessageSerializer());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoSecurityAwareMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoSecurityAwareMessage.java
index 879464267cd..9a256cd1169 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoSecurityAwareMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoSecurityAwareMessage.java
@@ -17,11 +17,9 @@
package org.apache.ignite.internal.managers.communication;
-import java.nio.ByteBuffer;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
*
@@ -30,7 +28,8 @@ public class GridIoSecurityAwareMessage extends GridIoMessage
{
/** */
public static final short TYPE_CODE = 174;
- /** Security subject id that will be used during message processing on an remote node. */
+ /** Security subject ID that will be used during message processing on a remote node. */
+ @Order(value = 8, method = "securitySubjectId")
private UUID secSubjId;
/**
@@ -41,7 +40,7 @@ public class GridIoSecurityAwareMessage extends GridIoMessage
{
}
/**
- * @param secSubjId Security subject id.
+ * @param secSubjId Security subject ID.
* @param plc Policy.
* @param topic Communication topic.
* @param topicOrd Topic ordinal value.
@@ -66,61 +65,21 @@ public class GridIoSecurityAwareMessage extends
GridIoMessage {
}
/**
- * @return Security subject id.
+ * @return Security subject ID.
*/
- UUID secSubjId() {
+ public UUID securitySubjectId() {
return secSubjId;
}
- /** {@inheritDoc} */
- @Override public short directType() {
- return TYPE_CODE;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 8:
- if (!writer.writeUuid(secSubjId))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
+ /**
+ * @param secSubjId Security subject ID.
+ */
+ public void securitySubjectId(UUID secSubjId) {
+ this.secSubjId = secSubjId;
}
/** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 8:
- secSubjId = reader.readUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
+ @Override public short directType() {
+ return TYPE_CODE;
}
}