ignite-4154
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8c624a81 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8c624a81 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8c624a81 Branch: refs/heads/ignite-4154 Commit: 8c624a81e9288d3ea7a428fdf68e780186abcb9c Parents: 63c9727 Author: sboikov <[email protected]> Authored: Wed Nov 2 13:15:47 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Nov 2 13:15:47 2016 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 93 ++++++++------------ .../TcpDiscoveryNodeAddFinishedMessage.java | 11 +++ .../messages/TcpDiscoveryNodeAddedMessage.java | 7 ++ 3 files changed, 54 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8c624a81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 0277061..e182177 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -40,6 +40,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -1486,7 +1487,7 @@ class ServerImpl extends TcpDiscoveryImpl { } nodeAddedMsg.topology(topToSnd); - nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId); + nodeAddedMsg.messages(msgs != null ? new ArrayList<>(msgs) : msgs, discardMsgId, discardCustomMsgId); Map<Long, Collection<ClusterNode>> hist; @@ -1901,6 +1902,10 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg; if (addFinishMsg.clientDiscoData() != null) { + addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg); + + msg = addFinishMsg; + Map<UUID, Map<Integer, byte[]>> discoData = addFinishMsg.clientDiscoData(); Set<UUID> replaced = null; @@ -1960,6 +1965,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (addFinishMsg.clientDiscoData() != null && clientId.equals(addFinishMsg.nodeId())) { addFinishMsg.clientDiscoData(null); + addFinishMsg.clientNodeAttributes(null); break; } @@ -2091,7 +2097,7 @@ class ServerImpl extends TcpDiscoveryImpl { private static final int MAX = 1024; /** Pending messages. */ - private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2); + private final LinkedHashMap<IgniteUuid, TcpDiscoveryAbstractMessage> msgs = U.newLinkedHashMap(MAX * 2); /** Processed custom message IDs. */ private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<>(MAX * 2); @@ -2109,10 +2115,10 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message to add. */ void add(TcpDiscoveryAbstractMessage msg) { - msgs.add(msg); + msgs.put(msg.id(), msg); while (msgs.size() > MAX) { - TcpDiscoveryAbstractMessage polled = msgs.poll(); + TcpDiscoveryAbstractMessage polled = msgs.remove(msgs.keySet().iterator().next()); assert polled != null; @@ -2135,13 +2141,13 @@ class ServerImpl extends TcpDiscoveryImpl { ) { this.msgs.clear(); - if (msgs != null) - this.msgs.addAll(msgs); + if (msgs != null) { + for (TcpDiscoveryAbstractMessage msg : msgs) + this.msgs.put(msg.id(), msg); + } this.discardId = discardId; this.customDiscardId = customDiscardId; - - cleanup(); } /** @@ -2153,51 +2159,24 @@ class ServerImpl extends TcpDiscoveryImpl { void discard(IgniteUuid id, boolean custom) { if (custom) customDiscardId = id; - else + else { discardId = id; - cleanup(); - } + TcpDiscoveryAbstractMessage msg = msgs.get(id); - /** - * - */ - void cleanup() { -// Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator(); -// -// boolean skipMsg = discardId != null; -// boolean skipCustomMsg = customDiscardId != null; -// -// while (msgIt.hasNext()) { -// TcpDiscoveryAbstractMessage msg0 = msgIt.next(); -// -// if (msg0 instanceof TcpDiscoveryCustomEventMessage) { -// if (skipCustomMsg) { -// assert customDiscardId != null; -// -// if (F.eq(customDiscardId, msg0.id())) -// skipCustomMsg = false; -// else -// msgIt.remove(); -// -// continue; -// } -// } -// else { -// if (skipMsg) { -// assert discardId != null; -// -// if (F.eq(discardId, msg0.id())) -// skipMsg = false; -// else -// msgIt.remove(); -// -// continue; -// } -// } -// -// break; -// } + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + TcpDiscoveryNodeAddedMessage msg0 = (TcpDiscoveryNodeAddedMessage)msg; + + msg0.oldNodesDiscoveryData(null); + msg0.newNodeDiscoveryData(null); + } + else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { + TcpDiscoveryNodeAddFinishedMessage msg0 = (TcpDiscoveryNodeAddFinishedMessage)msg; + + msg0.clientDiscoData(null); + msg0.clientNodeAttributes(null); + } + } } /** @@ -2220,7 +2199,7 @@ class ServerImpl extends TcpDiscoveryImpl { private boolean skipCustomMsg = customDiscardId != null; /** Internal iterator. */ - private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator(); + private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator(); /** Next message. */ private TcpDiscoveryAbstractMessage next; @@ -2838,7 +2817,7 @@ class ServerImpl extends TcpDiscoveryImpl { for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) { long tstamp = U.currentTimeMillis(); - prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs, + prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs.values(), pendingMsgs.discardId, pendingMsgs.customDiscardId); if (timeoutHelper == null) @@ -2882,8 +2861,8 @@ class ServerImpl extends TcpDiscoveryImpl { msg = new TcpDiscoveryStatusCheckMessage(locNode, null); } else - prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId, - pendingMsgs.customDiscardId); + prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs.values(), + pendingMsgs.discardId, pendingMsgs.customDiscardId); try { long tstamp = U.currentTimeMillis(); @@ -3046,8 +3025,8 @@ class ServerImpl extends TcpDiscoveryImpl { debugLog(msg, "Pending messages will be resent to local node"); for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) { - prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId, - pendingMsgs.customDiscardId); + prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs.values(), + pendingMsgs.discardId, pendingMsgs.customDiscardId); pendingMsg.senderNodeId(locNodeId); @@ -3107,7 +3086,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (pendingMsgs.msgs.isEmpty()) return false; - for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) { + for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs.values()) { if (pendingMsg instanceof TcpDiscoveryNodeAddedMessage) { TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg; http://git-wip-us.apache.org/repos/asf/ignite/blob/8c624a81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java index 1b99a56..80f4565 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java @@ -59,6 +59,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess } /** + * @param msg Message. + */ + public TcpDiscoveryNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) { + super(msg); + + nodeId = msg.nodeId; + clientDiscoData = msg.clientDiscoData; + clientNodeAttrs = msg.clientNodeAttrs; + } + + /** * Gets ID of the node added. * * @return ID of the node added. http://git-wip-us.apache.org/repos/asf/ignite/blob/8c624a81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index bd52c04..7b8e5c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -229,6 +229,13 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { } /** + * @param newNodeDiscoData Discovery data from new node. + */ + public void newNodeDiscoveryData(Map<Integer, byte[]> newNodeDiscoData) { + this.newNodeDiscoData = newNodeDiscoData; + } + + /** * @return Discovery data from old nodes. */ public Map<UUID, Map<Integer, byte[]>> oldNodesDiscoveryData() {
