Repository: ignite
Updated Branches:
  refs/heads/master 9cec13857 -> edcc1089a


IGNITE-10493 Refactor exchange timings measurement - Fixes #5688.

Signed-off-by: Pavel Kovalenko <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/edcc1089
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/edcc1089
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/edcc1089

Branch: refs/heads/master
Commit: edcc1089aaa8efa7ede38af2a5dfdb0ef00b7bc5
Parents: 9cec138
Author: Pavel Kovalenko <[email protected]>
Authored: Thu Dec 20 15:51:03 2018 +0300
Committer: Pavel Kovalenko <[email protected]>
Committed: Thu Dec 20 15:51:03 2018 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  93 +++---
 .../GridCachePartitionExchangeManager.java      |   2 +
 .../GridDhtPartitionsExchangeFuture.java        | 211 +++++++++----
 .../GridDhtPartitionsSingleMessage.java         |  55 +++-
 .../GridCacheDatabaseSharedManager.java         |  10 +-
 .../ignite/internal/util/IgniteStopwatch.java   | 230 ++++++++++++++
 .../ignite/internal/util/IgniteTicker.java      |  52 ++++
 .../apache/ignite/internal/util/TimeBag.java    | 312 +++++++++++++++++++
 8 files changed, 852 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/edcc1089/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 07fbef1..61d88c7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -800,15 +800,12 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     ) throws IgniteCheckedException {
         assert exchActions != null && !exchActions.empty() : exchActions;
 
-        long time = System.currentTimeMillis();
-
         IgniteInternalFuture<?> res = cachesRegistry.update(exchActions);
 
         // Affinity did not change for existing caches.
         onCustomMessageNoAffinityChange(fut, crd, exchActions);
 
-        if (log.isInfoEnabled())
-            log.info("Updating caches registry performed in " + 
(System.currentTimeMillis() - time) + " ms.");
+        fut.timeBag().finishGlobalStage("Update caches registry");
 
         processCacheStartRequests(fut, crd, exchActions);
 
@@ -871,8 +868,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         final ExchangeDiscoveryEvents evts = fut.context().events();
 
-        long time = U.currentTimeMillis();
-
         Map<StartCacheInfo, DynamicCacheChangeRequest> startCacheInfos = new 
LinkedHashMap<>();
 
         for (ExchangeActions.CacheActionData action : 
exchActions.cacheStartRequests()) {
@@ -954,15 +949,11 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             }
         }
 
-        if (log.isInfoEnabled())
-            log.info("Caches starting performed in " + (U.currentTimeMillis() 
- time) + " ms.");
-
-        time = U.currentTimeMillis();
+        fut.timeBag().finishGlobalStage("Start caches");
 
         initAffinityOnCacheGroupsStart(fut, exchActions, crd);
 
-        if (log.isInfoEnabled())
-            log.info("Affinity initialization for started caches performed in 
" + (U.currentTimeMillis() - time) + " ms.");
+        fut.timeBag().finishGlobalStage("Affinity initialization on cache 
group start");
     }
 
     /**
@@ -998,6 +989,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     }
                 }
 
+                fut.timeBag().finishLocalStage("Affinity initialization on 
cache group start " +
+                    "[grp=" + grpDesc.cacheOrGroupName() + "]");
+
                 return null;
             }
         );
@@ -1106,6 +1100,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     newAssignment = idealAssignment;
 
                 aff.initialize(topVer, cachedAssignment(aff, newAssignment, 
affCache));
+
+                exchFut.timeBag().finishLocalStage("Affinity recalculate by 
change affinity message " +
+                    "[grp=" + aff.cacheOrGroupName() + "]");
             }
         });
     }
@@ -1190,6 +1187,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     aff.clientEventTopologyChange(exchFut.firstEvent(), 
topVer);
 
                 cctx.exchange().exchangerUpdateHeartbeat();
+
+                exchFut.timeBag().finishLocalStage("Affinity change by custom 
message " +
+                    "[grp=" + aff.cacheOrGroupName() + "]");
             }
         });
     }
@@ -1369,6 +1369,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                         calculateAndInit(fut.events(), cache.affinity(), 
fut.initialVersion());
 
                         cctx.exchange().exchangerUpdateHeartbeat();
+
+                        fut.timeBag().finishLocalStage("Affinity 
initialization (crd, new cache) " +
+                            "[grp=" + desc.cacheOrGroupName() + "]");
                     }
                 }
             });
@@ -1380,6 +1383,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                         initAffinity(cachesRegistry.group(aff.groupId()), aff, 
fut);
 
                         cctx.exchange().exchangerUpdateHeartbeat();
+
+                        fut.timeBag().finishLocalStage("Affinity 
initialization (new cache) " +
+                            "[grp=" + aff.cacheOrGroupName() + "]");
                     }
                 }
             });
@@ -1470,8 +1476,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         final Map<Object, List<List<ClusterNode>>> affCache = new 
ConcurrentHashMap<>();
 
-        long time = System.currentTimeMillis();
-
         forAllCacheGroups(false, new 
IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) 
throws IgniteCheckedException {
                 ExchangeDiscoveryEvents evts = fut.context().events();
@@ -1503,11 +1507,11 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     newAssignment = idealAssignment;
 
                 aff.initialize(evts.topologyVersion(), cachedAssignment(aff, 
newAssignment, affCache));
+
+                fut.timeBag().finishLocalStage("Affinity applying from full 
message " +
+                    "[grp=" + aff.cacheOrGroupName() + "]");
             }
         });
-
-        if (log.isInfoEnabled())
-            log.info("Affinity applying from full message performed in " + 
(System.currentTimeMillis() - time) + " ms.");
     }
 
     /**
@@ -1532,8 +1536,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         final Map<Long, ClusterNode> nodesByOrder = new ConcurrentHashMap<>();
 
-        long time = System.currentTimeMillis();
-
         forAllCacheGroups(false, new 
IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) 
throws IgniteCheckedException {
                 ExchangeDiscoveryEvents evts = fut.context().events();
@@ -1571,11 +1573,11 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     calculateAndInit(evts, aff, evts.topologyVersion());
 
                 grp.topology().initPartitionsWhenAffinityReady(resTopVer, fut);
+
+                fut.timeBag().finishLocalStage("Affinity initialization (local 
join) " +
+                    "[grp=" + grp.cacheOrGroupName() + "]");
             }
         });
-
-        if (log.isInfoEnabled())
-            log.info("Affinity initialization on local join performed in " + 
(System.currentTimeMillis() - time) + " ms.");
     }
 
     /**
@@ -1590,8 +1592,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         assert fut.context().mergeExchanges();
         assert evts.hasServerJoin() && !evts.hasServerLeft();
 
-        long time = System.currentTimeMillis();
-
         WaitRebalanceInfo waitRebalanceInfo = initAffinityOnNodeJoin(fut, crd);
 
         this.waitInfo = waitRebalanceInfo != null && 
!waitRebalanceInfo.empty() ? waitRebalanceInfo : null;
@@ -1604,10 +1604,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     ", waitGrps=" + (info != null ? 
groupNames(info.waitGrps.keySet()) : null) + ']');
             }
         }
-
-        if (log.isInfoEnabled())
-            log.info("Affinity recalculation (on server join) performed in "
-                + (System.currentTimeMillis() - time) + " ms.");
     }
 
     /**
@@ -1618,8 +1614,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     public Map<Integer, CacheGroupAffinityMessage> 
onServerLeftWithExchangeMergeProtocol(
         final GridDhtPartitionsExchangeFuture fut) throws 
IgniteCheckedException
     {
-        long time = System.currentTimeMillis();
-
         final ExchangeDiscoveryEvents evts = fut.context().events();
 
         assert fut.context().mergeExchanges();
@@ -1627,10 +1621,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         Map<Integer, CacheGroupAffinityMessage> result = 
onReassignmentEnforced(fut);
 
-        if (log.isInfoEnabled())
-            log.info("Affinity recalculation (on server left) performed in "
-                + (System.currentTimeMillis() - time) + " ms.");
-
         return result;
     }
 
@@ -1646,14 +1636,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     {
         assert 
DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent());
 
-        long time = System.currentTimeMillis();
-
         Map<Integer, CacheGroupAffinityMessage> result = 
onReassignmentEnforced(fut);
 
-        if (log.isInfoEnabled())
-            log.info("Affinity recalculation (custom message) performed in "
-                + (System.currentTimeMillis() - time) + " ms.");
-
         return result;
     }
 
@@ -1679,6 +1663,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
                 if (!cache.rebalanceEnabled || 
fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom()))
                     cache.affinity().initialize(topVer, assign);
+
+                fut.timeBag().finishLocalStage("Affinity initialization 
(enforced) " +
+                    "[grp=" + desc.cacheOrGroupName() + "]");
             }
         });
 
@@ -1716,11 +1703,17 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                         calculateAndInit(fut.events(), grpHolder.affinity(), 
topVer);
 
                         cctx.exchange().exchangerUpdateHeartbeat();
+
+                        fut.timeBag().finishLocalStage("First node affinity 
initialization (node join) " +
+                            "[grp=" + desc.cacheOrGroupName() + "]");
                     }
                 });
             }
-            else
+            else {
                 fetchAffinityOnJoin(fut);
+
+                fut.timeBag().finishLocalStage("Affinity fetch");
+            }
         }
         else
             waitRebalanceInfo = initAffinityOnNodeJoin(fut, crd);
@@ -1931,6 +1924,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     cache.aff.calculate(fut.initialVersion(), fut.events(), 
fut.events().discoveryCache());
 
                     cctx.exchange().exchangerUpdateHeartbeat();
+
+                    fut.timeBag().finishLocalStage("Affinity centralized 
initialization (crd) " +
+                        "[grp=" + desc.cacheOrGroupName() + "]");
                 }
             });
         }
@@ -1940,6 +1936,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     aff.calculate(fut.initialVersion(), fut.events(), 
fut.events().discoveryCache());
 
                     cctx.exchange().exchangerUpdateHeartbeat();
+
+                    fut.timeBag().finishLocalStage("Affinity centralized 
initialization " +
+                        "[grp=" + aff.cacheOrGroupName() + "]");
                 }
             });
         }
@@ -2061,6 +2060,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 assert old == null : old;
 
                 cctx.exchange().exchangerUpdateHeartbeat();
+
+                fut.timeBag().finishLocalStage("Coordinator affinity cache 
init " +
+                    "[grp=" + desc.cacheOrGroupName() + "]");
             }
         });
 
@@ -2139,6 +2141,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                         affCache);
 
                     cctx.exchange().exchangerUpdateHeartbeat();
+
+                    fut.timeBag().finishLocalStage("Affinity initialization 
(node join) " +
+                        "[grp=" + grp.cacheOrGroupName() + "]");
                 }
             });
 
@@ -2177,6 +2182,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     }
 
                     cctx.exchange().exchangerUpdateHeartbeat();
+
+                    fut.timeBag().finishLocalStage("Affinity initialization 
(crd, node join) " +
+                        "[grp=" + desc.cacheOrGroupName() + "]");
                 }
             });
 
@@ -2522,6 +2530,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
                 if (initAff)
                     grpHolder.affinity().initialize(topVer, newAssignment0);
+
+                fut.timeBag().finishLocalStage("Affinity recalculation 
(partitions availability) " +
+                    "[grp=" + desc.cacheOrGroupName() + "]");
             }
         });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/edcc1089/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 364950c..01c10aa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -2869,6 +2869,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                                 }
                             }
 
+                            exchFut.timeBag().finishGlobalStage("Waiting in 
exchange queue");
+
                             exchFut.init(newCrd);
 
                             int dumpCnt = 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/edcc1089/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 5b9ebf1..dbc51f7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -100,6 +100,7 @@ import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMess
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.TimeBag;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -335,6 +336,15 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     /** Future for wait all exchange listeners comepleted. */
     private final GridFutureAdapter<?> afterLsnrCompleteFut = new 
GridFutureAdapter<>();
 
+    /** Time bag to measure and store exchange stages times. */
+    private final TimeBag timeBag;
+
+    /** Start time of exchange. */
+    private long startTime = System.nanoTime();
+
+    /** Discovery lag / Clocks discrepancy, calculated on coordinator when all 
single messages are received. */
+    private T2<Long, UUID> discoveryLag;
+
     /**
      * @param cctx Cache context.
      * @param busyLock Busy lock.
@@ -366,6 +376,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         log = cctx.logger(getClass());
         exchLog = cctx.logger(EXCHANGE_LOG);
 
+        timeBag = new TimeBag();
+
         initFut = new GridFutureAdapter<Boolean>() {
             @Override public IgniteLogger logger() {
                 return log;
@@ -660,6 +672,13 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @return Time bag that measures and stores the timings of this exchange's stages.
+     */
+    public TimeBag timeBag() {
+        return timeBag;
+    }
+
+    /**
      * Starts activity.
      *
      * @param newCrd {@code True} if node become coordinator on this exchange.
@@ -716,6 +735,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     ", allowMerge=" + exchCtx.mergeExchanges() + ']');
             }
 
+            timeBag.finishGlobalStage("Exchange parameters initialization");
+
             ExchangeType exchange;
 
             if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
@@ -800,6 +821,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             updateTopologies(crdNode);
 
+            timeBag.finishGlobalStage("Determine exchange type");
+
             switch (exchange) {
                 case ALL: {
                     distributedExchange();
@@ -883,6 +906,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             finally {
                 cctx.exchange().exchangerBlockingSectionEnd();
             }
+
+            timeBag.finishGlobalStage("Baseline change callback");
         }
 
         cctx.exchange().exchangerBlockingSectionBegin();
@@ -894,6 +919,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             cctx.exchange().exchangerBlockingSectionEnd();
         }
 
+        timeBag.finishGlobalStage("Components activation");
+
         IgniteInternalFuture<?> cachesRegistrationFut = 
cctx.cache().startCachesOnLocalJoin(initialVersion(),
             exchActions == null ? null : exchActions.localJoinContext());
 
@@ -1340,6 +1367,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             }
         }
 
+        timeBag.finishGlobalStage("Preloading notification");
+
         cctx.exchange().exchangerBlockingSectionBegin();
 
         try {
@@ -1352,6 +1381,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             cctx.exchange().exchangerBlockingSectionEnd();
         }
 
+        timeBag.finishGlobalStage("WAL history reservation");
+
         // Skipping wait on local join is available when all cluster nodes 
have the same protocol.
         boolean skipWaitOnLocalJoin = 
cctx.exchange().latch().canSkipJoiningNodes(initialVersion())
             && localJoinExchange();
@@ -1442,6 +1473,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             }
         }
 
+        timeBag.finishGlobalStage("After states restored callback");
+
         changeWalModeIfNeeded();
 
         if (events().hasServerLeft())
@@ -1451,8 +1484,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
         try {
             if (crd.isLocal()) {
-                if (remaining.isEmpty())
+                if (remaining.isEmpty()) {
+                    initFut.onDone(true);
+
                     onAllReceived(null);
+                }
             }
             else
                 sendPartitions(crd);
@@ -1674,6 +1710,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             }
         }
 
+        timeBag.finishGlobalStage("Wait partitions release");
+
         if (releaseLatch == null) {
             assert !distributed : "Partitions release latch must be 
initialized in distributed mode.";
 
@@ -1714,6 +1752,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         catch (IgniteCheckedException e) {
             U.warn(log, "Stop waiting for partitions release latch: " + 
e.getMessage());
         }
+
+        timeBag.finishGlobalStage("Wait partitions release latch");
     }
 
     /**
@@ -1788,8 +1828,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     private void sendLocalPartitions(ClusterNode node) throws 
IgniteCheckedException {
         assert node != null;
 
-        long time = System.currentTimeMillis();
-
         GridDhtPartitionsSingleMessage msg;
 
         // Reset lost partitions before sending local partitions to 
coordinator.
@@ -1824,6 +1862,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         else if (localJoinExchange())
             
msg.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
 
+        msg.exchangeStartTime(startTime);
+
         if (log.isTraceEnabled())
             log.trace("Sending local partitions [nodeId=" + node.id() + ", 
exchId=" + exchId + ", msg=" + msg + ']');
 
@@ -1834,9 +1874,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             if (log.isDebugEnabled())
                 log.debug("Node left during partition exchange [nodeId=" + 
node.id() + ", exchId=" + exchId + ']');
         }
-
-        if (log.isInfoEnabled())
-            log.info("Sending Single Message performed in " + 
(System.currentTimeMillis() - time) + " ms.");
     }
 
     /**
@@ -1992,6 +2029,28 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         super.onDone(null, null);
     }
 
+    /**
+     * Builds a single-line log message of the form
+     * {@code header [startVer=..., resVer=..., timing1, timing2, ...]}.
+     *
+     * @param header Header of log message, placed before the opening bracket.
+     * @param timings Exchange stages timings, appended in iteration order and separated by ", ".
+     * @return Log message with the {@code initialVersion()} and {@code topologyVersion()} values
+     *      followed by the given timings.
+     */
+    private String exchangeTimingsLogMessage(String header, List<String> 
timings) {
+        StringBuilder timingsToLog = new StringBuilder();
+
+        timingsToLog.append(header).append(" [");
+        timingsToLog.append("startVer=").append(initialVersion());
+        timingsToLog.append(", resVer=").append(topologyVersion());
+
+        for (String stageTiming : timings)
+            timingsToLog.append(", ").append(stageTiming);
+
+        timingsToLog.append(']');
+
+        return timingsToLog.toString();
+    }
+
     /** {@inheritDoc} */
     @Override public boolean onDone(@Nullable AffinityTopologyVersion res, 
@Nullable Throwable err) {
         assert res != null || err != null : "TopVer=" + res + ", err=" + err;
@@ -2130,14 +2189,26 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         if (super.onDone(res, err)) {
             afterLsnrCompleteFut.onDone();
 
-            if (log.isDebugEnabled()) {
-                log.debug("Completed partition exchange [localNode=" + 
cctx.localNodeId() + ", exchange= " + this +
-                        ", durationFromInit=" + (U.currentTimeMillis() - 
initTs) + ']');
-            }
-            else if (log.isInfoEnabled()) {
+            if (log.isInfoEnabled()) {
                 log.info("Completed partition exchange [localNode=" + 
cctx.localNodeId() +
-                        ", exchange=" + shortInfo() + ", topVer=" + 
topologyVersion() +
-                        ", durationFromInit=" + (U.currentTimeMillis() - 
initTs) + ']');
+                        ", exchange=" + (log.isDebugEnabled() ? this : 
shortInfo()) + ", topVer=" + topologyVersion() + "]");
+
+                if (err == null) {
+                    timeBag.finishGlobalStage("Exchange done");
+
+                    // Collect all stages timings.
+                    List<String> timings = timeBag.stagesTimings();
+
+                    if (discoveryLag != null && discoveryLag.get1() != 0)
+                        timings.add("Discovery lag=" + discoveryLag.get1() +
+                            " ms, Latest started node id=" + 
discoveryLag.get2());
+
+                    log.info(exchangeTimingsLogMessage("Exchange timings", 
timings));
+
+                    List<String> localTimings = 
timeBag.longestLocalStagesTimings(3);
+
+                    log.info(exchangeTimingsLogMessage("Exchange longest local 
stages", localTimings));
+                }
             }
 
             initFut.onDone(err == null);
@@ -2175,6 +2246,42 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Calculates discovery lag: the difference between the largest and the smallest
+     * exchange start time among the local node and all nodes whose single messages
+     * are passed in. The local {@code startTime} seeds both the minimum and the maximum.
+     * <p>
+     * A message whose {@code exchangeStartTime()} is {@code 0} does not contribute to the
+     * minimum or maximum (presumably the sender did not report a start time - confirm).
+     * When both maps contain the same node id, the entry from {@code merged} is used.
+     * <p>
+     * NOTE(review): start times are taken from {@code System.nanoTime()}, whose origin is
+     * only guaranteed to be shared within one JVM, so values reported by other nodes may
+     * not be comparable with the local one - confirm that the resulting lag is meaningful.
+     *
+     * @param declared Single messages that were expected to be received during exchange.
+     * @param merged Single messages from nodes that were merged during exchange.
+     *
+     * @return Pair of the discovery lag in milliseconds and the id of the node with the
+     *      largest start time (the local node id if no message's start time reaches the local one).
+     */
+    private T2<Long, UUID> calculateDiscoveryLag(
+        Map<UUID, GridDhtPartitionsSingleMessage> declared,
+        Map<UUID, GridDhtPartitionsSingleMessage> merged
+    ) {
+        Map<UUID, GridDhtPartitionsSingleMessage> msgs = new 
HashMap<>(declared);
+
+        msgs.putAll(merged);
+
+        long minStartTime = startTime;
+        long maxStartTime = startTime;
+        UUID latestStartedNode = cctx.localNodeId();
+
+        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> msg : 
msgs.entrySet()) {
+            UUID nodeId = msg.getKey();
+            long exchangeTime = msg.getValue().exchangeStartTime();
+
+            if (exchangeTime != 0) {
+                minStartTime = Math.min(minStartTime, exchangeTime);
+                maxStartTime = Math.max(maxStartTime, exchangeTime);
+            }
+
+            if (maxStartTime == exchangeTime)
+                latestStartedNode = nodeId;
+        }
+
+        return new T2<>(TimeUnit.NANOSECONDS.toMillis(maxStartTime - 
minStartTime), latestStartedNode);
+    }
+
+    /**
      * @param exchangeActions Exchange actions.
      * @return Map of cache names and start descriptors.
      */
@@ -2743,6 +2850,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 }
             }
         }
+
         if (allReceived) {
             if (!awaitSingleMapUpdates())
                 return;
@@ -2977,8 +3085,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     private void detectLostPartitions(AffinityTopologyVersion resTopVer) {
         boolean detected = false;
 
-        long time = System.currentTimeMillis();
-
         synchronized (cctx.exchange().interruptLock()) {
             if (Thread.currentThread().isInterrupted())
                 return;
@@ -3003,8 +3109,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             cctx.exchange().scheduleResendPartitions();
         }
 
-        if (log.isInfoEnabled())
-            log.info("Detecting lost partitions performed in " + 
(System.currentTimeMillis() - time) + " ms.");
+        timeBag.finishGlobalStage("Detect lost partitions");
     }
 
     /**
@@ -3100,6 +3205,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      */
     private void onAllReceived(@Nullable Collection<ClusterNode> sndResNodes) {
         try {
+            initFut.get();
+
+            timeBag.finishGlobalStage("Waiting for all single messages");
+
             assert crd.isLocal();
 
             assert partHistSuppliers.isEmpty() : partHistSuppliers;
@@ -3123,12 +3232,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 if (log.isInfoEnabled())
                     log.info("Coordinator received all messages, try merge 
[ver=" + initialVersion() + ']');
 
-                long time = System.currentTimeMillis();
-
                 boolean finish = 
cctx.exchange().mergeExchangesOnCoordinator(this);
 
-                if (log.isInfoEnabled())
-                    log.info("Exchanges merging performed in " + 
(System.currentTimeMillis() - time) + " ms.");
+                timeBag.finishGlobalStage("Exchanges merge");
 
                 if (!finish)
                     return;
@@ -3164,8 +3270,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             Map<Integer, CacheGroupAffinityMessage> idealAffDiff = null;
 
-            long time = System.currentTimeMillis();
-
             if (exchCtx.mergeExchanges()) {
                 synchronized (mux) {
                     if (mergedJoinExchMsgs != null) {
@@ -3199,8 +3303,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 }
             }
 
-            if (log.isInfoEnabled())
-                log.info("Affinity changes (coordinator) applied in " + 
(System.currentTimeMillis() - time) + " ms.");
+            timeBag.finishGlobalStage("Affinity recalculation (crd)");
 
             Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null;
 
@@ -3233,6 +3336,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 }
             }
 
+            timeBag.finishGlobalStage("Collect update counters and create 
affinity messages");
+
             validatePartitionsState();
 
             if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
@@ -3266,22 +3371,25 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             }
 
             // Recalculate new affinity based on partitions availability.
-            if (!exchCtx.mergeExchanges() && forceAffReassignment)
+            if (!exchCtx.mergeExchanges() && forceAffReassignment) {
                 idealAffDiff = 
cctx.affinity().onCustomEventWithEnforcedAffinityReassignment(this);
 
+                timeBag.finishGlobalStage("Ideal affinity diff calculation 
(enforced)");
+            }
+
             for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
                 if (!grpCtx.isLocal())
                     grpCtx.topology().applyUpdateCounters();
             }
 
+            timeBag.finishGlobalStage("Apply update counters");
+
             updateLastVersion(cctx.versions().last());
 
             cctx.versions().onExchange(lastVer.get().order());
 
             IgniteProductVersion minVer = 
exchCtx.events().discoveryCache().minimumNodeVersion();
 
-            time = System.currentTimeMillis();
-
             GridDhtPartitionsFullMessage msg = createPartitionsMessage(true,
                 minVer.compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 
0);
 
@@ -3298,8 +3406,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             msg.prepareMarshal(cctx);
 
-            if (log.isInfoEnabled())
-                log.info("Preparing Full Message performed in " + 
(System.currentTimeMillis() - time) + " ms.");
+            timeBag.finishGlobalStage("Full message preparing");
 
             synchronized (mux) {
                 finishState = new FinishState(crd.id(), resTopVer, msg);
@@ -3310,8 +3417,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             if (centralizedAff) {
                 assert !exchCtx.mergeExchanges();
 
-                time = System.currentTimeMillis();
-
                 IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> 
fut = cctx.affinity().initAffinityOnNodeLeft(this);
 
                 if (!fut.isDone()) {
@@ -3323,9 +3428,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 }
                 else
                     onAffinityInitialized(fut);
-
-                if (log.isInfoEnabled())
-                    log.info("Centralized affinity changes are performed in " 
+ (System.currentTimeMillis() - time) + " ms.");
             }
             else {
                 Set<ClusterNode> nodes;
@@ -3349,6 +3451,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                             }
                         }
                     }
+                    else
+                        mergedJoinExchMsgs0 = Collections.emptyMap();
 
                     if (!F.isEmpty(sndResNodes))
                         nodes.addAll(sndResNodes);
@@ -3357,6 +3461,13 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 if (!nodes.isEmpty())
                     sendAllPartitions(msg, nodes, mergedJoinExchMsgs0, 
joinedNodeAff);
 
+                timeBag.finishGlobalStage("Full message sending");
+
+                discoveryLag = calculateDiscoveryLag(
+                    msgs,
+                    mergedJoinExchMsgs0
+                );
+
                 partitionsSent = true;
 
                 if (!stateChangeExchange())
@@ -3413,6 +3524,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
                 cctx.discovery().sendCustomEvent(stateFinishMsg);
 
+                timeBag.finishGlobalStage("State finish message sending");
+
                 if (!centralizedAff)
                     onDone(exchCtx.events().topologyVersion(), null);
             }
@@ -3451,8 +3564,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * Validates that partition update counters and cache sizes for all caches 
are consistent.
      */
     private void validatePartitionsState() {
-        long time = System.currentTimeMillis();
-
         try {
             U.doInParallel(
                 cctx.kernalContext().getSystemExecutorService(),
@@ -3492,16 +3603,13 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             throw new IgniteException("Failed to validate partitions state", 
e);
         }
 
-        if (log.isInfoEnabled())
-            log.info("Partitions validation performed in " + 
(System.currentTimeMillis() - time) + " ms.");
+        timeBag.finishGlobalStage("Validate partitions states");
     }
 
     /**
      *
      */
     private void assignPartitionsStates() {
-        long time = System.currentTimeMillis();
-
         try {
             U.doInParallel(
                 cctx.kernalContext().getSystemExecutorService(),
@@ -3526,8 +3634,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             throw new IgniteException("Failed to assign partition states", e);
         }
 
-        if (log.isInfoEnabled())
-            log.info("Partitions assignment performed in " + 
(System.currentTimeMillis() - time) + " ms.");
+        timeBag.finishGlobalStage("Assign partitions states");
     }
 
     /**
@@ -3538,8 +3645,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         // Reserve at least 2 threads for system operations.
         int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), 
GridIoPolicy.SYSTEM_POOL, 2);
 
-        long time = System.currentTimeMillis();
-
         try {
             U.<CacheGroupContext, Void>doInParallel(
                 parallelismLvl,
@@ -3556,8 +3661,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             throw new IgniteException("Failed to finalize partition counters", 
e);
         }
 
-        if (log.isInfoEnabled())
-            log.info("Partition counters finalization performed in " + 
(System.currentTimeMillis() - time) + " ms.");
+        timeBag.finishGlobalStage("Finalize update counters");
     }
 
     /**
@@ -3800,6 +3904,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             assert exchId.equals(msg.exchangeId()) : msg;
             assert msg.lastVersion() != null : msg;
 
+            timeBag.finishGlobalStage("Waiting for Full message");
+
             if (checkCrd) {
                 assert node != null;
 
@@ -3875,8 +3981,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             AffinityTopologyVersion resTopVer = initialVersion();
 
-            long time = System.currentTimeMillis();
-
             if (exchCtx.mergeExchanges()) {
                 if (msg.resultTopologyVersion() != null && 
!initialVersion().equals(msg.resultTopologyVersion())) {
                     if (log.isInfoEnabled()) {
@@ -3920,8 +4024,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             else if (forceAffReassignment)
                 cctx.affinity().applyAffinityFromFullMessage(this, msg);
 
-            if (log.isInfoEnabled())
-                log.info("Affinity changes applied in " + 
(System.currentTimeMillis() - time) + " ms.");
+            timeBag.finishGlobalStage("Affinity recalculation");
 
             if (dynamicCacheStartExchange() && 
!F.isEmpty(exchangeGlobalExceptions)) {
                 assert cctx.localNode().isClient();
@@ -3963,8 +4066,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
         partHistSuppliers.putAll(msg.partitionHistorySuppliers());
 
-        long time = System.currentTimeMillis();
-
         // Reserve at least 2 threads for system operations.
         int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), 
GridIoPolicy.SYSTEM_POOL, 2);
 
@@ -4013,9 +4114,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
         partitionsReceived = true;
 
-        if (log.isInfoEnabled())
-            log.info("Full map updating for " + msg.partitions().size()
-                + " groups performed in " + (System.currentTimeMillis() - 
time) + " ms.");
+        timeBag.finishGlobalStage("Full map updating");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/edcc1089/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 403b862..3921106 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -102,6 +102,9 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
     @GridDirectCollection(Integer.class)
     private Collection<Integer> grpsAffRequest;
 
+    /** Start time of exchange on node which sent this message in nanoseconds. 
*/
+    private long exchangeStartTime;
+
     /**
      * Exchange finish message, sent to new coordinator when it tries to
      * restore state after previous coordinator failed during exchange.
@@ -315,6 +318,20 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
         return err;
     }
 
+    /**
+     * @return Start time of exchange on node which sent this message, in nanoseconds.
+     */
+    public long exchangeStartTime() {
+        return exchangeStartTime;
+    }
+
+    /**
+     * @param exchangeStartTime Start time of exchange on the node sending this message, in nanoseconds.
+     */
+    public void exchangeStartTime(long exchangeStartTime) {
+        this.exchangeStartTime = exchangeStartTime;
+    }
+
     /** {@inheritDoc}
      * @param ctx*/
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws 
IgniteCheckedException {
@@ -473,40 +490,47 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeMessage("finishMsg", finishMsg))
+                if (!writer.writeLong("exchangeStartTime", exchangeStartTime))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, 
MessageCollectionItemType.INT))
+                if (!writer.writeMessage("finishMsg", finishMsg))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, 
MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeByteArray("partHistCntrsBytes", 
partHistCntrsBytes))
+                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeByteArray("partsBytes", partsBytes))
+                if (!writer.writeByteArray("partHistCntrsBytes", 
partHistCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 14:
+                if (!writer.writeByteArray("partsBytes", partsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 15:
                 if (!writer.writeByteArray("partsSizesBytes", partsSizesBytes))
                     return false;
 
                 writer.incrementState();
+
         }
 
         return true;
@@ -548,7 +572,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 9:
-                finishMsg = reader.readMessage("finishMsg");
+                exchangeStartTime = reader.readLong("exchangeStartTime");
 
                 if (!reader.isLastRead())
                     return false;
@@ -556,7 +580,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 10:
-                grpsAffRequest = reader.readCollection("grpsAffRequest", 
MessageCollectionItemType.INT);
+                finishMsg = reader.readMessage("finishMsg");
 
                 if (!reader.isLastRead())
                     return false;
@@ -564,7 +588,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 11:
-                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+                grpsAffRequest = reader.readCollection("grpsAffRequest", 
MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -572,7 +596,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 12:
-                partHistCntrsBytes = 
reader.readByteArray("partHistCntrsBytes");
+                partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -580,7 +604,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 13:
-                partsBytes = reader.readByteArray("partsBytes");
+                partHistCntrsBytes = 
reader.readByteArray("partHistCntrsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -588,12 +612,21 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 14:
+                partsBytes = reader.readByteArray("partsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 15:
                 partsSizesBytes = reader.readByteArray("partsSizesBytes");
 
                 if (!reader.isLastRead())
                     return false;
 
                 reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridDhtPartitionsSingleMessage.class);
@@ -606,7 +639,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 15;
+        return 16;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/edcc1089/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 11c6091..8653cb9 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1310,6 +1310,9 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
                         if 
(cacheGroup.localStartVersion().equals(fut.initialVersion()))
                             
cacheGroup.topology().afterStateRestored(fut.initialVersion());
+
+                        fut.timeBag().finishLocalStage("Restore partition 
states " +
+                            "[grp=" + cacheGroup.cacheOrGroupName() + "]");
                     }
                     finally {
                         cctx.database().checkpointReadUnlock();
@@ -1318,6 +1321,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                     return null;
                 }
             );
+
+            fut.timeBag().finishGlobalStage("Restore partition states");
         }
 
         if (cctx.kernalContext().query().moduleEnabled()) {
@@ -2040,8 +2045,6 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
      * @throws IgniteCheckedException If first checkpoint has failed.
      */
     @Override public void onStateRestored(AffinityTopologyVersion topVer) 
throws IgniteCheckedException {
-        long time = System.currentTimeMillis();
-
         IgniteThread cpThread = new IgniteThread(cctx.igniteInstanceName(), 
"db-checkpoint-thread", checkpointer);
 
         cpThread.start();
@@ -2052,9 +2055,6 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
         if (chp != null)
             chp.cpBeginFut.get();
-
-        if (log.isInfoEnabled())
-            log.info("Checkpointer initilialzation performed in " + 
(System.currentTimeMillis() - time) + " ms.");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/edcc1089/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStopwatch.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStopwatch.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStopwatch.java
new file mode 100644
index 0000000..83cd7bc
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStopwatch.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Copyright (C) 2008 The Guava Authors
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.jetbrains.annotations.NotNull;
+
+import static java.util.concurrent.TimeUnit.DAYS;
+import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * An object that measures elapsed time in nanoseconds. It is useful to measure elapsed time using
+ * this class instead of direct calls to {@link System#nanoTime} for a few reasons:
+ *
+ * <ul>
+ *   <li>An alternate time source can be substituted, for testing or performance reasons.
+ *   <li>As documented by {@code nanoTime}, the value returned has no absolute meaning, and can only
+ *       be interpreted as relative to another timestamp returned by {@code nanoTime} at a different
+ *       time. {@code IgniteStopwatch} is a more effective abstraction because it exposes only these
+ *       relative values, not the absolute ones.
+ * </ul>
+ *
+ * <p>Basic usage:
+ *
+ * <pre>{@code
+ * IgniteStopwatch stopwatch = IgniteStopwatch.createStarted();
+ * doSomething();
+ * stopwatch.stop(); // optional
+ *
+ * Duration duration = stopwatch.elapsed();
+ *
+ * log.info("time: " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ * }</pre>
+ *
+ * <p>Stopwatch methods are not idempotent; it is an error to start or stop a stopwatch that is
+ * already in the desired state. Such misuse is checked with {@code assert} statements, so it is
+ * only detected when assertions are enabled.
+ *
+ * <p>When testing code that uses this class, use {@link #createUnstarted(IgniteTicker)} or {@link
+ * #createStarted(IgniteTicker)} to supply a fake or mock ticker. This allows you to simulate any
+ * valid behavior of the stopwatch.
+ *
+ * <p><b>Note:</b> This class is not thread-safe.
+ *
+ * <p><b>Warning for Android users:</b> a stopwatch with default behavior may not continue to keep
+ * time while the device is asleep. Instead, create one like this:
+ *
+ * <pre>{@code
+ * IgniteStopwatch.createStarted(
+ *      new IgniteTicker() {
+ *        public long read() {
+ *          return android.os.SystemClock.elapsedRealtimeNanos();
+ *        }
+ *      });
+ * }</pre>
+ */
+@SuppressWarnings("GoodTime") // lots of violations
+public final class IgniteStopwatch {
+    /** Ticker used as the time source. */
+    private final IgniteTicker ticker;
+
+    /** Whether the stopwatch is currently running. */
+    private boolean isRunning;
+
+    /** Nanoseconds accumulated by completed start/stop intervals. */
+    private long elapsedNanos;
+
+    /** Ticker reading taken by the most recent {@link #start()}. */
+    private long startTick;
+
+    /**
+     * Creates (but does not start) a new stopwatch using {@link System#nanoTime} as its time source.
+     *
+     * @return New stopped stopwatch.
+     */
+    public static IgniteStopwatch createUnstarted() {
+        return new IgniteStopwatch();
+    }
+
+    /**
+     * Creates (but does not start) a new stopwatch, using the specified time source.
+     *
+     * @param ticker Time source.
+     * @return New stopped stopwatch.
+     */
+    public static IgniteStopwatch createUnstarted(IgniteTicker ticker) {
+        return new IgniteStopwatch(ticker);
+    }
+
+    /**
+     * Creates (and starts) a new stopwatch using {@link System#nanoTime} as its time source.
+     *
+     * @return New running stopwatch.
+     */
+    public static IgniteStopwatch createStarted() {
+        return new IgniteStopwatch().start();
+    }
+
+    /**
+     * Creates (and starts) a new stopwatch, using the specified time source.
+     *
+     * @param ticker Time source.
+     * @return New running stopwatch.
+     */
+    public static IgniteStopwatch createStarted(IgniteTicker ticker) {
+        return new IgniteStopwatch(ticker).start();
+    }
+
+    /**
+     * Default constructor; uses {@link IgniteTicker#systemTicker()} as the time source.
+     */
+    IgniteStopwatch() {
+        this.ticker = IgniteTicker.systemTicker();
+    }
+
+    /**
+     * @param ticker Ticker.
+     */
+    IgniteStopwatch(@NotNull IgniteTicker ticker) {
+        this.ticker = ticker;
+    }
+
+    /**
+     * Returns {@code true} if {@link #start()} has been called on this stopwatch, and {@link #stop()}
+     * has not been called since the last call to {@code start()}.
+     *
+     * @return {@code True} if the stopwatch is running.
+     */
+    public boolean isRunning() {
+        return isRunning;
+    }
+
+    /**
+     * Starts the stopwatch.
+     *
+     * @return This {@code IgniteStopwatch} instance.
+     * @throws AssertionError If assertions are enabled and the stopwatch is already running.
+     */
+    public IgniteStopwatch start() {
+        assert !isRunning : "This stopwatch is already running.";
+
+        isRunning = true;
+
+        startTick = ticker.read();
+
+        return this;
+    }
+
+    /**
+     * Stops the stopwatch. Future reads will return the fixed duration that had elapsed up to this
+     * point.
+     *
+     * @return This {@code IgniteStopwatch} instance.
+     * @throws AssertionError If assertions are enabled and the stopwatch is already stopped.
+     */
+    public IgniteStopwatch stop() {
+        // Capture the tick before the state check.
+        long tick = ticker.read();
+
+        // Stopping is only legal while running; otherwise startTick is stale and would corrupt the total.
+        assert isRunning : "This stopwatch is already stopped.";
+
+        isRunning = false;
+
+        elapsedNanos += tick - startTick;
+
+        return this;
+    }
+
+    /**
+     * Sets the elapsed time for this stopwatch to zero, and places it in a stopped state.
+     *
+     * @return This {@code IgniteStopwatch} instance.
+     */
+    public IgniteStopwatch reset() {
+        elapsedNanos = 0;
+
+        isRunning = false;
+
+        return this;
+    }
+
+    /**
+     * @return Accumulated nanoseconds, including the interval in progress if the stopwatch is running.
+     */
+    private long elapsedNanos() {
+        return isRunning ? ticker.read() - startTick + elapsedNanos : elapsedNanos;
+    }
+
+    /**
+     * Returns the current elapsed time shown on this stopwatch, expressed in the desired time unit,
+     * with any fraction rounded down.
+     *
+     * <p><b>Note:</b> the overhead of measurement can be more than a microsecond, so it is generally
+     * not useful to specify {@link TimeUnit#NANOSECONDS} precision here.
+     *
+     * <p>It is generally not a good idea to use an ambiguous, unitless {@code long} to represent
+     * elapsed time. Therefore, we recommend using {@link #elapsed()} instead, which returns a
+     * strongly-typed {@link Duration} instance.
+     *
+     * @param desiredUnit Unit to express the elapsed time in.
+     * @return Elapsed time converted to {@code desiredUnit}.
+     */
+    public long elapsed(TimeUnit desiredUnit) {
+        return desiredUnit.convert(elapsedNanos(), NANOSECONDS);
+    }
+
+    /**
+     * Returns the current elapsed time shown on this stopwatch as a {@link Duration}. Unlike {@link
+     * #elapsed(TimeUnit)}, this method does not lose any precision due to rounding.
+     *
+     * @return Elapsed time.
+     */
+    public Duration elapsed() {
+        return Duration.ofNanos(elapsedNanos());
+    }
+
+    /**
+     * Picks the largest time unit in which the given amount is at least one whole unit.
+     * Not referenced by any other member of this class.
+     *
+     * @param nanos Nanos.
+     * @return Chosen unit; {@link TimeUnit#NANOSECONDS} if no larger unit fits.
+     */
+    private static TimeUnit chooseUnit(long nanos) {
+        if (DAYS.convert(nanos, NANOSECONDS) > 0) return DAYS;
+        if (HOURS.convert(nanos, NANOSECONDS) > 0) return HOURS;
+        if (MINUTES.convert(nanos, NANOSECONDS) > 0) return MINUTES;
+        if (SECONDS.convert(nanos, NANOSECONDS) > 0) return SECONDS;
+        if (MILLISECONDS.convert(nanos, NANOSECONDS) > 0) return MILLISECONDS;
+        if (MICROSECONDS.convert(nanos, NANOSECONDS) > 0) return MICROSECONDS;
+        return NANOSECONDS;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/edcc1089/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTicker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTicker.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTicker.java
new file mode 100644
index 0000000..1f7f41d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTicker.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Copyright (C) 2011 The Guava Authors
+ */
+
+package org.apache.ignite.internal.util;
+
+/**
+ * Source of time readings: each reading is a number of nanoseconds counted from some fixed but
+ * arbitrary origin. Most code should measure time through {@link IgniteStopwatch} rather than
+ * use this class directly.
+ *
+ * <p><b>Warning:</b> readings are only meaningful for computing elapsed time, not wall time.
+ */
+public abstract class IgniteTicker {
+    /** Shared ticker instance backed by {@link System#nanoTime}. */
+    private static final IgniteTicker SYSTEM_TICKER = new IgniteTicker() {
+        @Override public long read() {
+            return System.nanoTime();
+        }
+    };
+
+    /** Constructor for use by subclasses. */
+    protected IgniteTicker() {
+        // No-op.
+    }
+
+    /**
+     * Takes a reading of this ticker.
+     *
+     * @return Number of nanoseconds elapsed since this ticker's fixed point of reference.
+     */
+    public abstract long read();
+
+    /**
+     * @return A ticker that reads the current time using {@link System#nanoTime}.
+     */
+    public static IgniteTicker systemTicker() {
+        return SYSTEM_TICKER;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/edcc1089/modules/core/src/main/java/org/apache/ignite/internal/util/TimeBag.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/TimeBag.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/TimeBag.java
new file mode 100644
index 0000000..0da1ee7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/TimeBag.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Utility class to measure and collect timings of some execution workflow.
+ * <p>
+ * The workflow is a sequence of <i>global</i> stages, each closed by {@link #finishGlobalStage(String)}.
+ * While a global stage is in progress, threads may record <i>local</i> stages through
+ * {@link #finishLocalStage(String)}; these are grouped by thread name and attached to the global
+ * stage that is finished next.
+ * <p>
+ * {@link #finishGlobalStage(String)} runs under the write lock, all other public methods run under
+ * the read lock.
+ */
+public class TimeBag {
+    /** Initial global stage: a placeholder stored first in {@link #stages} and skipped in reports. */
+    private final CompositeStage INITIAL_STAGE = new CompositeStage("", 0, new HashMap<>());
+
+    /** Lock. */
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /** Global stopwatch, restarted every time a global stage is finished. */
+    private final IgniteStopwatch globalStopwatch = IgniteStopwatch.createStarted();
+
+    /** Measurement unit. */
+    private final TimeUnit measurementUnit;
+
+    /** List of global stages (guarded by {@code lock}). */
+    private final List<CompositeStage> stages;
+
+    /** List of current local stages separated by threads (guarded by {@code lock}). */
+    private Map<String, List<Stage>> localStages;
+
+    /** Last seen global stage by thread. */
+    private final ThreadLocal<CompositeStage> tlLastSeenStage = ThreadLocal.withInitial(() -> INITIAL_STAGE);
+
+    /** Thread-local stopwatch. */
+    private final ThreadLocal<IgniteStopwatch> tlStopwatch = ThreadLocal.withInitial(IgniteStopwatch::createUnstarted);
+
+    /**
+     * Creates a time bag that reports timings in milliseconds.
+     */
+    public TimeBag() {
+        this(TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * @param measurementUnit Measurement unit used for all recorded timings.
+     */
+    public TimeBag(TimeUnit measurementUnit) {
+        this.stages = new ArrayList<>();
+        this.localStages = new ConcurrentHashMap<>();
+        this.measurementUnit = measurementUnit;
+
+        this.stages.add(INITIAL_STAGE);
+    }
+
+    /**
+     * @return Most recently finished global stage ({@link #INITIAL_STAGE} if none was finished yet).
+     */
+    private CompositeStage lastCompletedGlobalStage() {
+        assert !stages.isEmpty() : "No stages :(";
+
+        return stages.get(stages.size() - 1);
+    }
+
+    /**
+     * Finishes the current global stage: records the time elapsed on the global stopwatch together
+     * with all local stages collected so far, then starts measuring the next global stage.
+     *
+     * @param description Description.
+     */
+    public void finishGlobalStage(String description) {
+        lock.writeLock().lock();
+
+        try {
+            stages.add(new CompositeStage(
+                description,
+                globalStopwatch.elapsed(measurementUnit),
+                Collections.unmodifiableMap(localStages)
+            ));
+
+            // Local stages recorded from now on belong to the next global stage.
+            localStages = new ConcurrentHashMap<>();
+
+            globalStopwatch.reset().start();
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Records a local stage for the calling thread within the global stage currently in progress.
+     *
+     * @param description Description.
+     */
+    public void finishLocalStage(String description) {
+        lock.readLock().lock();
+
+        try {
+            CompositeStage lastSeen = tlLastSeenStage.get();
+            CompositeStage lastCompleted = lastCompletedGlobalStage();
+            IgniteStopwatch localStopWatch = tlStopwatch.get();
+
+            Stage stage;
+
+            // We see this stage first time, get elapsed time from last completed global stage and
+            // start tracking local.
+            if (lastSeen != lastCompleted) {
+                stage = new Stage(description, globalStopwatch.elapsed(measurementUnit));
+
+                tlLastSeenStage.set(lastCompleted);
+            }
+            else {
+                // NOTE(review): on a thread's very first call before any global stage has been
+                // finished, lastSeen == lastCompleted (both INITIAL_STAGE), so the never-started
+                // thread-local stopwatch is read here - presumably yielding 0; confirm against
+                // IgniteStopwatch.
+                stage = new Stage(description, localStopWatch.elapsed(measurementUnit));
+            }
+
+            localStopWatch.reset().start();
+
+            // Associate local stage with current thread name.
+            // NOTE(review): the per-name list is a plain ArrayList and the read lock is shared, so
+            // threads that happen to have the same name would append to it concurrently.
+            String threadName = Thread.currentThread().getName();
+
+            localStages.computeIfAbsent(threadName, t -> new ArrayList<>()).add(stage);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * @return Short name of desired measurement unit.
+     */
+    private String measurementUnitShort() {
+        switch (measurementUnit) {
+            case MILLISECONDS:
+                return "ms";
+            case SECONDS:
+                return "s";
+            case NANOSECONDS:
+                return "ns";
+            case MICROSECONDS:
+                return "mcs";
+            case HOURS:
+                return "h";
+            case MINUTES:
+                return "min";
+            case DAYS:
+                return "days";
+            default:
+                return "";
+        }
+    }
+
+    /**
+     * @return List of string representation of all stage timings, followed by a "Total time" entry
+     *      holding the sum of all global stage times.
+     */
+    public List<String> stagesTimings() {
+        lock.readLock().lock();
+
+        try {
+            List<String> timings = new ArrayList<>();
+
+            long totalTime = 0;
+
+            // Skip initial stage.
+            for (int i = 1; i < stages.size(); i++) {
+                CompositeStage stage = stages.get(i);
+
+                totalTime += stage.time();
+
+                timings.add(stage.toString());
+            }
+
+            // Add last stage with summary time of all global stages.
+            timings.add(new Stage("Total time", totalTime).toString());
+
+            return timings;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * @param maxPerCompositeStage Max count of local stages to collect per composite stage.
+     * @return List of string representation of longest local stages per each composite stage;
+     *      entries of one composite stage go from the longest to the shortest.
+     */
+    public List<String> longestLocalStagesTimings(int maxPerCompositeStage) {
+        lock.readLock().lock();
+
+        try {
+            List<String> timings = new ArrayList<>();
+
+            // Skip initial stage.
+            for (int i = 1; i < stages.size(); i++) {
+                CompositeStage stage = stages.get(i);
+
+                if (stage.localStages.isEmpty())
+                    continue;
+
+                // Stage's natural order is ascending by time and PriorityQueue polls its least
+                // element first, so the order must be reversed to get the longest stages first.
+                PriorityQueue<Stage> stagesByTime = new PriorityQueue<>(Collections.reverseOrder());
+
+                for (List<Stage> threadStages : stage.localStages.values())
+                    stagesByTime.addAll(threadStages);
+
+                int stageCount = 0;
+
+                while (!stagesByTime.isEmpty() && stageCount < maxPerCompositeStage) {
+                    stageCount++;
+
+                    Stage locStage = stagesByTime.poll();
+
+                    timings.add(locStage.toString() + " (parent=" + stage.description() + ")");
+                }
+            }
+
+            return timings;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Global stage together with the local stages recorded while it was in progress.
+     */
+    private class CompositeStage extends Stage {
+        /** Local stages, keyed by the name of the thread that recorded them. */
+        private final Map<String, List<Stage>> localStages;
+
+        /**
+         * @param description Description.
+         * @param time Time.
+         * @param localStages Local stages.
+         */
+        public CompositeStage(String description, long time, Map<String, List<Stage>> localStages) {
+            super(description, time);
+
+            this.localStages = localStages;
+        }
+
+        /**
+         * @return Local stages, keyed by thread name.
+         */
+        public Map<String, List<Stage>> localStages() {
+            return localStages;
+        }
+    }
+
+    /**
+     * Single measured stage: a description and its duration in the bag's measurement unit.
+     */
+    private class Stage implements Comparable<Stage> {
+        /** Description. */
+        private final String description;
+
+        /** Time. */
+        private final long time;
+
+        /**
+         * @param description Description.
+         * @param time Time.
+         */
+        public Stage(String description, long time) {
+            this.description = description;
+            this.time = time;
+        }
+
+        /**
+         * @return Description.
+         */
+        public String description() {
+            return description;
+        }
+
+        /**
+         * @return Time.
+         */
+        public long time() {
+            return time;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            StringBuilder sb = new StringBuilder();
+
+            sb.append("stage=").append('"').append(description()).append('"');
+            sb.append(' ').append('(').append(time()).append(' ').append(measurementUnitShort()).append(')');
+
+            return sb.toString();
+        }
+
+        /**
+         * Orders stages ascending by time; stages with equal time go in reverse description order.
+         *
+         * {@inheritDoc}
+         */
+        @Override public int compareTo(@NotNull TimeBag.Stage o) {
+            int cmp = Long.compare(time, o.time);
+
+            return cmp != 0 ? cmp : o.description.compareTo(description);
+        }
+    }
+}

Reply via email to