IGNITE-2951 - Stability fixes for cluster with many clients
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d92e617a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d92e617a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d92e617a Branch: refs/heads/ignite-testing-discovery Commit: d92e617a71c31ccaa3fd6e4954aa4ccaf494733c Parents: e7ab8ee Author: Valentin Kulichenko <[email protected]> Authored: Wed Apr 6 18:10:45 2016 -0700 Committer: Valentin Kulichenko <[email protected]> Committed: Wed Apr 6 18:10:45 2016 -0700 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 11 ++--- .../GridDhtPartitionsExchangeFuture.java | 4 +- .../continuous/GridContinuousProcessor.java | 13 +++++- .../ignite/spi/discovery/tcp/ClientImpl.java | 2 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 44 +++++++++----------- .../spi/discovery/tcp/TcpDiscoverySpi.java | 23 ++++------ .../IgniteClientReconnectAbstractTest.java | 7 ++-- .../tcp/TcpClientDiscoverySpiSelfTest.java | 8 ++-- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 39 +++++++++-------- .../TcpDiscoverySpiFailureTimeoutSelfTest.java | 23 ++++------ .../spi/discovery/tcp/TestTcpDiscoverySpi.java | 5 ++- 11 files changed, 87 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 7e13f26..6181d9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -859,7 +859,7 @@ public class GridDhtPartitionDemander { log.debug("Rebalancing is not required [cache=" + cctx.name() + ", topology=" + topVer + "]"); - checkIsDone(cancelled); + checkIsDone(cancelled, true); } } @@ -883,7 +883,7 @@ public class GridDhtPartitionDemander { remaining.clear(); - checkIsDone(true /* cancelled */); + checkIsDone(true /* cancelled */, false); } return true; @@ -1014,13 +1014,13 @@ public class GridDhtPartitionDemander { * */ private void checkIsDone() { - checkIsDone(false); + checkIsDone(false, false); } /** * @param cancelled Is cancelled. */ - private void checkIsDone(boolean cancelled) { + private void checkIsDone(boolean cancelled, boolean wasEmpty) { if (remaining.isEmpty()) { if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sndStoppedEvnt)) preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); @@ -1028,7 +1028,8 @@ public class GridDhtPartitionDemander { if (log.isDebugEnabled()) log.debug("Completed rebalance future: " + this); - cctx.shared().exchange().scheduleResendPartitions(); + if (!wasEmpty) + cctx.shared().exchange().scheduleResendPartitions(); Collection<Integer> m = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 22fb59e..be346b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -929,7 +929,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT U.warn(log, "Failed to wait for partition release future [topVer=" + topologyVersion() + ", node=" + cctx.localNodeId() + "]. Dumping pending objects that might be the cause: "); - cctx.exchange().dumpPendingObjects(); + cctx.exchange().dumpDebugInfo(); } /** @@ -993,7 +993,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map()); m.addLocalPartitionMap(cacheCtx.cacheId(), locMap); - + m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 99e0bb5..c6503e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -394,7 +394,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) { if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) { - DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos); + Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = U.newHashMap(clientInfos.size()); + + for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> e : clientInfos.entrySet()) { + Map<UUID, LocalRoutineInfo> copy = U.newHashMap(e.getValue().size()); + + for (Map.Entry<UUID, LocalRoutineInfo> e0 : e.getValue().entrySet()) + copy.put(e0.getKey(), e0.getValue()); + + clientInfos0.put(e.getKey(), copy); + } + + DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos0); // Collect listeners information (will be sent to joining node during discovery process). for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 65b94ca..834922c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1062,7 +1062,7 @@ class ClientImpl extends TcpDiscoveryImpl { try { if (ack) { synchronized (mux) { - assert unackedMsg == null : unackedMsg; + assert unackedMsg == null : "Unacked=" + unackedMsg + ", received=" + msg; unackedMsg = msg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index f0de546..5c7ffbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -18,9 +18,11 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.ObjectStreamException; +import java.io.OutputStream; import java.io.Serializable; import java.net.ConnectException; import java.net.InetAddress; @@ -73,7 +75,6 @@ import org.apache.ignite.internal.util.GridBoundedLinkedHashSet; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.lang.GridTuple; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; @@ -2120,6 +2121,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** Socket. */ private Socket sock; + /** Output stream. */ + private OutputStream out; + /** Last time status message has been sent. */ private long lastTimeStatusMsgSent; @@ -2456,10 +2460,12 @@ class ServerImpl extends TcpDiscoveryImpl { sock = spi.openSocket(addr, timeoutHelper); + out = new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize()); + openSock = true; // Handshake. - writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), + spi.writeToSocket(sock, out, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, @@ -2613,7 +2619,7 @@ class ServerImpl extends TcpDiscoveryImpl { timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); try { - writeToSocket(sock, pendingMsg, timeoutHelper.nextTimeoutChunk( + spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk( spi.getSocketTimeout())); } finally { @@ -2665,7 +2671,7 @@ class ServerImpl extends TcpDiscoveryImpl { } } - writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); @@ -3956,7 +3962,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (leftNode.equals(next) && sock != null) { try { - writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? + spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout()); if (log.isDebugEnabled()) @@ -5569,6 +5575,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** Socket. */ private final Socket sock; + /** Output stream. */ + private final OutputStream out; + /** Current client metrics. */ private volatile ClusterMetrics metrics; @@ -5582,11 +5591,13 @@ class ServerImpl extends TcpDiscoveryImpl { * @param sock Socket. * @param clientNodeId Node ID. */ - protected ClientMessageWorker(Socket sock, UUID clientNodeId) { + protected ClientMessageWorker(Socket sock, UUID clientNodeId) throws IOException { super("tcp-disco-client-message-worker", 2000); this.sock = sock; this.clientNodeId = clientNodeId; + + out = new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize()); } /** @@ -5633,7 +5644,7 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Sending message ack to client [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); - writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? + spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout()); } } @@ -5644,7 +5655,7 @@ class ServerImpl extends TcpDiscoveryImpl { assert topologyInitialized(msg) : msg; - writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? + spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout()); } } @@ -5751,9 +5762,6 @@ class ServerImpl extends TcpDiscoveryImpl { * Base class for message workers. */ protected abstract class MessageWorkerAdapter extends IgniteSpiThread { - /** Pre-allocated output stream (100K). */ - private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024); - /** Message queue. */ private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>(); @@ -5835,20 +5843,6 @@ class ServerImpl extends TcpDiscoveryImpl { protected void noMessageLoop() { // No-op. } - - /** - * @param sock Socket. - * @param msg Message. - * @param timeout Socket timeout. - * @throws IOException If IO failed. - * @throws IgniteCheckedException If marshalling failed. - */ - protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) - throws IOException, IgniteCheckedException { - bout.reset(); - - spi.writeToSocket(sock, msg, bout, timeout); - } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 277055a..abe380c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.tcp; +import java.io.BufferedOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -51,7 +52,6 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.AddressResolver; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -1346,45 +1346,38 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T */ protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { - writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K. + writeToSocket(sock, new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize()), msg, timeout); } /** * Writes message to the socket. * * @param sock Socket. + * @param out Stream to write to. * @param msg Message. - * @param bout Byte array output stream. * @param timeout Timeout. * @throws IOException If IO failed or write timed out. * @throws IgniteCheckedException If marshalling failed. */ @SuppressWarnings("ThrowFromFinallyBlock") protected void writeToSocket(Socket sock, + OutputStream out, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { assert sock != null; assert msg != null; - assert bout != null; - - // Marshall message first to perform only write after. - marsh.marshal(msg, bout); + assert out != null; SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout); addTimeoutObject(obj); - IOException err = null; + IgniteCheckedException err = null; try { - OutputStream out = sock.getOutputStream(); - - bout.writeTo(out); - - out.flush(); + marsh.marshal(msg, out); } - catch (IOException e) { + catch (IgniteCheckedException e) { err = e; } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index 0c005e9..96ca0dd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal; import java.io.IOException; +import java.io.OutputStream; import java.net.Socket; import java.util.Collection; import java.util.Collections; @@ -384,7 +385,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra volatile CountDownLatch writeLatch; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryJoinRequestMessage) { CountDownLatch writeLatch0 = writeLatch; @@ -396,7 +397,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra } } - super.writeToSocket(sock, msg, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -464,4 +465,4 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra log.error(s); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 7debb41..e01094c 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; @@ -44,7 +45,6 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.util.GridConcurrentHashSet; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.typedef.CIX2; @@ -2158,8 +2158,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { waitFor(writeLock); boolean fail = false; @@ -2184,7 +2184,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { sock.close(); } - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); if (afterWrite != null) afterWrite.apply(msg, sock); http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 0df7da6..b45fe12 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; @@ -54,7 +55,6 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage; import org.apache.ignite.internal.processors.port.GridPortRecord; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -1849,9 +1849,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private volatile boolean failed; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { boolean add = msgIds.add(msg.id()); @@ -1861,7 +1860,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { failed = true; } - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -1874,8 +1873,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, + OutputStream out, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { if (stopBeforeSndAck) { if (msg instanceof TcpDiscoveryCustomEventMessage) { @@ -1905,7 +1904,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } } - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -1937,8 +1936,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, + OutputStream out, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { if (stop) return; @@ -1983,7 +1982,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { return; } - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -1998,8 +1997,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private boolean stop; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryCustomEventMessage && latch != null) { log.info("Stop node on custom event: " + msg); @@ -2011,7 +2010,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { if (stop) return; - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -2032,8 +2031,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private boolean debug; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryNodeAddedMessage) { if (nodeAdded1 != null) { nodeAdded1.countDown(); @@ -2060,7 +2059,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { if (debug && msg instanceof TcpDiscoveryCustomEventMessage) log.info("--- Send custom event: " + msg); - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -2072,13 +2071,13 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private volatile boolean stop; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { if (stop) throw new RuntimeException("Failing ring message worker explicitly"); - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -2090,12 +2089,12 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private volatile boolean stop; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { if (stop) throw new RuntimeException("Failing ring message worker explicitly"); - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); if (msg instanceof TcpDiscoveryNodeAddedMessage) stop = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java index 4cf9bd0..4ef984f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java @@ -18,12 +18,12 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; @@ -348,10 +348,14 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (!(msg instanceof TcpDiscoveryPingRequest)) { - super.writeToSocket(sock, msg, timeout); + if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) + connCheckStatusMsgCntSent++; + + super.writeToSocket(sock, out, msg, timeout); + return; } @@ -370,16 +374,7 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf } } else - super.writeToSocket(sock, msg, timeout); - } - - /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { - if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) - connCheckStatusMsgCntSent++; - - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } /** {@inheritDoc} */ @@ -405,4 +400,4 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf countConnCheckMsg = false; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java index dbc54bc..721192f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; +import java.io.OutputStream; import java.net.Socket; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; @@ -31,12 +32,12 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi { public boolean ignorePingResponse; /** {@inheritDoc} */ - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, + protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse) return; else - super.writeToSocket(sock, msg, timeout); + super.writeToSocket(sock, out, msg, timeout); } /** {@inheritDoc} */
