ignite-4003 Async outgoing connections for communication SPI
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3f6b88a2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3f6b88a2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3f6b88a2 Branch: refs/heads/ignite-4003 Commit: 3f6b88a2765dec5d236d6b41f84b87f4d7584c06 Parents: 676d6f7 Author: agura <[email protected]> Authored: Tue Feb 7 14:45:57 2017 +0300 Committer: agura <[email protected]> Committed: Sun Apr 9 14:56:00 2017 +0300 ---------------------------------------------------------------------- .../GridClientConnectionManagerAdapter.java | 1 - .../connection/GridClientNioTcpConnection.java | 2 +- .../managers/communication/GridIoManager.java | 4 + .../internal/util/GridSpinReadWriteLock.java | 2 +- .../util/nio/GridNioRecoveryDescriptor.java | 2 +- .../ignite/internal/util/nio/GridNioServer.java | 218 +- .../util/nio/GridSelectorNioSessionImpl.java | 2 - .../internal/util/nio/ssl/GridNioSslFilter.java | 12 +- .../communication/tcp/TcpCommunicationSpi.java | 1901 ++++++++++++------ .../IgniteCacheMessageWriteTimeoutTest.java | 4 +- .../internal/util/nio/GridNioSelfTest.java | 2 +- .../spi/GridTcpSpiForwardingSelfTest.java | 3 +- .../GridAbstractCommunicationSelfTest.java | 27 +- ...mmunicationSpiConcurrentConnectSelfTest.java | 28 +- ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 51 +- ...GridTcpCommunicationSpiRecoverySelfTest.java | 49 +- ...CommunicationRecoveryAckClosureSelfTest.java | 36 +- .../tcp/TcpCommunicationSpiDropNodesTest.java | 12 +- .../TcpCommunicationSpiFaultyClientTest.java | 5 +- .../HadoopExternalCommunication.java | 5 +- 20 files changed, 1659 insertions(+), 707 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java index e325897..aa06322 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java @@ -183,7 +183,6 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog); sslFilter.directMode(false); - sslFilter.clientMode(true); filters = new GridNioFilter[]{codecFilter, sslFilter}; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java index d3a30fb..73487db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java @@ -234,7 +234,7 @@ public class GridClientNioTcpConnection extends GridClientConnection { meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut); } - ses = (GridNioSession)srv.createSession(ch, meta).get(); + ses = (GridNioSession)srv.createSession(ch, meta, false, null).get(); if (sslHandshakeFut != null) sslHandshakeFut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index b64027b..1ad4f52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteComponentType; import org.apache.ignite.internal.IgniteDeploymentCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.direct.DirectMessageReader; import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.managers.GridManagerAdapter; @@ -1300,6 +1301,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackC); else getSpi().sendMessage(node, ioMsg); + + if (ctx.discovery().node(node.id()) == null) + throw new ClusterTopologyCheckedException("Failed to send message to node, node left: " + node); } catch (IgniteSpiException e) { throw new IgniteCheckedException("Failed to send message (node may have left the grid or " + http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java index 4f23979..8fef887 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java @@ -70,7 +70,7 @@ public class GridSpinReadWriteLock { private int writeLockEntryCnt; /** - * Acquires write lock. + * Acquires read lock. */ @SuppressWarnings("BusyWait") public void readLock() { http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java index 6258c13..af7b757 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java @@ -259,7 +259,7 @@ public class GridNioRecoveryDescriptor { /** * @param node Node. - * @return {@code True} if node is not null and has the same order as initial remtoe node. + * @return {@code True} if node is not null and has the same order as initial remote node. */ public boolean nodeAlive(@Nullable ClusterNode node) { return node != null && node.order() == this.node.order(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 7f25e40..0848a8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -110,6 +110,12 @@ public class GridNioServer<T> { /** SSL write buf limit. */ private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey(); + /** Session future meta key. */ + private static final int SESSION_FUT_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + + /** Selection key meta key. */ + private static final int WORKER_IDX_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + /** */ private static final boolean DISABLE_KEYSET_OPTIMIZATION = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS); @@ -462,7 +468,7 @@ public class GridNioServer<T> { * @return Future for operation. */ public GridNioFuture<Boolean> close(GridNioSession ses) { - assert ses instanceof GridSelectorNioSessionImpl; + assert ses instanceof GridSelectorNioSessionImpl : ses; GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; @@ -706,6 +712,7 @@ public class GridNioServer<T> { /** * */ + @SuppressWarnings("ForLoopReplaceableByForEach") public void dumpStats() { U.warn(log, "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get() + ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']'); @@ -719,17 +726,34 @@ public class GridNioServer<T> { * * @param ch Channel to register within the server and create session for. * @param meta Optional meta for new session. + * @param async Async connection. + * @param lsnr Listener that should be invoked in NIO thread. * @return Future to get session. */ - public GridNioFuture<GridNioSession> createSession(final SocketChannel ch, - @Nullable Map<Integer, ?> meta) { + public GridNioFuture<GridNioSession> createSession( + final SocketChannel ch, + @Nullable Map<Integer, Object> meta, + boolean async, + @Nullable IgniteInClosure<? super IgniteInternalFuture<GridNioSession>> lsnr + ) { try { if (!closed) { ch.configureBlocking(false); NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta); - offerBalanced(req); + if (async) { + assert meta != null; + + req.op = NioOperation.CONNECT; + + meta.put(SESSION_FUT_META_KEY, req); + } + + if (lsnr != null) + req.listen(lsnr); + + offerBalanced(req, meta); return req; } @@ -743,6 +767,29 @@ public class GridNioServer<T> { } /** + * @param ch Channel. + * @param meta Session meta. + */ + public GridNioFuture<GridNioSession> cancelConnect(final SocketChannel ch, Map<Integer, ?> meta) { + if (!closed) { + NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta); + + req.op = NioOperation.CANCEL_CONNECT; + + Integer idx = (Integer)meta.get(WORKER_IDX_META_KEY); + + assert idx != null : meta; + + clientWorkers.get(idx).offer(req); + + return req; + } + else + return new GridNioFinishedFuture<>( + new IgniteCheckedException("Failed to cancel connection, server is stopped.")); + } + + /** * Gets configurable write timeout for this session. If not set, default value is {@link #DFLT_SES_WRITE_TIMEOUT}. * * @return Write timeout in milliseconds. @@ -828,9 +875,11 @@ public class GridNioServer<T> { /** * @param req Request to balance. + * @param meta Session metadata. + * @return Worker index. */ - private synchronized void offerBalanced(NioOperationFuture req) { - assert req.operation() == NioOperation.REGISTER : req; + private synchronized int offerBalanced(NioOperationFuture req, @Nullable Map<Integer, Object> meta) { + assert req.operation() == NioOperation.REGISTER || req.operation() == NioOperation.CONNECT: req; assert req.socketChannel() != null : req; int workers = clientWorkers.size(); @@ -868,7 +917,12 @@ public class GridNioServer<T> { else balanceIdx = 0; + if (meta != null) + meta.put(WORKER_IDX_META_KEY, balanceIdx); + clientWorkers.get(balanceIdx).offer(req); + + return balanceIdx; } /** {@inheritDoc} */ @@ -1692,6 +1746,38 @@ public class GridNioServer<T> { while ((req0 = changeReqs.poll()) != null) { switch (req0.operation()) { + case CONNECT: { + NioOperationFuture req = (NioOperationFuture)req0; + + SocketChannel ch = req.socketChannel(); + + try { + ch.register(selector, SelectionKey.OP_CONNECT, req.meta()); + } + catch (IOException e) { + req.onDone(new IgniteCheckedException("Failed to register channel on selector", e)); + } + + break; + } + + case CANCEL_CONNECT: { + NioOperationFuture req = (NioOperationFuture)req0; + + SocketChannel ch = req.socketChannel(); + + SelectionKey key = ch.keyFor(selector); + + if (key != null) + key.cancel(); + + U.closeQuiet(ch); + + req.onDone(); + + break; + } + case REGISTER: { register((NioOperationFuture)req0); @@ -1898,8 +1984,12 @@ public class GridNioServer<T> { log.debug("Closing all connected client sockets."); // Close all channels registered with selector. - for (SelectionKey key : selector.keys()) - close((GridSelectorNioSessionImpl)key.attachment(), null); + for (SelectionKey key : selector.keys()) { + Object attach = key.attachment(); + + if (attach instanceof GridSelectorNioSessionImpl) + close((GridSelectorNioSessionImpl)attach, null); + } if (log.isDebugEnabled()) log.debug("Closing NIO selector."); @@ -2022,11 +2112,19 @@ public class GridNioServer<T> { if (!key.isValid()) continue; - GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); - - assert ses != null; + GridSelectorNioSessionImpl ses = null; try { + if (key.isConnectable()) { + processConnect(key); + + continue; + } + + ses = (GridSelectorNioSessionImpl)key.attachment(); + + assert ses != null; + if (key.isReadable()) processRead(key); @@ -2038,9 +2136,11 @@ public class GridNioServer<T> { throw e; } catch (Exception e) { - U.warn(log, "Failed to process selector key (will close): " + ses, e); + if (!closed) + U.error(log, "Failed to process selector key [ses=" + ses + ']', e); - close(ses, new GridNioException(e)); + if (ses != null) + close(ses, new GridNioException(e)); } } } @@ -2067,11 +2167,19 @@ public class GridNioServer<T> { if (!key.isValid()) continue; - GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); - - assert ses != null; + GridSelectorNioSessionImpl ses = null; try { + if (key.isConnectable()) { + processConnect(key); + + continue; + } + + ses = (GridSelectorNioSessionImpl)key.attachment(); + + assert ses != null; + if (key.isReadable()) processRead(key); @@ -2084,9 +2192,10 @@ public class GridNioServer<T> { } catch (Exception e) { if (!closed) - U.warn(log, "Failed to process selector key (will close): " + ses, e); + U.error(log, "Failed to process selector key [ses=" + ses + ']', e); - close(ses, new GridNioException(e)); + if (ses != null) + close(ses, new GridNioException(e)); } } } @@ -2100,7 +2209,12 @@ public class GridNioServer<T> { long now = U.currentTimeMillis(); for (SelectionKey key : keys) { - GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); + Object obj = key.attachment(); + + if (!(obj instanceof GridSelectorNioSessionImpl)) + continue; + + GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)obj; try { long writeTimeout0 = writeTimeout; @@ -2181,12 +2295,32 @@ public class GridNioServer<T> { ses.addMeta(e.getKey(), e.getValue()); } - SelectionKey key = sockCh.register(selector, SelectionKey.OP_READ, ses); + SelectionKey key; - ses.key(key); + if (!sockCh.isRegistered()) + key = sockCh.register(selector, SelectionKey.OP_READ, ses); + else { + key = sockCh.keyFor(selector); + + Map<Integer, Object> m = (Map<Integer, Object>)key.attachment(); + + NioOperationFuture<GridNioSession> fut = + (NioOperationFuture<GridNioSession>)m.remove(SESSION_FUT_META_KEY); + + assert fut != null; + + for (Entry<Integer, Object> e : m.entrySet()) + ses.addMeta(e.getKey(), e.getValue()); + + key.attach(ses); + + key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT)); + key.interestOps(key.interestOps() | SelectionKey.OP_READ); - if (!ses.accepted()) - resend(ses); + fut.onDone(ses); + } + + ses.key(key); sessions.add(ses); workerSessions.add(ses); @@ -2321,6 +2455,34 @@ public class GridNioServer<T> { } /** + * @param key Key. + * @throws IOException If failed. + */ + @SuppressWarnings("unchecked") + private void processConnect(SelectionKey key) throws IOException { + SocketChannel ch = (SocketChannel)key.channel(); + + Map<Integer, Object> meta = (Map<Integer, Object>)key.attachment(); + + try { + if (ch.finishConnect()) + register(new NioOperationFuture<GridNioSession>(ch, false, meta)); + } + catch (IOException e) { + NioOperationFuture<GridNioSession> sesFut = + (NioOperationFuture<GridNioSession>)meta.get(SESSION_FUT_META_KEY); + + assert sesFut != null; + + U.closeQuiet(ch); + + sesFut.onDone(new GridNioException("Failed to connect to node", e)); + + throw e; + } + } + + /** * Processes read-available event on the key. * * @param key Key that is ready to be read. @@ -2537,14 +2699,20 @@ public class GridNioServer<T> { * @param sockCh Socket channel to be registered on one of the selectors. */ private void addRegistrationReq(SocketChannel sockCh) { - offerBalanced(new NioOperationFuture(sockCh)); + offerBalanced(new NioOperationFuture(sockCh), null); } } /** * Asynchronous operation that may be requested on selector. */ - enum NioOperation { + private enum NioOperation { + /** Register connect key selection. */ + CONNECT, + + /** Cancel connect. */ + CANCEL_CONNECT, + /** Register read key selection. */ REGISTER, http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index 66f9176..2280321 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -393,8 +393,6 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { if (!outRecovery.pairedConnections()) inRecovery = outRecovery; - outRecovery.onConnected(); - return null; } else http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java index b4bd34a..f8a0dce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java @@ -69,9 +69,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter { /** Allocate direct buffer or heap buffer. */ private boolean directBuf; - /** Whether SSLEngine should use client mode. */ - private boolean clientMode; - /** Whether direct mode is used. */ private boolean directMode; @@ -93,13 +90,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter { } /** - * @param clientMode Flag indicating whether SSLEngine should use client mode.. - */ - public void clientMode(boolean clientMode) { - this.clientMode = clientMode; - } - - /** * * @param directMode Flag indicating whether direct mode is used. */ @@ -164,6 +154,8 @@ public class GridNioSslFilter extends GridNioFilterAdapter { if (sslMeta == null) { engine = sslCtx.createSSLEngine(); + boolean clientMode = !ses.accepted(); + engine.setUseClientMode(clientMode); if (!clientMode) {
