Removed buffer from message worker

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19b4af10
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19b4af10
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19b4af10

Branch: refs/heads/gridgain-7.5.11-vk
Commit: 19b4af10386779efeb57be68f10484937201c58d
Parents: 2397552
Author: Valentin Kulichenko <[email protected]>
Authored: Tue Apr 5 13:16:08 2016 -0700
Committer: Valentin Kulichenko <[email protected]>
Committed: Tue Apr 5 13:16:08 2016 -0700

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  8 +-----
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 27 +-------------------
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  5 ++--
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 26 ++++++++-----------
 .../TcpDiscoverySpiFailureTimeoutSelfTest.java  | 13 ++--------
 5 files changed, 17 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/19b4af10/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index f0de546..356f4fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -73,7 +73,6 @@ import org.apache.ignite.internal.util.GridBoundedLinkedHashSet;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.internal.util.lang.GridTuple;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.C1;
@@ -5751,9 +5750,6 @@ class ServerImpl extends TcpDiscoveryImpl {
      * Base class for message workers.
      */
     protected abstract class MessageWorkerAdapter extends IgniteSpiThread {
-        /** Pre-allocated output stream (100K). */
-        private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024);
-
         /** Message queue. */
         private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>();
 
@@ -5845,9 +5841,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
             throws IOException, IgniteCheckedException {
-            bout.reset();
-
-            spi.writeToSocket(sock, msg, bout, timeout);
+            spi.writeToSocket(sock, msg, timeout);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/19b4af10/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 277055a..d0d8be2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -51,7 +51,6 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.AddressResolver;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -1340,21 +1339,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      *
      * @param sock Socket.
      * @param msg Message.
-     * @param timeout Socket write timeout.
-     * @throws IOException If IO failed or write timed out.
-     * @throws IgniteCheckedException If marshalling failed.
-     */
-    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
-        IgniteCheckedException {
-        writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K.
-    }
-
-    /**
-     * Writes message to the socket.
-     *
-     * @param sock Socket.
-     * @param msg Message.
-     * @param bout Byte array output stream.
      * @param timeout Timeout.
      * @throws IOException If IO failed or write timed out.
      * @throws IgniteCheckedException If marshalling failed.
@@ -1362,14 +1346,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     @SuppressWarnings("ThrowFromFinallyBlock")
     protected void writeToSocket(Socket sock,
         TcpDiscoveryAbstractMessage msg,
-        GridByteArrayOutputStream bout,
         long timeout) throws IOException, IgniteCheckedException {
         assert sock != null;
         assert msg != null;
-        assert bout != null;
-
-        // Marshall message first to perform only write after.
-        marsh.marshal(msg, bout);
 
         SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
 
@@ -1378,11 +1357,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         IOException err = null;
 
         try {
-            OutputStream out = sock.getOutputStream();
-
-            bout.writeTo(out);
-
-            out.flush();
+            marsh.marshal(msg, sock.getOutputStream());
         }
         catch (IOException e) {
             err = e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/19b4af10/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 7debb41..a57d531 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
 import org.apache.ignite.internal.util.typedef.CIX2;
@@ -2159,7 +2158,7 @@ public class TcpClientDiscoverySpiSelfTest extends 
GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, 
IgniteCheckedException {
+            long timeout) throws IOException, IgniteCheckedException {
             waitFor(writeLock);
 
             boolean fail = false;
@@ -2184,7 +2183,7 @@ public class TcpClientDiscoverySpiSelfTest extends 
GridCommonAbstractTest {
                 sock.close();
             }
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, msg, timeout);
 
             if (afterWrite != null)
                 afterWrite.apply(msg, sock);

http://git-wip-us.apache.org/repos/asf/ignite/blob/19b4af10/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 0df7da6..d3afdbf 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -54,7 +54,6 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import 
org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
-import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
@@ -1851,7 +1850,6 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock,
             TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout,
             long timeout) throws IOException, IgniteCheckedException {
             boolean add = msgIds.add(msg.id());
 
@@ -1861,7 +1859,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
                 failed = true;
             }
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, msg, timeout);
         }
     }
 
@@ -1875,7 +1873,6 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock,
             TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout,
             long timeout) throws IOException, IgniteCheckedException {
             if (stopBeforeSndAck) {
                 if (msg instanceof TcpDiscoveryCustomEventMessage) {
@@ -1905,7 +1902,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
                 }
             }
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, msg, timeout);
         }
     }
 
@@ -1938,7 +1935,6 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock,
             TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout,
             long timeout) throws IOException, IgniteCheckedException {
             if (stop)
                 return;
@@ -1983,7 +1979,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
                 return;
             }
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, msg, timeout);
         }
     }
 
@@ -1999,7 +1995,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, 
IgniteCheckedException {
+            long timeout) throws IOException, IgniteCheckedException {
             if (msg instanceof TcpDiscoveryCustomEventMessage && latch != 
null) {
                 log.info("Stop node on custom event: " + msg);
 
@@ -2011,7 +2007,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
             if (stop)
                 return;
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, msg, timeout);
         }
     }
 
@@ -2033,7 +2029,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, 
IgniteCheckedException {
+            long timeout) throws IOException, IgniteCheckedException {
             if (msg instanceof TcpDiscoveryNodeAddedMessage) {
                 if (nodeAdded1 != null) {
                     nodeAdded1.countDown();
@@ -2060,7 +2056,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
             if (debug && msg instanceof TcpDiscoveryCustomEventMessage)
                 log.info("--- Send custom event: " + msg);
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, msg, timeout);
         }
     }
 
@@ -2073,12 +2069,12 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, 
IgniteCheckedException {
+            long timeout) throws IOException, IgniteCheckedException {
 
             if (stop)
                 throw new RuntimeException("Failing ring message worker 
explicitly");
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, msg, timeout);
         }
     }
 
@@ -2091,11 +2087,11 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, 
IgniteCheckedException {
+            long timeout) throws IOException, IgniteCheckedException {
             if (stop)
                 throw new RuntimeException("Failing ring message worker 
explicitly");
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, msg, timeout);
 
             if (msg instanceof TcpDiscoveryNodeAddedMessage)
                 stop = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/19b4af10/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
index 4cf9bd0..513c5a3 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
@@ -23,7 +23,6 @@ import java.net.Socket;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
@@ -352,6 +351,7 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends 
AbstractDiscoverySelf
             throws IOException, IgniteCheckedException {
             if (!(msg instanceof TcpDiscoveryPingRequest)) {
                 super.writeToSocket(sock, msg, timeout);
+
                 return;
             }
 
@@ -374,15 +374,6 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends 
AbstractDiscoverySelf
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, 
IgniteCheckedException {
-            if (countConnCheckMsg && msg instanceof 
TcpDiscoveryConnectionCheckMessage)
-                connCheckStatusMsgCntSent++;
-
-            super.writeToSocket(sock, msg, bout, timeout);
-        }
-
-        /** {@inheritDoc} */
         protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket 
sock, int res, long timeout)
             throws IOException {
             if (countConnCheckMsg && msg instanceof 
TcpDiscoveryConnectionCheckMessage)
@@ -405,4 +396,4 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends 
AbstractDiscoverySelf
             countConnCheckMsg = false;
         }
     }
-}
\ No newline at end of file
+}

Reply via email to