Repository: ignite Updated Branches: refs/heads/ignite-4154 bd28b16ac -> 2825efcdf
ignite-4154 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2825efcd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2825efcd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2825efcd Branch: refs/heads/ignite-4154 Commit: 2825efcdfa947f5d6f7eb2c9fa975a5e04c164dd Parents: bd28b16 Author: sboikov <[email protected]> Authored: Tue Nov 1 11:27:04 2016 +0300 Committer: sboikov <[email protected]> Committed: Tue Nov 1 11:27:04 2016 +0300 ---------------------------------------------------------------------- .../cache/DynamicCacheChangeBatch.java | 7 ++++++ .../processors/cache/GridCacheProcessor.java | 5 ++-- .../ignite/spi/discovery/tcp/ServerImpl.java | 1 + .../messages/TcpDiscoveryNodeAddedMessage.java | 26 ++++++++++++++++++-- 4 files changed, 35 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2825efcd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index e10e5aa..4dcff9b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -62,6 +62,13 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { } /** + * @param id Message ID. + */ + public void id(IgniteUuid id) { + this.id = id; + } + + /** * @return Collection of change requests. */ public Collection<DynamicCacheChangeRequest> requests() { http://git-wip-us.apache.org/repos/asf/ignite/blob/2825efcd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index fd6abbd..5e777fd 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1958,8 +1958,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.template(true); - req.deploymentId(desc.deploymentId()); - reqs.add(req); } @@ -1972,6 +1970,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { batch.clientReconnect(reconnect); + // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same. + batch.id(null); + return batch; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2825efcd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 116300b..3acab40 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2031,6 +2031,7 @@ class ServerImpl extends TcpDiscoveryImpl { * * @param msgs Message. * @param discardId Discarded message ID. + * @param customDiscardId Discarded custom event message ID. */ void reset( @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, http://git-wip-us.apache.org/repos/asf/ignite/blob/2825efcd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index 6f8e14e..7f7134f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -17,7 +17,9 @@ package org.apache.ignite.spi.discovery.tcp.messages; +import java.util.Arrays; import java.util.Collection; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; @@ -240,8 +242,28 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { public void addDiscoveryData(UUID nodeId, Map<Integer, byte[]> discoData) { // Old nodes disco data may be null if message // makes more than 1 pass due to stopping of the nodes in topology. - if (oldNodesDiscoData != null) - oldNodesDiscoData.put(nodeId, discoData); + if (oldNodesDiscoData != null) { + for (Map.Entry<UUID, Map<Integer, byte[]>> existingDataEntry : oldNodesDiscoData.entrySet()) { + Map<Integer, byte[]> existingData = existingDataEntry.getValue(); + + Iterator<Map.Entry<Integer, byte[]>> it = discoData.entrySet().iterator(); + + while (it.hasNext()) { + Map.Entry<Integer, byte[]> discoDataEntry = it.next(); + + byte[] curData = existingData.get(discoDataEntry.getKey()); + + if (Arrays.equals(curData, discoDataEntry.getValue())) + it.remove(); + } + + if (discoData.isEmpty()) + break; + } + + if (!discoData.isEmpty()) + oldNodesDiscoData.put(nodeId, discoData); + } } /**
