Repository: ignite Updated Branches: refs/heads/ignite-4154 2825efcdf -> 8cce0a32e
ignite-4154 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8cce0a32 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8cce0a32 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8cce0a32 Branch: refs/heads/ignite-4154 Commit: 8cce0a32e744357ce1dc2304d2abab87baf20fe0 Parents: 2825efc Author: sboikov <[email protected]> Authored: Tue Nov 1 14:02:49 2016 +0300 Committer: sboikov <[email protected]> Committed: Tue Nov 1 14:02:49 2016 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 76 +++++++++++++++++++- .../messages/TcpDiscoveryNodeAddedMessage.java | 7 ++ 2 files changed, 82 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8cce0a32/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 3acab40..d7b4d09 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1872,7 +1872,10 @@ class ServerImpl extends TcpDiscoveryImpl { assert spi.ensured(msg) && msg.verified() : msg; if (msg instanceof TcpDiscoveryNodeAddedMessage) { - TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg; + TcpDiscoveryNodeAddedMessage addedMsg = + new TcpDiscoveryNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); + + msg = addedMsg; TcpDiscoveryNode node = addedMsg.node(); @@ -1890,12 +1893,83 @@ class ServerImpl extends TcpDiscoveryImpl { addedMsg.clientTopology(top); } + + // Do not need this data for client reconnect. + addedMsg.oldNodesDiscoveryData(null); + } + else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { + TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg; + + if (addFinishMsg.clientDiscoData() != null) { + Map<UUID, Map<Integer, byte[]>> discoData = addFinishMsg.clientDiscoData(); + + Set<UUID> replaced = null; + + for (TcpDiscoveryAbstractMessage msg0 : msgs) { + if (msg0 instanceof TcpDiscoveryNodeAddFinishedMessage) { + Map<UUID, Map<Integer, byte[]>> existingDiscoData = + ((TcpDiscoveryNodeAddFinishedMessage)msg0).clientDiscoData(); + + // Check if already stored message contains the same data to do not store copies multiple times. + if (existingDiscoData != null) { + for (Map.Entry<UUID, Map<Integer, byte[]>> e : discoData.entrySet()) { + UUID nodeId = e.getKey(); + + if (F.contains(replaced, nodeId)) + continue; + + Map<Integer, byte[]> existingData = existingDiscoData.get(e.getKey()); + + if (existingData != null && mapsEqual(e.getValue(), existingData)) { + e.setValue(existingData); + + if (replaced == null) + replaced = new HashSet<>(); + + boolean add = replaced.add(nodeId); + + assert add; + + if (replaced.size() == discoData.size()) + break; + } + } + + if (replaced != null && replaced.size() == discoData.size()) + break; + } + } + } + } } msgs.add(msg); } /** + * @param m1 Map 1. + * @param m2 Map 2. + * @return {@code True} if maps contain the same data. + */ + private boolean mapsEqual(Map<Integer, byte[]> m1, Map<Integer, byte[]> m2) { + if (m1 == m2) + return true; + + if (m1.size() == m2.size()) { + for (Map.Entry<Integer, byte[]> e : m1.entrySet()) { + byte[] data = m2.get(e.getKey()); + + if (!Arrays.equals(e.getValue(), data)) + return false; + } + + return true; + } + + return false; + } + + /** * Gets messages starting from provided ID (exclusive). If such * message is not found, {@code null} is returned (this indicates * a failure condition when it was already removed from queue). http://git-wip-us.apache.org/repos/asf/ignite/blob/8cce0a32/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index 7f7134f..bd52c04 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -236,6 +236,13 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { } /** + * @param oldNodesDiscoData Discovery data from old nodes. + */ + public void oldNodesDiscoveryData(Map<UUID, Map<Integer, byte[]>> oldNodesDiscoData) { + this.oldNodesDiscoData = oldNodesDiscoData; + } + + /** * @param nodeId Node ID. * @param discoData Discovery data to add. */
