ignite-4154
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/17b82918 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/17b82918 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/17b82918 Branch: refs/heads/ignite-4154-2 Commit: 17b82918ad37c19fd6574ee1b5870c25fd9d540b Parents: d4568ff Author: sboikov <[email protected]> Authored: Wed Nov 2 07:31:07 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Nov 2 07:31:07 2016 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 49 +++++++++++++------- .../TcpDiscoveryNodeAddFinishedMessage.java | 11 +++++ 2 files changed, 42 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/17b82918/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index ee58421..9179ddb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -40,6 +40,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -1486,7 +1487,7 @@ class ServerImpl extends TcpDiscoveryImpl { } nodeAddedMsg.topology(topToSnd); - nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId); + nodeAddedMsg.messages(msgs != null ? new ArrayList<>(msgs) : msgs, discardMsgId, discardCustomMsgId); Map<Long, Collection<ClusterNode>> hist; @@ -1901,6 +1902,10 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg; if (addFinishMsg.clientDiscoData() != null) { + addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg); + + msg = addFinishMsg; + Map<UUID, Map<Integer, byte[]>> discoData = addFinishMsg.clientDiscoData(); Set<UUID> replaced = null; @@ -1960,6 +1965,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (addFinishMsg.clientDiscoData() != null && clientId.equals(addFinishMsg.nodeId())) { addFinishMsg.clientDiscoData(null); + addFinishMsg.clientNodeAttributes(null); break; } @@ -2091,7 +2097,7 @@ class ServerImpl extends TcpDiscoveryImpl { private static final int MAX = 1024; /** Pending messages. */ - private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2); + private final LinkedHashMap<IgniteUuid, TcpDiscoveryAbstractMessage> msgs = U.newLinkedHashMap(MAX * 2); /** Processed custom message IDs. */ private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<>(MAX * 2); @@ -2109,10 +2115,10 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message to add. */ void add(TcpDiscoveryAbstractMessage msg) { - msgs.add(msg); + msgs.put(msg.id(), msg); while (msgs.size() > MAX) { - TcpDiscoveryAbstractMessage polled = msgs.poll(); + TcpDiscoveryAbstractMessage polled = msgs.remove(msgs.keySet().iterator().next()); assert polled != null; @@ -2135,8 +2141,10 @@ class ServerImpl extends TcpDiscoveryImpl { ) { this.msgs.clear(); - if (msgs != null) - this.msgs.addAll(msgs); + if (msgs != null) { + for (TcpDiscoveryAbstractMessage msg : msgs) + this.msgs.put(msg.id(), msg); + } this.discardId = discardId; this.customDiscardId = customDiscardId; @@ -2148,21 +2156,26 @@ class ServerImpl extends TcpDiscoveryImpl { * @param id Discarded message ID. * @param custom {@code True} if discard for {@link TcpDiscoveryCustomEventMessage}. */ - void discard(IgniteUuid id, boolean custom, boolean cleanup) { + void discard(IgniteUuid id, boolean custom) { if (custom) customDiscardId = id; else discardId = id; - if (cleanup) - cleanup(); + cleanup(); } /** * */ void cleanup() { - Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator(); + if (discardId != null && !msgs.containsKey(discardId)) + return; + + if (customDiscardId != null && !msgs.containsKey(customDiscardId)) + return; + + Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator(); boolean skipMsg = discardId != null; boolean skipCustomMsg = customDiscardId != null; @@ -2219,7 +2232,7 @@ class ServerImpl extends TcpDiscoveryImpl { private boolean skipCustomMsg = customDiscardId != null; /** Internal iterator. */ - private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator(); + private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator(); /** Next message. */ private TcpDiscoveryAbstractMessage next; @@ -2837,7 +2850,7 @@ class ServerImpl extends TcpDiscoveryImpl { for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) { long tstamp = U.currentTimeMillis(); - prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs, + prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs.values(), pendingMsgs.discardId, pendingMsgs.customDiscardId); if (timeoutHelper == null) @@ -2881,8 +2894,8 @@ class ServerImpl extends TcpDiscoveryImpl { msg = new TcpDiscoveryStatusCheckMessage(locNode, null); } else - prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId, - pendingMsgs.customDiscardId); + prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs.values(), + pendingMsgs.discardId, pendingMsgs.customDiscardId); try { long tstamp = U.currentTimeMillis(); @@ -3045,8 +3058,8 @@ class ServerImpl extends TcpDiscoveryImpl { debugLog(msg, "Pending messages will be resent to local node"); for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) { - prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId, - pendingMsgs.customDiscardId); + prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs.values(), + pendingMsgs.discardId, pendingMsgs.customDiscardId); pendingMsg.senderNodeId(locNodeId); @@ -3106,7 +3119,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (pendingMsgs.msgs.isEmpty()) return false; - for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) { + for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs.values()) { if (pendingMsg instanceof TcpDiscoveryNodeAddedMessage) { TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg; @@ -4933,7 +4946,7 @@ class ServerImpl extends TcpDiscoveryImpl { } if (msg.verified()) - pendingMsgs.discard(msgId, msg.customMessageDiscard(), spiState == CONNECTED); + pendingMsgs.discard(msgId, msg.customMessageDiscard()); if (ring.hasRemoteNodes()) sendMessageAcrossRing(msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/17b82918/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java index 1b99a56..80f4565 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java @@ -59,6 +59,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess } /** + * @param msg Message. + */ + public TcpDiscoveryNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) { + super(msg); + + nodeId = msg.nodeId; + clientDiscoData = msg.clientDiscoData; + clientNodeAttrs = msg.clientNodeAttrs; + } + + /** * Gets ID of the node added. * * @return ID of the node added.
