Merge remote-tracking branch 'remotes/community/ignite-comm-opts2-1' into ignite-comm-balance
# Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java # modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/101f9845 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/101f9845 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/101f9845 Branch: refs/heads/ignite-comm-balance Commit: 101f9845c542a5f6389f70f0ffa6396e303b13ab Parents: 9bf435c Author: sboikov <[email protected]> Authored: Fri Oct 21 18:54:09 2016 +0300 Committer: sboikov <[email protected]> Committed: Fri Oct 21 18:54:09 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/util/nio/GridNioServer.java | 37 +++++++++++--------- .../ignite/internal/util/nio/GridNioWorker.java | 6 ++-- .../util/nio/GridSelectorNioSessionImpl.java | 14 +++++--- 3 files changed, 33 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/101f9845/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 448b3c8..c448c43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -545,8 +545,12 @@ public class GridNioServer<T> { throw new IgniteCheckedException(err); } } - else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, true)) - clientWorkers.get(ses.selectorIndex()).offer((SessionChangeRequest)req, ses.selectorIndex()); + else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, true)) { + AbstractNioClientWorker worker = (AbstractNioClientWorker)ses.worker(); + + if (worker != null) + worker.offer((SessionChangeRequest)req); + } if (msgQueueLsnr != null) msgQueueLsnr.apply(ses, msgCnt); @@ -622,7 +626,7 @@ public class GridNioServer<T> { ses0.resend(futs); // Wake up worker. - ses0.offerStateChange(fut0); + ses0.offerStateChange((GridNioServer.SessionChangeRequest)fut0); } } @@ -688,7 +692,7 @@ public class GridNioServer<T> { ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']'); for (int i = 0; i < clientWorkers.size(); i++) - clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, NioOperation.DUMP_STATS), i); + clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, NioOperation.DUMP_STATS)); } /** @@ -835,7 +839,7 @@ public class GridNioServer<T> { else balanceIdx = 0; - clientWorkers.get(balanceIdx).offer(req, balanceIdx); + clientWorkers.get(balanceIdx).offer(req); } /** {@inheritDoc} */ @@ -1644,30 +1648,29 @@ public class GridNioServer<T> { * Adds socket channel to the registration queue and wakes up reading thread. * * @param req Change request. - * @param workerIdx Worker thread index. */ - private void offer(SessionChangeRequest req, int workerIdx) { + public void offer(SessionChangeRequest req) { changeReqs.offer(req); if (select) selector.wakeup(); else if (park) - LockSupport.unpark(clientThreads[workerIdx]); + LockSupport.unpark(clientThreads[idx]); } /** {@inheritDoc} */ - @Override public void offer(Collection<GridNioFuture> reqs) { - for (GridNioFuture req : reqs) - changeReqs.offer((NioOperationFuture)req); + @Override public void offer(Collection<SessionChangeRequest> reqs) { + for (SessionChangeRequest req : reqs) + changeReqs.offer(req); selector.wakeup(); } /** {@inheritDoc} */ - @Override public List<GridNioFuture> clearSessionRequests(GridNioSession ses) { - List<GridNioFuture> sesReqs = null; + @Override public List<SessionChangeRequest> clearSessionRequests(GridNioSession ses) { + List<SessionChangeRequest> sesReqs = null; - for (GridNioServer.NioOperationFuture changeReq : changeReqs) { + for (SessionChangeRequest changeReq : changeReqs) { if (changeReq.session() == ses && !(changeReq instanceof SessionMoveFuture)) { boolean rmv = changeReqs.remove(changeReq); @@ -1705,7 +1708,7 @@ public class GridNioServer<T> { } case MOVE: { - SessionMoveFuture f = (SessionMoveFuture)req; + SessionMoveFuture f = (SessionMoveFuture)req0; GridSelectorNioSessionImpl ses = f.session(); @@ -2541,7 +2544,7 @@ public class GridNioServer<T> { /** * Asynchronous operation that may be requested on selector. */ - private enum NioOperation { + enum NioOperation { /** Register read key selection. */ REGISTER, @@ -3613,6 +3616,8 @@ public class GridNioServer<T> { * */ interface SessionChangeRequest { + GridNioSession session(); + /** * @return Requested change operation. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/101f9845/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java index bd6c069..3419b4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java @@ -28,16 +28,16 @@ interface GridNioWorker { /** * @param req Change request. */ - public void offer(GridNioFuture req); + public void offer(GridNioServer.SessionChangeRequest req); /** * @param reqs Change requests. */ - public void offer(Collection<GridNioFuture> reqs); + public void offer(Collection<GridNioServer.SessionChangeRequest> reqs); /** * @param ses Session. * @return Session state change requests. */ - @Nullable public List<GridNioFuture> clearSessionRequests(GridNioSession ses); + @Nullable public List<GridNioServer.SessionChangeRequest> clearSessionRequests(GridNioSession ses); } http://git-wip-us.apache.org/repos/asf/ignite/blob/101f9845/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index ff5d9ae..047a9bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -46,7 +46,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { private SelectionKey key; /** Current worker thread. */ - private GridNioWorker worker; + private volatile GridNioWorker worker; /** Semaphore. */ @GridToStringExclude @@ -68,7 +68,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { private final IgniteLogger log; /** */ - private List<GridNioFuture> pendingStateChanges; + private List<GridNioServer.SessionChangeRequest> pendingStateChanges; /** */ final AtomicBoolean procWrite = new AtomicBoolean(); @@ -129,6 +129,10 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { } } + GridNioWorker worker() { + return worker; + } + /** * Sets selection key for this session. * @@ -166,7 +170,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { * @param fut Move future. * @return {@code True} if session move was scheduled. */ - boolean offerMove(GridNioWorker from, GridNioFuture fut) { + boolean offerMove(GridNioWorker from, GridNioServer.SessionChangeRequest fut) { synchronized (this) { GridNioWorker worker0 = worker; @@ -182,7 +186,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { /** * @param fut Future. */ - void offerStateChange(GridNioFuture fut) { + void offerStateChange(GridNioServer.SessionChangeRequest fut) { synchronized (this) { GridNioWorker worker0 = worker; @@ -204,7 +208,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { synchronized (this) { assert this.worker == moveFrom; - List<GridNioFuture> sesReqs = moveFrom.clearSessionRequests(this); + List<GridNioServer.SessionChangeRequest> sesReqs = moveFrom.clearSessionRequests(this); worker = null;
