http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index c8e2e0b..bc1f173 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -37,6 +37,7 @@ import java.nio.channels.WritableByteChannel; import java.nio.channels.spi.SelectorProvider; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Deque; import java.util.Iterator; import java.util.List; @@ -44,6 +45,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -63,6 +66,7 @@ import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -86,11 +90,14 @@ import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.NIO_OPER * */ public class GridNioServer<T> { + /** */ + public static final String IGNITE_IO_BALANCE_RANDOM_BALANCE = "IGNITE_IO_BALANCE_RANDOM_BALANCER"; + /** Default session write timeout. */ public static final int DFLT_SES_WRITE_TIMEOUT = 5000; /** Default send queue limit. */ - public static final int DFLT_SEND_QUEUE_LIMIT = 1024; + public static final int DFLT_SEND_QUEUE_LIMIT = 0; /** Time, which server will wait before retry operation. */ private static final long ERR_WAIT_TIME = 2000; @@ -122,6 +129,9 @@ public class GridNioServer<T> { } } + /** Defines how many times selector should do {@code selectNow()} before doing {@code select(long)}. */ + private long selectorSpins; + /** Accept worker thread. */ @GridToStringExclude private final IgniteThread acceptThread; @@ -145,9 +155,13 @@ public class GridNioServer<T> { /** Flag indicating if this server should use direct buffers. */ private final boolean directBuf; - /** Index to select which thread will serve next socket channel. Using round-robin balancing. */ + /** Index to select which thread will serve next incoming socket channel. Using round-robin balancing. */ + @GridToStringExclude + private int readBalanceIdx; + + /** Index to select which thread will serve next out socket channel. Using round-robin balancing. */ @GridToStringExclude - private int balanceIdx; + private int writeBalanceIdx = 1; /** Tcp no delay flag. */ private final boolean tcpNoDelay; @@ -204,12 +218,25 @@ public class GridNioServer<T> { /** Optional listener to monitor outbound message queue size. */ private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr; + /** */ + private final AtomicLong readerMoveCnt = new AtomicLong(); + + /** */ + private final AtomicLong writerMoveCnt = new AtomicLong(); + + /** */ + private final IgniteRunnable balancer; + /** * @param addr Address. * @param port Port. * @param log Log. * @param selectorCnt Count of selectors and selecting threads. * @param gridName Grid name. + * @param srvName Logical server name for threads identification. + * @param selectorSpins Defines how many non-blocking {@code selector.selectNow()} should be made before + * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}. + * Can be set to {@code Long.MAX_VALUE} so selector threads will never block. * @param tcpNoDelay If TCP_NODELAY option should be set to accepted sockets. * @param directBuf Direct buffer flag. * @param order Byte order. @@ -223,6 +250,7 @@ public class GridNioServer<T> { * @param writerFactory Writer factory. * @param skipRecoveryPred Skip recovery predicate. * @param msgQueueLsnr Message queue size listener. + * @param balancing NIO sessions balancing flag. * @param filters Filters for this server. * @throws IgniteCheckedException If failed. */ @@ -232,6 +260,8 @@ public class GridNioServer<T> { IgniteLogger log, int selectorCnt, @Nullable String gridName, + @Nullable String srvName, + long selectorSpins, boolean tcpNoDelay, boolean directBuf, ByteOrder order, @@ -245,6 +275,7 @@ public class GridNioServer<T> { GridNioMessageWriterFactory writerFactory, IgnitePredicate<Message> skipRecoveryPred, IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr, + boolean balancing, GridNioFilter... filters ) throws IgniteCheckedException { if (port != -1) @@ -268,6 +299,7 @@ public class GridNioServer<T> { this.sockSndBuf = sockSndBuf; this.sndQueueLimit = sndQueueLimit; this.msgQueueLsnr = msgQueueLsnr; + this.selectorSpins = selectorSpins; filterChain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters); @@ -299,9 +331,16 @@ public class GridNioServer<T> { clientThreads = new IgniteThread[selectorCnt]; for (int i = 0; i < selectorCnt; i++) { + String threadName; + + if (srvName == null) + threadName = "grid-nio-worker-" + i; + else + threadName = "grid-nio-worker-" + srvName + "-" + i; + AbstractNioClientWorker worker = directMode ? - new DirectNioClientWorker(i, gridName, "grid-nio-worker-" + i, log) : - new ByteBufferNioClientWorker(i, gridName, "grid-nio-worker-" + i, log); + new DirectNioClientWorker(i, gridName, threadName, log) : + new ByteBufferNioClientWorker(i, gridName, threadName, log); clientWorkers.add(worker); @@ -315,6 +354,32 @@ public class GridNioServer<T> { this.writerFactory = writerFactory; this.skipRecoveryPred = skipRecoveryPred != null ? skipRecoveryPred : F.<Message>alwaysFalse(); + + long balancePeriod = IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, 5000); + + IgniteRunnable balancer0 = null; + + if (balancing && balancePeriod > 0) { + boolean rndBalance = IgniteSystemProperties.getBoolean(IGNITE_IO_BALANCE_RANDOM_BALANCE, false); + + balancer0 = rndBalance ? new RandomBalancer() : new SizeBasedBalancer(balancePeriod); + } + + this.balancer = balancer0; + } + + /** + * @return Number of reader sessions move. + */ + public long readerMoveCount() { + return readerMoveCnt.get(); + } + + /** + * @return Number of reader writer move. + */ + public long writerMoveCount() { + return writerMoveCnt.get(); } /** @@ -377,6 +442,13 @@ public class GridNioServer<T> { } /** + * @return Selector spins. + */ + public long selectorSpins() { + return selectorSpins; + } + + /** * @param ses Session to close. * @return Future for operation. */ @@ -390,7 +462,7 @@ public class GridNioServer<T> { NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, NioOperation.CLOSE); - clientWorkers.get(impl.selectorIndex()).offer(fut); + impl.offerStateChange(fut); return fut; } @@ -398,61 +470,91 @@ public class GridNioServer<T> { /** * @param ses Session. * @param msg Message. + * @param createFut {@code True} if future should be created. * @return Future for operation. */ - GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg) { - assert ses instanceof GridSelectorNioSessionImpl; + GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg, boolean createFut) throws IgniteCheckedException { + assert ses instanceof GridSelectorNioSessionImpl : ses; GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; - NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg); + if (createFut) { + NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg); - send0(impl, fut, false); + send0(impl, fut, false); - return fut; + return fut; + } + else { + SessionWriteRequest req = new WriteRequestImpl(ses, msg, true); + + send0(impl, req, false); + + return null; + } } /** * @param ses Session. * @param msg Message. + * @param createFut {@code True} if future should be created. * @return Future for operation. */ - GridNioFuture<?> send(GridNioSession ses, Message msg) { + GridNioFuture<?> send(GridNioSession ses, Message msg, boolean createFut) throws IgniteCheckedException { assert ses instanceof GridSelectorNioSessionImpl; GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; - NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, - skipRecoveryPred.apply(msg)); + if (createFut) { + NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, + skipRecoveryPred.apply(msg)); - send0(impl, fut, false); + send0(impl, fut, false); - return fut; + return fut; + } + else { + SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg)); + + send0(impl, req, false); + + return null; + } } /** * @param ses Session. - * @param fut Future. + * @param req Request. * @param sys System message flag. + * @throws IgniteCheckedException If session was closed. */ - private void send0(GridSelectorNioSessionImpl ses, NioOperationFuture<?> fut, boolean sys) { + private void send0(GridSelectorNioSessionImpl ses, SessionWriteRequest req, boolean sys) throws IgniteCheckedException { assert ses != null; - assert fut != null; + assert req != null; - int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut); + int msgCnt = sys ? ses.offerSystemFuture(req) : ses.offerFuture(req); IgniteInClosure<IgniteException> ackC; if (!sys && (ackC = ses.removeMeta(ACK_CLOSURE.ordinal())) != null) - fut.ackClosure(ackC); + req.ackClosure(ackC); if (ses.closed()) { - if (ses.removeFuture(fut)) - fut.connectionClosed(); + if (ses.removeFuture(req)) { + IOException err = new IOException("Failed to send message (connection was closed): " + ses); + + req.onError(err); + + if (!(req instanceof GridNioFuture)) + throw new IgniteCheckedException(err); + } + } + else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, true)) { + AbstractNioClientWorker worker = (AbstractNioClientWorker)ses.worker(); + + if (worker != null) + worker.offer((SessionChangeRequest)req); } - else if (msgCnt == 1) - // Change from 0 to 1 means that worker thread should be waken up. - clientWorkers.get(ses.selectorIndex()).offer(fut); if (msgQueueLsnr != null) msgQueueLsnr.apply(ses, msgCnt); @@ -463,10 +565,10 @@ public class GridNioServer<T> { * * @param ses Session. * @param msg Message. - * @return Future. + * @throws IgniteCheckedException If session was closed. */ - public GridNioFuture<?> sendSystem(GridNioSession ses, Message msg) { - return sendSystem(ses, msg, null); + public void sendSystem(GridNioSession ses, Message msg) throws IgniteCheckedException { + sendSystem(ses, msg, null); } /** @@ -475,27 +577,30 @@ public class GridNioServer<T> { * @param ses Session. * @param msg Message. * @param lsnr Future listener notified from the session thread. - * @return Future. + * @throws IgniteCheckedException If session was closed. */ - public GridNioFuture<?> sendSystem(GridNioSession ses, + public void sendSystem(GridNioSession ses, Message msg, - @Nullable IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) { + @Nullable IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) throws IgniteCheckedException { assert ses instanceof GridSelectorNioSessionImpl; GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; - NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, - skipRecoveryPred.apply(msg)); - if (lsnr != null) { + NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, + skipRecoveryPred.apply(msg)); + fut.listen(lsnr); assert !fut.isDone(); - } - send0(impl, fut, true); + send0(impl, fut, true); + } + else { + SessionWriteRequest req = new WriteRequestSystemImpl(ses, msg); - return fut; + send0(impl, req, true); + } } /** @@ -504,37 +609,69 @@ public class GridNioServer<T> { public void resend(GridNioSession ses) { assert ses instanceof GridSelectorNioSessionImpl; - GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor(); + GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor(); - if (recoveryDesc != null && !recoveryDesc.messagesFutures().isEmpty()) { - Deque<GridNioFuture<?>> futs = recoveryDesc.messagesFutures(); + if (recoveryDesc != null && !recoveryDesc.messagesRequests().isEmpty()) { + Deque<SessionWriteRequest> futs = recoveryDesc.messagesRequests(); if (log.isDebugEnabled()) log.debug("Resend messages [rmtNode=" + recoveryDesc.node().id() + ", msgCnt=" + futs.size() + ']'); GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses; - GridNioFuture<?> fut0 = futs.iterator().next(); + SessionWriteRequest fut0 = futs.iterator().next(); - for (GridNioFuture<?> fut : futs) { + for (SessionWriteRequest fut : futs) { fut.messageThread(true); - ((NioOperationFuture)fut).resetSession(ses0); + fut.resetSession(ses0); } ses0.resend(futs); // Wake up worker. - clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0)); + ses0.offerStateChange((GridNioServer.SessionChangeRequest)fut0); } } /** + * @return Sessions. + */ + public Collection<? extends GridNioSession> sessions() { + return sessions; + } + + /** + * @return Workers. + */ + public List<AbstractNioClientWorker> workers() { + return clientWorkers; + } + + /** + * @param ses Session. + * @param from Move from index. + * @param to Move to index. + */ + private void moveSession(GridNioSession ses, int from, int to) { + assert from >= 0 && from < clientWorkers.size() : from; + assert to >= 0 && to < clientWorkers.size() : to; + assert from != to; + + GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses; + + SessionMoveFuture fut = new SessionMoveFuture(ses0, to); + + if (!ses0.offerMove(clientWorkers.get(from), fut)) + fut.onDone(false); + } + + /** * @param ses Session. * @param op Operation. * @return Future for operation. */ - GridNioFuture<?> pauseResumeReads(GridNioSession ses, NioOperation op) { + private GridNioFuture<?> pauseResumeReads(GridNioSession ses, NioOperation op) { assert ses instanceof GridSelectorNioSessionImpl; assert op == NioOperation.PAUSE_READ || op == NioOperation.RESUME_READ; @@ -546,7 +683,7 @@ public class GridNioServer<T> { NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, op); - clientWorkers.get(impl.selectorIndex()).offer(fut); + impl.offerStateChange(fut); return fut; } @@ -555,6 +692,9 @@ public class GridNioServer<T> { * */ public void dumpStats() { + U.warn(log, "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get() + + ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']'); + for (int i = 0; i < clientWorkers.size(); i++) clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, NioOperation.DUMP_STATS)); } @@ -675,12 +815,35 @@ public class GridNioServer<T> { * @param req Request to balance. */ private synchronized void offerBalanced(NioOperationFuture req) { - clientWorkers.get(balanceIdx).offer(req); + assert req.operation() == NioOperation.REGISTER : req; + assert req.socketChannel() != null : req; + + int workers = clientWorkers.size(); + + int balanceIdx; + + if (workers > 1) { + if (req.accepted()) { + balanceIdx = readBalanceIdx; + + readBalanceIdx += 2; + + if (readBalanceIdx >= workers) + readBalanceIdx = 0; + } + else { + balanceIdx = writeBalanceIdx; - balanceIdx++; + writeBalanceIdx += 2; - if (balanceIdx == clientWorkers.size()) + if (writeBalanceIdx >= workers) + writeBalanceIdx = 1; + } + } + else balanceIdx = 0; + + clientWorkers.get(balanceIdx).offer(req); } /** {@inheritDoc} */ @@ -792,21 +955,30 @@ public class GridNioServer<T> { while (true) { ByteBuffer buf = ses.removeMeta(BUF_META_KEY); - NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal()); + SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal()); // Check if there were any pending data from previous writes. if (buf == null) { assert req == null; - req = (NioOperationFuture<?>)ses.pollFuture(); + req = ses.pollFuture(); if (req == null) { - key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + if (ses.procWrite.get()) { + ses.procWrite.set(false); + + if (ses.writeQueue().isEmpty()) { + if ((key.interestOps() & SelectionKey.OP_WRITE) != 0) + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + } + else + ses.procWrite.set(true); + } break; } - buf = req.message(); + buf = (ByteBuffer)req.message(); } if (!skipWrite) { @@ -841,10 +1013,15 @@ public class GridNioServer<T> { // Message was successfully written. assert req != null; - req.onDone(); + req.onMessageWritten(); } } } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ByteBufferNioClientWorker.class, this, super.toString()); + } } /** @@ -909,6 +1086,7 @@ public class GridNioServer<T> { metricsLsnr.onBytesReceived(cnt); ses.bytesReceived(cnt); + onRead(cnt); readBuf.flip(); @@ -921,6 +1099,12 @@ public class GridNioServer<T> { readBuf.compact(); else readBuf.clear(); + + if (ses.hasSystemMessage() && !ses.procWrite.get()) { + ses.procWrite.set(true); + + registerWrite(ses); + } } catch (IgniteCheckedException e) { close(ses, e); @@ -993,16 +1177,29 @@ public class GridNioServer<T> { if (ses.meta(WRITE_BUF_LIMIT) != null) buf.limit((int)ses.meta(WRITE_BUF_LIMIT)); - NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal()); + SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal()); while (true) { if (req == null) { - req = (NioOperationFuture<?>)ses.pollFuture(); + req = systemMessage(ses); - if (req == null && buf.position() == 0) { - key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + if (req == null) { + req = ses.pollFuture(); - break; + if (req == null && buf.position() == 0) { + if (ses.procWrite.get()) { + ses.procWrite.set(false); + + if (ses.writeQueue().isEmpty()) { + if ((key.interestOps() & SelectionKey.OP_WRITE) != 0) + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + } + else + ses.procWrite.set(true); + } + + break; + } } } @@ -1010,7 +1207,7 @@ public class GridNioServer<T> { boolean finished = false; if (req != null) { - msg = req.directMessage(); + msg = (Message)req.message(); assert msg != null; @@ -1025,14 +1222,17 @@ public class GridNioServer<T> { // Fill up as many messages as possible to write buffer. while (finished) { - req.onDone(); + req.onMessageWritten(); - req = (NioOperationFuture<?>)ses.pollFuture(); + req = systemMessage(ses); + + if (req == null) + req = ses.pollFuture(); if (req == null) break; - msg = req.directMessage(); + msg = (Message)req.message(); assert msg != null; @@ -1129,13 +1329,31 @@ public class GridNioServer<T> { ses.bytesSent(cnt); if (!buf.hasRemaining()) - queue.remove(buf); + queue.poll(); else break; } } /** + * @param ses Session. + * @return System message request. + */ + private SessionWriteRequest systemMessage(GridSelectorNioSessionImpl ses) { + if (ses.hasSystemMessage()) { + Object msg = ses.systemMessage(); + + SessionWriteRequest req = new WriteRequestSystemImpl(ses, msg); + + assert !ses.hasSystemMessage(); + + return req; + } + + return null; + } + + /** * Processes write-ready event on the key. * * @param key Key that is ready to be written. @@ -1147,7 +1365,7 @@ public class GridNioServer<T> { GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); ByteBuffer buf = ses.writeBuffer(); - NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal()); + SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal()); MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); @@ -1161,12 +1379,25 @@ public class GridNioServer<T> { } if (req == null) { - req = (NioOperationFuture<?>)ses.pollFuture(); + req = systemMessage(ses); - if (req == null && buf.position() == 0) { - key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + if (req == null) { + req = ses.pollFuture(); - return; + if (req == null && buf.position() == 0) { + if (ses.procWrite.get()) { + ses.procWrite.set(false); + + if (ses.writeQueue().isEmpty()) { + if ((key.interestOps() & SelectionKey.OP_WRITE) != 0) + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + } + else + ses.procWrite.set(true); + } + + return; + } } } @@ -1174,9 +1405,9 @@ public class GridNioServer<T> { boolean finished = false; if (req != null) { - msg = req.directMessage(); + msg = (Message)req.message(); - assert msg != null; + assert msg != null : req; if (writer != null) writer.setCurrentWriteClass(msg.getClass()); @@ -1189,14 +1420,17 @@ public class GridNioServer<T> { // Fill up as many messages as possible to write buffer. while (finished) { - req.onDone(); + req.onMessageWritten(); - req = (NioOperationFuture<?>)ses.pollFuture(); + req = systemMessage(ses); + + if (req == null) + req = ses.pollFuture(); if (req == null) break; - msg = req.directMessage(); + msg = (Message)req.message(); assert msg != null; @@ -1223,6 +1457,7 @@ public class GridNioServer<T> { metricsLsnr.onBytesSent(cnt); ses.bytesSent(cnt); + onWrite(cnt); } else { // For test purposes only (skipWrite is set to true in tests only). @@ -1242,14 +1477,19 @@ public class GridNioServer<T> { else buf.clear(); } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DirectNioClientWorker.class, this, super.toString()); + } } /** * Thread performing only read operations from the channel. */ - private abstract class AbstractNioClientWorker extends GridWorker { + private abstract class AbstractNioClientWorker extends GridWorker implements GridNioWorker { /** Queue of change requests on this selector. */ - private final ConcurrentLinkedQueue<NioOperationFuture> changeReqs = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue<SessionChangeRequest> changeReqs = new ConcurrentLinkedQueue<>(); /** Selector to select read events. */ private Selector selector; @@ -1260,6 +1500,25 @@ public class GridNioServer<T> { /** Worker index. */ private final int idx; + /** */ + private long bytesRcvd; + + /** */ + private long bytesSent; + + /** */ + private volatile long bytesRcvd0; + + /** */ + private volatile long bytesSent0; + + /** Sessions assigned to this worker. */ + private final GridConcurrentHashSet<GridSelectorNioSessionImpl> workerSessions = + new GridConcurrentHashSet<>(); + + /** {@code True} if worker has called or is about to call {@code Selector.select()}. */ + private volatile boolean select; + /** * @param idx Index of this worker in server's array. * @param gridName Grid name. @@ -1322,15 +1581,15 @@ public class GridNioServer<T> { try { SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); - Class<?> selectorImplClass = + Class<?> selectorImplCls = Class.forName("sun.nio.ch.SelectorImpl", false, U.gridClassLoader()); // Ensure the current selector implementation is what we can instrument. - if (!selectorImplClass.isAssignableFrom(selector.getClass())) + if (!selectorImplCls.isAssignableFrom(selector.getClass())) return; - Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); - Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); + Field selectedKeysField = selectorImplCls.getDeclaredField("selectedKeys"); + Field publicSelectedKeysField = selectorImplCls.getDeclaredField("publicSelectedKeys"); selectedKeysField.setAccessible(true); publicSelectedKeysField.setAccessible(true); @@ -1357,48 +1616,126 @@ public class GridNioServer<T> { * * @param req Change request. */ - private void offer(NioOperationFuture req) { + public void offer(SessionChangeRequest req) { changeReqs.offer(req); + if (select) + selector.wakeup(); + } + + /** {@inheritDoc} */ + @Override public void offer(Collection<SessionChangeRequest> reqs) { + for (SessionChangeRequest req : reqs) + changeReqs.offer(req); + selector.wakeup(); } + /** {@inheritDoc} */ + @Override public List<SessionChangeRequest> clearSessionRequests(GridNioSession ses) { + List<SessionChangeRequest> sesReqs = null; + + for (SessionChangeRequest changeReq : changeReqs) { + if (changeReq.session() == ses && !(changeReq instanceof SessionMoveFuture)) { + boolean rmv = changeReqs.remove(changeReq); + + assert rmv : changeReq; + + if (sesReqs == null) + sesReqs = new ArrayList<>(); + + sesReqs.add(changeReq); + } + } + + return sesReqs; + } + /** * Processes read and write events and registration requests. * * @throws IgniteCheckedException If IOException occurred or thread was unable to add worker to workers pool. */ @SuppressWarnings("unchecked") - private void bodyInternal() throws IgniteCheckedException { + private void bodyInternal() throws IgniteCheckedException, InterruptedException { try { long lastIdleCheck = U.currentTimeMillis(); + mainLoop: while (!closed && selector.isOpen()) { - NioOperationFuture req; + SessionChangeRequest req0; - while ((req = changeReqs.poll()) != null) { - switch (req.operation()) { + while ((req0 = changeReqs.poll()) != null) { + switch (req0.operation()) { case REGISTER: { - register(req); + register((NioOperationFuture)req0); break; } - case REQUIRE_WRITE: { - //Just register write key. - SelectionKey key = req.session().key(); + case MOVE: { + SessionMoveFuture f = (SessionMoveFuture)req0; - if (key.isValid()) { - key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + GridSelectorNioSessionImpl ses = f.session(); + + if (idx == f.toIdx) { + assert f.movedSocketChannel() != null : f; + + boolean add = workerSessions.add(ses); + + assert add; - // Update timestamp to protected against false write timeout. - ((GridNioSessionImpl)key.attachment()).bytesSent(0); + ses.finishMoveSession(this); + + if (idx % 2 == 0) + readerMoveCnt.incrementAndGet(); + else + writerMoveCnt.incrementAndGet(); + + SelectionKey key = f.movedSocketChannel().register(selector, + SelectionKey.OP_READ | SelectionKey.OP_WRITE, + ses); + + ses.key(key); + + ses.procWrite.set(true); + + f.onDone(true); + } + else { + assert f.movedSocketChannel() == null : f; + + if (workerSessions.remove(ses)) { + ses.startMoveSession(this); + + SelectionKey key = ses.key(); + + assert key.channel() != null : key; + + f.movedSocketChannel((SocketChannel)key.channel()); + + key.cancel(); + + clientWorkers.get(f.toIndex()).offer(f); + } + else + f.onDone(false); } break; } + case REQUIRE_WRITE: { + SessionWriteRequest req = (SessionWriteRequest)req0; + + registerWrite((GridSelectorNioSessionImpl)req.session()); + + break; + } + case CLOSE: { + NioOperationFuture req = (NioOperationFuture)req0; + if (close(req.session(), null)) req.onDone(true); else @@ -1408,6 +1745,8 @@ public class GridNioServer<T> { } case PAUSE_READ: { + NioOperationFuture req = (NioOperationFuture)req0; + SelectionKey key = req.session().key(); if (key.isValid()) { @@ -1426,6 +1765,8 @@ public class GridNioServer<T> { } case RESUME_READ: { + NioOperationFuture req = (NioOperationFuture)req0; + SelectionKey key = req.session().key(); if (key.isValid()) { @@ -1444,76 +1785,66 @@ public class GridNioServer<T> { } case DUMP_STATS: { - StringBuilder sb = new StringBuilder(); + NioOperationFuture req = (NioOperationFuture)req0; - Set<SelectionKey> keys = selector.keys(); - - sb.append(U.nl()) - .append(">> Selector info [idx=").append(idx) - .append(", keysCnt=").append(keys.size()) - .append("]").append(U.nl()); - - for (SelectionKey key : keys) { - GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); - - MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); - MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY); - - sb.append(" Connection info [") - .append("rmtAddr=").append(ses.remoteAddress()) - .append(", locAddr=").append(ses.localAddress()); - - GridNioRecoveryDescriptor desc = ses.recoveryDescriptor(); + try { + dumpStats(); + } + finally { + // Complete the request just in case (none should wait on this future). + req.onDone(true); + } + } + } + } - if (desc != null) { - sb.append(", msgsSent=").append(desc.sent()) - .append(", msgsAckedByRmt=").append(desc.acked()) - .append(", msgsRcvd=").append(desc.received()) - .append(", descIdHash=").append(System.identityHashCode(desc)); - } - else - sb.append(", recoveryDesc=null"); + int res = 0; - sb.append(", bytesRcvd=").append(ses.bytesReceived()) - .append(", bytesSent=").append(ses.bytesSent()) - .append(", opQueueSize=").append(ses.writeQueueSize()) - .append(", msgWriter=").append(writer != null ? writer.toString() : "null") - .append(", msgReader=").append(reader != null ? reader.toString() : "null"); + for (long i = 0; i < selectorSpins && res == 0; i++) { + res = selector.selectNow(); - int cnt = 0; + if (res > 0) { + // Walk through the ready keys collection and process network events. + if (selectedKeys == null) + processSelectedKeys(selector.selectedKeys()); + else + processSelectedKeysOptimized(selectedKeys.flip()); + } - for (GridNioFuture<?> fut : ses.writeQueue()) { - if (cnt == 0) - sb.append(",\n opQueue=[").append(fut); - else - sb.append(',').append(fut); + if (!changeReqs.isEmpty()) + continue mainLoop; - if (++cnt == 5) { - sb.append(']'); + // Just in case we do busy selects. + long now = U.currentTimeMillis(); - break; - } - } + if (now - lastIdleCheck > 2000) { + lastIdleCheck = now; + checkIdle(selector.keys()); + } - sb.append("]").append(U.nl()); - } + if (isCancelled()) + return; + } - U.warn(log, sb.toString()); + // Falling to blocking select. + select = true; - // Complete the request just in case (none should wait on this future). - req.onDone(true); - } + try { + if (!changeReqs.isEmpty()) + continue; + + // Wake up every 2 seconds to check if closed. + if (selector.select(2000) > 0) { + // Walk through the ready keys collection and process network events. + if (selectedKeys == null) + processSelectedKeys(selector.selectedKeys()); + else + processSelectedKeysOptimized(selectedKeys.flip()); } } - - // Wake up every 2 seconds to check if closed. - if (selector.select(2000) > 0) { - // Walk through the ready keys collection and process network events. - if (selectedKeys == null) - processSelectedKeys(selector.selectedKeys()); - else - processSelectedKeysOptimized(selectedKeys.flip()); + finally { + select = false; } long now = U.currentTimeMillis(); @@ -1554,6 +1885,98 @@ public class GridNioServer<T> { } /** + * @param ses Session. + */ + public final void registerWrite(GridSelectorNioSessionImpl ses) { + SelectionKey key = ses.key(); + + if (key.isValid()) { + if ((key.interestOps() & SelectionKey.OP_WRITE) == 0) + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + + // Update timestamp to protected against false write timeout. + ses.bytesSent(0); + } + } + + /** + * + */ + private void dumpStats() { + StringBuilder sb = new StringBuilder(); + + Set<SelectionKey> keys = selector.keys(); + + sb.append(U.nl()) + .append(">> Selector info [idx=").append(idx) + .append(", keysCnt=").append(keys.size()) + .append(", bytesRcvd=").append(bytesRcvd) + .append(", bytesRcvd0=").append(bytesRcvd0) + .append(", bytesSent=").append(bytesSent) + .append(", bytesSent0=").append(bytesSent0) + .append("]").append(U.nl()); + + for (SelectionKey key : keys) { + GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); + + MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); + MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY); + + sb.append(" Connection info [") + .append("in=").append(ses.accepted()) + .append(", rmtAddr=").append(ses.remoteAddress()) + .append(", locAddr=").append(ses.localAddress()); + + GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor(); + + if (outDesc != null) { + sb.append(", msgsSent=").append(outDesc.sent()) + .append(", msgsAckedByRmt=").append(outDesc.acked()) + .append(", descIdHash=").append(System.identityHashCode(outDesc)); + } + else + sb.append(", outRecoveryDesc=null"); + + GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor(); + + if (inDesc != null) { + sb.append(", msgsRcvd=").append(inDesc.received()) + .append(", lastAcked=").append(inDesc.lastAcknowledged()) + .append(", descIdHash=").append(System.identityHashCode(inDesc)); + } + else + sb.append(", inRecoveryDesc=null"); + + sb.append(", bytesRcvd=").append(ses.bytesReceived()) + .append(", bytesRcvd0=").append(ses.bytesReceived0()) + .append(", bytesSent=").append(ses.bytesSent()) + .append(", bytesSent0=").append(ses.bytesSent0()) + .append(", opQueueSize=").append(ses.writeQueueSize()) + .append(", msgWriter=").append(writer != null ? writer.toString() : "null") + .append(", msgReader=").append(reader != null ? reader.toString() : "null"); + + int cnt = 0; + + for (SessionWriteRequest req : ses.writeQueue()) { + if (cnt == 0) + sb.append(",\n opQueue=[").append(req); + else + sb.append(',').append(req); + + if (++cnt == 5) { + sb.append(']'); + + break; + } + } + + sb.append("]").append(U.nl()); + } + + U.warn(log, sb.toString()); + } + + /** * Processes keys selected by a selector. * * @param keys Selected keys. @@ -1671,7 +2094,9 @@ public class GridNioServer<T> { long idleTimeout0 = idleTimeout; - if (!opWrite && now - ses.lastReceiveTime() > idleTimeout0 && now - ses.lastSendScheduleTime() > idleTimeout0) { + if (!opWrite && + now - ses.lastReceiveTime() > idleTimeout0 && + now - ses.lastSendScheduleTime() > idleTimeout0) { filterChain.onSessionIdleTimeout(ses); // Update timestamp to avoid multiple notifications within one timeout interval. @@ -1715,7 +2140,7 @@ public class GridNioServer<T> { final GridSelectorNioSessionImpl ses = new GridSelectorNioSessionImpl( log, - idx, + this, filterChain, (InetSocketAddress)sockCh.getLocalAddress(), (InetSocketAddress)sockCh.getRemoteAddress(), @@ -1739,6 +2164,7 @@ public class GridNioServer<T> { resend(ses); sessions.add(ses); + workerSessions.add(ses); try { filterChain.onSessionOpened(ses); @@ -1764,7 +2190,7 @@ public class GridNioServer<T> { } /** - * Closes the ses and all associated resources, then notifies the listener. + * Closes the session and all associated resources, then notifies the listener. * * @param ses Session to be closed. * @param e Exception to be passed to the listener, if any. @@ -1781,12 +2207,10 @@ public class GridNioServer<T> { } sessions.remove(ses); + workerSessions.remove(ses); SelectionKey key = ses.key(); - // Shutdown input and output so that remote client will see correct socket close. - Socket sock = ((SocketChannel)key.channel()).socket(); - if (ses.setClosed()) { ses.onClosed(); @@ -1798,6 +2222,9 @@ public class GridNioServer<T> { ((DirectBuffer)ses.readBuffer()).cleaner().clean(); } + // Shutdown input and output so that remote client will see correct socket close. + Socket sock = ((SocketChannel)key.channel()).socket(); + try { try { sock.shutdownInput(); @@ -1824,28 +2251,35 @@ public class GridNioServer<T> { ses.removeMeta(BUF_META_KEY); // Since ses is in closed state, no write requests will be added. - NioOperationFuture<?> fut = ses.removeMeta(NIO_OPERATION.ordinal()); + SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal()); + + GridNioRecoveryDescriptor outRecovery = ses.outRecoveryDescriptor(); + GridNioRecoveryDescriptor inRecovery = ses.inRecoveryDescriptor(); - GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor(); + IOException err = new IOException("Failed to send message (connection was closed): " + ses); - if (recovery != null) { + if (outRecovery != null || inRecovery != null) { try { // Poll will update recovery data. - while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null) { - if (fut.skipRecovery()) - fut.connectionClosed(); + while ((req = ses.pollFuture()) != null) { + if (req.skipRecovery()) + req.onError(err); } } finally { - recovery.release(); + if (outRecovery != null) + outRecovery.release(); + + if (inRecovery != null && inRecovery != outRecovery) + inRecovery.release(); } } else { - if (fut != null) - fut.connectionClosed(); + if (req != null) + req.onError(err); - while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null) - fut.connectionClosed(); + while ((req = ses.pollFuture()) != null) + req.onError(err); } try { @@ -1876,12 +2310,44 @@ public class GridNioServer<T> { * @throws IOException If write failed. */ protected abstract void processWrite(SelectionKey key) throws IOException; - } - /** - * Gets outbound messages queue size. - * - * @return Write queue size. + /** + * @param cnt + */ + final void onRead(int cnt) { + bytesRcvd += cnt; + bytesRcvd0 += cnt; + } + + /** + * @param cnt + */ + final void onWrite(int cnt) { + bytesSent += cnt; + bytesSent0 += cnt; + } + + /** + * + */ + final void reset0() { + bytesSent0 = 0; + bytesRcvd0 = 0; + + for (GridSelectorNioSessionImpl ses : workerSessions) + ses.reset0(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AbstractNioClientWorker.class, this, super.toString()); + } + } + + /** + * Gets outbound messages queue size. + * + * @return Write queue size. */ public int outboundMessagesQueueSize() { int res = 0; @@ -1952,6 +2418,9 @@ public class GridNioServer<T> { if (selector.select(2000) > 0) // Walk through the ready keys collection and process date requests. processSelectedKeys(selector.selectedKeys()); + + if (balancer != null) + balancer.run(); } } // Ignore this exception as thread interruption is equal to 'close' call. @@ -2048,10 +2517,13 @@ public class GridNioServer<T> { /** * Asynchronous operation that may be requested on selector. */ - private enum NioOperation { + enum NioOperation { /** Register read key selection. */ REGISTER, + /** Move session between workers. */ + MOVE, + /** Register write key selection. */ REQUIRE_WRITE, @@ -2069,9 +2541,193 @@ public class GridNioServer<T> { } /** + * + */ + private static final class WriteRequestSystemImpl implements SessionWriteRequest, SessionChangeRequest { + /** */ + private final Object msg; + + /** */ + private final GridNioSession ses; + + /** + * @param ses Session. + * @param msg Message. + */ + WriteRequestSystemImpl(GridNioSession ses, Object msg) { + this.ses = ses; + this.msg = msg; + } + + /** {@inheritDoc} */ + @Override public void messageThread(boolean msgThread) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean messageThread() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean skipRecovery() { + return true; + } + + /** {@inheritDoc} */ + @Override public void ackClosure(IgniteInClosure<IgniteException> c) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public IgniteInClosure<IgniteException> ackClosure() { + return null; + } + + /** {@inheritDoc} */ + @Override public void onError(Exception e) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Object message() { + return msg; + } + + /** {@inheritDoc} */ + @Override public void onMessageWritten() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void resetSession(GridNioSession ses) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public GridNioSession session() { + return ses; + } + + /** {@inheritDoc} */ + @Override public NioOperation operation() { + return NioOperation.REQUIRE_WRITE; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(WriteRequestSystemImpl.class, this); + } + } + + /** + * + */ + private static final class WriteRequestImpl implements SessionWriteRequest, SessionChangeRequest { + /** */ + private GridNioSession ses; + + /** */ + private final Object msg; + + /** */ + private boolean msgThread; + + /** */ + private final boolean skipRecovery; + + /** */ + private IgniteInClosure<IgniteException> ackC; + + /** + * @param ses Session. + * @param msg Message. + * @param skipRecovery Skip recovery flag. + */ + WriteRequestImpl(GridNioSession ses, Object msg, boolean skipRecovery) { + this.ses = ses; + this.msg = msg; + this.skipRecovery = skipRecovery; + } + + /** {@inheritDoc} */ + @Override public void messageThread(boolean msgThread) { + this.msgThread = msgThread; + } + + /** {@inheritDoc} */ + @Override public boolean messageThread() { + return msgThread; + } + + /** {@inheritDoc} */ + @Override public boolean skipRecovery() { + return skipRecovery; + } + + /** {@inheritDoc} */ + @Override public void ackClosure(IgniteInClosure<IgniteException> c) { + ackC = c; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + assert msg instanceof Message; + + ((Message)msg).onAckReceived(); + } + + /** {@inheritDoc} */ + @Override public IgniteInClosure<IgniteException> ackClosure() { + return ackC; + } + + /** {@inheritDoc} */ + @Override public void onError(Exception e) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Object message() { + return msg; + } + + /** {@inheritDoc} */ + @Override public void onMessageWritten() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void resetSession(GridNioSession ses) { + this.ses = ses; + } + + /** {@inheritDoc} */ + @Override public GridNioSession session() { + return ses; + } + + /** {@inheritDoc} */ + @Override public NioOperation operation() { + return NioOperation.REQUIRE_WRITE; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(WriteRequestImpl.class, this); + } + } + + /** * Class for requesting write and session close operations. */ - private static class NioOperationFuture<R> extends GridNioFutureImpl<R> { + private static class NioOperationFuture<R> extends GridNioFutureImpl<R> implements SessionWriteRequest, + SessionChangeRequest { /** */ private static final long serialVersionUID = 0L; @@ -2087,11 +2743,7 @@ public class GridNioServer<T> { private NioOperation op; /** Message. */ - @GridToStringExclude - private ByteBuffer msg; - - /** Direct message. */ - private Message commMsg; + private Object msg; /** */ @GridToStringExclude @@ -2153,8 +2805,7 @@ public class GridNioServer<T> { * @param op Requested operation. * @param msg Message. */ - NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, - ByteBuffer msg) { + NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, Object msg) { assert ses != null; assert op != null; assert op != NioOperation.REGISTER; @@ -2182,51 +2833,36 @@ public class GridNioServer<T> { this.ses = ses; this.op = op; - this.commMsg = commMsg; + this.msg = commMsg; this.skipRecovery = skipRecovery; } - /** - * @return Requested change operation. - */ - private NioOperation operation() { + /** {@inheritDoc} */ + public NioOperation operation() { return op; } - /** - * @return Message. - */ - private ByteBuffer message() { + /** {@inheritDoc} */ + public Object message() { return msg; } - /** - * @return Direct message. - */ - private Message directMessage() { - return commMsg; - } - - /** - * @param ses New session instance. - */ - private void resetSession(GridSelectorNioSessionImpl ses) { - assert commMsg != null; + /** {@inheritDoc} */ + public void resetSession(GridNioSession ses) { + assert msg instanceof Message : msg; - this.ses = ses; + this.ses = (GridSelectorNioSessionImpl)ses; } /** * @return Socket channel for register request. */ - private SocketChannel socketChannel() { + SocketChannel socketChannel() { return sockCh; } - /** - * @return Session for this change request. - */ - private GridSelectorNioSessionImpl session() { + /** {@inheritDoc} */ + public GridSelectorNioSessionImpl session() { return ses; } @@ -2244,21 +2880,21 @@ public class GridNioServer<T> { return meta; } - /** - * Applicable to write futures only. Fails future with corresponding IOException. - */ - private void connectionClosed() { - assert op == NioOperation.REQUIRE_WRITE; - assert ses != null; - - onDone(new IOException("Failed to send message (connection was closed): " + ses)); + /** {@inheritDoc} */ + @Override public void onError(Exception e) { + onDone(e); } /** {@inheritDoc} */ @Override public void onAckReceived() { - assert commMsg != null; + assert msg instanceof Message : msg; - commMsg.onAckReceived(); + ((Message)msg).onAckReceived(); + } + + /** {@inheritDoc} */ + @Override public void onMessageWritten() { + onDone(); } /** {@inheritDoc} */ @@ -2273,6 +2909,59 @@ public class GridNioServer<T> { } /** + * + */ + private static class SessionMoveFuture extends NioOperationFuture<Boolean> { + /** */ + private final int toIdx; + + /** */ + @GridToStringExclude + private SocketChannel movedSockCh; + + /** + * @param ses Session. + * @param toIdx Target worker index. + */ + SessionMoveFuture( + GridSelectorNioSessionImpl ses, + int toIdx + ) { + super(ses, NioOperation.MOVE); + + this.toIdx = toIdx; + } + + /** + * @return Target worker index. + */ + int toIndex() { + return toIdx; + } + + /** + * @return Moved session socket channel. + */ + SocketChannel movedSocketChannel() { + return movedSockCh; + } + + /** + * @param movedSockCh Moved session socket channel. + */ + void movedSocketChannel(SocketChannel movedSockCh) { + assert movedSockCh != null; + + this.movedSockCh = movedSockCh; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SessionMoveFuture.class, this, super.toString()); + } + } + + /** * Filter forwarding messages from chain's head to this server. */ private class HeadFilter extends GridNioFilterAdapter { @@ -2302,7 +2991,7 @@ public class GridNioServer<T> { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) { + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { if (directMode) { boolean sslSys = sslFilter != null && msg instanceof ByteBuffer; @@ -2313,18 +3002,18 @@ public class GridNioServer<T> { queue.offer((ByteBuffer)msg); - SelectionKey key = ((GridSelectorNioSessionImpl)ses).key(); + GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses; - if (key.isValid()) - key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + if (!ses0.procWrite.get() && ses0.procWrite.compareAndSet(false, true)) + ses0.worker().registerWrite(ses0); return null; } else - return send(ses, (Message)msg); + return send(ses, (Message)msg, fut); } else - return send(ses, (ByteBuffer)msg); + return send(ses, (ByteBuffer)msg, fut); } /** {@inheritDoc} */ @@ -2429,6 +3118,15 @@ public class GridNioServer<T> { /** Message queue size listener. */ private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr; + /** Name for threads identification. */ + private String srvName; + + /** */ + private long selectorSpins; + + /** NIO sessions balancing flag. */ + private boolean balancing; + /** * Finishes building the instance. * @@ -2442,6 +3140,8 @@ public class GridNioServer<T> { log, selectorCnt, gridName, + srvName, + selectorSpins, tcpNoDelay, directBuf, byteOrder, @@ -2455,6 +3155,7 @@ public class GridNioServer<T> { writerFactory, skipRecoveryPred, msgQueueLsnr, + balancing, filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS ); @@ -2468,6 +3169,16 @@ public class GridNioServer<T> { } /** + * @param balancing NIO sessions balancing flag. + * @return This for chaining. + */ + public Builder<T> balancing(boolean balancing) { + this.balancing = balancing; + + return this; + } + + /** * @param addr Local address. * @return This for chaining. */ @@ -2519,6 +3230,28 @@ public class GridNioServer<T> { } /** + * @param srvName Logical server name for threads identification. + * @return This for chaining. + */ + public Builder<T> serverName(@Nullable String srvName) { + this.srvName = srvName; + + return this; + } + + /** + * @param selectorSpins Defines how many non-blocking {@code selector.selectNow()} should be made before + * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}. + * Can be set to {@code Long.MAX_VALUE} so selector threads will never block. + * @return This for chaining. + */ + public Builder<T> selectorSpins(long selectorSpins) { + this.selectorSpins = selectorSpins; + + return this; + } + + /** * @param tcpNoDelay If TCP_NODELAY option should be set to accepted sockets. * @return This for chaining. */ @@ -2678,4 +3411,225 @@ public class GridNioServer<T> { return this; } } + + /** + * + */ + private class SizeBasedBalancer implements IgniteRunnable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long lastBalance; + + /** */ + private final long balancePeriod; + + /** + * @param balancePeriod Period. + */ + SizeBasedBalancer(long balancePeriod) { + this.balancePeriod = balancePeriod; + } + + /** {@inheritDoc} */ + @Override public void run() { + long now = U.currentTimeMillis(); + + if (lastBalance + balancePeriod < now) { + lastBalance = now; + + long maxRcvd0 = -1, minRcvd0 = -1, maxSent0 = -1, minSent0 = -1; + int maxRcvdIdx = -1, minRcvdIdx = -1, maxSentIdx = -1, minSentIdx = -1; + + for (int i = 0; i < clientWorkers.size(); i++) { + GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i); + + int sesCnt = worker.workerSessions.size(); + + if (i % 2 == 0) { + // Reader. + long bytesRcvd0 = worker.bytesRcvd0; + + if ((maxRcvd0 == -1 || bytesRcvd0 > maxRcvd0) && bytesRcvd0 > 0 && sesCnt > 1) { + maxRcvd0 = bytesRcvd0; + maxRcvdIdx = i; + } + + if (minRcvd0 == -1 || bytesRcvd0 < minRcvd0) { + minRcvd0 = bytesRcvd0; + minRcvdIdx = i; + } + } + else { + // Writer. + long bytesSent0 = worker.bytesSent0; + + if ((maxSent0 == -1 || bytesSent0 > maxSent0) && bytesSent0 > 0 && sesCnt > 1) { + maxSent0 = bytesSent0; + maxSentIdx = i; + } + + if (minSent0 == -1 || bytesSent0 < minSent0) { + minSent0 = bytesSent0; + minSentIdx = i; + } + } + } + + if (log.isDebugEnabled()) + log.debug("Balancing data [minSent0=" + minSent0 + ", minSentIdx=" + minSentIdx + + ", maxSent0=" + maxSent0 + ", maxSentIdx=" + maxSentIdx + + ", minRcvd0=" + minRcvd0 + ", minRcvdIdx=" + minRcvdIdx + + ", maxRcvd0=" + maxRcvd0 + ", maxRcvdIdx=" + maxRcvdIdx + ']'); + + if (maxSent0 != -1 && minSent0 != -1) { + GridSelectorNioSessionImpl ses = null; + + long sentDiff = maxSent0 - minSent0; + long delta = sentDiff; + double threshold = sentDiff * 0.9; + + GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions = + clientWorkers.get(maxSentIdx).workerSessions; + + for (GridSelectorNioSessionImpl ses0 : sessions) { + long bytesSent0 = ses0.bytesSent0(); + + if (bytesSent0 < threshold && + (ses == null || delta > U.safeAbs(bytesSent0 - sentDiff / 2))) { + ses = ses0; + delta = U.safeAbs(bytesSent0 - sentDiff / 2); + } + } + + if (ses != null) { + if (log.isDebugEnabled()) + log.debug("Will move session to less loaded writer [ses=" + ses + + ", from=" + maxSentIdx + ", to=" + minSentIdx + ']'); + + moveSession(ses, maxSentIdx, minSentIdx); + } + else { + if (log.isDebugEnabled()) + log.debug("Unable to find session to move for writers."); + } + } + + if (maxRcvd0 != -1 && minRcvd0 != -1) { + GridSelectorNioSessionImpl ses = null; + + long rcvdDiff = maxRcvd0 - minRcvd0; + long delta = rcvdDiff; + double threshold = rcvdDiff * 0.9; + + GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions = + clientWorkers.get(maxRcvdIdx).workerSessions; + + for (GridSelectorNioSessionImpl ses0 : sessions) { + long bytesRcvd0 = ses0.bytesReceived0(); + + if (bytesRcvd0 < threshold && + (ses == null || delta > U.safeAbs(bytesRcvd0 - rcvdDiff / 2))) { + ses = ses0; + delta = U.safeAbs(bytesRcvd0 - rcvdDiff / 2); + } + } + + if (ses != null) { + if (log.isDebugEnabled()) + log.debug("Will move session to less loaded reader [ses=" + ses + + ", from=" + maxRcvdIdx + ", to=" + minRcvdIdx + ']'); + + moveSession(ses, maxRcvdIdx, minRcvdIdx); + } + else { + if (log.isDebugEnabled()) + log.debug("Unable to find session to move for readers."); + } + } + + for (int i = 0; i < clientWorkers.size(); i++) { + GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i); + + worker.reset0(); + } + } + } + } + + /** + * For tests only. + */ + @SuppressWarnings("unchecked") + private class RandomBalancer implements IgniteRunnable { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void run() { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int w1 = rnd.nextInt(clientWorkers.size()); + + if (clientWorkers.get(w1).workerSessions.isEmpty()) + return; + + int w2 = rnd.nextInt(clientWorkers.size()); + + while (w2 == w1) + w2 = rnd.nextInt(clientWorkers.size()); + + GridNioSession ses = randomSession(clientWorkers.get(w1)); + + if (ses != null) { + log.info("Move session [from=" + w1 + + ", to=" + w2 + + ", ses=" + ses + ']'); + + moveSession(ses, w1, w2); + } + } + + /** + * @param worker Worker. + * @return NIO session. + */ + private GridNioSession randomSession(GridNioServer.AbstractNioClientWorker worker) { + Collection<GridNioSession> sessions = worker.workerSessions; + + int size = sessions.size(); + + if (size == 0) + return null; + + int idx = ThreadLocalRandom.current().nextInt(size); + + Iterator<GridNioSession> it = sessions.iterator(); + + int cnt = 0; + + while (it.hasNext()) { + GridNioSession ses = it.next(); + + if (cnt == idx) + return ses; + } + + return null; + } + + } + + /** + * + */ + interface SessionChangeRequest { + GridNioSession session(); + + /** + * @return Requested change operation. + */ + NioOperation operation(); + } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java index e4a7225..c1b60ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.util.nio; import java.net.InetSocketAddress; +import org.apache.ignite.IgniteCheckedException; import org.jetbrains.annotations.Nullable; /** @@ -105,6 +106,11 @@ public interface GridNioSession { public GridNioFuture<?> send(Object msg); /** + * @param msg Message to be sent. + */ + public void sendNoFuture(Object msg) throws IgniteCheckedException; + + /** * Gets metadata associated with specified key. * * @param key Key to look up. @@ -158,10 +164,25 @@ public interface GridNioSession { /** * @param recoveryDesc Recovery descriptor. */ - public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc); + public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc); + + /** + * @param recoveryDesc Recovery descriptor. + */ + public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc); /** * @return Recovery descriptor if recovery is supported, {@code null otherwise.} */ - @Nullable public GridNioRecoveryDescriptor recoveryDescriptor(); + @Nullable public GridNioRecoveryDescriptor outRecoveryDescriptor(); + + /** + * @return Recovery descriptor if recovery is supported, {@code null otherwise.} + */ + @Nullable public GridNioRecoveryDescriptor inRecoveryDescriptor(); + + /** + * @param msg System message to send. + */ + public void systemMessage(Object msg); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java index 0bcfe64..7424531 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java @@ -51,6 +51,12 @@ public class GridNioSessionImpl implements GridNioSession { /** Received bytes counter. */ private volatile long bytesRcvd; + /** Sent bytes since last NIO sessions balancing. */ + private volatile long bytesSent0; + + /** Received bytes since last NIO sessions balancing. */ + private volatile long bytesRcvd0; + /** Last send schedule timestamp. */ private volatile long sndSchedTime; @@ -99,7 +105,7 @@ public class GridNioSessionImpl implements GridNioSession { try { resetSendScheduleTime(); - return chain().onSessionWrite(this, msg); + return chain().onSessionWrite(this, msg, true); } catch (IgniteCheckedException e) { close(); @@ -109,6 +115,18 @@ public class GridNioSessionImpl implements GridNioSession { } /** {@inheritDoc} */ + @Override public void sendNoFuture(Object msg) throws IgniteCheckedException { + try { + chain().onSessionWrite(this, msg, false); + } + catch (IgniteCheckedException e) { + close(); + + throw e; + } + } + + /** {@inheritDoc} */ @Override public GridNioFuture<?> resumeReads() { try { return chain().onResumeReads(this); @@ -163,6 +181,28 @@ public class GridNioSessionImpl implements GridNioSession { return bytesRcvd; } + /** + * @return Sent bytes since last NIO sessions balancing. + */ + public long bytesSent0() { + return bytesSent0; + } + + /** + * @return Received bytes since last NIO sessions balancing. + */ + public long bytesReceived0() { + return bytesRcvd0; + } + + /** + * + */ + public void reset0() { + bytesSent0 = 0; + bytesRcvd0 = 0; + } + /** {@inheritDoc} */ @Override public long createTime() { return createTime; @@ -240,6 +280,7 @@ public class GridNioSessionImpl implements GridNioSession { */ public void bytesSent(int cnt) { bytesSent += cnt; + bytesSent0 += cnt; lastSndTime = U.currentTimeMillis(); } @@ -253,6 +294,7 @@ public class GridNioSessionImpl implements GridNioSession { */ public void bytesReceived(int cnt) { bytesRcvd += cnt; + bytesRcvd0 += cnt; lastRcvTime = U.currentTimeMillis(); } @@ -296,17 +338,32 @@ public class GridNioSessionImpl implements GridNioSession { } /** {@inheritDoc} */ - @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) { + @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() { + return null; + } + + /** {@inheritDoc} */ + @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) { throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() { + @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() { return null; } /** {@inheritDoc} */ + @Override public void systemMessage(Object msg) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNioSessionImpl.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java new file mode 100644 index 0000000..62985ff --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import java.util.Collection; +import java.util.List; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +interface GridNioWorker { + /** + * @param req Change request. + */ + public void offer(GridNioServer.SessionChangeRequest req); + + /** + * @param reqs Change requests. + */ + public void offer(Collection<GridNioServer.SessionChangeRequest> reqs); + + /** + * @param ses Session. + * @return Session state change requests. + */ + @Nullable public List<GridNioServer.SessionChangeRequest> clearSessionRequests(GridNioSession ses); + + /** + * @param ses Session to register write interest for. + */ + public void registerWrite(GridSelectorNioSessionImpl ses); +}
