Author: jbellis
Date: Sun Sep 26 21:53:40 2010
New Revision: 1001530

URL: http://svn.apache.org/viewvc?rev=1001530&view=rev
Log:
JMX MessagingService pending and completed counts
patch by Nirmal Ranganathan; reviewed by jbellis for CASSANDRA-1533

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1001530&r1=1001529&r2=1001530&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Sun Sep 26 21:53:40 2010
@@ -92,6 +92,7 @@
  * add describe_snitch to Thrift API (CASSANDRA-1490)
  * MD5 authenticator compares plain text submitted password with MD5'd
    saved property, instead of vice versa (CASSANDRA-1447)
+ * JMX MessagingService pending and completed counts (CASSANDRA-1533)
 
 
 0.7-beta1

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1001530&r1=1001529&r2=1001530&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Sun Sep 26 21:53:40 2010
@@ -20,6 +20,7 @@ package org.apache.cassandra.net;
 
 import java.io.IOError;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -28,13 +29,12 @@ import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ServerSocketChannel;
 import java.security.MessageDigest;
-import java.util.EnumMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ import org.apache.cassandra.utils.GuidGe
 import org.apache.cassandra.utils.SimpleCondition;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
-public class MessagingService
+public class MessagingService implements MessagingServiceMBean
 {
     private static int version_ = 1;
     //TODO: make this parameter dynamic somehow.  Not sure if config is appropriate.
@@ -111,6 +111,16 @@ public class MessagingService
         };
         Timer timer = new Timer("DroppedMessagesLogger");
         timer.schedule(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS);
+
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.concurrent:type=MESSAGING-SERVICE-POOL"));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     public byte[] hash(String type, byte data[])
@@ -541,4 +551,36 @@ public class MessagingService
             server.close();
         }
     }
+
+    public Map<String, Integer> getCommandPendingTasks()
+    {
+        Map<String, Integer> pendingTasks = new HashMap<String, Integer>();
+        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers_.entrySet())
+            pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().cmdCon.getPendingMessages());
+        return pendingTasks;
+    }
+
+    public Map<String, Long> getCommandCompletedTasks()
+    {
+        Map<String, Long> completedTasks = new HashMap<String, Long>();
+        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers_.entrySet())
+            completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().cmdCon.getCompletedMesssages());
+        return completedTasks;
+    }
+
+    public Map<String, Integer> getResponsePendingTasks()
+    {
+        Map<String, Integer> pendingTasks = new HashMap<String, Integer>();
+        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers_.entrySet())
+            pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().ackCon.getPendingMessages());
+        return pendingTasks;
+    }
+
+    public Map<String, Long> getResponseCompletedTasks()
+    {
+        Map<String, Long> completedTasks = new HashMap<String, Long>();
+        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers_.entrySet())
+            completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().ackCon.getCompletedMesssages());
+        return completedTasks;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1001530&r1=1001529&r2=1001530&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Sun Sep 26 21:53:40 2010
@@ -46,6 +46,7 @@ public class OutboundTcpConnection exten
     private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
     private DataOutputStream output;
     private Socket socket;
+    private long completedCount;
 
     public OutboundTcpConnection(InetAddress remoteEp)
     {
@@ -89,6 +90,16 @@ public class OutboundTcpConnection exten
         }
     }
 
+    public int getPendingMessages()
+    {
+        return queue.size();
+    }
+
+    public long getCompletedMesssages()
+    {
+        return completedCount;
+    }
+
     private void writeConnected(ByteBuffer bb)
     {
         try
@@ -130,6 +141,7 @@ public class OutboundTcpConnection exten
         try
         {
             bb = queue.take();
+            completedCount++;
         }
         catch (InterruptedException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=1001530&r1=1001529&r2=1001530&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Sun Sep 26 21:53:40 2010
@@ -24,8 +24,8 @@ import org.apache.cassandra.concurrent.S
 
 class OutboundTcpConnectionPool
 {
-    private final OutboundTcpConnection cmdCon;
-    private final OutboundTcpConnection ackCon;
+    public final OutboundTcpConnection cmdCon;
+    public final OutboundTcpConnection ackCon;
 
     OutboundTcpConnectionPool(InetAddress remoteEp)
     {


Reply via email to