Author: jbellis
Date: Thu Sep 30 16:03:48 2010
New Revision: 1003127
URL: http://svn.apache.org/viewvc?rev=1003127&view=rev
Log:
re-organize JMX into.db, .net, .internal, .request
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-1217
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java
cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Sep 30 16:03:48 2010
@@ -1,6 +1,7 @@
dev
* add strategy options to describe_keyspace output (CASSANDRA-1560)
* log warning when using randomly generated token (CASSANDRA-1552)
+ * re-organize JMX into .db, .net, .internal, .request (CASSANDRA-1217)
0.7-beta2
Modified:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
Thu Sep 30 16:03:48 2010
@@ -29,9 +29,10 @@ public class JMXConfigurableThreadPoolEx
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable>
workQueue,
- NamedThreadFactory threadFactory)
+ NamedThreadFactory threadFactory,
+ String jmxPath)
{
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, jmxPath);
}
}
\ No newline at end of file
Modified:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
Thu Sep 30 16:03:48 2010
@@ -37,12 +37,17 @@ public class JMXEnabledThreadPoolExecuto
public JMXEnabledThreadPoolExecutor(String threadPoolName)
{
- this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName));
+ this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName),
"internal");
+ }
+
+ public JMXEnabledThreadPoolExecutor(String threadPoolName, String jmxPath)
+ {
+ this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName),
jmxPath);
}
public JMXEnabledThreadPoolExecutor(String threadPoolName, int priority)
{
- this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName,
priority));
+ this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName,
priority), "internal");
}
public JMXEnabledThreadPoolExecutor(int corePoolSize,
@@ -50,13 +55,14 @@ public class JMXEnabledThreadPoolExecuto
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
- NamedThreadFactory threadFactory)
+ NamedThreadFactory threadFactory,
+ String jmxPath)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
super.prestartAllCoreThreads();
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbeanName = "org.apache.cassandra.concurrent:type=" + threadFactory.id;
+ mbeanName = "org.apache.cassandra." + jmxPath + ":type=" +
threadFactory.id;
try
{
mbs.registerMBean(this, new ObjectName(mbeanName));
@@ -69,7 +75,7 @@ public class JMXEnabledThreadPoolExecuto
public JMXEnabledThreadPoolExecutor(Stage stage)
{
- this(stage + "_STAGE");
+ this(stage.getJmxName(), stage.getJmxType());
}
private void unregisterMBean()
Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java Thu Sep
30 16:03:48 2010
@@ -28,7 +28,31 @@ public enum Stage
STREAM,
GOSSIP,
RESPONSE,
- AE_SERVICE,
+ ANTIENTROPY,
MIGRATION,
- MISC,
+ MISC;
+
+ public String getJmxType()
+ {
+ switch (this)
+ {
+ case ANTIENTROPY:
+ case GOSSIP:
+ case MIGRATION:
+ case MISC:
+ case STREAM:
+ return "internal";
+ case MUTATION:
+ case READ:
+ case RESPONSE:
+ return "request";
+ default:
+ throw new AssertionError("Unknown stage " + this);
+ }
+ }
+
+ public String getJmxName()
+ {
+ return toString().substring(0, 1) +
toString().substring(1).toLowerCase() + "Stage";
+ }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
Thu Sep 30 16:03:48 2010
@@ -19,9 +19,6 @@
package org.apache.cassandra.concurrent;
import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -49,7 +46,7 @@ public class StageManager
// the rest are all single-threaded
stages.put(Stage.STREAM, new
JMXEnabledThreadPoolExecutor(Stage.STREAM));
stages.put(Stage.GOSSIP, new
JMXEnabledThreadPoolExecutor(Stage.GOSSIP));
- stages.put(Stage.AE_SERVICE, new
JMXEnabledThreadPoolExecutor(Stage.AE_SERVICE));
+ stages.put(Stage.ANTIENTROPY, new
JMXEnabledThreadPoolExecutor(Stage.ANTIENTROPY));
stages.put(Stage.MIGRATION, new
JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
}
@@ -65,7 +62,8 @@ public class StageManager
KEEPALIVE,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory(stage +
"_STAGE"));
+ new
NamedThreadFactory(stage.getJmxName()),
+ stage.getJmxType());
}
private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage
stage, int numThreads)
@@ -77,7 +75,8 @@ public class StageManager
KEEPALIVE,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<Runnable>(),
- new
NamedThreadFactory(stage + "_STAGE"));
+ new
NamedThreadFactory(stage.getJmxName()),
+ stage.getJmxType());
}
/**
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu
Sep 30 16:03:48 2010
@@ -89,15 +89,17 @@ public class ColumnFamilyStore implement
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors()),
- new
NamedThreadFactory("FLUSH-SORTER-POOL"));
+ new
NamedThreadFactory("FlushSorter"),
+ "internal");
private static final ExecutorService flushWriter
= new JMXEnabledThreadPoolExecutor(1,
DatabaseDescriptor.getFlushWriters(),
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushWriters()),
- new
NamedThreadFactory("FLUSH-WRITER-POOL"));
- public static final ExecutorService postFlushExecutor = new
JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
+ new
NamedThreadFactory("FlushWriter"),
+ "internal");
+ public static final ExecutorService postFlushExecutor = new
JMXEnabledThreadPoolExecutor("MemtablePostFlusher");
private Set<Memtable> memtablesPendingFlush = new
ConcurrentSkipListSet<Memtable>();
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Thu Sep 30 16:03:48 2010
@@ -92,7 +92,7 @@ public class HintedHandOffManager
int hhPriority = System.getProperty("cassandra.compaction.priority")
== null
? Thread.NORM_PRIORITY
:
Integer.parseInt(System.getProperty("cassandra.compaction.priority"));
- executor_ = new JMXEnabledThreadPoolExecutor("HINTED-HANDOFF-POOL",
hhPriority);
+ executor_ = new JMXEnabledThreadPoolExecutor("HintedHandoff",
hhPriority);
}
private static boolean sendMessage(InetAddress endpoint, String tableName,
String cfName, byte[] key) throws IOException
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Thu
Sep 30 16:03:48 2010
@@ -62,7 +62,7 @@ public class FailureDetector implements
try
{
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(this, new
ObjectName("org.apache.cassandra.gms:type=FailureDetector"));
+ mbs.registerMBean(this, new
ObjectName("org.apache.cassandra.net:type=FailureDetector"));
}
catch (Exception e)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
Thu Sep 30 16:03:48 2010
@@ -76,7 +76,7 @@ public class DynamicEndpointSnitch exten
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
{
- mbs.registerMBean(this, new
ObjectName("org.apache.cassandra.locator:type=DynamicEndpointSnitch"));
+ mbs.registerMBean(this, new
ObjectName("org.apache.cassandra.db:type=DynamicEndpointSnitch"));
}
catch (Exception e)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
Thu Sep 30 16:03:48 2010
@@ -37,7 +37,7 @@ public class EndpointSnitchInfo implemen
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
{
- mbs.registerMBean(new EndpointSnitchInfo(), new
ObjectName("org.apache.cassandra.locator:type=EndpointSnitchInfo"));
+ mbs.registerMBean(new EndpointSnitchInfo(), new
ObjectName("org.apache.cassandra.db:type=EndpointSnitchInfo"));
}
catch (Exception e)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu
Sep 30 16:03:48 2010
@@ -39,6 +39,7 @@ import javax.management.ObjectName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -102,7 +103,7 @@ public class MessagingService implements
callbackMap_ = new ExpiringMap<String, IAsyncCallback>((long) (1.1 *
DatabaseDescriptor.getRpcTimeout()));
taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>((long) (1.1
* DatabaseDescriptor.getRpcTimeout()));
- streamExecutor_ = new
JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
+ streamExecutor_ = new DebuggableThreadPoolExecutor("Streaming",
DatabaseDescriptor.getCompactionThreadPriority());
TimerTask logDropped = new TimerTask()
{
public void run()
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
Thu Sep 30 16:03:48 2010
@@ -216,7 +216,7 @@ public class AntiEntropyService
for (Differencer differencer : differencers)
{
logger.info("Queueing comparison " + differencer);
- StageManager.getStage(Stage.AE_SERVICE).execute(differencer);
+ StageManager.getStage(Stage.ANTIENTROPY).execute(differencer);
}
}
@@ -406,7 +406,7 @@ public class AntiEntropyService
for (MerkleTree.RowHash minrow : minrows)
range.addHash(minrow);
- StageManager.getStage(Stage.AE_SERVICE).submit(this);
+ StageManager.getStage(Stage.ANTIENTROPY).submit(this);
logger.debug("Validated " + validated + " rows into AEService tree
for " + request);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java Thu
Sep 30 16:03:48 2010
@@ -38,6 +38,8 @@ import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import com.google.common.collect.Iterables;
+
public class GCInspector
{
public static final GCInspector instance = new GCInspector();
@@ -149,7 +151,9 @@ public class GCInspector
{
// everything from o.a.c.concurrent
logger.info(String.format("%-25s%10s%10s", "Pool Name", "Active",
"Pending"));
- for (ObjectName objectName : server.queryNames(new
ObjectName("org.apache.cassandra.concurrent:type=*"), null))
+ Set<ObjectName> requests = server.queryNames(new
ObjectName("org.apache.cassandra.request:type=*"), null);
+ Set<ObjectName> internal = server.queryNames(new
ObjectName("org.apache.cassandra.internal:type=*"), null);
+ for (ObjectName objectName : Iterables.concat(requests, internal))
{
String poolName = objectName.getKeyProperty("type");
IExecutorMBean threadPoolProxy = JMX.newMBeanProxy(server,
objectName, IExecutorMBean.class);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
Thu Sep 30 16:03:48 2010
@@ -177,10 +177,6 @@ public class StorageLoadBalancer impleme
private Map<InetAddress, Double> loadInfo_ = new HashMap<InetAddress,
Double>();
/* This map is a clone of the one above and is used for various
calculations during LB operation */
private Map<InetAddress, Double> loadInfo2_ = new HashMap<InetAddress,
Double>();
- /* This thread pool is used for initiating load balancing operations */
- private ExecutorService lb_ = new
JMXEnabledThreadPoolExecutor("LB-OPERATIONS");
- /* This thread pool is used by target node to leave the ring. */
- private ExecutorService lbOperations_ = new
JMXEnabledThreadPoolExecutor("LB-TARGET");
/* Timer is used to disseminate load information */
private Timer loadTimer_ = new Timer(false);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu
Sep 30 16:03:48 2010
@@ -72,7 +72,7 @@ public class StorageProxy implements Sto
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
{
- mbs.registerMBean(new StorageProxy(), new
ObjectName("org.apache.cassandra.service:type=StorageProxy"));
+ mbs.registerMBean(new StorageProxy(), new
ObjectName("org.apache.cassandra.db:type=StorageProxy"));
}
catch (Exception e)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Thu Sep 30 16:03:48 2010
@@ -123,8 +123,8 @@ public class StorageService implements I
put(Verb.STREAM_REQUEST, Stage.STREAM);
put(Verb.RANGE_SLICE, Stage.READ);
put(Verb.BOOTSTRAP_TOKEN, Stage.MISC);
- put(Verb.TREE_REQUEST, Stage.AE_SERVICE);
- put(Verb.TREE_RESPONSE, Stage.AE_SERVICE);
+ put(Verb.TREE_REQUEST, Stage.ANTIENTROPY);
+ put(Verb.TREE_RESPONSE, Stage.ANTIENTROPY);
put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
@@ -165,7 +165,8 @@ public class StorageService implements I
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
-
new NamedThreadFactory("CONSISTENCY-MANAGER"));
+
new NamedThreadFactory("ReadRepair"),
+
"request");
/* We use this interface to determine where replicas need to be placed */
private Map<String, AbstractReplicationStrategy> replicationStrategies;
@@ -206,7 +207,7 @@ public class StorageService implements I
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
{
- mbs.registerMBean(this, new
ObjectName("org.apache.cassandra.service:type=StorageService"));
+ mbs.registerMBean(this, new
ObjectName("org.apache.cassandra.db:type=StorageService"));
}
catch (Exception e)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
Thu Sep 30 16:03:48 2010
@@ -34,7 +34,7 @@ import java.util.Set;
public class StreamingService implements StreamingServiceMBean
{
private static final Logger logger =
LoggerFactory.getLogger(StreamingService.class);
- public static final String MBEAN_OBJECT_NAME =
"org.apache.cassandra.service:type=StreamingService";
+ public static final String MBEAN_OBJECT_NAME =
"org.apache.cassandra.net:type=StreamingService";
public static final StreamingService instance = new StreamingService();
private StreamingService()
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Thu Sep 30
16:03:48 2010
@@ -25,25 +25,17 @@ import java.io.IOException;
import java.io.PrintStream;
import java.lang.management.MemoryUsage;
import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
+import org.apache.commons.cli.*;
+
import org.apache.cassandra.cache.JMXInstrumentedCacheMBean;
import org.apache.cassandra.concurrent.IExecutorMBean;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
-import org.apache.cassandra.db.CompactionManager;
import org.apache.cassandra.dht.Token;
-
-import org.apache.commons.cli.*;
+import org.apache.cassandra.net.MessagingServiceMBean;
public class NodeCmd {
private static final String HOST_OPT_LONG = "host";
@@ -80,7 +72,7 @@ public class NodeCmd {
"tpstats, flush, drain, repair, decommission, move,
loadbalance, removetoken [status|force]|[token], " +
"setcachecapacity [keyspace] [cfname] [keycachecapacity]
[rowcachecapacity], " +
"getcompactionthreshold [keyspace] [cfname],
setcompactionthreshold [cfname] [minthreshold] [maxthreshold], " +
- "streams [host]");
+ "netstats [host]");
String usage = String.format("java %s --host <arg> <command>%n",
NodeCmd.class.getName());
hf.printHelp(usage, "", options, header);
}
@@ -188,7 +180,7 @@ public class NodeCmd {
outs.println("ReleaseVersion: " + probe.getReleaseVersion());
}
- public void printStreamInfo(final InetAddress addr, PrintStream outs)
+ public void printNetworkStats(final InetAddress addr, PrintStream outs)
{
outs.println(String.format("Mode: %s", probe.getOperationMode()));
Set<InetAddress> hosts = addr == null ? probe.getStreamDestinations()
: new HashSet<InetAddress>(){{add(addr);}};
@@ -240,6 +232,32 @@ public class NodeCmd {
outs.println(String.format(" Error retrieving file data for
%s", host));
}
}
+
+ MessagingServiceMBean ms = probe.getMsProxy();
+ outs.print(String.format("%-25s", "Pool Name"));
+ outs.print(String.format("%10s", "Active"));
+ outs.print(String.format("%10s", "Pending"));
+ outs.print(String.format("%15s", "Completed"));
+ outs.println();
+
+ int pending;
+ long completed;
+
+ pending = 0;
+ for (int n : ms.getCommandPendingTasks().values())
+ pending += n;
+ completed = 0;
+ for (long n : ms.getCommandCompletedTasks().values())
+ completed += n;
+ outs.printf("%-25s%10s%10s%15s\n", "Commands", "n/a", pending,
completed);
+
+ pending = 0;
+ for (int n : ms.getResponsePendingTasks().values())
+ pending += n;
+ completed = 0;
+ for (long n : ms.getResponseCompletedTasks().values())
+ completed += n;
+ outs.printf("%-25s%10s%10s%15s\n", "Responses", "n/a", pending,
completed);
}
public void printColumnFamilyStats(PrintStream outs)
@@ -611,11 +629,11 @@ public class NodeCmd {
}
probe.setCompactionThreshold(ks, cf, minthreshold, maxthreshold);
}
- else if (cmdName.equals("streams"))
+ else if (cmdName.equals("netstats"))
{
// optional host
String otherHost = arguments.length > 1 ? arguments[1] : null;
- nodeCmd.printStreamInfo(otherHost == null ? null :
InetAddress.getByName(otherHost), System.out);
+ nodeCmd.printNetworkStats(otherHost == null ? null :
InetAddress.getByName(otherHost), System.out);
}
else if (cmdName.equals("version"))
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Thu Sep
30 16:03:48 2010
@@ -37,11 +37,14 @@ import javax.management.remote.JMXConnec
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
+import com.google.common.collect.Iterables;
+
import org.apache.cassandra.cache.JMXInstrumentedCacheMBean;
import org.apache.cassandra.concurrent.IExecutorMBean;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.MessagingServiceMBean;
import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.cassandra.streaming.StreamingService;
import org.apache.cassandra.streaming.StreamingServiceMBean;
@@ -55,7 +58,7 @@ import static com.google.common.base.Cha
public class NodeProbe
{
private static final String fmtUrl =
"service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
- private static final String ssObjName =
"org.apache.cassandra.service:type=StorageService";
+ private static final String ssObjName =
"org.apache.cassandra.db:type=StorageService";
private static final int defaultPort = 8080;
private String host;
private int port;
@@ -429,6 +432,18 @@ public class NodeProbe
{
return ssProxy.exportSchema();
}
+
+ public MessagingServiceMBean getMsProxy()
+ {
+ try
+ {
+ return JMX.newMBeanProxy(mbeanServerConn, new
ObjectName("org.apache.cassandra.net:type=MessagingService"),
MessagingServiceMBean.class);
+ }
+ catch (MalformedObjectNameException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
private ColumnFamilyStoreMBean getCfsProxy(String ks, String cf) {
ColumnFamilyStoreMBean cfsProxy = null;
@@ -485,8 +500,9 @@ class ThreadPoolProxyMBeanIterator imple
public ThreadPoolProxyMBeanIterator(MBeanServerConnection mbeanServerConn)
throws MalformedObjectNameException, NullPointerException, IOException
{
- ObjectName query = new
ObjectName("org.apache.cassandra.concurrent:type=*");
- resIter = mbeanServerConn.queryNames(query, null).iterator();
+ Set<ObjectName> requests = mbeanServerConn.queryNames(new
ObjectName("org.apache.cassandra.request:type=*"), null);
+ Set<ObjectName> internal = mbeanServerConn.queryNames(new
ObjectName("org.apache.cassandra.internal:type=*"), null);
+ resIter = Iterables.concat(requests, internal).iterator();
this.mbeanServerConn = mbeanServerConn;
}
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=1003127&r1=1003126&r2=1003127&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Thu Sep 30 16:03:48 2010
@@ -238,7 +238,7 @@ public class AntiEntropyServiceTest exte
void flushAES() throws Exception
{
- final ThreadPoolExecutor stage =
StageManager.getStage(Stage.AE_SERVICE);
+ final ThreadPoolExecutor stage =
StageManager.getStage(Stage.ANTIENTROPY);
final Callable noop = new Callable<Object>()
{
public Boolean call()