IGNITE-2951 - Stability fixes for cluster with many clients
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5e266153 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5e266153 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5e266153 Branch: refs/heads/ignite-1786 Commit: 5e266153707021a8866e91dfa3f958066f80fc99 Parents: da47901 Author: Valentin Kulichenko <[email protected]> Authored: Wed Apr 6 18:10:45 2016 -0700 Committer: Valentin Kulichenko <[email protected]> Committed: Wed Apr 6 23:25:03 2016 -0700 ---------------------------------------------------------------------- .../GridDhtPartitionsExchangeFuture.java | 2 +- .../continuous/GridContinuousProcessor.java | 13 +++++- .../ignite/spi/discovery/tcp/ClientImpl.java | 2 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 44 +++++++++----------- .../spi/discovery/tcp/TcpDiscoverySpi.java | 23 ++++------ .../IgniteClientReconnectAbstractTest.java | 7 ++-- .../tcp/TcpClientDiscoverySpiSelfTest.java | 8 ++-- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 39 +++++++++-------- .../TcpDiscoverySpiFailureTimeoutSelfTest.java | 23 ++++------ .../spi/discovery/tcp/TestTcpDiscoverySpi.java | 5 ++- 10 files changed, 80 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index bbfc71a..82e9bda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -931,7 +931,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map()); m.addLocalPartitionMap(cacheCtx.cacheId(), locMap); - + m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index abafe85..d7838f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -393,7 +393,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) { if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) { - DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos); + Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = U.newHashMap(clientInfos.size()); + + for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> e : clientInfos.entrySet()) { + Map<UUID, LocalRoutineInfo> copy = U.newHashMap(e.getValue().size()); + + for (Map.Entry<UUID, LocalRoutineInfo> e0 : e.getValue().entrySet()) + copy.put(e0.getKey(), e0.getValue()); + + clientInfos0.put(e.getKey(), copy); + } + + DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos0); // Collect listeners information (will be sent to joining node during discovery process). for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 950c680..31d614f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1062,7 +1062,7 @@ class ClientImpl extends TcpDiscoveryImpl { try { if (ack) { synchronized (mux) { - assert unackedMsg == null : unackedMsg; + assert unackedMsg == null : "Unacked=" + unackedMsg + ", received=" + msg; unackedMsg = msg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 88e34e8..27a31c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -18,9 +18,11 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.ObjectStreamException; +import java.io.OutputStream; import java.io.Serializable; import java.net.ConnectException; import java.net.InetAddress; @@ -74,7 +76,6 @@ import org.apache.ignite.internal.util.GridBoundedLinkedHashSet; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.lang.GridTuple; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; @@ -2134,6 +2135,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** Socket. */ private Socket sock; + /** Output stream. */ + private OutputStream out; + /** Last time status message has been sent. */ private long lastTimeStatusMsgSent; @@ -2470,10 +2474,12 @@ class ServerImpl extends TcpDiscoveryImpl { sock = spi.openSocket(addr, timeoutHelper); + out = new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize()); + openSock = true; // Handshake. - writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), + spi.writeToSocket(sock, out, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, @@ -2627,7 +2633,7 @@ class ServerImpl extends TcpDiscoveryImpl { timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); try { - writeToSocket(sock, pendingMsg, timeoutHelper.nextTimeoutChunk( + spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk( spi.getSocketTimeout())); } finally { @@ -2679,7 +2685,7 @@ class ServerImpl extends TcpDiscoveryImpl { } } - writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); @@ -3999,7 +4005,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (leftNode.equals(next) && sock != null) { try { - writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? + spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout()); if (log.isDebugEnabled()) @@ -5617,6 +5623,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** Socket. */ private final Socket sock; + /** Output stream. */ + private final OutputStream out; + /** Current client metrics. */ private volatile ClusterMetrics metrics; @@ -5630,11 +5639,13 @@ class ServerImpl extends TcpDiscoveryImpl { * @param sock Socket. * @param clientNodeId Node ID. */ - protected ClientMessageWorker(Socket sock, UUID clientNodeId) { + protected ClientMessageWorker(Socket sock, UUID clientNodeId) throws IOException { super("tcp-disco-client-message-worker", 2000); this.sock = sock; this.clientNodeId = clientNodeId; + + out = new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize()); } /** @@ -5681,7 +5692,7 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Sending message ack to client [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); - writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? + spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout()); } } @@ -5692,7 +5703,7 @@ class ServerImpl extends TcpDiscoveryImpl { assert topologyInitialized(msg) : msg; - writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? + spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout()); } } @@ -5799,9 +5810,6 @@ class ServerImpl extends TcpDiscoveryImpl { * Base class for message workers. */ protected abstract class MessageWorkerAdapter extends IgniteSpiThread { - /** Pre-allocated output stream (100K). */ - private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024); - /** Message queue. */ private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>(); @@ -5883,20 +5891,6 @@ class ServerImpl extends TcpDiscoveryImpl { protected void noMessageLoop() { // No-op. } - - /** - * @param sock Socket. - * @param msg Message. - * @param timeout Socket timeout. - * @throws IOException If IO failed. - * @throws IgniteCheckedException If marshalling failed. - */ - protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) - throws IOException, IgniteCheckedException { - bout.reset(); - - spi.writeToSocket(sock, msg, bout, timeout); - } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index df152f8..d981609 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.tcp; +import java.io.BufferedOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -51,7 +52,6 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.AddressResolver; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -1346,45 +1346,38 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T */ protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { - writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K. + writeToSocket(sock, new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize()), msg, timeout); } /** * Writes message to the socket. * * @param sock Socket. + * @param out Stream to write to. * @param msg Message. - * @param bout Byte array output stream. * @param timeout Timeout. * @throws IOException If IO failed or write timed out. * @throws IgniteCheckedException If marshalling failed. */ @SuppressWarnings("ThrowFromFinallyBlock") protected void writeToSocket(Socket sock, + OutputStream out, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { assert sock != null; assert msg != null; - assert bout != null; - - // Marshall message first to perform only write after. - marsh.marshal(msg, bout); + assert out != null; SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout); addTimeoutObject(obj); - IOException err = null; + IgniteCheckedException err = null; try { - OutputStream out = sock.getOutputStream(); - - bout.writeTo(out); - - out.flush(); + marsh.marshal(msg, out); } - catch (IOException e) { + catch (IgniteCheckedException e) { err = e; } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index 6869d1c..4d49366 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal; import java.io.IOException; +import java.io.OutputStream; import java.net.Socket; import java.util.Collection; import java.util.Collections; @@ -384,7 +385,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra volatile CountDownLatch writeLatch; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryJoinRequestMessage) { CountDownLatch writeLatch0 = writeLatch; @@ -396,7 +397,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra } } - super.writeToSocket(sock, msg, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -464,4 +465,4 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra log.error(s); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 7debb41..e01094c 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; @@ -44,7 +45,6 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.util.GridConcurrentHashSet; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.typedef.CIX2; @@ -2158,8 +2158,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { waitFor(writeLock); boolean fail = false; @@ -2184,7 +2184,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { sock.close(); } - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); if (afterWrite != null) afterWrite.apply(msg, sock); http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 7635f0b..7efaca0 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; @@ -54,7 +55,6 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage; import org.apache.ignite.internal.processors.port.GridPortRecord; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -1852,9 +1852,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private volatile boolean failed; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { boolean add = msgIds.add(msg.id()); @@ -1864,7 +1863,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { failed = true; } - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -1877,8 +1876,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, + OutputStream out, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { if (stopBeforeSndAck) { if (msg instanceof TcpDiscoveryCustomEventMessage) { @@ -1908,7 +1907,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } } - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -1940,8 +1939,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, + OutputStream out, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { if (stop) return; @@ -1986,7 +1985,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { return; } - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -2001,8 +2000,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private boolean stop; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryCustomEventMessage && latch != null) { log.info("Stop node on custom event: " + msg); @@ -2014,7 +2013,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { if (stop) return; - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -2035,8 +2034,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private boolean debug; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryNodeAddedMessage) { if (nodeAdded1 != null) { nodeAdded1.countDown(); @@ -2063,7 +2062,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { if (debug && msg instanceof TcpDiscoveryCustomEventMessage) log.info("--- Send custom event: " + msg); - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -2075,13 +2074,13 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private volatile boolean stop; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { if (stop) throw new RuntimeException("Failing ring message worker explicitly"); - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -2093,12 +2092,12 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private volatile boolean stop; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { if (stop) throw new RuntimeException("Failing ring message worker explicitly"); - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); if (msg instanceof TcpDiscoveryNodeAddedMessage) stop = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java index 4cf9bd0..4ef984f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java @@ -18,12 +18,12 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; @@ -348,10 +348,14 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (!(msg instanceof TcpDiscoveryPingRequest)) { - super.writeToSocket(sock, msg, timeout); + if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) + connCheckStatusMsgCntSent++; + + super.writeToSocket(sock, out, msg, timeout); + return; } @@ -370,16 +374,7 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf } } else - super.writeToSocket(sock, msg, timeout); - } - - /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { - if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) - connCheckStatusMsgCntSent++; - - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } /** {@inheritDoc} */ @@ -405,4 +400,4 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf countConnCheckMsg = false; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java index dbc54bc..721192f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; +import java.io.OutputStream; import java.net.Socket; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; @@ -31,12 +32,12 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi { public boolean ignorePingResponse; /** {@inheritDoc} */ - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, + protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse) return; else - super.writeToSocket(sock, msg, timeout); + super.writeToSocket(sock, out, msg, timeout); } /** {@inheritDoc} */
