Repository: ignite Updated Branches: refs/heads/ignite-6149 0feca3163 -> 5c7f6a5e3
ignite-6149 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5c7f6a5e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5c7f6a5e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5c7f6a5e Branch: refs/heads/ignite-6149 Commit: 5c7f6a5e3bf419130bddac2af4f33db53f9fe657 Parents: 0feca31 Author: sboikov <[email protected]> Authored: Fri Sep 15 13:14:01 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Sep 15 13:14:01 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 3 + .../mvcc/CacheCoordinatorsSharedManager.java | 119 ++++++++++++++++++- 2 files changed, 117 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5c7f6a5e/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 3922b39..f02ae81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1274,6 +1274,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { log.info(msg); ctx.cache().context().database().dumpStatistics(log); + + // TODO IGNITE-3478. + ctx.cache().context().coordinators().dumpStatistics(log); } catch (IgniteClientDisconnectedException ignore) { // No-op. http://git-wip-us.apache.org/repos/asf/ignite/blob/5c7f6a5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index 7b666d2..9273cdd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; @@ -32,7 +33,6 @@ import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; @@ -49,6 +49,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; +import org.jsr166.LongAdder8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -98,10 +99,21 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager /** Topology version when local node was assigned as coordinator. */ private long crdVer; + /** */ + private StatCounter[] statCntrs; + /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { super.start0(); + statCntrs = new StatCounter[5]; + + statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs"); + statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", "avgFutTime"); + statCntrs[2] = new StatCounter("CoordinatorTxAckRequest"); + statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", "avgFutTime"); + statCntrs[4] = new StatCounter("TotalRequests"); + cctx.gridEvents().addLocalEventListener(new CacheCoordinatorDiscoveryListener(), EVT_NODE_FAILED, EVT_NODE_LEFT); @@ -109,6 +121,16 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager } /** + * @param log Logger. + */ + public void dumpStatistics(IgniteLogger log) { + log.info("Mvcc coordinator statistics: "); + + for (StatCounter cntr : statCntrs) + cntr.dumpInfo(log); + } + + /** * @param tx Transaction. * @return Counter. */ @@ -296,6 +318,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager MvccCoordinatorVersionResponse res = assignTxCounter(msg.txId(), msg.futureId()); + statCntrs[0].update(res.activeTransactions()); + try { cctx.gridIO().sendToGridTopic(node, MSG_TOPIC, @@ -351,11 +375,14 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @param nodeId Sender node ID. * @param msg Message. */ - private void processCoordinatorQueryVersionResponse(UUID nodeId, MvccCoordinatorVersionResponse msg) { + private void processCoordinatorVersionResponse(UUID nodeId, MvccCoordinatorVersionResponse msg) { MvccVersionFuture fut = verFuts.remove(msg.futureId()); - if (fut != null) + if (fut != null) { + statCntrs[1].update((System.nanoTime() - fut.startTime) * 1000); + fut.onResponse(msg); + } else { if (cctx.discovery().alive(nodeId)) U.warn(log, "Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']'); @@ -378,6 +405,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) { onTxDone(msg.txId()); + statCntrs[2].update(); + if (!msg.skipResponse()) { try { cctx.gridIO().sendToGridTopic(nodeId, @@ -402,8 +431,11 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private void processCoordinatorAckResponse(UUID nodeId, CoordinatorFutureResponse msg) { WaitAckFuture fut = ackFuts.remove(msg.futureId()); - if (fut != null) + if (fut != null) { + statCntrs[3].update((System.nanoTime() - fut.startTime) * 1000); + fut.onResponse(); + } else { if (cctx.discovery().alive(nodeId)) U.warn(log, "Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']'); @@ -640,6 +672,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager /** */ public final ClusterNode crd; + /** */ + final long startTime = System.nanoTime(); + /** * @param id Future ID. * @param crd Coordinator. @@ -688,6 +723,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager /** */ private final ClusterNode crd; + /** */ + final long startTime = System.nanoTime(); + /** * @param id Future ID. * @param crd Coordinator. @@ -748,6 +786,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private class CoordinatorMessageListener implements GridMessageListener { /** {@inheritDoc} */ @Override public void onMessage(UUID nodeId, Object msg, byte plc) { + statCntrs[4].update(); + MvccCoordinatorMessage msg0 = (MvccCoordinatorMessage)msg; if (msg0.waitForCoordinatorInit()) { @@ -777,7 +817,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager else if (msg instanceof CoordinatorQueryVersionRequest) processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg); else if (msg instanceof MvccCoordinatorVersionResponse) - processCoordinatorQueryVersionResponse(nodeId, (MvccCoordinatorVersionResponse) msg); + processCoordinatorVersionResponse(nodeId, (MvccCoordinatorVersionResponse) msg); else if (msg instanceof CoordinatorWaitTxsRequest) processCoordinatorWaitTxsRequest(nodeId, (CoordinatorWaitTxsRequest)msg); else @@ -789,4 +829,73 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager return "CoordinatorMessageListener[]"; } } + /** + * + */ + static class StatCounter { + /** */ + final String name; + + /** */ + final LongAdder8 cntr = new LongAdder8(); + + public StatCounter(String name) { + this.name = name; + } + + void update() { + cntr.increment(); + } + + void update(GridLongList arg) { + throw new UnsupportedOperationException(); + } + + void update(long arg) { + throw new UnsupportedOperationException(); + } + + void dumpInfo(IgniteLogger log) { + long totalCnt = cntr.sumThenReset(); + + if (totalCnt > 0) + log.info(name + " [cnt=" + totalCnt + ']'); + } + } + + /** + * + */ + static class CounterWithAvg extends StatCounter { + /** */ + final LongAdder8 total = new LongAdder8(); + + /** */ + final String avgName; + + CounterWithAvg(String name, String avgName) { + super(name); + + this.avgName = avgName; + } + + @Override void update(GridLongList arg) { + update(arg != null ? arg.size() : 0); + } + + @Override void update(long add) { + cntr.increment(); + + total.add(add); + } + + void dumpInfo(IgniteLogger log) { + long totalCnt = cntr.sumThenReset(); + long totalSum = total.sumThenReset(); + + if (totalCnt > 0) + log.info(name + " [cnt=" + totalCnt + ", " + avgName + "=" + ((float)totalSum / totalCnt) + ']'); + } + } + }
