Repository: ignite
Updated Branches:
  refs/heads/ignite-4154 2825efcdf -> 8cce0a32e


ignite-4154


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8cce0a32
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8cce0a32
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8cce0a32

Branch: refs/heads/ignite-4154
Commit: 8cce0a32e744357ce1dc2304d2abab87baf20fe0
Parents: 2825efc
Author: sboikov <[email protected]>
Authored: Tue Nov 1 14:02:49 2016 +0300
Committer: sboikov <[email protected]>
Committed: Tue Nov 1 14:02:49 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 76 +++++++++++++++++++-
 .../messages/TcpDiscoveryNodeAddedMessage.java  |  7 ++
 2 files changed, 82 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8cce0a32/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 3acab40..d7b4d09 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1872,7 +1872,10 @@ class ServerImpl extends TcpDiscoveryImpl {
             assert spi.ensured(msg) && msg.verified() : msg;
 
             if (msg instanceof TcpDiscoveryNodeAddedMessage) {
-                TcpDiscoveryNodeAddedMessage addedMsg = 
(TcpDiscoveryNodeAddedMessage)msg;
+                TcpDiscoveryNodeAddedMessage addedMsg =
+                    new 
TcpDiscoveryNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
+
+                msg = addedMsg;
 
                 TcpDiscoveryNode node = addedMsg.node();
 
@@ -1890,12 +1893,83 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     addedMsg.clientTopology(top);
                 }
+
+                // Do not need this data for client reconnect.
+                addedMsg.oldNodesDiscoveryData(null);
+            }
+            else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
+                TcpDiscoveryNodeAddFinishedMessage addFinishMsg = 
(TcpDiscoveryNodeAddFinishedMessage)msg;
+
+                if (addFinishMsg.clientDiscoData() != null) {
+                    Map<UUID, Map<Integer, byte[]>> discoData = 
addFinishMsg.clientDiscoData();
+
+                    Set<UUID> replaced = null;
+
+                    for (TcpDiscoveryAbstractMessage msg0 : msgs) {
+                        if (msg0 instanceof 
TcpDiscoveryNodeAddFinishedMessage) {
+                            Map<UUID, Map<Integer, byte[]>> existingDiscoData =
+                                
((TcpDiscoveryNodeAddFinishedMessage)msg0).clientDiscoData();
+
+                            // Check if already stored message contains the 
same data to do not store copies multiple times.
+                            if (existingDiscoData != null) {
+                                for (Map.Entry<UUID, Map<Integer, byte[]>> e : 
discoData.entrySet()) {
+                                    UUID nodeId = e.getKey();
+
+                                    if (F.contains(replaced, nodeId))
+                                        continue;
+
+                                    Map<Integer, byte[]> existingData = 
existingDiscoData.get(e.getKey());
+
+                                    if (existingData != null && 
mapsEqual(e.getValue(), existingData)) {
+                                        e.setValue(existingData);
+
+                                        if (replaced == null)
+                                            replaced = new HashSet<>();
+
+                                        boolean add = replaced.add(nodeId);
+
+                                        assert add;
+
+                                        if (replaced.size() == 
discoData.size())
+                                            break;
+                                    }
+                                }
+
+                                if (replaced != null && replaced.size() == 
discoData.size())
+                                    break;
+                            }
+                        }
+                    }
+                }
             }
 
             msgs.add(msg);
         }
 
         /**
+         * @param m1 Map 1.
+         * @param m2 Map 2.
+         * @return {@code True} if maps contain the same data.
+         */
+        private boolean mapsEqual(Map<Integer, byte[]> m1, Map<Integer, 
byte[]> m2) {
+            if (m1 == m2)
+                return true;
+
+            if (m1.size() == m2.size()) {
+                for (Map.Entry<Integer, byte[]> e : m1.entrySet()) {
+                    byte[] data = m2.get(e.getKey());
+
+                    if (!Arrays.equals(e.getValue(), data))
+                        return false;
+                }
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
          * Gets messages starting from provided ID (exclusive). If such
          * message is not found, {@code null} is returned (this indicates
          * a failure condition when it was already removed from queue).

http://git-wip-us.apache.org/repos/asf/ignite/blob/8cce0a32/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 7f7134f..bd52c04 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -236,6 +236,13 @@ public class TcpDiscoveryNodeAddedMessage extends 
TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * @param oldNodesDiscoData Discovery data from old nodes.
+     */
+    public void oldNodesDiscoveryData(Map<UUID, Map<Integer, byte[]>> 
oldNodesDiscoData) {
+        this.oldNodesDiscoData = oldNodesDiscoData;
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param discoData Discovery data to add.
      */

Reply via email to