This is an automated email from the ASF dual-hosted git repository. rombert pushed a commit to annotated tag org.apache.sling.commons.threads-2.0.2-incubator in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-commons-threads.git
commit a5761c4442ccf6e02c504b32d03d5826f7fd60ef Author: Carsten Ziegeler <[email protected]> AuthorDate: Mon Feb 18 17:13:36 2008 +0000 Use thread pool configuration object to be extensible. git-svn-id: https://svn.apache.org/repos/asf/incubator/sling/trunk/sling/threads@628820 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/sling/threads/ThreadPool.java | 2 +- .../org/apache/sling/threads/ThreadPoolConfig.java | 202 +++++++++++++++++++++ .../apache/sling/threads/ThreadPoolManager.java | 26 +-- .../sling/threads/impl/DefaultThreadPool.java | 78 ++++---- .../threads/impl/DefaultThreadPoolManager.java | 65 +------ .../sling/threads/impl/ExtendedThreadFactory.java | 30 +-- 6 files changed, 263 insertions(+), 140 deletions(-) diff --git a/src/main/java/org/apache/sling/threads/ThreadPool.java b/src/main/java/org/apache/sling/threads/ThreadPool.java index c849ed5..db8eb0b 100644 --- a/src/main/java/org/apache/sling/threads/ThreadPool.java +++ b/src/main/java/org/apache/sling/threads/ThreadPool.java @@ -40,5 +40,5 @@ public interface ThreadPool { */ void shutdown(); - int getMaxPoolSize(); + ThreadPoolConfig getConfiguration(); } diff --git a/src/main/java/org/apache/sling/threads/ThreadPoolConfig.java b/src/main/java/org/apache/sling/threads/ThreadPoolConfig.java new file mode 100644 index 0000000..a5792db --- /dev/null +++ b/src/main/java/org/apache/sling/threads/ThreadPoolConfig.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sling.threads; + +import java.util.concurrent.ThreadFactory; + +/** + * The ThreadPool configuration. + * + * @version $Id$ + */ +public final class ThreadPoolConfig { + + /** The thread pool policies. */ + public enum ThreadPoolPolicy { + ABORT, + DISCARD, + DISCARDOLDEST, + RUN + }; + + public enum ThreadPriority { + NORM, + MIN, + MAX + }; + + /** The min pool size. */ + private int minPoolSize = 5; + + /** The max pool size. */ + private int maxPoolSize = 5; + + /** The queue size */ + private int queueSize = -1; + + /** The keep alive time. */ + private long keepAliveTime = 60000L; + + /** The thread pool policy. Default is RUN. */ + private ThreadPoolPolicy blockPolicy = ThreadPoolPolicy.RUN; + + private boolean shutdownGraceful = false; + + private int shutdownWaitTimeMs = -1; + + private ThreadFactory factory; + + private ThreadPriority priority = ThreadPriority.NORM; + + private boolean isDaemon = false; + + /** Can this configuration still be changed? */ + private boolean isWritable = true; + + /** + * Create a new default configuration. + */ + public ThreadPoolConfig() { + // nothing to do + } + + /** + * Clone an existing configuration + * @param copy The config to clone + */ + public ThreadPoolConfig(ThreadPoolConfig copy) { + this.minPoolSize = copy.minPoolSize; + this.maxPoolSize = copy.maxPoolSize; + this.queueSize = copy.queueSize; + this.keepAliveTime = copy.keepAliveTime; + this.blockPolicy = copy.blockPolicy; + this.shutdownGraceful = copy.shutdownGraceful; + this.shutdownWaitTimeMs = copy.shutdownWaitTimeMs; + this.factory = copy.factory; + this.priority = copy.priority; + this.isDaemon = copy.isDaemon; + } + + protected void checkWritable() { + if ( !isWritable ) { + throw new IllegalStateException("ThreadPoolConfig is read-only."); + } + } + + /** + * Make the configuration read-only. + */ + public void makeReadOnly() { + this.isWritable = false; + } + + public int getMinPoolSize() { + return minPoolSize; + } + + public void setMinPoolSize(int minPoolSize) { + this.checkWritable(); + this.minPoolSize = minPoolSize; + } + + public int getMaxPoolSize() { + return maxPoolSize; + } + + public void setMaxPoolSize(int maxPoolSize) { + this.checkWritable(); + this.maxPoolSize = maxPoolSize; + } + + public int getQueueSize() { + return queueSize; + } + + public void setQueueSize(int queueSize) { + this.checkWritable(); + this.queueSize = queueSize; + } + + public long getKeepAliveTime() { + return keepAliveTime; + } + + public void setKeepAliveTime(long keepAliveTime) { + this.checkWritable(); + this.keepAliveTime = keepAliveTime; + } + + public ThreadPoolPolicy getBlockPolicy() { + return blockPolicy; + } + + public void setBlockPolicy(ThreadPoolPolicy blockPolicy) { + this.checkWritable(); + this.blockPolicy = blockPolicy; + if ( blockPolicy == null ) { + throw new IllegalArgumentException("Policy must not be null."); + } + } + + public boolean isShutdownGraceful() { + return shutdownGraceful; + } + + public void setShutdownGraceful(boolean shutdownGraceful) { + this.checkWritable(); + this.shutdownGraceful = shutdownGraceful; + } + + public int getShutdownWaitTimeMs() { + return shutdownWaitTimeMs; + } + + public void setShutdownWaitTimeMs(int shutdownWaitTimeMs) { + this.checkWritable(); + this.shutdownWaitTimeMs = shutdownWaitTimeMs; + } + + public ThreadFactory getFactory() { + return factory; + } + + public void setFactory(ThreadFactory factory) { + this.checkWritable(); + this.factory = factory; + } + + public ThreadPriority getPriority() { + return priority; + } + + public void setPriority(ThreadPriority priority) { + this.checkWritable(); + if ( priority == null ) { + throw new IllegalArgumentException("Priority must not be null."); + } + this.priority = priority; + } + + public boolean isDaemon() { + return isDaemon; + } + + public void setDaemon(boolean isDaemon) { + this.checkWritable(); + this.isDaemon = isDaemon; + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/sling/threads/ThreadPoolManager.java b/src/main/java/org/apache/sling/threads/ThreadPoolManager.java index 62610b4..db7c4e6 100644 --- a/src/main/java/org/apache/sling/threads/ThreadPoolManager.java +++ b/src/main/java/org/apache/sling/threads/ThreadPoolManager.java @@ -16,7 +16,6 @@ */ package org.apache.sling.threads; -import java.util.concurrent.ThreadFactory; /** * The <cod>ThreadPoolManager</code> manages thread pools. @@ -28,17 +27,6 @@ public interface ThreadPoolManager { /** The default thread pool name */ String DEFAULT_THREADPOOL_NAME = "default"; - /** The thread pool policies. */ - enum ThreadPoolPolicy { - ABORT, - DISCARD, - DISCARDOLDEST, - RUN - }; - - /** The default policy */ - ThreadPoolPolicy DEFAULT_BLOCK_POLICY = ThreadPoolPolicy.RUN; - /** * Add a new pool. * If a pool with the same name already exists, the new pool is not added @@ -61,18 +49,8 @@ public interface ThreadPoolManager { * If a pool with the same name already exists, no new pool is created * and <code>null</code> is returned. * @param name Name must not be null. - * @param blockPolicy The thread pool policy or null for the default. - * @param factory A thread factory or null for the default favtory. + * @param config The thread pool configuration. */ ThreadPool create(String name, - int minPoolSize, - int maxPoolSize, - final int queueSize, - long keepAliveTime, - ThreadPoolPolicy blockPolicy, - final boolean shutdownGraceful, - final int shutdownWaitTimeMs, - final ThreadFactory factory, - final int priority, - final boolean isDaemon); + ThreadPoolConfig config); } diff --git a/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java b/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java index 37289bd..d66d810 100644 --- a/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java +++ b/src/main/java/org/apache/sling/threads/impl/DefaultThreadPool.java @@ -26,6 +26,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.sling.threads.ThreadPool; +import org.apache.sling.threads.ThreadPoolConfig; import org.apache.sling.threads.ThreadPoolManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,11 +50,7 @@ public class DefaultThreadPool /** The executor. */ protected ThreadPoolExecutor executor; - /** Should we wait for running jobs to terminate on shutdown ? */ - protected final boolean shutdownGraceful; - - /** How long to wait for running jobs to terminate on disposition */ - protected final int shutdownWaitTimeMs; + protected final ThreadPoolConfig configuration; /** * Create a new thread pool. @@ -61,16 +58,7 @@ public class DefaultThreadPool * is used */ public DefaultThreadPool(final String name, - int minPoolSize, - int maxPoolSize, - final int queueSize, - long keepAliveTime, - ThreadPoolManager.ThreadPoolPolicy blockPolicy, - final boolean shutdownGraceful, - final int shutdownWaitTimeMs, - final ThreadFactory factory, - final int priority, - final boolean isDaemon) { + ThreadPoolConfig origConfig) { this.logger.info("ThreadPool [{}] initializing ...", name); // name @@ -80,42 +68,41 @@ public class DefaultThreadPool this.name = DefaultThreadPoolManager.DEFAULT_THREADPOOL_NAME; } + this.configuration = new ThreadPoolConfig(origConfig); + // factory final ThreadFactory delegateThreadFactory; - if (factory == null) { + if (this.configuration.getFactory() == null) { logger.warn("No ThreadFactory is configured. Will use JVM default thread factory." + ExtendedThreadFactory.class.getName()); delegateThreadFactory = Executors.defaultThreadFactory(); } else { - delegateThreadFactory = factory; + delegateThreadFactory = this.configuration.getFactory(); } // Min pool size - // make sure we have enough threads for the default thread pool as we - // need one for ourself - if (DefaultThreadPoolManager.DEFAULT_THREADPOOL_NAME.equals(name) - && ((minPoolSize > 0) && (minPoolSize < DefaultThreadPoolManager.DEFAULT_MIN_POOL_SIZE))) { - minPoolSize = DefaultThreadPoolManager.DEFAULT_MIN_POOL_SIZE; - } else if (minPoolSize < 1) { - minPoolSize = 1; + if (this.configuration.getMinPoolSize() < 1) { + this.configuration.setMinPoolSize(1); this.logger.warn("min-pool-size < 1 for pool \"" + name + "\". Set to 1"); } // Max pool size - maxPoolSize = (maxPoolSize < 0) ? Integer.MAX_VALUE : maxPoolSize; + if ( this.configuration.getMaxPoolSize() < 0 ) { + this.configuration.setMaxPoolSize(Integer.MAX_VALUE); + } // Set priority and daemon flag - final ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(delegateThreadFactory, priority, isDaemon); + final ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(delegateThreadFactory, this.configuration.getPriority(), this.configuration.isDaemon()); // Keep alive time - if (keepAliveTime < 0) { - keepAliveTime = 1000; + if (this.configuration.getKeepAliveTime() < 0) { + this.configuration.setKeepAliveTime(1000); this.logger.warn("keep-alive-time-ms < 0 for pool \"" + name + "\". Set to 1000"); } // Queue final BlockingQueue<Runnable> queue; - if (queueSize != 0) { - if (queueSize > 0) { - queue = new java.util.concurrent.ArrayBlockingQueue<Runnable>(queueSize); + if (this.configuration.getQueueSize() != 0) { + if (this.configuration.getQueueSize() > 0) { + queue = new java.util.concurrent.ArrayBlockingQueue<Runnable>(this.configuration.getQueueSize()); } else { queue = new LinkedBlockingQueue<Runnable>(); } @@ -123,11 +110,8 @@ public class DefaultThreadPool queue = new SynchronousQueue<Runnable>(); } - if ( blockPolicy == null ) { - blockPolicy = DefaultThreadPoolManager.DEFAULT_BLOCK_POLICY; - } RejectedExecutionHandler handler = null; - switch (blockPolicy) { + switch (this.configuration.getBlockPolicy()) { case ABORT : handler = new ThreadPoolExecutor.AbortPolicy(); break; @@ -141,15 +125,14 @@ public class DefaultThreadPool handler = new ThreadPoolExecutor.AbortPolicy(); break; } - this.shutdownGraceful = shutdownGraceful; - this.shutdownWaitTimeMs = shutdownWaitTimeMs; - this.executor = new ThreadPoolExecutor(minPoolSize, - maxPoolSize, - keepAliveTime, + this.executor = new ThreadPoolExecutor(this.configuration.getMinPoolSize(), + this.configuration.getMaxPoolSize(), + this.configuration.getKeepAliveTime(), TimeUnit.MILLISECONDS, queue, threadFactory, handler); + this.configuration.makeReadOnly(); this.logger.info("ThreadPool [{}] initialized.", name); } @@ -160,11 +143,12 @@ public class DefaultThreadPool return name; } + /** - * @see org.apache.sling.threads.ThreadPool#getMaxPoolSize() + * @see org.apache.sling.threads.ThreadPool#getConfiguration() */ - public int getMaxPoolSize() { - return this.executor.getMaximumPoolSize(); + public ThreadPoolConfig getConfiguration() { + return this.configuration; } /** @@ -186,17 +170,17 @@ public class DefaultThreadPool */ public void shutdown() { if ( this.executor != null ) { - if (shutdownGraceful) { + if (this.configuration.isShutdownGraceful()) { this.executor.shutdown(); } else { this.executor.shutdownNow(); } try { - if (this.shutdownWaitTimeMs > 0) { - if (!this.executor.awaitTermination(this.shutdownWaitTimeMs, TimeUnit.MILLISECONDS)) { + if (this.configuration.getShutdownWaitTimeMs() > 0) { + if (!this.executor.awaitTermination(this.configuration.getShutdownWaitTimeMs(), TimeUnit.MILLISECONDS)) { logger.warn("running commands have not terminated within " - + this.shutdownWaitTimeMs + + this.configuration.getShutdownWaitTimeMs() + "ms. Will shut them down by interruption"); this.executor.shutdownNow(); } diff --git a/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java b/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java index fcaa35e..72cdc32 100644 --- a/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java +++ b/src/main/java/org/apache/sling/threads/impl/DefaultThreadPoolManager.java @@ -18,9 +18,9 @@ package org.apache.sling.threads.impl; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ThreadFactory; import org.apache.sling.threads.ThreadPool; +import org.apache.sling.threads.ThreadPoolConfig; import org.apache.sling.threads.ThreadPoolManager; import org.osgi.service.component.ComponentContext; import org.slf4j.Logger; @@ -37,30 +37,6 @@ import org.slf4j.LoggerFactory; */ public class DefaultThreadPoolManager implements ThreadPoolManager { - /** The default queue size */ - protected final static int DEFAULT_QUEUE_SIZE = -1; - - /** The default maximum pool size */ - protected final static int DEFAULT_MAX_POOL_SIZE = 5; - - /** The default minimum pool size */ - protected final static int DEFAULT_MIN_POOL_SIZE = 5; - - /** The default thread priority */ - protected final static int DEFAULT_THREAD_PRIORITY = Thread.NORM_PRIORITY; - - /** The default daemon mode */ - protected final static boolean DEFAULT_DAEMON_MODE = false; - - /** The default keep alive time */ - protected final static long DEFAULT_KEEP_ALIVE_TIME = 60000L; - - /** The default way to shutdown gracefully */ - protected final static boolean DEFAULT_SHUTDOWN_GRACEFUL = false; - - /** The default shutdown waittime time */ - protected final static int DEFAULT_SHUTDOWN_WAIT_TIME = -1; - /** By default we use the logger for this class. */ protected Logger logger = LoggerFactory.getLogger(getClass()); @@ -74,16 +50,7 @@ public class DefaultThreadPoolManager implements ThreadPoolManager { this.logger.info("Starting thread pool manager."); final ThreadPool defaultPool = new DefaultThreadPool( DEFAULT_THREADPOOL_NAME, - DEFAULT_MIN_POOL_SIZE, - DEFAULT_MAX_POOL_SIZE, - DEFAULT_QUEUE_SIZE, - DEFAULT_KEEP_ALIVE_TIME, - DEFAULT_BLOCK_POLICY, - DEFAULT_SHUTDOWN_GRACEFUL, - DEFAULT_SHUTDOWN_WAIT_TIME, - null, - DEFAULT_THREAD_PRIORITY, - DEFAULT_DAEMON_MODE); + new ThreadPoolConfig()); synchronized ( this.pools ) { this.pools.put(defaultPool.getName(), defaultPool); } @@ -141,39 +108,23 @@ public class DefaultThreadPoolManager implements ThreadPoolManager { } /** - * @see org.apache.sling.threads.ThreadPoolManager#create(java.lang.String, int, int, int, long, org.apache.sling.threads.ThreadPoolManager.ThreadPoolPolicy, boolean, int, java.util.concurrent.ThreadFactory, int, boolean) + * @see org.apache.sling.threads.ThreadPoolManager#create(java.lang.String, org.apache.sling.threads.ThreadPoolConfig) */ public ThreadPool create(String name, - int minPoolSize, - int maxPoolSize, - int queueSize, - long keepAliveTime, - ThreadPoolPolicy blockPolicy, - boolean shutdownGraceful, - int shutdownWaitTimeMs, - ThreadFactory factory, - int priority, - boolean isDaemon) { + ThreadPoolConfig config) { if ( name == null ) { throw new IllegalArgumentException("Name must not be null."); } + if ( config == null ) { + throw new IllegalArgumentException("Config must not be null."); + } synchronized ( this.pools ) { ThreadPool pool = this.pools.get(name); if ( pool != null ) { // pool already exists return null; } - pool = new DefaultThreadPool(name, - minPoolSize, - maxPoolSize, - queueSize, - keepAliveTime, - blockPolicy, - shutdownGraceful, - shutdownWaitTimeMs, - factory, - priority, - isDaemon); + pool = new DefaultThreadPool(name, config); this.pools.put(name, pool); return pool; } diff --git a/src/main/java/org/apache/sling/threads/impl/ExtendedThreadFactory.java b/src/main/java/org/apache/sling/threads/impl/ExtendedThreadFactory.java index 7a030a7..45a17a7 100644 --- a/src/main/java/org/apache/sling/threads/impl/ExtendedThreadFactory.java +++ b/src/main/java/org/apache/sling/threads/impl/ExtendedThreadFactory.java @@ -18,12 +18,14 @@ package org.apache.sling.threads.impl; import java.util.concurrent.ThreadFactory; +import org.apache.sling.threads.ThreadPoolConfig; + /** * This class is responsible to create new Thread instances. * It's a very basic implementation. * - * @version $Id$ + * @version $Id: DefaultThreadFactory.java 628678 2008-02-18 10:40:12Z cziegeler $ */ public final class ExtendedThreadFactory implements ThreadFactory { @@ -39,21 +41,27 @@ public final class ExtendedThreadFactory implements ThreadFactory { /** * Create a new wrapper for a thread factory handling the * - * @param priority One of {@link Thread#MIN_PRIORITY}, {@link - * Thread#NORM_PRIORITY}, {@link Thread#MAX_PRIORITY} + * @param priority A non null value. * @param isDaemon Whether new {@link Thread}s should run as daemons. */ public ExtendedThreadFactory(final ThreadFactory factory, - final int priority, + final ThreadPoolConfig.ThreadPriority priority, final boolean isDaemon) { this.isDaemon = isDaemon; - if( ( Thread.MAX_PRIORITY == priority ) || - ( Thread.MIN_PRIORITY == priority ) || - ( Thread.NORM_PRIORITY == priority ) ) { - this.priority = priority; - } else { - throw new IllegalStateException("Unknown priority " + priority); - } + if ( priority == null ) { + throw new IllegalStateException("Prioriy must not be null."); + } + switch ( priority ) { + case NORM : this.priority = Thread.NORM_PRIORITY; + break; + case MIN : this.priority = Thread.MIN_PRIORITY; + break; + case MAX : this.priority = Thread.MAX_PRIORITY; + break; + default: // this can never happen + this.priority = Thread.NORM_PRIORITY; + break; + } this.factory = factory; } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
