conn
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a4d8196 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a4d8196 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a4d8196 Branch: refs/heads/ignite-comm-balance Commit: 6a4d81965a78dd3f47bea3b33f823c62e994dd9a Parents: 9c87e2c Author: sboikov <[email protected]> Authored: Thu Sep 22 17:37:05 2016 +0300 Committer: sboikov <[email protected]> Committed: Thu Sep 22 17:42:27 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/util/nio/GridNioServer.java | 34 ++- .../communication/tcp/TcpCommunicationSpi.java | 4 +- .../IgniteCommunicationBalanceTest.java | 215 +++++++++++++++---- .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + 4 files changed, 206 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 2d5cc64..7352b5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -47,6 +47,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -92,6 +93,9 @@ public class GridNioServer<T> { /** */ public static final String IGNITE_NIO_SES_BALANCER_CLASS_NAME = "IGNITE_NIO_SES_BALANCER_CLASS_NAME"; + /** */ + public static final String IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD = "IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD"; + /** Default session write timeout. */ public static final int DFLT_SES_WRITE_TIMEOUT = 5000; @@ -215,10 +219,10 @@ public class GridNioServer<T> { private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr; /** */ - private volatile long writerMoveCnt; + private final AtomicLong readerMoveCnt = new AtomicLong(); /** */ - private volatile long readerMoveCnt; + private final AtomicLong writerMoveCnt = new AtomicLong(); /** */ private final Balancer balancer; @@ -361,6 +365,14 @@ public class GridNioServer<T> { this.balancer = balancer0; } + public long readerMoveCount() { + return readerMoveCnt.get(); + } + + public long writerMoveCount() { + return writerMoveCnt.get(); + } + /** * @return Configured port. */ @@ -1505,6 +1517,11 @@ public class GridNioServer<T> { ses.finishMoveSession(this); + if (idx % 2 == 0) + readerMoveCnt.incrementAndGet(); + else + writerMoveCnt.incrementAndGet(); + SelectionKey key = f.movedSocketChannel().register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, ses); @@ -2948,10 +2965,13 @@ public class GridNioServer<T> { /** */ private long lastBalance; + /** */ + private final long balancePeriod = IgniteSystemProperties.getLong(IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, 5000); + /** * @param srv Server. */ - public SizeBasedBalancer(GridNioServer<?> srv) { + SizeBasedBalancer(GridNioServer<?> srv) { this.srv = srv; log = srv.log; @@ -2961,13 +2981,13 @@ public class GridNioServer<T> { @Override public void balance() { long now = U.currentTimeMillis(); - if (lastBalance + 5000 < now) { + if (lastBalance + balancePeriod < now) { lastBalance = now; long maxRcvd0 = -1, minRcvd0 = -1, maxSent0 = -1, minSent0 = -1; int maxRcvdIdx = -1, minRcvdIdx = -1, maxSentIdx = -1, minSentIdx = -1; - boolean print = Thread.currentThread().getName().contains("IgniteCommunicationBalanceTest4"); + boolean print = false;//Thread.currentThread().getName().contains("IgniteCommunicationBalanceTest4"); List<GridNioServer.AbstractNioClientWorker> clientWorkers = (List)srv.clientWorkers; @@ -3051,7 +3071,7 @@ public class GridNioServer<T> { log.info("Will move session to less loaded writer [diff=" + sentDiff + ", ses=" + ses + ", from=" + maxSentIdx + ", to=" + minSentIdx + ']'); - srv.writerMoveCnt++; + srv.writerMoveCnt.incrementAndGet(); clientWorkers.get(maxSentIdx).offer(new SessionMoveFuture(ses, minSentIdx)); } @@ -3093,7 +3113,7 @@ public class GridNioServer<T> { log.info("Will move session to less loaded reader [diff=" + rcvdDiff + ", ses=" + ses + ", from=" + maxSentIdx + ", to=" + minSentIdx + ']'); - srv.readerMoveCnt++; + srv.readerMoveCnt.incrementAndGet(); clientWorkers.get(maxRcvdIdx).offer(new SessionMoveFuture(ses, minRcvdIdx)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 784b081..fd9985e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -3110,8 +3110,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter rcvCnt = buf.getLong(1); } - // if (log.isDebugEnabled()) - log.info("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']'); + if (log.isDebugEnabled()) + log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']'); if (rcvCnt == -1) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java index 839bd77..86d43e7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java @@ -22,12 +22,19 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCompute; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.lang.GridAbsPredicateX; import org.apache.ignite.internal.util.nio.GridNioServer; import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -58,6 +65,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest { TcpCommunicationSpi commSpi = ((TcpCommunicationSpi)cfg.getCommunicationSpi()); commSpi.setSharedMemoryPort(-1); + commSpi.setConnectionsPerNode(1); if (selectors > 0) commSpi.setSelectorsCount(selectors); @@ -79,45 +87,157 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testBalance() throws Exception { - selectors = 4; + public void testBalance1() throws Exception { + System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, "500"); - startGrid(0); + try { + selectors = 4; - client = true; + startGridsMultiThreaded(4); - Ignite client = startGrid(4); + client = true; - startGridsMultiThreaded(1, 3); + Ignite client = startGrid(4); - for (int i = 0; i < 4; i++) { - ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id()); + for (int i = 0; i < 4; i++) { + ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id()); - client.compute(client.cluster().forNode(node)).run(new DummyRunnable()); + client.compute(client.cluster().forNode(node)).run(new DummyRunnable(null)); + } + + waitNioBalanceStop(client, 30_000); + + final GridNioServer srv = GridTestUtils.getFieldValue(client.configuration().getCommunicationSpi(), "nioSrvr"); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + long readMoveCnt1 = srv.readerMoveCount(); + long writeMoveCnt1 = srv.writerMoveCount(); + + for (int iter = 0; iter < 10; iter++) { + log.info("Iteration: " + iter); + + int nodeIdx = rnd.nextInt(4); + + ClusterNode node = client.cluster().node(ignite(nodeIdx).cluster().localNode().id()); + + IgniteCompute compute = client.compute(client.cluster().forNode(node)); + + for (int i = 0; i < 10_000; i++) + compute.run(new DummyRunnable(null)); + + final long readMoveCnt = readMoveCnt1; + final long writeMoveCnt = writeMoveCnt1; + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return srv.readerMoveCount() > readMoveCnt && srv.writerMoveCount() > writeMoveCnt; + } + }, 10_000); + + waitNioBalanceStop(client, 30_000); + + long readMoveCnt2 = srv.readerMoveCount(); + long writeMoveCnt2 = srv.writerMoveCount(); + + assertTrue(readMoveCnt2 > readMoveCnt1); + assertTrue(writeMoveCnt2 > writeMoveCnt1); + + readMoveCnt1 = readMoveCnt2; + writeMoveCnt1 = writeMoveCnt2; + } + + for (Ignite node : G.allGrids()) + waitNioBalanceStop(node, 10_000); + } + finally { + System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, ""); } + } + + /** + * @throws Exception If failed. + */ + public void testBalance2() throws Exception { + System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, "500"); + + try { + startGridsMultiThreaded(5); + + client = true; + + startGridsMultiThreaded(5, 5); + + for (int i = 0; i < 20; i++) { + log.info("Iteration: " + i); + + final AtomicInteger idx = new AtomicInteger(); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + Ignite node = ignite(idx.incrementAndGet() % 10); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); -// ThreadLocalRandom rnd = ThreadLocalRandom.current(); -// -// for (int iter = 0; iter < 10; iter++) { -// log.info("Iteration: " + iter); -// -// int nodeIdx = rnd.nextInt(4); -// -// ClusterNode node = client.cluster().node(ignite(nodeIdx).cluster().localNode().id()); -// -// for (int i = 0; i < 10_000; i++) -// client.compute(client.cluster().forNode(node)).run(new DummyRunnable()); -// -// U.sleep(5000); -// } - - while (true) { - ((IgniteKernal) client).dumpDebugInfo(); - - Thread.sleep(5000); + int msgs = rnd.nextInt(1000); + + for (int i = 0; i < msgs; i++) { + int sndTo = rnd.nextInt(10); + + ClusterNode sntToNode = node.cluster().node(ignite(sndTo).cluster().localNode().id()); + + IgniteCompute compute = node.compute(node.cluster().forNode(sntToNode)); + + compute.run(new DummyRunnable(new byte[rnd.nextInt(1024)])); + } + + return null; + } + }, 30, "test-thread"); + + for (Ignite node : G.allGrids()) + waitNioBalanceStop(node, 10_000); + } + } + finally { + System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, ""); } + } + + /** + * @param node Node. + * @param timeout Timeout. + * @throws Exception If failed. + */ + private void waitNioBalanceStop(Ignite node, long timeout) throws Exception { + TcpCommunicationSpi spi = (TcpCommunicationSpi)node.configuration().getCommunicationSpi(); + + final GridNioServer srv = GridTestUtils.getFieldValue(spi, "nioSrvr"); + + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() { + @Override public boolean applyx() throws IgniteCheckedException { + long readerMovCnt1 = srv.readerMoveCount(); + long writerMovCnt1 = srv.writerMoveCount(); + + U.sleep(2000); - //Thread.sleep(Long.MAX_VALUE); + long readerMovCnt2 = srv.readerMoveCount(); + long writerMovCnt2 = srv.writerMoveCount(); + + if (readerMovCnt1 != readerMovCnt2) { + log.info("Readers balance is in progress [cnt1=" + readerMovCnt1 + ", cnt2=" + readerMovCnt2 + ']'); + + return false; + } + if (writerMovCnt1 != writerMovCnt2) { + log.info("Writers balance is in progress [cnt1=" + writerMovCnt1 + ", cnt2=" + writerMovCnt2 + ']'); + + return false; + } + + return true; + } + }, timeout)); } /** @@ -126,28 +246,43 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest { public void testRandomBalance() throws Exception { System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_CLASS_NAME, TestBalancer.class.getName()); - final int NODES = 10; + try { + final int NODES = 10; - startGridsMultiThreaded(NODES); + startGridsMultiThreaded(NODES); - final long stopTime = System.currentTimeMillis() + 60_000; + final long stopTime = System.currentTimeMillis() + 60_000; - GridTestUtils.runMultiThreaded(new Callable<Object>() { - @Override public Object call() throws Exception { - ThreadLocalRandom rnd = ThreadLocalRandom.current(); + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); - while (System.currentTimeMillis() < stopTime) - ignite(rnd.nextInt(NODES)).compute().broadcast(new DummyRunnable()); + while (System.currentTimeMillis() < stopTime) + ignite(rnd.nextInt(NODES)).compute().broadcast(new DummyRunnable(null)); - return null; - } - }, 20, "test-thread"); + return null; + } + }, 20, "test-thread"); + } + finally { + System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_CLASS_NAME, null); + } } /** * */ private static class DummyRunnable implements IgniteRunnable { + /** */ + private byte[] data; + + /** + * @param data Data. + */ + public DummyRunnable(byte[] data) { + this.data = data; + } + /** {@inheritDoc} */ @Override public void run() { // No-op. http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 3a0d1ee..5b24a13 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -37,6 +37,7 @@ import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreOptimizedMarshallerS import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest; import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest; import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest; +import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceTest; import org.apache.ignite.internal.managers.communication.IgniteVariousConnectionNumberTest; import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest; import org.apache.ignite.internal.processors.cache.CacheDeferredDeleteSanitySelfTest; @@ -321,6 +322,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(CacheTxFastFinishTest.class); suite.addTestSuite(IgniteVariousConnectionNumberTest.class); + suite.addTestSuite(IgniteCommunicationBalanceTest.class); return suite; }
