Repository: ignite
Updated Branches:
  refs/heads/ignite-comm-balance 5ebb7ebb9 -> a1e875d34


ignite-comm-balance


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1e875d3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1e875d3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1e875d3

Branch: refs/heads/ignite-comm-balance
Commit: a1e875d34778dc7d97c468fc5ffc5dd926bc78d1
Parents: 5ebb7eb
Author: sboikov <sboi...@gridgain.com>
Authored: Fri Sep 16 17:30:59 2016 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri Sep 16 17:30:59 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/nio/GridNioServer.java | 37 +++++++++++---------
 .../ignite/internal/util/nio/GridNioWorker.java | 28 +++++++++++++++
 .../util/nio/GridSelectorNioSessionImpl.java    | 28 ++++++---------
 3 files changed, 59 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e875d3/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 3ede2cb..961def9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -434,10 +434,11 @@ public class GridNioServer<T> {
 
         NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, 
NioOperation.CLOSE);
 
-        int idx = impl.selectorIndex(); // TODO
-
-        if (idx != -1)
-            clientWorkers.get(idx).offer(fut);
+        impl.offerStateChange(fut);
+//        int idx = impl.selectorIndex(); // TODO
+//
+//        if (idx != -1)
+//            clientWorkers.get(idx).offer(fut);
 
         return fut;
     }
@@ -499,10 +500,11 @@ public class GridNioServer<T> {
         }
         else if (msgCnt == 1) {
             // Change from 0 to 1 means that worker thread should be waken up.
-            int idx = ses.selectorIndex();
-
-            if (idx != -1) // TODO revisit
-                clientWorkers.get(idx).offer(fut);
+//            int idx = ses.selectorIndex();
+//
+//            if (idx != -1) // TODO revisit
+//                clientWorkers.get(idx).offer(fut);
+            ses.offerStateChange(fut);
         }
 
         if (msgQueueLsnr != null)
@@ -576,7 +578,8 @@ public class GridNioServer<T> {
             ses0.resend(futs);
 
             // Wake up worker.
-            
clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0));
+            
//clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0));
+            ses0.offerStateChange(fut0);
         }
     }
 
@@ -615,7 +618,7 @@ public class GridNioServer<T> {
 
         NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, op);
 
-        clientWorkers.get(impl.selectorIndex()).offer(fut);
+        impl.offerStateChange(fut);
 
         return fut;
     }
@@ -1324,7 +1327,7 @@ public class GridNioServer<T> {
     /**
      * Thread performing only read operations from the channel.
      */
-    public abstract class AbstractNioClientWorker extends GridWorker {
+    public abstract class AbstractNioClientWorker extends GridWorker 
implements GridNioWorker {
         /** Queue of change requests on this selector. */
         private final ConcurrentLinkedQueue<NioOperationFuture> changeReqs = 
new ConcurrentLinkedQueue<>();
 
@@ -1445,8 +1448,8 @@ public class GridNioServer<T> {
          *
          * @param req Change request.
          */
-        private void offer(NioOperationFuture req) {
-            changeReqs.offer(req);
+        @Override  public void offer(GridNioFuture req) {
+            changeReqs.offer((NioOperationFuture)req);
 
             selector.wakeup();
         }
@@ -1478,7 +1481,7 @@ public class GridNioServer<T> {
                                 GridSelectorNioSessionImpl ses = f.session();
 
                                 if (idx == f.toIdx) {
-                                    ses.selectorIndex(idx);
+                                    ses.worker = this;
 
                                     sessions0.add(ses);
 
@@ -1489,10 +1492,10 @@ public class GridNioServer<T> {
                                 }
                                 else {
                                     if (sessions0.remove(ses)) {
-                                        assert ses.selectorIndex() == idx; // 
TODO replace with IF and ignore?
+                                        assert ses.worker == this; // TODO 
replace with IF and ignore?
 
                                         // Cleanup.
-                                        ses.selectorIndex(-1);
+                                        ses.worker = null;
 
                                         SelectionKey key = ses.key();
 
@@ -1855,7 +1858,7 @@ public class GridNioServer<T> {
 
                 final GridSelectorNioSessionImpl ses = new 
GridSelectorNioSessionImpl(
                     log,
-                    idx,
+                    this,
                     filterChain,
                     (InetSocketAddress)sockCh.getLocalAddress(),
                     (InetSocketAddress)sockCh.getRemoteAddress(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e875d3/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
new file mode 100644
index 0000000..d088d8c
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+/**
+ *
+ */
+public interface GridNioWorker {
+    /**
+     * @param fut Future.
+     */
+    void offer(GridNioFuture fut);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e875d3/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 8e5b93d..7d02da5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -43,8 +43,8 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     @GridToStringExclude
     private SelectionKey key;
 
-    /** Worker index for server */
-    private volatile int selectorIdx;
+    /** */
+    public GridNioWorker worker;
 
     /** Size counter. */
     private final AtomicInteger queueSize = new AtomicInteger();
@@ -72,7 +72,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * Creates session instance.
      *
      * @param log Logger.
-     * @param selectorIdx Selector index for this session.
+     * @param worker NIO worker thread.
      * @param filterChain Filter chain that will handle requests.
      * @param locAddr Local address.
      * @param rmtAddr Remote address.
@@ -83,7 +83,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      */
     GridSelectorNioSessionImpl(
         IgniteLogger log,
-        int selectorIdx,
+        GridNioWorker worker,
         GridNioFilterChain filterChain,
         InetSocketAddress locAddr,
         InetSocketAddress rmtAddr,
@@ -94,7 +94,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     ) {
         super(filterChain, locAddr, rmtAddr, accepted);
 
-        assert selectorIdx >= 0;
+        assert worker != null;
         assert sndQueueLimit >= 0;
 
         assert locAddr != null : "GridSelectorNioSessionImpl should have local 
socket address.";
@@ -104,7 +104,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl 
{
 
         this.log = log;
 
-        this.selectorIdx = selectorIdx;
+        this.worker = worker;
 
         sem = sndQueueLimit > 0 ? new Semaphore(sndQueueLimit) : null;
 
@@ -153,20 +153,14 @@ class GridSelectorNioSessionImpl extends 
GridNioSessionImpl {
         return key;
     }
 
-    /**
-     * @return Selector index.
-     */
-    int selectorIndex() {
-        return selectorIdx;
-    }
+    void offerStateChange(GridNioFuture fut) {
+        GridNioWorker worker0 = worker;
 
-    /**
-     * @param selectorIdx Selector index.
-     */
-    void selectorIndex(int selectorIdx) {
-        this.selectorIdx = selectorIdx;
+        if (worker0 != null)
+            worker0.offer(fut);
     }
 
+
     /**
      * Adds write future at the front of the queue without acquiring back 
pressure semaphore.
      *

Reply via email to