http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index f13f1f2..937523c 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -35,6 +35,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -46,8 +47,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; @@ -66,7 +65,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.util.GridConcurrentFactory; -import org.apache.ignite.internal.util.GridSpinReadWriteLock; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.ipc.IpcEndpoint; import org.apache.ignite.internal.util.ipc.IpcToNioAdapter; @@ -78,6 +77,7 @@ import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter; import org.apache.ignite.internal.util.nio.GridDirectParser; import org.apache.ignite.internal.util.nio.GridNioCodecFilter; import org.apache.ignite.internal.util.nio.GridNioFilter; +import org.apache.ignite.internal.util.nio.GridNioFuture; import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory; import org.apache.ignite.internal.util.nio.GridNioMessageTracker; import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory; @@ -90,9 +90,7 @@ import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; import org.apache.ignite.internal.util.nio.GridShmemCommunicationClient; import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient; -import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler; import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter; -import org.apache.ignite.internal.util.nio.ssl.GridSslMeta; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -136,7 +134,6 @@ import org.jsr166.LongAdder8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META; /** * <tt>TcpCommunicationSpi</tt> is default communication SPI which uses @@ -296,10 +293,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** Connection index meta for session. */ private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey(); + /** Recovery descriptor meta key. */ + private static final int RECOVERY_DESC_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); /** Message tracker meta for session. */ private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey(); + /** Connection context meta key. */ + private static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + /** * Default local port range (value is <tt>100</tt>). * See {@link #setLocalPortRange(int)} for details. @@ -334,6 +336,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** Handshake message type. */ public static final short HANDSHAKE_MSG_TYPE = -3; + /** Ignite header message. */ + private static final Message IGNITE_HEADER_MSG = new IgniteHeaderMessage(); + + /** Skip ack. For test purposes only. */ + private boolean skipAck; + /** */ private ConnectGateway connectGate; @@ -407,10 +415,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati log.debug("Session was closed but there are unacknowledged messages, " + "will try to reconnect [rmtNode=" + outDesc.node().id() + ']'); - DisconnectedSessionInfo disconnectData = - new DisconnectedSessionInfo(outDesc, connId.connectionIndex()); + SessionInfo sesInfo = + new SessionInfo(ses, connId.connectionIndex(), SessionState.RECONNECT); - commWorker.addProcessDisconnectRequest(disconnectData); + commWorker.addSessionStateChangeRequest(sesInfo); } } else @@ -436,14 +444,69 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (msg instanceof NodeIdMessage) { sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0); - connKey = new ConnectionKey(sndId, 0, -1); + + if (ses.remoteAddress() != null) { // Not shmem. + assert !ses.accepted(); + + ConnectContext ctx = ses.meta(CONN_CTX_META_KEY); + + assert ctx != null; + assert ctx.expNodeId != null; + + if (sndId.equals(ctx.expNodeId)) { + GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor(); + + assert recoveryDesc != null; + + long connCnt = recoveryDesc.incrementConnectCount(); + + connKey = new ConnectionKey(sndId, ctx.connIdx, connCnt); + + final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey); + + assert old == null; + + ses.send(IGNITE_HEADER_MSG); + + ClusterNode locNode = getLocalNode(); + + if (locNode == null) { + commWorker.addSessionStateChangeRequest(new SessionInfo(ses, SessionState.CLOSE, + new IgniteCheckedException("Local node has not been started or " + + "fully initialized [isStopping=" + getSpiContext().isStopping() + ']'))); + + return; + } + + int handshakeConnIdx = connPlc.connectionIndex(); + + HandshakeMessage handshakeMsg = new HandshakeMessage2(locNode.id(), connCnt, + recoveryDesc.received(), handshakeConnIdx); + + if (log.isDebugEnabled()) + log.debug("Write handshake message [rmtNode=" + sndId + + ", msg=" + handshakeMsg + ']'); + + ses.send(handshakeMsg); + } + else { + commWorker.addSessionStateChangeRequest(new SessionInfo(ses, SessionState.CLOSE, + new IgniteCheckedException("Remote node ID is not as expected [expected=" + + ctx.expNodeId + ", rcvd=" + sndId + ']'))); + } + + return; + } + else + connKey = new ConnectionKey(sndId, 0, -1); } else { assert msg instanceof HandshakeMessage : msg; HandshakeMessage msg0 = (HandshakeMessage)msg; - sndId = ((HandshakeMessage)msg).nodeId(); + sndId = msg0.nodeId(); + connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount()); } @@ -484,30 +547,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (reserve) connectedNew(recoveryDesc, ses, true); else { - if (c.failed) { - ses.send(new RecoveryLastReceivedMessage(-1)); - - for (GridNioSession ses0 : nioSrvr.sessions()) { - ConnectionKey key0 = ses0.meta(CONN_IDX_META); - - if (ses0.accepted() && key0 != null && - key0.nodeId().equals(connKey.nodeId()) && - key0.connectionIndex() == connKey.connectionIndex() && - key0.connectCount() < connKey.connectCount()) - ses0.close(); - } + for (GridNioSession ses0 : nioSrvr.sessions()) { + ConnectionKey key0 = ses0.meta(CONN_IDX_META); + + if (ses0.accepted() && key0 != null && + key0.nodeId().equals(connKey.nodeId()) && + key0.connectionIndex() == connKey.connectionIndex() && + key0.connectCount() < connKey.connectCount()) + ses0.close(); } } } else { assert connKey.connectionIndex() >= 0 : connKey; - GridCommunicationClient[] curClients = clients.get(sndId); - - GridCommunicationClient oldClient = - curClients != null && connKey.connectionIndex() < curClients.length ? - curClients[connKey.connectionIndex()] : - null; + GridCommunicationClient oldClient = nodeClient(sndId, connKey.connectionIndex()); boolean hasShmemClient = false; @@ -536,10 +590,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey); if (oldFut == null) { - curClients = clients.get(sndId); - - oldClient = curClients != null && connKey.connectionIndex() < curClients.length ? - curClients[connKey.connectionIndex()] : null; + oldClient = nodeClient(sndId, connKey.connectionIndex()); if (oldClient != null) { if (oldClient instanceof GridTcpNioCommunicationClient) { @@ -583,7 +634,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } } else { - if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) { + if (oldFut instanceof ReserveClientFuture && locNode.order() < rmtNode.order()) { if (log.isDebugEnabled()) { log.debug("Received incoming connection from remote node while " + "connecting to this node, rejecting [locNode=" + locNode.id() + @@ -605,17 +656,31 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } } - @Override public void onMessage(GridNioSession ses, Message msg) { + @Override public void onMessage(final GridNioSession ses, Message msg) { ConnectionKey connKey = ses.meta(CONN_IDX_META); if (connKey == null) { - assert ses.accepted() : ses; - - if (!connectGate.tryEnter()) { + if (ses.accepted() && !connectGate.tryEnter()) { // Outgoing connection already entered gate. if (log.isDebugEnabled()) log.debug("Close incoming connection, failed to enter gateway."); - ses.close(); + try { + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1), new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + try { + fut.get(); + + ses.close(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send last received message: " + e, e); + } + } + }); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); + } return; } @@ -624,7 +689,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati onFirstMessage(ses, msg); } finally { - connectGate.leave(); + if (ses.accepted()) + connectGate.leave(); } } else { @@ -636,13 +702,49 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (recovery != null) { RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg; + long rcvCnt = msg0.received(); + if (log.isDebugEnabled()) { log.debug("Received recovery acknowledgement [rmtNode=" + connKey.nodeId() + ", connIdx=" + connKey.connectionIndex() + ", rcvCnt=" + msg0.received() + ']'); } - recovery.ackReceived(msg0.received()); + ConnectContext ctx = ses.meta(CONN_CTX_META_KEY); + + if (!ses.accepted() && ctx != null && ctx.rcvCnt == Long.MIN_VALUE) { + HandshakeTimeoutObject timeoutObj = ctx.handshakeTimeoutObj; + + Exception err = null; + + if (timeoutObj != null && !cancelHandshakeTimeout(timeoutObj)) { + err = new HandshakeTimeoutException("Failed to perform handshake due to timeout " + + "(consider increasing 'connectionTimeout' configuration property)."); + } + + if (rcvCnt == -1 || err != null) { + if (ses.remoteAddress() != null) { + SessionInfo sesInfo = new SessionInfo(ses, SessionState.CLOSE, err); + + commWorker.addSessionStateChangeRequest(sesInfo); + } + } + else { + ctx.rcvCnt = rcvCnt; + + recovery.onHandshake(rcvCnt); + + nioSrvr.resend(ses); + + recovery.onConnected(); + + SessionInfo sesInfo = new SessionInfo(ses, connKey.idx, SessionState.READY); + + commWorker.addSessionStateChangeRequest(sesInfo); + } + } + else + recovery.ackReceived(rcvCnt); return; } @@ -660,7 +762,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati ", rcvCnt=" + rcvCnt + ']'); } - ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt)); + if (!skipAck) + ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt)); recovery.lastAcknowledged(rcvCnt); } @@ -711,7 +814,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati assert connKey != null && connKey.connectionIndex() >= 0 : connKey; assert !usePairedConnections(node); - recovery.onHandshake(rcvCnt); + if (ses.accepted()) + recovery.onHandshake(rcvCnt); ses.inRecoveryDescriptor(recovery); ses.outRecoveryDescriptor(recovery); @@ -777,9 +881,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** */ private final ClusterNode rmtNode; - /** */ - private boolean failed; - /** * @param ses Incoming session. * @param recoveryDesc Recovery descriptor. @@ -796,8 +897,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** {@inheritDoc} */ @Override public void apply(Boolean success) { try { - failed = !success; - if (success) { IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> msgFut) { @@ -918,10 +1017,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } else { try { - fut.onDone(); + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1), new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> msgFut) { + try { + msgFut.get(); + } catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send recovery handshake " + + "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); + + recoveryDesc.release(); + } finally { + fut.onDone(); + + clientFuts.remove(connKey, fut); + + ses.close(); + } + } + }); } - finally { - clientFuts.remove(connKey, fut); + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); } } } @@ -1916,12 +2033,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati nioSrvr.dumpStats(); } - /** */ - private final ThreadLocal<Integer> threadConnIdx = new ThreadLocal<>(); - - /** */ - private final AtomicInteger connIdx = new AtomicInteger(); - /** {@inheritDoc} */ @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException { initFailureDetectionTimeout(); @@ -2177,9 +2288,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati assert formatter != null; + UUID rmtNodeId = null; + ConnectionKey key = ses.meta(CONN_IDX_META); - return key != null ? formatter.writer(key.nodeId()) : null; + if (key != null) + rmtNodeId = key.nodeId(); + else { + ConnectContext ctx = ses.meta(CONN_CTX_META_KEY); + + if (ctx != null) + rmtNodeId = ctx.expNodeId; + } + + return key != null ? formatter.writer(rmtNodeId) : null; } }; @@ -2189,7 +2311,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() { @Override public boolean apply(Message msg) { - return msg instanceof RecoveryLastReceivedMessage; + return msg instanceof NotRecoverable; } }; @@ -2528,48 +2650,108 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati int connIdx = connPlc.connectionIndex(); + send(node, connIdx, msg, ackC); + } + } + + /** + * Try to send message. + */ + private void send(final ClusterNode node, + final int connIdx, + final Message msg, + final IgniteInClosure<IgniteException> ackC + ) { + final GridCommunicationClient client = nodeClient(node.id(), connIdx); + + if (client != null && client.reserve()) { try { - boolean retry; + send0(client, node, msg, ackC); + } + catch (IgniteCheckedException e) { + if (removeNodeClient(node.id(), client)) + client.forceClose(); - do { - client = reserveClient(node, connIdx); + throw new IgniteSpiException("Failed to send message to remote node: " + node, e); + } + } + else { + if (client != null) + removeNodeClient(node.id(), client); - UUID nodeId = null; + IgniteInternalFuture<GridCommunicationClient> clientFut = reserveClient(node, connIdx); - if (!client.async()) - nodeId = node.id(); + clientFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() { + @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) { + GridCommunicationClient client = null; - retry = client.sendMessage(nodeId, msg, ackC); + try { + client = fut.get(); - client.release(); + send0(client, node, msg, ackC); + } + catch (IgniteCheckedException e) { + LT.error(log, e, "Unexpected error occurred during sending of message to node: " + node.id()); - if (!retry) - sentMsgsCnt.increment(); - else { - removeNodeClient(node.id(), client); + if (client != null && removeNodeClient(node.id(), client)) + client.forceClose(); + } + } + }); + } + } - ClusterNode node0 = getSpiContext().node(node.id()); + /** + * @param client Client. + * @param node Node. + * @param msg Message. + * @param ackC Ack closure. + */ + private void send0( + GridCommunicationClient client, + ClusterNode node, + Message msg, + IgniteInClosure<IgniteException> ackC + ) throws IgniteCheckedException { + assert client != null; - if (node0 == null) - throw new IgniteCheckedException("Failed to send message to remote node " + - "(node has left the grid): " + node.id()); - } + UUID nodeId = null; - client = null; - } - while (retry); - } - catch (IgniteCheckedException e) { - throw new IgniteSpiException("Failed to send message to remote node: " + node, e); - } - finally { - if (client != null && removeNodeClient(node.id(), client)) - client.forceClose(); + if (!client.async()) + nodeId = node.id(); + + boolean retry = client.sendMessage(nodeId, msg, ackC); + + client.release(); + + if (!retry) + sentMsgsCnt.increment(); + else { + removeNodeClient(node.id(), client); + + ClusterNode node0 = getSpiContext().node(node.id()); + + if (node0 == null) { + U.warn(log, "Failed to send message to remote node (node has left the grid): " + node.id()); + + return; } + + send(node, client.connectionIndex(), msg, ackC); } } /** + * @param nodeId Node id. + * @param connIdx Connection index. + */ + private GridCommunicationClient nodeClient(UUID nodeId, int connIdx) { + GridCommunicationClient[] curClients = clients.get(nodeId); + + return curClients != null && connIdx < curClients.length ? curClients[connIdx] : null; + } + + /** * @param nodeId Node ID. * @param rmvClient Client to remove. * @return {@code True} if client was removed. @@ -2638,95 +2820,245 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * @param node Node to which client should be open. * @param connIdx Connection index. * @return The existing or just created client. - * @throws IgniteCheckedException Thrown if any exception occurs. */ - private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException { - assert node != null; - assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx; + private IgniteInternalFuture<GridCommunicationClient> reserveClient(ClusterNode node, int connIdx) { + GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>(); - UUID nodeId = node.id(); + tryReserveClient(node, connIdx, fut); - while (true) { - GridCommunicationClient[] curClients = clients.get(nodeId); + return fut; + } + + /** + * @param node Node. + * @param connIdx Connection index. + * @param fut Future. + */ + private void tryReserveClient( + final ClusterNode node, + final int connIdx, + final GridFutureAdapter<GridCommunicationClient> fut) + { + final ReserveClientFuture reserveFut = new ReserveClientFuture(node, connIdx); + + reserveFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() { + @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut0) { + try { + GridCommunicationClient client = fut0.get(); + + if (client != null) + fut.onDone(client); + else + tryReserveClient(node, connIdx, fut); + } + catch (IgniteCheckedException e) { + fut.onDone(e); + } + } + }); + + try { + reserveFut.reserve(); + } + catch (Exception e) { + fut.onDone(e); + } + } + + /** + * + */ + private class ReserveClientFuture extends GridFutureAdapter<GridCommunicationClient> { + /** Node. */ + private final ClusterNode node; + + /** Connection index. */ + private final int connIdx; + + /** + * @param node Node. + */ + ReserveClientFuture(ClusterNode node, int connIdx) { + assert node != null; + assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx; + + this.node = node; + this.connIdx = connIdx; + } + + /** + * + */ + void reserve() { + final UUID nodeId = node.id(); - GridCommunicationClient client = curClients != null && connIdx < curClients.length ? - curClients[connIdx] : null; + final GridCommunicationClient client = nodeClient(nodeId, connIdx); + + final GridFutureAdapter<GridCommunicationClient> connFut; if (client == null) { - if (stopping) - throw new IgniteSpiException("Node is stopping."); + if (stopping) { + onDone(new IgniteSpiException("Node is stopping.")); + + return; + } // Do not allow concurrent connects. - GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture(); + connFut = this; - ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1); + final ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1); - GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut); + final GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, connFut); if (oldFut == null) { try { - GridCommunicationClient[] curClients0 = clients.get(nodeId); - - GridCommunicationClient client0 = curClients0 != null && connIdx < curClients0.length ? - curClients0[connIdx] : null; + GridCommunicationClient client0 = nodeClient(nodeId, connIdx); if (client0 == null) { - client0 = createNioClient(node, connIdx); - - if (client0 != null) { - addNodeClient(node, connIdx, client0); + IgniteInternalFuture<GridCommunicationClient> clientFut = createNioClient(node, connIdx); - if (client0 instanceof GridTcpNioCommunicationClient) { - GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0); - - if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) { - if (log.isDebugEnabled()) - log.debug("Session was closed after client creation, will retry " + - "[node=" + node + ", client=" + client0 + ']'); + clientFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() { + @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) { + try { + GridCommunicationClient client0 = fut.get(); + + if (client0 != null) { + addNodeClient(node, connIdx, client0); + + if (client0 instanceof GridTcpNioCommunicationClient) { + GridTcpNioCommunicationClient tcpClient = + ((GridTcpNioCommunicationClient)client0); + + if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) { + if (log.isDebugEnabled()) + log.debug("Session was closed after client creation, " + + "will retry [node=" + node + ", client=" + client0 + ']'); + + client0 = null; + } + } + + if (client0 == null) { + clientFuts.remove(connKey, connFut); + + onDone(); + } + else if (client0.reserve()) { + clientFuts.remove(connKey, connFut); + + onDone(client0); + } + else { + clientFuts.remove(connKey, connFut); + + removeNodeClient(nodeId, client0); + + onDone(); + } + } + else { + final long currTime = U.currentTimeMillis(); + + addTimeoutObject(new IgniteSpiTimeoutObject() { + private final IgniteUuid id = IgniteUuid.randomUuid(); + + @Override public IgniteUuid id() { + return id; + } + + @Override public long endTime() { + return currTime + 200; + } + + @Override public void onTimeout() { + SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY, + new Runnable() { + @Override public void run() { + clientFuts.remove(connKey, connFut); + + onDone(); + } + }); + + commWorker.addSessionStateChangeRequest(sesInfo); + } + }); + } + } + catch (IgniteCheckedException e) { + clientFuts.remove(connKey, connFut); - client0 = null; + onDone(e); } } - } - else - U.sleep(200); + }); } + else { + assert connIdx == client0.connectionIndex() : client0; - fut.onDone(client0); + if (client0.reserve()) + onDone(client0); + else { + removeNodeClient(nodeId, client0); + + onDone(); + } + } } catch (Throwable e) { - fut.onDone(e); + connFut.onDone(e); if (e instanceof Error) throw (Error)e; } - finally { - clientFuts.remove(connKey, fut); - } } - else - fut = oldFut; + else { + oldFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() { + @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) { + try { + GridCommunicationClient client0 = fut.get(); - client = fut.get(); + if (client0 == null) { + clientFuts.remove(connKey, oldFut); - if (client == null) - continue; + onDone(); + } + else if (client0.reserve()) { + clientFuts.remove(connKey, oldFut); - if (getSpiContext().node(nodeId) == null) { - if (removeNodeClient(nodeId, client)) - client.forceClose(); + onDone(client0); + } + else { + clientFuts.remove(connKey, oldFut); + + removeNodeClient(nodeId, client0); - throw new IgniteSpiException("Destination node is not in topology: " + node.id()); + onDone(); + } + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); } } + else { + assert connIdx == client.connectionIndex() : client; + + if (client.reserve()) + onDone(client); + else { + removeNodeClient(nodeId, client); - assert connIdx == client.connectionIndex() : client; + onDone(); + } + } + } - if (client.reserve()) - return client; - else - // Client has just been closed by idle worker. Help it and try again. - removeNodeClient(nodeId, client); + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ReserveClientFuture.class, this); } } @@ -2734,10 +3066,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * @param node Node to create client for. * @param connIdx Connection index. * @return Client. - * @throws IgniteCheckedException If failed. */ - @Nullable private GridCommunicationClient createNioClient(ClusterNode node, int connIdx) - throws IgniteCheckedException { + protected IgniteInternalFuture<GridCommunicationClient> createNioClient(ClusterNode node, int connIdx) { assert node != null; Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT)); @@ -2745,7 +3075,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati ClusterNode locNode = getSpiContext().localNode(); if (locNode == null) - throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)"); + return new GridFinishedFuture<>( + new IgniteCheckedException("Failed to create NIO client (local node is stopping)") + ); if (log.isDebugEnabled()) log.debug("Creating NIO client to node: " + node); @@ -2762,7 +3094,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (log.isDebugEnabled()) log.debug("Shmem client created: " + client); - return client; + return new GridFinishedFuture<>(client); } catch (IgniteCheckedException e) { if (e.hasCause(IpcOutOfSystemResourcesException.class)) @@ -2773,21 +3105,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati else if (log.isDebugEnabled()) log.debug("Failed to establish shared memory connection with local node (node has left): " + node.id()); + + return new GridFinishedFuture<>(e); } } - connectGate.enter(); try { - GridCommunicationClient client = createTcpClient(node, connIdx); - - if (log.isDebugEnabled()) - log.debug("TCP client created: " + client); - - return client; + return createTcpClient(node, connIdx); } - finally { - connectGate.leave(); + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); } } @@ -2836,12 +3164,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } try { - safeHandshake(client, - null, - node.id(), - timeoutHelper.nextTimeoutChunk(connTimeout0), - null, - null); + safeHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0)); } catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) { client.forceClose(); @@ -2903,7 +3226,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati ConnectionKey id = ses.meta(CONN_IDX_META); if (id != null) { - ClusterNode node = getSpiContext().node(id.nodeId); + ClusterNode node = getSpiContext().node(id.nodeId()); if (node != null && node.isClient()) { String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " + @@ -2926,532 +3249,527 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * * @param node Remote node. * @param connIdx Connection index. - * @return Client. + * @return Client future. * @throws IgniteCheckedException If failed. */ - protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException { - Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS)); - Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES)); - Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT)); - Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS)); - - boolean isRmtAddrsExist = (!F.isEmpty(rmtAddrs0) && boundPort != null); - boolean isExtAddrsExist = !F.isEmpty(extAddrs); + protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx) + throws IgniteCheckedException + { + TcpClientFuture fut = new TcpClientFuture(node, connIdx); - if (!isRmtAddrsExist && !isExtAddrsExist) - throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any " + - "TCP communication addresses or mapped external addresses. Check configuration and make sure " + - "that you use the same communication SPI on all nodes. Remote node id: " + node.id()); + connectGate.enter(); - LinkedHashSet<InetSocketAddress> addrs; + fut.connect(); - // Try to connect first on bound addresses. - if (isRmtAddrsExist) { - List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort)); + fut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() { + @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut0) { + connectGate.leave(); + } + }); - boolean sameHost = U.sameMacs(getSpiContext().localNode(), node); + return fut; + } - Collections.sort(addrs0, U.inetAddressesComparator(sameHost)); + /** + * @param timeoutObj Timeout object. + */ + private boolean cancelHandshakeTimeout(HandshakeTimeoutObject timeoutObj) { + boolean cancelled = timeoutObj.cancel(); - addrs = new LinkedHashSet<>(addrs0); - } - else - addrs = new LinkedHashSet<>(); + if (cancelled) + removeTimeoutObject(timeoutObj); - // Then on mapped external addresses. - if (isExtAddrsExist) - addrs.addAll(extAddrs); + return cancelled; + } - Set<InetAddress> allInetAddrs = U.newHashSet(addrs.size()); + /** + * + */ + private class TcpClientFuture extends GridFutureAdapter<GridCommunicationClient> { + /** Node. */ + private final ClusterNode node; - for (InetSocketAddress addr : addrs) - allInetAddrs.add(addr.getAddress()); + /** Timeout helper. */ + private final IgniteSpiOperationTimeoutHelper timeoutHelper = + new IgniteSpiOperationTimeoutHelper(TcpCommunicationSpi.this); - List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs); + /** Addresses. */ + private Collection<InetSocketAddress> addrs; - if (reachableInetAddrs.size() < allInetAddrs.size()) { - LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size()); + /** Addresses it. */ + private Iterator<InetSocketAddress> addrsIt; - for (InetSocketAddress addr : addrs) { - if (reachableInetAddrs.contains(addr.getAddress())) - addrs0.add(addr); - } - for (InetSocketAddress addr : addrs) { - if (!reachableInetAddrs.contains(addr.getAddress())) - addrs0.add(addr); - } + /** Current addresses. */ + private volatile InetSocketAddress currAddr; - addrs = addrs0; - } + /** Err. */ + private volatile IgniteCheckedException err; - if (log.isDebugEnabled()) - log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs.toString() + ']'); + /** Connect attempts. */ + private volatile int connectAttempts; - boolean conn = false; - GridCommunicationClient client = null; - IgniteCheckedException errs = null; + /** Attempts. */ + private volatile int attempt; - int connectAttempts = 1; + /** Connection index. */ + private volatile int connIdx; - for (InetSocketAddress addr : addrs) { - long connTimeout0 = connTimeout; + /** + * @param node Node. + */ + TcpClientFuture(ClusterNode node, int connIdx) { + this.node = node; + this.connIdx = connIdx; + } - int attempt = 1; + /** + * Connects to remote node. + */ + void connect() { + try { + addrs = addrs(); + } + catch (IgniteCheckedException e) { + onDone(e); - IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this); + return; + } - while (!conn) { // Reconnection on handshake timeout. - try { - SocketChannel ch = SocketChannel.open(); + addrsIt = addrs.iterator(); - ch.configureBlocking(true); + tryConnect(true); + } - ch.socket().setTcpNoDelay(tcpNoDelay); - ch.socket().setKeepAlive(true); + /** + * + */ + private void tryConnect(boolean next) { + if (next && !addrsIt.hasNext()) { + IgniteCheckedException err0 = err; - if (sockRcvBuf > 0) - ch.socket().setReceiveBufferSize(sockRcvBuf); + assert err0 != null; - if (sockSndBuf > 0) - ch.socket().setSendBufferSize(sockSndBuf); + UUID nodeId = node.id(); - if (getSpiContext().node(node.id()) == null) { - U.closeQuiet(ch); + if (getSpiContext().node(nodeId) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) && + X.hasCause(err, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class, + IgniteSpiOperationTimeoutException.class)) + { + LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be " + + "dropped from cluster [" + "rmtNode=" + node + ", err=" + err + + ", connectErrs=" + Arrays.toString(err.getSuppressed()) + ']'); - throw new ClusterTopologyCheckedException("Failed to send message " + - "(node left topology): " + node); - } + getSpiContext().failNode(nodeId, "TcpCommunicationSpi failed to establish connection to node " + + "[rmtNode=" + node + ", errs=" + err + ", connectErrs=" + Arrays.toString(err.getSuppressed()) + ']'); + } - ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1); + onDone(err0); - GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey); + return; + } - if (!recoveryDesc.reserve()) { - U.closeQuiet(ch); + if (next) { + attempt = 0; - return null; - } + connectAttempts = 0; - long rcvCnt = -1; + currAddr = addrsIt.next(); + } - Map<Integer, Object> meta = new HashMap<>(); + InetSocketAddress addr = currAddr; - GridSslMeta sslMeta = null; + try { + final SocketChannel ch = SocketChannel.open(); - try { - ch.socket().connect(addr, (int)timeoutHelper.nextTimeoutChunk(connTimeout)); + ch.configureBlocking(false); - if (isSslEnabled()) { - meta.put(SSL_META.ordinal(), sslMeta = new GridSslMeta()); + ch.socket().setTcpNoDelay(tcpNoDelay); + ch.socket().setKeepAlive(true); - SSLEngine sslEngine = ignite.configuration().getSslContextFactory().create().createSSLEngine(); + if (sockRcvBuf > 0) + ch.socket().setReceiveBufferSize(sockRcvBuf); - sslEngine.setUseClientMode(true); + if (sockSndBuf > 0) + ch.socket().setSendBufferSize(sockSndBuf); - sslMeta.sslEngine(sslEngine); - } + if (getSpiContext().node(node.id()) == null) { + U.closeQuiet(ch); - Integer handshakeConnIdx = connIdx; + onError(new ClusterTopologyCheckedException("Failed to send message " + + "(node left topology): " + node)); - rcvCnt = safeHandshake(ch, - recoveryDesc, - node.id(), - timeoutHelper.nextTimeoutChunk(connTimeout0), - sslMeta, - handshakeConnIdx); + return; + } - if (rcvCnt == -1) - return null; - } - finally { - if (recoveryDesc != null && rcvCnt == -1) - recoveryDesc.release(); - } + final ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1); - try { - meta.put(CONN_IDX_META, connKey); + final GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey); - if (recoveryDesc != null) { - recoveryDesc.onHandshake(rcvCnt); + if (!recoveryDesc.reserve()) { + U.closeQuiet(ch); - meta.put(-1, recoveryDesc); - } + onDone(); - GridNioSession ses = nioSrvr.createSession(ch, meta).get(); + return; + } - client = new GridTcpNioCommunicationClient(connIdx, ses, log); + final Map<Integer, Object> meta = new HashMap<>(); - conn = true; - } - finally { - if (!conn) { - if (recoveryDesc != null) - recoveryDesc.release(); - } - } - } - catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) { - if (client != null) { - client.forceClose(); + final ConnectContext ctx = new ConnectContext(); - client = null; - } + ctx.expNodeId = node.id(); - if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException || - timeoutHelper.checkFailureTimeoutReached(e))) { + ctx.tcpClientFut = this; - String msg = "Handshake timed out (failure detection timeout is reached) " + - "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + addr + ']'; + ctx.connIdx = connIdx; - onException(msg, e); + meta.put(CONN_CTX_META_KEY, ctx); - if (log.isDebugEnabled()) - log.debug(msg); + meta.put(RECOVERY_DESC_META_KEY, recoveryDesc); - if (errs == null) - errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " + - "Make sure that each ComputeTask and cache Transaction has a timeout set " + - "in order to prevent parties from waiting forever in case of network issues " + - "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); + final int timeoutChunk = (int)timeoutHelper.nextTimeoutChunk(connTimeout); - errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e)); + final int attempt0 = attempt; - break; - } + final ConnectionTimeoutObject connTimeoutObj = new ConnectionTimeoutObject(ch, meta, + U.currentTimeMillis() + timeoutChunk * (1L << attempt0)); - assert !failureDetectionTimeoutEnabled(); + addTimeoutObject(connTimeoutObj); - onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + - ", addr=" + addr + ']', e); + boolean connect = ch.connect(addr); - if (log.isDebugEnabled()) - log.debug( - "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + - ", addr=" + addr + ", err=" + e + ']'); + IgniteInClosure<IgniteInternalFuture<GridNioSession>> lsnr0 = new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() { + @Override public void apply(final IgniteInternalFuture<GridNioSession> fut) { + GridNioSession ses = null; - if (attempt == reconCnt || connTimeout0 > maxConnTimeout) { - if (log.isDebugEnabled()) - log.debug("Handshake timedout (will stop attempts to perform the handshake) " + - "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout + - ", attempt=" + attempt + ", reconCnt=" + reconCnt + - ", err=" + e.getMessage() + ", addr=" + addr + ']'); + try { + ses = fut.get(); - if (errs == null) - errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " + - "Make sure that each ComputeTask and cache Transaction has a timeout set " + - "in order to prevent parties from waiting forever in case of network issues " + - "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); + boolean canceled = connTimeoutObj.cancel(); - errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e)); + if (canceled) + removeTimeoutObject(connTimeoutObj); + else { + final GridNioSession ses0 = ses; - break; - } - else { - attempt++; + Runnable clo = new Runnable() { + @Override public void run() { + GridNioFuture<Boolean> fut = nioSrvr.close(ses0); - connTimeout0 *= 2; + final SocketTimeoutException e = new SocketTimeoutException("Connect timed " + + "(consider increasing 'connTimeout' configuration property) [addr=" + + currAddr + ", connTimeout=" + connTimeout + ']'); - // Continue loop. - } - } - catch (Exception e) { - if (client != null) { - client.forceClose(); + fut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> fut0) { + Runnable clo = new Runnable() { + @Override public void run() { + onError(e); + } + }; - client = null; - } + SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY, clo); - onException("Client creation failed [addr=" + addr + ", err=" + e + ']', e); + commWorker.addSessionStateChangeRequest(sesInfo); + } + }); + } + }; - if (log.isDebugEnabled()) - log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']'); + commWorker.addSessionStateChangeRequest(new SessionInfo(null, SessionState.RETRY, clo)); - boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e); + return; + } - if (failureDetThrReached) - LT.warn(log, "Connect timed out (consider increasing 'failureDetectionTimeout' " + - "configuration property) [addr=" + addr + ", failureDetectionTimeout=" + - failureDetectionTimeout() + ']'); - else if (X.hasCause(e, SocketTimeoutException.class)) - LT.warn(log, "Connect timed out (consider increasing 'connTimeout' " + - "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']'); + int timeoutChunk1 = (int) timeoutHelper.nextTimeoutChunk(connTimeout); - if (errs == null) - errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " + - "Make sure that each ComputeTask and cache Transaction has a timeout set " + - "in order to prevent parties from waiting forever in case of network issues " + - "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); + long time = U.currentTimeMillis() + timeoutChunk1 * (1L << attempt0); - errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e)); + HandshakeTimeoutObject<SocketChannel> handshakeTimeoutObj = + new HandshakeTimeoutObject<>(ch, TcpClientFuture.this, time); - // Reconnect for the second time, if connection is not established. - if (!failureDetThrReached && connectAttempts < 2 && - (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) { - connectAttempts++; + ctx.handshakeTimeoutObj = handshakeTimeoutObj; - continue; - } + addTimeoutObject(handshakeTimeoutObj); + } + catch (final IgniteSpiOperationTimeoutException e) { + assert ses != null; - break; - } - } + final GridNioSession ses0 = ses; - if (conn) - break; - } + commWorker.addSessionStateChangeRequest(new SessionInfo(null, SessionState.RETRY, new Runnable() { + @Override public void run() { + GridNioFuture<Boolean> closeFut = nioSrvr.close(ses0); - if (client == null) { - assert errs != null; + closeFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> fut0) { + Runnable clo = new Runnable() { + @Override public void run() { + onError(e); + } + }; - if (X.hasCause(errs, ConnectException.class)) - LT.warn(log, "Failed to connect to a remote node " + - "(make sure that destination node is alive and " + - "operating system firewall is disabled on local and remote hosts) " + - "[addrs=" + addrs + ']'); + SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY, clo); - if (getSpiContext().node(node.id()) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) && - X.hasCause(errs, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class, - IgniteSpiOperationTimeoutException.class)) { - LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " + - "cluster [" + - "rmtNode=" + node + - ", err=" + errs + - ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']'); + commWorker.addSessionStateChangeRequest(sesInfo); + } + }); + } + })); + } + catch (IgniteCheckedException e) { + connTimeoutObj.cancel(); - getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" + - "rmtNode=" + node + - ", errs=" + errs + - ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']'); - } + removeTimeoutObject(connTimeoutObj); - throw errs; + recoveryDesc.release(); + + onError(e); + } + } + }; + + nioSrvr.createSession(ch, meta, !connect, lsnr0); + } + catch (Exception e) { + onDone(e); + } } - return client; - } + /** + * @param e Exception. + */ + void onError(Exception e) { + if (e instanceof HandshakeTimeoutException || e instanceof IgniteSpiOperationTimeoutException) { + if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException || + timeoutHelper.checkFailureTimeoutReached(e))) { - /** - * Performs handshake in timeout-safe way. - * - * @param client Client. - * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}. - * @param rmtNodeId Remote node. - * @param timeout Timeout for handshake. - * @param sslMeta Session meta. - * @param handshakeConnIdx Non null connection index if need send it in handshake. - * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout. - * @return Handshake response. - */ - @SuppressWarnings("ThrowFromFinallyBlock") - private <T> long safeHandshake( - T client, - @Nullable GridNioRecoveryDescriptor recovery, - UUID rmtNodeId, - long timeout, - GridSslMeta sslMeta, - @Nullable Integer handshakeConnIdx - ) throws IgniteCheckedException { - HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout); + String msg = "Handshake timed out (failure detection timeout is reached) " + + "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + currAddr + ']'; - addTimeoutObject(obj); + onException(msg, e); - long rcvCnt = 0; + if (log.isDebugEnabled()) + log.debug(msg); - try { - if (client instanceof GridCommunicationClient) - ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId)); - else { - SocketChannel ch = (SocketChannel)client; + if (err == null) + err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " + + "Make sure that each ComputeTask and cache Transaction has a timeout set " + + "in order to prevent parties from waiting forever in case of network issues " + + "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); - boolean success = false; + err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e)); - try { - BlockingSslHandler sslHnd = null; + tryConnect(true); - ByteBuffer buf; + return; + } - if (isSslEnabled()) { - assert sslMeta != null; + assert !failureDetectionTimeoutEnabled(); - sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log); + long connTimeout0 = connTimeout * attempt; - if (!sslHnd.handshake()) - throw new IgniteCheckedException("SSL handshake is not completed."); + onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout + + ", addr=" + currAddr + ']', e); - ByteBuffer handBuff = sslHnd.applicationBuffer(); + if (log.isDebugEnabled()) + log.debug( + "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout + + ", addr=" + currAddr + ", err=" + e + ']'); - if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) { - buf = ByteBuffer.allocate(1000); + if (attempt == reconCnt || connTimeout0 > maxConnTimeout) { + if (log.isDebugEnabled()) + log.debug("Handshake timedout (will stop attempts to perform the handshake) " + + "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout + + ", attempt=" + attempt + ", reconCnt=" + reconCnt + + ", err=" + e.getMessage() + ", addr=" + currAddr + ']'); - int read = ch.read(buf); + if (err == null) + err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " + + "Make sure that each ComputeTask and cache Transaction has a timeout set " + + "in order to prevent parties from waiting forever in case of network issues " + + "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); - if (read == -1) - throw new IgniteCheckedException("Failed to read remote node ID (connection closed)."); + err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e)); - buf.flip(); + tryConnect(true); + } + else { + attempt++; - buf = sslHnd.decode(buf); - } - else - buf = handBuff; - } - else { - buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE); + tryConnect(false); // Reconnection on handshake timeout. + } + } + else { + onException("Client creation failed [addr=" + currAddr + ", err=" + e + ']', e); - for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) { - int read = ch.read(buf); + if (log.isDebugEnabled()) + log.debug("Client creation failed [addr=" + currAddr + ", err=" + e + ']'); - if (read == -1) - throw new IgniteCheckedException("Failed to read remote node ID (connection closed)."); + boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e); - i += read; - } - } + if (failureDetThrReached) + LT.warn(log, "Connect timed out (consider increasing 'failureDetectionTimeout' " + + "configuration property) [addr=" + currAddr + ", failureDetectionTimeout=" + + failureDetectionTimeout() + ']'); + else if (X.hasCause(e, SocketTimeoutException.class)) + LT.warn(log, "Connect timed out (consider increasing 'connTimeout' " + + "configuration property) [addr=" + currAddr + ", connTimeout=" + connTimeout + ']'); - UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE); + if (err == null) + err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " + + "Make sure that each ComputeTask and GridCacheTransaction has a timeout set " + + "in order to prevent parties from waiting forever in case of network issues " + + "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); - if (!rmtNodeId.equals(rmtNodeId0)) - throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId + - ", rcvd=" + rmtNodeId0 + ']'); - else if (log.isDebugEnabled()) - log.debug("Received remote node ID: " + rmtNodeId0); + err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e)); - if (isSslEnabled()) { - assert sslHnd != null; + // Reconnect for the second time, if connection is not established. + int connectAttempts0; - ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER))); - } - else - ch.write(ByteBuffer.wrap(U.IGNITE_HEADER)); + if (!failureDetThrReached && (connectAttempts0 = connectAttempts) <= 2 && + (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) { + connectAttempts = connectAttempts0 + 1; - ClusterNode locNode = getLocalNode(); + tryConnect(false); - if (locNode == null) - throw new IgniteCheckedException("Local node has not been started or " + - "fully initialized [isStopping=" + getSpiContext().isStopping() + ']'); + return; + } - if (recovery != null) { - HandshakeMessage msg; + tryConnect(true); + } - int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE; + onDone(e); + } - if (handshakeConnIdx != null) { - msg = new HandshakeMessage2(locNode.id(), - recovery.incrementConnectCount(), - recovery.received(), - handshakeConnIdx); + /** + * + */ + private Collection<InetSocketAddress> addrs() throws IgniteCheckedException { + Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS)); + Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES)); + Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT)); + Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS)); - msgSize += 4; - } - else { - msg = new HandshakeMessage(locNode.id(), - recovery.incrementConnectCount(), - recovery.received()); - } + boolean isRmtAddrsExist = (!F.isEmpty(rmtAddrs0) && boundPort != null); + boolean isExtAddrsExist = !F.isEmpty(extAddrs); - if (log.isDebugEnabled()) - log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']'); + if (!isRmtAddrsExist && !isExtAddrsExist) + throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any " + + "TCP communication addresses or mapped external addresses. Check configuration and make sure " + + "that you use the same communication SPI on all nodes. Remote node id: " + node.id()); - buf = ByteBuffer.allocate(msgSize); + LinkedHashSet<InetSocketAddress> addrs; - buf.order(ByteOrder.nativeOrder()); + // Try to connect first on bound addresses. + if (isRmtAddrsExist) { + List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort)); - boolean written = msg.writeTo(buf, null); + boolean sameHost = U.sameMacs(getSpiContext().localNode(), node); - assert written; + Collections.sort(addrs0, U.inetAddressesComparator(sameHost)); - buf.flip(); + addrs = new LinkedHashSet<>(addrs0); + } + else + addrs = new LinkedHashSet<>(); - if (isSslEnabled()) { - assert sslHnd != null; + // Then on mapped external addresses. + if (isExtAddrsExist) + addrs.addAll(extAddrs); - ch.write(sslHnd.encrypt(buf)); - } - else - ch.write(buf); - } - else { - if (isSslEnabled()) { - assert sslHnd != null; + Set<InetAddress> allInetAddrs = U.newHashSet(addrs.size()); - ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType))); - } - else - ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)); - } + for (InetSocketAddress addr : addrs) + allInetAddrs.add(addr.getAddress()); - if (recovery != null) { - if (log.isDebugEnabled()) - log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']'); + List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs); + + if (reachableInetAddrs.size() < allInetAddrs.size()) { + LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size()); - if (isSslEnabled()) { - assert sslHnd != null; + for (InetSocketAddress addr : addrs) { + if (reachableInetAddrs.contains(addr.getAddress())) + addrs0.add(addr); + } + for (InetSocketAddress addr : addrs) { + if (!reachableInetAddrs.contains(addr.getAddress())) + addrs0.add(addr); + } - buf = ByteBuffer.allocate(1000); + addrs = addrs0; + } - ByteBuffer decode = null; + if (log.isDebugEnabled()) + log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs.toString() + ']'); - buf.order(ByteOrder.nativeOrder()); + return addrs; + } - for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) { - int read = ch.read(buf); + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpClientFuture.class, this); + } + } - if (read == -1) - throw new IgniteCheckedException("Failed to read remote node recovery handshake " + - "(connection closed)."); + /** + * Performs handshake in timeout-safe way. + * + * @param client Client. + * @param rmtNodeId Remote node. + * @param timeout Timeout for handshake. + * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout. + * @return Handshake response. + */ + @SuppressWarnings("ThrowFromFinallyBlock") + private <T> long safeHandshake(T client, UUID rmtNodeId, long timeout) throws IgniteCheckedException { + HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, null, U.currentTimeMillis() + timeout); - buf.flip(); + addTimeoutObject(obj); - decode = sslHnd.decode(buf); + long rcvCnt = 0; - i += decode.remaining(); + try { + if (client instanceof GridCommunicationClient) + ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId)); + else { + SocketChannel ch = (SocketChannel)client; - buf.clear(); - } + boolean success = false; - rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE); + try { + ByteBuffer buf; - if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) { - decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE); + buf = ByteBuffer.allocate(17); - sslMeta.decodedBuffer(decode); - } + for (int i = 0; i < 17; ) { + int read = ch.read(buf); - ByteBuffer inBuf = sslHnd.inputBuffer(); + if (read == -1) + throw new IgniteCheckedException("Failed to read remote node ID (connection closed)."); - if (inBuf.position() > 0) - sslMeta.encodedBuffer(inBuf); - } - else { - buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE); + i += read; + } - buf.order(ByteOrder.nativeOrder()); + UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE); - for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) { - int read = ch.read(buf); + if (!rmtNodeId.equals(rmtNodeId0)) + throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId + + ", rcvd=" + rmtNodeId0 + ']'); + else if (log.isDebugEnabled()) + log.debug("Received remote node ID: " + rmtNodeId0); - if (read == -1) - throw new IgniteCheckedException("Failed to read remote node recovery handshake " + - "(connection closed)."); + ch.write(ByteBuffer.wrap(U.IGNITE_HEADER)); - i += read; - } + ClusterNode locNode = getLocalNode(); - rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE); - } + if (locNode == null) + throw new IgniteCheckedException("Local node has not been started or " + + "fully initialized [isStopping=" + getSpiContext().isStopping() + ']'); - if (log.isDebugEnabled()) - log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']'); + ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)); - if (rcvCnt == -1) { - if (log.isDebugEnabled()) - log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']'); - } - else - success = true; - } - else - success = true; + success = true; } catch (IOException e) { if (log.isDebugEnabled()) @@ -3815,7 +4133,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati */ private class CommunicationWorker extends IgniteSpiThread { /** */ - private final BlockingQueue<DisconnectedSessionInfo> q = new LinkedBlockingQueue<>(); + private final BlockingQueue<SessionInfo> q = new LinkedBlockingQueue<>(); /** * @param igniteInstanceName Ignite instance name. @@ -3830,10 +4148,56 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati log.debug("Tcp communication worker has been started."); while (!isInterrupted()) { - DisconnectedSessionInfo disconnectData = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS); + SessionInfo sesInfo = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS); + + if (sesInfo != null) { + ConnectContext ctx; + + TcpClientFuture clientFut; + + switch (sesInfo.state) { + case RECONNECT: + processDisconnect(sesInfo); + + break; + + case RETRY: + Runnable clo = sesInfo.clo; + + assert clo != null; - if (disconnectData != null) - processDisconnect(disconnectData); + clo.run(); + + break; + + case READY: +
<TRUNCATED>
