Repository: ignite Updated Branches: refs/heads/ignite-3477-master 7b0eb4181 -> d758a82a7
IGNITE-3477 - Fixing SSL failover tests Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d758a82a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d758a82a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d758a82a Branch: refs/heads/ignite-3477-master Commit: d758a82a72f485f374c2623d213156cb487d0beb Parents: 7b0eb41 Author: Alexey Goncharuk <[email protected]> Authored: Fri Mar 31 14:16:31 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Fri Mar 31 14:16:31 2017 +0300 ---------------------------------------------------------------------- .../communication/tcp/TcpCommunicationSpi.java | 101 +++++++++++-------- 1 file changed, 59 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d758a82a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 2a7c16a..e13438c 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -475,6 +475,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter HandshakeMessage msg0 = (HandshakeMessage)msg; + if (log.isDebugEnabled()) + log.debug("Received handshake message [locNodeId=" + locNode.id() + ", rmtNodeId=" + sndId + + ", msg=" + msg0 + ']'); + if (usePairedConnections(rmtNode)) { final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey); @@ -569,7 +573,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Received incoming connection from remote node " + - "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']'); + "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + + ", recovery=" + recoveryDesc + ']'); if (reserved) { try { @@ -1768,43 +1773,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } if (connectionsPerNode > 1) { - int idxMode = IgniteSystemProperties.getInteger("CONN_IDX_MODE", 0); - - switch (idxMode) { - case 0: { - connPlc = new ConnectionPolicy() { - @Override public int connectionIndex() { - return (int)(Thread.currentThread().getId() % connectionsPerNode); - } - }; - - break; - } - - case 1: { - connPlc = new ConnectionPolicy() { - @Override public int connectionIndex() { - Integer threadIdx = threadConnIdx.get(); - - if (threadIdx != null) - return threadIdx; - - for (;;) { - int idx = connIdx.get(); - int nextIdx = idx == connectionsPerNode - 1 ? 0 : idx + 1; - - if (connIdx.compareAndSet(idx, nextIdx)) { - threadConnIdx.set(idx); - - return idx; - } - } - } - }; - - break; + connPlc = new ConnectionPolicy() { + @Override public int connectionIndex() { + return (int)(U.safeAbs(Thread.currentThread().getId()) % connectionsPerNode); } - } + }; } else { connPlc = new ConnectionPolicy() { @@ -3193,7 +3166,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } if (log.isDebugEnabled()) - log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']'); + log.debug("Writing handshake message [locNodeId=" + locNode.id() + + ", rmtNode=" + rmtNodeId + ", msg=" + msg + ']'); buf = ByteBuffer.allocate(msgSize); @@ -3232,7 +3206,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter buf = ByteBuffer.allocate(1000); - ByteBuffer decode = null; + ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity()); buf.order(ByteOrder.nativeOrder()); @@ -3245,13 +3219,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter buf.flip(); - decode = sslHnd.decode(buf); + ByteBuffer decode0 = sslHnd.decode(buf); - i += decode.remaining(); + i += decode0.remaining(); + + decode = appendAndResizeIfNeeded(decode, decode0); buf.clear(); } + decode.flip(); + rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE); if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) { @@ -3340,6 +3318,31 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** + * @param target Target buffer to append to. + * @param src Source buffer to get data. + * @return Original or expanded buffer. + */ + private ByteBuffer appendAndResizeIfNeeded(ByteBuffer target, ByteBuffer src) { + if (target.remaining() < src.remaining()) { + int newSize = Math.max(target.capacity() * 2, target.capacity() + src.remaining()); + + ByteBuffer tmp = ByteBuffer.allocate(newSize); + + tmp.order(target.order()); + + target.flip(); + + tmp.put(target); + + target = tmp; + } + + target.put(src); + + return target; + } + + /** * Stops service threads to simulate node failure. * * FOR TEST PURPOSES ONLY!!! @@ -3413,6 +3416,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridNioRecoveryDescriptor recovery = recoveryDescs.get(key); if (recovery == null) { + if (log.isDebugEnabled()) + log.debug("Missing recovery descriptor for the node (will create a new one) " + + "[locNodeId=" + getLocalNode().id() + + ", key=" + key + ", rmtNode=" + node + ']'); + int maxSize = Math.max(msgQueueLimit, ackSndThreshold); int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 128); @@ -3420,8 +3428,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridNioRecoveryDescriptor old = recoveryDescs.putIfAbsent(key, recovery = new GridNioRecoveryDescriptor(pairedConnections, queueLimit, node, log)); - if (old != null) + if (old != null) { recovery = old; + + if (log.isDebugEnabled()) + log.debug("Will use existing recovery descriptor: " + recovery); + } + else { + if (log.isDebugEnabled()) + log.debug("Initialized recovery descriptor [desc=" + recovery + ", maxSize=" + maxSize + + ", queueLimit=" + queueLimit + ']'); + } } return recovery;
