Repository: ignite Updated Branches: refs/heads/ignite-4154-opt2 8a6117c60 -> 74d0ecf53
ignite-4154 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/74d0ecf5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/74d0ecf5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/74d0ecf5 Branch: refs/heads/ignite-4154-opt2 Commit: 74d0ecf536f2b154059c65fb89db31cfcd3e54ab Parents: 8a6117c Author: sboikov <[email protected]> Authored: Mon Nov 21 13:14:47 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 21 13:14:47 2016 +0300 ---------------------------------------------------------------------- .../affinity/AffinityCalculateCache.java | 20 +++++++ .../affinity/GridAffinityAssignmentCache.java | 2 + .../cache/CacheAffinitySharedManager.java | 60 ++++++++++++-------- .../GridCachePartitionExchangeManager.java | 10 +++- .../GridDhtPartitionsExchangeFuture.java | 24 +++++++- 5 files changed, 90 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/74d0ecf5/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java index 49a7e49..45758b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java @@ -42,11 +42,29 @@ public class AffinityCalculateCache { /** */ private Map<Integer, List<List<ClusterNode>>> grpAssign; + /** */ + private int calcCnt; + + /** */ + private long lateAffTime; + public AffinityCalculateCache(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) { this.topVer = topVer; this.discoEvt = discoEvt; } + public int calculateCount() { + return calcCnt; + } + + public void addLateAffinityCalculateTime(long time) { + lateAffTime += time; + } + + public long lateAffinityCalculateTime() { + return lateAffTime; + } + public List<List<ClusterNode>> assignPartitions(AffinityFunction aff, int backups, List<ClusterNode> nodes, @@ -63,6 +81,8 @@ public class AffinityCalculateCache { } } + calcCnt++; + AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(nodes, prevAssignment, discoEvt, http://git-wip-us.apache.org/repos/asf/ignite/blob/74d0ecf5/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index babe02e..c77124b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -152,6 +152,8 @@ public class GridAffinityAssignmentCache { assert similarAffKey != null; affGrp = locCache ? null : ctx.cache().context().affinity().equalAffinityGroup(cacheId, affCfg); + + log.info("Initialized cache affinity group [cache=" + cacheName + ", grp=" + affGrp + ']'); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/74d0ecf5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 654e456..9727905 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -1191,6 +1191,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } centralizedAff = true; + + log.info("Initialized affinity on node left [topVer=" + fut.topologyVersion() + + ", calcCnt=" + affCache.calculateCount() + ']'); } else { initCachesAffinity(fut); @@ -1379,32 +1382,38 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent()); - if (!crd) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) - continue; + try { + if (!crd) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.isLocal()) + continue; - boolean latePrimary = cacheCtx.rebalanceEnabled(); + boolean latePrimary = cacheCtx.rebalanceEnabled(); - initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache); - } + initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache); + } - return null; - } - else { - final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer); + return null; + } + else { + final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer); - forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() { - @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException { - CacheHolder cache = cache(fut, cacheDesc); + forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() { + @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException { + CacheHolder cache = cache(fut, cacheDesc); - boolean latePrimary = cache.rebalanceEnabled; + boolean latePrimary = cache.rebalanceEnabled; - initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary, affCache); - } - }); + initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary, affCache); + } + }); - return waitRebalanceInfo; + return waitRebalanceInfo; + } + } + finally { + log.info("Initialized affinity on node join [topVer=" + topVer + + ", calcCnt=" + affCache.calculateCount() + ", lateAffCalcTime=" + affCache.lateAffinityCalculateTime() + ']'); } } @@ -1432,14 +1441,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [cache=" + aff.cacheName() + ", topVer=" + affTopVer + ", node=" + cctx.localNodeId() + ']'; - List<List<ClusterNode>> curAff = aff.assignments(affTopVer); - - assert aff.idealAssignment() != null : "Previous assignment is not available."; - List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent(), affCache); List<List<ClusterNode>> newAssignment = null; if (latePrimary) { + long start = U.currentTimeMillis(); + + List<List<ClusterNode>> curAff = aff.assignments(affTopVer); + + assert aff.idealAssignment() != null : "Previous assignment is not available."; + for (int p = 0; p < idealAssignment.size(); p++) { List<ClusterNode> newNodes = idealAssignment.get(p); List<ClusterNode> curNodes = curAff.get(p); @@ -1462,6 +1473,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap newAssignment.set(p, nodes0); } } + + if (affCache != null) + affCache.addLateAffinityCalculateTime(U.currentTimeMillis() - start); } if (newAssignment == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/74d0ecf5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index a006df5..16ce38a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1123,7 +1123,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (log.isDebugEnabled()) log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']'); - log.info("Exchange done [topVer=" + topVer + ", err=" + err + ']'); + if (exchFut.singleMsgUpdateCnt > 0) { + log.info("Exchange done [topVer=" + topVer + + ", singleMsgUpdateTime=" + exchFut.singleMsgUpdateTime + + ", singleMsgUpdateCnt=" + exchFut.singleMsgUpdateCnt + + ", singleMsgUpdateMaxTime=" + exchFut.singleMsgUpdateMaxTime + + ", err=" + err + ']'); + } + else + log.info("Exchange done [topVer=" + topVer + ", err=" + err + ']'); IgniteProductVersion minVer = cctx.localNode().version(); IgniteProductVersion maxVer = cctx.localNode().version(); http://git-wip-us.apache.org/repos/asf/ignite/blob/74d0ecf5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 92a3874..a4f615f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -111,6 +111,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT private final Set<UUID> remaining = new HashSet<>(); /** */ + public int singleMsgUpdateCnt; + + /** */ + public long singleMsgUpdateTime; + + /** */ + public long singleMsgUpdateMaxTime; + + /** */ @GridToStringExclude private List<ClusterNode> srvNodes; @@ -803,11 +812,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT skipSnd = true; if (!skipSnd) { - long sendStart = System.currentTimeMillis(); + long sndStart = System.currentTimeMillis(); sendPartitions(crd); - log.info("Send parts time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - sendStart) + ']'); + log.info("Send parts time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - sndStart) + ']'); } else log.info("Skip first exchange message [topVer=" + topologyVersion() + ']'); @@ -1240,11 +1249,22 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert crd != null; if (crd.isLocal()) { + long start = U.currentTimeMillis(); + if (remaining.remove(node.id())) { updatePartitionSingleMap(msg); allReceived = remaining.isEmpty(); } + + singleMsgUpdateCnt++; + + long time = U.currentTimeMillis() - start; + + if (time > singleMsgUpdateMaxTime) + singleMsgUpdateMaxTime = time; + + singleMsgUpdateTime += time; } else singleMsgs.put(node, msg);
