Repository: ignite Updated Branches: refs/heads/master 9cec13857 -> edcc1089a
IGNITE-10493 Refactor exchange timings measurement - Fixes #5688. Signed-off-by: Pavel Kovalenko <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/edcc1089 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/edcc1089 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/edcc1089 Branch: refs/heads/master Commit: edcc1089aaa8efa7ede38af2a5dfdb0ef00b7bc5 Parents: 9cec138 Author: Pavel Kovalenko <[email protected]> Authored: Thu Dec 20 15:51:03 2018 +0300 Committer: Pavel Kovalenko <[email protected]> Committed: Thu Dec 20 15:51:03 2018 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 93 +++--- .../GridCachePartitionExchangeManager.java | 2 + .../GridDhtPartitionsExchangeFuture.java | 211 +++++++++---- .../GridDhtPartitionsSingleMessage.java | 55 +++- .../GridCacheDatabaseSharedManager.java | 10 +- .../ignite/internal/util/IgniteStopwatch.java | 230 ++++++++++++++ .../ignite/internal/util/IgniteTicker.java | 52 ++++ .../apache/ignite/internal/util/TimeBag.java | 312 +++++++++++++++++++ 8 files changed, 852 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/edcc1089/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 07fbef1..61d88c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ 
-800,15 +800,12 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap ) throws IgniteCheckedException { assert exchActions != null && !exchActions.empty() : exchActions; - long time = System.currentTimeMillis(); - IgniteInternalFuture<?> res = cachesRegistry.update(exchActions); // Affinity did not change for existing caches. onCustomMessageNoAffinityChange(fut, crd, exchActions); - if (log.isInfoEnabled()) - log.info("Updating caches registry performed in " + (System.currentTimeMillis() - time) + " ms."); + fut.timeBag().finishGlobalStage("Update caches registry"); processCacheStartRequests(fut, crd, exchActions); @@ -871,8 +868,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final ExchangeDiscoveryEvents evts = fut.context().events(); - long time = U.currentTimeMillis(); - Map<StartCacheInfo, DynamicCacheChangeRequest> startCacheInfos = new LinkedHashMap<>(); for (ExchangeActions.CacheActionData action : exchActions.cacheStartRequests()) { @@ -954,15 +949,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } } - if (log.isInfoEnabled()) - log.info("Caches starting performed in " + (U.currentTimeMillis() - time) + " ms."); - - time = U.currentTimeMillis(); + fut.timeBag().finishGlobalStage("Start caches"); initAffinityOnCacheGroupsStart(fut, exchActions, crd); - if (log.isInfoEnabled()) - log.info("Affinity initialization for started caches performed in " + (U.currentTimeMillis() - time) + " ms."); + fut.timeBag().finishGlobalStage("Affinity initialization on cache group start"); } /** @@ -998,6 +989,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } } + fut.timeBag().finishLocalStage("Affinity initialization on cache group start " + + "[grp=" + grpDesc.cacheOrGroupName() + "]"); + return null; } ); @@ -1106,6 +1100,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap newAssignment = 
idealAssignment; aff.initialize(topVer, cachedAssignment(aff, newAssignment, affCache)); + + exchFut.timeBag().finishLocalStage("Affinity recalculate by change affinity message " + + "[grp=" + aff.cacheOrGroupName() + "]"); } }); } @@ -1190,6 +1187,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap aff.clientEventTopologyChange(exchFut.firstEvent(), topVer); cctx.exchange().exchangerUpdateHeartbeat(); + + exchFut.timeBag().finishLocalStage("Affinity change by custom message " + + "[grp=" + aff.cacheOrGroupName() + "]"); } }); } @@ -1369,6 +1369,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap calculateAndInit(fut.events(), cache.affinity(), fut.initialVersion()); cctx.exchange().exchangerUpdateHeartbeat(); + + fut.timeBag().finishLocalStage("Affinity initialization (crd, new cache) " + + "[grp=" + desc.cacheOrGroupName() + "]"); } } }); @@ -1380,6 +1383,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap initAffinity(cachesRegistry.group(aff.groupId()), aff, fut); cctx.exchange().exchangerUpdateHeartbeat(); + + fut.timeBag().finishLocalStage("Affinity initialization (new cache) " + + "[grp=" + aff.cacheOrGroupName() + "]"); } } }); @@ -1470,8 +1476,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final Map<Object, List<List<ClusterNode>>> affCache = new ConcurrentHashMap<>(); - long time = System.currentTimeMillis(); - forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { ExchangeDiscoveryEvents evts = fut.context().events(); @@ -1503,11 +1507,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap newAssignment = idealAssignment; aff.initialize(evts.topologyVersion(), cachedAssignment(aff, newAssignment, affCache)); + + fut.timeBag().finishLocalStage("Affinity applying from 
full message " + + "[grp=" + aff.cacheOrGroupName() + "]"); } }); - - if (log.isInfoEnabled()) - log.info("Affinity applying from full message performed in " + (System.currentTimeMillis() - time) + " ms."); } /** @@ -1532,8 +1536,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final Map<Long, ClusterNode> nodesByOrder = new ConcurrentHashMap<>(); - long time = System.currentTimeMillis(); - forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { ExchangeDiscoveryEvents evts = fut.context().events(); @@ -1571,11 +1573,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap calculateAndInit(evts, aff, evts.topologyVersion()); grp.topology().initPartitionsWhenAffinityReady(resTopVer, fut); + + fut.timeBag().finishLocalStage("Affinity initialization (local join) " + + "[grp=" + grp.cacheOrGroupName() + "]"); } }); - - if (log.isInfoEnabled()) - log.info("Affinity initialization on local join performed in " + (System.currentTimeMillis() - time) + " ms."); } /** @@ -1590,8 +1592,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert fut.context().mergeExchanges(); assert evts.hasServerJoin() && !evts.hasServerLeft(); - long time = System.currentTimeMillis(); - WaitRebalanceInfo waitRebalanceInfo = initAffinityOnNodeJoin(fut, crd); this.waitInfo = waitRebalanceInfo != null && !waitRebalanceInfo.empty() ? waitRebalanceInfo : null; @@ -1604,10 +1604,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap ", waitGrps=" + (info != null ? 
groupNames(info.waitGrps.keySet()) : null) + ']'); } } - - if (log.isInfoEnabled()) - log.info("Affinity recalculation (on server join) performed in " - + (System.currentTimeMillis() - time) + " ms."); } /** @@ -1618,8 +1614,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap public Map<Integer, CacheGroupAffinityMessage> onServerLeftWithExchangeMergeProtocol( final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { - long time = System.currentTimeMillis(); - final ExchangeDiscoveryEvents evts = fut.context().events(); assert fut.context().mergeExchanges(); @@ -1627,10 +1621,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap Map<Integer, CacheGroupAffinityMessage> result = onReassignmentEnforced(fut); - if (log.isInfoEnabled()) - log.info("Affinity recalculation (on server left) performed in " - + (System.currentTimeMillis() - time) + " ms."); - return result; } @@ -1646,14 +1636,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap { assert DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent()); - long time = System.currentTimeMillis(); - Map<Integer, CacheGroupAffinityMessage> result = onReassignmentEnforced(fut); - if (log.isInfoEnabled()) - log.info("Affinity recalculation (custom message) performed in " - + (System.currentTimeMillis() - time) + " ms."); - return result; } @@ -1679,6 +1663,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (!cache.rebalanceEnabled || fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom())) cache.affinity().initialize(topVer, assign); + + fut.timeBag().finishLocalStage("Affinity initialization (enforced) " + + "[grp=" + desc.cacheOrGroupName() + "]"); } }); @@ -1716,11 +1703,17 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap calculateAndInit(fut.events(), grpHolder.affinity(), topVer); 
cctx.exchange().exchangerUpdateHeartbeat(); + + fut.timeBag().finishLocalStage("First node affinity initialization (node join) " + + "[grp=" + desc.cacheOrGroupName() + "]"); } }); } - else + else { fetchAffinityOnJoin(fut); + + fut.timeBag().finishLocalStage("Affinity fetch"); + } } else waitRebalanceInfo = initAffinityOnNodeJoin(fut, crd); @@ -1931,6 +1924,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap cache.aff.calculate(fut.initialVersion(), fut.events(), fut.events().discoveryCache()); cctx.exchange().exchangerUpdateHeartbeat(); + + fut.timeBag().finishLocalStage("Affinity centralized initialization (crd) " + + "[grp=" + desc.cacheOrGroupName() + "]"); } }); } @@ -1940,6 +1936,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap aff.calculate(fut.initialVersion(), fut.events(), fut.events().discoveryCache()); cctx.exchange().exchangerUpdateHeartbeat(); + + fut.timeBag().finishLocalStage("Affinity centralized initialization " + + "[grp=" + aff.cacheOrGroupName() + "]"); } }); } @@ -2061,6 +2060,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert old == null : old; cctx.exchange().exchangerUpdateHeartbeat(); + + fut.timeBag().finishLocalStage("Coordinator affinity cache init " + + "[grp=" + desc.cacheOrGroupName() + "]"); } }); @@ -2139,6 +2141,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap affCache); cctx.exchange().exchangerUpdateHeartbeat(); + + fut.timeBag().finishLocalStage("Affinity initialization (node join) " + + "[grp=" + grp.cacheOrGroupName() + "]"); } }); @@ -2177,6 +2182,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } cctx.exchange().exchangerUpdateHeartbeat(); + + fut.timeBag().finishLocalStage("Affinity initialization (crd, node join) " + + "[grp=" + desc.cacheOrGroupName() + "]"); } }); @@ -2522,6 +2530,9 @@ public class CacheAffinitySharedManager<K, V> 
extends GridCacheSharedManagerAdap if (initAff) grpHolder.affinity().initialize(topVer, newAssignment0); + + fut.timeBag().finishLocalStage("Affinity recalculation (partitions availability) " + + "[grp=" + desc.cacheOrGroupName() + "]"); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/edcc1089/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 364950c..01c10aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -2869,6 +2869,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } + exchFut.timeBag().finishGlobalStage("Waiting in exchange queue"); + exchFut.init(newCrd); int dumpCnt = 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/edcc1089/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 5b9ebf1..dbc51f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -100,6 +100,7 @@ import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMess import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.TimeBag; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -335,6 +336,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** Future for wait all exchange listeners comepleted. */ private final GridFutureAdapter<?> afterLsnrCompleteFut = new GridFutureAdapter<>(); + /** Time bag to measure and store exchange stages times. */ + private final TimeBag timeBag; + + /** Start time of exchange. */ + private long startTime = System.nanoTime(); + + /** Discovery lag / Clocks discrepancy, calculated on coordinator when all single messages are received. */ + private T2<Long, UUID> discoveryLag; + /** * @param cctx Cache context. * @param busyLock Busy lock. @@ -366,6 +376,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte log = cctx.logger(getClass()); exchLog = cctx.logger(EXCHANGE_LOG); + timeBag = new TimeBag(); + initFut = new GridFutureAdapter<Boolean>() { @Override public IgniteLogger logger() { return log; @@ -660,6 +672,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * @return Object to collect exchange timings. + */ + public TimeBag timeBag() { + return timeBag; + } + + /** * Starts activity. * * @param newCrd {@code True} if node become coordinator on this exchange. 
@@ -716,6 +735,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte ", allowMerge=" + exchCtx.mergeExchanges() + ']'); } + timeBag.finishGlobalStage("Exchange parameters initialization"); + ExchangeType exchange; if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { @@ -800,6 +821,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte updateTopologies(crdNode); + timeBag.finishGlobalStage("Determine exchange type"); + switch (exchange) { case ALL: { distributedExchange(); @@ -883,6 +906,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte finally { cctx.exchange().exchangerBlockingSectionEnd(); } + + timeBag.finishGlobalStage("Baseline change callback"); } cctx.exchange().exchangerBlockingSectionBegin(); @@ -894,6 +919,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.exchange().exchangerBlockingSectionEnd(); } + timeBag.finishGlobalStage("Components activation"); + IgniteInternalFuture<?> cachesRegistrationFut = cctx.cache().startCachesOnLocalJoin(initialVersion(), exchActions == null ? null : exchActions.localJoinContext()); @@ -1340,6 +1367,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } + timeBag.finishGlobalStage("Preloading notification"); + cctx.exchange().exchangerBlockingSectionBegin(); try { @@ -1352,6 +1381,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.exchange().exchangerBlockingSectionEnd(); } + timeBag.finishGlobalStage("WAL history reservation"); + // Skipping wait on local join is available when all cluster nodes have the same protocol. 
boolean skipWaitOnLocalJoin = cctx.exchange().latch().canSkipJoiningNodes(initialVersion()) && localJoinExchange(); @@ -1442,6 +1473,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } + timeBag.finishGlobalStage("After states restored callback"); + changeWalModeIfNeeded(); if (events().hasServerLeft()) @@ -1451,8 +1484,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte try { if (crd.isLocal()) { - if (remaining.isEmpty()) + if (remaining.isEmpty()) { + initFut.onDone(true); + onAllReceived(null); + } } else sendPartitions(crd); @@ -1674,6 +1710,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } + timeBag.finishGlobalStage("Wait partitions release"); + if (releaseLatch == null) { assert !distributed : "Partitions release latch must be initialized in distributed mode."; @@ -1714,6 +1752,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte catch (IgniteCheckedException e) { U.warn(log, "Stop waiting for partitions release latch: " + e.getMessage()); } + + timeBag.finishGlobalStage("Wait partitions release latch"); } /** @@ -1788,8 +1828,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException { assert node != null; - long time = System.currentTimeMillis(); - GridDhtPartitionsSingleMessage msg; // Reset lost partitions before sending local partitions to coordinator. 
@@ -1824,6 +1862,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte else if (localJoinExchange()) msg.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin()); + msg.exchangeStartTime(startTime); + if (log.isTraceEnabled()) log.trace("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + msg + ']'); @@ -1834,9 +1874,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (log.isDebugEnabled()) log.debug("Node left during partition exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']'); } - - if (log.isInfoEnabled()) - log.info("Sending Single Message performed in " + (System.currentTimeMillis() - time) + " ms."); } /** @@ -1992,6 +2029,28 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte super.onDone(null, null); } + /** + * Make a log message that contains given exchange timings. + * + * @param header Header of log message. + * @param timings Exchange stages timings. + * @return Log message with exchange timings and exchange version. 
+ */ + private String exchangeTimingsLogMessage(String header, List<String> timings) { + StringBuilder timingsToLog = new StringBuilder(); + + timingsToLog.append(header).append(" ["); + timingsToLog.append("startVer=").append(initialVersion()); + timingsToLog.append(", resVer=").append(topologyVersion()); + + for (String stageTiming : timings) + timingsToLog.append(", ").append(stageTiming); + + timingsToLog.append(']'); + + return timingsToLog.toString(); + } + /** {@inheritDoc} */ @Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) { assert res != null || err != null : "TopVer=" + res + ", err=" + err; @@ -2130,14 +2189,26 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (super.onDone(res, err)) { afterLsnrCompleteFut.onDone(); - if (log.isDebugEnabled()) { - log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + - ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']'); - } - else if (log.isInfoEnabled()) { + if (log.isInfoEnabled()) { log.info("Completed partition exchange [localNode=" + cctx.localNodeId() + - ", exchange=" + shortInfo() + ", topVer=" + topologyVersion() + - ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']'); + ", exchange=" + (log.isDebugEnabled() ? this : shortInfo()) + ", topVer=" + topologyVersion() + "]"); + + if (err == null) { + timeBag.finishGlobalStage("Exchange done"); + + // Collect all stages timings. 
+ List<String> timings = timeBag.stagesTimings(); + + if (discoveryLag != null && discoveryLag.get1() != 0) + timings.add("Discovery lag=" + discoveryLag.get1() + + " ms, Latest started node id=" + discoveryLag.get2()); + + log.info(exchangeTimingsLogMessage("Exchange timings", timings)); + + List<String> localTimings = timeBag.longestLocalStagesTimings(3); + + log.info(exchangeTimingsLogMessage("Exchange longest local stages", localTimings)); + } } initFut.onDone(err == null); @@ -2175,6 +2246,42 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * Calculates discovery lag (Maximal difference between exchange start times across all nodes). + * + * @param declared Single messages that were expected to be received during exchange. + * @param merged Single messages from nodes that were merged during exchange. + * + * @return Pair with discovery lag and node id which started exchange later than others. + */ + private T2<Long, UUID> calculateDiscoveryLag( + Map<UUID, GridDhtPartitionsSingleMessage> declared, + Map<UUID, GridDhtPartitionsSingleMessage> merged + ) { + Map<UUID, GridDhtPartitionsSingleMessage> msgs = new HashMap<>(declared); + + msgs.putAll(merged); + + long minStartTime = startTime; + long maxStartTime = startTime; + UUID latestStartedNode = cctx.localNodeId(); + + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> msg : msgs.entrySet()) { + UUID nodeId = msg.getKey(); + long exchangeTime = msg.getValue().exchangeStartTime(); + + if (exchangeTime != 0) { + minStartTime = Math.min(minStartTime, exchangeTime); + maxStartTime = Math.max(maxStartTime, exchangeTime); + } + + if (maxStartTime == exchangeTime) + latestStartedNode = nodeId; + } + + return new T2<>(TimeUnit.NANOSECONDS.toMillis(maxStartTime - minStartTime), latestStartedNode); + } + + /** * @param exchangeActions Exchange actions. * @return Map of cache names and start descriptors. 
*/ @@ -2743,6 +2850,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } } + if (allReceived) { if (!awaitSingleMapUpdates()) return; @@ -2977,8 +3085,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private void detectLostPartitions(AffinityTopologyVersion resTopVer) { boolean detected = false; - long time = System.currentTimeMillis(); - synchronized (cctx.exchange().interruptLock()) { if (Thread.currentThread().isInterrupted()) return; @@ -3003,8 +3109,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.exchange().scheduleResendPartitions(); } - if (log.isInfoEnabled()) - log.info("Detecting lost partitions performed in " + (System.currentTimeMillis() - time) + " ms."); + timeBag.finishGlobalStage("Detect lost partitions"); } /** @@ -3100,6 +3205,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte */ private void onAllReceived(@Nullable Collection<ClusterNode> sndResNodes) { try { + initFut.get(); + + timeBag.finishGlobalStage("Waiting for all single messages"); + assert crd.isLocal(); assert partHistSuppliers.isEmpty() : partHistSuppliers; @@ -3123,12 +3232,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (log.isInfoEnabled()) log.info("Coordinator received all messages, try merge [ver=" + initialVersion() + ']'); - long time = System.currentTimeMillis(); - boolean finish = cctx.exchange().mergeExchangesOnCoordinator(this); - if (log.isInfoEnabled()) - log.info("Exchanges merging performed in " + (System.currentTimeMillis() - time) + " ms."); + timeBag.finishGlobalStage("Exchanges merge"); if (!finish) return; @@ -3164,8 +3270,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Map<Integer, CacheGroupAffinityMessage> idealAffDiff = null; - long time = System.currentTimeMillis(); - if (exchCtx.mergeExchanges()) { synchronized (mux) { if 
(mergedJoinExchMsgs != null) { @@ -3199,8 +3303,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - if (log.isInfoEnabled()) - log.info("Affinity changes (coordinator) applied in " + (System.currentTimeMillis() - time) + " ms."); + timeBag.finishGlobalStage("Affinity recalculation (crd)"); Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null; @@ -3233,6 +3336,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } + timeBag.finishGlobalStage("Collect update counters and create affinity messages"); + validatePartitionsState(); if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { @@ -3266,22 +3371,25 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } // Recalculate new affinity based on partitions availability. - if (!exchCtx.mergeExchanges() && forceAffReassignment) + if (!exchCtx.mergeExchanges() && forceAffReassignment) { idealAffDiff = cctx.affinity().onCustomEventWithEnforcedAffinityReassignment(this); + timeBag.finishGlobalStage("Ideal affinity diff calculation (enforced)"); + } + for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) { if (!grpCtx.isLocal()) grpCtx.topology().applyUpdateCounters(); } + timeBag.finishGlobalStage("Apply update counters"); + updateLastVersion(cctx.versions().last()); cctx.versions().onExchange(lastVer.get().order()); IgniteProductVersion minVer = exchCtx.events().discoveryCache().minimumNodeVersion(); - time = System.currentTimeMillis(); - GridDhtPartitionsFullMessage msg = createPartitionsMessage(true, minVer.compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0); @@ -3298,8 +3406,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte msg.prepareMarshal(cctx); - if (log.isInfoEnabled()) - log.info("Preparing Full Message performed in " + (System.currentTimeMillis() - time) + " ms."); + timeBag.finishGlobalStage("Full message preparing"); synchronized (mux) { finishState = 
new FinishState(crd.id(), resTopVer, msg); @@ -3310,8 +3417,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (centralizedAff) { assert !exchCtx.mergeExchanges(); - time = System.currentTimeMillis(); - IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut = cctx.affinity().initAffinityOnNodeLeft(this); if (!fut.isDone()) { @@ -3323,9 +3428,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } else onAffinityInitialized(fut); - - if (log.isInfoEnabled()) - log.info("Centralized affinity changes are performed in " + (System.currentTimeMillis() - time) + " ms."); } else { Set<ClusterNode> nodes; @@ -3349,6 +3451,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } } + else + mergedJoinExchMsgs0 = Collections.emptyMap(); if (!F.isEmpty(sndResNodes)) nodes.addAll(sndResNodes); @@ -3357,6 +3461,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (!nodes.isEmpty()) sendAllPartitions(msg, nodes, mergedJoinExchMsgs0, joinedNodeAff); + timeBag.finishGlobalStage("Full message sending"); + + discoveryLag = calculateDiscoveryLag( + msgs, + mergedJoinExchMsgs0 + ); + partitionsSent = true; if (!stateChangeExchange()) @@ -3413,6 +3524,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.discovery().sendCustomEvent(stateFinishMsg); + timeBag.finishGlobalStage("State finish message sending"); + if (!centralizedAff) onDone(exchCtx.events().topologyVersion(), null); } @@ -3451,8 +3564,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * Validates that partition update counters and cache sizes for all caches are consistent. 
*/ private void validatePartitionsState() { - long time = System.currentTimeMillis(); - try { U.doInParallel( cctx.kernalContext().getSystemExecutorService(), @@ -3492,16 +3603,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte throw new IgniteException("Failed to validate partitions state", e); } - if (log.isInfoEnabled()) - log.info("Partitions validation performed in " + (System.currentTimeMillis() - time) + " ms."); + timeBag.finishGlobalStage("Validate partitions states"); } /** * */ private void assignPartitionsStates() { - long time = System.currentTimeMillis(); - try { U.doInParallel( cctx.kernalContext().getSystemExecutorService(), @@ -3526,8 +3634,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte throw new IgniteException("Failed to assign partition states", e); } - if (log.isInfoEnabled()) - log.info("Partitions assignment performed in " + (System.currentTimeMillis() - time) + " ms."); + timeBag.finishGlobalStage("Assign partitions states"); } /** @@ -3538,8 +3645,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte // Reserve at least 2 threads for system operations. 
int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); - long time = System.currentTimeMillis(); - try { U.<CacheGroupContext, Void>doInParallel( parallelismLvl, @@ -3556,8 +3661,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte throw new IgniteException("Failed to finalize partition counters", e); } - if (log.isInfoEnabled()) - log.info("Partition counters finalization performed in " + (System.currentTimeMillis() - time) + " ms."); + timeBag.finishGlobalStage("Finalize update counters"); } /** @@ -3800,6 +3904,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert exchId.equals(msg.exchangeId()) : msg; assert msg.lastVersion() != null : msg; + timeBag.finishGlobalStage("Waiting for Full message"); + if (checkCrd) { assert node != null; @@ -3875,8 +3981,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte AffinityTopologyVersion resTopVer = initialVersion(); - long time = System.currentTimeMillis(); - if (exchCtx.mergeExchanges()) { if (msg.resultTopologyVersion() != null && !initialVersion().equals(msg.resultTopologyVersion())) { if (log.isInfoEnabled()) { @@ -3920,8 +4024,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte else if (forceAffReassignment) cctx.affinity().applyAffinityFromFullMessage(this, msg); - if (log.isInfoEnabled()) - log.info("Affinity changes applied in " + (System.currentTimeMillis() - time) + " ms."); + timeBag.finishGlobalStage("Affinity recalculation"); if (dynamicCacheStartExchange() && !F.isEmpty(exchangeGlobalExceptions)) { assert cctx.localNode().isClient(); @@ -3963,8 +4066,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte partHistSuppliers.putAll(msg.partitionHistorySuppliers()); - long time = System.currentTimeMillis(); - // Reserve at least 2 threads for system operations. 
int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); @@ -4013,9 +4114,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte partitionsReceived = true; - if (log.isInfoEnabled()) - log.info("Full map updating for " + msg.partitions().size() - + " groups performed in " + (System.currentTimeMillis() - time) + " ms."); + timeBag.finishGlobalStage("Full map updating"); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/edcc1089/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 403b862..3921106 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -102,6 +102,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes @GridDirectCollection(Integer.class) private Collection<Integer> grpsAffRequest; + /** Start time of exchange on node which sent this message in nanoseconds. */ + private long exchangeStartTime; + /** * Exchange finish message, sent to new coordinator when it tries to * restore state after previous coordinator failed during exchange. @@ -315,6 +318,20 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes return err; } + /** + * Start time of exchange on node which sent this message. 
+ */ + public long exchangeStartTime() { + return exchangeStartTime; + } + + /** + * @param exchangeStartTime Start time of exchange. + */ + public void exchangeStartTime(long exchangeStartTime) { + this.exchangeStartTime = exchangeStartTime; + } + /** {@inheritDoc} * @param ctx*/ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { @@ -473,40 +490,47 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes writer.incrementState(); case 9: - if (!writer.writeMessage("finishMsg", finishMsg)) + if (!writer.writeLong("exchangeStartTime", exchangeStartTime)) return false; writer.incrementState(); case 10: - if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, MessageCollectionItemType.INT)) + if (!writer.writeMessage("finishMsg", finishMsg)) return false; writer.incrementState(); case 11: - if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) + if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 12: - if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes)) + if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) return false; writer.incrementState(); case 13: - if (!writer.writeByteArray("partsBytes", partsBytes)) + if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes)) return false; writer.incrementState(); case 14: + if (!writer.writeByteArray("partsBytes", partsBytes)) + return false; + + writer.incrementState(); + + case 15: if (!writer.writeByteArray("partsSizesBytes", partsSizesBytes)) return false; writer.incrementState(); + } return true; @@ -548,7 +572,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 9: - finishMsg = reader.readMessage("finishMsg"); + exchangeStartTime = reader.readLong("exchangeStartTime"); if (!reader.isLastRead()) return false; @@ -556,7 +580,7 @@ public class 
GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 10: - grpsAffRequest = reader.readCollection("grpsAffRequest", MessageCollectionItemType.INT); + finishMsg = reader.readMessage("finishMsg"); if (!reader.isLastRead()) return false; @@ -564,7 +588,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 11: - partCntrsBytes = reader.readByteArray("partCntrsBytes"); + grpsAffRequest = reader.readCollection("grpsAffRequest", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -572,7 +596,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 12: - partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes"); + partCntrsBytes = reader.readByteArray("partCntrsBytes"); if (!reader.isLastRead()) return false; @@ -580,7 +604,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 13: - partsBytes = reader.readByteArray("partsBytes"); + partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes"); if (!reader.isLastRead()) return false; @@ -588,12 +612,21 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 14: + partsBytes = reader.readByteArray("partsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 15: partsSizesBytes = reader.readByteArray("partsSizesBytes"); if (!reader.isLastRead()) return false; reader.incrementState(); + } return reader.afterMessageRead(GridDhtPartitionsSingleMessage.class); @@ -606,7 +639,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 15; + return 16; } /** {@inheritDoc} */ 
http://git-wip-us.apache.org/repos/asf/ignite/blob/edcc1089/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 11c6091..8653cb9 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -1310,6 +1310,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (cacheGroup.localStartVersion().equals(fut.initialVersion())) cacheGroup.topology().afterStateRestored(fut.initialVersion()); + + fut.timeBag().finishLocalStage("Restore partition states " + + "[grp=" + cacheGroup.cacheOrGroupName() + "]"); } finally { cctx.database().checkpointReadUnlock(); @@ -1318,6 +1321,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan return null; } ); + + fut.timeBag().finishGlobalStage("Restore partition states"); } if (cctx.kernalContext().query().moduleEnabled()) { @@ -2040,8 +2045,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @throws IgniteCheckedException If first checkpoint has failed. 
*/ @Override public void onStateRestored(AffinityTopologyVersion topVer) throws IgniteCheckedException { - long time = System.currentTimeMillis(); - IgniteThread cpThread = new IgniteThread(cctx.igniteInstanceName(), "db-checkpoint-thread", checkpointer); cpThread.start(); @@ -2052,9 +2055,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (chp != null) chp.cpBeginFut.get(); - - if (log.isInfoEnabled()) - log.info("Checkpointer initilialzation performed in " + (System.currentTimeMillis() - time) + " ms."); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/edcc1089/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStopwatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStopwatch.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStopwatch.java new file mode 100644 index 0000000..83cd7bc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStopwatch.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/* + * Copyright (C) 2008 The Guava Authors + */ + +package org.apache.ignite.internal.util; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.jetbrains.annotations.NotNull; + +import static java.util.concurrent.TimeUnit.DAYS; +import static java.util.concurrent.TimeUnit.HOURS; +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * An object that measures elapsed time in nanoseconds. It is useful to measure elapsed time using + * this class instead of direct calls to {@link System#nanoTime} for a few reasons: + * + * <ul> + * <li>An alternate time source can be substituted, for testing or performance reasons. + * <li>As documented by {@code nanoTime}, the value returned has no absolute meaning, and can only + * be interpreted as relative to another timestamp returned by {@code nanoTime} at a different + * time. {@code Stopwatch} is a more effective abstraction because it exposes only these + * relative values, not the absolute ones. + * </ul> + * + * <p>Basic usage: + * + * <pre>{@code + * Stopwatch stopwatch = Stopwatch.createStarted(); + * doSomething(); + * stopwatch.stop(); // optional + * + * Duration duration = stopwatch.elapsed(); + * + * log.info("time: " + stopwatch); // formatted string like "12.3 ms" + * }</pre> + * + * <p>Stopwatch methods are not idempotent; it is an error to start or stop a stopwatch that is + * already in the desired state. + * + * <p>When testing code that uses this class, use {@link #createUnstarted(IgniteTicker)} or {@link + * #createStarted(IgniteTicker)} to supply a fake or mock ticker. This allows you to simulate any valid + * behavior of the stopwatch. + * + * <p><b>Note:</b> This class is not thread-safe. 
+ * + * <p><b>Warning for Android users:</b> a stopwatch with default behavior may not continue to keep + * time while the device is asleep. Instead, create one like this: + * + * <pre>{@code + * Stopwatch.createStarted( + * new Ticker() { + * public long read() { + * return android.os.SystemClock.elapsedRealtimeNanos(); + * } + * }); + * }</pre> + */ +@SuppressWarnings("GoodTime") // lots of violations +public final class IgniteStopwatch { + /** Ticker. */ + private final IgniteTicker ticker; + /** Is running. */ + private boolean isRunning; + /** Elapsed nanos. */ + private long elapsedNanos; + /** Start tick. */ + private long startTick; + + /** + * Creates (but does not start) a new stopwatch using {@link System#nanoTime} as its time source. + */ + public static IgniteStopwatch createUnstarted() { + return new IgniteStopwatch(); + } + + /** + * Creates (but does not start) a new stopwatch, using the specified time source. + */ + public static IgniteStopwatch createUnstarted(IgniteTicker ticker) { + return new IgniteStopwatch(ticker); + } + + /** + * Creates (and starts) a new stopwatch using {@link System#nanoTime} as its time source. + */ + public static IgniteStopwatch createStarted() { + return new IgniteStopwatch().start(); + } + + /** + * Creates (and starts) a new stopwatch, using the specified time source. + */ + public static IgniteStopwatch createStarted(IgniteTicker ticker) { + return new IgniteStopwatch(ticker).start(); + } + + /** + * Default constructor. + */ + IgniteStopwatch() { + this.ticker = IgniteTicker.systemTicker(); + } + + /** + * @param ticker Ticker. + */ + IgniteStopwatch(@NotNull IgniteTicker ticker) { + this.ticker = ticker; + } + + /** + * Returns {@code true} if {@link #start()} has been called on this stopwatch, and {@link #stop()} + * has not been called since the last call to {@code start()}. + */ + public boolean isRunning() { + return isRunning; + } + + /** + * Starts the stopwatch. 
+ * + * @return this {@code Stopwatch} instance + * @throws AssertionError if the stopwatch is already running (checked only when assertions are enabled). + */ + public IgniteStopwatch start() { + assert !isRunning : "This stopwatch is already running."; + + isRunning = true; + + startTick = ticker.read(); + + return this; + } + + /** + * Stops the stopwatch. Future reads will return the fixed duration that had elapsed up to this + * point. + * + * @return this {@code Stopwatch} instance + * @throws AssertionError if the stopwatch is already stopped (checked only when assertions are enabled). + */ + public IgniteStopwatch stop() { + long tick = ticker.read(); + + assert isRunning : "This stopwatch is already stopped."; + + isRunning = false; + elapsedNanos += tick - startTick; + return this; + } + + /** + * Sets the elapsed time for this stopwatch to zero, and places it in a stopped state. + * + * @return this {@code Stopwatch} instance + */ + public IgniteStopwatch reset() { + elapsedNanos = 0; + + isRunning = false; + + return this; + } + + /** + * @return Accumulated elapsed nanoseconds, including the interval since the last {@code start()} if the stopwatch is running. + */ + private long elapsedNanos() { + return isRunning ? ticker.read() - startTick + elapsedNanos : elapsedNanos; + } + + /** + * Returns the current elapsed time shown on this stopwatch, expressed in the desired time unit, + * with any fraction rounded down. + * + * <p><b>Note:</b> the overhead of measurement can be more than a microsecond, so it is generally + * not useful to specify {@link TimeUnit#NANOSECONDS} precision here. + * + * <p>It is generally not a good idea to use an ambiguous, unitless {@code long} to represent + * elapsed time. Therefore, we recommend using {@link #elapsed()} instead, which returns a + * strongly-typed {@link Duration} instance. + */ + public long elapsed(TimeUnit desiredUnit) { + return desiredUnit.convert(elapsedNanos(), NANOSECONDS); + } + + /** + * Returns the current elapsed time shown on this stopwatch as a {@link Duration}. Unlike {@link + * #elapsed(TimeUnit)}, this method does not lose any precision due to rounding. 
+ */ + public Duration elapsed() { + return Duration.ofNanos(elapsedNanos()); + } + + /** + * @param nanos Nanos. + */ + private static TimeUnit chooseUnit(long nanos) { + if (DAYS.convert(nanos, NANOSECONDS) > 0) return DAYS; + if (HOURS.convert(nanos, NANOSECONDS) > 0) return HOURS; + if (MINUTES.convert(nanos, NANOSECONDS) > 0) return MINUTES; + if (SECONDS.convert(nanos, NANOSECONDS) > 0) return SECONDS; + if (MILLISECONDS.convert(nanos, NANOSECONDS) > 0) return MILLISECONDS; + if (MICROSECONDS.convert(nanos, NANOSECONDS) > 0) return MICROSECONDS; + return NANOSECONDS; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/edcc1089/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTicker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTicker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTicker.java new file mode 100644 index 0000000..1f7f41d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTicker.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/* + * Copyright (C) 2011 The Guava Authors + */ + +package org.apache.ignite.internal.util; + +/** + * A time source; returns a time value representing the number of nanoseconds elapsed since some + * fixed but arbitrary point in time. Note that most users should use {@link IgniteStopwatch} instead of + * interacting with this class directly. + * + * <p><b>Warning:</b> this interface can only be used to measure elapsed time, not wall time. + */ +public abstract class IgniteTicker { + /** Constructor for use by subclasses. */ + protected IgniteTicker() {} + + /** Returns the number of nanoseconds elapsed since this ticker's fixed point of reference. */ + public abstract long read(); + + /** + * A ticker that reads the current time using {@link System#nanoTime}. + */ + public static IgniteTicker systemTicker() { + return SYSTEM_TICKER; + } + + /** System ticker. */ + private static final IgniteTicker SYSTEM_TICKER = + new IgniteTicker() { + @Override public long read() { + return System.nanoTime(); + } + }; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/edcc1089/modules/core/src/main/java/org/apache/ignite/internal/util/TimeBag.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/TimeBag.java b/modules/core/src/main/java/org/apache/ignite/internal/util/TimeBag.java new file mode 100644 index 0000000..0da1ee7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/TimeBag.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.jetbrains.annotations.NotNull; + +/** + * Utility class to measure and collect timings of some execution workflow. + */ +public class TimeBag { + /** Initial global stage. */ + private final CompositeStage INITIAL_STAGE = new CompositeStage("", 0, new HashMap<>()); + + /** Lock. */ + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + /** Global stopwatch. */ + private final IgniteStopwatch globalStopwatch = IgniteStopwatch.createStarted(); + + /** Measurement unit. */ + private final TimeUnit measurementUnit; + + /** List of global stages (guarded by {@code lock}). */ + private final List<CompositeStage> stages; + + /** List of current local stages separated by threads (guarded by {@code lock}). */ + private Map<String, List<Stage>> localStages; + + /** Last seen global stage by thread. */ + private final ThreadLocal<CompositeStage> tlLastSeenStage = ThreadLocal.withInitial(() -> INITIAL_STAGE); + + /** Thread-local stopwatch. */ + private final ThreadLocal<IgniteStopwatch> tlStopwatch = ThreadLocal.withInitial(IgniteStopwatch::createUnstarted); + + + /** + * Default constructor. 
+ */ + public TimeBag() { + this(TimeUnit.MILLISECONDS); + } + + /** + * @param measurementUnit Measurement unit. + */ + public TimeBag(TimeUnit measurementUnit) { + this.stages = new ArrayList<>(); + this.localStages = new ConcurrentHashMap<>(); + this.measurementUnit = measurementUnit; + + this.stages.add(INITIAL_STAGE); + } + + /** + * + */ + private CompositeStage lastCompletedGlobalStage() { + assert !stages.isEmpty() : "No stages :("; + + return stages.get(stages.size() - 1); + } + + /** + * @param description Description. + */ + public void finishGlobalStage(String description) { + lock.writeLock().lock(); + + try { + stages.add( + new CompositeStage(description, globalStopwatch.elapsed(measurementUnit), Collections.unmodifiableMap(localStages)) + ); + + localStages = new ConcurrentHashMap<>(); + + globalStopwatch.reset().start(); + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * @param description Description. + */ + public void finishLocalStage(String description) { + lock.readLock().lock(); + + try { + CompositeStage lastSeen = tlLastSeenStage.get(); + CompositeStage lastCompleted = lastCompletedGlobalStage(); + IgniteStopwatch localStopWatch = tlStopwatch.get(); + + Stage stage; + + // We see this stage first time, get elapsed time from last completed global stage and start tracking local. + if (lastSeen != lastCompleted) { + stage = new Stage(description, globalStopwatch.elapsed(measurementUnit)); + + tlLastSeenStage.set(lastCompleted); + } + else + stage = new Stage(description, localStopWatch.elapsed(measurementUnit)); + + localStopWatch.reset().start(); + + // Associate local stage with current thread name. + String threadName = Thread.currentThread().getName(); + + localStages.computeIfAbsent(threadName, t -> new ArrayList<>()).add(stage); + } + finally { + lock.readLock().unlock(); + } + } + + /** + * @return Short name of desired measurement unit. 
+ */ + private String measurementUnitShort() { + switch (measurementUnit) { + case MILLISECONDS: + return "ms"; + case SECONDS: + return "s"; + case NANOSECONDS: + return "ns"; + case MICROSECONDS: + return "mcs"; + case HOURS: + return "h"; + case MINUTES: + return "min"; + case DAYS: + return "days"; + default: + return ""; + } + } + + /** + * @return List of string representation of all stage timings. + */ + public List<String> stagesTimings() { + lock.readLock().lock(); + + try { + List<String> timings = new ArrayList<>(); + + long totalTime = 0; + + // Skip initial stage. + for (int i = 1; i < stages.size(); i++) { + CompositeStage stage = stages.get(i); + + totalTime += stage.time(); + + timings.add(stage.toString()); + } + + // Add last stage with summary time of all global stages. + timings.add(new Stage("Total time", totalTime).toString()); + + return timings; + } + finally { + lock.readLock().unlock(); + } + } + + /** + * @param maxPerCompositeStage Max count of local stages to collect per composite stage. + * @return List of string representation of longest local stages per each composite stage. 
+ */ + public List<String> longestLocalStagesTimings(int maxPerCompositeStage) { + lock.readLock().lock(); + + try { + List<String> timings = new ArrayList<>(); + + for (int i = 1; i < stages.size(); i++) { + CompositeStage stage = stages.get(i); + + if (!stage.localStages.isEmpty()) { + // Natural ordering of Stage is ascending by time, so reverse it to make poll() return the longest stage first. + PriorityQueue<Stage> stagesByTime = new PriorityQueue<>(Collections.<Stage>reverseOrder()); + + for (Map.Entry<String, List<Stage>> threadAndStages : stage.localStages.entrySet()) { + for (Stage locStage : threadAndStages.getValue()) + stagesByTime.add(locStage); + } + + int stageCount = 0; + while (!stagesByTime.isEmpty() && stageCount < maxPerCompositeStage) { + stageCount++; + + Stage locStage = stagesByTime.poll(); + + timings.add(locStage.toString() + " (parent=" + stage.description() + ")"); + } + } + } + + return timings; + } + finally { + lock.readLock().unlock(); + } + } + + /** + * Global stage that additionally carries the local stages recorded for it, keyed by thread name. + */ + private class CompositeStage extends Stage { + /** Local stages. */ + private final Map<String, List<Stage>> localStages; + + /** + * @param description Description. + * @param time Time. + * @param localStages Local stages. + */ + public CompositeStage(String description, long time, Map<String, List<Stage>> localStages) { + super(description, time); + + this.localStages = localStages; + } + + /** + * @return Local stages of this composite stage. + */ + public Map<String, List<Stage>> localStages() { + return localStages; + } + } + + /** + * Named timing measurement; natural ordering is ascending by time. + */ + private class Stage implements Comparable<Stage> { + /** Description. */ + private final String description; + + /** Time. */ + private final long time; + + /** + * @param description Description. + * @param time Time. 
+ */ + public Stage(String description, long time) { + this.description = description; + this.time = time; + } + + /** + * + */ + public String description() { + return description; + } + + /** + * + */ + public long time() { + return time; + } + + /** {@inheritDoc} */ + @Override public String toString() { + StringBuilder sb = new StringBuilder(); + + sb.append("stage=").append('"').append(description()).append('"'); + sb.append(' ').append('(').append(time()).append(' ').append(measurementUnitShort()).append(')'); + + return sb.toString(); + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull TimeBag.Stage o) { + if (o.time > time) + return -1; + if (o.time < time) + return 1; + return o.description.compareTo(description); + } + } +}
