Repository: ignite Updated Branches: refs/heads/ignite-1.8.2-balance [created] faac0887d
ignite-1.8.2: balance for pairedConnections=false Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/faac0887 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/faac0887 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/faac0887 Branch: refs/heads/ignite-1.8.2-balance Commit: faac0887d5e84aee9d66b2a1761817439e43c84b Parents: 8372e69 Author: sboikov <[email protected]> Authored: Tue Dec 27 13:25:27 2016 +0300 Committer: sboikov <[email protected]> Committed: Tue Dec 27 13:25:27 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/util/nio/GridNioServer.java | 98 +++++++++++++++++++- .../IgniteCommunicationBalanceTest.java | 5 +- 2 files changed, 99 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/faac0887/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index bc1f173..11c1ea9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -362,7 +362,9 @@ public class GridNioServer<T> { if (balancing && balancePeriod > 0) { boolean rndBalance = IgniteSystemProperties.getBoolean(IGNITE_IO_BALANCE_RANDOM_BALANCE, false); - balancer0 = rndBalance ? new RandomBalancer() : new SizeBasedBalancer(balancePeriod); + balancer0 = rndBalance ? new RandomBalancer() : new SizeBasedBalancer2(balancePeriod); + + log.info("Balancer: " + balancer0.getClass().getSimpleName()); } this.balancer = balancer0; @@ -3559,6 +3561,100 @@ public class GridNioServer<T> { } /** + * + */ + private class SizeBasedBalancer2 implements IgniteRunnable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long lastBalance; + + /** */ + private final long balancePeriod; + + /** + * @param balancePeriod Period. + */ + SizeBasedBalancer2(long balancePeriod) { + this.balancePeriod = balancePeriod; + } + + /** {@inheritDoc} */ + @Override public void run() { + long now = U.currentTimeMillis(); + + if (lastBalance + balancePeriod < now) { + lastBalance = now; + + long maxBytes0 = -1, minBytes0 = -1; + int maxBytesIdx = -1, minBytesIdx = -1; + + for (int i = 0; i < clientWorkers.size(); i++) { + GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i); + + int sesCnt = worker.workerSessions.size(); + + long bytes0 = worker.bytesRcvd0 + worker.bytesSent0; + + if ((maxBytes0 == -1 || bytes0 > maxBytes0) && bytes0 > 0 && sesCnt > 1) { + maxBytes0 = bytes0; + maxBytesIdx = i; + } + + if (minBytes0 == -1 || bytes0 < minBytes0) { + minBytes0 = bytes0; + minBytesIdx = i; + } + } + + if (log.isDebugEnabled()) + log.debug("Balancing data [min0=" + minBytes0 + ", minIdx=" + minBytesIdx + + ", max0=" + maxBytes0 + ", maxIdx=" + maxBytesIdx + ']'); + + if (maxBytes0 != -1 && minBytes0 != -1) { + GridSelectorNioSessionImpl ses = null; + + long bytesDiff = maxBytes0 - minBytes0; + long delta = bytesDiff; + double threshold = bytesDiff * 0.9; + + GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions = + clientWorkers.get(maxBytesIdx).workerSessions; + + for (GridSelectorNioSessionImpl ses0 : sessions) { + long bytesSent0 = ses0.bytesSent0(); + + if (bytesSent0 < threshold && + (ses == null || delta > U.safeAbs(bytesSent0 - bytesDiff / 2))) { + ses = ses0; + delta = U.safeAbs(bytesSent0 - bytesDiff / 2); + } + } + + if (ses != null) { + if (log.isDebugEnabled()) + log.debug("Will move session to less loaded worker [ses=" + ses + + ", from=" + maxBytesIdx + ", to=" + minBytesIdx + ']'); + + moveSession(ses, maxBytesIdx, minBytesIdx); + } + else { + if (log.isDebugEnabled()) + log.debug("Unable to find session to move."); + } + } + + for (int i = 0; i < clientWorkers.size(); i++) { + GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i); + + worker.reset0(); + } + } + } + } + + /** * For tests only. */ @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/ignite/blob/faac0887/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java index e142aef..19482be 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java @@ -151,7 +151,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest { } } - return srv.readerMoveCount() > readMoveCnt && srv.writerMoveCount() > writeMoveCnt; + return srv.readerMoveCount() > readMoveCnt || srv.writerMoveCount() > writeMoveCnt; } }, 30_000); @@ -165,8 +165,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest { ", rc2=" + readMoveCnt2 + ", wc2=" + writeMoveCnt2 + ']'); - assertTrue(readMoveCnt2 > readMoveCnt1); - assertTrue(writeMoveCnt2 > writeMoveCnt1); + assertTrue(readMoveCnt2 > readMoveCnt1 || writeMoveCnt2 > writeMoveCnt1); readMoveCnt1 = readMoveCnt2; writeMoveCnt1 = writeMoveCnt2;
