Author: jbellis
Date: Wed Dec 23 19:18:46 2009
New Revision: 893601
URL: http://svn.apache.org/viewvc?rev=893601&view=rev
Log:
abstract DTPE from JMXETPE. patch by jbellis; reviewed by eevans for
CASSANDRA-599
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
(with props)
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java
(contents, props changed)
- copied, changed from r893600,
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorMBean.java
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorMBean.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FileUtils.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=893601&r1=893600&r2=893601&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
Wed Dec 23 19:18:46 2009
@@ -1,161 +1,71 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.concurrent;
-
-import java.lang.management.ManagementFactory;
-import java.util.List;
-import java.util.concurrent.*;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-import org.apache.log4j.Logger;
-
-/**
- * This is a wrapper class for the <i>ScheduledThreadPoolExecutor</i>. It
provides an implementation
- * for the <i>afterExecute()</i> found in the <i>ThreadPoolExecutor</i> class
to log any unexpected
- * Runtime Exceptions.
- */
-
-public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
implements DebuggableThreadPoolExecutorMBean
-{
- private static Logger logger_ =
Logger.getLogger(DebuggableThreadPoolExecutor.class);
- private final String mbeanName;
-
- public DebuggableThreadPoolExecutor(String threadPoolName)
- {
- this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName));
- }
-
- public DebuggableThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- NamedThreadFactory threadFactory)
- {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
- super.prestartAllCoreThreads();
-
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbeanName = "org.apache.cassandra.concurrent:type=" + threadFactory.id;
- try
- {
- mbs.registerMBean(this, new ObjectName(mbeanName));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
- if (maximumPoolSize > 1)
- {
- this.setRejectedExecutionHandler(new
ThreadPoolExecutor.CallerRunsPolicy());
- }
- else
- {
- // preserve task serialization. this is more complicated than it
needs to be,
- // since TPE rejects if queue.offer reports a full queue.
- // the easiest option (since most of TPE.execute deals with
private members)
- // appears to be to wrap the given queue class with one whose offer
- // simply delegates to put(). this would be ugly, since it
violates both
- // the spirit and letter of queue.offer, but effective.
- // so far, though, all our serialized executors use unbounded
queues,
- // so actually implementing this has not been necessary.
- this.setRejectedExecutionHandler(new RejectedExecutionHandler()
- {
- public void rejectedExecution(Runnable r, ThreadPoolExecutor
executor)
- {
- throw new AssertionError("Blocking serialized executor is
not yet implemented");
- }
- });
- }
- }
-
- private void unregisterMBean()
- {
- try
- {
- ManagementFactory.getPlatformMBeanServer().unregisterMBean(new
ObjectName(mbeanName));
- }
- catch (Exception ex)
- {
- // don't let it get in the way, but notify.
- logger_.error(ex.getMessage(), ex);
- }
- }
-
- @Override
- public void shutdown()
- {
- unregisterMBean();
- super.shutdown();
- }
-
- @Override
- public List<Runnable> shutdownNow()
- {
- unregisterMBean();
- return super.shutdownNow();
- }
-
- /**
- * Get the number of completed tasks
- */
- public long getCompletedTasks()
- {
- return getCompletedTaskCount();
- }
-
- /**
- * Get the number of tasks waiting to be executed
- */
- public long getPendingTasks()
- {
- return getTaskCount() - getCompletedTaskCount();
- }
-
- public void afterExecute(Runnable r, Throwable t)
- {
- super.afterExecute(r,t);
-
- // exceptions wrapped by FutureTask
- if (r instanceof FutureTask)
- {
- try
- {
- ((FutureTask) r).get();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- catch (ExecutionException e)
- {
- logger_.error("Error in executor futuretask", e);
- }
- }
-
- // exceptions for non-FutureTask runnables [i.e., added via execute()
instead of submit()]
- if (t != null)
- {
- logger_.error("Error in ThreadPoolExecutor", t);
- }
- }
-
-}
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+import org.apache.log4j.Logger;
+
+public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
+{
+ protected static Logger logger =
Logger.getLogger(JMXEnabledThreadPoolExecutor.class);
+
+ public DebuggableThreadPoolExecutor(String threadPoolName)
+ {
+ this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName));
+ }
+
+ public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
+ {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
+
+ if (maximumPoolSize > 1)
+ {
+ this.setRejectedExecutionHandler(new
ThreadPoolExecutor.CallerRunsPolicy());
+ }
+ else
+ {
+ // preserve task serialization. this is more complicated than it
needs to be,
+ // since TPE rejects if queue.offer reports a full queue.
+ // the easiest option (since most of TPE.execute deals with
private members)
+ // appears to be to wrap the given queue class with one whose offer
+ // simply delegates to put(). this would be ugly, since it
violates both
+ // the spirit and letter of queue.offer, but effective.
+ // so far, though, all our serialized executors use unbounded
queues,
+ // so actually implementing this has not been necessary.
+ this.setRejectedExecutionHandler(new RejectedExecutionHandler()
+ {
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor
executor)
+ {
+ throw new AssertionError("Blocking serialized executor is
not yet implemented");
+ }
+ });
+ }
+ }
+
+ public void afterExecute(Runnable r, Throwable t)
+ {
+ super.afterExecute(r,t);
+
+ // exceptions wrapped by FutureTask
+ if (r instanceof FutureTask)
+ {
+ try
+ {
+ ((FutureTask) r).get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ catch (ExecutionException e)
+ {
+ logger.error("Error in executor futuretask", e);
+ }
+ }
+
+ // exceptions for non-FutureTask runnables [i.e., added via execute()
instead of submit()]
+ if (t != null)
+ {
+ logger.error("Error in ThreadPoolExecutor", t);
+ }
+ }
+}
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?rev=893601&view=auto
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
(added)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
Wed Dec 23 19:18:46 2009
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+/**
+ * This is a wrapper class for the <i>ScheduledThreadPoolExecutor</i>. It
provides an implementation
+ * for the <i>afterExecute()</i> found in the <i>ThreadPoolExecutor</i> class
to log any unexpected
+ * Runtime Exceptions.
+ */
+
+public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor
implements JMXEnabledThreadPoolExecutorMBean
+{
+ private final String mbeanName;
+
+ public JMXEnabledThreadPoolExecutor(String threadPoolName)
+ {
+ this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName));
+ }
+
+ public JMXEnabledThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ NamedThreadFactory threadFactory)
+ {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
+ super.prestartAllCoreThreads();
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ mbeanName = "org.apache.cassandra.concurrent:type=" + threadFactory.id;
+ try
+ {
+ mbs.registerMBean(this, new ObjectName(mbeanName));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void unregisterMBean()
+ {
+ try
+ {
+ ManagementFactory.getPlatformMBeanServer().unregisterMBean(new
ObjectName(mbeanName));
+ }
+ catch (Exception ex)
+ {
+ // don't let it get in the way, but notify.
+ logger.error(ex.getMessage(), ex);
+ }
+ }
+
+ @Override
+ public void shutdown()
+ {
+ unregisterMBean();
+ super.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow()
+ {
+ unregisterMBean();
+ return super.shutdownNow();
+ }
+
+ /**
+ * Get the number of completed tasks
+ */
+ public long getCompletedTasks()
+ {
+ return getCompletedTaskCount();
+ }
+
+ /**
+ * Get the number of tasks waiting to be executed
+ */
+ public long getPendingTasks()
+ {
+ return getTaskCount() - getCompletedTaskCount();
+ }
+}
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java
(from r893600,
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorMBean.java)
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorMBean.java&r1=893600&r2=893601&rev=893601&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorMBean.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java
Wed Dec 23 19:18:46 2009
@@ -18,6 +18,6 @@
*/
package org.apache.cassandra.concurrent;
-public interface DebuggableThreadPoolExecutorMBean extends IExecutorMBean
+public interface JMXEnabledThreadPoolExecutorMBean extends IExecutorMBean
{
}
\ No newline at end of file
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java?rev=893601&r1=893600&r2=893601&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java
Wed Dec 23 19:18:46 2009
@@ -30,12 +30,12 @@
public class MultiThreadedStage implements IStage
{
private String name_;
- private DebuggableThreadPoolExecutor executorService_;
+ private JMXEnabledThreadPoolExecutor executorService_;
public MultiThreadedStage(String name, int numThreads)
{
name_ = name;
- executorService_ = new DebuggableThreadPoolExecutor( numThreads,
+ executorService_ = new JMXEnabledThreadPoolExecutor( numThreads,
numThreads,
Integer.MAX_VALUE,
TimeUnit.SECONDS,
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java?rev=893601&r1=893600&r2=893601&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java
Wed Dec 23 19:18:46 2009
@@ -21,10 +21,8 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.net.*;
/**
* This class is an implementation of the <i>IStage</i> interface. In
particular
@@ -34,12 +32,12 @@
public class SingleThreadedStage implements IStage
{
- protected DebuggableThreadPoolExecutor executorService_;
+ protected JMXEnabledThreadPoolExecutor executorService_;
private String name_;
public SingleThreadedStage(String name)
{
- executorService_ = new DebuggableThreadPoolExecutor(name);
+ executorService_ = new JMXEnabledThreadPoolExecutor(name);
name_ = name;
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=893601&r1=893600&r2=893601&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Wed Dec 23 19:18:46 2009
@@ -36,23 +36,19 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.*;
-import java.net.InetAddress;
+
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.AntiEntropyService;
import org.apache.cassandra.utils.*;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.collections.IteratorUtils;
-import org.apache.commons.collections.PredicateUtils;
-import org.apache.commons.collections.iterators.CollatingIterator;
-import org.apache.commons.collections.iterators.FilterIterator;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import com.google.common.collect.Iterators;
@@ -79,20 +75,20 @@
*/
private static NonBlockingHashMap<String, Set<Memtable>>
memtablesPendingFlush = new NonBlockingHashMap<String, Set<Memtable>>();
private static ExecutorService flushSorter_
- = new DebuggableThreadPoolExecutor(1,
+ = new JMXEnabledThreadPoolExecutor(1,
Runtime.getRuntime().availableProcessors(),
Integer.MAX_VALUE,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<Runnable>(2 * Runtime.getRuntime().availableProcessors()),
new
NamedThreadFactory("FLUSH-SORTER-POOL"));
private static ExecutorService flushWriter_
- = new
DebuggableThreadPoolExecutor(DatabaseDescriptor.getAllDataFileLocations().length,
+ = new
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getAllDataFileLocations().length,
DatabaseDescriptor.getAllDataFileLocations().length,
Integer.MAX_VALUE,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<Runnable>(),
new
NamedThreadFactory("FLUSH-WRITER-POOL"));
- private static ExecutorService commitLogUpdater_ = new
DebuggableThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
+ private static ExecutorService commitLogUpdater_ = new
JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
private static final int KEY_RANGE_FILE_BUFFER_SIZE = 256 * 1024;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=893601&r1=893600&r2=893601&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
Wed Dec 23 19:18:46 2009
@@ -29,7 +29,7 @@
import org.apache.log4j.Logger;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.*;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -45,8 +45,6 @@
import org.apache.commons.collections.iterators.CollatingIterator;
import org.apache.commons.collections.PredicateUtils;
-import com.sun.corba.se.impl.logging.POASystemException;
-
public class CompactionManager implements CompactionManagerMBean
{
public static final String MBEAN_OBJECT_NAME =
"org.apache.cassandra.db:type=CompactionManager";
@@ -70,7 +68,7 @@
}
}
- private ExecutorService compactor_ = new
DebuggableThreadPoolExecutor("COMPACTION-POOL");
+ private ExecutorService compactor_ = new
JMXEnabledThreadPoolExecutor("COMPACTION-POOL");
/**
* Call this whenever a compaction might be needed on the given
columnfamily.
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=893601&r1=893600&r2=893601&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Wed Dec 23 19:18:46 2009
@@ -24,14 +24,13 @@
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.io.IOException;
import org.apache.log4j.Logger;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.FailureDetector;
import java.net.InetAddress;
@@ -79,7 +78,7 @@
private static final Lock lock_ = new ReentrantLock();
private static final Logger logger_ =
Logger.getLogger(HintedHandOffManager.class);
final static long INTERVAL_IN_MS = 3600 * 1000;
- private final ExecutorService executor_ = new
DebuggableThreadPoolExecutor("HINTED-HANDOFF-POOL");
+ private final ExecutorService executor_ = new
JMXEnabledThreadPoolExecutor("HINTED-HANDOFF-POOL");
final Timer timer = new Timer("HINTED-HANDOFF-TIMER");
public static final String HINTS_CF = "HintsColumnFamily";
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=893601&r1=893600&r2=893601&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
Wed Dec 23 19:18:46 2009
@@ -38,7 +38,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
public class MessagingService
@@ -144,7 +143,7 @@
callbackMap_ = new Cachetable<String, IAsyncCallback>( 2 *
DatabaseDescriptor.getRpcTimeout() );
taskCompletionMap_ = new Cachetable<String, IAsyncResult>( 2 *
DatabaseDescriptor.getRpcTimeout() );
- messageDeserializationExecutor_ = new DebuggableThreadPoolExecutor(
maxSize,
+ messageDeserializationExecutor_ = new JMXEnabledThreadPoolExecutor(
maxSize,
maxSize,
Integer.MAX_VALUE,
TimeUnit.SECONDS,
@@ -152,7 +151,7 @@
new NamedThreadFactory("MESSAGING-SERVICE-POOL")
);
- messageDeserializerExecutor_ = new DebuggableThreadPoolExecutor(
maxSize,
+ messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(
maxSize,
maxSize,
Integer.MAX_VALUE,
TimeUnit.SECONDS,
@@ -160,7 +159,7 @@
new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL")
);
- streamExecutor_ = new
DebuggableThreadPoolExecutor("MESSAGE-STREAMING-POOL");
+ streamExecutor_ = new
JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
protocol_ = hash(HashingSchemes.MD5, "FB-MESSAGING".getBytes());
/* register the response verb handler */
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=893601&r1=893600&r2=893601&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
Wed Dec 23 19:18:46 2009
@@ -25,7 +25,7 @@
import org.apache.log4j.Logger;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.SingleThreadedStage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.dht.Token;
@@ -191,9 +191,9 @@
/* This map is a clone of the one above and is used for various
calculations during LB operation */
private Map<InetAddress, Double> loadInfo2_ = new HashMap<InetAddress,
Double>();
/* This thread pool is used for initiating load balancing operations */
- private ExecutorService lb_ = new
DebuggableThreadPoolExecutor("LB-OPERATIONS");
+ private ExecutorService lb_ = new
JMXEnabledThreadPoolExecutor("LB-OPERATIONS");
/* This thread pool is used by target node to leave the ring. */
- private ExecutorService lbOperations_ = new
DebuggableThreadPoolExecutor("LB-TARGET");
+ private ExecutorService lbOperations_ = new
JMXEnabledThreadPoolExecutor("LB-TARGET");
/* Timer is used to disseminate load information */
private Timer loadTimer_ = new Timer(false);
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=893601&r1=893600&r2=893601&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Wed Dec 23 19:18:46 2009
@@ -137,7 +137,7 @@
private SystemTable.StorageMetadata storageMetadata_;
/* This thread pool does consistency checks when the client doesn't care
about consistency */
- private ExecutorService consistencyManager_ = new
DebuggableThreadPoolExecutor(DatabaseDescriptor.getConsistencyThreads(),
+ private ExecutorService consistencyManager_ = new
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getConsistencyThreads(),
DatabaseDescriptor.getConsistencyThreads(),
Integer.MAX_VALUE,
TimeUnit.SECONDS,
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FileUtils.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FileUtils.java?rev=893601&r1=893600&r2=893601&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FileUtils.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FileUtils.java
Wed Dec 23 19:18:46 2009
@@ -23,8 +23,8 @@
import java.util.*;
import java.util.concurrent.ExecutorService;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+
import org.apache.log4j.Logger;
@@ -37,7 +37,7 @@
private static final double gb_ = 1024*1024*1024d;
private static final double tb_ = 1024*1024*1024*1024d;
- private static ExecutorService deleter_ = new
DebuggableThreadPoolExecutor("FILEUTILS-DELETE-POOL");
+ private static ExecutorService deleter_ = new
JMXEnabledThreadPoolExecutor("FILEUTILS-DELETE-POOL");
public static void shutdown()
{