Repository: ignite
Updated Branches:
  refs/heads/ignite-4154-opt2 8a6117c60 -> 74d0ecf53


ignite-4154


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/74d0ecf5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/74d0ecf5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/74d0ecf5

Branch: refs/heads/ignite-4154-opt2
Commit: 74d0ecf536f2b154059c65fb89db31cfcd3e54ab
Parents: 8a6117c
Author: sboikov <[email protected]>
Authored: Mon Nov 21 13:14:47 2016 +0300
Committer: sboikov <[email protected]>
Committed: Mon Nov 21 13:14:47 2016 +0300

----------------------------------------------------------------------
 .../affinity/AffinityCalculateCache.java        | 20 +++++++
 .../affinity/GridAffinityAssignmentCache.java   |  2 +
 .../cache/CacheAffinitySharedManager.java       | 60 ++++++++++++--------
 .../GridCachePartitionExchangeManager.java      | 10 +++-
 .../GridDhtPartitionsExchangeFuture.java        | 24 +++++++-
 5 files changed, 90 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/74d0ecf5/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java
index 49a7e49..45758b0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java
@@ -42,11 +42,29 @@ public class AffinityCalculateCache {
     /** */
     private Map<Integer, List<List<ClusterNode>>> grpAssign;
 
+    /** */
+    private int calcCnt;
+
+    /** */
+    private long lateAffTime;
+
     public AffinityCalculateCache(AffinityTopologyVersion topVer, 
DiscoveryEvent discoEvt) {
         this.topVer = topVer;
         this.discoEvt = discoEvt;
     }
 
+    public int calculateCount() {
+        return calcCnt;
+    }
+
+    public void addLateAffinityCalculateTime(long time) {
+        lateAffTime += time;
+    }
+
+    public long lateAffinityCalculateTime() {
+        return lateAffTime;
+    }
+
     public List<List<ClusterNode>> assignPartitions(AffinityFunction aff,
         int backups,
         List<ClusterNode> nodes,
@@ -63,6 +81,8 @@ public class AffinityCalculateCache {
             }
         }
 
+        calcCnt++;
+
         AffinityFunctionContext ctx = new 
GridAffinityFunctionContextImpl(nodes,
             prevAssignment,
             discoEvt,

http://git-wip-us.apache.org/repos/asf/ignite/blob/74d0ecf5/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index babe02e..c77124b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -152,6 +152,8 @@ public class GridAffinityAssignmentCache {
         assert similarAffKey != null;
 
         affGrp = locCache ? null : 
ctx.cache().context().affinity().equalAffinityGroup(cacheId, affCfg);
+
+        log.info("Initialized cache affinity group [cache=" + cacheName + ", 
grp=" + affGrp + ']');
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/74d0ecf5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 654e456..9727905 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1191,6 +1191,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             }
 
             centralizedAff = true;
+
+            log.info("Initialized affinity on node left [topVer=" + 
fut.topologyVersion() +
+                ", calcCnt=" + affCache.calculateCount() + ']');
         }
         else {
             initCachesAffinity(fut);
@@ -1379,32 +1382,38 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         final AffinityCalculateCache affCache = new 
AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent());
 
-        if (!crd) {
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (cacheCtx.isLocal())
-                    continue;
+        try {
+            if (!crd) {
+                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                    if (cacheCtx.isLocal())
+                        continue;
 
-                boolean latePrimary = cacheCtx.rebalanceEnabled();
+                    boolean latePrimary = cacheCtx.rebalanceEnabled();
 
-                initAffinityOnNodeJoin(fut, 
cacheCtx.affinity().affinityCache(), null, latePrimary, affCache);
-            }
+                    initAffinityOnNodeJoin(fut, 
cacheCtx.affinity().affinityCache(), null, latePrimary, affCache);
+                }
 
-            return null;
-        }
-        else {
-            final WaitRebalanceInfo waitRebalanceInfo = new 
WaitRebalanceInfo(topVer);
+                return null;
+            }
+            else {
+                final WaitRebalanceInfo waitRebalanceInfo = new 
WaitRebalanceInfo(topVer);
 
-            forAllRegisteredCaches(new 
IgniteInClosureX<DynamicCacheDescriptor>() {
-                @Override public void applyx(DynamicCacheDescriptor cacheDesc) 
throws IgniteCheckedException {
-                    CacheHolder cache = cache(fut, cacheDesc);
+                forAllRegisteredCaches(new 
IgniteInClosureX<DynamicCacheDescriptor>() {
+                    @Override public void applyx(DynamicCacheDescriptor 
cacheDesc) throws IgniteCheckedException {
+                        CacheHolder cache = cache(fut, cacheDesc);
 
-                    boolean latePrimary = cache.rebalanceEnabled;
+                        boolean latePrimary = cache.rebalanceEnabled;
 
-                    initAffinityOnNodeJoin(fut, cache.affinity(), 
waitRebalanceInfo, latePrimary, affCache);
-                }
-            });
+                        initAffinityOnNodeJoin(fut, cache.affinity(), 
waitRebalanceInfo, latePrimary, affCache);
+                    }
+                });
 
-            return waitRebalanceInfo;
+                return waitRebalanceInfo;
+            }
+        }
+        finally {
+            log.info("Initialized affinity on node join [topVer=" + topVer +
+                ", calcCnt=" + affCache.calculateCount() + ", 
lateAffCalcTime=" + affCache.lateAffinityCalculateTime() + ']');
         }
     }
 
@@ -1432,14 +1441,16 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized 
[cache=" + aff.cacheName() +
             ", topVer=" + affTopVer + ", node=" + cctx.localNodeId() + ']';
 
-        List<List<ClusterNode>> curAff = aff.assignments(affTopVer);
-
-        assert aff.idealAssignment() != null : "Previous assignment is not 
available.";
-
         List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, 
fut.discoveryEvent(), affCache);
         List<List<ClusterNode>> newAssignment = null;
 
         if (latePrimary) {
+            long start = U.currentTimeMillis();
+
+            List<List<ClusterNode>> curAff = aff.assignments(affTopVer);
+
+            assert aff.idealAssignment() != null : "Previous assignment is not 
available.";
+
             for (int p = 0; p < idealAssignment.size(); p++) {
                 List<ClusterNode> newNodes = idealAssignment.get(p);
                 List<ClusterNode> curNodes = curAff.get(p);
@@ -1462,6 +1473,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     newAssignment.set(p, nodes0);
                 }
             }
+
+            if (affCache != null)
+                affCache.addLateAffinityCalculateTime(U.currentTimeMillis() - 
start);
         }
 
         if (newAssignment == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/74d0ecf5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index a006df5..16ce38a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1123,7 +1123,15 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         if (log.isDebugEnabled())
             log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + 
", err=" + err + ']');
 
-        log.info("Exchange done [topVer=" + topVer + ", err=" + err + ']');
+        if (exchFut.singleMsgUpdateCnt > 0) {
+            log.info("Exchange done [topVer=" + topVer +
+                ", singleMsgUpdateTime=" + exchFut.singleMsgUpdateTime +
+                ", singleMsgUpdateCnt=" + exchFut.singleMsgUpdateCnt +
+                ", singleMsgUpdateMaxTime=" + exchFut.singleMsgUpdateMaxTime +
+                ", err=" + err + ']');
+        }
+        else
+            log.info("Exchange done [topVer=" + topVer + ", err=" + err + ']');
 
         IgniteProductVersion minVer = cctx.localNode().version();
         IgniteProductVersion maxVer = cctx.localNode().version();

http://git-wip-us.apache.org/repos/asf/ignite/blob/74d0ecf5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 92a3874..a4f615f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -111,6 +111,15 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     private final Set<UUID> remaining = new HashSet<>();
 
     /** */
+    public int singleMsgUpdateCnt;
+
+    /** */
+    public long singleMsgUpdateTime;
+
+    /** */
+    public long singleMsgUpdateMaxTime;
+
+    /** */
     @GridToStringExclude
     private List<ClusterNode> srvNodes;
 
@@ -803,11 +812,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
                 skipSnd = true;
 
             if (!skipSnd) {
-                long sendStart = System.currentTimeMillis();
+                long sndStart = System.currentTimeMillis();
 
                 sendPartitions(crd);
 
-                log.info("Send parts time [topVer=" + topologyVersion() + ", 
time=" + (System.currentTimeMillis() - sendStart) + ']');
+                log.info("Send parts time [topVer=" + topologyVersion() + ", 
time=" + (System.currentTimeMillis() - sndStart) + ']');
             }
             else
                 log.info("Skip first exchange message [topVer=" + 
topologyVersion() + ']');
@@ -1240,11 +1249,22 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
             assert crd != null;
 
             if (crd.isLocal()) {
+                long start = U.currentTimeMillis();
+
                 if (remaining.remove(node.id())) {
                     updatePartitionSingleMap(msg);
 
                     allReceived = remaining.isEmpty();
                 }
+
+                singleMsgUpdateCnt++;
+
+                long time = U.currentTimeMillis() - start;
+
+                if (time > singleMsgUpdateMaxTime)
+                    singleMsgUpdateMaxTime = time;
+
+                singleMsgUpdateTime += time;
             }
             else
                 singleMsgs.put(node, msg);

Reply via email to