Repository: ignite Updated Branches: refs/heads/master fc0dd9a48 -> f0daea95d
IGNITE-3060 Discovery: optimize resource usage for client connections (cherry picked from commits 7b0edfb, 8324233) Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f0daea95 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f0daea95 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f0daea95 Branch: refs/heads/master Commit: f0daea95dfbb1d6aca3a722773001c5ec0e0f5d6 Parents: fc0dd9a Author: sboikov <[email protected]> Authored: Fri Apr 29 09:57:10 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed May 4 09:12:24 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/IgniteComponentType.java | 4 +- .../continuous/CacheContinuousQueryHandler.java | 9 + .../ignite/spi/discovery/tcp/ServerImpl.java | 214 ++++++++++++------- .../spi/discovery/tcp/TcpDiscoverySpi.java | 5 +- .../TcpDiscoveryClientHeartbeatMessage.java | 1 + .../TcpDiscoveryClientReconnectMessage.java | 16 ++ .../tcp/TcpClientDiscoverySpiSelfTest.java | 44 +++- 7 files changed, 202 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f0daea95/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java index 01872b6..76e495f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java @@ -285,7 +285,7 @@ public enum IgniteComponentType { return (T)ctor.newInstance(ctx); } } - catch (Exception e) { + catch (Throwable e) { throw componentException(e); } } @@ -309,7 +309,7 @@ public enum IgniteComponentType { * @param err Creation error. * @return Component creation exception. */ - private IgniteCheckedException componentException(Exception err) { + private IgniteCheckedException componentException(Throwable err) { return new IgniteCheckedException("Failed to create Ignite component (consider adding " + module + " module to classpath) [component=" + this + ", cls=" + clsName + ']', err); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f0daea95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 9ae2972..3b77d48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -638,6 +638,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler Collection<CacheContinuousQueryEntry> entries) { final GridCacheContext cctx = cacheContext(ctx); + if (cctx == null) { + IgniteLogger log = ctx.log(CacheContinuousQueryHandler.class); + + if (log.isDebugEnabled()) + log.debug("Failed to notify callback, cache is not found: " + cacheId); + + return; + } + final Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new ArrayList<>(entries.size()); for (CacheContinuousQueryEntry e : entries) { http://git-wip-us.apache.org/repos/asf/ignite/blob/f0daea95/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 05bb1e6..84400ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -162,7 +162,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe */ class ServerImpl extends TcpDiscoveryImpl { /** */ - private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024 * 10); + private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024); /** */ private static final IgniteProductVersion CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE = @@ -1412,6 +1412,8 @@ class ServerImpl extends TcpDiscoveryImpl { ", topSize=" + ring.allNodes().size() + ", leavingNodesSize=" + leavingNodesSize + ", failedNodesSize=" + failedNodesSize + ", msgWorker.queue.size=" + (msgWorker != null ? msgWorker.queueSize() : "N/A") + + ", clients=" + ring.clientNodes().size() + + ", clientWorkers=" + clientMsgWorkers.size() + ", lastUpdate=" + (locNode != null ? U.format(locNode.lastUpdateTime()) : "N/A") + ", heapFree=" + runtime.freeMemory() / (1024 * 1024) + "M, heapTotal=" + runtime.maxMemory() / (1024 * 1024) + "M]"); @@ -2114,7 +2116,7 @@ class ServerImpl extends TcpDiscoveryImpl { /** * Message worker thread for messages processing. */ - private class RingMessageWorker extends MessageWorkerAdapter { + private class RingMessageWorker extends MessageWorkerAdapter<TcpDiscoveryAbstractMessage> { /** Next node. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) private TcpDiscoveryNode next; @@ -2172,6 +2174,32 @@ class ServerImpl extends TcpDiscoveryImpl { initConnectionCheckFrequency(); } + /** + * Adds message to queue. + * + * @param msg Message to add. + */ + void addMessage(TcpDiscoveryAbstractMessage msg) { + if ((msg instanceof TcpDiscoveryStatusCheckMessage || + msg instanceof TcpDiscoveryJoinRequestMessage || + msg instanceof TcpDiscoveryCustomEventMessage || + msg instanceof TcpDiscoveryClientReconnectMessage) && + queue.contains(msg)) { + if (log.isDebugEnabled()) + log.debug("Ignoring duplicate message: " + msg); + + return; + } + + if (msg.highPriority()) + queue.addFirst(msg); + else + queue.add(msg); + + if (log.isDebugEnabled()) + log.debug("Message has been added to queue: " + msg); + } + /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { try { @@ -2277,9 +2305,6 @@ class ServerImpl extends TcpDiscoveryImpl { else if (msg instanceof TcpDiscoveryNodeFailedMessage) processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg); - else if (msg instanceof TcpDiscoveryClientHeartbeatMessage) - processClientHeartbeatMessage((TcpDiscoveryClientHeartbeatMessage)msg); - else if (msg instanceof TcpDiscoveryHeartbeatMessage) processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg); @@ -2333,28 +2358,44 @@ class ServerImpl extends TcpDiscoveryImpl { */ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { if (redirectToClients(msg)) { - byte[] marshalledMsg = null; + byte[] msgBytes = null; for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) { - // Send a clone to client to avoid ConcurrentModificationException - TcpDiscoveryAbstractMessage msgClone; - - try { - if (marshalledMsg == null) - marshalledMsg = spi.marsh.marshal(msg); + if (msgBytes == null) { + try { + msgBytes = spi.marsh.marshal(msg); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal message: " + msg, e); - msgClone = spi.marsh.unmarshal(marshalledMsg, - U.resolveClassLoader(spi.ignite().configuration())); + break; + } } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal message: " + msg, e); - msgClone = msg; - } + TcpDiscoveryAbstractMessage msg0 = msg; + byte[] msgBytes0 = msgBytes; - prepareNodeAddedMessage(msgClone, clientMsgWorker.clientNodeId, null, null, null); + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; + + TcpDiscoveryNode node = nodeAddedMsg.node(); + + if (clientMsgWorker.clientNodeId.equals(node.id())) { + try { + msg0 = spi.marsh.unmarshal(msgBytes, + U.resolveClassLoader(spi.ignite().configuration())); + + prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null, null, null); - clientMsgWorker.addMessage(msgClone); + msgBytes0 = null; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to create message copy: " + msg, e); + } + } + } + + clientMsgWorker.addMessage(msg0, msgBytes0); } } } @@ -4528,22 +4569,6 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Processes client heartbeat message. - * - * @param msg Heartbeat message. - */ - private void processClientHeartbeatMessage(TcpDiscoveryClientHeartbeatMessage msg) { - assert msg.client(); - - ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId()); - - if (wrk != null) - wrk.metrics(msg.metrics()); - else if (log.isDebugEnabled()) - log.debug("Received heartbeat message from unknown client node: " + msg); - } - - /** * @param nodeId Node ID. * @param metrics Metrics. * @param cacheMetrics Cache metrics. @@ -5448,7 +5473,12 @@ class ServerImpl extends TcpDiscoveryImpl { continue; } - msgWorker.addMessage(msg); + TcpDiscoveryClientHeartbeatMessage heartbeatMsg = null; + + if (msg instanceof TcpDiscoveryClientHeartbeatMessage) + heartbeatMsg = (TcpDiscoveryClientHeartbeatMessage)msg; + else + msgWorker.addMessage(msg); // Send receipt back. if (clientMsgWrk != null) { @@ -5460,6 +5490,9 @@ class ServerImpl extends TcpDiscoveryImpl { } else spi.writeToSocket(msg, sock, RES_OK, socketTimeout); + + if (heartbeatMsg != null) + processClientHeartbeatMessage(heartbeatMsg); } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) @@ -5527,6 +5560,22 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * Processes client heartbeat message. + * + * @param msg Heartbeat message. + */ + private void processClientHeartbeatMessage(TcpDiscoveryClientHeartbeatMessage msg) { + assert msg.client(); + + ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId()); + + if (wrk != null) + wrk.metrics(msg.metrics()); + else if (log.isDebugEnabled()) + log.debug("Received heartbeat message from unknown client node: " + msg); + } + + /** * @param msg Join request message. * @param clientMsgWrk Client message worker to start. * @return Whether connection was successful. @@ -5653,16 +5702,13 @@ class ServerImpl extends TcpDiscoveryImpl { /** */ - private class ClientMessageWorker extends MessageWorkerAdapter { + private class ClientMessageWorker extends MessageWorkerAdapter<T2<TcpDiscoveryAbstractMessage, byte[]>> { /** Node ID. */ private final UUID clientNodeId; /** Socket. */ private final Socket sock; - /** Output stream. */ - private final OutputStream out; - /** Current client metrics. */ private volatile ClusterMetrics metrics; @@ -5681,8 +5727,6 @@ class ServerImpl extends TcpDiscoveryImpl { this.sock = sock; this.clientNodeId = clientNodeId; - - out = new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize()); } /** @@ -5706,11 +5750,43 @@ class ServerImpl extends TcpDiscoveryImpl { this.metrics = metrics; } + /** + * @param msg Message. + */ + public void addMessage(TcpDiscoveryAbstractMessage msg) { + addMessage(msg, null); + } + + /** + * @param msg Message. + * @param msgBytes Optional message bytes. + */ + public void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) { + T2 t = new T2<>(msg, msgBytes); + + if (msg.highPriority()) + queue.addFirst(t); + else + queue.add(t); + + if (log.isDebugEnabled()) + log.debug("Message has been added to client queue: " + msg); + } + /** {@inheritDoc} */ - @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { + @Override protected void processMessage(T2<TcpDiscoveryAbstractMessage, byte[]> msgT) { + boolean success = false; + + TcpDiscoveryAbstractMessage msg = msgT.get1(); + try { assert msg.verified() : msg; + byte[] msgBytes = msgT.get2(); + + if (msgBytes == null) + msgBytes = spi.marsh.marshal(msg); + if (msg instanceof TcpDiscoveryClientAckResponse) { if (clientVer == null) { ClusterNode node = spi.getNode(clientNodeId); @@ -5729,7 +5805,7 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Sending message ack to client [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); - spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ? + spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout()); } } @@ -5740,9 +5816,11 @@ class ServerImpl extends TcpDiscoveryImpl { assert topologyInitialized(msg) : msg; - spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ? + spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout()); } + + success = true; } catch (IgniteCheckedException | IOException e) { if (log.isDebugEnabled()) @@ -5751,12 +5829,15 @@ class ServerImpl extends TcpDiscoveryImpl { onException("Client connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']', e); + } + finally { + if (!success) { + clientMsgWorkers.remove(clientNodeId, this); - clientMsgWorkers.remove(clientNodeId, this); - - U.interrupt(this); + U.interrupt(this); - U.closeQuiet(sock); + U.closeQuiet(sock); + } } } @@ -5846,9 +5927,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** * Base class for message workers. */ - protected abstract class MessageWorkerAdapter extends IgniteSpiThread { + protected abstract class MessageWorkerAdapter<T> extends IgniteSpiThread { /** Message queue. */ - private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>(); + protected final BlockingDeque<T> queue = new LinkedBlockingDeque<>(); /** Backed interrupted flag. */ private volatile boolean interrupted; @@ -5874,7 +5955,7 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']'); while (!isInterrupted()) { - TcpDiscoveryAbstractMessage msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS); + T msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS); if (msg == null) noMessageLoop(); @@ -5903,34 +5984,9 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Adds message to queue. - * - * @param msg Message to add. - */ - void addMessage(TcpDiscoveryAbstractMessage msg) { - if ((msg instanceof TcpDiscoveryStatusCheckMessage || - msg instanceof TcpDiscoveryJoinRequestMessage || - msg instanceof TcpDiscoveryCustomEventMessage) && - queue.contains(msg)) { - if (log.isDebugEnabled()) - log.debug("Ignoring duplicate message: " + msg); - - return; - } - - if (msg.highPriority()) - queue.addFirst(msg); - else - queue.add(msg); - - if (log.isDebugEnabled()) - log.debug("Message has been added to queue: " + msg); - } - - /** * @param msg Message. */ - protected abstract void processMessage(TcpDiscoveryAbstractMessage msg); + protected abstract void processMessage(T msg); /** * Called when there is no message to process giving ability to perform other activity. http://git-wip-us.apache.org/repos/asf/ignite/blob/f0daea95/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 4351c64..c135b83 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1267,7 +1267,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout)); - writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout)); + writeToSocket(sock, null, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout)); return sock; } @@ -1297,12 +1297,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * Writes message to the socket. * * @param sock Socket. + * @param msg Message. * @param data Raw data to write. * @param timeout Socket write timeout. * @throws IOException If IO failed or write timed out. */ @SuppressWarnings("ThrowFromFinallyBlock") - private void writeToSocket(Socket sock, byte[] data, long timeout) throws IOException { + protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, long timeout) throws IOException { assert sock != null; assert data != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/f0daea95/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java index 37b578c..3993de0 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java @@ -38,6 +38,7 @@ public class TcpDiscoveryClientHeartbeatMessage extends TcpDiscoveryAbstractMess * Constructor. * * @param creatorNodeId Creator node. + * @param metrics Metrics. */ public TcpDiscoveryClientHeartbeatMessage(UUID creatorNodeId, ClusterMetrics metrics) { super(creatorNodeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/f0daea95/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java index 7c0cd5d..7cce78b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java @@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.Collection; import java.util.UUID; import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -96,6 +97,21 @@ public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMess } /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + // NOTE! + // Do not call super. As IDs will differ, but we can ignore this. + + if (!(obj instanceof TcpDiscoveryClientReconnectMessage)) + return false; + + TcpDiscoveryClientReconnectMessage other = (TcpDiscoveryClientReconnectMessage)obj; + + return F.eq(creatorNodeId(), other.creatorNodeId()) && + F.eq(routerNodeId, other.routerNodeId) && + F.eq(lastMsgId, other.lastMsgId); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryClientReconnectMessage.class, this, "super", super.toString()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f0daea95/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index e01094c..331b581 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -1736,8 +1736,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { final AtomicBoolean err = new AtomicBoolean(false); client.events().localListen(new IgnitePredicate<Event>() { - @Override - public boolean apply(Event evt) { + @Override public boolean apply(Event evt) { if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { log.info("Disconnected event."); @@ -2158,17 +2157,49 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + @Override protected void writeToSocket(Socket sock, + OutputStream out, + TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { waitFor(writeLock); + if (!onMessage(sock, msg)) + return; + + super.writeToSocket(sock, out, msg, timeout); + + if (afterWrite != null) + afterWrite.apply(msg, sock); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] msgBytes, + long timeout) throws IOException { + waitFor(writeLock); + + if (!onMessage(sock, msg)) + return; + + super.writeToSocket(sock, msg, msgBytes, timeout); + + if (afterWrite != null) + afterWrite.apply(msg, sock); + } + + /** + * @param sock Socket. + * @param msg Message. + * @return {@code False} if should not further process message. + * @throws IOException If failed. + */ + private boolean onMessage(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException { boolean fail = false; if (skipNodeAdded && (msg instanceof TcpDiscoveryNodeAddedMessage || msg instanceof TcpDiscoveryNodeAddFinishedMessage)) { log.info("Skip message: " + msg); - return; + return false; } if (msg instanceof TcpDiscoveryNodeAddedMessage) @@ -2184,10 +2215,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { sock.close(); } - super.writeToSocket(sock, out, msg, timeout); - - if (afterWrite != null) - afterWrite.apply(msg, sock); + return true; } /** {@inheritDoc} */
