This is an automated email from the ASF dual-hosted git repository. rombert pushed a commit to annotated tag org.apache.sling.commons.threads-3.2.0 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-commons-threads.git
commit 606bc12b8e7027eec074fc754fe0f38aeaa7d2b4 Author: Justin Edelson <[email protected]> AuthorDate: Thu Oct 18 21:53:20 2012 +0000 SLING-2564 - adding JMX monitoring of Sling Thread Pools git-svn-id: https://svn.apache.org/repos/asf/sling/trunk/bundles/commons/threads@1399878 13f79535-47bb-0310-9956-ffa450edef68 --- pom.xml | 3 +- .../threads/impl/DefaultThreadPoolManager.java | 64 +++++++-- .../commons/threads/impl/ThreadPoolMBeanImpl.java | 147 +++++++++++++++++++ .../sling/commons/threads/jmx/ThreadPoolMBean.java | 159 +++++++++++++++++++++ 4 files changed, 364 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index b9e5d2a..3b69b95 100644 --- a/pom.xml +++ b/pom.xml @@ -58,7 +58,8 @@ org.apache.sling.commons.threads.impl.Activator </Bundle-Activator> <Export-Package> - org.apache.sling.commons.threads;version=3.2.0 + org.apache.sling.commons.threads;version=3.2.0, + org.apache.sling.commons.threads.jmx;version=1.0.0 </Export-Package> <Private-Package> org.apache.sling.commons.threads.impl diff --git a/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolManager.java b/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolManager.java index ee86399..331d668 100644 --- a/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolManager.java +++ b/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolManager.java @@ -18,6 +18,7 @@ package org.apache.sling.commons.threads.impl; import java.util.Dictionary; import java.util.HashMap; +import java.util.Hashtable; import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadPoolExecutor; @@ -28,8 +29,10 @@ import org.apache.sling.commons.threads.ThreadPoolConfig; import org.apache.sling.commons.threads.ThreadPoolConfig.ThreadPoolPolicy; import org.apache.sling.commons.threads.ThreadPoolConfig.ThreadPriority; import org.apache.sling.commons.threads.ThreadPoolManager; +import org.apache.sling.commons.threads.jmx.ThreadPoolMBean; import org.osgi.framework.BundleContext; import org.osgi.framework.Constants; +import org.osgi.framework.ServiceRegistration; import org.osgi.service.cm.ConfigurationException; import org.osgi.service.cm.ManagedServiceFactory; import org.slf4j.Logger; @@ -69,12 +72,15 @@ public class DefaultThreadPoolManager public void destroy() { this.logger.debug("Disposing all thread pools"); + final Map<String, Entry> localCopy = new HashMap<String, Entry>(this.pools.size()); synchronized ( this.pools ) { - for (final Entry entry : this.pools.values()) { - entry.shutdown(); - } + localCopy.putAll(this.pools); this.pools.clear(); } + for (final Entry entry : localCopy.values()) { + entry.unregisterMBean(); + entry.shutdown(); + } this.logger.info("Stopped Apache Sling Thread Pool Manager"); } @@ -119,17 +125,22 @@ public class DefaultThreadPoolManager public ThreadPool get(final String name) { final String poolName = (name == null ? DEFAULT_THREADPOOL_NAME : name); Entry entry = null; + boolean created = false; synchronized (this.pools) { entry = this.pools.get(poolName); if ( entry == null ) { this.logger.debug("Creating new pool with name {}", poolName); final ModifiableThreadPoolConfig config = new ModifiableThreadPoolConfig(); - entry = new Entry(null, config, poolName); + entry = new Entry(null, config, poolName, bundleContext); + created = true; this.pools.put(poolName, entry); } - return entry.incUsage(); } + if (created) { + entry.registerMBean(); + } + return entry.incUsage(); } /** @@ -137,15 +148,20 @@ public class DefaultThreadPoolManager */ public void release(ThreadPool pool) { if ( pool instanceof ThreadPoolFacade ) { + Entry removedEntry = null; synchronized ( this.pools ) { final Entry entry = this.pools.get(pool.getName()); if ( entry != null ) { entry.decUsage(); if ( !entry.isUsed() ) { + removedEntry = entry; this.pools.remove(pool.getName()); } } } + if ( removedEntry != null ) { + removedEntry.unregisterMBean(); + } } } @@ -176,10 +192,11 @@ public class DefaultThreadPoolManager final String name = "ThreadPool-" + UUID.randomUUID().toString() + (label == null ? "" : " (" + label + ")"); - final Entry entry = new Entry(null, config, name); + final Entry entry = new Entry(null, config, name, bundleContext); synchronized ( this.pools ) { this.pools.put(name, entry); } + entry.registerMBean(); return entry.incUsage(); } @@ -210,6 +227,7 @@ public class DefaultThreadPoolManager throw new ConfigurationException(ModifiableThreadPoolConfig.PROPERTY_NAME, "Property is missing or empty."); } this.logger.debug("Updating {} with {}", pid, properties); + Entry createdEntry = null; synchronized ( this.pools ) { final ThreadPoolConfig config = this.createConfig(properties); @@ -241,9 +259,13 @@ public class DefaultThreadPoolManager foundEntry.update(config, name, pid); } else { // create - this.pools.put(name, new Entry(pid, config, name)); + createdEntry = new Entry(pid, config, name, bundleContext); + this.pools.put(name, createdEntry); } } + if ( createdEntry != null ) { + createdEntry.registerMBean(); + } } /** @@ -275,6 +297,8 @@ public class DefaultThreadPoolManager } protected static final class Entry { + private static final Logger logger = LoggerFactory.getLogger(Entry.class); + /** The configuration pid. (might be null for anonymous pools.*/ private volatile String pid; @@ -290,10 +314,15 @@ public class DefaultThreadPoolManager /** The corresponding pool - might be null if unused. */ private volatile ThreadPoolFacade pool; - public Entry(final String pid, final ThreadPoolConfig config, final String name) { + private ServiceRegistration mbeanRegistration; + + private BundleContext bundleContext; + + public Entry(final String pid, final ThreadPoolConfig config, final String name, final BundleContext bundleContext) { this.pid = pid; this.config = config; this.name = name; + this.bundleContext = bundleContext; } public String getPid() { @@ -352,5 +381,24 @@ public class DefaultThreadPoolManager } return null; } + + protected void unregisterMBean() { + if ( this.mbeanRegistration != null ) { + this.mbeanRegistration.unregister(); + this.mbeanRegistration = null; + } + } + + protected void registerMBean() { + try { + final Dictionary<String, String> mbeanProps = new Hashtable<String, String>(); + mbeanProps.put("jmx.objectname", "org.apache.sling:type=threads,service=ThreadPool,name=" + this.name); + + final ThreadPoolMBeanImpl mbean = new ThreadPoolMBeanImpl(this); + this.mbeanRegistration = bundleContext.registerService(ThreadPoolMBean.class.getName(), mbean, mbeanProps); + } catch (Throwable t) { + logger.warn("Unable to register Thread Pool MBean", t); + } + } } } diff --git a/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolMBeanImpl.java b/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolMBeanImpl.java new file mode 100644 index 0000000..fe96b97 --- /dev/null +++ b/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolMBeanImpl.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.sling.commons.threads.impl; + +import java.util.concurrent.ThreadPoolExecutor; + +import javax.management.NotCompliantMBeanException; +import javax.management.StandardMBean; + +import org.apache.sling.commons.threads.impl.DefaultThreadPoolManager.Entry; +import org.apache.sling.commons.threads.jmx.ThreadPoolMBean; + +class ThreadPoolMBeanImpl extends StandardMBean implements ThreadPoolMBean { + + private final Entry entry; + + ThreadPoolMBeanImpl(Entry entry) throws NotCompliantMBeanException { + super(ThreadPoolMBean.class); + this.entry = entry; + } + + public String getBlockPolicy() { + return this.entry.getConfig().getBlockPolicy().name(); + } + + public int getExecutorActiveCount() { + final ThreadPoolExecutor tpe = this.entry.getExecutor(); + if ( tpe != null ) { + return tpe.getActiveCount(); + } else { + return -1; + } + } + + public long getExecutorCompletedTaskCount() { + final ThreadPoolExecutor tpe = this.entry.getExecutor(); + if ( tpe != null ) { + return tpe.getCompletedTaskCount(); + } else { + return -1; + } + } + + public int getExecutorCorePoolSize() { + final ThreadPoolExecutor tpe = this.entry.getExecutor(); + if ( tpe != null ) { + return tpe.getCorePoolSize(); + } else { + return -1; + } + } + + public int getExecutorLargestPoolSize() { + final ThreadPoolExecutor tpe = this.entry.getExecutor(); + if ( tpe != null ) { + return tpe.getLargestPoolSize(); + } else { + return -1; + } + } + + public int getExecutorMaximumPoolSize() { + final ThreadPoolExecutor tpe = this.entry.getExecutor(); + if ( tpe != null ) { + return tpe.getMaximumPoolSize(); + } else { + return -1; + } + } + + public int getExecutorPoolSize() { + final ThreadPoolExecutor tpe = this.entry.getExecutor(); + if ( tpe != null ) { + return tpe.getPoolSize(); + } else { + return -1; + } + } + + public long getExecutorTaskCount() { + final ThreadPoolExecutor tpe = this.entry.getExecutor(); + if ( tpe != null ) { + return tpe.getTaskCount(); + } else { + return -1; + } + } + + public long getKeepAliveTime() { + return this.entry.getConfig().getKeepAliveTime(); + } + + public int getMaxPoolSize() { + return this.entry.getConfig().getMaxPoolSize(); + } + + public int getMinPoolSize() { + return this.entry.getConfig().getMinPoolSize(); + } + + public String getName() { + return this.entry.getName(); + } + + public String getPid() { + return this.entry.getPid(); + } + + public String getPriority() { + return this.entry.getConfig().getPriority().name(); + } + + public int getQueueSize() { + return this.entry.getConfig().getQueueSize(); + } + + public int getShutdownWaitTimeMs() { + return this.entry.getConfig().getShutdownWaitTimeMs(); + } + + public boolean isDaemon() { + return this.entry.getConfig().isDaemon(); + } + + public boolean isShutdownGraceful() { + return this.entry.getConfig().isShutdownGraceful(); + } + + public boolean isUsed() { + return this.entry.isUsed(); + } + +} diff --git a/src/main/java/org/apache/sling/commons/threads/jmx/ThreadPoolMBean.java b/src/main/java/org/apache/sling/commons/threads/jmx/ThreadPoolMBean.java new file mode 100644 index 0000000..bdee6b3 --- /dev/null +++ b/src/main/java/org/apache/sling/commons/threads/jmx/ThreadPoolMBean.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.sling.commons.threads.jmx; + +/** + * This is the management interface for a Sling Thread Pool. + */ +public interface ThreadPoolMBean { + + /** + * Retrieve the block policy of the thread pool. + * + * @return the block policy + */ + String getBlockPolicy(); + + /** + * Retrieve the active count from the pool's Executor. + * + * @return the active count or -1 if the thread pool does not have an Executor + */ + int getExecutorActiveCount(); + + /** + * Retrieve the completed task count from the pool's Executor. + * + * @return the completed task count or -1 if the thread pool does not have an Executor + */ + long getExecutorCompletedTaskCount(); + + /** + * Retrieve the core pool size from the pool's Executor. + * + * @return the core pool size or -1 if the thread pool does not have an Executor + */ + int getExecutorCorePoolSize(); + + /** + * Retrieve the largest pool size from the pool's Executor. + * + * @return the largest pool size or -1 if the thread pool does not have an Executor + */ + int getExecutorLargestPoolSize(); + + /** + * Retrieve the maximum pool size from the pool's Executor. + * + * @return the maximum pool size or -1 if the thread pool does not have an Executor + */ + int getExecutorMaximumPoolSize(); + + + /** + * Retrieve the pool size from the pool's Executor. + * + * @return the pool size or -1 if the thread pool does not have an Executor + */ + int getExecutorPoolSize(); + + + /** + * Retrieve the task count from the pool's Executor. + * + * @return the task count or -1 if the thread pool does not have an Executor + */ + long getExecutorTaskCount(); + + /** + * Return the configured keep alive time. + * + * @return The configured keep alive time. + */ + long getKeepAliveTime(); + + /** + * Return the configured maximum pool size. + * + * @return The configured maximum pool size. + */ + int getMaxPoolSize(); + + /** + * Return the minimum pool size. + * + * @return The minimum pool size. + */ + int getMinPoolSize(); + + /** + * Return the name of the thread pool + * + * @return the name + */ + String getName(); + + /** + * Return the configuration pid of the thread pool. + * + * @return the pid + */ + String getPid(); + + /** + * Return the configured priority of the thread pool. + * + * @return the priority + */ + String getPriority(); + + /** + * Return the configured queue size. + * + * @return The configured queue size. + */ + int getQueueSize(); + + /** + * Return the configured shutdown wait time in milliseconds. + * + * @return The configured shutdown wait time. + */ + int getShutdownWaitTimeMs(); + + /** + * Return whether or not the thread pool creates daemon threads. + * + * @return The daemon configuration. + */ + boolean isDaemon(); + + /** + * Return whether or not the thread pool is configured to shutdown gracefully. + * + * @return The graceful shutdown configuration. + */ + boolean isShutdownGraceful(); + + /** + * Return whether or not the thread pool is in use. + * + * @return The used state of the pool. + */ + boolean isUsed(); + +} -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
