This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a1abd2ca936 IGNITE-26111 TcpDiscoverySpi uses MessageSerializer
(#12243)
a1abd2ca936 is described below
commit a1abd2ca9360f1857008c90e6f4746b485dc258d
Author: Maksim Timonin <[email protected]>
AuthorDate: Thu Nov 13 16:26:15 2025 +0300
IGNITE-26111 TcpDiscoverySpi uses MessageSerializer (#12243)
---
.../discovery/DiscoveryMessageFactory.java | 31 +++
.../ignite/spi/discovery/tcp/ClientImpl.java | 52 ++--
.../ignite/spi/discovery/tcp/ServerImpl.java | 71 +++---
.../ignite/spi/discovery/tcp/TcpDiscoveryImpl.java | 11 +
.../spi/discovery/tcp/TcpDiscoveryIoSession.java | 275 +++++++++++++++++++++
.../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 63 +++--
.../tcp/messages/TcpDiscoveryAbstractMessage.java | 11 +
.../messages/TcpDiscoveryCheckFailedMessage.java | 23 +-
.../ignite/internal/IgniteClientRejoinTest.java | 8 +-
.../IgniteDiscoveryMassiveNodeFailTest.java | 8 +-
.../CacheClientsConcurrentStartTest.java | 10 +-
.../dht/IgniteCacheTopologySplitAbstractTest.java | 9 +-
.../IncrementalSnapshotJoiningClientTest.java | 10 +-
.../IgniteTcpCommunicationConnectOnInitTest.java | 9 +-
.../TcpCommunicationSpiSkipMessageSendTest.java | 13 +-
.../discovery/LongClientConnectToClusterTest.java | 7 +-
.../spi/discovery/tcp/BlockTcpDiscoverySpi.java | 6 +-
.../tcp/DiscoveryUnmarshalVulnerabilityTest.java | 1 +
.../spi/discovery/tcp/IgniteClientConnectTest.java | 11 +-
...cpClientDiscoverySpiFailureTimeoutSelfTest.java | 16 +-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 10 +-
.../tcp/TcpDiscoveryCoordinatorFailureTest.java | 8 +-
.../discovery/tcp/TcpDiscoveryFailedJoinTest.java | 7 +-
.../tcp/TcpDiscoveryNetworkIssuesTest.java | 15 +-
.../tcp/TcpDiscoveryNodeJoinAndFailureTest.java | 4 +-
.../TcpDiscoveryPendingMessageDeliveryTest.java | 5 +-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 59 ++---
.../tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java | 7 +-
.../tcp/TcpDiscoverySpiReconnectDelayTest.java | 7 +-
.../tcp/TcpDiscoverySslSecuredUnsecuredTest.java | 7 +-
.../spi/discovery/tcp/TestTcpDiscoverySpi.java | 6 +-
31 files changed, 537 insertions(+), 243 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
new file mode 100644
index 00000000000..c30286a909a
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import
org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
+
+/** Message factory for discovery messages. */
+public class DiscoveryMessageFactory implements MessageFactoryProvider {
+ /** {@inheritDoc} */
+ @Override public void registerAll(MessageFactory factory) {
+ factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new
TcpDiscoveryCheckFailedMessageSerializer());
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 643d4d3fb25..caf8a530ffa 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -17,11 +17,8 @@
package org.apache.ignite.spi.discovery.tcp;
-import java.io.BufferedInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.InterruptedIOException;
-import java.io.OutputStream;
import java.io.StreamCorruptedException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -719,13 +716,13 @@ class ClientImpl extends TcpDiscoveryImpl {
boolean openSock = false;
Socket sock = null;
- OutputStream out;
try {
long tsNanos = System.nanoTime();
sock = spi.openSocket(addr, timeoutHelper);
- out = spi.socketStream(sock);
+
+ TcpDiscoveryIoSession ses = createSession(sock);
openSock = true;
@@ -733,9 +730,9 @@ class ClientImpl extends TcpDiscoveryImpl {
req.client(true);
- spi.writeToSocket(sock, out, req,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeMessage(ses, req,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
- TcpDiscoveryHandshakeResponse res = spi.readMessage(sock,
null, ackTimeout0);
+ TcpDiscoveryHandshakeResponse res = spi.readMessage(ses,
ackTimeout0);
UUID rmtNodeId = res.creatorNodeId();
@@ -788,7 +785,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (msg instanceof TraceableMessage)
tracing.messages().beforeSend((TraceableMessage)msg);
- spi.writeToSocket(sock, out, msg,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeMessage(ses, msg,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
spi.stats.onMessageSent(msg, U.millisSinceNanos(tsNanos));
@@ -1179,7 +1176,7 @@ class ClientImpl extends TcpDiscoveryImpl {
+ ":" + sockStream.sock.getPort());
try {
- InputStream in = sockStream.stream();
+ TcpDiscoveryIoSession ses = createSession(sock);
assert sock.getKeepAlive() && sock.getTcpNoDelay() :
"Socket wasn't configured properly:" +
" KeepAlive " + sock.getKeepAlive() +
@@ -1189,7 +1186,7 @@ class ClientImpl extends TcpDiscoveryImpl {
TcpDiscoveryAbstractMessage msg;
try {
- msg = U.unmarshal(spi.marshaller(), in,
U.resolveClassLoader(spi.ignite().configuration()));
+ msg = spi.readMessage(ses, sock.getSoTimeout());
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
@@ -1266,6 +1263,9 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
private Socket sock;
+ /** */
+ private TcpDiscoveryIoSession ses;
+
/** */
private boolean clientAck;
@@ -1333,6 +1333,8 @@ class ClientImpl extends TcpDiscoveryImpl {
synchronized (mux) {
this.sock = sock;
+ ses = createSession(sock);
+
this.clientAck = clientAck;
unackedMsg = null;
@@ -1387,11 +1389,7 @@ class ClientImpl extends TcpDiscoveryImpl {
msg.client(true);
try {
- spi.writeToSocket(
- sock,
- spi.socketStream(sock),
- msg,
- sockTimeout);
+ spi.writeMessage(ses, msg, sockTimeout);
}
catch (IOException | IgniteCheckedException e) {
if (log.isDebugEnabled()) {
@@ -1434,11 +1432,7 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
- spi.writeToSocket(
- sock,
- spi.socketStream(sock),
- msg,
- sockTimeout);
+ spi.writeMessage(ses, msg, sockTimeout);
IgniteUuid latencyCheckId = msg instanceof
TcpDiscoveryRingLatencyCheckMessage ?
msg.id() : null;
@@ -1601,6 +1595,7 @@ class ClientImpl extends TcpDiscoveryImpl {
clientAck = joinRes.get2();
Socket sock = sockStream.socket();
+ TcpDiscoveryIoSession ses = createSession(sock);
if (isInterrupted())
throw new InterruptedException();
@@ -1612,8 +1607,6 @@ class ClientImpl extends TcpDiscoveryImpl {
sock.setSoTimeout((int)spi.netTimeout);
- InputStream in = sockStream.stream();
-
assert sock.getKeepAlive() && sock.getTcpNoDelay() :
"Socket wasn't configured properly:" +
" KeepAlive " + sock.getKeepAlive() +
" TcpNoDelay " + sock.getTcpNoDelay();
@@ -1621,8 +1614,7 @@ class ClientImpl extends TcpDiscoveryImpl {
List<TcpDiscoveryAbstractMessage> msgs = null;
while (!isInterrupted()) {
- TcpDiscoveryAbstractMessage msg =
U.unmarshal(spi.marshaller(), in,
-
U.resolveClassLoader(spi.ignite().configuration()));
+ TcpDiscoveryAbstractMessage msg =
spi.readMessage(ses, sock.getSoTimeout());
if (msg instanceof
TcpDiscoveryClientReconnectMessage) {
TcpDiscoveryClientReconnectMessage res =
(TcpDiscoveryClientReconnectMessage)msg;
@@ -2769,9 +2761,6 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
private final Socket sock;
- /** */
- private final InputStream in;
-
/**
* @param sock Socket.
* @throws IOException If failed to create stream.
@@ -2780,8 +2769,6 @@ class ClientImpl extends TcpDiscoveryImpl {
assert sock != null;
this.sock = sock;
-
- this.in = new BufferedInputStream(sock.getInputStream());
}
/**
@@ -2792,13 +2779,6 @@ class ClientImpl extends TcpDiscoveryImpl {
}
- /**
- * @return Socket input stream.
- */
- InputStream stream() {
- return in;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return sock.toString();
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 7905a5a8ac0..8b5bdadbebd 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -17,11 +17,8 @@
package org.apache.ignite.spi.discovery.tcp;
-import java.io.BufferedInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.ObjectStreamException;
-import java.io.OutputStream;
import java.io.Serializable;
import java.io.StreamCorruptedException;
import java.net.ConnectException;
@@ -877,11 +874,12 @@ class ServerImpl extends TcpDiscoveryImpl {
openedSock = true;
- spi.writeToSocket(sock, spi.socketStream(sock), new
TcpDiscoveryPingRequest(locNodeId, clientNodeId),
+ TcpDiscoveryIoSession ses = createSession(sock);
+
+ spi.writeMessage(ses, new
TcpDiscoveryPingRequest(locNodeId, clientNodeId),
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
- TcpDiscoveryPingResponse res = spi.readMessage(sock,
null, timeoutHelper.nextTimeoutChunk(
- spi.getAckTimeout()));
+ TcpDiscoveryPingResponse res = spi.readMessage(ses,
timeoutHelper.nextTimeoutChunk(spi.getAckTimeout()));
if (locNodeId.equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
@@ -1469,16 +1467,16 @@ class ServerImpl extends TcpDiscoveryImpl {
long tsNanos = System.nanoTime();
sock = spi.openSocket(addr, timeoutHelper);
+ TcpDiscoveryIoSession ses = createSession(sock);
openSock = true;
TcpDiscoveryHandshakeRequest req = new
TcpDiscoveryHandshakeRequest(locNodeId);
// Handshake.
- spi.writeToSocket(sock, spi.socketStream(sock), req,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeMessage(ses, req,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
- TcpDiscoveryHandshakeResponse res = spi.readMessage(sock,
null, timeoutHelper.nextTimeoutChunk(
- ackTimeout0));
+ TcpDiscoveryHandshakeResponse res = spi.readMessage(ses,
timeoutHelper.nextTimeoutChunk(ackTimeout0));
if (msg instanceof TcpDiscoveryJoinRequestMessage) {
boolean ignore = false;
@@ -1510,7 +1508,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Send message.
tsNanos = System.nanoTime();
- spi.writeToSocket(sock, spi.socketStream(sock), msg,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeMessage(ses, msg,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
long tsNanos0 = System.nanoTime();
@@ -2957,8 +2955,8 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Socket. */
private Socket sock;
- /** Output stream. */
- private OutputStream out;
+ /** IO session. */
+ private TcpDiscoveryIoSession ses;
/** Last time status message has been sent. */
private long lastTimeStatusMsgSentNanos;
@@ -3522,7 +3520,7 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
sock = spi.openSocket(addr, timeoutHelper);
- out = spi.socketStream(sock);
+ ses = createSession(sock);
openSock = true;
@@ -3540,16 +3538,14 @@ class ServerImpl extends TcpDiscoveryImpl {
"] with timeout " +
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
}
- spi.writeToSocket(sock, out, hndMsg,
-
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeMessage(ses, hndMsg,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
if (log.isDebugEnabled()) {
log.debug("Reading handshake response with
timeout " +
timeoutHelper.nextTimeoutChunk(ackTimeout0));
}
- TcpDiscoveryHandshakeResponse res =
spi.readMessage(sock, null,
-
timeoutHelper.nextTimeoutChunk(ackTimeout0));
+ TcpDiscoveryHandshakeResponse res =
spi.readMessage(ses, timeoutHelper.nextTimeoutChunk(ackTimeout0));
if (log.isDebugEnabled())
log.debug("Handshake response: " + res);
@@ -3748,8 +3744,7 @@ class ServerImpl extends TcpDiscoveryImpl {
timeoutHelper =
serverOperationTimeoutHelper(sndState, lastRingMsgSentTime);
try {
- spi.writeToSocket(sock, out,
pendingMsg, timeoutHelper.nextTimeoutChunk(
- spi.getSocketTimeout()));
+ spi.writeMessage(ses, pendingMsg,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
}
finally {
clearNodeAddedMessage(pendingMsg);
@@ -3800,11 +3795,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (latencyCheck && log.isInfoEnabled())
log.info("Latency check message has been
written to socket: " + msg.id());
- spi.writeToSocket(
- sock,
- out,
- msg,
-
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeMessage(ses, msg,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
long tsNanos0 = System.nanoTime();
@@ -5573,7 +5564,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (leftNode.equals(next) && sock != null) {
try {
- spi.writeToSocket(sock, out, msg,
spi.failureDetectionTimeoutEnabled() ?
+ spi.writeMessage(ses, msg,
spi.failureDetectionTimeoutEnabled() ?
spi.failureDetectionTimeout() :
spi.getSocketTimeout());
if (log.isDebugEnabled())
@@ -6696,6 +6687,9 @@ class ServerImpl extends TcpDiscoveryImpl {
* Thread that reads messages from the socket created for incoming
connections.
*/
private class SocketReader extends IgniteSpiThread {
+ /** IO session. */
+ private final TcpDiscoveryIoSession ses;
+
/** Socket to read data from. */
private final Socket sock;
@@ -6712,6 +6706,8 @@ class ServerImpl extends TcpDiscoveryImpl {
this.sock = sock;
+ ses = createSession(sock);
+
setPriority(spi.threadPri);
}
@@ -6731,8 +6727,6 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean srvSock;
try {
- InputStream in;
-
try {
// Set socket options.
spi.configureSocketOptions(sock);
@@ -6744,15 +6738,11 @@ class ServerImpl extends TcpDiscoveryImpl {
for (IgniteInClosure<Socket> connLsnr :
spi.incomeConnLsnrs)
connLsnr.apply(sock);
- int rcvBufSize = sock.getReceiveBufferSize();
-
- in = new BufferedInputStream(sock.getInputStream(),
rcvBufSize > 0 ? rcvBufSize : 8192);
-
byte[] buf = new byte[4];
int read = 0;
while (read < buf.length) {
- int r = in.read(buf, read, buf.length - read);
+ int r = sock.getInputStream().read(buf, read,
buf.length - read);
if (r >= 0)
read += r;
@@ -6787,7 +6777,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Restore timeout.
sock.setSoTimeout(timeout);
- TcpDiscoveryAbstractMessage msg = spi.readMessage(sock,
in, spi.netTimeout);
+ TcpDiscoveryAbstractMessage msg = spi.readMessage(ses,
spi.netTimeout);
// Ping.
if (msg instanceof TcpDiscoveryPingRequest) {
@@ -6812,7 +6802,7 @@ class ServerImpl extends TcpDiscoveryImpl {
res.clientExists(clientWorker.ping(timeoutHelper));
}
- spi.writeToSocket(sock, spi.socketStream(sock),
res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeMessage(ses, res,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
if (!(sock instanceof SSLSocket))
sock.shutdownOutput();
@@ -6905,7 +6895,7 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.getEffectiveSocketTimeout(srvSock) + " to " +
rmtAddr + ":" + sock.getPort());
}
- spi.writeToSocket(sock, spi.socketStream(sock), res,
spi.getEffectiveSocketTimeout(srvSock));
+ spi.writeMessage(ses, res,
spi.getEffectiveSocketTimeout(srvSock));
// It can happen if a remote node is stopped and it has a
loopback address in the list of addresses,
// the local node sends a handshake request message on the
loopback address, so we get here.
@@ -7028,8 +7018,8 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
SecurityUtils.serializeVersion(1);
- TcpDiscoveryAbstractMessage msg =
U.unmarshal(spi.marshaller(), in,
-
U.resolveClassLoader(spi.ignite().configuration()));
+ // Use infinite timeout for accepting new messages.
+ TcpDiscoveryAbstractMessage msg = spi.readMessage(ses,
0);
msg.senderNodeId(nodeId);
@@ -7713,6 +7703,9 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Node ID. */
private final UUID clientNodeId;
+ /** */
+ private final TcpDiscoveryIoSession ses;
+
/** Socket. */
private final Socket sock;
@@ -7741,6 +7734,8 @@ class ServerImpl extends TcpDiscoveryImpl {
this.sock = sock;
this.clientNodeId = clientNodeId;
+ ses = createSession(sock);
+
lastMetricsUpdateMsgTimeNanos = System.nanoTime();
}
@@ -7804,7 +7799,7 @@ class ServerImpl extends TcpDiscoveryImpl {
byte[] msgBytes = msgT.get2();
if (msgBytes == null)
- msgBytes = U.marshal(spi.marshaller(), msg);
+ msgBytes = ses.serializeMessage(msg);
DebugLogger msgLog = messageLogger(msg);
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index d8d5164f4af..6ff2faa1974 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spi.discovery.tcp;
import java.net.InetSocketAddress;
+import java.net.Socket;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
@@ -439,6 +440,16 @@ abstract class TcpDiscoveryImpl {
return res;
}
+ /**
+ * Instantiates IO session for exchanging discovery messages with remote
node.
+ *
+ * @param sock Socket to remote node.
+ * @return IO session for writing and reading {@link
TcpDiscoveryAbstractMessage}.
+ */
+ TcpDiscoveryIoSession createSession(Socket sock) {
+ return new TcpDiscoveryIoSession(sock, spi);
+ }
+
/**
* @param msg Message.
* @return Message logger.
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
new file mode 100644
index 00000000000..1bf398c9780
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.StreamCorruptedException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.direct.DirectMessageReader;
+import org.apache.ignite.internal.direct.DirectMessageWriter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+
+import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType;
+
+/**
+ * Handles I/O operations between discovery nodes in the cluster. This class
encapsulates the socket connection used
+ * by the {@link TcpDiscoverySpi} to exchange discovery protocol messages
between nodes.
+ * <p>
+ * Currently, there are two modes for message serialization:
+ * <ul>
+ * <li>Using {@link MessageSerializer} for messages implementing the
{@link Message} interface.</li>
+ * <li>Deprecated: Using {@link JdkMarshaller} for messages that have not
yet been refactored.</li>
+ * </ul>
+ * A leading byte is used to distinguish between the modes. The byte will be
removed in the future.
+ */
+public class TcpDiscoveryIoSession {
+ /** Default size of buffer used for buffering socket in/out. */
+ private static final int DFLT_SOCK_BUFFER_SIZE = 8192;
+
+ /** Size for an intermediate buffer for serializing discovery messages. */
+ private static final int MSG_BUFFER_SIZE = 100;
+
+ /** Leading byte for messages that use {@link JdkMarshaller} for serialization.
*/
+ // TODO: remove these flags after refactoring all discovery messages.
+ static final byte JAVA_SERIALIZATION = (byte)1;
+
+ /** Leading byte for messages that use {@link MessageSerializer} for
serialization. */
+ static final byte MESSAGE_SERIALIZATION = (byte)2;
+
+ /** */
+ private final TcpDiscoverySpi spi;
+
+ /** Loads discovery messages classes during java deserialization. */
+ private final ClassLoader clsLdr;
+
+ /** */
+ private final Socket sock;
+
+ /** */
+ private final DirectMessageWriter msgWriter;
+
+ /** */
+ private final DirectMessageReader msgReader;
+
+ /** Buffered socket output stream. */
+ private final OutputStream out;
+
+ /** Buffered socket input stream. */
+ private final InputStream in;
+
+ /** Intermediate buffer for serializing discovery messages. */
+ private final ByteBuffer msgBuf;
+
+ /**
+ * Creates a new discovery I/O session bound to the given socket.
+ *
+ * @param sock Socket connected to a remote discovery node.
+ * @param spi Discovery SPI instance owning this session.
+ * @throws IgniteException If an I/O error occurs while initializing
buffers.
+ */
+ TcpDiscoveryIoSession(Socket sock, TcpDiscoverySpi spi) {
+ this.sock = sock;
+ this.spi = spi;
+
+ clsLdr = U.resolveClassLoader(spi.ignite().configuration());
+
+ msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);
+
+ msgWriter = new DirectMessageWriter(spi.messageFactory());
+ msgReader = new DirectMessageReader(spi.messageFactory(), null);
+
+ try {
+ int sendBufSize = sock.getSendBufferSize() > 0 ?
sock.getSendBufferSize() : DFLT_SOCK_BUFFER_SIZE;
+ int rcvBufSize = sock.getReceiveBufferSize() > 0 ?
sock.getReceiveBufferSize() : DFLT_SOCK_BUFFER_SIZE;
+
+ out = new BufferedOutputStream(sock.getOutputStream(),
sendBufSize);
+ in = new BufferedInputStream(sock.getInputStream(), rcvBufSize);
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * Writes a discovery message to the underlying socket output stream.
+ *
+ * @param msg Message to send to the remote node.
+ * @throws IgniteCheckedException If serialization fails.
+ */
+ void writeMessage(TcpDiscoveryAbstractMessage msg) throws
IgniteCheckedException, IOException {
+ if (!(msg instanceof Message)) {
+ out.write(JAVA_SERIALIZATION);
+
+ U.marshal(spi.marshaller(), msg, out);
+
+ return;
+ }
+
+ try {
+ out.write(MESSAGE_SERIALIZATION);
+
+ serializeMessage((Message)msg, out);
+
+ out.flush();
+ }
+ catch (Exception e) {
+ // Keep logic similar to `U.marshal(...)`.
+ if (e instanceof IgniteCheckedException)
+ throw (IgniteCheckedException)e;
+
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /**
+ * Reads the next discovery message from the socket input stream.
+ *
+ * @param <T> Type of the expected message.
+ * @return Deserialized message instance.
+ * @throws IgniteCheckedException If deserialization fails.
+ */
+ <T> T readMessage() throws IgniteCheckedException, IOException {
+ byte serMode = (byte)in.read();
+
+ if (JAVA_SERIALIZATION == serMode)
+ return U.unmarshal(spi.marshaller(), in, clsLdr);
+
+ try {
+ if (MESSAGE_SERIALIZATION != serMode) {
+ detectSslAlert(serMode, in);
+
+ throw new IgniteCheckedException("Received unexpected byte
while reading discovery message: " + serMode);
+ }
+
+ byte b0 = (byte)in.read();
+ byte b1 = (byte)in.read();
+
+ Message msg = spi.messageFactory().create(makeMessageType(b0, b1));
+
+ msgReader.reset();
+ msgReader.setBuffer(msgBuf);
+
+ MessageSerializer msgSer =
spi.messageFactory().serializer(msg.directType());
+
+ boolean finished;
+
+ do {
+ msgBuf.clear();
+
+ int read = in.read(msgBuf.array(), 0, msgBuf.limit());
+
+ if (read == -1)
+ throw new EOFException("Connection closed before message
was fully read.");
+
+ msgBuf.limit(read);
+
+ finished = msgSer.readFrom(msg, msgReader);
+ } while (!finished);
+
+ return (T)msg;
+ }
+ catch (Exception e) {
+ // Keep logic similar to `U.unmarshal(...)`.
+ if (e instanceof IgniteCheckedException)
+ throw (IgniteCheckedException)e;
+
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /**
+ * Serializes a discovery message into a byte array.
+ *
+ * @param msg Discovery message to serialize.
+ * @return Serialized byte array containing the message data.
+ * @throws IgniteCheckedException If serialization fails.
+ * @throws IOException If serialization fails.
+ */
+ byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws
IgniteCheckedException, IOException {
+ if (!(msg instanceof Message))
+ return U.marshal(spi.marshaller(), msg);
+
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ serializeMessage((Message)msg, out);
+
+ return out.toByteArray();
+ }
+ }
+
+ /** @return Socket. */
+ public Socket socket() {
+ return sock;
+ }
+
+ /**
+ * Serializes a discovery message into given output stream.
+ *
+ * @param m Discovery message to serialize.
+ * @param out Output stream to write serialized message.
+ * @throws IOException If serialization fails.
+ */
+ private void serializeMessage(Message m, OutputStream out) throws
IOException {
+ MessageSerializer msgSer =
spi.messageFactory().serializer(m.directType());
+
+ msgWriter.reset();
+ msgWriter.setBuffer(msgBuf);
+
+ boolean finished;
+
+ do {
+ finished = msgSer.writeTo(m, msgWriter);
+
+ out.write(msgBuf.array(), 0, msgBuf.position());
+
+ msgBuf.clear();
+ }
+ while (!finished);
+ }
+
+ /**
+ * Checks whether the input stream contains an SSL alert.
+ * See the handling of {@code StreamCorruptedException} in {@link #readMessage()}.
+ * Keeps logic similar to {@link
java.io.ObjectInputStream#readStreamHeader}.
+ */
+ private void detectSslAlert(byte firstByte, InputStream in) throws
IOException {
+ byte[] hdr = new byte[4];
+ hdr[0] = firstByte;
+ int read = in.readNBytes(hdr, 1, 3);
+
+ if (read < 3)
+ throw new EOFException();
+
+ String hex = String.format("%02x%02x%02x%02x", hdr[0], hdr[1], hdr[2],
hdr[3]);
+
+ if (hex.matches("15....00"))
+ throw new StreamCorruptedException("invalid stream header: " +
hex);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index b82f3fca33a..1ce14f60cdd 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -17,10 +17,8 @@
package org.apache.ignite.spi.discovery.tcp;
-import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.StreamCorruptedException;
@@ -58,6 +56,8 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import
org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
@@ -73,6 +73,9 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiAdapter;
@@ -450,6 +453,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
/** */
protected IgniteSpiContext spiCtx;
+ /** Discovery messages factory. */
+ private MessageFactory msgFactory;
+
/** For test purposes. */
private boolean skipAddrsRandomization = false;
@@ -1113,6 +1119,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
locNodeVer = ver;
}
+ /** @return Discovery messages factory. */
+ public MessageFactory messageFactory() {
+ return msgFactory;
+ }
+
/**
* Gets ID of the local node.
*
@@ -1565,18 +1576,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
return openSocket(createSocket(), sockAddr, timeoutHelper);
}
- /**
- * @param sock Socket.
- * @return Buffered stream wrapping socket stream.
- * @throws IOException If failed.
- */
- final BufferedOutputStream socketStream(Socket sock) throws IOException {
- int bufSize = sock.getSendBufferSize();
-
- return bufSize > 0 ? new BufferedOutputStream(sock.getOutputStream(),
bufSize) :
- new BufferedOutputStream(sock.getOutputStream());
- }
-
/**
* Connects to remote address sending {@code U.IGNITE_HEADER} when
connection is established.
*
@@ -1692,6 +1691,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
try (SocketTimeoutObject ignored = startTimer(sock, timeout)) {
OutputStream out = sock.getOutputStream();
+ // Write Ignite header without leading byte.
+ if (msg != null) {
+ byte mode = msg instanceof Message ?
TcpDiscoveryIoSession.MESSAGE_SERIALIZATION :
TcpDiscoveryIoSession.JAVA_SERIALIZATION;
+
+ out.write(mode);
+ }
+
out.write(data);
out.flush();
@@ -1728,22 +1734,24 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
/**
* Writes message to the socket.
*
- * @param sock Socket.
- * @param out Stream to write to.
+ * @param ses IO session.
* @param msg Message.
* @param timeout Timeout.
* @throws IOException If IO failed or write timed out.
* @throws IgniteCheckedException If marshalling failed.
*/
- protected void writeToSocket(Socket sock,
- OutputStream out,
+ protected void writeMessage(
+ TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
- long timeout) throws IOException, IgniteCheckedException {
+ long timeout
+ ) throws IOException, IgniteCheckedException {
+ Socket sock = ses.socket();
+
assert sock != null;
assert msg != null;
try (SocketTimeoutObject ignored = startTimer(sock, timeout)) {
- U.marshal(marshaller(), msg, out);
+ ses.writeMessage(msg);
}
catch (IgniteCheckedException e) {
SSLException sslEx = checkSslException(sock, e);
@@ -1782,15 +1790,16 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
/**
* Reads message from the socket limiting read time.
*
- * @param sock Socket.
- * @param in Input stream (in case socket stream was wrapped).
+ * @param ses IO session.
* @param timeout Socket timeout for this operation.
* @return Message.
* @throws IOException If IO failed or read timed out.
* @throws IgniteCheckedException If unmarshalling failed.
*/
- protected <T> T readMessage(Socket sock, @Nullable InputStream in, long
timeout) throws IOException,
+ protected <T> T readMessage(TcpDiscoveryIoSession ses, long timeout)
throws IOException,
IgniteCheckedException {
+ Socket sock = ses.socket();
+
assert sock != null;
int oldTimeout = sock.getSoTimeout();
@@ -1798,10 +1807,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
try {
sock.setSoTimeout((int)timeout);
- T res = U.unmarshal(marshaller(), in == null ?
sock.getInputStream() : in,
- U.resolveClassLoader(ignite.configuration()));
-
- return res;
+ return ses.readMessage();
}
catch (IOException | IgniteCheckedException e) {
if (X.hasCause(e, SocketTimeoutException.class))
@@ -2109,6 +2115,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
registerMBean(igniteInstanceName, new TcpDiscoverySpiMBeanImpl(this),
TcpDiscoverySpiMBean.class);
+ msgFactory = new IgniteMessageFactoryImpl(
+ new MessageFactoryProvider[] { new DiscoveryMessageFactory() });
+
impl.spiStart(igniteInstanceName);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 93d8bcf9e5e..505980bef3d 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -58,6 +59,7 @@ public abstract class TcpDiscoveryAbstractMessage implements
Serializable {
private transient UUID sndNodeId;
/** Message ID. */
+ @Order(0)
private IgniteUuid id;
/**
@@ -135,6 +137,15 @@ public abstract class TcpDiscoveryAbstractMessage
implements Serializable {
return id;
}
+ /**
+ * Sets message ID.
+ *
+ * @param id Message ID.
+ */
+ public void id(IgniteUuid id) {
+ this.id = id;
+ }
+
/**
* Gets sender node ID.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCheckFailedMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCheckFailedMessage.java
index e282410f40e..c65db903e0f 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCheckFailedMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCheckFailedMessage.java
@@ -18,17 +18,24 @@
package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
* Message telling joining node that it failed coordinator's validation check.
*/
-public class TcpDiscoveryCheckFailedMessage extends
TcpDiscoveryAbstractMessage {
+public class TcpDiscoveryCheckFailedMessage extends
TcpDiscoveryAbstractMessage implements Message {
/** */
private static final long serialVersionUID = 0L;
/** Coordinator version. */
- private final String err;
+ @Order(value = 1, method = "error")
+ private String err;
+
+ /** Creates an empty message; the error text stays unset until {@link #error(String)} is called. */
+ public TcpDiscoveryCheckFailedMessage() {
+ }
/**
* Constructor.
@@ -49,6 +56,18 @@ public class TcpDiscoveryCheckFailedMessage extends
TcpDiscoveryAbstractMessage
return err;
}
+ /**
+ * @param err Error message from coordinator.
+ */
+ public void error(String err) {
+ this.err = err;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 0;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryCheckFailedMessage.class, this, "super",
super.toString());
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
index 285a70efc74..b29a8171fbc 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
@@ -45,6 +44,7 @@ import
org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.testframework.GridTestUtils;
@@ -363,12 +363,12 @@ public class IgniteClientRejoinTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
- if (blockAll || block && sock.getPort() == 47500)
+ if (blockAll || block && ses.socket().getPort() == 47500)
throw new SocketException("Test discovery exception");
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
index 304991e46df..b69d65446ff 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Arrays;
@@ -33,6 +32,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
@@ -313,14 +313,14 @@ public class IgniteDiscoveryMassiveNodeFailTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
- assertNotFailedNode(sock);
+ assertNotFailedNode(ses.socket());
if (isDrop(msg))
return;
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
/**
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java
index 75cab20d4ac..4dfa5f43da5 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java
@@ -18,8 +18,6 @@
package org.apache.ignite.internal.processors.cache.distributed;
import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -40,6 +38,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
@@ -75,9 +74,8 @@ public class CacheClientsConcurrentStartTest extends
GridCommonAbstractTest {
IgniteConfiguration cfg = super.getConfiguration(gridName);
TcpDiscoverySpi testSpi = new TcpDiscoverySpi() {
- @Override protected void writeToSocket(
- Socket sock,
- OutputStream out,
+ @Override protected void writeMessage(
+ TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout
) throws IOException, IgniteCheckedException {
@@ -92,7 +90,7 @@ public class CacheClientsConcurrentStartTest extends
GridCommonAbstractTest {
}
}
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
};
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
index 2c055f2dd99..fdfb3b90cd3 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
@@ -38,6 +37,7 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -228,13 +228,12 @@ public abstract class
IgniteCacheTopologySplitAbstractTest extends GridCommonAbs
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock,
- OutputStream out,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
- checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(),
timeout);
+
checkSegmented((InetSocketAddress)ses.socket().getRemoteSocketAddress(),
timeout);
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java
index 0ba28ca3e01..aaae9d6b9e2 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java
@@ -18,8 +18,6 @@
package
org.apache.ignite.internal.processors.cache.persistence.snapshot.incremental;
import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
@@ -39,6 +37,7 @@ import
org.apache.ignite.internal.util.distributed.InitMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
@@ -219,9 +218,8 @@ public class IncrementalSnapshotJoiningClientTest extends
AbstractIncrementalSna
/** */
private static class ClientBlockingDiscoverySpi extends TcpDiscoverySpi {
/** {@inheritDoc} */
- @Override protected void writeToSocket(
- Socket sock,
- OutputStream out,
+ @Override protected void writeMessage(
+ TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout
) throws IOException, IgniteCheckedException {
@@ -231,7 +229,7 @@ public class IncrementalSnapshotJoiningClientTest extends
AbstractIncrementalSna
U.awaitQuiet(unblockClientJoinReq);
}
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java
index 8bce7675f88..ab8022dbdd2 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.spi.communication.tcp;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -37,6 +36,7 @@ import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
@@ -208,15 +208,14 @@ public class IgniteTcpCommunicationConnectOnInitTest
extends GridCommonAbstractT
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(
- Socket sock,
- OutputStream out,
+ @Override protected void writeMessage(
+ TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout
) throws IOException, IgniteCheckedException {
awaitLatch();
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
index 0e26ff95b76..aeba1e4d7c1 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
@@ -18,9 +18,6 @@
package org.apache.ignite.spi.communication.tcp;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Set;
import java.util.UUID;
@@ -45,10 +42,10 @@ import
org.apache.ignite.plugin.segmentation.SegmentationPolicy;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.collision.fifoqueue.FifoQueueCollisionSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
import org.junit.Test;
/**
@@ -281,7 +278,7 @@ public class TcpCommunicationSpiSkipMessageSendTest extends
GridCommonAbstractTe
private final CountDownLatch netDisabledLatch = new CountDownLatch(1);
/** {@inheritDoc} */
- @Override protected <T> T readMessage(Socket sock, @Nullable
InputStream in,
+ @Override protected <T> T readMessage(TcpDiscoveryIoSession ses,
long timeout) throws IOException, IgniteCheckedException {
if (netDisabled) {
U.sleep(timeout);
@@ -289,11 +286,11 @@ public class TcpCommunicationSpiSkipMessageSendTest
extends GridCommonAbstractTe
throw new SocketTimeoutException("CustomDiscoverySpi: network
is disabled.");
}
else
- return super.readMessage(sock, in, timeout);
+ return super.readMessage(ses, timeout);
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
if (netDisabled) {
netDisabledLatch.countDown();
@@ -301,7 +298,7 @@ public class TcpCommunicationSpiSkipMessageSendTest extends
GridCommonAbstractTe
throw new SocketTimeoutException("CustomDiscoverySpi: network
is disabled.");
}
else
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
/**
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java
index 8b0c912eed5..3a02b54260d 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java
@@ -18,8 +18,6 @@
package org.apache.ignite.spi.discovery;
import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
import java.util.Collections;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
@@ -28,6 +26,7 @@ import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
@@ -147,7 +146,7 @@ public class LongClientConnectToClusterTest extends
GridCommonAbstractTest {
public static final int DELAY_MSG_PERIOD_MILLIS = 2_000;
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
IgniteCheckedException {
if (msg instanceof TcpDiscoveryNodeAddFinishedMessage &&
msg.topologyVersion() == 3) {
log.info("Catched discovery message: " + msg);
@@ -162,7 +161,7 @@ public class LongClientConnectToClusterTest extends
GridCommonAbstractTest {
}
}
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
index 65688863a84..f72eac99994 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
@@ -18,7 +18,6 @@
package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.Socket;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
@@ -88,13 +87,12 @@ public class BlockTcpDiscoverySpi extends
TestTcpDiscoverySpi {
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock,
- OutputStream out,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
if (spiCtx != null)
apply(spiCtx.localNode(), msg);
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java
index a184cd42fe4..b8a82d52cc9 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java
@@ -175,6 +175,7 @@ public class DiscoveryUnmarshalVulnerabilityTest extends
GridCommonAbstractTest
OutputStream oos = new BufferedOutputStream(sock.getOutputStream())
) {
oos.write(U.IGNITE_HEADER);
+ oos.write((byte)1); // Flag for java serialization.
oos.write(data);
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
index 472682c1996..ca316204037 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
@@ -18,10 +18,8 @@
package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.Socket;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -211,9 +209,8 @@ public class IgniteClientConnectTest extends
GridCommonAbstractTest {
*/
class TestTcpDiscoverySpi extends TcpDiscoverySpi {
/** {@inheritDoc} */
- @Override protected void writeToSocket(
- Socket sock,
- OutputStream out,
+ @Override protected void writeMessage(
+ TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout
) throws IOException, IgniteCheckedException {
@@ -228,10 +225,10 @@ public class IgniteClientConnectTest extends
GridCommonAbstractTest {
fail("Unexpected interrupt on nodeAddFinishedDelay");
}
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
else
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
/**
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 2708b606007..7eef5a2739b 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -18,8 +18,6 @@
package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
@@ -42,7 +40,6 @@ import
org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
-import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -475,8 +472,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest
extends TcpClientDiscov
}
/** */
- @Override protected void writeToSocket(Socket sock,
- OutputStream out,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
if (writeToSocketDelay > 0) {
@@ -490,8 +486,8 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest
extends TcpClientDiscov
}
}
- if (sock.getSoTimeout() >= writeToSocketDelay)
- super.writeToSocket(sock, out, msg, timeout);
+ if (ses.socket().getSoTimeout() >= writeToSocketDelay)
+ super.writeMessage(ses, msg, timeout);
else
throw new SocketTimeoutException("Write to socket delay
timeout exception.");
}
@@ -521,14 +517,14 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest
extends TcpClientDiscov
}
/** {@inheritDoc} */
- @Override protected <T> T readMessage(Socket sock, @Nullable
InputStream in, long timeout)
+ @Override protected <T> T readMessage(TcpDiscoveryIoSession ses, long
timeout)
throws IOException, IgniteCheckedException {
long currTimeout = getLocalNode().isClient() ?
clientFailureDetectionTimeout() : failureDetectionTimeout();
if (readDelay < currTimeout) {
try {
- return super.readMessage(sock, in, timeout);
+ return super.readMessage(ses, timeout);
}
catch (Exception e) {
err = e;
@@ -537,7 +533,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest
extends TcpClientDiscov
}
}
else {
- T msg = super.readMessage(sock, in, timeout);
+ T msg = super.readMessage(ses, timeout);
if (msg instanceof TcpDiscoveryPingRequest) {
try {
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index f3d45f9f097..6ca4782c65c 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
@@ -2573,19 +2572,18 @@ public class TcpClientDiscoverySpiSelfTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock,
- OutputStream out,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
waitFor(writeLock);
- if (!onMessage(sock, msg))
+ if (!onMessage(ses.socket(), msg))
return;
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
if (afterWrite != null)
- afterWrite.apply(msg, sock);
+ afterWrite.apply(msg, ses.socket());
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java
index 10c6aa14242..f6b70244744 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
@@ -302,9 +301,8 @@ public class TcpDiscoveryCoordinatorFailureTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(
- Socket sock,
- OutputStream out,
+ @Override protected void writeMessage(
+ TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout
) throws IOException, IgniteCheckedException {
@@ -314,7 +312,7 @@ public class TcpDiscoveryCoordinatorFailureTest extends
GridCommonAbstractTest {
msg = new TcpDiscoveryConnectionCheckMessage(locNode);
}
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
index 481eb24a0fb..e1e647c01e6 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -199,10 +198,10 @@ public class TcpDiscoveryFailedJoinTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
- if (sock.getPort() != FAIL_PORT)
- super.writeToSocket(sock, out, msg, timeout);
+ if (ses.socket().getPort() != FAIL_PORT)
+ super.writeMessage(ses, msg, timeout);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
index 7162083af9c..3b2c9258044 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
@@ -538,9 +537,9 @@ public class TcpDiscoveryNetworkIssuesTest extends
GridCommonAbstractTest {
Object spis = GridTestUtils.getFieldValue(disco,
GridManagerAdapter.class, "spis");
- OutputStream out = GridTestUtils.getFieldValue(((Object[])spis)[0],
"impl", "msgWorker", "out");
+ TcpDiscoveryIoSession ses =
GridTestUtils.getFieldValue(((Object[])spis)[0], "impl", "msgWorker", "ses");
- out.close();
+ ses.socket().getOutputStream().close();
}
/**
@@ -627,21 +626,21 @@ public class TcpDiscoveryNetworkIssuesTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
BiConsumer<Socket, TcpDiscoveryHandshakeRequest> hsRqLsnr;
BiConsumer<Socket, TcpDiscoveryHandshakeResponse> hsRespLsnr;
if (msg instanceof TcpDiscoveryHandshakeRequest && (hsRqLsnr =
this.hsRqLsnr.get()) != null)
- hsRqLsnr.accept(sock, (TcpDiscoveryHandshakeRequest)msg);
+ hsRqLsnr.accept(ses.socket(),
(TcpDiscoveryHandshakeRequest)msg);
if (msg instanceof TcpDiscoveryHandshakeResponse && (hsRespLsnr =
this.hsRespLsnr.get()) != null)
- hsRespLsnr.accept(sock, (TcpDiscoveryHandshakeResponse)msg);
+ hsRespLsnr.accept(ses.socket(),
(TcpDiscoveryHandshakeResponse)msg);
- if (dropMsg(sock))
+ if (dropMsg(ses.socket()))
return;
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeJoinAndFailureTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeJoinAndFailureTest.java
index 0bdf0a0558a..f449be13c8e 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeJoinAndFailureTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeJoinAndFailureTest.java
@@ -167,8 +167,8 @@ public class TcpDiscoveryNodeJoinAndFailureTest extends
GridCommonAbstractTest {
if (nodeId.equals(node2Id)) {
Object workerObj = GridTestUtils.getFieldValue(impl,
"msgWorker");
-
- OutputStream out =
GridTestUtils.getFieldValue(workerObj, "out");
+ TcpDiscoveryIoSession ses =
GridTestUtils.getFieldValue(workerObj, "ses");
+ OutputStream out = GridTestUtils.getFieldValue(ses,
"out");
try {
out.close();
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
index ddb52738db5..15cb1f90382 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.Socket;
import java.util.Set;
import org.apache.ignite.Ignite;
@@ -265,10 +264,10 @@ public class TcpDiscoveryPendingMessageDeliveryTest
extends GridCommonAbstractTe
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
if (!blockMsgs)
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index c01fc8a5cbc..3195284ba92 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -19,9 +19,7 @@ package org.apache.ignite.spi.discovery.tcp;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
-import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -2448,9 +2446,8 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(
- Socket sock,
- OutputStream out,
+ @Override protected void writeMessage(
+ TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
// Test relies on an error in this thread only.
@@ -2459,14 +2456,14 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
if (startTest && !(msg instanceof
TcpDiscoveryConnectionCheckMessage) && ringMsgWorkerThread) {
int errPort = errPortSupplier.get();
- if (sock.getPort() == errPort) {
+ if (ses.socket().getPort() == errPort) {
log.info("Fail write on message send [port=" + errPort +
", msg=" + msg + ']');
throw new SocketTimeoutException();
}
else if (locNode.discoveryPort() == errPort) {
if (sleepEndTime == 0) {
- errNextPort = sock.getPort();
+ errNextPort = ses.socket().getPort();
sleepEndTime = System.currentTimeMillis() + 3000;
}
@@ -2487,8 +2484,8 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
log.info("Stop sleep on message send: " + msg);
- if (sock.getPort() == errNextPort) {
- log.info("Fail write after sleep [port=" +
sock.getPort() + ", msg=" + msg + ']');
+ if (ses.socket().getPort() == errNextPort) {
+ log.info("Fail write after sleep [port=" +
ses.socket().getPort() + ", msg=" + msg + ']');
throw new SocketTimeoutException();
}
@@ -2496,7 +2493,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
}
}
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
}
@@ -2514,7 +2511,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
static volatile boolean checkClientNodeAddFinished;
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
if (msg instanceof TcpDiscoveryNodeAddedMessage) {
@@ -2536,7 +2533,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
}
}
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
/**
@@ -2580,7 +2577,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
private volatile boolean failed;
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
boolean add = msgIds.add(msg.id());
@@ -2591,7 +2588,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
failed = true;
}
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
}
@@ -2603,8 +2600,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
private volatile boolean stopBeforeSndAck;
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock,
- OutputStream out,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
if (stopBeforeSndAck) {
@@ -2616,7 +2612,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
if (custMsg instanceof
StartRoutineAckDiscoveryMessage) {
log.info("Skip message send and stop node: " +
msg);
- sock.close();
+ ses.socket().close();
GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws
Exception {
@@ -2635,7 +2631,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
}
}
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
}
@@ -2666,8 +2662,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock,
- OutputStream out,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
if (stop)
@@ -2678,7 +2673,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
log.info("IO error on message send [locNode=" + locNode + ",
msg=" + msg + ']');
- sock.close();
+ ses.socket().close();
throw new SocketTimeoutException();
}
@@ -2688,7 +2683,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
failMsg.compareAndSet(false, true)) {
log.info("IO error on message send [locNode=" + locNode + ",
msg=" + msg + ']');
- sock.close();
+ ses.socket().close();
throw new SocketTimeoutException();
}
@@ -2700,7 +2695,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
log.info("Skip messages send and stop node [locNode=" +
locNode + ", msg=" + msg + ']');
- sock.close();
+ ses.socket().close();
GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -2713,7 +2708,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
return;
}
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
}
@@ -2728,7 +2723,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
private boolean stop;
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
if (msg instanceof TcpDiscoveryCustomEventMessage && latch !=
null) {
log.info("Stop node on custom event: " + msg);
@@ -2741,7 +2736,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
if (stop)
return;
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
}
@@ -2762,7 +2757,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
private boolean debug;
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
if (msg instanceof TcpDiscoveryNodeAddedMessage) {
if (nodeAdded1 != null) {
@@ -2790,7 +2785,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
if (debug && msg instanceof TcpDiscoveryCustomEventMessage)
log.info("--- Send custom event: " + msg);
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
}
@@ -2818,7 +2813,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
if (stop) {
@@ -2835,7 +2830,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
}
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
}
@@ -2847,12 +2842,12 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
private volatile boolean stop;
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
if (stop)
throw new RuntimeException("Failing ring message worker
explicitly");
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
if (msg instanceof TcpDiscoveryNodeAddedMessage)
stop = true;
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
index 2368cb6d19b..55f83d72fc6 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.ignite.IgniteCheckedException;
@@ -303,13 +302,13 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest
extends AbstractDiscoverySelf
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg, long timeout)
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg, long timeout)
throws IOException, IgniteCheckedException {
if (!(msg instanceof TcpDiscoveryPingRequest)) {
if (cntConnCheckMsg && msg instanceof
TcpDiscoveryConnectionCheckMessage)
connCheckStatusMsgCntSent++;
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
return;
}
@@ -329,7 +328,7 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends
AbstractDiscoverySelf
}
}
else
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java
index e06733a3810..5a0726e7a60 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.Socket;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -400,13 +399,13 @@ public class TcpDiscoverySpiReconnectDelayTest extends
GridCommonAbstractTest {
private final AtomicInteger failReconReq = new AtomicInteger();
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
- if (!onMessage(sock, msg))
+ if (!onMessage(ses.socket(), msg))
return;
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java
index 4c876d14b0a..dcf9931ef92 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java
@@ -18,9 +18,7 @@
package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
-import java.io.InputStream;
import java.io.StreamCorruptedException;
-import java.net.Socket;
import java.util.concurrent.Callable;
import javax.net.ssl.SSLException;
import org.apache.ignite.IgniteCheckedException;
@@ -28,7 +26,6 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
import org.junit.Test;
/**
@@ -177,7 +174,7 @@ public class TcpDiscoverySslSecuredUnsecuredTest extends
GridCommonAbstractTest
}
/** {@inheritDoc} */
- @Override protected <T> T readMessage(final Socket sock, @Nullable
final InputStream in,
+ @Override protected <T> T readMessage(final TcpDiscoveryIoSession ses,
final long timeout) throws IOException, IgniteCheckedException {
if (cnt-- > 0) {
if (plain)
@@ -186,7 +183,7 @@ public class TcpDiscoverySslSecuredUnsecuredTest extends
GridCommonAbstractTest
throw new SSLException("Test SSL exception");
}
- return super.readMessage(sock, in, timeout);
+ return super.readMessage(ses, timeout);
}
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
index 669b3112fd8..e3038bf5dc6 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -18,8 +18,6 @@
package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
@@ -48,7 +46,7 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi
implements IgniteDiscov
private IgniteDiscoverySpiInternalListener internalLsnr;
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
+ @Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
IgniteCheckedException {
if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
return;
@@ -61,7 +59,7 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi
implements IgniteDiscov
internalLsnr.beforeReconnect(locNode, log);
}
- super.writeToSocket(sock, out, msg, timeout);
+ super.writeMessage(ses, msg, timeout);
}
/** {@inheritDoc} */