conn
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81832e1d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81832e1d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81832e1d Branch: refs/heads/ignite-comm-balance Commit: 81832e1dc9576fee9c8f74c451f4893cd5633d99 Parents: ffd654a Author: sboikov <[email protected]> Authored: Thu Sep 22 14:02:39 2016 +0300 Committer: sboikov <[email protected]> Committed: Thu Sep 22 14:23:04 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/util/nio/GridNioServer.java | 131 ++++++++++++------- .../ignite/internal/util/nio/GridNioWorker.java | 19 ++- .../util/nio/GridSelectorNioSessionImpl.java | 62 ++++++++- .../IgniteCommunicationBalanceTest.java | 2 +- 4 files changed, 161 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 5da557b..2d5cc64 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -435,10 +435,6 @@ public class GridNioServer<T> { NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, NioOperation.CLOSE); impl.offerStateChange(fut); -// int idx = impl.selectorIndex(); // TODO -// -// if (idx != -1) -// clientWorkers.get(idx).offer(fut); return fut; } @@ -498,14 +494,9 @@ public class GridNioServer<T> { if (ses.removeFuture(fut)) fut.connectionClosed(); } - else if (msgCnt == 1) { + else if (msgCnt == 1) // Change from 0 to 1 means that worker thread should be waken up. -// int idx = ses.selectorIndex(); -// -// if (idx != -1) // TODO revisit -// clientWorkers.get(idx).offer(fut); ses.offerStateChange(fut); - } if (msgQueueLsnr != null) msgQueueLsnr.apply(ses, msgCnt); @@ -578,7 +569,6 @@ public class GridNioServer<T> { ses0.resend(futs); // Wake up worker. - //clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0)); ses0.offerStateChange(fut0); } } @@ -598,6 +588,10 @@ public class GridNioServer<T> { } public void moveSession(GridNioSession ses, int from, int to) { + assert from >= 0 && from < clientWorkers.size() : from; + assert to >= 0 && to < clientWorkers.size() : to; + assert from != to; + clientWorkers.get(from).offer(new SessionMoveFuture((GridSelectorNioSessionImpl)ses, to)); } @@ -1336,13 +1330,15 @@ public class GridNioServer<T> { /** Worker index. */ private final int idx; + /** Sessions assigned to this worker. */ + private final GridConcurrentHashSet<GridSelectorNioSessionImpl> workerSessions = + new GridConcurrentHashSet<>(); + private volatile long bytesRcvd; private volatile long bytesSent; private volatile long bytesRcvd0; private volatile long bytesSent0; - private final GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions0 = new GridConcurrentHashSet<>(); - /** * @param idx Index of this worker in server's array. * @param gridName Grid name. @@ -1360,7 +1356,7 @@ public class GridNioServer<T> { } public Collection<? extends GridNioSession> sessions() { - return sessions0; + return workerSessions; } /** {@inheritDoc} */ @@ -1409,15 +1405,15 @@ public class GridNioServer<T> { try { SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); - Class<?> selectorImplClass = + Class<?> selectorImplCls = Class.forName("sun.nio.ch.SelectorImpl", false, U.gridClassLoader()); // Ensure the current selector implementation is what we can instrument. - if (!selectorImplClass.isAssignableFrom(selector.getClass())) + if (!selectorImplCls.isAssignableFrom(selector.getClass())) return; - Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); - Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); + Field selectedKeysField = selectorImplCls.getDeclaredField("selectedKeys"); + Field publicSelectedKeysField = selectorImplCls.getDeclaredField("publicSelectedKeys"); selectedKeysField.setAccessible(true); publicSelectedKeysField.setAccessible(true); @@ -1439,17 +1435,41 @@ public class GridNioServer<T> { } } - /** - * Adds socket channel to the registration queue and wakes up reading thread. - * - * @param req Change request. - */ + /** {@inheritDoc} */ @Override public void offer(GridNioFuture req) { changeReqs.offer((NioOperationFuture)req); selector.wakeup(); } + /** {@inheritDoc} */ + @Override public void offer(Collection<GridNioFuture> reqs) { + for (GridNioFuture req : reqs) + changeReqs.offer((NioOperationFuture)req); + + selector.wakeup(); + } + + /** {@inheritDoc} */ + @Override public List<GridNioFuture> clearSessionRequests(GridNioSession ses) { + List<GridNioFuture> sesReqs = null; + + for (GridNioServer.NioOperationFuture changeReq : changeReqs) { + if (changeReq.session() == ses) { + boolean rmv = changeReqs.remove(changeReq); + + assert rmv : changeReq; + + if (sesReqs == null) + sesReqs = new ArrayList<>(); + + sesReqs.add(changeReq); + } + } + + return sesReqs; + } + /** * Processes read and write events and registration requests. * @@ -1477,25 +1497,29 @@ public class GridNioServer<T> { GridSelectorNioSessionImpl ses = f.session(); if (idx == f.toIdx) { - ses.worker = this; + assert f.movedSocketChannel() != null : f; + + boolean add = workerSessions.add(ses); - sessions0.add(ses); + assert add; - SelectionKey key = f.socketChannel().register(selector, - SelectionKey.OP_READ | SelectionKey.OP_WRITE, ses); // TODO what if reads were paused? + ses.finishMoveSession(this); + + SelectionKey key = f.movedSocketChannel().register(selector, + SelectionKey.OP_READ | SelectionKey.OP_WRITE, + ses); ses.key(key); } else { - if (sessions0.remove(ses)) { - assert ses.worker == this; // TODO replace with IF and ignore? + assert f.movedSocketChannel() == null : f; - // Cleanup. - ses.worker = null; + if (workerSessions.remove(ses)) { + ses.startMoveSession(this); SelectionKey key = ses.key(); - f.socketChannel((SocketChannel)key.channel()); + f.movedSocketChannel((SocketChannel)key.channel()); key.cancel(); @@ -1879,7 +1903,7 @@ public class GridNioServer<T> { resend(ses); sessions.add(ses); - sessions0.add(ses); + workerSessions.add(ses); try { filterChain.onSessionOpened(ses); @@ -1922,7 +1946,7 @@ public class GridNioServer<T> { } sessions.remove(ses); - sessions0.remove(ses); + workerSessions.remove(ses); SelectionKey key = ses.key(); @@ -2038,7 +2062,7 @@ public class GridNioServer<T> { bytesSent0 = 0; bytesRcvd0 = 0; - for (GridSelectorNioSessionImpl ses : sessions0) + for (GridSelectorNioSessionImpl ses : workerSessions) ses.reset0(); } } @@ -2248,7 +2272,7 @@ public class GridNioServer<T> { /** Socket channel in register request. */ @GridToStringExclude - protected SocketChannel sockCh; // TODO to be fixed with proper hierarchy + private SocketChannel sockCh; /** Session to perform operation on. */ @GridToStringExclude @@ -2450,11 +2474,15 @@ public class GridNioServer<T> { /** */ private final int toIdx; + /** */ + @GridToStringExclude + private SocketChannel movedSockCh; + /** - * @param ses - * @param toIdx + * @param ses Session. + * @param toIdx Target worker index. */ - public SessionMoveFuture( + SessionMoveFuture( GridSelectorNioSessionImpl ses, int toIdx ) { @@ -2463,12 +2491,25 @@ public class GridNioServer<T> { this.toIdx = toIdx; } + /** + * @return Target worker index. + */ int toIndex() { return toIdx; } - void socketChannel(SocketChannel sockCh) { - this.sockCh = sockCh; + /** + * @return Moved session socket channel. + */ + SocketChannel movedSocketChannel() { + return movedSockCh; + } + + /** + * @param movedSockCh Moved session socket channel. + */ + void movedSocketChannel(SocketChannel movedSockCh) { + this.movedSockCh = movedSockCh; } /** {@inheritDoc} */ @@ -2938,7 +2979,7 @@ public class GridNioServer<T> { long bytesRcvd0 = worker.bytesRcvd0; if ((maxRcvd0 == -1 || bytesRcvd0 > maxRcvd0) && bytesRcvd0 > 0 && - worker.sessions0.size() > 1) { + worker.workerSessions.size() > 1) { maxRcvd0 = bytesRcvd0; maxRcvdIdx = i; @@ -2955,7 +2996,7 @@ public class GridNioServer<T> { long bytesSent0 = worker.bytesSent0; if ((maxSent0 == -1 || bytesSent0 > maxSent0) && bytesSent0 > 0 && - worker.sessions0.size() > 1) { + worker.workerSessions.size() > 1) { maxSent0 = bytesSent0; maxSentIdx = i; @@ -2989,7 +3030,7 @@ public class GridNioServer<T> { double threshold = sentDiff * 0.9; GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions = - clientWorkers.get(maxSentIdx).sessions0; + clientWorkers.get(maxSentIdx).workerSessions; for (GridSelectorNioSessionImpl ses0 : sessions) { long bytesSent0 = ses0.bytesSent0(); @@ -3031,7 +3072,7 @@ public class GridNioServer<T> { double threshold = rcvdDiff * 0.9; GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions = - clientWorkers.get(maxRcvdIdx).sessions0; + clientWorkers.get(maxRcvdIdx).workerSessions; for (GridSelectorNioSessionImpl ses0 : sessions) { long bytesRcvd0 = ses0.bytesReceived0(); http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java index d088d8c..7f8033a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java @@ -17,12 +17,27 @@ package org.apache.ignite.internal.util.nio; +import java.util.Collection; +import java.util.List; +import org.jetbrains.annotations.Nullable; + /** * */ public interface GridNioWorker { /** - * @param fut Future. + * @param req Change request. + */ + void offer(GridNioFuture req); + + /** + * @param reqs Change requests. + */ + void offer(Collection<GridNioFuture> reqs); + + /** + * @param ses Session. + * @return Session state change requests. */ - void offer(GridNioFuture fut); + @Nullable List<GridNioFuture> clearSessionRequests(GridNioSession ses); } http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index e515696..0c2033b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -20,7 +20,9 @@ package org.apache.ignite.internal.util.nio; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteLogger; @@ -43,8 +45,8 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { @GridToStringExclude private SelectionKey key; - /** */ - public GridNioWorker worker; + /** Current worker thread. */ + private GridNioWorker worker; /** Size counter. */ private final AtomicInteger queueSize = new AtomicInteger(); @@ -68,6 +70,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { /** Logger. */ private final IgniteLogger log; + /** */ + private List<GridNioFuture> pendingStateChanges; + /** * Creates session instance. * @@ -153,13 +158,60 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { return key; } + /** + * @param fut + */ void offerStateChange(GridNioFuture fut) { - GridNioWorker worker0 = worker; + synchronized (this) { + GridNioWorker worker0 = worker; + + if (worker0 == null) { + if (pendingStateChanges == null) + pendingStateChanges = new ArrayList<>(); + + pendingStateChanges.add(fut); + } + else + worker0.offer(fut); + } + } + + /** + * @param moveFrom + */ + void startMoveSession(GridNioWorker moveFrom) { + synchronized (this) { + assert this.worker == moveFrom; - if (worker0 != null) - worker0.offer(fut); + List<GridNioFuture> sesReqs = moveFrom.clearSessionRequests(this); + + worker = null; + + if (sesReqs != null) { + if (pendingStateChanges == null) + pendingStateChanges = new ArrayList<>(); + + pendingStateChanges.addAll(sesReqs); + } + } } + /** + * @param moveTo + */ + void finishMoveSession(GridNioWorker moveTo) { + synchronized (this) { + assert worker == null; + + worker = moveTo; + + if (pendingStateChanges != null) { + moveTo.offer(pendingStateChanges); + + pendingStateChanges = null; + } + } + } /** * Adds write future at the front of the queue without acquiring back pressure semaphore. http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java index d523aab..839bd77 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java @@ -147,7 +147,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest { /** * */ - static class DummyRunnable implements IgniteRunnable { + private static class DummyRunnable implements IgniteRunnable { /** {@inheritDoc} */ @Override public void run() { // No-op.
