Repository: ignite
Updated Branches:
  refs/heads/ignite-3477-master 7b0eb4181 -> d758a82a7


IGNITE-3477 - Fixing SSL failover tests


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d758a82a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d758a82a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d758a82a

Branch: refs/heads/ignite-3477-master
Commit: d758a82a72f485f374c2623d213156cb487d0beb
Parents: 7b0eb41
Author: Alexey Goncharuk <[email protected]>
Authored: Fri Mar 31 14:16:31 2017 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Fri Mar 31 14:16:31 2017 +0300

----------------------------------------------------------------------
 .../communication/tcp/TcpCommunicationSpi.java  | 101 +++++++++++--------
 1 file changed, 59 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d758a82a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2a7c16a..e13438c 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -475,6 +475,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 HandshakeMessage msg0 = (HandshakeMessage)msg;
 
+                if (log.isDebugEnabled())
+                    log.debug("Received handshake message [locNodeId=" + 
locNode.id() + ", rmtNodeId=" + sndId +
+                        ", msg=" + msg0 + ']');
+
                 if (usePairedConnections(rmtNode)) {
                     final GridNioRecoveryDescriptor recoveryDesc = 
inRecoveryDescriptor(rmtNode, connKey);
 
@@ -569,7 +573,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         if (log.isDebugEnabled())
                             log.debug("Received incoming connection from 
remote node " +
-                                "[rmtNode=" + rmtNode.id() + ", reserved=" + 
reserved + ']');
+                                "[rmtNode=" + rmtNode.id() + ", reserved=" + 
reserved +
+                                ", recovery=" + recoveryDesc + ']');
 
                         if (reserved) {
                             try {
@@ -1768,43 +1773,11 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
         }
 
         if (connectionsPerNode > 1) {
-            int idxMode = IgniteSystemProperties.getInteger("CONN_IDX_MODE", 
0);
-
-            switch (idxMode) {
-                case 0: {
-                    connPlc = new ConnectionPolicy() {
-                        @Override public int connectionIndex() {
-                            return (int)(Thread.currentThread().getId() % 
connectionsPerNode);
-                        }
-                    };
-
-                    break;
-                }
-
-                case 1: {
-                    connPlc = new ConnectionPolicy() {
-                        @Override public int connectionIndex() {
-                            Integer threadIdx = threadConnIdx.get();
-
-                            if (threadIdx != null)
-                                return threadIdx;
-
-                            for (;;) {
-                                int idx = connIdx.get();
-                                int nextIdx = idx == connectionsPerNode - 1 ? 
0 : idx + 1;
-
-                                if (connIdx.compareAndSet(idx, nextIdx)) {
-                                    threadConnIdx.set(idx);
-
-                                    return idx;
-                                }
-                            }
-                        }
-                    };
-
-                    break;
+            connPlc = new ConnectionPolicy() {
+                @Override public int connectionIndex() {
+                    return (int)(U.safeAbs(Thread.currentThread().getId()) % 
connectionsPerNode);
                 }
-            }
+            };
         }
         else {
             connPlc = new ConnectionPolicy() {
@@ -3193,7 +3166,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         }
 
                         if (log.isDebugEnabled())
-                            log.debug("Write handshake message [rmtNode=" + 
rmtNodeId + ", msg=" + msg + ']');
+                            log.debug("Writing handshake message [locNodeId=" 
+ locNode.id() +
+                                ", rmtNode=" + rmtNodeId + ", msg=" + msg + 
']');
 
                         buf = ByteBuffer.allocate(msgSize);
 
@@ -3232,7 +3206,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                             buf = ByteBuffer.allocate(1000);
 
-                            ByteBuffer decode = null;
+                            ByteBuffer decode = ByteBuffer.allocate(2 * 
buf.capacity());
 
                             buf.order(ByteOrder.nativeOrder());
 
@@ -3245,13 +3219,17 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
 
                                 buf.flip();
 
-                                decode = sslHnd.decode(buf);
+                                ByteBuffer decode0 = sslHnd.decode(buf);
 
-                                i += decode.remaining();
+                                i += decode0.remaining();
+
+                                decode = appendAndResizeIfNeeded(decode, 
decode0);
 
                                 buf.clear();
                             }
 
+                            decode.flip();
+
                             rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
 
                             if (decode.limit() > 
RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
@@ -3340,6 +3318,31 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
+     * @param target Target buffer to append to.
+     * @param src Source buffer to get data.
+     * @return Original or expanded buffer.
+     */
+    private ByteBuffer appendAndResizeIfNeeded(ByteBuffer target, ByteBuffer 
src) {
+        if (target.remaining() < src.remaining()) {
+            int newSize = Math.max(target.capacity() * 2, target.capacity() + 
src.remaining());
+
+            ByteBuffer tmp = ByteBuffer.allocate(newSize);
+
+            tmp.order(target.order());
+
+            target.flip();
+
+            tmp.put(target);
+
+            target = tmp;
+        }
+
+        target.put(src);
+
+        return target;
+    }
+
+    /**
      * Stops service threads to simulate node failure.
      *
      * FOR TEST PURPOSES ONLY!!!
@@ -3413,6 +3416,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         GridNioRecoveryDescriptor recovery = recoveryDescs.get(key);
 
         if (recovery == null) {
+            if (log.isDebugEnabled())
+                log.debug("Missing recovery descriptor for the node (will 
create a new one) " +
+                    "[locNodeId=" + getLocalNode().id() +
+                    ", key=" + key + ", rmtNode=" + node + ']');
+
             int maxSize = Math.max(msgQueueLimit, ackSndThreshold);
 
             int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : 
(maxSize * 128);
@@ -3420,8 +3428,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             GridNioRecoveryDescriptor old = recoveryDescs.putIfAbsent(key,
                 recovery = new GridNioRecoveryDescriptor(pairedConnections, 
queueLimit, node, log));
 
-            if (old != null)
+            if (old != null) {
                 recovery = old;
+
+                if (log.isDebugEnabled())
+                    log.debug("Will use existing recovery descriptor: " + 
recovery);
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Initialized recovery descriptor [desc=" + 
recovery + ", maxSize=" + maxSize +
+                        ", queueLimit=" + queueLimit + ']');
+            }
         }
 
         return recovery;

Reply via email to