Author: jbellis
Date: Sun Sep 26 21:53:40 2010
New Revision: 1001530
URL: http://svn.apache.org/viewvc?rev=1001530&view=rev
Log:
JMX MessagingService pending and completed counts
patch by Nirmal Ranganathan; reviewed by jbellis for CASSANDRA-1533
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1001530&r1=1001529&r2=1001530&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Sun Sep 26 21:53:40 2010
@@ -92,6 +92,7 @@
* add describe_snitch to Thrift API (CASSANDRA-1490)
* MD5 authenticator compares plain text submitted password with MD5'd
saved property, instead of vice versa (CASSANDRA-1447)
+ * JMX MessagingService pending and completed counts (CASSANDRA-1533)
0.7-beta1
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1001530&r1=1001529&r2=1001530&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Sun
Sep 26 21:53:40 2010
@@ -20,6 +20,7 @@ package org.apache.cassandra.net;
import java.io.IOError;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
@@ -28,13 +29,12 @@ import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ServerSocketChannel;
import java.security.MessageDigest;
-import java.util.EnumMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ import org.apache.cassandra.utils.GuidGe
import org.apache.cassandra.utils.SimpleCondition;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
-public class MessagingService
+public class MessagingService implements MessagingServiceMBean
{
private static int version_ = 1;
//TODO: make this parameter dynamic somehow. Not sure if config is
appropriate.
@@ -111,6 +111,16 @@ public class MessagingService
};
Timer timer = new Timer("DroppedMessagesLogger");
timer.schedule(logDropped, LOG_DROPPED_INTERVAL_IN_MS,
LOG_DROPPED_INTERVAL_IN_MS);
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ mbs.registerMBean(this, new
ObjectName("org.apache.cassandra.concurrent:type=MESSAGING-SERVICE-POOL"));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
public byte[] hash(String type, byte data[])
@@ -541,4 +551,36 @@ public class MessagingService
server.close();
}
}
+
+ public Map<String, Integer> getCommandPendingTasks()
+ {
+ Map<String, Integer> pendingTasks = new HashMap<String, Integer>();
+ for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry :
connectionManagers_.entrySet())
+ pendingTasks.put(entry.getKey().getHostAddress(),
entry.getValue().cmdCon.getPendingMessages());
+ return pendingTasks;
+ }
+
+ public Map<String, Long> getCommandCompletedTasks()
+ {
+ Map<String, Long> completedTasks = new HashMap<String, Long>();
+ for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry :
connectionManagers_.entrySet())
+ completedTasks.put(entry.getKey().getHostAddress(),
entry.getValue().cmdCon.getCompletedMesssages());
+ return completedTasks;
+ }
+
+ public Map<String, Integer> getResponsePendingTasks()
+ {
+ Map<String, Integer> pendingTasks = new HashMap<String, Integer>();
+ for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry :
connectionManagers_.entrySet())
+ pendingTasks.put(entry.getKey().getHostAddress(),
entry.getValue().ackCon.getPendingMessages());
+ return pendingTasks;
+ }
+
+ public Map<String, Long> getResponseCompletedTasks()
+ {
+ Map<String, Long> completedTasks = new HashMap<String, Long>();
+ for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry :
connectionManagers_.entrySet())
+ completedTasks.put(entry.getKey().getHostAddress(),
entry.getValue().ackCon.getCompletedMesssages());
+ return completedTasks;
+ }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1001530&r1=1001529&r2=1001530&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
Sun Sep 26 21:53:40 2010
@@ -46,6 +46,7 @@ public class OutboundTcpConnection exten
private final BlockingQueue<ByteBuffer> queue = new
LinkedBlockingQueue<ByteBuffer>();
private DataOutputStream output;
private Socket socket;
+ private long completedCount;
public OutboundTcpConnection(InetAddress remoteEp)
{
@@ -89,6 +90,16 @@ public class OutboundTcpConnection exten
}
}
+ public int getPendingMessages()
+ {
+ return queue.size();
+ }
+
+ public long getCompletedMesssages()
+ {
+ return completedCount;
+ }
+
private void writeConnected(ByteBuffer bb)
{
try
@@ -130,6 +141,7 @@ public class OutboundTcpConnection exten
try
{
bb = queue.take();
+ completedCount++;
}
catch (InterruptedException e)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=1001530&r1=1001529&r2=1001530&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
Sun Sep 26 21:53:40 2010
@@ -24,8 +24,8 @@ import org.apache.cassandra.concurrent.S
class OutboundTcpConnectionPool
{
- private final OutboundTcpConnection cmdCon;
- private final OutboundTcpConnection ackCon;
+ public final OutboundTcpConnection cmdCon;
+ public final OutboundTcpConnection ackCon;
OutboundTcpConnectionPool(InetAddress remoteEp)
{