Author: jbellis
Date: Thu Sep 30 16:03:48 2010
New Revision: 1003127

URL: http://svn.apache.org/viewvc?rev=1003127&view=rev
Log:
re-organize JMX into.db, .net, .internal, .request
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-1217

Modified:
    cassandra/trunk/CHANGES.txt
    
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
    
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
    
cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
    
cassandra/trunk/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
    
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
    
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Sep 30 16:03:48 2010
@@ -1,6 +1,7 @@
 dev
  * add strategy options to describe_keyspace output (CASSANDRA-1560)
  * log warning when using randomly generated token (CASSANDRA-1552)
+ * re-organize JMX into .db, .net, .internal, .request (CASSANDRA-1217)
 
 
 0.7-beta2

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
 Thu Sep 30 16:03:48 2010
@@ -29,9 +29,10 @@ public class JMXConfigurableThreadPoolEx
                                                 long keepAliveTime, 
                                                 TimeUnit unit,
                                              BlockingQueue<Runnable> 
workQueue, 
-                                             NamedThreadFactory threadFactory) 
+                                             NamedThreadFactory threadFactory,
+                                             String jmxPath)
     {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory);
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory, jmxPath);
     }
     
 }
\ No newline at end of file

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
 Thu Sep 30 16:03:48 2010
@@ -37,12 +37,17 @@ public class JMXEnabledThreadPoolExecuto
 
     public JMXEnabledThreadPoolExecutor(String threadPoolName)
     {
-        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName));
+        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName), 
"internal");
+    }
+
+    public JMXEnabledThreadPoolExecutor(String threadPoolName, String jmxPath)
+    {
+        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName), 
jmxPath);
     }
 
     public JMXEnabledThreadPoolExecutor(String threadPoolName, int priority)
     {
-        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName, 
priority));
+        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName, 
priority), "internal");
     }
 
     public JMXEnabledThreadPoolExecutor(int corePoolSize,
@@ -50,13 +55,14 @@ public class JMXEnabledThreadPoolExecuto
                                         long keepAliveTime,
                                         TimeUnit unit,
                                         BlockingQueue<Runnable> workQueue,
-                                        NamedThreadFactory threadFactory)
+                                        NamedThreadFactory threadFactory,
+                                        String jmxPath)
     {
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory);
         super.prestartAllCoreThreads();
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        mbeanName = "org.apache.cassandra.concurrent:type=" + threadFactory.id;
+        mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + 
threadFactory.id;
         try
         {
             mbs.registerMBean(this, new ObjectName(mbeanName));
@@ -69,7 +75,7 @@ public class JMXEnabledThreadPoolExecuto
 
     public JMXEnabledThreadPoolExecutor(Stage stage)
     {
-        this(stage + "_STAGE");
+        this(stage.getJmxName(), stage.getJmxType());
     }
 
     private void unregisterMBean()

Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java Thu Sep 
30 16:03:48 2010
@@ -28,7 +28,31 @@ public enum Stage
     STREAM,
     GOSSIP,
     RESPONSE,
-    AE_SERVICE,
+    ANTIENTROPY,
     MIGRATION,
-    MISC,
+    MISC;
+
+    public String getJmxType()
+    {
+        switch (this)
+        {
+            case ANTIENTROPY:
+            case GOSSIP:
+            case MIGRATION:
+            case MISC:
+            case STREAM:
+                return "internal";
+            case MUTATION:
+            case READ:
+            case RESPONSE:
+                return "request";
+            default:
+                throw new AssertionError("Unknown stage " + this);
+        }
+    }
+
+    public String getJmxName()
+    {
+        return toString().substring(0, 1) + 
toString().substring(1).toLowerCase() + "Stage";
+    }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java 
Thu Sep 30 16:03:48 2010
@@ -19,9 +19,6 @@
 package org.apache.cassandra.concurrent;
 
 import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -49,7 +46,7 @@ public class StageManager
         // the rest are all single-threaded
         stages.put(Stage.STREAM, new 
JMXEnabledThreadPoolExecutor(Stage.STREAM));
         stages.put(Stage.GOSSIP, new 
JMXEnabledThreadPoolExecutor(Stage.GOSSIP));
-        stages.put(Stage.AE_SERVICE, new 
JMXEnabledThreadPoolExecutor(Stage.AE_SERVICE));
+        stages.put(Stage.ANTIENTROPY, new 
JMXEnabledThreadPoolExecutor(Stage.ANTIENTROPY));
         stages.put(Stage.MIGRATION, new 
JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
         stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
     }
@@ -65,7 +62,8 @@ public class StageManager
                                                 KEEPALIVE,
                                                 TimeUnit.SECONDS,
                                                 new 
LinkedBlockingQueue<Runnable>(),
-                                                new NamedThreadFactory(stage + 
"_STAGE"));
+                                                new 
NamedThreadFactory(stage.getJmxName()),
+                                                stage.getJmxType());
     }
     
     private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage 
stage, int numThreads)
@@ -77,7 +75,8 @@ public class StageManager
                                                      KEEPALIVE,
                                                      TimeUnit.SECONDS,
                                                      new 
LinkedBlockingQueue<Runnable>(),
-                                                     new 
NamedThreadFactory(stage + "_STAGE"));
+                                                     new 
NamedThreadFactory(stage.getJmxName()),
+                                                     stage.getJmxType());
     }
 
     /**

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu 
Sep 30 16:03:48 2010
@@ -89,15 +89,17 @@ public class ColumnFamilyStore implement
                                                StageManager.KEEPALIVE,
                                                TimeUnit.SECONDS,
                                                new 
LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors()),
-                                               new 
NamedThreadFactory("FLUSH-SORTER-POOL"));
+                                               new 
NamedThreadFactory("FlushSorter"),
+                                               "internal");
     private static final ExecutorService flushWriter
             = new JMXEnabledThreadPoolExecutor(1,
                                                
DatabaseDescriptor.getFlushWriters(),
                                                StageManager.KEEPALIVE,
                                                TimeUnit.SECONDS,
                                                new 
LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushWriters()),
-                                               new 
NamedThreadFactory("FLUSH-WRITER-POOL"));
-    public static final ExecutorService postFlushExecutor = new 
JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
+                                               new 
NamedThreadFactory("FlushWriter"),
+                                               "internal");
+    public static final ExecutorService postFlushExecutor = new 
JMXEnabledThreadPoolExecutor("MemtablePostFlusher");
     
     private Set<Memtable> memtablesPendingFlush = new 
ConcurrentSkipListSet<Memtable>();
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java 
Thu Sep 30 16:03:48 2010
@@ -92,7 +92,7 @@ public class HintedHandOffManager
         int hhPriority = System.getProperty("cassandra.compaction.priority") 
== null
                          ? Thread.NORM_PRIORITY
                          : 
Integer.parseInt(System.getProperty("cassandra.compaction.priority"));
-        executor_ = new JMXEnabledThreadPoolExecutor("HINTED-HANDOFF-POOL", 
hhPriority);
+        executor_ = new JMXEnabledThreadPoolExecutor("HintedHandoff", 
hhPriority);
     }
 
     private static boolean sendMessage(InetAddress endpoint, String tableName, 
String cfName, byte[] key) throws IOException

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Thu 
Sep 30 16:03:48 2010
@@ -62,7 +62,7 @@ public class FailureDetector implements 
         try
         {
             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            mbs.registerMBean(this, new 
ObjectName("org.apache.cassandra.gms:type=FailureDetector"));
+            mbs.registerMBean(this, new 
ObjectName("org.apache.cassandra.net:type=FailureDetector"));
         }
         catch (Exception e)
         {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
 Thu Sep 30 16:03:48 2010
@@ -76,7 +76,7 @@ public class DynamicEndpointSnitch exten
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
         {
-            mbs.registerMBean(this, new 
ObjectName("org.apache.cassandra.locator:type=DynamicEndpointSnitch"));
+            mbs.registerMBean(this, new 
ObjectName("org.apache.cassandra.db:type=DynamicEndpointSnitch"));
         }
         catch (Exception e)
         {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java 
Thu Sep 30 16:03:48 2010
@@ -37,7 +37,7 @@ public class EndpointSnitchInfo implemen
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
         {
-            mbs.registerMBean(new EndpointSnitchInfo(), new 
ObjectName("org.apache.cassandra.locator:type=EndpointSnitchInfo"));
+            mbs.registerMBean(new EndpointSnitchInfo(), new 
ObjectName("org.apache.cassandra.db:type=EndpointSnitchInfo"));
         }
         catch (Exception e)
         {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu 
Sep 30 16:03:48 2010
@@ -39,6 +39,7 @@ import javax.management.ObjectName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -102,7 +103,7 @@ public class MessagingService implements
         callbackMap_ = new ExpiringMap<String, IAsyncCallback>((long) (1.1 * 
DatabaseDescriptor.getRpcTimeout()));
         taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>((long) (1.1 
* DatabaseDescriptor.getRpcTimeout()));
 
-        streamExecutor_ = new 
JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
+        streamExecutor_ = new DebuggableThreadPoolExecutor("Streaming", 
DatabaseDescriptor.getCompactionThreadPriority());
         TimerTask logDropped = new TimerTask()
         {
             public void run()

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java 
Thu Sep 30 16:03:48 2010
@@ -216,7 +216,7 @@ public class AntiEntropyService
         for (Differencer differencer : differencers)
         {
             logger.info("Queueing comparison " + differencer);
-            StageManager.getStage(Stage.AE_SERVICE).execute(differencer);
+            StageManager.getStage(Stage.ANTIENTROPY).execute(differencer);
         }
     }
 
@@ -406,7 +406,7 @@ public class AntiEntropyService
                 for (MerkleTree.RowHash minrow : minrows)
                     range.addHash(minrow);
 
-            StageManager.getStage(Stage.AE_SERVICE).submit(this);
+            StageManager.getStage(Stage.ANTIENTROPY).submit(this);
             logger.debug("Validated " + validated + " rows into AEService tree 
for " + request);
         }
         

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java Thu 
Sep 30 16:03:48 2010
@@ -38,6 +38,8 @@ import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
+import com.google.common.collect.Iterables;
+
 public class GCInspector
 {
     public static final GCInspector instance = new GCInspector();
@@ -149,7 +151,9 @@ public class GCInspector
     {
         // everything from o.a.c.concurrent
         logger.info(String.format("%-25s%10s%10s", "Pool Name", "Active", 
"Pending"));
-        for (ObjectName objectName : server.queryNames(new 
ObjectName("org.apache.cassandra.concurrent:type=*"), null))
+        Set<ObjectName> requests = server.queryNames(new 
ObjectName("org.apache.cassandra.request:type=*"), null);
+        Set<ObjectName> internal = server.queryNames(new 
ObjectName("org.apache.cassandra.internal:type=*"), null);
+        for (ObjectName objectName : Iterables.concat(requests, internal))
         {
             String poolName = objectName.getKeyProperty("type");
             IExecutorMBean threadPoolProxy = JMX.newMBeanProxy(server, 
objectName, IExecutorMBean.class);

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java 
Thu Sep 30 16:03:48 2010
@@ -177,10 +177,6 @@ public class StorageLoadBalancer impleme
     private Map<InetAddress, Double> loadInfo_ = new HashMap<InetAddress, 
Double>();
     /* This map is a clone of the one above and is used for various 
calculations during LB operation */
     private Map<InetAddress, Double> loadInfo2_ = new HashMap<InetAddress, 
Double>();
-    /* This thread pool is used for initiating load balancing operations */
-    private ExecutorService lb_ = new 
JMXEnabledThreadPoolExecutor("LB-OPERATIONS");
-    /* This thread pool is used by target node to leave the ring. */
-    private ExecutorService lbOperations_ = new 
JMXEnabledThreadPoolExecutor("LB-TARGET");
 
     /* Timer is used to disseminate load information */
     private Timer loadTimer_ = new Timer(false);

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu 
Sep 30 16:03:48 2010
@@ -72,7 +72,7 @@ public class StorageProxy implements Sto
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
         {
-            mbs.registerMBean(new StorageProxy(), new 
ObjectName("org.apache.cassandra.service:type=StorageProxy"));
+            mbs.registerMBean(new StorageProxy(), new 
ObjectName("org.apache.cassandra.db:type=StorageProxy"));
         }
         catch (Exception e)
         {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
Thu Sep 30 16:03:48 2010
@@ -123,8 +123,8 @@ public class StorageService implements I
         put(Verb.STREAM_REQUEST, Stage.STREAM);
         put(Verb.RANGE_SLICE, Stage.READ);
         put(Verb.BOOTSTRAP_TOKEN, Stage.MISC);
-        put(Verb.TREE_REQUEST, Stage.AE_SERVICE);
-        put(Verb.TREE_RESPONSE, Stage.AE_SERVICE);
+        put(Verb.TREE_REQUEST, Stage.ANTIENTROPY);
+        put(Verb.TREE_RESPONSE, Stage.ANTIENTROPY);
         put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
         put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
         put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
@@ -165,7 +165,8 @@ public class StorageService implements I
                                                                                
    StageManager.KEEPALIVE,
                                                                                
    TimeUnit.SECONDS,
                                                                                
    new LinkedBlockingQueue<Runnable>(),
-                                                                               
    new NamedThreadFactory("CONSISTENCY-MANAGER"));
+                                                                               
    new NamedThreadFactory("ReadRepair"),
+                                                                               
    "request");
 
     /* We use this interface to determine where replicas need to be placed */
     private Map<String, AbstractReplicationStrategy> replicationStrategies;
@@ -206,7 +207,7 @@ public class StorageService implements I
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
         {
-            mbs.registerMBean(this, new 
ObjectName("org.apache.cassandra.service:type=StorageService"));
+            mbs.registerMBean(this, new 
ObjectName("org.apache.cassandra.db:type=StorageService"));
         }
         catch (Exception e)
         {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java 
Thu Sep 30 16:03:48 2010
@@ -34,7 +34,7 @@ import java.util.Set;
 public class StreamingService implements StreamingServiceMBean
 {
     private static final Logger logger = 
LoggerFactory.getLogger(StreamingService.class);
-    public static final String MBEAN_OBJECT_NAME = 
"org.apache.cassandra.service:type=StreamingService";
+    public static final String MBEAN_OBJECT_NAME = 
"org.apache.cassandra.net:type=StreamingService";
     public static final StreamingService instance = new StreamingService();
 
     private StreamingService()

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Thu Sep 30 
16:03:48 2010
@@ -25,25 +25,17 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.lang.management.MemoryUsage;
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.commons.cli.*;
+
 import org.apache.cassandra.cache.JMXInstrumentedCacheMBean;
 import org.apache.cassandra.concurrent.IExecutorMBean;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
-import org.apache.cassandra.db.CompactionManager;
 import org.apache.cassandra.dht.Token;
-
-import org.apache.commons.cli.*;
+import org.apache.cassandra.net.MessagingServiceMBean;
 
 public class NodeCmd {
     private static final String HOST_OPT_LONG = "host";
@@ -80,7 +72,7 @@ public class NodeCmd {
                 "tpstats, flush, drain, repair, decommission, move, 
loadbalance, removetoken [status|force]|[token], " +
                 "setcachecapacity [keyspace] [cfname] [keycachecapacity] 
[rowcachecapacity], " +
                 "getcompactionthreshold [keyspace] [cfname], 
setcompactionthreshold [cfname] [minthreshold] [maxthreshold], " +
-                "streams [host]");
+                "netstats [host]");
         String usage = String.format("java %s --host <arg> <command>%n", 
NodeCmd.class.getName());
         hf.printHelp(usage, "", options, header);
     }
@@ -188,7 +180,7 @@ public class NodeCmd {
         outs.println("ReleaseVersion: " + probe.getReleaseVersion());
     }
 
-    public void printStreamInfo(final InetAddress addr, PrintStream outs)
+    public void printNetworkStats(final InetAddress addr, PrintStream outs)
     {
         outs.println(String.format("Mode: %s", probe.getOperationMode()));
         Set<InetAddress> hosts = addr == null ? probe.getStreamDestinations() 
: new HashSet<InetAddress>(){{add(addr);}};
@@ -240,6 +232,32 @@ public class NodeCmd {
                 outs.println(String.format("   Error retrieving file data for 
%s", host));
             }
         }
+
+        MessagingServiceMBean ms = probe.getMsProxy();
+        outs.print(String.format("%-25s", "Pool Name"));
+        outs.print(String.format("%10s", "Active"));
+        outs.print(String.format("%10s", "Pending"));
+        outs.print(String.format("%15s", "Completed"));
+        outs.println();
+
+        int pending;
+        long completed;
+
+        pending = 0;
+        for (int n : ms.getCommandPendingTasks().values())
+            pending += n;
+        completed = 0;
+        for (long n : ms.getCommandCompletedTasks().values())
+            completed += n;
+        outs.printf("%-25s%10s%10s%15s\n", "Commands", "n/a", pending, 
completed);
+
+        pending = 0;
+        for (int n : ms.getResponsePendingTasks().values())
+            pending += n;
+        completed = 0;
+        for (long n : ms.getResponseCompletedTasks().values())
+            completed += n;
+        outs.printf("%-25s%10s%10s%15s\n", "Responses", "n/a", pending, 
completed);
     }
     
     public void printColumnFamilyStats(PrintStream outs)
@@ -611,11 +629,11 @@ public class NodeCmd {
             }
             probe.setCompactionThreshold(ks, cf, minthreshold, maxthreshold);
         }
-        else if (cmdName.equals("streams"))
+        else if (cmdName.equals("netstats"))
         {
             // optional host
             String otherHost = arguments.length > 1 ? arguments[1] : null;
-            nodeCmd.printStreamInfo(otherHost == null ? null : 
InetAddress.getByName(otherHost), System.out);
+            nodeCmd.printNetworkStats(otherHost == null ? null : 
InetAddress.getByName(otherHost), System.out);
         }
         else if (cmdName.equals("version"))
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Thu Sep 
30 16:03:48 2010
@@ -37,11 +37,14 @@ import javax.management.remote.JMXConnec
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
 
+import com.google.common.collect.Iterables;
+
 import org.apache.cassandra.cache.JMXInstrumentedCacheMBean;
 import org.apache.cassandra.concurrent.IExecutorMBean;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.MessagingServiceMBean;
 import org.apache.cassandra.service.StorageServiceMBean;
 import org.apache.cassandra.streaming.StreamingService;
 import org.apache.cassandra.streaming.StreamingServiceMBean;
@@ -55,7 +58,7 @@ import static com.google.common.base.Cha
 public class NodeProbe
 {
     private static final String fmtUrl = 
"service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
-    private static final String ssObjName = 
"org.apache.cassandra.service:type=StorageService";
+    private static final String ssObjName = 
"org.apache.cassandra.db:type=StorageService";
     private static final int defaultPort = 8080;
     private String host;
     private int port;
@@ -429,6 +432,18 @@ public class NodeProbe
     {
         return ssProxy.exportSchema();
     }
+
+    public MessagingServiceMBean getMsProxy()
+    {
+        try
+        {
+            return JMX.newMBeanProxy(mbeanServerConn, new 
ObjectName("org.apache.cassandra.net:type=MessagingService"), 
MessagingServiceMBean.class);
+        }
+        catch (MalformedObjectNameException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
     
     private ColumnFamilyStoreMBean getCfsProxy(String ks, String cf) {
         ColumnFamilyStoreMBean cfsProxy = null;
@@ -485,8 +500,9 @@ class ThreadPoolProxyMBeanIterator imple
     public ThreadPoolProxyMBeanIterator(MBeanServerConnection mbeanServerConn) 
     throws MalformedObjectNameException, NullPointerException, IOException
     {
-        ObjectName query = new 
ObjectName("org.apache.cassandra.concurrent:type=*");
-        resIter = mbeanServerConn.queryNames(query, null).iterator();
+        Set<ObjectName> requests = mbeanServerConn.queryNames(new 
ObjectName("org.apache.cassandra.request:type=*"), null);
+        Set<ObjectName> internal = mbeanServerConn.queryNames(new 
ObjectName("org.apache.cassandra.internal:type=*"), null);
+        resIter = Iterables.concat(requests, internal).iterator();
         this.mbeanServerConn = mbeanServerConn;
     }
     

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
 (original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
 Thu Sep 30 16:03:48 2010
@@ -238,7 +238,7 @@ public class AntiEntropyServiceTest exte
 
     void flushAES() throws Exception
     {
-        final ThreadPoolExecutor stage = 
StageManager.getStage(Stage.AE_SERVICE);
+        final ThreadPoolExecutor stage = 
StageManager.getStage(Stage.ANTIENTROPY);
         final Callable noop = new Callable<Object>()
         {
             public Boolean call()


Reply via email to