io optimizations
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e97176b4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e97176b4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e97176b4 Branch: refs/heads/ignite-comm-balance Commit: e97176b425bdb07987d8e00068f0c4507db33f26 Parents: 9b1c3a5 Author: sboikov <[email protected]> Authored: Wed Oct 5 11:50:19 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Oct 5 11:50:19 2016 +0300 ---------------------------------------------------------------------- .../rest/protocols/tcp/MockNioSession.java | 11 + .../apache/ignite/internal/IgniteKernal.java | 16 +- .../managers/communication/GridIoManager.java | 131 +++- .../internal/util/ipc/IpcToNioAdapter.java | 2 +- .../nio/GridConnectionBytesVerifyFilter.java | 4 +- .../util/nio/GridNioAsyncNotifyFilter.java | 4 +- .../internal/util/nio/GridNioCodecFilter.java | 6 +- .../ignite/internal/util/nio/GridNioFilter.java | 10 +- .../internal/util/nio/GridNioFilterAdapter.java | 4 +- .../internal/util/nio/GridNioFilterChain.java | 8 +- .../ignite/internal/util/nio/GridNioFuture.java | 4 +- .../util/nio/GridNioRecoveryDescriptor.java | 66 +- .../ignite/internal/util/nio/GridNioServer.java | 695 +++++++++++++------ .../internal/util/nio/GridNioSession.java | 11 + .../internal/util/nio/GridNioSessionImpl.java | 19 +- .../util/nio/GridSelectorNioSessionImpl.java | 44 +- .../util/nio/GridTcpNioCommunicationClient.java | 44 +- .../internal/util/nio/SessionWriteRequest.java | 85 +++ .../internal/util/nio/ssl/GridNioSslFilter.java | 4 +- .../util/nio/ssl/GridNioSslHandler.java | 4 +- .../communication/tcp/TcpCommunicationSpi.java | 143 ++-- .../nio/impl/GridNioFilterChainSelfTest.java | 18 +- .../io/IgniteIoTestAbstractBenchmark.java | 61 ++ .../io/IgniteIoTestSendAllBenchmark.java | 32 + .../io/IgniteIoTestSendRandomBenchmark.java | 35 + 25 files changed, 1075 insertions(+), 386 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java index e848653..9bc4e7f 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.rest.protocols.tcp; import java.net.InetSocketAddress; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.nio.GridNioFinishedFuture; import org.apache.ignite.internal.util.nio.GridNioFuture; @@ -111,6 +112,11 @@ public class MockNioSession extends GridMetadataAwareAdapter implements GridNioS } /** {@inheritDoc} */ + @Override public void sendNoFuture(Object msg) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ @Override public GridNioFuture<Object> resumeReads() { return null; } @@ -149,4 +155,9 @@ public class MockNioSession extends GridMetadataAwareAdapter implements GridNioS @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() { return null; } + + /** {@inheritDoc} */ + @Override public void systemMessage(Object msg) { + // No-op. + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 65740a9..065f2f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -3422,11 +3422,21 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** * @param node Node. * @param payload Message payload. - * @param processFromNioThread If {@code true} message is processed from NIO thread. + * @param procFromNioThread If {@code true} message is processed from NIO thread. * @return Response future. */ - public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean processFromNioThread) { - return ctx.io().sendIoTest(node, payload, processFromNioThread); + public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) { + return ctx.io().sendIoTest(node, payload, procFromNioThread); + } + + /** + * @param nodes Nodes. + * @param payload Message payload. + * @param procFromNioThread If {@code true} message is processed from NIO thread. + * @return Response future. + */ + public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) { + return ctx.io().sendIoTest(nodes, payload, procFromNioThread); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index d6a2835..b465919 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -215,7 +215,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa private boolean stopping; /** */ - private final AtomicReference<ConcurrentHashMap<Long, GridFutureAdapter>> ioTestMap = new AtomicReference<>(); + private final AtomicReference<ConcurrentHashMap<Long, IoTestFuture>> ioTestMap = new AtomicReference<>(); /** */ private final AtomicLong ioTestId = new AtomicLong(); @@ -384,45 +384,72 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } } else { - GridFutureAdapter fut = ioTestMap().remove(msg0.id()); + IoTestFuture fut = ioTestMap().get(msg0.id()); - if (fut == null) { + if (fut == null) U.warn(log, "Failed to find IO test future [msg=" + msg0 + ']'); - - return; - } - - fut.onDone(); + else + fut.onResponse(); } } }); } /** - * @param node Node. + * @param nodes Nodes. * @param payload Payload. - * @param processFromNioThread If {@code true} message is processed from NIO thread. + * @param procFromNioThread If {@code true} message is processed from NIO thread. * @return Response future. */ - public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean processFromNioThread) { - if (ctx.localNodeId().equals(node.id())) - throw new IllegalArgumentException(); - + public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) { long id = ioTestId.getAndIncrement(); - GridFutureAdapter fut = new GridFutureAdapter(); + IoTestFuture fut = new IoTestFuture(id, nodes.size()); + + IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload); + + msg.processFromNioThread(procFromNioThread); ioTestMap().put(id, fut); - try { - IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload); + for (int i = 0; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); + + try { + send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + ioTestMap().remove(msg.id()); + + return new GridFinishedFuture(e); + } + } + + return fut; + } + + /** + * @param node Node. + * @param payload Payload. + * @param procFromNioThread If {@code true} message is processed from NIO thread. + * @return Response future. + */ + public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) { + long id = ioTestId.getAndIncrement(); - msg.processFromNioThread(processFromNioThread); + IoTestFuture fut = new IoTestFuture(id, 1); + IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload); + + msg.processFromNioThread(procFromNioThread); + + ioTestMap().put(id, fut); + + try { send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL); } catch (IgniteCheckedException e) { - ioTestMap().remove(id); + ioTestMap().remove(msg.id()); return new GridFinishedFuture(e); } @@ -433,8 +460,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** * @return IO test futures map. */ - private ConcurrentHashMap<Long, GridFutureAdapter> ioTestMap() { - ConcurrentHashMap<Long, GridFutureAdapter> map = ioTestMap.get(); + private ConcurrentHashMap<Long, IoTestFuture> ioTestMap() { + ConcurrentHashMap<Long, IoTestFuture> map = ioTestMap.get(); if (map == null) { if (!ioTestMap.compareAndSet(null, map = new ConcurrentHashMap<>())) @@ -698,16 +725,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return; } - // Check discovery. - ClusterNode node = ctx.discovery().node(nodeId); - - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Ignoring message from dead node [senderId=" + nodeId + ", msg=" + msg + ']'); - - return; // We can't receive messages from non-discovered ones. - } - if (msg.topic() == null) { int topicOrd = msg.topicOrdinal(); @@ -2720,4 +2737,56 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return S.toString(DelayedMessage.class, this, super.toString()); } } + + /** + * + */ + private class IoTestFuture extends GridFutureAdapter<Object> { + /** */ + private final long id; + + /** */ + private int cntr; + + /** + * @param id ID. + * @param cntr Counter. + */ + IoTestFuture(long id, int cntr) { + assert cntr > 0 : cntr; + + this.id = id; + this.cntr = cntr; + } + + /** + * + */ + void onResponse() { + boolean complete; + + synchronized (this) { + complete = --cntr == 0; + } + + if (complete) + onDone(); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + ioTestMap().remove(id); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IoTestFuture.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java index 6820dc7..d108b56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java @@ -201,7 +201,7 @@ public class IpcToNioAdapter<T> { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) { + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) { assert ses == IpcToNioAdapter.this.ses; return send((Message)msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java index 13d7ca7..02a637d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java @@ -67,8 +67,8 @@ public class GridConnectionBytesVerifyFilter extends GridNioFilterAdapter { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException { - return proceedSessionWrite(ses, msg); + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { + return proceedSessionWrite(ses, msg, fut); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java index 9925d2e..8aa113a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java @@ -107,8 +107,8 @@ public class GridNioAsyncNotifyFilter extends GridNioFilterAdapter { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException { - return proceedSessionWrite(ses, msg); + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { + return proceedSessionWrite(ses, msg, fut); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java index a2f543d..f7a3022 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java @@ -76,15 +76,15 @@ public class GridNioCodecFilter extends GridNioFilterAdapter { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException { + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { // No encoding needed in direct mode. if (directMode) - return proceedSessionWrite(ses, msg); + return proceedSessionWrite(ses, msg, fut); try { ByteBuffer res = parser.encode(ses, msg); - return proceedSessionWrite(ses, res); + return proceedSessionWrite(ses, res, fut); } catch (IOException e) { throw new GridNioException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java index 5f88b1f..f126cd9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java @@ -105,10 +105,11 @@ public interface GridNioFilter { * * @param ses Session instance. * @param msg Message to send. - * @return Write future. + * @param fut {@code True} if write future should be created. + * @return Write future or {@code null}. * @throws IgniteCheckedException If filter is not in chain or GridNioException occurred in the underlying filter. */ - public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException; + public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException; /** * Forwards session close request to the next logical filter in filter chain. @@ -149,10 +150,11 @@ public interface GridNioFilter { * * @param ses Session on which message should be written. * @param msg Message being written. - * @return Write future. + * @param fut {@code True} if write future should be created. + * @return Write future or {@code null}. * @throws GridNioException If GridNioException occurred while handling event. */ - public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException; + public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException; /** * Invoked when a new messages received. http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java index 18ab1b2..0a825bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java @@ -108,10 +108,10 @@ public abstract class GridNioFilterAdapter implements GridNioFilter { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException { + @Override public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { checkNext(); - return nextFilter.onSessionWrite(ses, msg); + return nextFilter.onSessionWrite(ses, msg, fut); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java index 8a43e29..6415b46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java @@ -181,8 +181,8 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter { * @return Send future. * @throws IgniteCheckedException If IgniteCheckedException occurred while handling event. */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException { - return tail.onSessionWrite(ses, msg); + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { + return tail.onSessionWrite(ses, msg, fut); } /** @@ -255,9 +255,9 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { - return proceedSessionWrite(ses, msg); + return proceedSessionWrite(ses, msg, fut); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java index b02acc8..6c0c9c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java @@ -45,9 +45,9 @@ public interface GridNioFuture<R> extends IgniteInternalFuture<R> { /** * Sets ack closure which will be applied when ack received. * - * @param closure Ack closure. + * @param c Ack closure. */ - public void ackClosure(IgniteInClosure<IgniteException> closure); + public void ackClosure(IgniteInClosure<IgniteException> c); /** * The method will be called when ack received. http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java index 4598eef..dfc78be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java @@ -36,8 +36,8 @@ public class GridNioRecoveryDescriptor { /** Number of acknowledged messages. */ private long acked; - /** Unacknowledged message futures. */ - private final ArrayDeque<GridNioFuture<?>> msgFuts; + /** Unacknowledged messages. */ + private final ArrayDeque<SessionWriteRequest> msgReqs; /** Number of messages to resend. */ private int resendCnt; @@ -87,7 +87,7 @@ public class GridNioRecoveryDescriptor { assert !node.isLocal() : node; assert queueLimit > 0; - msgFuts = new ArrayDeque<>(queueLimit); + msgReqs = new ArrayDeque<>(queueLimit); this.queueLimit = queueLimit; this.node = node; @@ -155,19 +155,19 @@ public class GridNioRecoveryDescriptor { } /** - * @param fut NIO future. + * @param req Write request. * @return {@code False} if queue limit is exceeded. */ - public boolean add(GridNioFuture<?> fut) { - assert fut != null; + public boolean add(SessionWriteRequest req) { + assert req != null; - if (!fut.skipRecovery()) { + if (!req.skipRecovery()) { if (resendCnt == 0) { - msgFuts.addLast(fut); + msgReqs.addLast(req); sentCnt++; - return msgFuts.size() < queueLimit; + return msgReqs.size() < queueLimit; } else resendCnt--; @@ -182,17 +182,15 @@ public class GridNioRecoveryDescriptor { public void ackReceived(long rcvCnt) { if (log.isDebugEnabled()) log.debug("Handle acknowledgment [acked=" + acked + ", rcvCnt=" + rcvCnt + - ", msgFuts=" + msgFuts.size() + ']'); + ", msgReqs=" + msgReqs.size() + ']'); while (acked < rcvCnt) { - GridNioFuture<?> fut = msgFuts.pollFirst(); + SessionWriteRequest fut = msgReqs.pollFirst(); assert fut != null : "Missed message future [rcvCnt=" + rcvCnt + ", acked=" + acked + ", desc=" + this + ']'; - assert fut.isDone() : fut; - if (fut.ackClosure() != null) fut.ackClosure().apply(null); @@ -215,7 +213,7 @@ public class GridNioRecoveryDescriptor { * @return {@code False} if descriptor is reserved. */ public boolean onNodeLeft() { - GridNioFuture<?>[] futs = null; + SessionWriteRequest[] reqs = null; synchronized (this) { nodeLeft = true; @@ -223,15 +221,15 @@ public class GridNioRecoveryDescriptor { if (reserved) return false; - if (!msgFuts.isEmpty()) { - futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]); + if (!msgReqs.isEmpty()) { + reqs = msgReqs.toArray(new SessionWriteRequest[msgReqs.size()]); - msgFuts.clear(); + msgReqs.clear(); } } - if (futs != null) - completeOnNodeLeft(futs); + if (reqs != null) + notifyOnNodeLeft(reqs); return true; } @@ -239,8 +237,8 @@ public class GridNioRecoveryDescriptor { /** * @return Message futures for unacknowledged messages. */ - public Deque<GridNioFuture<?>> messagesFutures() { - return msgFuts; + public Deque<SessionWriteRequest> messagesFutures() { + return msgReqs; } /** @@ -278,7 +276,7 @@ public class GridNioRecoveryDescriptor { if (!nodeLeft) ackReceived(rcvCnt); - resendCnt = msgFuts.size(); + resendCnt = msgReqs.size(); } } @@ -337,7 +335,7 @@ public class GridNioRecoveryDescriptor { * */ public void release() { - GridNioFuture<?>[] futs = null; + SessionWriteRequest[] futs = null; synchronized (this) { connected = false; @@ -357,15 +355,15 @@ public class GridNioRecoveryDescriptor { notifyAll(); } - if (nodeLeft && !msgFuts.isEmpty()) { - futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]); + if (nodeLeft && !msgReqs.isEmpty()) { + futs = msgReqs.toArray(new SessionWriteRequest[msgReqs.size()]); - msgFuts.clear(); + msgReqs.clear(); } } if (futs != null) - completeOnNodeLeft(futs); + notifyOnNodeLeft(futs); } /** @@ -426,16 +424,16 @@ public class GridNioRecoveryDescriptor { } /** - * @param futs Futures to complete. + * @param reqs Requests to notify about error. */ - private void completeOnNodeLeft(GridNioFuture<?>[] futs) { - for (GridNioFuture<?> msg : futs) { - IOException e = new IOException("Failed to send message, node has left: " + node.id()); + private void notifyOnNodeLeft(SessionWriteRequest[] reqs) { + IOException e = new IOException("Failed to send message, node has left: " + node.id()); - ((GridNioFutureImpl)msg).onDone(e); + for (SessionWriteRequest req : reqs) { + req.onError(e); - if (msg.ackClosure() != null) - msg.ackClosure().apply(new IgniteException(e)); + if (req.ackClosure() != null) + req.ackClosure().apply(new IgniteException(e)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index ba8ae50..281d985 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -412,60 +412,87 @@ public class GridNioServer<T> { /** * @param ses Session. * @param msg Message. + * @param createFut {@code True} if future should be created. * @return Future for operation. */ - GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg) { - assert ses instanceof GridSelectorNioSessionImpl; + GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg, boolean createFut) throws IgniteCheckedException { + assert ses instanceof GridSelectorNioSessionImpl : ses; GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; - NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg); + if (createFut) { + NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg); - send0(impl, fut, false); + send0(impl, fut, false); - return fut; + return fut; + } + else { + SessionWriteRequest req = new WriteRequestImpl(ses, msg, true); + + send0(impl, req, false); + + return null; + } } /** * @param ses Session. * @param msg Message. + * @param createFut {@code True} if future should be created. * @return Future for operation. */ - GridNioFuture<?> send(GridNioSession ses, Message msg) { + GridNioFuture<?> send(GridNioSession ses, Message msg, boolean createFut) throws IgniteCheckedException { assert ses instanceof GridSelectorNioSessionImpl; GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; - NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, - skipRecoveryPred.apply(msg)); + if (createFut) { + NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, + skipRecoveryPred.apply(msg)); - send0(impl, fut, false); + send0(impl, fut, false); - return fut; + return fut; + } + else { + SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg)); + + send0(impl, req, false); + + return null; + } } /** * @param ses Session. - * @param fut Future. + * @param req Request. * @param sys System message flag. + * @throws IgniteCheckedException If session was closed. */ - private void send0(GridSelectorNioSessionImpl ses, NioOperationFuture<?> fut, boolean sys) { + private void send0(GridSelectorNioSessionImpl ses, SessionWriteRequest req, boolean sys) throws IgniteCheckedException { assert ses != null; - assert fut != null; + assert req != null; - int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut); + int msgCnt = sys ? ses.offerSystemFuture(req) : ses.offerFuture(req); IgniteInClosure<IgniteException> ackC; if (!sys && (ackC = ses.removeMeta(ACK_CLOSURE.ordinal())) != null) - fut.ackClosure(ackC); + req.ackClosure(ackC); if (ses.closed()) { - if (ses.removeFuture(fut)) - fut.connectionClosed(); + if (ses.removeFuture(req)) { + IOException err = new IOException("Failed to send message (connection was closed): " + ses); + + req.onError(err); + + if (!(req instanceof GridNioFuture)) + throw new IgniteCheckedException(err); + } } else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, true)) - clientWorkers.get(ses.selectorIndex()).offer(fut); + clientWorkers.get(ses.selectorIndex()).offer((SessionChangeRequest)req); if (msgQueueLsnr != null) msgQueueLsnr.apply(ses, msgCnt); @@ -476,10 +503,10 @@ public class GridNioServer<T> { * * @param ses Session. * @param msg Message. - * @return Future. + * @throws IgniteCheckedException If session was closed. */ - public GridNioFuture<?> sendSystem(GridNioSession ses, Message msg) { - return sendSystem(ses, msg, null); + public void sendSystem(GridNioSession ses, Message msg) throws IgniteCheckedException { + sendSystem(ses, msg, null); } /** @@ -488,27 +515,30 @@ public class GridNioServer<T> { * @param ses Session. * @param msg Message. * @param lsnr Future listener notified from the session thread. - * @return Future. + * @throws IgniteCheckedException If session was closed. */ - public GridNioFuture<?> sendSystem(GridNioSession ses, + public void sendSystem(GridNioSession ses, Message msg, - @Nullable IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) { + @Nullable IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) throws IgniteCheckedException { assert ses instanceof GridSelectorNioSessionImpl; GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; - NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, - skipRecoveryPred.apply(msg)); - if (lsnr != null) { + NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, + skipRecoveryPred.apply(msg)); + fut.listen(lsnr); assert !fut.isDone(); - } - send0(impl, fut, true); + send0(impl, fut, true); + } + else { + SessionWriteRequest req = new WriteRequestSystemImpl(ses, msg); - return fut; + send0(impl, req, true); + } } /** @@ -520,25 +550,25 @@ public class GridNioServer<T> { GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor(); if (recoveryDesc != null && !recoveryDesc.messagesFutures().isEmpty()) { - Deque<GridNioFuture<?>> futs = recoveryDesc.messagesFutures(); + Deque<SessionWriteRequest> futs = recoveryDesc.messagesFutures(); if (log.isDebugEnabled()) log.debug("Resend messages [rmtNode=" + recoveryDesc.node().id() + ", msgCnt=" + futs.size() + ']'); GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses; - GridNioFuture<?> fut0 = futs.iterator().next(); + SessionWriteRequest fut0 = futs.iterator().next(); - for (GridNioFuture<?> fut : futs) { + for (SessionWriteRequest fut : futs) { fut.messageThread(true); - ((NioOperationFuture)fut).resetSession(ses0); + fut.resetSession(ses0); } ses0.resend(futs); // Wake up worker. - clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0)); + clientWorkers.get(ses0.selectorIndex()).offer(((SessionChangeRequest)fut0)); } } @@ -554,7 +584,7 @@ public class GridNioServer<T> { * @param op Operation. * @return Future for operation. */ - GridNioFuture<?> pauseResumeReads(GridNioSession ses, NioOperation op) { + private GridNioFuture<?> pauseResumeReads(GridNioSession ses, NioOperation op) { assert ses instanceof GridSelectorNioSessionImpl; assert op == NioOperation.PAUSE_READ || op == NioOperation.RESUME_READ; @@ -835,21 +865,30 @@ public class GridNioServer<T> { while (true) { ByteBuffer buf = ses.removeMeta(BUF_META_KEY); - NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal()); + SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal()); // Check if there were any pending data from previous writes. if (buf == null) { assert req == null; - req = (NioOperationFuture<?>)ses.pollFuture(); + req = ses.pollFuture(); if (req == null) { - key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + if (ses.procWrite.get()) { + boolean set = ses.procWrite.compareAndSet(true, false); + + assert set; + + if (ses.writeQueue().isEmpty()) + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + else + ses.procWrite.set(true); + } break; } - buf = req.message(); + buf = (ByteBuffer)req.message(); } if (!skipWrite) { @@ -884,7 +923,7 @@ public class GridNioServer<T> { // Message was successfully written. assert req != null; - req.onDone(); + req.onMessageWritten(); } } } @@ -964,6 +1003,12 @@ public class GridNioServer<T> { readBuf.compact(); else readBuf.clear(); + + if (ses.hasSystemMessage() && !ses.procWrite.get()) { + ses.procWrite.set(true); + + registerWrite(ses); + } } catch (IgniteCheckedException e) { close(ses, e); @@ -1036,16 +1081,29 @@ public class GridNioServer<T> { if (ses.meta(WRITE_BUF_LIMIT) != null) buf.limit((int)ses.meta(WRITE_BUF_LIMIT)); - NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal()); + SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal()); while (true) { if (req == null) { - req = (NioOperationFuture<?>)ses.pollFuture(); + req = systemMessage(ses); - if (req == null && buf.position() == 0) { - key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + if (req == null) { + req = ses.pollFuture(); - break; + if (req == null && buf.position() == 0) { + if (ses.procWrite.get()) { + boolean set = ses.procWrite.compareAndSet(true, false); + + assert set; + + if (ses.writeQueue().isEmpty()) + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + else + ses.procWrite.set(true); + } + + break; + } } } @@ -1053,7 +1111,7 @@ public class GridNioServer<T> { boolean finished = false; if (req != null) { - msg = req.directMessage(); + msg = (Message)req.message(); assert msg != null; @@ -1068,14 +1126,17 @@ public class GridNioServer<T> { // Fill up as many messages as possible to write buffer. while (finished) { - req.onDone(); + req.onMessageWritten(); + + req = systemMessage(ses); - req = (NioOperationFuture<?>)ses.pollFuture(); + if (req == null) + req = ses.pollFuture(); if (req == null) break; - msg = req.directMessage(); + msg = (Message)req.message(); assert msg != null; @@ -1179,6 +1240,24 @@ public class GridNioServer<T> { } /** + * @param ses Session. + * @return System message request. + */ + private SessionWriteRequest systemMessage(GridSelectorNioSessionImpl ses) { + if (ses.hasSystemMessage()) { + Object msg = ses.systemMessage(); + + SessionWriteRequest req = new WriteRequestSystemImpl(ses, msg); + + assert !ses.hasSystemMessage(); + + return req; + } + + return null; + } + + /** * Processes write-ready event on the key. * * @param key Key that is ready to be written. @@ -1190,7 +1269,7 @@ public class GridNioServer<T> { GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); ByteBuffer buf = ses.writeBuffer(); - NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal()); + SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal()); MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); @@ -1204,21 +1283,25 @@ public class GridNioServer<T> { } if (req == null) { - req = (NioOperationFuture<?>)ses.pollFuture(); + req = systemMessage(ses); - if (req == null && buf.position() == 0) { - if (ses.procWrite.get()) { - boolean set = ses.procWrite.compareAndSet(true, false); + if (req == null) { + req = ses.pollFuture(); - assert set; + if (req == null && buf.position() == 0) { + if (ses.procWrite.get()) { + boolean set = ses.procWrite.compareAndSet(true, false); - if (ses.writeQueue().isEmpty()) - key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); - else - ses.procWrite.set(true); - } + assert set; - return; + if (ses.writeQueue().isEmpty()) + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + else + ses.procWrite.set(true); + } + + return; + } } } @@ -1226,9 +1309,9 @@ public class GridNioServer<T> { boolean finished = false; if (req != null) { - msg = req.directMessage(); + msg = (Message)req.message(); - assert msg != null; + assert msg != null : req; if (writer != null) writer.setCurrentWriteClass(msg.getClass()); @@ -1241,14 +1324,17 @@ public class GridNioServer<T> { // Fill up as many messages as possible to write buffer. while (finished) { - req.onDone(); + req.onMessageWritten(); - req = (NioOperationFuture<?>)ses.pollFuture(); + req = systemMessage(ses); + + if (req == null) + req = ses.pollFuture(); if (req == null) break; - msg = req.directMessage(); + msg = (Message)req.message(); assert msg != null; @@ -1301,7 +1387,7 @@ public class GridNioServer<T> { */ private abstract class AbstractNioClientWorker extends GridWorker { /** Queue of change requests on this selector. */ - private final ConcurrentLinkedQueue<NioOperationFuture> changeReqs = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue<SessionChangeRequest> changeReqs = new ConcurrentLinkedQueue<>(); /** Selector to select read events. */ private Selector selector; @@ -1409,7 +1495,7 @@ public class GridNioServer<T> { * * @param req Change request. */ - private void offer(NioOperationFuture req) { + private void offer(SessionChangeRequest req) { changeReqs.offer(req); selector.wakeup(); @@ -1426,31 +1512,27 @@ public class GridNioServer<T> { long lastIdleCheck = U.currentTimeMillis(); while (!closed && selector.isOpen()) { - NioOperationFuture req; + SessionChangeRequest req0; - while ((req = changeReqs.poll()) != null) { - switch (req.operation()) { + while ((req0 = changeReqs.poll()) != null) { + switch (req0.operation()) { case REGISTER: { - register(req); + register((NioOperationFuture)req0); break; } case REQUIRE_WRITE: { - //Just register write key. - SelectionKey key = req.session().key(); - - if (key.isValid()) { - key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + SessionWriteRequest req = (SessionWriteRequest)req0; - // Update timestamp to protected against false write timeout. - ((GridNioSessionImpl)key.attachment()).bytesSent(0); - } + registerWrite((GridSelectorNioSessionImpl)req.session()); break; } case CLOSE: { + NioOperationFuture req = (NioOperationFuture)req0; + if (close(req.session(), null)) req.onDone(true); else @@ -1460,6 +1542,8 @@ public class GridNioServer<T> { } case PAUSE_READ: { + NioOperationFuture req = (NioOperationFuture)req0; + SelectionKey key = req.session().key(); if (key.isValid()) { @@ -1478,6 +1562,8 @@ public class GridNioServer<T> { } case RESUME_READ: { + NioOperationFuture req = (NioOperationFuture)req0; + SelectionKey key = req.session().key(); if (key.isValid()) { @@ -1496,75 +1582,15 @@ public class GridNioServer<T> { } case DUMP_STATS: { - StringBuilder sb = new StringBuilder(); - - Set<SelectionKey> keys = selector.keys(); - - sb.append(U.nl()) - .append(">> Selector info [idx=").append(idx) - .append(", keysCnt=").append(keys.size()) - .append("]").append(U.nl()); - - for (SelectionKey key : keys) { - GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); - - MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); - MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY); - - sb.append(" Connection info [") - .append("in=").append(ses.accepted()) - .append(", rmtAddr=").append(ses.remoteAddress()) - .append(", locAddr=").append(ses.localAddress()); - - GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor(); - - if (outDesc != null) { - sb.append(", msgsSent=").append(outDesc.sent()) - .append(", msgsAckedByRmt=").append(outDesc.acked()) - .append(", descIdHash=").append(System.identityHashCode(outDesc)); - } - else - sb.append(", outRecoveryDesc=null"); - - GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor(); + NioOperationFuture req = (NioOperationFuture)req0; - if (inDesc != null) { - sb.append(", msgsRcvd=").append(inDesc.received()) - .append(", lastAcked=").append(inDesc.lastAcknowledged()) - .append(", descIdHash=").append(System.identityHashCode(inDesc)); - } - else - sb.append(", inRecoveryDesc=null"); - - sb.append(", bytesRcvd=").append(ses.bytesReceived()) - .append(", bytesSent=").append(ses.bytesSent()) - .append(", opQueueSize=").append(ses.writeQueueSize()) - .append(", msgWriter=").append(writer != null ? writer.toString() : "null") - .append(", msgReader=").append(reader != null ? reader.toString() : "null"); - - int cnt = 0; - - for (GridNioFuture<?> fut : ses.writeQueue()) { - if (cnt == 0) - sb.append(",\n opQueue=[").append(fut); - else - sb.append(',').append(fut); - - if (++cnt == 5) { - sb.append(']'); - - break; - } - } - - - sb.append("]").append(U.nl()); + try { + dumpStats(); + } + finally { + // Complete the request just in case (none should wait on this future). + req.onDone(true); } - - U.warn(log, sb.toString()); - - // Complete the request just in case (none should wait on this future). - req.onDone(true); } } } @@ -1616,6 +1642,93 @@ public class GridNioServer<T> { } /** + * @param ses Session. + */ + final void registerWrite(GridSelectorNioSessionImpl ses) { + SelectionKey key = ses.key(); + + if (key.isValid()) { + if ((key.interestOps() & SelectionKey.OP_WRITE) == 0) + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + + // Update timestamp to protected against false write timeout. + ses.bytesSent(0); + } + } + + /** + * + */ + private void dumpStats() { + StringBuilder sb = new StringBuilder(); + + Set<SelectionKey> keys = selector.keys(); + + sb.append(U.nl()) + .append(">> Selector info [idx=").append(idx) + .append(", keysCnt=").append(keys.size()) + .append("]").append(U.nl()); + + for (SelectionKey key : keys) { + GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); + + MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); + MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY); + + sb.append(" Connection info [") + .append("in=").append(ses.accepted()) + .append(", rmtAddr=").append(ses.remoteAddress()) + .append(", locAddr=").append(ses.localAddress()); + + GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor(); + + if (outDesc != null) { + sb.append(", msgsSent=").append(outDesc.sent()) + .append(", msgsAckedByRmt=").append(outDesc.acked()) + .append(", descIdHash=").append(System.identityHashCode(outDesc)); + } + else + sb.append(", outRecoveryDesc=null"); + + GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor(); + + if (inDesc != null) { + sb.append(", msgsRcvd=").append(inDesc.received()) + .append(", lastAcked=").append(inDesc.lastAcknowledged()) + .append(", descIdHash=").append(System.identityHashCode(inDesc)); + } + else + sb.append(", inRecoveryDesc=null"); + + sb.append(", bytesRcvd=").append(ses.bytesReceived()) + .append(", bytesSent=").append(ses.bytesSent()) + .append(", opQueueSize=").append(ses.writeQueueSize()) + .append(", msgWriter=").append(writer != null ? writer.toString() : "null") + .append(", msgReader=").append(reader != null ? reader.toString() : "null"); + + int cnt = 0; + + for (SessionWriteRequest req : ses.writeQueue()) { + if (cnt == 0) + sb.append(",\n opQueue=[").append(req); + else + sb.append(',').append(req); + + if (++cnt == 5) { + sb.append(']'); + + break; + } + } + + + sb.append("]").append(U.nl()); + } + + U.warn(log, sb.toString()); + } + + /** * Processes keys selected by a selector. * * @param keys Selected keys. @@ -1886,17 +1999,19 @@ public class GridNioServer<T> { ses.removeMeta(BUF_META_KEY); // Since ses is in closed state, no write requests will be added. - NioOperationFuture<?> fut = ses.removeMeta(NIO_OPERATION.ordinal()); + SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal()); GridNioRecoveryDescriptor outRecovery = ses.outRecoveryDescriptor(); GridNioRecoveryDescriptor inRecovery = ses.inRecoveryDescriptor(); + IOException err = new IOException("Failed to send message (connection was closed): " + ses); + if (outRecovery != null || inRecovery != null) { try { // Poll will update recovery data. - while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null) { - if (fut.skipRecovery()) - fut.connectionClosed(); + while ((req = ses.pollFuture()) != null) { + if (req.skipRecovery()) + req.onError(err); } } finally { @@ -1908,11 +2023,11 @@ public class GridNioServer<T> { } } else { - if (fut != null) - fut.connectionClosed(); + if (req != null) + req.onError(err); - while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null) - fut.connectionClosed(); + while ((req = ses.pollFuture()) != null) + req.onError(err); } try { @@ -2136,9 +2251,193 @@ public class GridNioServer<T> { } /** + * + */ + private static final class WriteRequestSystemImpl implements SessionWriteRequest, SessionChangeRequest { + /** */ + private final Object msg; + + /** */ + private final GridNioSession ses; + + /** + * @param ses Session. + * @param msg Message. + */ + WriteRequestSystemImpl(GridNioSession ses, Object msg) { + this.ses = ses; + this.msg = msg; + } + + /** {@inheritDoc} */ + @Override public void messageThread(boolean msgThread) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean messageThread() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean skipRecovery() { + return true; + } + + /** {@inheritDoc} */ + @Override public void ackClosure(IgniteInClosure<IgniteException> c) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public IgniteInClosure<IgniteException> ackClosure() { + return null; + } + + /** {@inheritDoc} */ + @Override public void onError(Exception e) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Object message() { + return msg; + } + + /** {@inheritDoc} */ + @Override public void onMessageWritten() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void resetSession(GridNioSession ses) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public GridNioSession session() { + return ses; + } + + /** {@inheritDoc} */ + @Override public NioOperation operation() { + return NioOperation.REQUIRE_WRITE; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(WriteRequestSystemImpl.class, this); + } + } + + /** + * + */ + private static final class WriteRequestImpl implements SessionWriteRequest, SessionChangeRequest { + /** */ + private GridNioSession ses; + + /** */ + private final Object msg; + + /** */ + private boolean msgThread; + + /** */ + private final boolean skipRecovery; + + /** */ + private IgniteInClosure<IgniteException> ackC; + + /** + * @param ses Session. + * @param msg Message. + * @param skipRecovery Skip recovery flag. + */ + WriteRequestImpl(GridNioSession ses, Object msg, boolean skipRecovery) { + this.ses = ses; + this.msg = msg; + this.skipRecovery = skipRecovery; + } + + /** {@inheritDoc} */ + @Override public void messageThread(boolean msgThread) { + this.msgThread = msgThread; + } + + /** {@inheritDoc} */ + @Override public boolean messageThread() { + return msgThread; + } + + /** {@inheritDoc} */ + @Override public boolean skipRecovery() { + return skipRecovery; + } + + /** {@inheritDoc} */ + @Override public void ackClosure(IgniteInClosure<IgniteException> c) { + ackC = c; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + assert msg instanceof Message; + + ((Message)msg).onAckReceived(); + } + + /** {@inheritDoc} */ + @Override public IgniteInClosure<IgniteException> ackClosure() { + return ackC; + } + + /** {@inheritDoc} */ + @Override public void onError(Exception e) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Object message() { + return msg; + } + + /** {@inheritDoc} */ + @Override public void onMessageWritten() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void resetSession(GridNioSession ses) { + this.ses = ses; + } + + /** {@inheritDoc} */ + @Override public GridNioSession session() { + return ses; + } + + /** {@inheritDoc} */ + @Override public NioOperation operation() { + return NioOperation.REQUIRE_WRITE; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(WriteRequestImpl.class, this); + } + } + + /** * Class for requesting write and session close operations. */ - private static class NioOperationFuture<R> extends GridNioFutureImpl<R> { + private static class NioOperationFuture<R> extends GridNioFutureImpl<R> implements SessionWriteRequest, + SessionChangeRequest { /** */ private static final long serialVersionUID = 0L; @@ -2154,11 +2453,7 @@ public class GridNioServer<T> { private NioOperation op; /** Message. */ - @GridToStringExclude - private ByteBuffer msg; - - /** Direct message. */ - private Message commMsg; + private Object msg; /** */ @GridToStringExclude @@ -2220,8 +2515,7 @@ public class GridNioServer<T> { * @param op Requested operation. * @param msg Message. */ - NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, - ByteBuffer msg) { + NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, Object msg) { assert ses != null; assert op != null; assert op != NioOperation.REGISTER; @@ -2249,38 +2543,25 @@ public class GridNioServer<T> { this.ses = ses; this.op = op; - this.commMsg = commMsg; + this.msg = commMsg; this.skipRecovery = skipRecovery; } - /** - * @return Requested change operation. - */ - private NioOperation operation() { + /** {@inheritDoc} */ + public NioOperation operation() { return op; } - /** - * @return Message. - */ - private ByteBuffer message() { + /** {@inheritDoc} */ + public Object message() { return msg; } - /** - * @return Direct message. - */ - private Message directMessage() { - return commMsg; - } - - /** - * @param ses New session instance. - */ - private void resetSession(GridSelectorNioSessionImpl ses) { - assert commMsg != null; + /** {@inheritDoc} */ + public void resetSession(GridNioSession ses) { + assert msg instanceof Message : msg; - this.ses = ses; + this.ses = (GridSelectorNioSessionImpl)ses; } /** @@ -2290,10 +2571,8 @@ public class GridNioServer<T> { return sockCh; } - /** - * @return Session for this change request. - */ - private GridSelectorNioSessionImpl session() { + /** {@inheritDoc} */ + public GridSelectorNioSessionImpl session() { return ses; } @@ -2311,21 +2590,21 @@ public class GridNioServer<T> { return meta; } - /** - * Applicable to write futures only. Fails future with corresponding IOException. - */ - private void connectionClosed() { - assert op == NioOperation.REQUIRE_WRITE; - assert ses != null; - - onDone(new IOException("Failed to send message (connection was closed): " + ses)); + /** {@inheritDoc} */ + @Override public void onError(Exception e) { + onDone(e); } /** {@inheritDoc} */ @Override public void onAckReceived() { - assert commMsg != null; + assert msg instanceof Message : msg; + + ((Message)msg).onAckReceived(); + } - commMsg.onAckReceived(); + /** {@inheritDoc} */ + @Override public void onMessageWritten() { + onDone(); } /** {@inheritDoc} */ @@ -2369,7 +2648,7 @@ public class GridNioServer<T> { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) { + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { if (directMode) { boolean sslSys = sslFilter != null && msg instanceof ByteBuffer; @@ -2388,10 +2667,10 @@ public class GridNioServer<T> { return null; } else - return send(ses, (Message)msg); + return send(ses, (Message)msg, fut); } else - return send(ses, (ByteBuffer)msg); + return send(ses, (ByteBuffer)msg, fut); } /** {@inheritDoc} */ @@ -2759,4 +3038,14 @@ public class GridNioServer<T> { return this; } } + + /** + * + */ + interface SessionChangeRequest { + /** + * @return Requested change operation. + */ + NioOperation operation(); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java index 1e427d8..c1b60ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.util.nio; import java.net.InetSocketAddress; +import org.apache.ignite.IgniteCheckedException; import org.jetbrains.annotations.Nullable; /** @@ -105,6 +106,11 @@ public interface GridNioSession { public GridNioFuture<?> send(Object msg); /** + * @param msg Message to be sent. + */ + public void sendNoFuture(Object msg) throws IgniteCheckedException; + + /** * Gets metadata associated with specified key. * * @param key Key to look up. @@ -174,4 +180,9 @@ public interface GridNioSession { * @return Recovery descriptor if recovery is supported, {@code null otherwise.} */ @Nullable public GridNioRecoveryDescriptor inRecoveryDescriptor(); + + /** + * @param msg System message to send. + */ + public void systemMessage(Object msg); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java index 53a624d..594bca2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java @@ -99,7 +99,7 @@ public class GridNioSessionImpl implements GridNioSession { try { resetSendScheduleTime(); - return chain().onSessionWrite(this, msg); + return chain().onSessionWrite(this, msg, true); } catch (IgniteCheckedException e) { close(); @@ -109,6 +109,18 @@ public class GridNioSessionImpl implements GridNioSession { } /** {@inheritDoc} */ + @Override public void sendNoFuture(Object msg) throws IgniteCheckedException { + try { + chain().onSessionWrite(this, msg, false); + } + catch (IgniteCheckedException e) { + close(); + + throw e; + } + } + + /** {@inheritDoc} */ @Override public GridNioFuture<?> resumeReads() { try { return chain().onResumeReads(this); @@ -316,6 +328,11 @@ public class GridNioSessionImpl implements GridNioSession { } /** {@inheritDoc} */ + @Override public void systemMessage(Object msg) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNioSessionImpl.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index 6b00281..245d5b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -37,7 +37,7 @@ import org.jsr166.ConcurrentLinkedDeque8; */ class GridSelectorNioSessionImpl extends GridNioSessionImpl { /** Pending write requests. */ - private final ConcurrentLinkedDeque8<GridNioFuture<?>> queue = new ConcurrentLinkedDeque8<>(); + private final ConcurrentLinkedDeque8<SessionWriteRequest> queue = new ConcurrentLinkedDeque8<>(); /** Selection key associated with this session. */ @GridToStringExclude @@ -68,6 +68,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { /** */ final AtomicBoolean procWrite = new AtomicBoolean(); + /** */ + private Object sysMsg; + /** * Creates session instance. * @@ -166,7 +169,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { * @param writeFut Write request. * @return Updated size of the queue. */ - int offerSystemFuture(GridNioFuture<?> writeFut) { + int offerSystemFuture(SessionWriteRequest writeFut) { writeFut.messageThread(true); boolean res = queue.offerFirst(writeFut); @@ -186,7 +189,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { * @param writeFut Write request to add. * @return Updated size of the queue. */ - int offerFuture(GridNioFuture<?> writeFut) { + int offerFuture(SessionWriteRequest writeFut) { boolean msgThread = GridNioBackPressureControl.threadProcessingMessage(); if (sem != null && !msgThread) @@ -204,7 +207,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { /** * @param futs Futures to resend. */ - void resend(Collection<GridNioFuture<?>> futs) { + void resend(Collection<SessionWriteRequest> futs) { assert queue.isEmpty() : queue.size(); boolean add = queue.addAll(futs); @@ -215,8 +218,8 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { /** * @return Message that is in the head of the queue, {@code null} if queue is empty. */ - @Nullable GridNioFuture<?> pollFuture() { - GridNioFuture<?> last = queue.poll(); + @Nullable SessionWriteRequest pollFuture() { + SessionWriteRequest last = queue.poll(); if (last != null) { if (sem != null && !last.messageThread()) @@ -246,7 +249,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { * @param fut Future. * @return {@code True} if future was removed from queue. */ - boolean removeFuture(GridNioFuture<?> fut) { + boolean removeFuture(SessionWriteRequest fut) { assert closed(); return queue.removeLastOccurrence(fut); @@ -264,7 +267,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { /** * @return Write requests. */ - Collection<GridNioFuture<?>> writeQueue() { + Collection<SessionWriteRequest> writeQueue() { return queue; } @@ -321,6 +324,31 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { } /** {@inheritDoc} */ + @Override public void systemMessage(Object sysMsg) { + this.sysMsg = sysMsg; + } + + /** + * @return {@code True} if have pending system message to send. + */ + boolean hasSystemMessage() { + return sysMsg != null; + } + + /** + * Gets and clears pending system message. + * + * @return Pending system message. + */ + Object systemMessage() { + Object ret = sysMsg; + + sysMsg = null; + + return ret; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridSelectorNioSessionImpl.class, this, super.toString()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java index fcb40c7..c98c3aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java @@ -108,40 +108,36 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie } /** {@inheritDoc} */ - @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<IgniteException> closure) + @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<IgniteException> c) throws IgniteCheckedException { - // Node ID is never provided in asynchronous send mode. - assert nodeId == null; + try { + // Node ID is never provided in asynchronous send mode. + assert nodeId == null; - if (closure != null) - ses.addMeta(ACK_CLOSURE.ordinal(), closure); + if (c != null) + ses.addMeta(ACK_CLOSURE.ordinal(), c); - GridNioFuture<?> fut = ses.send(msg); + ses.sendNoFuture(msg); - if (fut.isDone()) { - try { - fut.get(); - } - catch (IgniteCheckedException e) { - if (closure != null) - ses.removeMeta(ACK_CLOSURE.ordinal()); + if (c != null) + ses.removeMeta(ACK_CLOSURE.ordinal()); + } + catch (IgniteCheckedException e) { + if (c != null) + ses.removeMeta(ACK_CLOSURE.ordinal()); - if (log.isDebugEnabled()) - log.debug("Failed to send message [client=" + this + ", err=" + e + ']'); + if (log.isDebugEnabled()) + log.debug("Failed to send message [client=" + this + ", err=" + e + ']'); - if (e.getCause() instanceof IOException) { - ses.close(); + if (e.getCause() instanceof IOException) { + ses.close(); - return true; - } - else - throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e); + return true; } + else + throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e); } - if (closure != null) - ses.removeMeta(ACK_CLOSURE.ordinal()); - return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java new file mode 100644 index 0000000..508c791 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.lang.IgniteInClosure; + +/** + * + */ +public interface SessionWriteRequest { + /** + * Sets flag indicating that message send future was created in thread that was processing a message. + * + * @param msgThread {@code True} if future was created in thread that is processing message. + */ + public void messageThread(boolean msgThread); + + /** + * @return {@code True} if future was created in thread that was processing message. + */ + public boolean messageThread(); + + /** + * @return {@code True} if skip recovery for this operation. + */ + public boolean skipRecovery(); + + /** + * Sets ack closure which will be applied when ack received. + * + * @param c Ack closure. + */ + public void ackClosure(IgniteInClosure<IgniteException> c); + + /** + * The method will be called when ack received. + */ + public void onAckReceived(); + + /** + * @return Ack closure. + */ + public IgniteInClosure<IgniteException> ackClosure(); + + /** + * @return Session. + */ + public GridNioSession session(); + + /** + * @param ses Session. + */ + public void resetSession(GridNioSession ses); + + /** + * + */ + public void onError(Exception e); + + /** + * @return Message. + */ + public Object message(); + + /** + * + */ + public void onMessageWritten(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java index 63cdd83..2da8f92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java @@ -249,9 +249,9 @@ public class GridNioSslFilter extends GridNioFilterAdapter { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException { + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { if (directMode) - return proceedSessionWrite(ses, msg); + return proceedSessionWrite(ses, msg, fut); ByteBuffer input = checkMessage(ses, msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java index 3272b8e..d65d576 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java @@ -430,7 +430,7 @@ class GridNioSslHandler extends ReentrantLock { while (!deferredWriteQueue.isEmpty()) { WriteRequest req = deferredWriteQueue.poll(); - req.future().onDone((GridNioFuture<Object>)parent.proceedSessionWrite(ses, req.buffer())); + req.future().onDone((GridNioFuture<Object>)parent.proceedSessionWrite(ses, req.buffer(), true)); } } @@ -475,7 +475,7 @@ class GridNioSslHandler extends ReentrantLock { ByteBuffer cp = copy(outNetBuf); - return parent.proceedSessionWrite(ses, cp); + return parent.proceedSessionWrite(ses, cp, true); } /**
