conn

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a4d8196
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a4d8196
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a4d8196

Branch: refs/heads/ignite-comm-balance
Commit: 6a4d81965a78dd3f47bea3b33f823c62e994dd9a
Parents: 9c87e2c
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Sep 22 17:37:05 2016 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Sep 22 17:42:27 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/nio/GridNioServer.java |  34 ++-
 .../communication/tcp/TcpCommunicationSpi.java  |   4 +-
 .../IgniteCommunicationBalanceTest.java         | 215 +++++++++++++++----
 .../ignite/testsuites/IgniteCacheTestSuite.java |   2 +
 4 files changed, 206 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 2d5cc64..7352b5c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -47,6 +47,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -92,6 +93,9 @@ public class GridNioServer<T> {
     /** */
     public static final String IGNITE_NIO_SES_BALANCER_CLASS_NAME = 
"IGNITE_NIO_SES_BALANCER_CLASS_NAME";
 
+    /** */
+    public static final String IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD = 
"IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD";
+
     /** Default session write timeout. */
     public static final int DFLT_SES_WRITE_TIMEOUT = 5000;
 
@@ -215,10 +219,10 @@ public class GridNioServer<T> {
     private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
 
     /** */
-    private volatile long writerMoveCnt;
+    private final AtomicLong readerMoveCnt = new AtomicLong();
 
     /** */
-    private volatile long readerMoveCnt;
+    private final AtomicLong writerMoveCnt = new AtomicLong();
 
     /** */
     private final Balancer balancer;
@@ -361,6 +365,14 @@ public class GridNioServer<T> {
         this.balancer = balancer0;
     }
 
+    public long readerMoveCount() {
+        return readerMoveCnt.get();
+    }
+
+    public long writerMoveCount() {
+        return writerMoveCnt.get();
+    }
+
     /**
      * @return Configured port.
      */
@@ -1505,6 +1517,11 @@ public class GridNioServer<T> {
 
                                     ses.finishMoveSession(this);
 
+                                    if (idx % 2 == 0)
+                                        readerMoveCnt.incrementAndGet();
+                                    else
+                                        writerMoveCnt.incrementAndGet();
+
                                     SelectionKey key = 
f.movedSocketChannel().register(selector,
                                         SelectionKey.OP_READ | 
SelectionKey.OP_WRITE,
                                         ses);
@@ -2948,10 +2965,13 @@ public class GridNioServer<T> {
         /** */
         private long lastBalance;
 
+        /** */
+        private final long balancePeriod = 
IgniteSystemProperties.getLong(IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, 5000);
+
         /**
          * @param srv Server.
          */
-        public SizeBasedBalancer(GridNioServer<?> srv) {
+        SizeBasedBalancer(GridNioServer<?> srv) {
             this.srv = srv;
 
             log = srv.log;
@@ -2961,13 +2981,13 @@ public class GridNioServer<T> {
         @Override public void balance() {
             long now = U.currentTimeMillis();
 
-            if (lastBalance + 5000 < now) {
+            if (lastBalance + balancePeriod < now) {
                 lastBalance = now;
 
                 long maxRcvd0 = -1, minRcvd0 = -1, maxSent0 = -1, minSent0 = 
-1;
                 int maxRcvdIdx = -1, minRcvdIdx = -1, maxSentIdx = -1, 
minSentIdx = -1;
 
-                boolean print = 
Thread.currentThread().getName().contains("IgniteCommunicationBalanceTest4");
+                boolean print = 
false;//Thread.currentThread().getName().contains("IgniteCommunicationBalanceTest4");
 
                 List<GridNioServer.AbstractNioClientWorker> clientWorkers = 
(List)srv.clientWorkers;
 
@@ -3051,7 +3071,7 @@ public class GridNioServer<T> {
                             log.info("Will move session to less loaded writer 
[diff=" + sentDiff + ", ses=" + ses +
                                 ", from=" + maxSentIdx + ", to=" + minSentIdx 
+ ']');
 
-                        srv.writerMoveCnt++;
+                        srv.writerMoveCnt.incrementAndGet();
 
                         clientWorkers.get(maxSentIdx).offer(new 
SessionMoveFuture(ses, minSentIdx));
                     }
@@ -3093,7 +3113,7 @@ public class GridNioServer<T> {
                             log.info("Will move session to less loaded reader 
[diff=" + rcvdDiff + ", ses=" + ses +
                                 ", from=" + maxSentIdx + ", to=" + minSentIdx 
+ ']');
 
-                        srv.readerMoveCnt++;
+                        srv.readerMoveCnt.incrementAndGet();
 
                         clientWorkers.get(maxRcvdIdx).offer(new 
SessionMoveFuture(ses, minRcvdIdx));
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 784b081..fd9985e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3110,8 +3110,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             rcvCnt = buf.getLong(1);
                         }
 
-                       // if (log.isDebugEnabled())
-                            log.info("Received handshake message [rmtNode=" + 
rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+                       if (log.isDebugEnabled())
+                            log.debug("Received handshake message [rmtNode=" + 
rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
 
                         if (rcvCnt == -1) {
                             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
index 839bd77..86d43e7 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -22,12 +22,19 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -58,6 +65,7 @@ public class IgniteCommunicationBalanceTest extends 
GridCommonAbstractTest {
         TcpCommunicationSpi commSpi = 
((TcpCommunicationSpi)cfg.getCommunicationSpi());
 
         commSpi.setSharedMemoryPort(-1);
+        commSpi.setConnectionsPerNode(1);
 
         if (selectors > 0)
             commSpi.setSelectorsCount(selectors);
@@ -79,45 +87,157 @@ public class IgniteCommunicationBalanceTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testBalance() throws Exception {
-        selectors = 4;
+    public void testBalance1() throws Exception {
+        
System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, "500");
 
-        startGrid(0);
+        try {
+            selectors = 4;
 
-        client = true;
+            startGridsMultiThreaded(4);
 
-        Ignite client = startGrid(4);
+            client = true;
 
-        startGridsMultiThreaded(1, 3);
+            Ignite client = startGrid(4);
 
-        for (int i = 0; i < 4; i++) {
-            ClusterNode node = 
client.cluster().node(ignite(i).cluster().localNode().id());
+            for (int i = 0; i < 4; i++) {
+                ClusterNode node = 
client.cluster().node(ignite(i).cluster().localNode().id());
 
-            client.compute(client.cluster().forNode(node)).run(new 
DummyRunnable());
+                client.compute(client.cluster().forNode(node)).run(new 
DummyRunnable(null));
+            }
+
+            waitNioBalanceStop(client, 30_000);
+
+            final GridNioServer srv = 
GridTestUtils.getFieldValue(client.configuration().getCommunicationSpi(), 
"nioSrvr");
+
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            long readMoveCnt1 = srv.readerMoveCount();
+            long writeMoveCnt1 = srv.writerMoveCount();
+
+            for (int iter = 0; iter < 10; iter++) {
+                log.info("Iteration: " + iter);
+
+                int nodeIdx = rnd.nextInt(4);
+
+                ClusterNode node = 
client.cluster().node(ignite(nodeIdx).cluster().localNode().id());
+
+                IgniteCompute compute = 
client.compute(client.cluster().forNode(node));
+
+                for (int i = 0; i < 10_000; i++)
+                    compute.run(new DummyRunnable(null));
+
+                final long readMoveCnt = readMoveCnt1;
+                final long writeMoveCnt = writeMoveCnt1;
+
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        return srv.readerMoveCount() > readMoveCnt && 
srv.writerMoveCount() > writeMoveCnt;
+                    }
+                }, 10_000);
+
+                waitNioBalanceStop(client, 30_000);
+
+                long readMoveCnt2 = srv.readerMoveCount();
+                long writeMoveCnt2 = srv.writerMoveCount();
+
+                assertTrue(readMoveCnt2 > readMoveCnt1);
+                assertTrue(writeMoveCnt2 > writeMoveCnt1);
+
+                readMoveCnt1 = readMoveCnt2;
+                writeMoveCnt1 = writeMoveCnt2;
+            }
+
+            for (Ignite node : G.allGrids())
+                waitNioBalanceStop(node, 10_000);
+        }
+        finally {
+            
System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, "");
         }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBalance2() throws Exception {
+        
System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, "500");
+
+        try {
+            startGridsMultiThreaded(5);
+
+            client = true;
+
+            startGridsMultiThreaded(5, 5);
+
+            for (int i = 0; i < 20; i++) {
+                log.info("Iteration: " + i);
+
+                final AtomicInteger idx = new AtomicInteger();
+
+                GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        Ignite node = ignite(idx.incrementAndGet() % 10);
+
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-//        ThreadLocalRandom rnd = ThreadLocalRandom.current();
-//
-//        for (int iter = 0; iter < 10; iter++) {
-//            log.info("Iteration: " + iter);
-//
-//            int nodeIdx = rnd.nextInt(4);
-//
-//            ClusterNode node = 
client.cluster().node(ignite(nodeIdx).cluster().localNode().id());
-//
-//            for (int i = 0; i < 10_000; i++)
-//                client.compute(client.cluster().forNode(node)).run(new 
DummyRunnable());
-//
-//            U.sleep(5000);
-//        }
-
-        while (true) {
-            ((IgniteKernal) client).dumpDebugInfo();
-
-            Thread.sleep(5000);
+                        int msgs = rnd.nextInt(1000);
+
+                        for (int i = 0; i < msgs; i++) {
+                            int sndTo = rnd.nextInt(10);
+
+                            ClusterNode sntToNode = 
node.cluster().node(ignite(sndTo).cluster().localNode().id());
+
+                            IgniteCompute compute = 
node.compute(node.cluster().forNode(sntToNode));
+
+                            compute.run(new DummyRunnable(new 
byte[rnd.nextInt(1024)]));
+                        }
+
+                        return null;
+                    }
+                }, 30, "test-thread");
+
+                for (Ignite node : G.allGrids())
+                    waitNioBalanceStop(node, 10_000);
+            }
+        }
+        finally {
+            
System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, "");
         }
+    }
+
+    /**
+     * @param node Node.
+     * @param timeout Timeout.
+     * @throws Exception If failed.
+     */
+    private void waitNioBalanceStop(Ignite node, long timeout) throws 
Exception {
+        TcpCommunicationSpi spi = 
(TcpCommunicationSpi)node.configuration().getCommunicationSpi();
+
+        final GridNioServer srv = GridTestUtils.getFieldValue(spi, "nioSrvr");
+
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() {
+            @Override public boolean applyx() throws IgniteCheckedException {
+                long readerMovCnt1 = srv.readerMoveCount();
+                long writerMovCnt1 = srv.writerMoveCount();
+
+                U.sleep(2000);
 
-        //Thread.sleep(Long.MAX_VALUE);
+                long readerMovCnt2 = srv.readerMoveCount();
+                long writerMovCnt2 = srv.writerMoveCount();
+
+                if (readerMovCnt1 != readerMovCnt2) {
+                    log.info("Readers balance is in progress [cnt1=" + 
readerMovCnt1 + ", cnt2=" + readerMovCnt2 + ']');
+
+                    return false;
+                }
+                if (writerMovCnt1 != writerMovCnt2) {
+                    log.info("Writers balance is in progress [cnt1=" + 
writerMovCnt1 + ", cnt2=" + writerMovCnt2 + ']');
+
+                    return false;
+                }
+
+                return true;
+            }
+        }, timeout));
     }
 
     /**
@@ -126,28 +246,43 @@ public class IgniteCommunicationBalanceTest extends 
GridCommonAbstractTest {
     public void testRandomBalance() throws Exception {
         System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_CLASS_NAME, 
TestBalancer.class.getName());
 
-        final int NODES = 10;
+        try {
+            final int NODES = 10;
 
-        startGridsMultiThreaded(NODES);
+            startGridsMultiThreaded(NODES);
 
-        final long stopTime = System.currentTimeMillis() + 60_000;
+            final long stopTime = System.currentTimeMillis() + 60_000;
 
-        GridTestUtils.runMultiThreaded(new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                while (System.currentTimeMillis() < stopTime)
-                    ignite(rnd.nextInt(NODES)).compute().broadcast(new 
DummyRunnable());
+                    while (System.currentTimeMillis() < stopTime)
+                        ignite(rnd.nextInt(NODES)).compute().broadcast(new 
DummyRunnable(null));
 
-                return null;
-            }
-        }, 20, "test-thread");
+                    return null;
+                }
+            }, 20, "test-thread");
+        }
+        finally {
+            
System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_CLASS_NAME, null);
+        }
     }
 
     /**
      *
      */
     private static class DummyRunnable implements IgniteRunnable {
+        /** */
+        private byte[] data;
+
+        /**
+         * @param data Data.
+         */
+        public DummyRunnable(byte[] data) {
+            this.data = data;
+        }
+
         /** {@inheritDoc} */
         @Override public void run() {
             // No-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 3a0d1ee..5b24a13 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -37,6 +37,7 @@ import 
org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreOptimizedMarshallerS
 import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest;
 import 
org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest;
 import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest;
+import 
org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceTest;
 import 
org.apache.ignite.internal.managers.communication.IgniteVariousConnectionNumberTest;
 import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest;
 import 
org.apache.ignite.internal.processors.cache.CacheDeferredDeleteSanitySelfTest;
@@ -321,6 +322,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(CacheTxFastFinishTest.class);
 
         suite.addTestSuite(IgniteVariousConnectionNumberTest.class);
+        suite.addTestSuite(IgniteCommunicationBalanceTest.class);
 
         return suite;
     }

Reply via email to