Separate pool for hint delivery tasks in HintedHandoffManager
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c9587cd2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c9587cd2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c9587cd2 Branch: refs/heads/cassandra-2.2 Commit: c9587cd2bd1ac60d8cd8552592ac16f9c7ddd3b2 Parents: 45bd07f Author: Sam Tunnicliffe <[email protected]> Authored: Tue Aug 4 17:46:15 2015 +0100 Committer: Sam Tunnicliffe <[email protected]> Committed: Fri Aug 7 15:36:40 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../JMXEnabledScheduledThreadPoolExecutor.java | 137 ------------------- ...EnabledScheduledThreadPoolExecutorMBean.java | 26 ---- .../cassandra/db/HintedHandOffManager.java | 25 +++- 4 files changed, 20 insertions(+), 169 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9587cd2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a8cf796..fe060af 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.17 + * Remove erroneous pending HH tasks from tpstats/jmx (CASSANDRA-9129) * Don't cast expected bf size to an int (CASSANDRA-9959) * Log when messages are dropped due to cross_node_timeout (CASSANDRA-9793) * checkForEndpointCollision fails for legitimate collisions (CASSANDRA-9765) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9587cd2/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java deleted file mode 100644 index 64d9267..0000000 --- a/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.concurrent; - -import java.lang.management.ManagementFactory; -import java.util.List; - -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import org.apache.cassandra.metrics.ThreadPoolMetrics; - -/** - * A JMX enabled wrapper for DebuggableScheduledThreadPoolExecutor. - */ -public class JMXEnabledScheduledThreadPoolExecutor extends DebuggableScheduledThreadPoolExecutor implements JMXEnabledScheduledThreadPoolExecutorMBean -{ - private final String mbeanName; - private final ThreadPoolMetrics metrics; - - public JMXEnabledScheduledThreadPoolExecutor(int corePoolSize, NamedThreadFactory threadFactory, String jmxPath) - { - super(corePoolSize, threadFactory); - - metrics = new ThreadPoolMetrics(this, jmxPath, threadFactory.id); - - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + threadFactory.id; - - try - { - mbs.registerMBean(this, new ObjectName(mbeanName)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - private void unregisterMBean() - { - try - { - ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(mbeanName)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - - // release metrics - metrics.release(); - } - - @Override - public synchronized void shutdown() - { - // synchronized, because there is no way to access super.mainLock, which would be - // the preferred way to make this threadsafe - if (!isShutdown()) - unregisterMBean(); - - super.shutdown(); - } - - @Override - public synchronized List<Runnable> shutdownNow() - { - // synchronized, because there is no way to access super.mainLock, which would be - // the preferred way to make this threadsafe - if (!isShutdown()) - unregisterMBean(); - - return super.shutdownNow(); - } - - /** - * Get the number of completed tasks - */ - public long getCompletedTasks() - { - return getCompletedTaskCount(); - } - - /** - * Get the number of tasks waiting to be executed - */ - public long getPendingTasks() - { - return getTaskCount() - getCompletedTaskCount(); - } - - public int getTotalBlockedTasks() - { - return (int) metrics.totalBlocked.count(); - } - - public int getCurrentlyBlockedTasks() - { - return (int) metrics.currentBlocked.count(); - } - - public int getCoreThreads() - { - return getCorePoolSize(); - } - - public void setCoreThreads(int number) - { - setCorePoolSize(number); - } - - public int getMaximumThreads() - { - return getMaximumPoolSize(); - } - - public void setMaximumThreads(int number) - { - setMaximumPoolSize(number); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9587cd2/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java deleted file mode 100644 index d9c45e3..0000000 --- a/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.concurrent; - -/** - * @see org.apache.cassandra.metrics.ThreadPoolMetrics - */ -@Deprecated -public interface JMXEnabledScheduledThreadPoolExecutorMBean extends JMXEnabledThreadPoolExecutorMBean -{ -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9587cd2/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index c8c3845..dab0f75 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -37,7 +37,8 @@ import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor; +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; @@ -56,7 +57,9 @@ import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.metrics.HintedHandoffMetrics; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.*; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.WriteResponseHandler; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; @@ -102,9 +105,20 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>(); - private final JMXEnabledScheduledThreadPoolExecutor executor = - new JMXEnabledScheduledThreadPoolExecutor( + // To keep metrics consistent with earlier versions, where periodic tasks were run on a shared executor, + // we run them on this executor and so keep counts separate from those for hint delivery tasks. See CASSANDRA-9129 + private final DebuggableScheduledThreadPoolExecutor executor = + new DebuggableScheduledThreadPoolExecutor(1, new NamedThreadFactory("HintedHandoffManager", Thread.MIN_PRIORITY)); + + // Non-scheduled executor to run the actual hint delivery tasks. + // Per CASSANDRA-9129, this is where the values displayed in nodetool tpstats + // and via the HintedHandoff mbean are obtained. + private final ThreadPoolExecutor hintDeliveryExecutor = + new JMXEnabledThreadPoolExecutor( DatabaseDescriptor.getMaxHintsThread(), + Integer.MAX_VALUE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY), "internal"); @@ -242,7 +256,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean } }; executor.submit(runnable).get(); - } @VisibleForTesting @@ -534,7 +547,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean logger.debug("Scheduling delivery of Hints to {}", to); - executor.execute(new Runnable() + hintDeliveryExecutor.execute(new Runnable() { public void run() {
