Author: thorsten
Date: Wed May 6 22:01:26 2009
New Revision: 772441
URL: http://svn.apache.org/viewvc?rev=772441&view=rev
Log:
DROIDS-46
MultiThreadedTaskMaster (WorkRunner) memory leak
DUE-TO: Mingfai Ma
Apply patch that had been put together by Ryan.
Thanks Ryan and Mingfai.
Modified:
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
Modified:
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java
URL:
http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java?rev=772441&r1=772440&r2=772441&view=diff
==============================================================================
---
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java
(original)
+++
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java
Wed May 6 22:01:26 2009
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.droids.api;
-
-
-/**
- * A pausable task master interface
- */
-public interface PausableTaskMaster<T extends Task> extends TaskMaster<T> {
- void pause();
- void resume();
-}
Modified:
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java
URL:
http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java?rev=772441&r1=772440&r2=772441&view=diff
==============================================================================
---
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java
(original)
+++
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java
Wed May 6 22:01:26 2009
@@ -28,7 +28,6 @@
public enum ExecutionState {
INITALIZED,
RUNNING,
- PAUSED,
COMPLETE
};
Modified:
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
URL:
http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java?rev=772441&r1=772440&r2=772441&view=diff
==============================================================================
---
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
(original)
+++
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
Wed May 6 22:01:26 2009
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,131 +16,227 @@
*/
package org.apache.droids.impl;
+import java.io.IOException;
import java.util.Date;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.droids.api.DelayTimer;
import org.apache.droids.api.Droid;
-import org.apache.droids.api.PausableTaskMaster;
import org.apache.droids.api.Task;
import org.apache.droids.api.TaskExceptionHandler;
import org.apache.droids.api.TaskExceptionResult;
+import org.apache.droids.api.TaskMaster;
import org.apache.droids.api.TaskQueue;
import org.apache.droids.api.WorkMonitor;
import org.apache.droids.api.Worker;
-import org.apache.droids.helper.Loggable;
+import org.apache.droids.exception.DroidsException;
-public class MultiThreadedTaskMaster<T extends Task>
- extends Loggable implements PausableTaskMaster<T>
-{
+/**
+ * This task master provides a base implementation that supports multithreaded
task processing powered by a
+ * ThreadPoolExecutor.
+ * <p/>
+ * By default, the ThreadPoolExecutor uses a bounded blocking queue whose
size is the same as maxThreads. As long as
+ * there is any outstanding task in the Task Queue and the Executor has spare
capacity, a new Thread will be created to
+ * poll and handle one task from the Task Queue. The user may supply another
'pool' implementation, and the nextTask()
+ * method may be overridden by a sub-class.
+ * <p/>
+ * This Task Master doesn't support: pausing, monitoring etc.
TaskExceptionHandler is unimplemented.
+ * <p/>
+ * There is also no failure handling mechanism, e.g. if the JVM of the Task Master
crashes, any task polled from the Task Queue
+ * will be lost. (This point makes sense only if the Task Queue is persistent.)
+ *
+ * @param <T>
+ */
+public class MultiThreadedTaskMaster<T extends Task> implements TaskMaster<T> {
+ protected final Log log = LogFactory.getLog(MultiThreadedTaskMaster.class);
private static final long KEEP_ALIVE = 50000L;
-
- private PausableThreadPoolExecutor pool = null;
- private int maxThreads = 0;
- private TaskQueue<T> queue = null;
- private Droid<T> droid = null;
- private DelayTimer delayTimer = null;
- private WorkMonitor<T> monitor = null;
-
- private Date startedWorking = null;
- private Date finishedWorking = null;
- private T lastCompletedTask = null;
- private ExecutionState state = ExecutionState.INITALIZED;
+ private static final long TICKLE_TIME = 100L;
+
+ protected ThreadPoolExecutor pool = null;
+ protected int maxThreads = 0;
+ protected TaskQueue<T> queue = null;
+ protected Droid<T> droid = null;
+ protected DelayTimer delayTimer = null;
+ protected WorkMonitor<T> monitor = null;
+
+ protected Date startedWorking = null;
+ protected Date finishedWorking = null;
+ protected T lastCompletedTask = null;
+ protected volatile ExecutionState state = ExecutionState.INITALIZED;
+ protected AtomicLong completedCount = new AtomicLong();
+ protected TaskExceptionHandler exHandler;
- private TaskExceptionHandler exHandler;
-
/**
* The queue has been initialized
*/
- public void processAllTasks(final TaskQueue<T> queue, final Droid<T> droid)
- {
+ public void processAllTasks(final TaskQueue<T> queue, final Droid<T> droid) {
+ if( state == ExecutionState.RUNNING ) {
+ // TODO? throw an error?
+ log.info("already processing tasks" );
+ return;
+ }
+
this.queue = queue;
this.droid = droid;
this.startedWorking = new Date();
this.finishedWorking = null;
this.state = ExecutionState.RUNNING;
- if( !queue.hasNext() ) {
- log.info( "no tasks. finishing now" );
+ if (!queue.hasNext()) {
+ log.info("no tasks. finishing now");
terminate();
return;
}
- int n = getMaxThreads();
+ // start the pool
+ int poolSize = getMaxThreads();
+ // user may set a thread pool before calling the processAllTasks
+ this.pool = this.pool != null ? this.pool : new
ThreadPoolExecutor(poolSize, poolSize, KEEP_ALIVE, TimeUnit.MILLISECONDS, new
ArrayBlockingQueue<Runnable>(poolSize));
+ this.pool.setRejectedExecutionHandler(new
ThreadPoolExecutor.DiscardPolicy());//task is not yet run, there is no impact
to discard fail to submit task
if (log.isInfoEnabled()) {
- log.info("Number of concurrent workers: " + n);
+ log.info("processAllTasks() - created ThreadPoolExecutor - poolSize: " +
poolSize);
}
- // start the pool
- this.pool = new PausableThreadPoolExecutor(n, n, KEEP_ALIVE,
- TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() );
- startWorkers();
- log.info("Finshed invocation, waiting for workers to finish.");
- }
-
-
- private void finishedRunner(WorkerRunner runner)
- {
- synchronized (this) {
- lastCompletedTask = runner.task;
- pool.remove(runner);
- if (log.isDebugEnabled()) {
- log.debug("Worker '" + runner.task.getId() + "' has finished.");
- }
-
- boolean terminate = false;
-
- Exception ex = runner.exception;
- if (ex != null) {
- TaskExceptionResult result = TaskExceptionResult.WARN;
- if (exHandler != null) {
- result = exHandler.handleException(ex);
- }
- switch (result) {
- case WARN:
- log.warn(ex.toString(), ex);
- break;
- case FATAL:
- log.warn(ex.getMessage(), ex);
- terminate = true;
- break;
+ // process tasks in a new thread
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ executor.execute( new Runnable() {
+ @Override
+ public void run() {
+ while (queue.hasNext() || pool.getActiveCount() > 0) {
+ if (log.isTraceEnabled()) {
+ log.trace("processAllTasks() - pool.activeCount: " +
pool.getActiveCount() + ", pool.maximumPoolSize(): " +
pool.getMaximumPoolSize());
+ }
+
+ if (queue.hasNext() && pool.getActiveCount() <
pool.getMaximumPoolSize()) {
+ nextTask(pool, queue);
+ }
+ else {
+ try {
+ if (log.isDebugEnabled())
+ log.debug("processAllTasks() - no task or thread pool is full,
to check again in " + TICKLE_TIME + "ms - queue.size: " + queue.getSize());
+ Thread.sleep(TICKLE_TIME);
+ }
+ catch (InterruptedException e) {
+ log.error(e);
+ }
+ }
}
+ state = ExecutionState.COMPLETE;
+ log.info("Finshed invocation, waiting for workers to finish.");
}
-
- if (terminate) {
- terminate();
- }
- else {
- int cnt = startWorkers();
+ });
+ }
+
+ /**
+ * This method is designed for sub-classing
+ * <p/>
+ * TODO: refactor and create a specialized thread that
+ * - is constructed with the delayTimer and transparently applies the delay
+ * - is constructed with the Task and transparently sets the lastCompletedTask
+ * <p/>
+ * TODO: consider providing all variables as arguments so that an overriding
method need not access the parent's
+ * variables
+ * <p/>
+ * TODO: caller of this method could use the Future to track the task
completion and perform the termination and
+ * other checking.
+ *
+ * @param executor
+ * @param queue
+ * @return
+ */
+ protected Future nextTask(ExecutorService executor, final TaskQueue<?
extends T> queue) {
+ return executor.submit(new Runnable() {
+ public void run() {
+ String threadName = Thread.currentThread().getName();
+ T task = queue.next();
+ long delay = (delayTimer != null) ? delayTimer.getDelayMillis() : 0;
+ if (log.isDebugEnabled())
+ log.debug("run() - begin - thread: " + threadName + ", task: " +
task.getId() + ", delay: " + delay);
- // shutdown the queue...
- if( cnt == 0 && !queue.hasNext() ) {
- if( pool.getActiveCount() > 1 ) {
- // keep running
+ Worker<T> worker = null;
+ Exception ex = null;
+ try {
+ if( delay > 0 ) {
+ Thread.sleep(delay); // gets the current thread
}
- else {
+ worker = droid.getNewWorker();
+ if( monitor != null ) {
+ monitor.beforeExecute( task, worker );
+ }
+ worker.execute(task);
+ lastCompletedTask = task;
+ } catch (DroidsException e) {
+ ex = e;
+ } catch (IOException e) {
+ ex = e;
+ } catch (InterruptedException e) {
+ ex = e;
+ }
+ finally {
+ // Handle any exceptions
+ boolean terminate = false;
+ if (ex != null) {
+ try {
+ TaskExceptionResult result = TaskExceptionResult.WARN;
+ if (exHandler != null) {
+ result = exHandler.handleException(ex);
+ }
+ switch (result) {
+ case WARN:
+ log.warn(ex.toString(), ex);
+ break;
+ case FATAL:
+ log.warn(ex.getMessage(), ex);
+ terminate = true;
+ break;
+ case IGNORE: break; // nothing
+ }
+ }
+ catch( Exception e2 ) {
+ log.error( e2.getMessage(), e2 );
+ }
+ }
+
+ if( monitor != null ) {
+ monitor.afterExecute( task, worker, ex );
+ }
+
+ completedCount.incrementAndGet();
+ if (log.isInfoEnabled()) {
+ log.info("run() - done - completedCount: " + completedCount + ",
thread: " + threadName +
+ ", task: " + task.getId() + ", queue.size: " + queue.getSize()
+
+ ", pool.activeCount: " + pool.getActiveCount());
+ }
+
+ if( terminate ) {
+ terminate();
+ }
+ else if (!queue.hasNext() && pool.getActiveCount() == 1) { //TODO it
isn't a very good idea to check the activeCount inside the thread. an
alternative way is to use the future to track the thread status and do the
termination
+ log.info("run() - no more queued task and active threads, set to
terminate");
terminate();
}
}
}
- }
+ });
}
-
- private void terminate()
- {
- if( pool != null ) {
+
+
+ private void terminate() {
+ if (pool != null) {
shutdownAndAwaitTermination();
}
-
+
long elapsed = System.currentTimeMillis() - startedWorking.getTime();
if (log.isInfoEnabled()) {
- log.info("All threads have finished. (elapsed: " + elapsed + ")" );
+ log.info("All threads have finished. (elapsed: " + elapsed + ")");
}
finishedWorking = new Date();
state = ExecutionState.COMPLETE;
@@ -148,86 +244,32 @@
}
/**
- * Will start a new worker.
- * @return the id of the worker we just started.
+ * Set the maximum allowed thread count. If the new value is less
+ * than the current value, excess existing threads will be terminated
+ * when they become idle.
+ *
+ * @param count
*/
- private int startWorkers(){
-
- int cnt = 0;
- try {
- while( queue.hasNext() && cnt++ < maxThreads ) {
-// // checking the "activeCount" can be expensive...
-// // The getActiveCount is often wrong for only a single thread!
-// // the javadocs say the number is approxiate, so in the case we only
-// // see one active worker, we will assume there are none...
-// int activeWorkers = pool.getActiveCount();
-// if( activeWorkers == 1 ) { //&& runningWorker.isEmpty() ) {
-// activeWorkers = 0;
-// }
-// if( activeWorkers >= getMaxThreads() ) {
-// return; // don't make a new runner...
-// }
- pool.execute( new WorkerRunner() );
- }
- }
- catch( RejectedExecutionException ex ) {
- log.trace( "rejected", ex );
+ public void setMaxThreads( int count ) {
+ if( pool != null ) {
+ pool.setMaximumPoolSize( count );
+
+ // NOTE, the main loop should fill up any spaces
+ // within TICKLE_TIME
}
-
-// if( cnt == 0 && !queue.hasNext() && pool.getActiveCount() == 0 ) {
-// terminate(); // nothing to do, we must be done...
-// }
- return cnt;
- }
-
-
- public final void setExceptionHandler(TaskExceptionHandler exHandler) {
- this.exHandler = exHandler;
- }
-
-
- public DelayTimer getDelayTimer() {
- return delayTimer;
- }
-
-
- public void setDelayTimer(DelayTimer delayTimer) {
- this.delayTimer = delayTimer;
- }
-
- public WorkMonitor<T> getMonitor() {
- return monitor;
- }
-
- public void setMonitor(WorkMonitor<T> monitor) {
- this.monitor = monitor;
+ maxThreads = count;
}
/**
- * Adjust number of allowed threads
- * @param maxThreads
- */
- public void setMaxThreads(int maxThreads) {
- if( pool != null && maxThreads != this.maxThreads ) {
- pool.setMaximumPoolSize( maxThreads );
- pool.setCorePoolSize( maxThreads );
- if( state == ExecutionState.RUNNING ) {
- startWorkers(); // fill up the queue with new workers...
- }
- }
- this.maxThreads = maxThreads;
- }
-
- /**
* Get number of maximum allowed threads
+ *
* @return the number of maximum threads that we allow
*/
public int getMaxThreads() {
return maxThreads;
}
- public ExecutionState getExecutionState()
- {
+ public ExecutionState getExecutionState() {
return state;
}
@@ -236,9 +278,9 @@
* If it is not working by asking nice to shutdown just kill all
* threads.
*/
- protected void shutdownAndAwaitTermination() {
+ public void shutdownAndAwaitTermination() {
log.info("SHUTTING DOWN");
- if( pool == null ) {
+ if (pool == null) {
return;
}
pool.shutdown(); // Disable new tasks from being submitted
@@ -252,7 +294,7 @@
log.info("Pool did not terminate");
}
}
- }
+ }
catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
@@ -260,55 +302,27 @@
Thread.currentThread().interrupt();
}
}
-
- /**
- * Wrapper to call finishedWorker() after
- */
- class WorkerRunner implements Runnable {
- T task;
- Exception exception;
-
- @Override
- public void run() {
- try {
- task = queue.next();
- if( task != null ) {
- if( delayTimer != null ) {
- long delay = delayTimer.getDelayMillis();
- if( delay > 0 ) {
- try {
- Thread.sleep( delay );
- }
- catch (InterruptedException e) {} // we don't really care....
- }
- }
- Worker<T> worker = droid.getNewWorker();
- try {
- if( monitor != null ) {
- monitor.beforeExecute(task, worker);
- }
- worker.execute( task );
- if( monitor != null ) {
- monitor.afterExecute(task, worker, null);
- }
- }
- catch (Exception ex) {
- exception = ex;
- if( monitor != null ) {
- monitor.afterExecute(task, worker, ex);
- }
- }
- }
- }
- finally {
- finishedRunner( this );
- }
- }
+
+ public void awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException {
+ log.info("awaitTermination()");
+ pool.awaitTermination(timeout, unit);
+ }
+
+ public final void setExceptionHandler(TaskExceptionHandler exHandler) {
+ this.exHandler = exHandler;
+ }
+
+ public DelayTimer getDelayTimer() {
+ return delayTimer;
+ }
+
+ public void setDelayTimer(DelayTimer delayTimer) {
+ this.delayTimer = delayTimer;
}
public long getCompletedTasks() {
- return (pool==null) ? 0 : pool.getCompletedTaskCount();
+ return completedCount.get();
}
public Date getFinishedWorking() {
@@ -323,65 +337,12 @@
return startedWorking;
}
- public void awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException {
- pool.awaitTermination(timeout, unit);
- }
-
-
- /**
- * "pause" support
- */
- private class PausableThreadPoolExecutor extends ThreadPoolExecutor {
- ReentrantLock pauseLock = new ReentrantLock();
- Condition unpaused = pauseLock.newCondition();
-
- public PausableThreadPoolExecutor(
- int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue) {
- super( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue );
- }
-
- @Override
- protected void beforeExecute(Thread t, Runnable r) {
- super.beforeExecute(t, r);
- pauseLock.lock();
- try {
- while (state == ExecutionState.PAUSED) unpaused.await();
- } catch(InterruptedException ie) {
- t.interrupt();
- } finally {
- pauseLock.unlock();
- }
- }
+ public WorkMonitor<T> getMonitor() {
+ return monitor;
}
- public void pause() {
- if( pool == null ) {
- return;
- }
- pool.pauseLock.lock();
- try {
- state = ExecutionState.PAUSED;
- } finally {
- pool.pauseLock.unlock();
- }
+ public void setMonitor(WorkMonitor<T> monitor) {
+ this.monitor = monitor;
}
- public void resume() {
- if( pool == null ) {
- return;
- }
- pool.pauseLock.lock();
- try {
- state = ExecutionState.RUNNING;
- pool.unpaused.signalAll();
- } finally {
- pool.pauseLock.unlock();
- }
- }
}
-
-