Repository: ignite Updated Branches: refs/heads/ignite-comm-balance 5ebb7ebb9 -> a1e875d34
ignite-comm-balance Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1e875d3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1e875d3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1e875d3 Branch: refs/heads/ignite-comm-balance Commit: a1e875d34778dc7d97c468fc5ffc5dd926bc78d1 Parents: 5ebb7eb Author: sboikov <sboi...@gridgain.com> Authored: Fri Sep 16 17:30:59 2016 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Sep 16 17:30:59 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/util/nio/GridNioServer.java | 37 +++++++++++--------- .../ignite/internal/util/nio/GridNioWorker.java | 28 +++++++++++++++ .../util/nio/GridSelectorNioSessionImpl.java | 28 ++++++--------- 3 files changed, 59 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e875d3/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 3ede2cb..961def9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -434,10 +434,11 @@ public class GridNioServer<T> { NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, NioOperation.CLOSE); - int idx = impl.selectorIndex(); // TODO - - if (idx != -1) - clientWorkers.get(idx).offer(fut); + impl.offerStateChange(fut); +// int idx = impl.selectorIndex(); // TODO +// +// if (idx != -1) +// clientWorkers.get(idx).offer(fut); return fut; } @@ -499,10 +500,11 @@ public class GridNioServer<T> { } else if (msgCnt == 1) { // Change from 0 to 1 means that worker thread should be waken up. - int idx = ses.selectorIndex(); - - if (idx != -1) // TODO revisit - clientWorkers.get(idx).offer(fut); +// int idx = ses.selectorIndex(); +// +// if (idx != -1) // TODO revisit +// clientWorkers.get(idx).offer(fut); + ses.offerStateChange(fut); } if (msgQueueLsnr != null) @@ -576,7 +578,8 @@ public class GridNioServer<T> { ses0.resend(futs); // Wake up worker. - clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0)); + //clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0)); + ses0.offerStateChange(fut0); } } @@ -615,7 +618,7 @@ public class GridNioServer<T> { NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, op); - clientWorkers.get(impl.selectorIndex()).offer(fut); + impl.offerStateChange(fut); return fut; } @@ -1324,7 +1327,7 @@ public class GridNioServer<T> { /** * Thread performing only read operations from the channel. */ - public abstract class AbstractNioClientWorker extends GridWorker { + public abstract class AbstractNioClientWorker extends GridWorker implements GridNioWorker { /** Queue of change requests on this selector. */ private final ConcurrentLinkedQueue<NioOperationFuture> changeReqs = new ConcurrentLinkedQueue<>(); @@ -1445,8 +1448,8 @@ public class GridNioServer<T> { * * @param req Change request. */ - private void offer(NioOperationFuture req) { - changeReqs.offer(req); + @Override public void offer(GridNioFuture req) { + changeReqs.offer((NioOperationFuture)req); selector.wakeup(); } @@ -1478,7 +1481,7 @@ public class GridNioServer<T> { GridSelectorNioSessionImpl ses = f.session(); if (idx == f.toIdx) { - ses.selectorIndex(idx); + ses.worker = this; sessions0.add(ses); @@ -1489,10 +1492,10 @@ public class GridNioServer<T> { } else { if (sessions0.remove(ses)) { - assert ses.selectorIndex() == idx; // TODO replace with IF and ignore? + assert ses.worker == this; // TODO replace with IF and ignore? // Cleanup. - ses.selectorIndex(-1); + ses.worker = null; SelectionKey key = ses.key(); @@ -1855,7 +1858,7 @@ public class GridNioServer<T> { final GridSelectorNioSessionImpl ses = new GridSelectorNioSessionImpl( log, - idx, + this, filterChain, (InetSocketAddress)sockCh.getLocalAddress(), (InetSocketAddress)sockCh.getRemoteAddress(), http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e875d3/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java new file mode 100644 index 0000000..d088d8c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +/** + * + */ +public interface GridNioWorker { + /** + * @param fut Future. + */ + void offer(GridNioFuture fut); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e875d3/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index 8e5b93d..7d02da5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -43,8 +43,8 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { @GridToStringExclude private SelectionKey key; - /** Worker index for server */ - private volatile int selectorIdx; + /** */ + public GridNioWorker worker; /** Size counter. */ private final AtomicInteger queueSize = new AtomicInteger(); @@ -72,7 +72,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { * Creates session instance. * * @param log Logger. - * @param selectorIdx Selector index for this session. + * @param worker NIO worker thread. * @param filterChain Filter chain that will handle requests. * @param locAddr Local address. * @param rmtAddr Remote address. @@ -83,7 +83,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { */ GridSelectorNioSessionImpl( IgniteLogger log, - int selectorIdx, + GridNioWorker worker, GridNioFilterChain filterChain, InetSocketAddress locAddr, InetSocketAddress rmtAddr, @@ -94,7 +94,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { ) { super(filterChain, locAddr, rmtAddr, accepted); - assert selectorIdx >= 0; + assert worker != null; assert sndQueueLimit >= 0; assert locAddr != null : "GridSelectorNioSessionImpl should have local socket address."; @@ -104,7 +104,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { this.log = log; - this.selectorIdx = selectorIdx; + this.worker = worker; sem = sndQueueLimit > 0 ? new Semaphore(sndQueueLimit) : null; @@ -153,20 +153,14 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { return key; } - /** - * @return Selector index. - */ - int selectorIndex() { - return selectorIdx; - } + void offerStateChange(GridNioFuture fut) { + GridNioWorker worker0 = worker; - /** - * @param selectorIdx Selector index. - */ - void selectorIndex(int selectorIdx) { - this.selectorIdx = selectorIdx; + if (worker0 != null) + worker0.offer(fut); } + /** * Adds write future at the front of the queue without acquiring back pressure semaphore. *