conn

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81832e1d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81832e1d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81832e1d

Branch: refs/heads/ignite-comm-balance
Commit: 81832e1dc9576fee9c8f74c451f4893cd5633d99
Parents: ffd654a
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Sep 22 14:02:39 2016 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Sep 22 14:23:04 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/nio/GridNioServer.java | 131 ++++++++++++-------
 .../ignite/internal/util/nio/GridNioWorker.java |  19 ++-
 .../util/nio/GridSelectorNioSessionImpl.java    |  62 ++++++++-
 .../IgniteCommunicationBalanceTest.java         |   2 +-
 4 files changed, 161 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 5da557b..2d5cc64 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -435,10 +435,6 @@ public class GridNioServer<T> {
         NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, 
NioOperation.CLOSE);
 
         impl.offerStateChange(fut);
-//        int idx = impl.selectorIndex(); // TODO
-//
-//        if (idx != -1)
-//            clientWorkers.get(idx).offer(fut);
 
         return fut;
     }
@@ -498,14 +494,9 @@ public class GridNioServer<T> {
             if (ses.removeFuture(fut))
                 fut.connectionClosed();
         }
-        else if (msgCnt == 1) {
+        else if (msgCnt == 1)
             // Change from 0 to 1 means that worker thread should be waken up.
-//            int idx = ses.selectorIndex();
-//
-//            if (idx != -1) // TODO revisit
-//                clientWorkers.get(idx).offer(fut);
             ses.offerStateChange(fut);
-        }
 
         if (msgQueueLsnr != null)
             msgQueueLsnr.apply(ses, msgCnt);
@@ -578,7 +569,6 @@ public class GridNioServer<T> {
             ses0.resend(futs);
 
             // Wake up worker.
-            
//clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0));
             ses0.offerStateChange(fut0);
         }
     }
@@ -598,6 +588,10 @@ public class GridNioServer<T> {
     }
 
     public void moveSession(GridNioSession ses, int from, int to) {
+        assert from >= 0 && from < clientWorkers.size() : from;
+        assert to >= 0 && to < clientWorkers.size() : to;
+        assert from != to;
+
         clientWorkers.get(from).offer(new 
SessionMoveFuture((GridSelectorNioSessionImpl)ses, to));
     }
 
@@ -1336,13 +1330,15 @@ public class GridNioServer<T> {
         /** Worker index. */
         private final int idx;
 
+        /** Sessions assigned to this worker. */
+        private final GridConcurrentHashSet<GridSelectorNioSessionImpl> 
workerSessions =
+            new GridConcurrentHashSet<>();
+
         private volatile long bytesRcvd;
         private volatile long bytesSent;
         private volatile long bytesRcvd0;
         private volatile long bytesSent0;
 
-        private final GridConcurrentHashSet<GridSelectorNioSessionImpl> 
sessions0 = new GridConcurrentHashSet<>();
-
         /**
          * @param idx Index of this worker in server's array.
          * @param gridName Grid name.
@@ -1360,7 +1356,7 @@ public class GridNioServer<T> {
         }
 
         public Collection<? extends GridNioSession> sessions() {
-            return sessions0;
+            return workerSessions;
         }
 
         /** {@inheritDoc} */
@@ -1409,15 +1405,15 @@ public class GridNioServer<T> {
             try {
                 SelectedSelectionKeySet selectedKeySet = new 
SelectedSelectionKeySet();
 
-                Class<?> selectorImplClass =
+                Class<?> selectorImplCls =
                     Class.forName("sun.nio.ch.SelectorImpl", false, 
U.gridClassLoader());
 
                 // Ensure the current selector implementation is what we can 
instrument.
-                if (!selectorImplClass.isAssignableFrom(selector.getClass()))
+                if (!selectorImplCls.isAssignableFrom(selector.getClass()))
                     return;
 
-                Field selectedKeysField = 
selectorImplClass.getDeclaredField("selectedKeys");
-                Field publicSelectedKeysField = 
selectorImplClass.getDeclaredField("publicSelectedKeys");
+                Field selectedKeysField = 
selectorImplCls.getDeclaredField("selectedKeys");
+                Field publicSelectedKeysField = 
selectorImplCls.getDeclaredField("publicSelectedKeys");
 
                 selectedKeysField.setAccessible(true);
                 publicSelectedKeysField.setAccessible(true);
@@ -1439,17 +1435,41 @@ public class GridNioServer<T> {
             }
         }
 
-        /**
-         * Adds socket channel to the registration queue and wakes up reading 
thread.
-         *
-         * @param req Change request.
-         */
+        /** {@inheritDoc} */
         @Override public void offer(GridNioFuture req) {
             changeReqs.offer((NioOperationFuture)req);
 
             selector.wakeup();
         }
 
+        /** {@inheritDoc} */
+        @Override public void offer(Collection<GridNioFuture> reqs) {
+            for (GridNioFuture req : reqs)
+                changeReqs.offer((NioOperationFuture)req);
+
+            selector.wakeup();
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<GridNioFuture> 
clearSessionRequests(GridNioSession ses) {
+            List<GridNioFuture> sesReqs = null;
+
+            for (GridNioServer.NioOperationFuture changeReq : changeReqs) {
+                if (changeReq.session() == ses) {
+                    boolean rmv = changeReqs.remove(changeReq);
+
+                    assert rmv : changeReq;
+
+                    if (sesReqs == null)
+                        sesReqs = new ArrayList<>();
+
+                    sesReqs.add(changeReq);
+                }
+            }
+
+            return sesReqs;
+        }
+
         /**
          * Processes read and write events and registration requests.
          *
@@ -1477,25 +1497,29 @@ public class GridNioServer<T> {
                                 GridSelectorNioSessionImpl ses = f.session();
 
                                 if (idx == f.toIdx) {
-                                    ses.worker = this;
+                                    assert f.movedSocketChannel() != null : f;
+
+                                    boolean add = workerSessions.add(ses);
 
-                                    sessions0.add(ses);
+                                    assert add;
 
-                                    SelectionKey key = 
f.socketChannel().register(selector,
-                                        SelectionKey.OP_READ | 
SelectionKey.OP_WRITE, ses); // TODO what if reads were paused?
+                                    ses.finishMoveSession(this);
+
+                                    SelectionKey key = 
f.movedSocketChannel().register(selector,
+                                        SelectionKey.OP_READ | 
SelectionKey.OP_WRITE,
+                                        ses);
 
                                     ses.key(key);
                                 }
                                 else {
-                                    if (sessions0.remove(ses)) {
-                                        assert ses.worker == this; // TODO 
replace with IF and ignore?
+                                    assert f.movedSocketChannel() == null : f;
 
-                                        // Cleanup.
-                                        ses.worker = null;
+                                    if (workerSessions.remove(ses)) {
+                                        ses.startMoveSession(this);
 
                                         SelectionKey key = ses.key();
 
-                                        
f.socketChannel((SocketChannel)key.channel());
+                                        
f.movedSocketChannel((SocketChannel)key.channel());
 
                                         key.cancel();
 
@@ -1879,7 +1903,7 @@ public class GridNioServer<T> {
                     resend(ses);
 
                 sessions.add(ses);
-                sessions0.add(ses);
+                workerSessions.add(ses);
 
                 try {
                     filterChain.onSessionOpened(ses);
@@ -1922,7 +1946,7 @@ public class GridNioServer<T> {
             }
 
             sessions.remove(ses);
-            sessions0.remove(ses);
+            workerSessions.remove(ses);
 
             SelectionKey key = ses.key();
 
@@ -2038,7 +2062,7 @@ public class GridNioServer<T> {
             bytesSent0 = 0;
             bytesRcvd0 = 0;
 
-            for (GridSelectorNioSessionImpl ses : sessions0)
+            for (GridSelectorNioSessionImpl ses : workerSessions)
                 ses.reset0();
         }
     }
@@ -2248,7 +2272,7 @@ public class GridNioServer<T> {
 
         /** Socket channel in register request. */
         @GridToStringExclude
-        protected SocketChannel sockCh; // TODO to be fixed with proper 
hierarchy
+        private SocketChannel sockCh;
 
         /** Session to perform operation on. */
         @GridToStringExclude
@@ -2450,11 +2474,15 @@ public class GridNioServer<T> {
         /** */
         private final int toIdx;
 
+        /** */
+        @GridToStringExclude
+        private SocketChannel movedSockCh;
+
         /**
-         * @param ses
-         * @param toIdx
+         * @param ses Session.
+         * @param toIdx Target worker index.
          */
-        public SessionMoveFuture(
+        SessionMoveFuture(
             GridSelectorNioSessionImpl ses,
             int toIdx
         ) {
@@ -2463,12 +2491,25 @@ public class GridNioServer<T> {
             this.toIdx = toIdx;
         }
 
+        /**
+         * @return Target worker index.
+         */
         int toIndex() {
             return toIdx;
         }
 
-        void socketChannel(SocketChannel sockCh) {
-            this.sockCh = sockCh;
+        /**
+         * @return Moved session socket channel.
+         */
+        SocketChannel movedSocketChannel() {
+            return movedSockCh;
+        }
+
+        /**
+         * @param movedSockCh Moved session socket channel.
+         */
+        void movedSocketChannel(SocketChannel movedSockCh) {
+            this.movedSockCh = movedSockCh;
         }
 
         /** {@inheritDoc} */
@@ -2938,7 +2979,7 @@ public class GridNioServer<T> {
                         long bytesRcvd0 = worker.bytesRcvd0;
 
                         if ((maxRcvd0 == -1 || bytesRcvd0 > maxRcvd0) && 
bytesRcvd0 > 0 &&
-                            worker.sessions0.size() > 1) {
+                            worker.workerSessions.size() > 1) {
                             maxRcvd0 = bytesRcvd0;
                             maxRcvdIdx = i;
 
@@ -2955,7 +2996,7 @@ public class GridNioServer<T> {
                         long bytesSent0 = worker.bytesSent0;
 
                         if ((maxSent0 == -1 || bytesSent0 > maxSent0) && 
bytesSent0 > 0 &&
-                            worker.sessions0.size() > 1) {
+                            worker.workerSessions.size() > 1) {
                             maxSent0 = bytesSent0;
                             maxSentIdx = i;
 
@@ -2989,7 +3030,7 @@ public class GridNioServer<T> {
                     double threshold = sentDiff * 0.9;
 
                     GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions 
=
-                        clientWorkers.get(maxSentIdx).sessions0;
+                        clientWorkers.get(maxSentIdx).workerSessions;
 
                     for (GridSelectorNioSessionImpl ses0 : sessions) {
                         long bytesSent0 = ses0.bytesSent0();
@@ -3031,7 +3072,7 @@ public class GridNioServer<T> {
                     double threshold = rcvdDiff * 0.9;
 
                     GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions 
=
-                        clientWorkers.get(maxRcvdIdx).sessions0;
+                        clientWorkers.get(maxRcvdIdx).workerSessions;
 
                     for (GridSelectorNioSessionImpl ses0 : sessions) {
                         long bytesRcvd0 = ses0.bytesReceived0();

http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
index d088d8c..7f8033a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
@@ -17,12 +17,27 @@
 
 package org.apache.ignite.internal.util.nio;
 
+import java.util.Collection;
+import java.util.List;
+import org.jetbrains.annotations.Nullable;
+
 /**
  *
  */
 public interface GridNioWorker {
     /**
-     * @param fut Future.
+     * @param req Change request.
+     */
+    void offer(GridNioFuture req);
+
+    /**
+     * @param reqs Change requests.
+     */
+    void offer(Collection<GridNioFuture> reqs);
+
+    /**
+     * @param ses Session.
+     * @return Session state change requests.
      */
-    void offer(GridNioFuture fut);
+    @Nullable List<GridNioFuture> clearSessionRequests(GridNioSession ses);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index e515696..0c2033b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.util.nio;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteLogger;
@@ -43,8 +45,8 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     @GridToStringExclude
     private SelectionKey key;
 
-    /** */
-    public GridNioWorker worker;
+    /** Current worker thread. */
+    private GridNioWorker worker;
 
     /** Size counter. */
     private final AtomicInteger queueSize = new AtomicInteger();
@@ -68,6 +70,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** Logger. */
     private final IgniteLogger log;
 
+    /** */
+    private List<GridNioFuture> pendingStateChanges;
+
     /**
      * Creates session instance.
      *
@@ -153,13 +158,60 @@ class GridSelectorNioSessionImpl extends 
GridNioSessionImpl {
         return key;
     }
 
+    /**
+     * @param fut
+     */
     void offerStateChange(GridNioFuture fut) {
-        GridNioWorker worker0 = worker;
+        synchronized (this) {
+            GridNioWorker worker0 = worker;
+
+            if (worker0 == null) {
+                if (pendingStateChanges == null)
+                    pendingStateChanges = new ArrayList<>();
+
+                pendingStateChanges.add(fut);
+            }
+            else
+                worker0.offer(fut);
+        }
+    }
+
+    /**
+     * @param moveFrom
+     */
+    void startMoveSession(GridNioWorker moveFrom) {
+        synchronized (this) {
+            assert this.worker == moveFrom;
 
-        if (worker0 != null)
-            worker0.offer(fut);
+            List<GridNioFuture> sesReqs = moveFrom.clearSessionRequests(this);
+
+            worker = null;
+
+            if (sesReqs != null) {
+                if (pendingStateChanges == null)
+                    pendingStateChanges = new ArrayList<>();
+
+                pendingStateChanges.addAll(sesReqs);
+            }
+        }
     }
 
+    /**
+     * @param moveTo
+     */
+    void finishMoveSession(GridNioWorker moveTo) {
+        synchronized (this) {
+            assert worker == null;
+
+            worker = moveTo;
+
+            if (pendingStateChanges != null) {
+                moveTo.offer(pendingStateChanges);
+
+                pendingStateChanges = null;
+            }
+        }
+    }
 
     /**
      * Adds write future at the front of the queue without acquiring back 
pressure semaphore.

http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
index d523aab..839bd77 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -147,7 +147,7 @@ public class IgniteCommunicationBalanceTest extends 
GridCommonAbstractTest {
     /**
      *
      */
-    static class DummyRunnable implements IgniteRunnable {
+    private static class DummyRunnable implements IgniteRunnable {
         /** {@inheritDoc} */
         @Override public void run() {
             // No-op.

Reply via email to