Repository: ignite Updated Branches: refs/heads/ignite-4296 [created] d4477e845
ignite-4296 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4477e84 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4477e84 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4477e84 Branch: refs/heads/ignite-4296 Commit: d4477e8456731db6acb7e669dfc89de2de0341cc Parents: 7a47a01 Author: sboikov <sboi...@gridgain.com> Authored: Thu Nov 24 12:19:59 2016 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Nov 24 13:26:26 2016 +0300 ---------------------------------------------------------------------- .../rendezvous/RendezvousAffinityFunction.java | 80 +++++++++---- .../discovery/GridDiscoveryManager.java | 114 ++----------------- .../GridCachePartitionExchangeManager.java | 6 +- .../processors/cache/GridCacheUtils.java | 17 --- .../binary/CacheObjectBinaryProcessorImpl.java | 3 +- .../dht/GridClientPartitionTopology.java | 26 ++--- .../dht/GridDhtPartitionTopology.java | 8 +- .../dht/GridDhtPartitionTopologyImpl.java | 105 +++++++++-------- .../GridDhtPartitionsExchangeFuture.java | 49 +++++++- .../service/GridServiceProcessor.java | 4 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 41 +++++-- .../AbstractAffinityFunctionSelfTest.java | 2 +- .../GridDiscoveryManagerAliveCacheSelfTest.java | 2 +- .../GridCachePartitionedAffinitySpreadTest.java | 7 +- 14 files changed, 227 insertions(+), 237 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java index ec12973..c76aae8 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java @@ -17,7 +17,6 @@ package org.apache.ignite.cache.affinity.rendezvous; -import java.io.ByteArrayOutputStream; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; @@ -354,46 +353,69 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza /** * Returns collection of nodes (primary first) for specified partition. + * + * @param d Message digest. + * @param part Partition. + * @param nodes Nodes. + * @param nodesHash Serialized nodes hashes. + * @param backups Number of backups. + * @param neighborhoodCache Neighborhood. + * @return Assignment. */ - public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int backups, + public List<ClusterNode> assignPartition(MessageDigest d, + int part, + List<ClusterNode> nodes, + Map<ClusterNode, byte[]> nodesHash, + int backups, @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) { if (nodes.size() <= 1) return nodes; - List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(); + if (d == null) + d = digest.get(); - MessageDigest d = digest.get(); + List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size()); - for (ClusterNode node : nodes) { - Object nodeHash = resolveNodeHash(node); + try { + for (int i = 0; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); - try { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + byte[] nodeHashBytes = nodesHash.get(node); - byte[] nodeHashBytes = U.marshal(ignite.configuration().getMarshaller(), nodeHash); + if (nodeHashBytes == null) { + Object nodeHash = resolveNodeHash(node); - out.write(U.intToBytes(part), 0, 4); // Avoid IOException. - out.write(nodeHashBytes, 0, nodeHashBytes.length); // Avoid IOException. + byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash); + + // Add 4 bytes for partition bytes. + nodeHashBytes = new byte[nodeHashBytes0.length + 4]; + + System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length); + + nodesHash.put(node, nodeHashBytes0); + } + + U.intToBytes(part, nodeHashBytes, 0); d.reset(); - byte[] bytes = d.digest(out.toByteArray()); + byte[] bytes = d.digest(nodeHashBytes); long hash = - (bytes[0] & 0xFFL) - | ((bytes[1] & 0xFFL) << 8) - | ((bytes[2] & 0xFFL) << 16) - | ((bytes[3] & 0xFFL) << 24) - | ((bytes[4] & 0xFFL) << 32) - | ((bytes[5] & 0xFFL) << 40) - | ((bytes[6] & 0xFFL) << 48) - | ((bytes[7] & 0xFFL) << 56); + (bytes[0] & 0xFFL) + | ((bytes[1] & 0xFFL) << 8) + | ((bytes[2] & 0xFFL) << 16) + | ((bytes[3] & 0xFFL) << 24) + | ((bytes[4] & 0xFFL) << 32) + | ((bytes[5] & 0xFFL) << 40) + | ((bytes[6] & 0xFFL) << 48) + | ((bytes[7] & 0xFFL) << 56); lst.add(F.t(hash, node)); } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); } Collections.sort(lst, COMPARATOR); @@ -474,8 +496,18 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ? GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null; + MessageDigest d = digest.get(); + + List<ClusterNode> nodes = affCtx.currentTopologySnapshot(); + + Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size()); + for (int i = 0; i < parts; i++) { - List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(), + List<ClusterNode> partAssignment = assignPartition(d, + i, + nodes, + nodesHash, + affCtx.backups(), neighborhoodCache); assignments.add(partAssignment); http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index d24f900..488dabe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -42,7 +42,7 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -1623,17 +1623,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * Gets cache remote nodes for cache with given name. * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of cache nodes. - */ - public Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheName, topVer).remoteCacheNodes(cacheName, topVer.topologyVersion()); - } - - /** - * Gets cache remote nodes for cache with given name. - * * @param topVer Topology version. * @return Collection of cache nodes. */ @@ -1664,33 +1653,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Gets alive remote server nodes with at least one cache configured. - * * @param topVer Topology version (maximum allowed node order). - * @return Collection of alive cache nodes. + * @return Oldest alive server nodes with at least one cache configured. */ - public Collection<ClusterNode> aliveRemoteServerNodesWithCaches(AffinityTopologyVersion topVer) { - return resolveDiscoCache(null, topVer).aliveRemoteServerNodesWithCaches(topVer.topologyVersion()); - } + @Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) { + DiscoCache cache = resolveDiscoCache(null, topVer); - /** - * Gets alive server nodes with at least one cache configured. - * - * @param topVer Topology version (maximum allowed node order). - * @return Collection of alive cache nodes. - */ - public Collection<ClusterNode> aliveServerNodesWithCaches(AffinityTopologyVersion topVer) { - return resolveDiscoCache(null, topVer).aliveServerNodesWithCaches(topVer.topologyVersion()); - } + Map.Entry<ClusterNode, Boolean> e = cache.aliveSrvNodesWithCaches.firstEntry(); - /** - * Gets alive nodes with at least one cache configured. - * - * @param topVer Topology version (maximum allowed node order). - * @return Collection of alive cache nodes. - */ - public Collection<ClusterNode> aliveNodesWithCaches(AffinityTopologyVersion topVer) { - return resolveDiscoCache(null, topVer).aliveNodesWithCaches(topVer.topologyVersion()); + return e != null ? e.getKey() : null; } /** @@ -2580,19 +2551,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes; /** - * Cached alive remote nodes with caches. - */ - private final Collection<ClusterNode> aliveNodesWithCaches; - - /** * Cached alive server remote nodes with caches. */ - private final Collection<ClusterNode> aliveSrvNodesWithCaches; - - /** - * Cached alive remote server nodes with caches. - */ - private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches; + private final ConcurrentSkipListMap<ClusterNode, Boolean> aliveSrvNodesWithCaches; /** * @param loc Local node. @@ -2625,9 +2586,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); - aliveNodesWithCaches = new ConcurrentSkipListSet<>(); - aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>(); - aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>(); + aliveSrvNodesWithCaches = new ConcurrentSkipListMap<>(GridNodeOrderComparator.INSTANCE); nodesByVer = new TreeMap<>(); long maxOrder0 = 0; @@ -2681,18 +2640,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } - if (hasCaches) { - if (alive(node.id())) { - aliveNodesWithCaches.add(node); - - if (!CU.clientNode(node)) { - aliveSrvNodesWithCaches.add(node); - - if (!loc.id().equals(node.id())) - aliveRmtSrvNodesWithCaches.add(node); - } - } - } + if (hasCaches && alive(node.id()) && !CU.clientNode(node)) + aliveSrvNodesWithCaches.put(node, Boolean.TRUE); IgniteProductVersion nodeVer = U.productVersion(node); @@ -2821,17 +2770,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Gets all remote nodes that have cache with given name. - * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, final long topVer) { - return filter(topVer, rmtCacheNodes.get(cacheName)); - } - - /** * Gets all remote nodes that have at least one cache configured. * * @param topVer Topology version. @@ -2876,36 +2814,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Gets all alive remote server nodes with at least one cache configured. - * - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) { - return filter(topVer, aliveRmtSrvNodesWithCaches); - } - - /** - * Gets all alive server nodes with at least one cache configured. - * - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) { - return filter(topVer, aliveSrvNodesWithCaches); - } - - /** - * Gets all alive remote nodes with at least one cache configured. - * - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> aliveNodesWithCaches(final long topVer) { - return filter(topVer, aliveNodesWithCaches); - } - - /** * Checks if cache with given name has at least one node with near cache enabled. * * @param cacheName Cache name. @@ -2928,9 +2836,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { filterNodeMap(aliveRmtCacheNodes, leftNode); - aliveNodesWithCaches.remove(leftNode); aliveSrvNodesWithCaches.remove(leftNode); - aliveRmtSrvNodesWithCaches.remove(leftNode); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 503b334..5651e58 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -740,7 +740,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * Partition refresh callback. */ private void refreshPartitions() { - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldest == null) { if (log.isDebugEnabled()) @@ -1224,7 +1224,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana top = cacheCtx.topology(); if (top != null) - updated |= top.update(null, entry.getValue(), null) != null; + updated |= top.update(null, entry.getValue(), null); } if (!cctx.kernalContext().clientNode() && updated) @@ -1273,7 +1273,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana top = cacheCtx.topology(); if (top != null) { - updated |= top.update(null, entry.getValue(), null) != null; + updated |= top.update(null, entry.getValue(), null); cctx.affinity().checkRebalanceState(top, cacheId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 90e428c..d32f4c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -490,23 +490,6 @@ public class GridCacheUtils { } /** - * Gets oldest alive server node with at least one cache configured for specified topology version. - * - * @param ctx Context. - * @param topVer Maximum allowed topology version. - * @return Oldest alive cache server node. - */ - @Nullable public static ClusterNode oldestAliveCacheServerNode(GridCacheSharedContext ctx, - AffinityTopologyVersion topVer) { - Collection<ClusterNode> nodes = ctx.discovery().aliveServerNodesWithCaches(topVer); - - if (nodes.isEmpty()) - return null; - - return oldest(nodes); - } - - /** * @param nodes Nodes. * @return Oldest node for the given topology version. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 568a4da..1d60c42 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -304,8 +304,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm assert !metaDataCache.context().affinityNode(); while (true) { - ClusterNode oldestSrvNode = - CU.oldestAliveCacheServerNode(ctx.cache().context(), AffinityTopologyVersion.NONE); + ClusterNode oldestSrvNode = ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldestSrvNode == null) break; http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 5efb317..f2ee758 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -271,7 +271,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); assert oldest != null; @@ -549,7 +549,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap, Map<Integer, Long> cntrMap) { if (log.isDebugEnabled()) @@ -563,7 +563,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" + lastExchangeId + ", exchId=" + exchId + ']'); - return null; + return false; } if (node2part != null && node2part.compareTo(partMap) >= 0) { @@ -571,7 +571,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" + lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']'); - return null; + return false; } updateSeq.incrementAndGet(); @@ -634,7 +634,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Partition map after full update: " + fullMapString()); - return null; + return false; } finally { lock.writeLock().unlock(); @@ -643,7 +643,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionMap2 parts, Map<Integer, Long> cntrMap) { if (log.isDebugEnabled()) @@ -654,21 +654,21 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId + ", parts=" + parts + ']'); - return null; + return false; } lock.writeLock().lock(); try { if (stopping) - return null; + return false; if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) { if (log.isDebugEnabled()) log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" + lastExchangeId + ", exchId=" + exchId + ']'); - return null; + return false; } if (exchId != null) @@ -688,7 +688,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId + ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']'); - return null; + return false; } long updateSeq = this.updateSeq.incrementAndGet(); @@ -740,7 +740,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Partition map after single update: " + fullMapString()); - return changed ? localPartitionMap() : null; + return changed; } finally { lock.writeLock().unlock(); @@ -761,7 +761,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert nodeId.equals(cctx.localNodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); // If this node became the oldest node. if (oldest.id().equals(cctx.localNodeId())) { @@ -811,7 +811,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert nodeId != null; assert lock.writeLock().isHeldByCurrentThread(); - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); ClusterNode loc = cctx.localNode(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 0f75a5d..33a6fdf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -203,9 +203,9 @@ public interface GridDhtPartitionTopology { * @param exchId Exchange ID. * @param partMap Update partition map. * @param cntrMap Partition update counters. - * @return Local partition map if there were evictions or {@code null} otherwise. + * @return {@code True} if topology state changed. */ - public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap, @Nullable Map<Integer, Long> cntrMap); @@ -213,9 +213,9 @@ public interface GridDhtPartitionTopology { * @param exchId Exchange ID. * @param parts Partitions. * @param cntrMap Partition update counters. - * @return Local partition map if there were evictions or {@code null} otherwise. + * @return {@code True} if topology state changed. */ - @Nullable public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionMap2 parts, @Nullable Map<Integer, Long> cntrMap); http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index ab573bd..24ff3ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -340,7 +340,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); - initPartitions0(exchFut, updateSeq); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); + + initPartitions0(oldest, exchFut, updateSeq); consistencyCheck(); } @@ -350,14 +352,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** + * @param oldest Oldest server node. * @param exchFut Exchange future. * @param updateSeq Update sequence. */ - private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { + private void initPartitions0(ClusterNode oldest, GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { ClusterNode loc = cctx.localNode(); - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); - assert oldest != null || cctx.kernalContext().clientNode(); GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); @@ -397,12 +398,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Owned partition for oldest node: " + locPart); - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq); } } } else - createPartitions(aff, updateSeq); + createPartitions(oldest, aff, updateSeq); } else { // If preloader is disabled, then we simply clear out @@ -419,7 +420,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (state.active()) { locPart.rent(false); - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq); if (log.isDebugEnabled()) log.debug("Evicting partition with rebalancing disabled " + @@ -433,7 +434,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } if (node2part != null && node2part.valid()) - checkEvictions(updateSeq, aff); + checkEvictions(oldest, updateSeq, aff); updateRebalanceVersion(aff); } @@ -442,7 +443,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param aff Affinity assignments. * @param updateSeq Update sequence. */ - private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) { + private void createPartitions(ClusterNode oldest, List<List<ClusterNode>> aff, long updateSeq) { ClusterNode loc = cctx.localNode(); int num = cctx.affinity().partitions(); @@ -454,7 +455,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { // will be created in MOVING state. GridDhtLocalPartition locPart = createPartition(p); - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq); } } // If this node's map is empty, we pre-create local partitions, @@ -486,7 +487,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); assert oldest != null || cctx.kernalContext().clientNode(); @@ -523,11 +524,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } if (affReady) - initPartitions0(exchFut, updateSeq); + initPartitions0(oldest, exchFut, updateSeq); else { List<List<ClusterNode>> aff = cctx.affinity().idealAssignment(); - createPartitions(aff, updateSeq); + createPartitions(oldest, aff, updateSeq); } consistencyCheck(); @@ -574,6 +575,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); + for (int p = 0; p < num; p++) { GridDhtLocalPartition locPart = localPartition(p, topVer, false, false); @@ -600,7 +603,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert owned : "Failed to own partition [cacheName" + cctx.name() + ", locPart=" + locPart + ']'; - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq); changed = true; @@ -620,7 +623,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { locPart + ", owners = " + owners + ']'); } else - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq); } } else { @@ -630,7 +633,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (state == MOVING) { locPart.rent(false); - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq); changed = true; @@ -985,7 +988,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap, @Nullable Map<Integer, Long> cntrMap) { if (log.isDebugEnabled()) @@ -997,7 +1000,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { if (stopping) - return null; + return false; if (cntrMap != null) { for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { @@ -1025,7 +1028,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" + lastExchangeId + ", exchId=" + exchId + ']'); - return null; + return false; } if (node2part != null && node2part.compareTo(partMap) >= 0) { @@ -1033,7 +1036,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" + lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']'); - return null; + return false; } long updateSeq = this.updateSeq.incrementAndGet(); @@ -1100,7 +1103,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); - changed = checkEvictions(updateSeq, aff); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); + + changed = checkEvictions(oldest, updateSeq, aff); updateRebalanceVersion(aff); } @@ -1110,7 +1115,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Partition map after full update: " + fullMapString()); - return changed ? localPartitionMap() : null; + return changed; } finally { lock.writeLock().unlock(); @@ -1119,7 +1124,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionMap2 parts, @Nullable Map<Integer, Long> cntrMap) { if (log.isDebugEnabled()) @@ -1130,33 +1135,28 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId + ", parts=" + parts + ']'); - return null; + return false; } lock.writeLock().lock(); try { if (stopping) - return null; + return false; if (cntrMap != null) { for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { - Long cntr = this.cntrMap.get(e.getKey()); + Integer p = e.getKey(); - if (cntr == null || cntr < e.getValue()) - this.cntrMap.put(e.getKey(), e.getValue()); - } + Long cntr = this.cntrMap.get(p); - for (int i = 0; i < locParts.length(); i++) { - GridDhtLocalPartition part = locParts.get(i); - - if (part == null) - continue; + if (cntr == null || cntr < e.getValue()) + this.cntrMap.put(p, e.getValue()); - Long cntr = cntrMap.get(part.id()); + GridDhtLocalPartition part = locParts.get(p); - if (cntr != null) - part.updateCounter(cntr); + if (part != null) + part.updateCounter(e.getValue()); } } @@ -1165,7 +1165,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" + lastExchangeId + ", exchId=" + exchId + ']'); - return null; + return false; } if (exchId != null) @@ -1182,7 +1182,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId + ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']'); - return null; + return false; } long updateSeq = this.updateSeq.incrementAndGet(); @@ -1225,7 +1225,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); - changed |= checkEvictions(updateSeq, aff); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); + + changed |= checkEvictions(oldest, updateSeq, aff); updateRebalanceVersion(aff); } @@ -1235,7 +1237,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Partition map after single update: " + fullMapString()); - return changed ? localPartitionMap() : null; + return changed; } finally { lock.writeLock().unlock(); @@ -1243,11 +1245,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** + * @param oldest Oldest server node. * @param updateSeq Update sequence. * @param aff Affinity assignments. * @return Checks if any of the local partitions need to be evicted. */ - private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) { + private boolean checkEvictions(ClusterNode oldest, long updateSeq, List<List<ClusterNode>> aff) { boolean changed = false; UUID locId = cctx.nodeId(); @@ -1270,7 +1273,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (nodeIds.containsAll(F.nodeIds(affNodes))) { part.rent(false); - updateLocal(part.id(), locId, part.state(), updateSeq); + updateLocal(oldest, part.id(), locId, part.state(), updateSeq); changed = true; @@ -1295,7 +1298,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (locId.equals(n.id())) { part.rent(false); - updateLocal(part.id(), locId, part.state(), updateSeq); + updateLocal(oldest, part.id(), locId, part.state(), updateSeq); changed = true; @@ -1318,18 +1321,16 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** * Updates value for single partition. * + * @param oldest Oldest server node. * @param p Partition. * @param nodeId Node ID. * @param state State. * @param updateSeq Update sequence. */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) { + private void updateLocal(ClusterNode oldest, int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) { assert nodeId.equals(cctx.nodeId()); - // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); - assert oldest != null || cctx.kernalContext().clientNode(); // If this node became the oldest node. @@ -1424,7 +1425,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { if (part.own()) { - updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet()); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); + + updateLocal(oldest, part.id(), loc.id(), part.state(), updateSeq.incrementAndGet()); consistencyCheck(); @@ -1452,7 +1455,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get(); - updateLocal(part.id(), cctx.localNodeId(), part.state(), seq); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); + + updateLocal(oldest, part.id(), cctx.localNodeId(), part.state(), seq); consistencyCheck(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index f391265..704c654 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -112,6 +112,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** */ @GridToStringExclude + private int pendingSingleUpdates; + + /** */ + @GridToStringExclude private List<ClusterNode> srvNodes; /** */ @@ -1162,13 +1166,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT */ private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) { boolean allReceived = false; + boolean updateSingleMap = false; synchronized (mux) { assert crd != null; if (crd.isLocal()) { if (remaining.remove(node.id())) { - updatePartitionSingleMap(msg); + updateSingleMap = true; + + pendingSingleUpdates++; allReceived = remaining.isEmpty(); } @@ -1177,8 +1184,42 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT singleMsgs.put(node, msg); } - if (allReceived) + if (updateSingleMap) { + try { + updatePartitionSingleMap(msg); + } + finally { + synchronized (mux) { + assert pendingSingleUpdates > 0; + + pendingSingleUpdates--; + + if (pendingSingleUpdates == 0) + mux.notifyAll(); + } + } + } + + if (allReceived) { + awaitSingleMapUpdates(); + onAllReceived(false); + } + } + + /** + * + */ + private void awaitSingleMapUpdates() { + synchronized (mux) { + try { + while (pendingSingleUpdates > 0) + U.wait(mux); + } + catch (IgniteInterruptedCheckedException e) { + U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e); + } + } } /** @@ -1374,7 +1415,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (cacheCtx != null) cacheCtx.topology().update(exchId, entry.getValue(), cntrMap); else { - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldest != null && oldest.isLocal()) cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap); @@ -1557,6 +1598,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (crd0.isLocal()) { if (allReceived) { + awaitSingleMapUpdates(); + onAllReceived(true); return; http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 6c26363..b9b92b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -1265,7 +1265,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { try { if (!cache.context().affinityNode()) { ClusterNode oldestSrvNode = - CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE); + ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldestSrvNode == null) return new GridEmptyIterator<>(); @@ -1589,7 +1589,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { depExe.submit(new BusyRunnable() { @Override public void run0() { - ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer); + ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer); if (oldest != null && oldest.isLocal()) { final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 8814745..0e90418 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -129,6 +129,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRedirectToClient; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage; +import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -136,6 +137,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_ import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SERVICES_COMPATIBILITY_MODE; +import static org.apache.ignite.IgniteSystemProperties.getBoolean; import static org.apache.ignite.IgniteSystemProperties.getInteger; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; @@ -174,8 +176,7 @@ class ServerImpl extends TcpDiscoveryImpl { IgniteProductVersion.fromString("1.5.0"); /** */ - private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>()); + private IgniteThreadPoolExecutor utilityPool; /** Nodes ring. */ @GridToStringExclude @@ -297,6 +298,13 @@ class ServerImpl extends TcpDiscoveryImpl { spiState = DISCONNECTED; } + utilityPool = new IgniteThreadPoolExecutor("disco-pool", + spi.ignite().name(), + 0, + 1, + 2000, + new LinkedBlockingQueue<Runnable>()); + if (debugMode) { if (!log.isInfoEnabled()) throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " + @@ -2403,9 +2411,12 @@ class ServerImpl extends TcpDiscoveryImpl { /** Connection check threshold. */ private long connCheckThreshold; + /** */ + private long lastRingMsgTime; + /** */ - protected RingMessageWorker() { + RingMessageWorker() { super("tcp-disco-msg-worker", 10); initConnectionCheckFrequency(); @@ -2500,6 +2511,8 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message to process. */ @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { + sendHeartbeatMessage(); + DebugLogger log = messageLogger(msg); if (log.isDebugEnabled()) @@ -2508,6 +2521,11 @@ class ServerImpl extends TcpDiscoveryImpl { if (debugMode) debugLog(msg, "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']'); + boolean ensured = spi.ensured(msg); + + if (!locNode.id().equals(msg.senderNodeId()) && ensured) + lastRingMsgTime = U.currentTimeMillis(); + if (locNode.internalOrder() == 0) { boolean proc = false; @@ -2564,7 +2582,7 @@ class ServerImpl extends TcpDiscoveryImpl { else assert false : "Unknown message type: " + msg.getClass().getSimpleName(); - if (spi.ensured(msg) && redirectToClients(msg)) + if (ensured && redirectToClients(msg)) msgHist.add(msg); if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) { @@ -5336,12 +5354,9 @@ class ServerImpl extends TcpDiscoveryImpl { * Sends heartbeat message if needed. */ private void sendHeartbeatMessage() { - if (!isLocalNodeCoordinator()) - return; - long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis(); - if (elapsed > 0) + if (elapsed > 0 || !isLocalNodeCoordinator()) return; TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId()); @@ -5361,7 +5376,9 @@ class ServerImpl extends TcpDiscoveryImpl { if (lastTimeStatusMsgSent < locNode.lastUpdateTime()) lastTimeStatusMsgSent = locNode.lastUpdateTime(); - long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - U.currentTimeMillis(); + long updateTime = Math.max(lastTimeStatusMsgSent, lastRingMsgTime); + + long elapsed = (updateTime + hbCheckFreq) - U.currentTimeMillis(); if (elapsed > 0) return; @@ -6062,11 +6079,11 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoverySpiState state = spiStateCopy(); - long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : + long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout(); if (state == CONNECTED) { - spi.writeToSocket(msg, sock, RES_OK, socketTimeout); + spi.writeToSocket(msg, sock, RES_OK, sockTimeout); if (log.isDebugEnabled()) log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']'); @@ -6103,7 +6120,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Local node is stopping. Remote node should try next one. res = RES_CONTINUE_JOIN; - spi.writeToSocket(msg, sock, res, socketTimeout); + spi.writeToSocket(msg, sock, res, sockTimeout); if (log.isDebugEnabled()) log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java index 878d7d1..43017db 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java @@ -104,6 +104,7 @@ public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstrac } /** + * @param backups Number of backups. * @throws Exception If failed. */ protected void checkNodeRemoved(int backups) throws Exception { @@ -247,7 +248,6 @@ public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstrac } } - /** * @param assignment Assignment to verify. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java index 390c83e..31b4bc7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java @@ -239,7 +239,7 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe GridCacheSharedContext<?, ?> ctx = k.context().cache().context(); ClusterNode oldest = - GridCacheUtils.oldestAliveCacheServerNode(ctx, new AffinityTopologyVersion(currVer)); + ctx.discovery().oldestAliveCacheServerNode(new AffinityTopologyVersion(currVer)); assertNotNull(oldest); http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java index a59ca8b..2d46cf4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java @@ -76,7 +76,12 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size()); for (int part = 0; part < aff.getPartitions(); part++) { - Collection<ClusterNode> affNodes = aff.assignPartition(part, new ArrayList(nodes), 0, null); + Collection<ClusterNode> affNodes = aff.assignPartition(null, + part, + new ArrayList<>(nodes), + new HashMap<ClusterNode, byte[]>(), + 0, + null); assertEquals(1, affNodes.size());