Removed buffer from message worker
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19b4af10 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19b4af10 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19b4af10 Branch: refs/heads/gridgain-7.5.11-vk Commit: 19b4af10386779efeb57be68f10484937201c58d Parents: 2397552 Author: Valentin Kulichenko <[email protected]> Authored: Tue Apr 5 13:16:08 2016 -0700 Committer: Valentin Kulichenko <[email protected]> Committed: Tue Apr 5 13:16:08 2016 -0700 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 8 +----- .../spi/discovery/tcp/TcpDiscoverySpi.java | 27 +------------------- .../tcp/TcpClientDiscoverySpiSelfTest.java | 5 ++-- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 26 ++++++++----------- .../TcpDiscoverySpiFailureTimeoutSelfTest.java | 13 ++-------- 5 files changed, 17 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/19b4af10/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index f0de546..356f4fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -73,7 +73,6 @@ import org.apache.ignite.internal.util.GridBoundedLinkedHashSet; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.lang.GridTuple; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; @@ -5751,9 +5750,6 @@ class ServerImpl extends TcpDiscoveryImpl { * Base class for message workers. */ protected abstract class MessageWorkerAdapter extends IgniteSpiThread { - /** Pre-allocated output stream (100K). */ - private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024); - /** Message queue. */ private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>(); @@ -5845,9 +5841,7 @@ class ServerImpl extends TcpDiscoveryImpl { */ protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { - bout.reset(); - - spi.writeToSocket(sock, msg, bout, timeout); + spi.writeToSocket(sock, msg, timeout); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/19b4af10/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 277055a..d0d8be2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -51,7 +51,6 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.AddressResolver; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -1340,21 +1339,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * * @param sock Socket. * @param msg Message. - * @param timeout Socket write timeout. - * @throws IOException If IO failed or write timed out. - * @throws IgniteCheckedException If marshalling failed. - */ - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, - IgniteCheckedException { - writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K. - } - - /** - * Writes message to the socket. - * - * @param sock Socket. - * @param msg Message. - * @param bout Byte array output stream. * @param timeout Timeout. * @throws IOException If IO failed or write timed out. * @throws IgniteCheckedException If marshalling failed. @@ -1362,14 +1346,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T @SuppressWarnings("ThrowFromFinallyBlock") protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { assert sock != null; assert msg != null; - assert bout != null; - - // Marshall message first to perform only write after. - marsh.marshal(msg, bout); SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout); @@ -1378,11 +1357,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T IOException err = null; try { - OutputStream out = sock.getOutputStream(); - - bout.writeTo(out); - - out.flush(); + marsh.marshal(msg, sock.getOutputStream()); } catch (IOException e) { err = e; http://git-wip-us.apache.org/repos/asf/ignite/blob/19b4af10/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 7debb41..a57d531 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -44,7 +44,6 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.util.GridConcurrentHashSet; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.typedef.CIX2; @@ -2159,7 +2158,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + long timeout) throws IOException, IgniteCheckedException { waitFor(writeLock); boolean fail = false; @@ -2184,7 +2183,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { sock.close(); } - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, msg, timeout); if (afterWrite != null) afterWrite.apply(msg, sock); http://git-wip-us.apache.org/repos/asf/ignite/blob/19b4af10/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 0df7da6..d3afdbf 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -54,7 +54,6 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage; import org.apache.ignite.internal.processors.port.GridPortRecord; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -1851,7 +1850,6 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { boolean add = msgIds.add(msg.id()); @@ -1861,7 +1859,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { failed = true; } - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, msg, timeout); } } @@ -1875,7 +1873,6 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { if (stopBeforeSndAck) { if (msg instanceof TcpDiscoveryCustomEventMessage) { @@ -1905,7 +1902,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } } - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, msg, timeout); } } @@ -1938,7 +1935,6 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { if (stop) return; @@ -1983,7 +1979,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { return; } - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, msg, timeout); } } @@ -1999,7 +1995,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryCustomEventMessage && latch != null) { log.info("Stop node on custom event: " + msg); @@ -2011,7 +2007,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { if (stop) return; - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, msg, timeout); } } @@ -2033,7 +2029,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryNodeAddedMessage) { if (nodeAdded1 != null) { nodeAdded1.countDown(); @@ -2060,7 +2056,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { if (debug && msg instanceof TcpDiscoveryCustomEventMessage) log.info("--- Send custom event: " + msg); - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, msg, timeout); } } @@ -2073,12 +2069,12 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + long timeout) throws IOException, IgniteCheckedException { if (stop) throw new RuntimeException("Failing ring message worker explicitly"); - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, msg, timeout); } } @@ -2091,11 +2087,11 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + long timeout) throws IOException, IgniteCheckedException { if (stop) throw new RuntimeException("Failing ring message worker explicitly"); - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, msg, timeout); if (msg instanceof TcpDiscoveryNodeAddedMessage) stop = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/19b4af10/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java index 4cf9bd0..513c5a3 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java @@ -23,7 +23,6 @@ import java.net.Socket; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; @@ -352,6 +351,7 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf throws IOException, IgniteCheckedException { if (!(msg instanceof TcpDiscoveryPingRequest)) { super.writeToSocket(sock, msg, timeout); + return; } @@ -374,15 +374,6 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { - if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) - connCheckStatusMsgCntSent++; - - super.writeToSocket(sock, msg, bout, timeout); - } - - /** {@inheritDoc} */ protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout) throws IOException { if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) @@ -405,4 +396,4 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf countConnCheckMsg = false; } } -} \ No newline at end of file +}
