Repository: ignite
Updated Branches:
  refs/heads/ignite-6149 0feca3163 -> 5c7f6a5e3


ignite-6149


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5c7f6a5e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5c7f6a5e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5c7f6a5e

Branch: refs/heads/ignite-6149
Commit: 5c7f6a5e3bf419130bddac2af4f33db53f9fe657
Parents: 0feca31
Author: sboikov <[email protected]>
Authored: Fri Sep 15 13:14:01 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Sep 15 13:14:01 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |   3 +
 .../mvcc/CacheCoordinatorsSharedManager.java    | 119 ++++++++++++++++++-
 2 files changed, 117 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5c7f6a5e/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 3922b39..f02ae81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1274,6 +1274,9 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
                                 log.info(msg);
 
                             
ctx.cache().context().database().dumpStatistics(log);
+
+                            // TODO IGNITE-3478.
+                            
ctx.cache().context().coordinators().dumpStatistics(log);
                         }
                         catch (IgniteClientDisconnectedException ignore) {
                             // No-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5c7f6a5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index 7b666d2..9273cdd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
@@ -32,7 +33,6 @@ import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
+import org.jsr166.LongAdder8;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -98,10 +99,21 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
     /** Topology version when local node was assigned as coordinator. */
     private long crdVer;
 
+    /** */
+    private StatCounter[] statCntrs;
+
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
         super.start0();
 
+        statCntrs = new StatCounter[5];
+
+        statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", 
"avgTxs");
+        statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", 
"avgFutTime");
+        statCntrs[2] = new StatCounter("CoordinatorTxAckRequest");
+        statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", 
"avgFutTime");
+        statCntrs[4] = new StatCounter("TotalRequests");
+
         cctx.gridEvents().addLocalEventListener(new 
CacheCoordinatorDiscoveryListener(),
             EVT_NODE_FAILED, EVT_NODE_LEFT);
 
@@ -109,6 +121,16 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
     }
 
     /**
+     * @param log Logger that receives a header line followed by each counter's dump.
+     */
+    public void dumpStatistics(IgniteLogger log) {
+        log.info("Mvcc coordinator statistics: ");
+
+        for (StatCounter cntr : statCntrs)
+            cntr.dumpInfo(log);
+    }
+
+    /**
      * @param tx Transaction.
      * @return Counter.
      */
@@ -296,6 +318,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
 
         MvccCoordinatorVersionResponse res = assignTxCounter(msg.txId(), 
msg.futureId());
 
+        statCntrs[0].update(res.activeTransactions());
+
         try {
             cctx.gridIO().sendToGridTopic(node,
                 MSG_TOPIC,
@@ -351,11 +375,14 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
      * @param nodeId Sender node ID.
      * @param msg Message.
      */
-    private void processCoordinatorQueryVersionResponse(UUID nodeId, 
MvccCoordinatorVersionResponse msg) {
+    private void processCoordinatorVersionResponse(UUID nodeId, 
MvccCoordinatorVersionResponse msg) {
         MvccVersionFuture fut = verFuts.remove(msg.futureId());
 
-        if (fut != null)
+        if (fut != null) {
+            statCntrs[1].update((System.nanoTime() - fut.startTime) * 1000);
+
             fut.onResponse(msg);
+        }
         else {
             if (cctx.discovery().alive(nodeId))
                 U.warn(log, "Failed to find query version future [node=" + 
nodeId + ", msg=" + msg + ']');
@@ -378,6 +405,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
     private void processCoordinatorTxAckRequest(UUID nodeId, 
CoordinatorTxAckRequest msg) {
         onTxDone(msg.txId());
 
+        statCntrs[2].update();
+
         if (!msg.skipResponse()) {
             try {
                 cctx.gridIO().sendToGridTopic(nodeId,
@@ -402,8 +431,11 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
     private void processCoordinatorAckResponse(UUID nodeId, 
CoordinatorFutureResponse msg) {
         WaitAckFuture fut = ackFuts.remove(msg.futureId());
 
-        if (fut != null)
+        if (fut != null) {
+            statCntrs[3].update((System.nanoTime() - fut.startTime) * 1000);
+
             fut.onResponse();
+        }
         else {
             if (cctx.discovery().alive(nodeId))
                 U.warn(log, "Failed to find tx ack future [node=" + nodeId + 
", msg=" + msg + ']');
@@ -640,6 +672,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
         /** */
         public final ClusterNode crd;
 
+        /** */
+        final long startTime = System.nanoTime();
+
         /**
          * @param id Future ID.
          * @param crd Coordinator.
@@ -688,6 +723,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
         /** */
         private final ClusterNode crd;
 
+        /** */
+        final long startTime = System.nanoTime();
+
         /**
          * @param id Future ID.
          * @param crd Coordinator.
@@ -748,6 +786,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
     private class CoordinatorMessageListener implements GridMessageListener {
         /** {@inheritDoc} */
         @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+            statCntrs[4].update();
+
             MvccCoordinatorMessage msg0 = (MvccCoordinatorMessage)msg;
 
             if (msg0.waitForCoordinatorInit()) {
@@ -777,7 +817,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
             else if (msg instanceof CoordinatorQueryVersionRequest)
                 processCoordinatorQueryVersionRequest(nodeId, 
(CoordinatorQueryVersionRequest)msg);
             else if (msg instanceof MvccCoordinatorVersionResponse)
-                processCoordinatorQueryVersionResponse(nodeId, 
(MvccCoordinatorVersionResponse) msg);
+                processCoordinatorVersionResponse(nodeId, 
(MvccCoordinatorVersionResponse) msg);
             else if (msg instanceof CoordinatorWaitTxsRequest)
                 processCoordinatorWaitTxsRequest(nodeId, 
(CoordinatorWaitTxsRequest)msg);
             else
@@ -789,4 +829,73 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
             return "CoordinatorMessageListener[]";
         }
     }
+    /**
+     * Named event counter whose accumulated count is read and reset on every dump.
+     */
+    static class StatCounter {
+        /** Counter name, printed as the prefix of the dumped log line. */
+        final String name;
+
+        /** Number of recorded events; read with {@code sumThenReset()} in {@link #dumpInfo}. */
+        final LongAdder8 cntr = new LongAdder8();
+
+        public StatCounter(String name) {
+            this.name = name;
+        }
+
+        void update() {
+            cntr.increment();
+        }
+
+        void update(GridLongList arg) {
+            throw new UnsupportedOperationException(); // Plain counter accepts no value; CounterWithAvg overrides this.
+        }
+
+        void update(long arg) {
+            throw new UnsupportedOperationException(); // Plain counter accepts no value; CounterWithAvg overrides this.
+        }
+
+        void dumpInfo(IgniteLogger log) {
+            long totalCnt = cntr.sumThenReset();
+
+            if (totalCnt > 0) // Nothing is logged when no events were recorded.
+                log.info(name + " [cnt=" + totalCnt + ']');
+        }
+    }
+
+    /**
+     * Counter that also sums a value per update and dumps the average value per update.
+     */
+    static class CounterWithAvg extends StatCounter {
+        /** Sum of the values passed to {@link #update(long)}; reset on every dump. */
+        final LongAdder8 total = new LongAdder8();
+
+        /** Label printed before the average in the dumped log line. */
+        final String avgName;
+
+        CounterWithAvg(String name, String avgName) {
+            super(name);
+
+            this.avgName = avgName;
+        }
+
+        @Override void update(GridLongList arg) {
+            update(arg != null ? arg.size() : 0); // A null list is recorded as size 0.
+        }
+
+        @Override void update(long add) {
+            cntr.increment();
+
+            total.add(add);
+        }
+
+        @Override void dumpInfo(IgniteLogger log) {
+            long totalCnt = cntr.sumThenReset();
+            long totalSum = total.sumThenReset(); // NOTE(review): reset separately from cntr; a concurrent update may skew one dump.
+
+            if (totalCnt > 0) // Also guards the division below against a zero count.
+                log.info(name + " [cnt=" + totalCnt + ", " + avgName + "=" + 
((float)totalSum / totalCnt) + ']');
+        }
+    }
+
 }

Reply via email to