Repository: ignite Updated Branches: refs/heads/ignite-4154-opt2 c05e49bb7 -> 283b23237
ignite-4154 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/283b2323 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/283b2323 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/283b2323 Branch: refs/heads/ignite-4154-opt2 Commit: 283b2323767ec4ab3f2c6a84c6dac405b01922bb Parents: c05e49b Author: sboikov <[email protected]> Authored: Mon Nov 14 13:44:32 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 14 13:44:32 2016 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 8 +++ .../dht/GridDhtPartitionTopologyImpl.java | 59 +++++++++++--------- .../GridDhtPartitionsExchangeFuture.java | 18 +++++- 3 files changed, 57 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/283b2323/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 27c2bac..b50479d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -933,6 +933,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @throws IgniteCheckedException If failed. */ private void fetchAffinityOnJoin(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { + long start = System.currentTimeMillis(); + AffinityTopologyVersion topVer = fut.topologyVersion(); List<GridDhtAssignmentFetchFuture> fetchFuts = new ArrayList<>(); @@ -967,6 +969,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut); } + + log.info("Affinity fetch time [topVer=" + topVer + ", time=" + (System.currentTimeMillis() - start) + ']'); } /** @@ -1055,12 +1059,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private void initCachesAffinity(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { assert !lateAffAssign; + long start = System.currentTimeMillis(); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; initAffinity(cacheCtx.affinity().affinityCache(), fut, false); } + + log.info("Affinity init time [topVer=" + fut.topologyVersion() + ", time=" + (System.currentTimeMillis() - start) + ']'); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/283b2323/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index f3751ac..f50116d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -350,7 +350,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); - initPartitions0(exchFut, updateSeq); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + + initPartitions0(oldest, exchFut, updateSeq); consistencyCheck(); } @@ -363,11 +365,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param exchFut Exchange future. * @param updateSeq Update sequence. */ - private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { + private void initPartitions0(ClusterNode oldest, GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { ClusterNode loc = cctx.localNode(); - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); - assert oldest != null || cctx.kernalContext().clientNode(); GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); @@ -407,12 +407,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Owned partition for oldest node: " + locPart); - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq); } } } else - createPartitions(aff, updateSeq); + createPartitions(oldest, aff, updateSeq); } else { // If preloader is disabled, then we simply clear out @@ -429,7 +429,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (state.active()) { locPart.rent(false); - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq); if (log.isDebugEnabled()) log.debug("Evicting partition with rebalancing disabled " + @@ -443,7 +443,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } if (node2part != null && node2part.valid()) - checkEvictions(updateSeq, aff); + checkEvictions(oldest, updateSeq, aff); updateRebalanceVersion(aff); } @@ -452,7 +452,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param aff Affinity assignments. * @param updateSeq Update sequence. */ - private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) { + private void createPartitions(ClusterNode oldest, List<List<ClusterNode>> aff, long updateSeq) { ClusterNode loc = cctx.localNode(); int num = cctx.affinity().partitions(); @@ -464,7 +464,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { // will be created in MOVING state. GridDhtLocalPartition locPart = createPartition(p); - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq); } } // If this node's map is empty, we pre-create local partitions, @@ -533,11 +533,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } if (affReady) - initPartitions0(exchFut, updateSeq); + initPartitions0(oldest, exchFut, updateSeq); else { List<List<ClusterNode>> aff = cctx.affinity().idealAssignment(); - createPartitions(aff, updateSeq); + createPartitions(oldest, aff, updateSeq); } consistencyCheck(); @@ -584,6 +584,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + for (int p = 0; p < num; p++) { GridDhtLocalPartition locPart = localPartition(p, topVer, false, false); @@ -610,7 +612,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert owned : "Failed to own partition [cacheName" + cctx.name() + ", locPart=" + locPart + ']'; - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq); changed = true; @@ -630,7 +632,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { locPart + ", owners = " + owners + ']'); } else - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq); } } else { @@ -640,7 +642,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (state == MOVING) { locPart.rent(false); - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq); changed = true; @@ -1129,7 +1131,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); - changed = checkEvictions(updateSeq, aff); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + + changed = checkEvictions(oldest, updateSeq, aff); updateRebalanceVersion(aff); } @@ -1254,7 +1258,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); - changed |= checkEvictions(updateSeq, aff); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + + changed |= checkEvictions(oldest, updateSeq, aff); updateRebalanceVersion(aff); } @@ -1276,7 +1282,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param aff Affinity assignments. * @return Checks if any of the local partitions need to be evicted. */ - private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) { + private boolean checkEvictions(ClusterNode oldest, long updateSeq, List<List<ClusterNode>> aff) { boolean changed = false; UUID locId = cctx.nodeId(); @@ -1299,7 +1305,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (nodeIds.containsAll(F.nodeIds(affNodes))) { part.rent(false); - updateLocal(part.id(), locId, part.state(), updateSeq); + updateLocal(oldest, part.id(), locId, part.state(), updateSeq); changed = true; @@ -1324,7 +1330,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (locId.equals(n.id())) { part.rent(false); - updateLocal(part.id(), locId, part.state(), updateSeq); + updateLocal(oldest, part.id(), locId, part.state(), updateSeq); changed = true; @@ -1353,12 +1359,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param updateSeq Update sequence. */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) { + private void updateLocal(ClusterNode oldest, int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) { assert nodeId.equals(cctx.nodeId()); - // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); - assert oldest != null || cctx.kernalContext().clientNode(); // If this node became the oldest node. @@ -1453,7 +1456,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { if (part.own()) { - updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet()); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + + updateLocal(oldest, part.id(), loc.id(), part.state(), updateSeq.incrementAndGet()); consistencyCheck(); @@ -1481,7 +1486,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get(); - updateLocal(part.id(), cctx.localNodeId(), part.state(), seq); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + + updateLocal(oldest, part.id(), cctx.localNodeId(), part.state(), seq); consistencyCheck(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/283b2323/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 9af7a7b..468e16e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -432,6 +432,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert !dummy && !forcePreload : this; try { + long initStart = System.currentTimeMillis(); + log.info("Start exchange init [topVer=" + topologyVersion() + ']'); srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topologyVersion())); @@ -465,14 +467,22 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT cctx.affinity().initStartedCaches(crdNode, this, receivedCaches); } + long affStart = System.currentTimeMillis(); + if (CU.clientNode(discoEvt.eventNode())) exchange = onClientNodeEvent(crdNode); else exchange = onServerNodeEvent(crdNode); + + log.info("Affinity call time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - affStart) + ']'); } + long topUpdateStart = System.currentTimeMillis(); + updateTopologies(crdNode); + log.info("Top update time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - topUpdateStart) + ']'); + switch (exchange) { case ALL: { distributedExchange(); @@ -501,7 +511,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } - log.info("Finish exchange init [topVer=" + topologyVersion() + ']'); + log.info("Finish exchange init [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - initStart) + ']'); } catch (IgniteInterruptedCheckedException e) { onDone(e); @@ -725,6 +735,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null; + long beforeExchStart = System.currentTimeMillis(); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId())) continue; @@ -739,12 +751,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT cacheCtx.topology().beforeExchange(this, !centralizedAff); } + log.info("Before exchange time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - beforeExchStart) + ']'); + if (crd.isLocal()) { ClusterNode node = discoEvt.eventNode(); Object attr = node.attribute("SKIP_FIRST_EXCHANGE_MSG"); - boolean skipFirstExchange = Boolean.TRUE.equals(attr) || "true".equals(attr); + boolean skipFirstExchange = Boolean.TRUE.equals(attr) || ((attr instanceof String) && "true".equalsIgnoreCase((String)attr)); if (discoEvt.type() == EVT_NODE_JOINED && !node.isLocal() && skipFirstExchange) { assert !CU.clientNode(node) : discoEvt;
