This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch ignite-2.18
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-2.18 by this push:
new 094f54ce3ac IGNITE-27652 Refactor RingMessageWorker#sendMessageToClients (#12663)
094f54ce3ac is described below
commit 094f54ce3ac268cda21876e87cc37fac55a57d70
Author: Ilya Shishkov <[email protected]>
AuthorDate: Tue Feb 3 11:52:38 2026 +0300
IGNITE-27652 Refactor RingMessageWorker#sendMessageToClients (#12663)
(cherry picked from commit 1f175f97a8cc77a88c61aba45f729403c9f4e2c4)
---
.../ignite/spi/discovery/tcp/ServerImpl.java | 63 ++++------------------
1 file changed, 10 insertions(+), 53 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0d0323bbc09..0a24730a12c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -100,7 +100,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -3242,46 +3241,19 @@ class ServerImpl extends TcpDiscoveryImpl {
if (spi.ensured(msg))
msgHist.add(msg);
- byte[] msgBytes = null;
-
for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
- if (msgBytes == null) {
- try {
- msgBytes = clientMsgWorker.ses.serializeMessage(msg);
- }
- catch (IgniteCheckedException | IOException e) {
- U.error(log, "Failed to serialize message to a client: " + msg + ", recepient " +
- "client id: " + clientMsgWorker.clientNodeId, e);
-
- break;
- }
- }
-
- TcpDiscoveryAbstractMessage msg0 = msg;
- byte[] msgBytes0 = msgBytes;
-
if (msg instanceof TcpDiscoveryNodeAddedMessage) {
TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg;
- TcpDiscoveryNode node = nodeAddedMsg.node();
-
- if (clientMsgWorker.clientNodeId.equals(node.id())) {
- try {
- // TODO: https://issues.apache.org/jira/browse/IGNITE-27556 refactor serialization.
- msg0 = U.unmarshal(spi.marshaller(), msgBytes,
- U.resolveClassLoader(spi.ignite().configuration()));
-
- prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null);
+ if (clientMsgWorker.clientNodeId.equals(nodeAddedMsg.node().id())) {
+ msg = new TcpDiscoveryNodeAddedMessage(nodeAddedMsg);
- msgBytes0 = null;
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to create message copy: " + msg, e);
- }
+ prepareNodeAddedMessage(msg, clientMsgWorker.clientNodeId, null);
}
}
- clientMsgWorker.addMessage(msg0, msgBytes0);
+ // TODO Investigate possible optimizations: https://issues.apache.org/jira/browse/IGNITE-27722
+ clientMsgWorker.addMessage(msg);
}
}
}
@@ -7610,7 +7582,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/** */
- private class ClientMessageWorker extends MessageWorker<T2<TcpDiscoveryAbstractMessage, byte[]>> {
+ private class ClientMessageWorker extends MessageWorker<TcpDiscoveryAbstractMessage> {
/** Node ID. */
private final UUID clientNodeId;
@@ -7677,20 +7649,10 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
void addMessage(TcpDiscoveryAbstractMessage msg) {
- addMessage(msg, null);
- }
-
- /**
- * @param msg Message.
- * @param msgBytes Optional message bytes.
- */
- void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) {
- T2<TcpDiscoveryAbstractMessage, byte[]> t = new T2<>(msg, msgBytes);
-
if (msg.highPriority())
- queue.addFirst(t);
+ queue.addFirst(msg);
else
- queue.add(t);
+ queue.add(msg);
DebugLogger log = messageLogger(msg);
@@ -7699,18 +7661,13 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
- @Override protected void processMessage(T2<TcpDiscoveryAbstractMessage, byte[]> msgT) {
+ @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
boolean success = false;
- TcpDiscoveryAbstractMessage msg = msgT.get1();
-
try {
assert msg.verified() : msg;
- byte[] msgBytes = msgT.get2();
-
- if (msgBytes == null)
- msgBytes = ses.serializeMessage(msg);
+ byte[] msgBytes = ses.serializeMessage(msg);
DebugLogger msgLog = messageLogger(msg);