exchange time info
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e6bcaf5d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e6bcaf5d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e6bcaf5d Branch: refs/heads/ignite-2.1.2-exchange Commit: e6bcaf5dda92f49feb791d06c94748eef7a10695 Parents: b544678 Author: sboikov <[email protected]> Authored: Tue Jun 20 08:59:36 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Jun 20 08:59:36 2017 +0300 ---------------------------------------------------------------------- .../GridDhtPartitionsExchangeFuture.java | 110 ++++++++++++++++++- 1 file changed, 109 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e6bcaf5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 230a25c..0e05aea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -18,9 +18,12 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -31,6 +34,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; @@ -75,6 +79,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; +import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -201,6 +206,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ private boolean clientOnlyExchange; + /** */ + @GridToStringExclude + private long createTs; + + /** */ + @GridToStringExclude + private long initTime; + + /** */ + private long sndTs; + + /** */ + private long rcvTs; + /** Init timestamp. Used to track the amount of time spent to complete the future. */ private long initTs; @@ -311,6 +330,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert exchId.topologyVersion() != null; assert exchActions == null || !exchActions.empty(); + createTs = U.currentTimeMillis(); + dummy = false; forcePreload = false; reassign = false; @@ -592,6 +613,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte updateTopologies(crdNode); + initTime = U.currentTimeMillis() - initTs; + switch (exchange) { case ALL: { distributedExchange(); @@ -889,8 +912,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (remaining.isEmpty()) onAllReceived(); } - else + else { + sndTs = U.currentTimeMillis(); + sendPartitions(crd); + } initDone(); } @@ -1177,6 +1203,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert !nodes.contains(cctx.localNode()); + sndTs = U.currentTimeMillis(); + if (log.isDebugEnabled()) log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) + ", exchId=" + exchId + ", msg=" + m + ']'); @@ -1211,6 +1239,21 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte return discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT && !CU.clientNode(discoEvt.eventNode()); } + /** */ + static ThreadLocal<DateFormat> format = new ThreadLocal<DateFormat>() { + @Override protected DateFormat initialValue() { + return new SimpleDateFormat("HH:mm:ss.SSS"); + } + }; + + /** + * @param time Time. + * @return Time string. + */ + private static String formatTime(long time) { + return format.get().format(new Date(time)); + } + /** {@inheritDoc} */ @Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) { boolean realExchange = !dummy && !forcePreload; @@ -1306,6 +1349,50 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (super.onDone(res, err) && realExchange) { + boolean crd; + + synchronized (this) { + crd = this.crd != null && this.crd.isLocal(); + } + + long curTime = U.currentTimeMillis(); + + if (!crd) { + exchLog.info("Completed exchange [topVer=" + topologyVersion() + + ", crd=false" + + ", futCreateTs=" + formatTime(createTs) + + ", futInitTs=" + formatTime(initTs) + + ", initWaitTime=" + (initTs - createTs) + + ", initDuration=" + initTime + + ", sndTs=" + formatTime(sndTs) + + ", rcvTs=" + formatTime(rcvTs) + + ", rcvWaitTime=" + (rcvTs - sndTs) + + ", endTs=" + formatTime(curTime) + + ", durationFromInit=" + (curTime - initTs) + + ", durationFromCreate=" + (curTime - createTs) + + ']'); + } + else { + int rcvd = cnt.get(); + + exchLog.info("Completed exchange [topVer=" + topologyVersion() + + ", crd=true" + + ", futCreateTs=" + formatTime(createTs) + + ", futInitTs=" + formatTime(initTs) + + ", initWaitTime=" + (initTs - createTs) + + ", initDuration=" + initTime + + ", rcvdCnt=" + rcvd + + ", beforeInitCnt=" + beforeInitCnt.get() + + ", minProcTime=" + minTime.get() + + ", maxProcTime=" + maxTime.get() + + ", avgProcTime=" + (rcvd > 0 ? (totTime.get()/ (float)rcvd) : 0) + + ", sndTs=" + formatTime(sndTs) + + ", endTs=" + formatTime(curTime) + + ", durationFromInit=" + (curTime - initTs) + + ", durationFromCreate=" + (curTime - createTs) + + ']'); + } + if (log.isDebugEnabled()) log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']'); @@ -1397,6 +1484,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte sendAllPartitions(node.id(), cctx.gridConfig().getNetworkSendRetryCount()); } else { + if (!initFut.isDone()) + beforeInitCnt.incrementAndGet(); + initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> f) { try { @@ -1415,6 +1505,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } + private AtomicInteger beforeInitCnt = new AtomicInteger(); + private AtomicInteger cnt = new AtomicInteger(); + private GridAtomicLong minTime = new GridAtomicLong(Long.MAX_VALUE); + private GridAtomicLong maxTime = new GridAtomicLong(); + private GridAtomicLong totTime = new GridAtomicLong(); + /** * @param node Sender node. * @param msg Message. @@ -1444,7 +1540,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (updateSingleMap) { try { + cnt.incrementAndGet(); + + long start = U.currentTimeMillis(); + updatePartitionSingleMap(node, msg); + + long time = U.currentTimeMillis() - start; + + minTime.setIfLess(time); + maxTime.setIfGreater(time); + totTime.addAndGet(time); } finally { synchronized (this) { @@ -1903,6 +2009,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } + rcvTs = U.currentTimeMillis(); + updatePartitionFullMap(msg); if (exchangeOnChangeGlobalState && !F.isEmpty(msg.getExceptionsMap()))
