http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index 63c9845..66f9176 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -20,9 +20,11 @@ package org.apache.ignite.internal.util.nio; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.LT; @@ -37,17 +39,14 @@ import org.jsr166.ConcurrentLinkedDeque8; */ class GridSelectorNioSessionImpl extends GridNioSessionImpl { /** Pending write requests. */ - private final ConcurrentLinkedDeque8<GridNioFuture<?>> queue = new ConcurrentLinkedDeque8<>(); + private final ConcurrentLinkedDeque8<SessionWriteRequest> queue = new ConcurrentLinkedDeque8<>(); /** Selection key associated with this session. */ @GridToStringExclude private SelectionKey key; - /** Worker index for server */ - private final int selectorIdx; - - /** Size counter. */ - private final AtomicInteger queueSize = new AtomicInteger(); + /** Current worker thread. */ + private volatile GridNioWorker worker; /** Semaphore. */ @GridToStringExclude @@ -59,17 +58,29 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { /** Read buffer. */ private ByteBuffer readBuf; - /** Recovery data. */ - private GridNioRecoveryDescriptor recovery; + /** Incoming recovery data. */ + private GridNioRecoveryDescriptor inRecovery; + + /** Outgoing recovery data. */ + private GridNioRecoveryDescriptor outRecovery; /** Logger. */ private final IgniteLogger log; + /** */ + private List<GridNioServer.SessionChangeRequest> pendingStateChanges; + + /** */ + final AtomicBoolean procWrite = new AtomicBoolean(); + + /** */ + private Object sysMsg; + /** * Creates session instance. * * @param log Logger. - * @param selectorIdx Selector index for this session. + * @param worker NIO worker thread. * @param filterChain Filter chain that will handle requests. * @param locAddr Local address. * @param rmtAddr Remote address. @@ -80,7 +91,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { */ GridSelectorNioSessionImpl( IgniteLogger log, - int selectorIdx, + GridNioWorker worker, GridNioFilterChain filterChain, InetSocketAddress locAddr, InetSocketAddress rmtAddr, @@ -91,7 +102,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { ) { super(filterChain, locAddr, rmtAddr, accepted); - assert selectorIdx >= 0; + assert worker != null; assert sndQueueLimit >= 0; assert locAddr != null : "GridSelectorNioSessionImpl should have local socket address."; @@ -101,7 +112,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { this.log = log; - this.selectorIdx = selectorIdx; + this.worker = worker; sem = sndQueueLimit > 0 ? new Semaphore(sndQueueLimit) : null; @@ -119,12 +130,19 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { } /** + * @return Worker. + */ + GridNioWorker worker() { + return worker; + } + + /** * Sets selection key for this session. * * @param key Selection key. */ void key(SelectionKey key) { - assert this.key == null; + assert key != null; this.key = key; } @@ -151,10 +169,88 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { } /** - * @return Selector index. + * @param from Current session worker. + * @param fut Move future. + * @return {@code True} if session move was scheduled. + */ + boolean offerMove(GridNioWorker from, GridNioServer.SessionChangeRequest fut) { + synchronized (this) { + if (log.isDebugEnabled()) + log.debug("Offered move [ses=" + this + ", fut=" + fut + ']'); + + GridNioWorker worker0 = worker; + + if (worker0 != from) + return false; + + worker.offer(fut); + } + + return true; + } + + /** + * @param fut Future. + */ + void offerStateChange(GridNioServer.SessionChangeRequest fut) { + synchronized (this) { + if (log.isDebugEnabled()) + log.debug("Offered move [ses=" + this + ", fut=" + fut + ']'); + + GridNioWorker worker0 = worker; + + if (worker0 == null) { + if (pendingStateChanges == null) + pendingStateChanges = new ArrayList<>(); + + pendingStateChanges.add(fut); + } + else + worker0.offer(fut); + } + } + + /** + * @param moveFrom Current session worker. */ - int selectorIndex() { - return selectorIdx; + void startMoveSession(GridNioWorker moveFrom) { + synchronized (this) { + assert this.worker == moveFrom; + + if (log.isDebugEnabled()) + log.debug("Started moving [ses=" + this + ", from=" + moveFrom + ']'); + + List<GridNioServer.SessionChangeRequest> sesReqs = moveFrom.clearSessionRequests(this); + + worker = null; + + if (sesReqs != null) { + if (pendingStateChanges == null) + pendingStateChanges = new ArrayList<>(); + + pendingStateChanges.addAll(sesReqs); + } + } + } + + /** + * @param moveTo New session worker. + */ + void finishMoveSession(GridNioWorker moveTo) { + synchronized (this) { + assert worker == null; + + if (log.isDebugEnabled()) + log.debug("Finishing moving [ses=" + this + ", to=" + moveTo + ']'); + + worker = moveTo; + + if (pendingStateChanges != null) { + moveTo.offer(pendingStateChanges); + + pendingStateChanges = null; + } + } } /** @@ -163,14 +259,14 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { * @param writeFut Write request. * @return Updated size of the queue. */ - int offerSystemFuture(GridNioFuture<?> writeFut) { + int offerSystemFuture(SessionWriteRequest writeFut) { writeFut.messageThread(true); boolean res = queue.offerFirst(writeFut); assert res : "Future was not added to queue"; - return queueSize.incrementAndGet(); + return queue.sizex(); } /** @@ -183,7 +279,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { * @param writeFut Write request to add. * @return Updated size of the queue. */ - int offerFuture(GridNioFuture<?> writeFut) { + int offerFuture(SessionWriteRequest writeFut) { boolean msgThread = GridNioBackPressureControl.threadProcessingMessage(); if (sem != null && !msgThread) @@ -195,47 +291,41 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { assert res : "Future was not added to queue"; - return queueSize.incrementAndGet(); + return queue.sizex(); } /** * @param futs Futures to resend. */ - void resend(Collection<GridNioFuture<?>> futs) { + void resend(Collection<SessionWriteRequest> futs) { assert queue.isEmpty() : queue.size(); boolean add = queue.addAll(futs); assert add; - - boolean set = queueSize.compareAndSet(0, futs.size()); - - assert set; } /** * @return Message that is in the head of the queue, {@code null} if queue is empty. */ - @Nullable GridNioFuture<?> pollFuture() { - GridNioFuture<?> last = queue.poll(); + @Nullable SessionWriteRequest pollFuture() { + SessionWriteRequest last = queue.poll(); if (last != null) { - queueSize.decrementAndGet(); - if (sem != null && !last.messageThread()) sem.release(); - if (recovery != null) { - if (!recovery.add(last)) { + if (outRecovery != null) { + if (!outRecovery.add(last)) { LT.warn(log, "Unacknowledged messages queue size overflow, will attempt to reconnect " + "[remoteAddr=" + remoteAddress() + - ", queueLimit=" + recovery.queueLimit() + ']'); + ", queueLimit=" + outRecovery.queueLimit() + ']'); if (log.isDebugEnabled()) log.debug("Unacknowledged messages queue size overflow, will attempt to reconnect " + "[remoteAddr=" + remoteAddress() + - ", queueSize=" + recovery.messagesFutures().size() + - ", queueLimit=" + recovery.queueLimit() + ']'); + ", queueSize=" + outRecovery.messagesRequests().size() + + ", queueLimit=" + outRecovery.queueLimit() + ']'); close(); } @@ -249,7 +339,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { * @param fut Future. * @return {@code True} if future was removed from queue. */ - boolean removeFuture(GridNioFuture<?> fut) { + boolean removeFuture(SessionWriteRequest fut) { assert closed(); return queue.removeLastOccurrence(fut); @@ -261,35 +351,49 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { * @return Number of write requests. */ int writeQueueSize() { - return queueSize.get(); + return queue.sizex(); } /** * @return Write requests. */ - Collection<GridNioFuture<?>> writeQueue() { + Collection<SessionWriteRequest> writeQueue() { return queue; } /** {@inheritDoc} */ - @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) { + @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) { assert recoveryDesc != null; - recovery = recoveryDesc; + outRecovery = recoveryDesc; } /** {@inheritDoc} */ - @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() { - return recovery; + @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() { + return outRecovery; + } + + /** {@inheritDoc} */ + @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) { + assert recoveryDesc != null; + + inRecovery = recoveryDesc; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() { + return inRecovery; } /** {@inheritDoc} */ @Override public <T> T addMeta(int key, @Nullable T val) { - if (val instanceof GridNioRecoveryDescriptor) { - recovery = (GridNioRecoveryDescriptor)val; + if (!accepted() && val instanceof GridNioRecoveryDescriptor) { + outRecovery = (GridNioRecoveryDescriptor)val; + + if (!outRecovery.pairedConnections()) + inRecovery = outRecovery; - if (!accepted()) - recovery.connected(); + outRecovery.onConnected(); return null; } @@ -313,6 +417,31 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { } /** {@inheritDoc} */ + @Override public void systemMessage(Object sysMsg) { + this.sysMsg = sysMsg; + } + + /** + * @return {@code True} if have pending system message to send. + */ + boolean hasSystemMessage() { + return sysMsg != null; + } + + /** + * Gets and clears pending system message. + * + * @return Pending system message. + */ + Object systemMessage() { + Object ret = sysMsg; + + sysMsg = null; + + return ret; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridSelectorNioSessionImpl.class, this, super.toString()); }
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java index ebe86fb..d941bae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -48,6 +48,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien private final MessageFormatter formatter; /** + * @param connIdx Connection index. * @param metricsLsnr Metrics listener. * @param port Shared memory IPC server port. * @param connTimeout Connection timeout. @@ -55,14 +56,16 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien * @param formatter Message formatter. * @throws IgniteCheckedException If failed. */ - public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr, + public GridShmemCommunicationClient( + int connIdx, + GridNioMetricsListener metricsLsnr, int port, long connTimeout, IgniteLogger log, MessageFormatter formatter) throws IgniteCheckedException { - super(metricsLsnr); + super(connIdx, metricsLsnr); assert metricsLsnr != null; assert port > 0 && port < 0xffff; http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java index 5fe521d..3397772 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java @@ -45,11 +45,16 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie private final IgniteLogger log; /** + * @param connIdx Connection index. * @param ses Session. * @param log Logger. */ - public GridTcpNioCommunicationClient(GridNioSession ses, IgniteLogger log) { - super(null); + public GridTcpNioCommunicationClient( + int connIdx, + GridNioSession ses, + IgniteLogger log + ) { + super(connIdx, null); assert ses != null; assert log != null; @@ -104,40 +109,36 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie } /** {@inheritDoc} */ - @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<IgniteException> closure) + @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<IgniteException> c) throws IgniteCheckedException { - // Node ID is never provided in asynchronous send mode. - assert nodeId == null; + try { + // Node ID is never provided in asynchronous send mode. + assert nodeId == null; - if (closure != null) - ses.addMeta(ACK_CLOSURE.ordinal(), closure); + if (c != null) + ses.addMeta(ACK_CLOSURE.ordinal(), c); - GridNioFuture<?> fut = ses.send(msg); + ses.sendNoFuture(msg); - if (fut.isDone()) { - try { - fut.get(); - } - catch (IgniteCheckedException e) { - if (closure != null) - ses.removeMeta(ACK_CLOSURE.ordinal()); + if (c != null) + ses.removeMeta(ACK_CLOSURE.ordinal()); + } + catch (IgniteCheckedException e) { + if (c != null) + ses.removeMeta(ACK_CLOSURE.ordinal()); - if (log.isDebugEnabled()) - log.debug("Failed to send message [client=" + this + ", err=" + e + ']'); + if (log.isDebugEnabled()) + log.debug("Failed to send message [client=" + this + ", err=" + e + ']'); - if (e.getCause() instanceof IOException) { - ses.close(); + if (e.getCause() instanceof IOException) { + ses.close(); - return true; - } - else - throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e); + return true; } + else + throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e); } - if (closure != null) - ses.removeMeta(ACK_CLOSURE.ordinal()); - return false; } @@ -159,4 +160,4 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie @Override public String toString() { return S.toString(GridTcpNioCommunicationClient.class, this, super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java new file mode 100644 index 0000000..508c791 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.lang.IgniteInClosure; + +/** + * + */ +public interface SessionWriteRequest { + /** + * Sets flag indicating that message send future was created in thread that was processing a message. + * + * @param msgThread {@code True} if future was created in thread that is processing message. + */ + public void messageThread(boolean msgThread); + + /** + * @return {@code True} if future was created in thread that was processing message. + */ + public boolean messageThread(); + + /** + * @return {@code True} if skip recovery for this operation. + */ + public boolean skipRecovery(); + + /** + * Sets ack closure which will be applied when ack received. + * + * @param c Ack closure. + */ + public void ackClosure(IgniteInClosure<IgniteException> c); + + /** + * The method will be called when ack received. + */ + public void onAckReceived(); + + /** + * @return Ack closure. + */ + public IgniteInClosure<IgniteException> ackClosure(); + + /** + * @return Session. + */ + public GridNioSession session(); + + /** + * @param ses Session. + */ + public void resetSession(GridNioSession ses); + + /** + * + */ + public void onError(Exception e); + + /** + * @return Message. + */ + public Object message(); + + /** + * + */ + public void onMessageWritten(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java index d6f9d10..8ed7db0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java @@ -282,9 +282,13 @@ public class GridNioSslFilter extends GridNioFilterAdapter { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException { + @Override public GridNioFuture<?> onSessionWrite( + GridNioSession ses, + Object msg, + boolean fut + ) throws IgniteCheckedException { if (directMode) - return proceedSessionWrite(ses, msg); + return proceedSessionWrite(ses, msg, fut); ByteBuffer input = checkMessage(ses, msg); @@ -441,4 +445,4 @@ public class GridNioSslFilter extends GridNioFilterAdapter { return (ByteBuffer)msg; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java index eb8dad4..269e8b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java @@ -437,7 +437,7 @@ class GridNioSslHandler extends ReentrantLock { while (!deferredWriteQueue.isEmpty()) { WriteRequest req = deferredWriteQueue.poll(); - req.future().onDone((GridNioFuture<Object>)parent.proceedSessionWrite(ses, req.buffer())); + req.future().onDone((GridNioFuture<Object>)parent.proceedSessionWrite(ses, req.buffer(), true)); } } @@ -482,7 +482,7 @@ class GridNioSslHandler extends ReentrantLock { ByteBuffer cp = copy(outNetBuf); - return parent.proceedSessionWrite(ses, cp); + return parent.proceedSessionWrite(ses, cp, true); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java b/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java index b29d7cd..86aa7a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java @@ -598,4 +598,4 @@ public class GridToStringBuilder { return cd; } -} \ No newline at end of file +}
