Merge remote-tracking branch 'remotes/community/ignite-comm-opts2-1' into ignite-comm-balance

# Conflicts:
#       modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
#       modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/101f9845
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/101f9845
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/101f9845

Branch: refs/heads/ignite-comm-balance
Commit: 101f9845c542a5f6389f70f0ffa6396e303b13ab
Parents: 9bf435c
Author: sboikov <[email protected]>
Authored: Fri Oct 21 18:54:09 2016 +0300
Committer: sboikov <[email protected]>
Committed: Fri Oct 21 18:54:09 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/nio/GridNioServer.java | 37 +++++++++++---------
 .../ignite/internal/util/nio/GridNioWorker.java |  6 ++--
 .../util/nio/GridSelectorNioSessionImpl.java    | 14 +++++---
 3 files changed, 33 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/101f9845/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 448b3c8..c448c43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -545,8 +545,12 @@ public class GridNioServer<T> {
                     throw new IgniteCheckedException(err);
             }
         }
-        else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, 
true))
-            
clientWorkers.get(ses.selectorIndex()).offer((SessionChangeRequest)req, 
ses.selectorIndex());
+        else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, 
true)) {
+            AbstractNioClientWorker worker = 
(AbstractNioClientWorker)ses.worker();
+
+            if (worker != null)
+                worker.offer((SessionChangeRequest)req);
+        }
 
         if (msgQueueLsnr != null)
             msgQueueLsnr.apply(ses, msgCnt);
@@ -622,7 +626,7 @@ public class GridNioServer<T> {
             ses0.resend(futs);
 
             // Wake up worker.
-            ses0.offerStateChange(fut0);
+            ses0.offerStateChange((GridNioServer.SessionChangeRequest)fut0);
         }
     }
 
@@ -688,7 +692,7 @@ public class GridNioServer<T> {
             ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']');
 
         for (int i = 0; i < clientWorkers.size(); i++)
-            clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, 
NioOperation.DUMP_STATS), i);
+            clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, 
NioOperation.DUMP_STATS));
     }
 
     /**
@@ -835,7 +839,7 @@ public class GridNioServer<T> {
         else
             balanceIdx = 0;
 
-        clientWorkers.get(balanceIdx).offer(req, balanceIdx);
+        clientWorkers.get(balanceIdx).offer(req);
     }
 
     /** {@inheritDoc} */
@@ -1644,30 +1648,29 @@ public class GridNioServer<T> {
         * Adds socket channel to the registration queue and wakes up reading thread.
          *
          * @param req Change request.
-         * @param workerIdx Worker thread index.
          */
-        private void offer(SessionChangeRequest req, int workerIdx) {
+        public void offer(SessionChangeRequest req) {
             changeReqs.offer(req);
 
             if (select)
                 selector.wakeup();
             else if (park)
-                LockSupport.unpark(clientThreads[workerIdx]);
+                LockSupport.unpark(clientThreads[idx]);
         }
 
         /** {@inheritDoc} */
-        @Override public void offer(Collection<GridNioFuture> reqs) {
-            for (GridNioFuture req : reqs)
-                changeReqs.offer((NioOperationFuture)req);
+        @Override public void offer(Collection<SessionChangeRequest> reqs) {
+            for (SessionChangeRequest req : reqs)
+                changeReqs.offer(req);
 
             selector.wakeup();
         }
 
         /** {@inheritDoc} */
-        @Override public List<GridNioFuture> 
clearSessionRequests(GridNioSession ses) {
-            List<GridNioFuture> sesReqs = null;
+        @Override public List<SessionChangeRequest> 
clearSessionRequests(GridNioSession ses) {
+            List<SessionChangeRequest> sesReqs = null;
 
-            for (GridNioServer.NioOperationFuture changeReq : changeReqs) {
+            for (SessionChangeRequest changeReq : changeReqs) {
                 if (changeReq.session() == ses && !(changeReq instanceof 
SessionMoveFuture)) {
                     boolean rmv = changeReqs.remove(changeReq);
 
@@ -1705,7 +1708,7 @@ public class GridNioServer<T> {
                             }
 
                             case MOVE: {
-                                SessionMoveFuture f = (SessionMoveFuture)req;
+                                SessionMoveFuture f = (SessionMoveFuture)req0;
 
                                 GridSelectorNioSessionImpl ses = f.session();
 
@@ -2541,7 +2544,7 @@ public class GridNioServer<T> {
     /**
      * Asynchronous operation that may be requested on selector.
      */
-    private enum NioOperation {
+    enum NioOperation {
         /** Register read key selection. */
         REGISTER,
 
@@ -3613,6 +3616,8 @@ public class GridNioServer<T> {
      *
      */
     interface SessionChangeRequest {
+        GridNioSession session();
+
         /**
          * @return Requested change operation.
          */

http://git-wip-us.apache.org/repos/asf/ignite/blob/101f9845/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
index bd6c069..3419b4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
@@ -28,16 +28,16 @@ interface GridNioWorker {
     /**
      * @param req Change request.
      */
-    public void offer(GridNioFuture req);
+    public void offer(GridNioServer.SessionChangeRequest req);
 
     /**
      * @param reqs Change requests.
      */
-    public void offer(Collection<GridNioFuture> reqs);
+    public void offer(Collection<GridNioServer.SessionChangeRequest> reqs);
 
     /**
      * @param ses Session.
      * @return Session state change requests.
      */
-    @Nullable public List<GridNioFuture> clearSessionRequests(GridNioSession 
ses);
+    @Nullable public List<GridNioServer.SessionChangeRequest> 
clearSessionRequests(GridNioSession ses);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/101f9845/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index ff5d9ae..047a9bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -46,7 +46,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     private SelectionKey key;
 
     /** Current worker thread. */
-    private GridNioWorker worker;
+    private volatile GridNioWorker worker;
 
     /** Semaphore. */
     @GridToStringExclude
@@ -68,7 +68,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     private final IgniteLogger log;
 
     /** */
-    private List<GridNioFuture> pendingStateChanges;
+    private List<GridNioServer.SessionChangeRequest> pendingStateChanges;
 
     /** */
     final AtomicBoolean procWrite = new AtomicBoolean();
@@ -129,6 +129,10 @@ class GridSelectorNioSessionImpl extends 
GridNioSessionImpl {
         }
     }
 
+    GridNioWorker worker() {
+        return worker;
+    }
+
     /**
      * Sets selection key for this session.
      *
@@ -166,7 +170,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl 
{
      * @param fut Move future.
      * @return {@code True} if session move was scheduled.
      */
-    boolean offerMove(GridNioWorker from, GridNioFuture fut) {
+    boolean offerMove(GridNioWorker from, GridNioServer.SessionChangeRequest 
fut) {
         synchronized (this) {
             GridNioWorker worker0 = worker;
 
@@ -182,7 +186,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl 
{
     /**
      * @param fut Future.
      */
-    void offerStateChange(GridNioFuture fut) {
+    void offerStateChange(GridNioServer.SessionChangeRequest fut) {
         synchronized (this) {
             GridNioWorker worker0 = worker;
 
@@ -204,7 +208,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl 
{
         synchronized (this) {
             assert this.worker == moveFrom;
 
-            List<GridNioFuture> sesReqs = moveFrom.clearSessionRequests(this);
+            List<GridNioServer.SessionChangeRequest> sesReqs = 
moveFrom.clearSessionRequests(this);
 
             worker = null;
 

Reply via email to