http://git-wip-us.apache.org/repos/asf/ignite/blob/651c01b7/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 4c89a7c..943bfb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -35,6 +35,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -46,8 +47,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; @@ -66,7 +65,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.util.GridConcurrentFactory; -import org.apache.ignite.internal.util.GridSpinReadWriteLock; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.ipc.IpcEndpoint; import org.apache.ignite.internal.util.ipc.IpcToNioAdapter; @@ -78,6 +77,7 @@ import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter; import org.apache.ignite.internal.util.nio.GridDirectParser; import org.apache.ignite.internal.util.nio.GridNioCodecFilter; import org.apache.ignite.internal.util.nio.GridNioFilter; +import org.apache.ignite.internal.util.nio.GridNioFuture; import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory; import org.apache.ignite.internal.util.nio.GridNioMessageTracker; import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory; @@ -90,9 +90,7 @@ import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; import org.apache.ignite.internal.util.nio.GridShmemCommunicationClient; import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient; -import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler; import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter; -import org.apache.ignite.internal.util.nio.ssl.GridSslMeta; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -136,7 +134,6 @@ import org.jsr166.LongAdder8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META; /** * <tt>TcpCommunicationSpi</tt> is default communication SPI which uses @@ -300,10 +297,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Connection index meta for session. */ private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey(); + /** Recovery descriptor meta key. */ + private static final int RECOVERY_DESC_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); /** Message tracker meta for session. */ private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey(); + /** Connection context meta key. */ + private static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + /** * Default local port range (value is <tt>100</tt>). * See {@link #setLocalPortRange(int)} for details. @@ -338,6 +340,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** */ public static final byte HANDSHAKE_MSG_TYPE = -3; + /** Ignite header message. */ + private static final Message IGNITE_HEADER_MSG = new IgniteHeaderMessage(); + + /** Skip ack. For test purposes only. */ + private boolean skipAck; + /** */ private ConnectGateway connectGate; @@ -402,10 +410,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug("Session was closed but there are unacknowledged messages, " + "will try to reconnect [rmtNode=" + outDesc.node().id() + ']'); - DisconnectedSessionInfo disconnectData = - new DisconnectedSessionInfo(outDesc, connId.connectionIndex()); + SessionInfo sesInfo = + new SessionInfo(ses, connId.connectionIndex(), SessionState.RECONNECT); - commWorker.addProcessDisconnectRequest(disconnectData); + commWorker.addSessionStateChangeRequest(sesInfo); } } else @@ -431,14 +439,75 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (msg instanceof NodeIdMessage) { sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0); - connKey = new ConnectionKey(sndId, 0, -1); + + if (ses.remoteAddress() != null) { // Not shmem. + assert !ses.accepted(); + + ConnectContext ctx = ses.meta(CONN_CTX_META_KEY); + + assert ctx != null; + assert ctx.expNodeId != null; + + if (sndId.equals(ctx.expNodeId)) { + GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor(); + + assert recoveryDesc != null; + + long connCnt = recoveryDesc.incrementConnectCount(); + + connKey = new ConnectionKey(sndId, ctx.connIdx, connCnt); + + final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey); + + assert old == null; + + ses.send(IGNITE_HEADER_MSG); + + ClusterNode locNode = getLocalNode(); + + if (locNode == null) { + commWorker.addSessionStateChangeRequest(new SessionInfo(ses, SessionState.CLOSE, + new IgniteCheckedException("Local node has not been started or " + + "fully initialized [isStopping=" + getSpiContext().isStopping() + ']'))); + + return; + } + + Integer handshakeConnIdx = useMultipleConnections(getSpiContext().node(sndId)) + ? connKey.connectionIndex() : null; + + HandshakeMessage handshakeMsg; + + if (handshakeConnIdx != null) + handshakeMsg = new HandshakeMessage2(locNode.id(), connCnt, recoveryDesc.received(), + handshakeConnIdx); + else + handshakeMsg = new HandshakeMessage(locNode.id(), connCnt, recoveryDesc.received()); + + if (log.isDebugEnabled()) + log.debug("Write handshake message [rmtNode=" + sndId + + ", msg=" + handshakeMsg + ']'); + + ses.send(handshakeMsg); + } + else { + commWorker.addSessionStateChangeRequest(new SessionInfo(ses, SessionState.CLOSE, + new IgniteCheckedException("Remote node ID is not as expected [expected=" + + ctx.expNodeId + ", rcvd=" + sndId + ']'))); + } + + return; + } + else + connKey = new ConnectionKey(sndId, 0, -1); } else { assert msg instanceof HandshakeMessage : msg; HandshakeMessage msg0 = (HandshakeMessage)msg; - sndId = ((HandshakeMessage)msg).nodeId(); + sndId = msg0.nodeId(); + connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount()); } @@ -497,12 +566,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else { assert connKey.connectionIndex() >= 0 : connKey; - GridCommunicationClient[] curClients = clients.get(sndId); - - GridCommunicationClient oldClient = - curClients != null && connKey.connectionIndex() < curClients.length ? - curClients[connKey.connectionIndex()] : - null; + GridCommunicationClient oldClient = nodeClient(sndId, connKey.connectionIndex()); boolean hasShmemClient = false; @@ -531,10 +595,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey); if (oldFut == null) { - curClients = clients.get(sndId); - - oldClient = curClients != null && connKey.connectionIndex() < curClients.length ? - curClients[connKey.connectionIndex()] : null; + oldClient = nodeClient(sndId, connKey.connectionIndex()); if (oldClient != null) { if (oldClient instanceof GridTcpNioCommunicationClient) { @@ -578,7 +639,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } } else { - if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) { + if (oldFut instanceof ReserveClientFuture && locNode.order() < rmtNode.order()) { if (log.isDebugEnabled()) { log.debug("Received incoming connection from remote node while " + "connecting to this node, rejecting [locNode=" + locNode.id() + @@ -604,9 +665,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ConnectionKey connKey = ses.meta(CONN_IDX_META); if (connKey == null) { - assert ses.accepted() : ses; - - if (!connectGate.tryEnter()) { + if (ses.accepted() && !connectGate.tryEnter()) { // Outgoing connection already entered gate. if (log.isDebugEnabled()) log.debug("Close incoming connection, failed to enter gateway."); @@ -619,7 +678,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter onFirstMessage(ses, msg); } finally { - connectGate.leave(); + if (ses.accepted()) + connectGate.leave(); } } else { @@ -631,13 +691,53 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (recovery != null) { RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg; + long rcvCnt = msg0.received(); + if (log.isDebugEnabled()) { log.debug("Received recovery acknowledgement [rmtNode=" + connKey.nodeId() + ", connIdx=" + connKey.connectionIndex() + ", rcvCnt=" + msg0.received() + ']'); } - recovery.ackReceived(msg0.received()); + ConnectContext ctx = ses.meta(CONN_CTX_META_KEY); + + if (!ses.accepted() && ctx != null && ctx.rcvCnt == Long.MIN_VALUE) { + HandshakeTimeoutObject timeoutObj = ctx.handshakeTimeoutObj; + + Exception err = null; + + if (timeoutObj != null) { + if (!cancelHandshakeTimeout(timeoutObj)) { + err = new HandshakeTimeoutException("Failed to perform handshake due to timeout " + + "(consider increasing 'connectionTimeout' configuration property)."); + } + + ctx.handshakeTimeoutObj = null; + } + + if (rcvCnt == -1 || err != null) { + if (ses.remoteAddress() != null) { + SessionInfo sesInfo = new SessionInfo(ses, SessionState.CLOSE, err); + + commWorker.addSessionStateChangeRequest(sesInfo); + } + } + else { + ctx.rcvCnt = rcvCnt; + + recovery.onHandshake(rcvCnt); + + nioSrvr.resend(ses); + + recovery.onConnected(); + + SessionInfo sesInfo = new SessionInfo(ses, connKey.idx, SessionState.READY); + + commWorker.addSessionStateChangeRequest(sesInfo); + } + } + else + recovery.ackReceived(rcvCnt); return; } @@ -655,7 +755,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ", rcvCnt=" + rcvCnt + ']'); } - ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt)); + if (!skipAck) + ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt)); recovery.lastAcknowledged(rcvCnt); } @@ -706,7 +807,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert connKey != null && connKey.connectionIndex() >= 0 : connKey; assert !usePairedConnections(node); - recovery.onHandshake(rcvCnt); + if (ses.accepted()) + recovery.onHandshake(rcvCnt); ses.inRecoveryDescriptor(recovery); ses.outRecoveryDescriptor(recovery); @@ -913,10 +1015,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } else { try { - fut.onDone(); + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1), new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> msgFut) { + try { + msgFut.get(); + } catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send recovery handshake " + + "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); + + recoveryDesc.release(); + } finally { + fut.onDone(); + + clientFuts.remove(connKey, fut); + + ses.close(); + } + } + }); } - finally { - clientFuts.remove(connKey, fut); + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); } } } @@ -1721,12 +1841,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter nioSrvr.dumpStats(); } - /** */ - private final ThreadLocal<Integer> threadConnIdx = new ThreadLocal<>(); - - /** */ - private final AtomicInteger connIdx = new AtomicInteger(); - /** {@inheritDoc} */ @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException { initFailureDetectionTimeout(); @@ -1982,9 +2096,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; + UUID rmtNodeId = null; + ConnectionKey key = ses.meta(CONN_IDX_META); - return key != null ? formatter.writer(key.nodeId()) : null; + if (key != null) + rmtNodeId = key.nodeId(); + else { + ConnectContext ctx = ses.meta(CONN_CTX_META_KEY); + + if (ctx != null) + rmtNodeId = ctx.expNodeId; + } + + return key != null ? formatter.writer(rmtNodeId) : null; } }; @@ -1994,7 +2119,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() { @Override public boolean apply(Message msg) { - return msg instanceof RecoveryLastReceivedMessage; + return msg instanceof NotRecoverable; } }; @@ -2329,52 +2454,110 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (node.id().equals(locNode.id())) notifyListener(node.id(), msg, NOOP); else { - GridCommunicationClient client = null; - int connIdx = useMultipleConnections(node) ? connPlc.connectionIndex() : 0; + send(node, connIdx, msg, ackC); + } + } + + /** + * Try to send message. + */ + private void send(final ClusterNode node, + final int connIdx, + final Message msg, + final IgniteInClosure<IgniteException> ackC + ) { + final GridCommunicationClient client = nodeClient(node.id(), connIdx); + + if (client != null && client.reserve()) { try { - boolean retry; + send0(client, node, msg, ackC); + } + catch (IgniteCheckedException e) { + if (removeNodeClient(node.id(), client)) + client.forceClose(); + + throw new IgniteSpiException("Failed to send message to remote node: " + node, e); + } + } + else { + if (client != null) + removeNodeClient(node.id(), client); - do { - client = reserveClient(node, connIdx); + IgniteInternalFuture<GridCommunicationClient> clientFut = reserveClient(node, connIdx); - UUID nodeId = null; + clientFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() { + @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) { + GridCommunicationClient client = null; - if (!client.async()) - nodeId = node.id(); + try { + client = fut.get(); - retry = client.sendMessage(nodeId, msg, ackC); + send0(client, node, msg, ackC); + } + catch (IgniteCheckedException e) { + LT.error(log, e, "Unexpected error occurred during sending of message to node: " + node.id()); - client.release(); + if (client != null && removeNodeClient(node.id(), client)) + client.forceClose(); + } + } + }); + } + } - if (!retry) - sentMsgsCnt.increment(); - else { - removeNodeClient(node.id(), client); + /** + * @param client Client. + * @param node Node. + * @param msg Message. + * @param ackC Ack closure. + */ + private void send0( + GridCommunicationClient client, + ClusterNode node, + Message msg, + IgniteInClosure<IgniteException> ackC + ) throws IgniteCheckedException { + assert client != null; - ClusterNode node0 = getSpiContext().node(node.id()); + UUID nodeId = null; - if (node0 == null) - throw new IgniteCheckedException("Failed to send message to remote node " + - "(node has left the grid): " + node.id()); - } + if (!client.async()) + nodeId = node.id(); - client = null; - } - while (retry); - } - catch (IgniteCheckedException e) { - throw new IgniteSpiException("Failed to send message to remote node: " + node, e); - } - finally { - if (client != null && removeNodeClient(node.id(), client)) - client.forceClose(); + boolean retry = client.sendMessage(nodeId, msg, ackC); + + client.release(); + + if (!retry) + sentMsgsCnt.increment(); + else { + removeNodeClient(node.id(), client); + + ClusterNode node0 = getSpiContext().node(node.id()); + + if (node0 == null) { + U.warn(log, "Failed to send message to remote node (node has left the grid): " + node.id()); + + return; } + + send(node, client.connectionIndex(), msg, ackC); } } /** + * @param nodeId Node id. + * @param connIdx Connection index. + */ + private GridCommunicationClient nodeClient(UUID nodeId, int connIdx) { + GridCommunicationClient[] curClients = clients.get(nodeId); + + return curClients != null && connIdx < curClients.length ? curClients[connIdx] : null; + } + + /** * @param nodeId Node ID. * @param rmvClient Client to remove. * @return {@code True} if client was removed. @@ -2443,95 +2626,245 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param node Node to which client should be open. * @param connIdx Connection index. * @return The existing or just created client. - * @throws IgniteCheckedException Thrown if any exception occurs. */ - private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException { - assert node != null; - assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx; + private IgniteInternalFuture<GridCommunicationClient> reserveClient(ClusterNode node, int connIdx) { + GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>(); - UUID nodeId = node.id(); + tryReserveClient(node, connIdx, fut); - while (true) { - GridCommunicationClient[] curClients = clients.get(nodeId); + return fut; + } + + /** + * @param node Node. + * @param connIdx Connection index. + * @param fut Future. + */ + private void tryReserveClient( + final ClusterNode node, + final int connIdx, + final GridFutureAdapter<GridCommunicationClient> fut) + { + final ReserveClientFuture reserveFut = new ReserveClientFuture(node, connIdx); + + reserveFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() { + @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut0) { + try { + GridCommunicationClient client = fut0.get(); + + if (client != null) + fut.onDone(client); + else + tryReserveClient(node, connIdx, fut); + } + catch (IgniteCheckedException e) { + fut.onDone(e); + } + } + }); + + try { + reserveFut.reserve(); + } + catch (Exception e) { + fut.onDone(e); + } + } + + /** + * + */ + private class ReserveClientFuture extends GridFutureAdapter<GridCommunicationClient> { + /** Node. */ + private final ClusterNode node; + + /** Connection index. */ + private final int connIdx; + + /** + * @param node Node. + */ + ReserveClientFuture(ClusterNode node, int connIdx) { + assert node != null; + assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx; + + this.node = node; + this.connIdx = connIdx; + } + + /** + * + */ + void reserve() { + final UUID nodeId = node.id(); + + final GridCommunicationClient client = nodeClient(nodeId, connIdx); - GridCommunicationClient client = curClients != null && connIdx < curClients.length ? - curClients[connIdx] : null; + final GridFutureAdapter<GridCommunicationClient> connFut; if (client == null) { - if (stopping) - throw new IgniteSpiException("Node is stopping."); + if (stopping) { + onDone(new IgniteSpiException("Node is stopping.")); + + return; + } // Do not allow concurrent connects. - GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture(); + connFut = this; - ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1); + final ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1); - GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut); + final GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, connFut); if (oldFut == null) { try { - GridCommunicationClient[] curClients0 = clients.get(nodeId); - - GridCommunicationClient client0 = curClients0 != null && connIdx < curClients0.length ? - curClients0[connIdx] : null; + GridCommunicationClient client0 = nodeClient(nodeId, connIdx); if (client0 == null) { - client0 = createNioClient(node, connIdx); - - if (client0 != null) { - addNodeClient(node, connIdx, client0); - - if (client0 instanceof GridTcpNioCommunicationClient) { - GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0); + IgniteInternalFuture<GridCommunicationClient> clientFut = createNioClient(node, connIdx); - if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) { - if (log.isDebugEnabled()) - log.debug("Session was closed after client creation, will retry " + - "[node=" + node + ", client=" + client0 + ']'); + clientFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() { + @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) { + try { + GridCommunicationClient client0 = fut.get(); + + if (client0 != null) { + addNodeClient(node, connIdx, client0); + + if (client0 instanceof GridTcpNioCommunicationClient) { + GridTcpNioCommunicationClient tcpClient = + ((GridTcpNioCommunicationClient)client0); + + if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) { + if (log.isDebugEnabled()) + log.debug("Session was closed after client creation, " + + "will retry [node=" + node + ", client=" + client0 + ']'); + + client0 = null; + } + } + + if (client0 == null) { + clientFuts.remove(connKey, connFut); + + onDone(); + } + else if (client0.reserve()) { + clientFuts.remove(connKey, connFut); + + onDone(client0); + } + else { + clientFuts.remove(connKey, connFut); + + removeNodeClient(nodeId, client0); + + onDone(); + } + } + else { + final long currTime = U.currentTimeMillis(); + + addTimeoutObject(new IgniteSpiTimeoutObject() { + private final IgniteUuid id = IgniteUuid.randomUuid(); + + @Override public IgniteUuid id() { + return id; + } + + @Override public long endTime() { + return currTime + 200; + } + + @Override public void onTimeout() { + SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY, + new Runnable() { + @Override public void run() { + clientFuts.remove(connKey, connFut); + + onDone(); + } + }); + + commWorker.addSessionStateChangeRequest(sesInfo); + } + }); + } + } + catch (IgniteCheckedException e) { + clientFuts.remove(connKey, connFut); - client0 = null; + onDone(e); } } - } - else - U.sleep(200); + }); } + else { + assert connIdx == client0.connectionIndex() : client0; + + if (client0.reserve()) + onDone(client0); + else { + removeNodeClient(nodeId, client0); - fut.onDone(client0); + onDone(); + } + } } catch (Throwable e) { - fut.onDone(e); + connFut.onDone(e); if (e instanceof Error) throw (Error)e; } - finally { - clientFuts.remove(connKey, fut); - } } - else - fut = oldFut; + else { + oldFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() { + @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) { + try { + GridCommunicationClient client0 = fut.get(); - client = fut.get(); + if (client0 == null) { + clientFuts.remove(connKey, oldFut); - if (client == null) - continue; + onDone(); + } + else if (client0.reserve()) { + clientFuts.remove(connKey, oldFut); - if (getSpiContext().node(nodeId) == null) { - if (removeNodeClient(nodeId, client)) - client.forceClose(); + onDone(client0); + } + else { + clientFuts.remove(connKey, oldFut); - throw new IgniteSpiException("Destination node is not in topology: " + node.id()); + removeNodeClient(nodeId, client0); + + onDone(); + } + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); } } + else { + assert connIdx == client.connectionIndex() : client; - assert connIdx == client.connectionIndex() : client; + if (client.reserve()) + onDone(client); + else { + removeNodeClient(nodeId, client); - if (client.reserve()) - return client; - else - // Client has just been closed by idle worker. Help it and try again. - removeNodeClient(nodeId, client); + onDone(); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ReserveClientFuture.class, this); } } @@ -2539,10 +2872,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param node Node to create client for. * @param connIdx Connection index. * @return Client. - * @throws IgniteCheckedException If failed. */ - @Nullable private GridCommunicationClient createNioClient(ClusterNode node, int connIdx) - throws IgniteCheckedException { + protected IgniteInternalFuture<GridCommunicationClient> createNioClient(ClusterNode node, int connIdx) { assert node != null; Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT)); @@ -2550,7 +2881,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ClusterNode locNode = getSpiContext().localNode(); if (locNode == null) - throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)"); + return new GridFinishedFuture<>( + new IgniteCheckedException("Failed to create NIO client (local node is stopping)") + ); if (log.isDebugEnabled()) log.debug("Creating NIO client to node: " + node); @@ -2567,7 +2900,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Shmem client created: " + client); - return client; + return new GridFinishedFuture<>(client); } catch (IgniteCheckedException e) { if (e.hasCause(IpcOutOfSystemResourcesException.class)) @@ -2578,21 +2911,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else if (log.isDebugEnabled()) log.debug("Failed to establish shared memory connection with local node (node has left): " + node.id()); + + return new GridFinishedFuture<>(e); } } - connectGate.enter(); try { - GridCommunicationClient client = createTcpClient(node, connIdx); - - if (log.isDebugEnabled()) - log.debug("TCP client created: " + client); - - return client; + return createTcpClient(node, connIdx); } - finally { - connectGate.leave(); + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); } } @@ -2641,12 +2970,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { - safeHandshake(client, - null, - node.id(), - timeoutHelper.nextTimeoutChunk(connTimeout0), - null, - null); + safeHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0)); } catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) { client.forceClose(); @@ -2708,7 +3032,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ConnectionKey id = ses.meta(CONN_IDX_META); if (id != null) { - ClusterNode node = getSpiContext().node(id.nodeId); + ClusterNode node = getSpiContext().node(id.nodeId()); if (node != null && node.isClient()) { String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " + @@ -2731,507 +3055,502 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * * @param node Remote node. * @param connIdx Connection index. - * @return Client. + * @return Client future. * @throws IgniteCheckedException If failed. */ - protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException { - Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS)); - Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES)); - Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT)); - Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS)); + protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx) + throws IgniteCheckedException + { + TcpClientFuture fut = new TcpClientFuture(node, connIdx); - boolean isRmtAddrsExist = (!F.isEmpty(rmtAddrs0) && boundPort != null); - boolean isExtAddrsExist = !F.isEmpty(extAddrs); + connectGate.enter(); - if (!isRmtAddrsExist && !isExtAddrsExist) - throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any " + - "TCP communication addresses or mapped external addresses. Check configuration and make sure " + - "that you use the same communication SPI on all nodes. Remote node id: " + node.id()); + fut.connect(); - LinkedHashSet<InetSocketAddress> addrs; + fut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() { + @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut0) { + connectGate.leave(); + } + }); - // Try to connect first on bound addresses. - if (isRmtAddrsExist) { - List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort)); + return fut; + } - boolean sameHost = U.sameMacs(getSpiContext().localNode(), node); + /** + * @param timeoutObj Timeout object. + */ + private boolean cancelHandshakeTimeout(HandshakeTimeoutObject timeoutObj) { + boolean cancelled = timeoutObj.cancel(); - Collections.sort(addrs0, U.inetAddressesComparator(sameHost)); + if (cancelled) + removeTimeoutObject(timeoutObj); - addrs = new LinkedHashSet<>(addrs0); - } - else - addrs = new LinkedHashSet<>(); + return cancelled; + } - // Then on mapped external addresses. - if (isExtAddrsExist) - addrs.addAll(extAddrs); + /** + * + */ + private class TcpClientFuture extends GridFutureAdapter<GridCommunicationClient> { + /** Node. */ + private final ClusterNode node; - boolean conn = false; - GridCommunicationClient client = null; - IgniteCheckedException errs = null; + /** Timeout helper. */ + private final IgniteSpiOperationTimeoutHelper timeoutHelper = + new IgniteSpiOperationTimeoutHelper(TcpCommunicationSpi.this); - int connectAttempts = 1; + /** Addresses. */ + private Collection<InetSocketAddress> addrs; - for (InetSocketAddress addr : addrs) { - long connTimeout0 = connTimeout; + /** Addresses it. */ + private Iterator<InetSocketAddress> addrsIt; - int attempt = 1; + /** Current addresses. */ + private volatile InetSocketAddress currAddr; - IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this); + /** Err. */ + private volatile IgniteCheckedException err; - while (!conn) { // Reconnection on handshake timeout. - try { - SocketChannel ch = SocketChannel.open(); + /** Connect attempts. */ + private volatile int connectAttempts; - ch.configureBlocking(true); + /** Attempts. */ + private volatile int attempt; - ch.socket().setTcpNoDelay(tcpNoDelay); - ch.socket().setKeepAlive(true); + /** Connection index. */ + private volatile int connIdx; - if (sockRcvBuf > 0) - ch.socket().setReceiveBufferSize(sockRcvBuf); + /** + * @param node Node. + */ + TcpClientFuture(ClusterNode node, int connIdx) { + this.node = node; + this.connIdx = connIdx; + } - if (sockSndBuf > 0) - ch.socket().setSendBufferSize(sockSndBuf); + /** + * Connects to remote node. + */ + void connect() { + try { + addrs = addrs(); + } + catch (IgniteCheckedException e) { + onDone(e); - if (getSpiContext().node(node.id()) == null) { - U.closeQuiet(ch); + return; + } - throw new ClusterTopologyCheckedException("Failed to send message " + - "(node left topology): " + node); - } + addrsIt = addrs.iterator(); - ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1); + tryConnect(true); + } - GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey); + /** + * + */ + private void tryConnect(boolean next) { + if (next && !addrsIt.hasNext()) { + IgniteCheckedException err0 = err; - if (!recoveryDesc.reserve()) { - U.closeQuiet(ch); + assert err0 != null; - return null; - } + UUID nodeId = node.id(); - long rcvCnt = -1; + if (getSpiContext().node(nodeId) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) && + X.hasCause(err, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class, + IgniteSpiOperationTimeoutException.class)) + { + LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be " + + "dropped from cluster [" + "rmtNode=" + node + ", err=" + err + + ", connectErrs=" + Arrays.toString(err.getSuppressed()) + ']'); - Map<Integer, Object> meta = new HashMap<>(); + getSpiContext().failNode(nodeId, "TcpCommunicationSpi failed to establish connection to node " + + "[rmtNode=" + node + ", errs=" + err + ", connectErrs=" + Arrays.toString(err.getSuppressed()) + ']'); + } - GridSslMeta sslMeta = null; + onDone(err0); - try { - ch.socket().connect(addr, (int)timeoutHelper.nextTimeoutChunk(connTimeout)); - - if (isSslEnabled()) { - meta.put(SSL_META.ordinal(), sslMeta = new GridSslMeta()); + return; + } - SSLEngine sslEngine = ignite.configuration().getSslContextFactory().create().createSSLEngine(); + if (next) { + attempt = 0; - sslEngine.setUseClientMode(true); + connectAttempts = 0; - sslMeta.sslEngine(sslEngine); - } + currAddr = addrsIt.next(); + } - Integer handshakeConnIdx = useMultipleConnections(node) ? connIdx : null; + InetSocketAddress addr = currAddr; - rcvCnt = safeHandshake(ch, - recoveryDesc, - node.id(), - timeoutHelper.nextTimeoutChunk(connTimeout0), - sslMeta, - handshakeConnIdx); + try { + final SocketChannel ch = SocketChannel.open(); - if (rcvCnt == -1) - return null; - } - finally { - if (recoveryDesc != null && rcvCnt == -1) - recoveryDesc.release(); - } + ch.configureBlocking(false); - try { - meta.put(CONN_IDX_META, connKey); + ch.socket().setTcpNoDelay(tcpNoDelay); + ch.socket().setKeepAlive(true); - if (recoveryDesc != null) { - recoveryDesc.onHandshake(rcvCnt); + if (sockRcvBuf > 0) + ch.socket().setReceiveBufferSize(sockRcvBuf); - meta.put(-1, recoveryDesc); - } + if (sockSndBuf > 0) + ch.socket().setSendBufferSize(sockSndBuf); - GridNioSession ses = nioSrvr.createSession(ch, meta).get(); + if (getSpiContext().node(node.id()) == null) { + U.closeQuiet(ch); - client = new GridTcpNioCommunicationClient(connIdx, ses, log); + onError(new ClusterTopologyCheckedException("Failed to send message " + + "(node left topology): " + node)); - conn = true; - } - finally { - if (!conn) { - if (recoveryDesc != null) - recoveryDesc.release(); - } - } + return; } - catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) { - if (client != null) { - client.forceClose(); - client = null; - } + final ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1); - if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException || - timeoutHelper.checkFailureTimeoutReached(e))) { + final GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey); - String msg = "Handshake timed out (failure detection timeout is reached) " + - "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + addr + ']'; + if (!recoveryDesc.reserve()) { + U.closeQuiet(ch); - onException(msg, e); + onDone(); - if (log.isDebugEnabled()) - log.debug(msg); + return; + } - if (errs == null) - errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " + - "Make sure that each ComputeTask and cache Transaction has a timeout set " + - "in order to prevent parties from waiting forever in case of network issues " + - "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); + final Map<Integer, Object> meta = new HashMap<>(); - errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e)); + final ConnectContext ctx = new ConnectContext(); - break; - } + ctx.expNodeId = node.id(); - assert !failureDetectionTimeoutEnabled(); + ctx.tcpClientFut = this; - onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + - ", addr=" + addr + ']', e); + ctx.connIdx = connIdx; - if (log.isDebugEnabled()) - log.debug( - "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + - ", addr=" + addr + ", err=" + e + ']'); + meta.put(CONN_CTX_META_KEY, ctx); - if (attempt == reconCnt || connTimeout0 > maxConnTimeout) { - if (log.isDebugEnabled()) - log.debug("Handshake timedout (will stop attempts to perform the handshake) " + - "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout + - ", attempt=" + attempt + ", reconCnt=" + reconCnt + - ", err=" + e.getMessage() + ", addr=" + addr + ']'); + meta.put(RECOVERY_DESC_META_KEY, recoveryDesc); - if (errs == null) - errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " + - "Make sure that each ComputeTask and cache Transaction has a timeout set " + - "in order to prevent parties from waiting forever in case of network issues " + - "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); + final int timeoutChunk = (int)timeoutHelper.nextTimeoutChunk(connTimeout); - errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e)); + final int attempt0 = attempt; - break; - } - else { - attempt++; + final ConnectionTimeoutObject connTimeoutObj = new ConnectionTimeoutObject(ch, meta, + U.currentTimeMillis() + timeoutChunk * (attempt0 > 0 ? attempt0 * 2 : 1)); - connTimeout0 *= 2; + addTimeoutObject(connTimeoutObj); - // Continue loop. - } - } - catch (Exception e) { - if (client != null) { - client.forceClose(); + boolean connect = ch.connect(addr); - client = null; - } + IgniteInClosure<IgniteInternalFuture<GridNioSession>> lsnr0 = new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() { + @Override public void apply(final IgniteInternalFuture<GridNioSession> fut) { + GridNioSession ses = null; - onException("Client creation failed [addr=" + addr + ", err=" + e + ']', e); + try { + ses = fut.get(); - if (log.isDebugEnabled()) - log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']'); + boolean canceled = connTimeoutObj.cancel(); - boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e); + if (canceled) + removeTimeoutObject(connTimeoutObj); + else { + final GridNioSession ses0 = ses; - if (failureDetThrReached) - LT.warn(log, "Connect timed out (consider increasing 'failureDetectionTimeout' " + - "configuration property) [addr=" + addr + ", failureDetectionTimeout=" + - failureDetectionTimeout() + ']'); - else if (X.hasCause(e, SocketTimeoutException.class)) - LT.warn(log, "Connect timed out (consider increasing 'connTimeout' " + - "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']'); + Runnable clo = new Runnable() { + @Override public void run() { + GridNioFuture<Boolean> fut = nioSrvr.close(ses0); - if (errs == null) - errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " + - "Make sure that each ComputeTask and cache Transaction has a timeout set " + - "in order to prevent parties from waiting forever in case of network issues " + - "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); + final SocketTimeoutException e = new SocketTimeoutException("Connect timed " + + "(consider increasing 'connTimeout' configuration property) [addr=" + + currAddr + ", connTimeout=" + connTimeout + ']'); - errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e)); + fut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> fut0) { + Runnable clo = new Runnable() { + @Override public void run() { + onError(e); + } + }; - // Reconnect for the second time, if connection is not established. - if (!failureDetThrReached && connectAttempts < 2 && - (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) { - connectAttempts++; + SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY, clo); - continue; - } + commWorker.addSessionStateChangeRequest(sesInfo); + } + }); + } + }; - break; - } - } + commWorker.addSessionStateChangeRequest(new SessionInfo(null, SessionState.RETRY, clo)); - if (conn) - break; - } + return; + } - if (client == null) { - assert errs != null; + int timeoutChunk1 = (int) timeoutHelper.nextTimeoutChunk(connTimeout); - if (X.hasCause(errs, ConnectException.class)) - LT.warn(log, "Failed to connect to a remote node " + - "(make sure that destination node is alive and " + - "operating system firewall is disabled on local and remote hosts) " + - "[addrs=" + addrs + ']'); + long time = U.currentTimeMillis() + timeoutChunk1 * (attempt0 > 0 ? attempt0 * 2 : 1); - if (getSpiContext().node(node.id()) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) && - X.hasCause(errs, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class, - IgniteSpiOperationTimeoutException.class)) { - LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " + - "cluster [" + - "rmtNode=" + node + - ", err=" + errs + - ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']'); + HandshakeTimeoutObject<SocketChannel> handshakeTimeoutObj = + new HandshakeTimeoutObject<>(ch, TcpClientFuture.this, time); - getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" + - "rmtNode=" + node + - ", errs=" + errs + - ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']'); - } + ctx.handshakeTimeoutObj = handshakeTimeoutObj; - throw errs; - } + addTimeoutObject(handshakeTimeoutObj); + } + catch (final IgniteSpiOperationTimeoutException e) { + assert ses != null; - return client; - } + final GridNioSession ses0 = ses; - /** - * Performs handshake in timeout-safe way. - * - * @param client Client. - * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}. - * @param rmtNodeId Remote node. - * @param timeout Timeout for handshake. - * @param sslMeta Session meta. - * @param handshakeConnIdx Non null connection index if need send it in handshake. - * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout. - * @return Handshake response. - */ - @SuppressWarnings("ThrowFromFinallyBlock") - private <T> long safeHandshake( - T client, - @Nullable GridNioRecoveryDescriptor recovery, - UUID rmtNodeId, - long timeout, - GridSslMeta sslMeta, - @Nullable Integer handshakeConnIdx - ) throws IgniteCheckedException { - HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout); + commWorker.addSessionStateChangeRequest(new SessionInfo(null, SessionState.RETRY, new Runnable() { + @Override public void run() { + GridNioFuture<Boolean> closeFut = nioSrvr.close(ses0); - addTimeoutObject(obj); + closeFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> fut0) { + Runnable clo = new Runnable() { + @Override public void run() { + onError(e); + } + }; - long rcvCnt = 0; + SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY, clo); - try { - if (client instanceof GridCommunicationClient) - ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId)); - else { - SocketChannel ch = (SocketChannel)client; + commWorker.addSessionStateChangeRequest(sesInfo); + } + }); + } + })); + } + catch (IgniteCheckedException e) { + connTimeoutObj.cancel(); - boolean success = false; + removeTimeoutObject(connTimeoutObj); - try { - BlockingSslHandler sslHnd = null; + recoveryDesc.release(); - ByteBuffer buf; + onError(e); + } + } + }; - if (isSslEnabled()) { - assert sslMeta != null; + nioSrvr.createSession(ch, meta, !connect, lsnr0); + } + catch (Exception e) { + onDone(e); + } + } - sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log); + /** + * @param e Exception. + */ + void onError(Exception e) { + if (e instanceof HandshakeTimeoutException || e instanceof IgniteSpiOperationTimeoutException) { + if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException || + timeoutHelper.checkFailureTimeoutReached(e))) { - if (!sslHnd.handshake()) - throw new IgniteCheckedException("SSL handshake is not completed."); + String msg = "Handshake timed out (failure detection timeout is reached) " + + "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + currAddr + ']'; - ByteBuffer handBuff = sslHnd.applicationBuffer(); + onException(msg, e); - if (handBuff.remaining() < 17) { - buf = ByteBuffer.allocate(1000); + if (log.isDebugEnabled()) + log.debug(msg); - int read = ch.read(buf); + if (err == null) + err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " + + "Make sure that each ComputeTask and cache Transaction has a timeout set " + + "in order to prevent parties from waiting forever in case of network issues " + + "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); - if (read == -1) - throw new IgniteCheckedException("Failed to read remote node ID (connection closed)."); + err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e)); - buf.flip(); + tryConnect(true); - buf = sslHnd.decode(buf); - } - else - buf = handBuff; - } - else { - buf = ByteBuffer.allocate(17); + return; + } - for (int i = 0; i < 17; ) { - int read = ch.read(buf); + assert !failureDetectionTimeoutEnabled(); - if (read == -1) - throw new IgniteCheckedException("Failed to read remote node ID (connection closed)."); + long connTimeout0 = connTimeout * attempt; - i += read; - } - } + onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout + + ", addr=" + currAddr + ']', e); - UUID rmtNodeId0 = U.bytesToUuid(buf.array(), 1); + if (log.isDebugEnabled()) + log.debug( + "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout + + ", addr=" + currAddr + ", err=" + e + ']'); - if (!rmtNodeId.equals(rmtNodeId0)) - throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId + - ", rcvd=" + rmtNodeId0 + ']'); - else if (log.isDebugEnabled()) - log.debug("Received remote node ID: " + rmtNodeId0); + if (attempt == reconCnt || connTimeout0 > maxConnTimeout) { + if (log.isDebugEnabled()) + log.debug("Handshake timedout (will stop attempts to perform the handshake) " + + "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout + + ", attempt=" + attempt + ", reconCnt=" + reconCnt + + ", err=" + e.getMessage() + ", addr=" + currAddr + ']'); - if (isSslEnabled()) { - assert sslHnd != null; + if (err == null) + err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " + + "Make sure that each ComputeTask and cache Transaction has a timeout set " + + "in order to prevent parties from waiting forever in case of network issues " + + "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); - ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER))); - } - else - ch.write(ByteBuffer.wrap(U.IGNITE_HEADER)); + err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e)); - ClusterNode locNode = getLocalNode(); + tryConnect(true); + } + else { + attempt++; - if (locNode == null) - throw new IgniteCheckedException("Local node has not been started or " + - "fully initialized [isStopping=" + getSpiContext().isStopping() + ']'); + tryConnect(false); // Reconnection on handshake timeout. + } + } + else { + onException("Client creation failed [addr=" + currAddr + ", err=" + e + ']', e); - if (recovery != null) { - HandshakeMessage msg; + if (log.isDebugEnabled()) + log.debug("Client creation failed [addr=" + currAddr + ", err=" + e + ']'); - int msgSize = 33; + boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e); - if (handshakeConnIdx != null) { - msg = new HandshakeMessage2(locNode.id(), - recovery.incrementConnectCount(), - recovery.received(), - handshakeConnIdx); + if (failureDetThrReached) + LT.warn(log, "Connect timed out (consider increasing 'failureDetectionTimeout' " + + "configuration property) [addr=" + currAddr + ", failureDetectionTimeout=" + + failureDetectionTimeout() + ']'); + else if (X.hasCause(e, SocketTimeoutException.class)) + LT.warn(log, "Connect timed out (consider increasing 'connTimeout' " + + "configuration property) [addr=" + currAddr + ", connTimeout=" + connTimeout + ']'); - msgSize += 4; - } - else { - msg = new HandshakeMessage(locNode.id(), - recovery.incrementConnectCount(), - recovery.received()); - } + if (err == null) + err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " + + "Make sure that each ComputeTask and GridCacheTransaction has a timeout set " + + "in order to prevent parties from waiting forever in case of network issues " + + "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); - if (log.isDebugEnabled()) - log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']'); + err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e)); - buf = ByteBuffer.allocate(msgSize); + // Reconnect for the second time, if connection is not established. + int connectAttempts0; - buf.order(ByteOrder.nativeOrder()); + if (!failureDetThrReached && (connectAttempts0 = connectAttempts) <= 2 && + (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) { + connectAttempts = connectAttempts0 + 1; - boolean written = msg.writeTo(buf, null); + tryConnect(false); - assert written; + return; + } - buf.flip(); + tryConnect(true); + } - if (isSslEnabled()) { - assert sslHnd != null; + onDone(e); + } - ch.write(sslHnd.encrypt(buf)); - } - else - ch.write(buf); - } - else { - if (isSslEnabled()) { - assert sslHnd != null; + /** + * + */ + private Collection<InetSocketAddress> addrs() throws IgniteCheckedException { + Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS)); + Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES)); + Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT)); + Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS)); - ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType))); - } - else - ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)); - } + boolean isRmtAddrsExist = (!F.isEmpty(rmtAddrs0) && boundPort != null); + boolean isExtAddrsExist = !F.isEmpty(extAddrs); - if (recovery != null) { - if (log.isDebugEnabled()) - log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']'); + if (!isRmtAddrsExist && !isExtAddrsExist) + throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any " + + "TCP communication addresses or mapped external addresses. Check configuration and make sure " + + "that you use the same communication SPI on all nodes. Remote node id: " + node.id()); - if (isSslEnabled()) { - assert sslHnd != null; + LinkedHashSet<InetSocketAddress> addrs; - buf = ByteBuffer.allocate(1000); + // Try to connect first on bound addresses. + if (isRmtAddrsExist) { + List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort)); - ByteBuffer decode = null; + boolean sameHost = U.sameMacs(getSpiContext().localNode(), node); - buf.order(ByteOrder.nativeOrder()); + Collections.sort(addrs0, U.inetAddressesComparator(sameHost)); - for (int i = 0; i < 9; ) { - int read = ch.read(buf); + addrs = new LinkedHashSet<>(addrs0); + } + else + addrs = new LinkedHashSet<>(); - if (read == -1) - throw new IgniteCheckedException("Failed to read remote node recovery handshake " + - "(connection closed)."); + // Then on mapped external addresses. + if (isExtAddrsExist) + addrs.addAll(extAddrs); - buf.flip(); + return addrs; + } - decode = sslHnd.decode(buf); + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpClientFuture.class, this); + } + } - i += decode.remaining(); + /** + * Performs handshake in timeout-safe way. + * + * @param client Client. + * @param rmtNodeId Remote node. + * @param timeout Timeout for handshake. + * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout. + * @return Handshake response. + */ + @SuppressWarnings("ThrowFromFinallyBlock") + private <T> long safeHandshake(T client, UUID rmtNodeId, long timeout) throws IgniteCheckedException { + HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, null, U.currentTimeMillis() + timeout); - buf.clear(); - } + addTimeoutObject(obj); - rcvCnt = decode.getLong(1); + long rcvCnt = 0; - if (decode.limit() > 9) { - decode.position(9); + try { + if (client instanceof GridCommunicationClient) + ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId)); + else { + SocketChannel ch = (SocketChannel)client; - sslMeta.decodedBuffer(decode); - } + boolean success = false; - ByteBuffer inBuf = sslHnd.inputBuffer(); + try { + ByteBuffer buf; - if (inBuf.position() > 0) - sslMeta.encodedBuffer(inBuf); - } - else { - buf = ByteBuffer.allocate(9); + buf = ByteBuffer.allocate(17); - buf.order(ByteOrder.nativeOrder()); + for (int i = 0; i < 17; ) { + int read = ch.read(buf); - for (int i = 0; i < 9; ) { - int read = ch.read(buf); + if (read == -1) + throw new IgniteCheckedException("Failed to read remote node ID (connection closed)."); - if (read == -1) - throw new IgniteCheckedException("Failed to read remote node recovery handshake " + - "(connection closed)."); + i += read; + } - i += read; - } + UUID rmtNodeId0 = U.bytesToUuid(buf.array(), 1); - rcvCnt = buf.getLong(1); - } + if (!rmtNodeId.equals(rmtNodeId0)) + throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId + + ", rcvd=" + rmtNodeId0 + ']'); + else if (log.isDebugEnabled()) + log.debug("Received remote node ID: " + rmtNodeId0); - if (log.isDebugEnabled()) - log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']'); + ch.write(ByteBuffer.wrap(U.IGNITE_HEADER)); - if (rcvCnt == -1) { - if (log.isDebugEnabled()) - log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']'); - } - else - success = true; - } - else - success = true; + ClusterNode locNode = getLocalNode(); + + if (locNode == null) + throw new IgniteCheckedException("Local node has not been started or " + + "fully initialized [isStopping=" + getSpiContext().isStopping() + ']'); + + ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)); + + success = true; } catch (IOException e) { if (log.isDebugEnabled()) @@ -3573,7 +3892,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ private class CommunicationWorker extends IgniteSpiThread { /** */ - private final BlockingQueue<DisconnectedSessionInfo> q = new LinkedBlockingQueue<>(); + private final BlockingQueue<SessionInfo> q = new LinkedBlockingQueue<>(); /** * @param gridName Grid name. @@ -3588,10 +3907,56 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug("Tcp communication worker has been started."); while (!isInterrupted()) { - DisconnectedSessionInfo disconnectData = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS); + SessionInfo sesInfo = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS); + + if (sesInfo != null) { + ConnectContext ctx; + + TcpClientFuture clientFut; + + switch (sesInfo.state) { + case RECONNECT: + processDisconnect(sesInfo); - if (disconnectData != null) - processDisconnect(disconnectData); + break; + + case RETRY: + Runnable clo = sesInfo.clo; + + assert clo != null; + + clo.run(); + + break; + + case READY: + ctx = sesInfo.ses.meta(CONN_CTX_META_KEY); + + assert ctx != null; + assert ctx.tcpClientFut != null; + + GridTcpNioCommunicationClient client = + new GridTcpNioCommunicationClient(ctx.connIdx, sesInfo.ses, log); + + ctx.tcpClientFut.onDone(client); + + break; + + case CLOSE: + ctx = sesInfo.ses.meta(CONN_CTX_META_KEY); + + nioSrvr.close(sesInfo.ses); + + if (ctx != null && (clientFut = ctx.tcpClientFut) != null) { + if (sesInfo.err != null) + clientFut.onError(sesInfo.err); + else + clientFut.onDone(); + } + + break; + } + } else processIdle(); } @@ -3753,51 +4118,73 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * @param sesInfo Disconnected session information. */ - private void processDisconnect(DisconnectedSessionInfo sesInfo) { - GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc; + private void processDisconnect(final SessionInfo sesInfo) { + assert sesInfo.state == SessionState.RECONNECT : sesInfo.state; + + final GridNioRecoveryDescriptor recoveryDesc = sesInfo.ses.outRecoveryDescriptor(); - ClusterNode node = recoveryDesc.node(); + assert recoveryDesc != null; + + final ClusterNode node = recoveryDesc.node(); if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) return; - try { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); + if (log.isDebugEnabled()) + log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); + + int connIdx = sesInfo.connIdx; - GridCommunicationClient client = reserveClient(node, sesInfo.connIdx); + final GridCommunicationClient client = nodeClient(node.id(), connIdx); + if (client != null && client.reserve()) client.release(); - } - catch (IgniteCheckedException | IgniteException e) { - try { - if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect failed, will retry " + - "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + else { + if (client != null) + clients.remove(node.id(), client); - addProcessDisconnectRequest(sesInfo); - } - else { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect failed, " + - "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + final IgniteInternalFuture<GridCommunicationClient> fut = reserveClient(node, connIdx); + + fut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() { + @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut0) { + try { + GridCommunicationClient client = fut0.get(); + + client.release(); + } + catch (IgniteCheckedException e) { + try { + if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && + getSpiContext().pingNode(node.id())) { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect failed, will retry " + + "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); - onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", - e); + addSessionStateChangeRequest(sesInfo); + } + else { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect failed, " + + "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + + onException("Recovery reconnect failed, node left [rmtNode=" + + recoveryDesc.node().id() + "]", e); + } + } + catch (IgniteClientDisconnectedException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to ping node, client disconnected."); + } + } } - } - catch (IgniteClientDisconnectedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to ping node, client disconnected."); -
<TRUNCATED>
