chirino 2003/11/16 15:12:07
Modified: modules/core/src/java/org/apache/geronimo/connector/work
GeronimoWorkManager.java WorkManagerUtil.java
WorkerContext.java
modules/core/src/java/org/apache/geronimo/connector/work/pool
AbstractWorkExecutorPool.java
ScheduleWorkExecutorPool.java
StartWorkExecutorPool.java
SyncWorkExecutorPool.java
TimedOutPooledExecutor.java
modules/core/src/test/org/apache/geronimo/connector/work
PooledWorkManagerTest.java
Log:
PR: GERONIMO-73
Submitted by: Gianny DAMOUR
Applied the latest patch.
Revision Changes Path
1.2 +80 -45
incubator-geronimo/modules/core/src/java/org/apache/geronimo/connector/work/GeronimoWorkManager.java
Index: GeronimoWorkManager.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/connector/work/GeronimoWorkManager.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- GeronimoWorkManager.java 16 Nov 2003 22:42:20 -0000 1.1
+++ GeronimoWorkManager.java 16 Nov 2003 23:12:07 -0000 1.2
@@ -63,43 +63,45 @@
import javax.resource.spi.work.WorkManager;
import javax.resource.spi.work.WorkRejectedException;
+import org.apache.geronimo.connector.work.pool.WorkExecutorPool;
import org.apache.geronimo.kernel.management.State;
import org.apache.geronimo.kernel.service.GeronimoMBeanContext;
import org.apache.geronimo.kernel.service.GeronimoMBeanInfo;
import org.apache.geronimo.kernel.service.GeronimoMBeanTarget;
import org.apache.geronimo.kernel.service.GeronimoOperationInfo;
import org.apache.geronimo.kernel.service.GeronimoParameterInfo;
-import org.apache.geronimo.connector.work.pool.WorkExecutorPool;
/**
* WorkManager implementation which uses under the cover three
WorkExecutorPool
* - one for each synchronization policy - in order to dispatch the
submitted
* Work instances.
+ * <P>
+ * A WorkManager is a component of the JCA specifications, which allows a
+ * Resource Adapter to submit tasks to an Application Server for execution.
*
- * @version $Revision$ $Date$
+* @version $Revision$ $Date$
*/
public class GeronimoWorkManager implements WorkManager, GeronimoMBeanTarget
{
/**
* Pool of threads used by this WorkManager in order to process
- * synchronously the submitted Work instances
+ * the Work instances submitted via the doWork methods.
*/
- private WorkExecutorPool m_syncWorkExecutorPool;
+ private WorkExecutorPool syncWorkExecutorPool;
/**
* Pool of threads used by this WorkManager in order to process
* the Work instances submitted via the startWork methods.
*/
- private WorkExecutorPool m_startWorkExecutorPool;
-
+ private WorkExecutorPool startWorkExecutorPool;
+
/**
* Pool of threads used by this WorkManager in order to process
- * scheduled Work instances
+ * the Work instances submitted via the scheduleWork methods.
*/
- private WorkExecutorPool m_scheduledWorkExecutorPool;
-
+ private WorkExecutorPool scheduledWorkExecutorPool;
private GeronimoMBeanContext geronimoMBeanContext;
-
+
/**
* Create a WorkManager.
*/
@@ -142,63 +144,77 @@
* @param anExecutorPool An executor.
*/
public void setSyncExecutor(WorkExecutorPool anExecutorPool) {
- m_syncWorkExecutorPool = anExecutorPool;
+ syncWorkExecutorPool = anExecutorPool;
}
/**
- * Set the executor in charge of the processing of synchronous until
start
+ * Sets the executor in charge of the processing of synchronous until
start
* works.
* @param anExecutorPool An executor.
*/
public void setStartExecutor(WorkExecutorPool anExecutorPool) {
- m_startWorkExecutorPool = anExecutorPool;
+ startWorkExecutorPool = anExecutorPool;
}
-
+
/**
* Set the executor in charge of the processing of asynchronous works.
* @param anExecutorPool An executor.
*/
public void setAsyncExecutor(WorkExecutorPool anExecutorPool) {
- m_scheduledWorkExecutorPool = anExecutorPool;
+ scheduledWorkExecutorPool = anExecutorPool;
}
-
+
/* (non-Javadoc)
* @see
javax.resource.spi.work.WorkManager#doWork(javax.resource.spi.work.Work)
*/
public void doWork(Work work) throws WorkException {
- checkStateBeforeAccept(m_syncWorkExecutorPool, "synchronous");
- m_syncWorkExecutorPool.executeWork(new WorkerContext(work));
+ checkStateBeforeAccept(syncWorkExecutorPool, "synchronous");
+ syncWorkExecutorPool.executeWork(new WorkerContext(work));
}
/* (non-Javadoc)
* @see
javax.resource.spi.work.WorkManager#doWork(javax.resource.spi.work.Work, long,
javax.resource.spi.work.ExecutionContext, javax.resource.spi.work.WorkListener)
*/
- public void doWork(Work work, long startTimeout, ExecutionContext
execContext, WorkListener workListener) throws WorkException {
- checkStateBeforeAccept(m_syncWorkExecutorPool, "synchronous");
- WorkerContext workWrapper = new WorkerContext(work, startTimeout,
execContext, workListener);
+ public void doWork(
+ Work work,
+ long startTimeout,
+ ExecutionContext execContext,
+ WorkListener workListener)
+ throws WorkException {
+ checkStateBeforeAccept(syncWorkExecutorPool, "synchronous");
+ WorkerContext workWrapper =
+ new WorkerContext(work, startTimeout, execContext, workListener);
workWrapper.setThreadPriority(Thread.currentThread().getPriority());
- m_syncWorkExecutorPool.executeWork(workWrapper);
+ syncWorkExecutorPool.executeWork(workWrapper);
}
/* (non-Javadoc)
* @see
javax.resource.spi.work.WorkManager#startWork(javax.resource.spi.work.Work)
*/
public long startWork(Work work) throws WorkException {
- checkStateBeforeAccept(m_startWorkExecutorPool, "synchronous until
start");
+ checkStateBeforeAccept(startWorkExecutorPool,
+ "synchronous until start");
WorkerContext workWrapper = new WorkerContext(work);
workWrapper.setThreadPriority(Thread.currentThread().getPriority());
- m_startWorkExecutorPool.executeWork(workWrapper);
+ startWorkExecutorPool.executeWork(workWrapper);
return System.currentTimeMillis() - workWrapper.getAcceptedTime();
}
/* (non-Javadoc)
* @see
javax.resource.spi.work.WorkManager#startWork(javax.resource.spi.work.Work,
long, javax.resource.spi.work.ExecutionContext,
javax.resource.spi.work.WorkListener)
*/
- public long startWork(Work work, long startTimeout, ExecutionContext
execContext, WorkListener workListener) throws WorkException {
- checkStateBeforeAccept(m_startWorkExecutorPool, "synchronous until
start");
- WorkerContext workWrapper = new WorkerContext(work, startTimeout,
execContext, workListener);
+ public long startWork(
+ Work work,
+ long startTimeout,
+ ExecutionContext execContext,
+ WorkListener workListener)
+ throws WorkException {
+ checkStateBeforeAccept(startWorkExecutorPool,
+ "synchronous until start");
+ WorkerContext workWrapper =
+ new WorkerContext(work, startTimeout, execContext, workListener);
workWrapper.setThreadPriority(Thread.currentThread().getPriority());
- m_startWorkExecutorPool.executeWork(workWrapper);
+ startWorkExecutorPool.executeWork(workWrapper);
return System.currentTimeMillis() - workWrapper.getAcceptedTime();
}
@@ -206,38 +222,57 @@
* @see
javax.resource.spi.work.WorkManager#scheduleWork(javax.resource.spi.work.Work)
*/
public void scheduleWork(Work work) throws WorkException {
- checkStateBeforeAccept(m_scheduledWorkExecutorPool, "asynchronous");
+ checkStateBeforeAccept(scheduledWorkExecutorPool, "asynchronous");
WorkerContext workWrapper = new WorkerContext(work);
workWrapper.setThreadPriority(Thread.currentThread().getPriority());
- m_scheduledWorkExecutorPool.executeWork(workWrapper);
+ scheduledWorkExecutorPool.executeWork(workWrapper);
}
/* (non-Javadoc)
* @see
javax.resource.spi.work.WorkManager#scheduleWork(javax.resource.spi.work.Work,
long, javax.resource.spi.work.ExecutionContext,
javax.resource.spi.work.WorkListener)
*/
- public void scheduleWork(Work work, long startTimeout, ExecutionContext
execContext, WorkListener workListener) throws WorkException {
- checkStateBeforeAccept(m_scheduledWorkExecutorPool, "asynchronous");
- WorkerContext workWrapper = new WorkerContext(work, startTimeout,
execContext, workListener);
+ public void scheduleWork(
+ Work work,
+ long startTimeout,
+ ExecutionContext execContext,
+ WorkListener workListener)
+ throws WorkException {
+ checkStateBeforeAccept(scheduledWorkExecutorPool, "asynchronous");
+ WorkerContext workWrapper =
+ new WorkerContext(work, startTimeout, execContext, workListener);
workWrapper.setThreadPriority(Thread.currentThread().getPriority());
- m_scheduledWorkExecutorPool.executeWork(workWrapper);
+ scheduledWorkExecutorPool.executeWork(workWrapper);
}
/**
- * This method MUST be called prior to accept a Work instance. It ensures
- * that the state of this WorkManager is running.
- *
+ * This helper method MUST be called prior to accept a Work instance. It
+ * ensures that the state of this WorkManager is running and that the
+ * provided work executor is defined.
+ *
+ * @param aPool Work executor, which will accept the Work instance.
+ * @param aType "Label" of this work executor. It is only used to
+ * create a more accurate message when the provided Work executor is not
+ * defined (null).
+ *
* @throws WorkRejectedException Indicates that this WorkManager is not
* running and hence that a work can not be accepted.
*/
- private void checkStateBeforeAccept(WorkExecutorPool aPool, String
aType) throws WorkRejectedException {
+ private void checkStateBeforeAccept(WorkExecutorPool aPool,
+ String aType) throws WorkRejectedException {
+ if ( !(State.RUNNING_INDEX == getState()) ) {
+ throw new WorkRejectedException(getClass() + " is not running.",
+ WorkException.INTERNAL);
+ } else if ( null == aPool ) {
+ throw new WorkRejectedException(getClass() + " is partially" +
+ " running. Its " + aType + " work facilities are unmounted.",
+ WorkException.INTERNAL);
+ }
+ }
+
+
+ public int getState() throws WorkRejectedException {
try {
- if (!(geronimoMBeanContext.getState() == State.RUNNING_INDEX)) {
- throw new WorkRejectedException("WorkManager is not
running.", WorkException.INTERNAL);
- } else if (null == aPool) {
- throw new WorkRejectedException(
- "WorkManager is not partially" + " running. Its " +
aType + " work facilities are unmounted.",
- WorkException.INTERNAL);
- }
+ return geronimoMBeanContext.getState();
} catch (Exception e) {
throw new WorkRejectedException("WorkManager is not ready.",
WorkException.INTERNAL);
}
1.2 +1 -3
incubator-geronimo/modules/core/src/java/org/apache/geronimo/connector/work/WorkManagerUtil.java
Index: WorkManagerUtil.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/connector/work/WorkManagerUtil.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- WorkManagerUtil.java 16 Nov 2003 22:42:20 -0000 1.1
+++ WorkManagerUtil.java 16 Nov 2003 23:12:07 -0000 1.2
@@ -56,11 +56,9 @@
package org.apache.geronimo.connector.work;
-import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.geronimo.kernel.jmx.JMXUtil;
-import org.apache.geronimo.kernel.jmx.MBeanProxyFactory;
/**
* WorkManager helper.
1.2 +90 -76
incubator-geronimo/modules/core/src/java/org/apache/geronimo/connector/work/WorkerContext.java
Index: WorkerContext.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/connector/work/WorkerContext.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- WorkerContext.java 16 Nov 2003 22:42:20 -0000 1.1
+++ WorkerContext.java 16 Nov 2003 23:12:07 -0000 1.2
@@ -78,82 +78,82 @@
public class WorkerContext implements Work
{
- private Log m_log = LogFactory.getLog(WorkerContext.class);
+ private Log log = LogFactory.getLog(WorkerContext.class);
/**
- * Null WorkListener used as the default WorkListener.
+ * Null WorkListener used as the default WorkListener.
*/
private static final WorkListener NULL_WORK_LISTENER = new WorkAdapter();
/**
- * Priority of the thread which will execute this work.
+ * Priority of the thread, which will execute this work.
*/
- private int m_threadPriority;
+ private int threadPriority;
/**
* Actual work to be executed.
*/
- private Work m_adaptee;
+ private Work adaptee;
/**
* Indicates if this work has been accepted.
*/
- private boolean m_isAccepted;
+ private boolean isAccepted;
/**
- * System.currentTimeMillis() when the Work has been accepted.
+ * System.currentTimeMillis() when the wrapped Work has been accepted.
*/
- private long m_acceptedTime;
+ private long acceptedTime;
/**
* Number of times that the execution of this work has been tried.
*/
- private int m_nbRetry;
+ private int nbRetry;
/**
- * time duration (in milliseconds) within which the execution of the
Work
+ * Time duration (in milliseconds) within which the execution of the
Work
* instance must start.
*/
- private long m_startTimeOut;
+ private long startTimeOut;
/**
- * execution context of the actual work to be executed.
+ * Execution context of the actual work to be executed.
*/
- private ExecutionContext m_executionContext;
+ private ExecutionContext executionContext;
/**
* Listener to be notified during the life-cycle of the work treatment.
*/
- private WorkListener m_workListener = NULL_WORK_LISTENER;
+ private WorkListener workListener = NULL_WORK_LISTENER;
/**
- * Work exception if any.
+ * Work exception, if any.
*/
- private WorkException m_workException;
+ private WorkException workException;
/**
- * A latch which is released when the work is started.
+ * A latch, which is released when the work is started.
*/
- private Latch m_startLatch = new Latch();
+ private Latch startLatch = new Latch();
/**
- * A latch which is released when the work is completed.
+ * A latch, which is released when the work is completed.
*/
- private Latch m_endLatch = new Latch();
+ private Latch endLatch = new Latch();
/**
* Create a WorkWrapper.
*
- * @param aWork Work wrapped by this instance.
+ * @param aWork Work to be wrapped.
*/
public WorkerContext(Work aWork) {
- m_adaptee = aWork;
+ adaptee = aWork;
}
/**
* Create a WorkWrapper with the specified execution context.
*
- * @param aWork Work wrapped by this instance.
+ * @param aWork Work to be wrapped.
* @param aStartTimeout a time duration (in milliseconds) within which
the
* execution of the Work instance must start.
* @param execContext an object containing the execution context with
which
@@ -165,11 +165,11 @@
public WorkerContext(Work aWork, long aStartTimeout,
ExecutionContext execContext,
WorkListener workListener) {
- m_adaptee = aWork;
- m_startTimeOut = aStartTimeout;
- m_executionContext = execContext;
+ adaptee = aWork;
+ startTimeOut = aStartTimeout;
+ executionContext = execContext;
if ( null != workListener ) {
- m_workListener = workListener;
+ this.workListener = workListener;
}
}
@@ -177,58 +177,65 @@
* @see javax.resource.spi.work.Work#release()
*/
public void release() {
- m_adaptee.release();
+ adaptee.release();
}
/**
- * Defines the thread priority level of the thread which will be
dispatched
+ * Defines the thread priority level of the thread, which will be
dispatched
* to process this work. This priority level must be the same one for a
- * given resource adapter.
+ * given resource adapter.
+ *
+ * @param aPriority Priority of the thread to be used to process the
wrapped
+ * Work instance.
*/
public void setThreadPriority(int aPriority) {
- m_threadPriority = aPriority;
+ threadPriority = aPriority;
}
/**
- * Gets the thread priority level of the thread which will be dispatched
+ * Gets the priority level of the thread, which will be dispatched
* to process this work. This priority level must be the same one for a
- * given resource adapter.
+ * given resource adapter.
+ *
+ * @return The priority level of the thread to be dispatched to
+ * process the wrapped Work instance.
*/
public int getThreadPriority() {
- return m_threadPriority;
+ return threadPriority;
}
/**
- * Used by a Work executor in order to notify this work that it has been
- * accepted.
+ * Call-back method used by a Work executor in order to notify this
+ * instance that the wrapped Work instance has been accepted.
*
* @param anObject Object on which the event initially occurred. It
should
* be the work executor.
*/
public synchronized void workAccepted(Object anObject) {
- m_isAccepted = true;
- m_acceptedTime = System.currentTimeMillis();
- m_workListener.workAccepted(new WorkEvent(anObject,
- WorkEvent.WORK_ACCEPTED, m_adaptee, null));
+ isAccepted = true;
+ acceptedTime = System.currentTimeMillis();
+ workListener.workAccepted(new WorkEvent(anObject,
+ WorkEvent.WORK_ACCEPTED, adaptee, null));
}
/**
- * System.currentTimeMillis() when the Work has been accepted.
+ * System.currentTimeMillis() when the Work has been accepted. This
method
+ * can be used to compute the duration of a work.
*
- * @return when the work has ben accepted.
+ * @return When the work has been accepted.
*/
public synchronized long getAcceptedTime() {
- return m_acceptedTime;
+ return acceptedTime;
}
/**
* Gets the time duration (in milliseconds) within which the execution
of
* the Work instance must start.
*
- * @return time out duration.
+ * @return Time out duration.
*/
public long getStartTimeout() {
- return m_startTimeOut;
+ return startTimeOut;
}
/**
@@ -236,40 +243,47 @@
* accepted but not started has timed out. This method MUST be called
prior
* to retry the execution of a Work.
*
- * @return true if the work has timed out and false otherwise.
+ * @return true if the Work has timed out and false otherwise.
*/
public synchronized boolean isTimedOut() {
- assert !m_isAccepted: "The work is not accepted.";
+ assert !isAccepted: "The work is not accepted.";
// A value of 0 means that the work never times out.
- if ( 0 == m_startTimeOut ) {
+ if ( 0 == startTimeOut ) {
return false;
}
boolean isTimeout =
- System.currentTimeMillis() > m_acceptedTime + m_startTimeOut;
- if ( m_log.isDebugEnabled() ) {
- m_log.debug(this + " accepted at " + m_acceptedTime +
- (isTimeout? " has timed out.":" has not timed out. ") +
- m_nbRetry + " retries have been performed.");
+ System.currentTimeMillis() > acceptedTime + startTimeOut;
+ if ( log.isDebugEnabled() ) {
+ log.debug(
+ this
+ + " accepted at "
+ + acceptedTime
+ + (isTimeout ? " has timed out." : " has not timed out.
")
+ + nbRetry
+ + " retries have been performed.");
}
if ( isTimeout ) {
- m_workException = new WorkRejectedException("Time out.",
+ workException = new WorkRejectedException(this + " has timed
out.",
WorkException.START_TIMED_OUT);
- m_workListener.workRejected(
- new WorkEvent(this, WorkEvent.WORK_REJECTED, m_adaptee,
- m_workException));
+ workListener.workRejected(
+ new WorkEvent(
+ this,
+ WorkEvent.WORK_REJECTED,
+ adaptee,
+ workException));
return true;
}
- m_nbRetry++;
+ nbRetry++;
return isTimeout;
}
/**
- * Gets the WorkException, if any, thrown during this work execution.
+ * Gets the WorkException, if any, thrown during the execution.
*
* @return WorkException, if any.
*/
public synchronized WorkException getWorkException() {
- return m_workException;
+ return workException;
}
/* (non-Javadoc)
@@ -279,27 +293,27 @@
if ( isTimedOut() ) {
// In case of a time out, one releases the start and end latches
// to prevent a dead-lock.
- m_startLatch.release();
- m_endLatch.release();
+ startLatch.release();
+ endLatch.release();
return;
}
// Implementation note: the work listener is notified prior to
release
// the start lock. This behavior is intentional and seems to be the
// more conservative.
- m_workListener.workStarted(
- new WorkEvent(this, WorkEvent.WORK_STARTED, m_adaptee, null));
- m_startLatch.release();
+ workListener.workStarted(
+ new WorkEvent(this, WorkEvent.WORK_STARTED, adaptee, null));
+ startLatch.release();
try {
- m_adaptee.run();
- m_workListener.workCompleted(
- new WorkEvent(this, WorkEvent.WORK_COMPLETED, m_adaptee,
null));
+ adaptee.run();
+ workListener.workCompleted(
+ new WorkEvent(this, WorkEvent.WORK_COMPLETED, adaptee,
null));
} catch (Throwable e) {
- m_workException = new WorkCompletedException(e);
- m_workListener.workRejected(
- new WorkEvent(this, WorkEvent.WORK_REJECTED, m_adaptee,
- m_workException));
+ workException = new WorkCompletedException(e);
+ workListener.workRejected(
+ new WorkEvent(this, WorkEvent.WORK_REJECTED, adaptee,
+ workException));
} finally {
- m_endLatch.release();
+ endLatch.release();
}
}
@@ -311,7 +325,7 @@
* work execution.
*/
public synchronized Latch provideStartLatch() {
- return m_startLatch;
+ return startLatch;
}
/**
@@ -322,11 +336,11 @@
* work execution.
*/
public synchronized Latch provideEndLatch() {
- return m_endLatch;
+ return endLatch;
}
public String toString() {
- return "Work :" + m_adaptee;
+ return "Work :" + adaptee;
}
}
1.2 +57 -45
incubator-geronimo/modules/core/src/java/org/apache/geronimo/connector/work/pool/AbstractWorkExecutorPool.java
Index: AbstractWorkExecutorPool.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/connector/work/pool/AbstractWorkExecutorPool.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- AbstractWorkExecutorPool.java 16 Nov 2003 22:42:20 -0000 1.1
+++ AbstractWorkExecutorPool.java 16 Nov 2003 23:12:07 -0000 1.2
@@ -66,7 +66,7 @@
import EDU.oswego.cs.dl.util.concurrent.Channel;
/**
- * Based class for WorkExecutorPool. A sub-class defines the synchronization
+ * Base class for WorkExecutorPool. Sub-classes define the synchronization
* policy (should the call block until the end of the work; or when it starts
* et cetera).
*
@@ -80,63 +80,70 @@
/**
* A timed out pooled executor.
*/
- private TimedOutPooledExecutor m_pooledExecutor;
+ private TimedOutPooledExecutor pooledExecutor;
/**
- * Creates a pool with the specified minimum and maximum sizes.
+ * Creates a pool with the specified minimum and maximum sizes. The
Channel
+ * used to enqueue the submitted Work instances is a queueless synchronous
+ * one.
*
* @param aMinSize Minimum size of the work executor pool.
* @param aMaxSize Maximum size of the work executor pool.
- * @param aRetryDuration Duration (in milliseconds) to wait prior to
retry
- * the execution of a Work.
*/
public AbstractWorkExecutorPool(int aMinSize, int aMaxSize) {
- m_pooledExecutor = new TimedOutPooledExecutor();
- m_pooledExecutor.setMinimumPoolSize(aMinSize);
- m_pooledExecutor.setMaximumPoolSize(aMaxSize);
- m_pooledExecutor.waitWhenBlocked();
+ pooledExecutor = new TimedOutPooledExecutor();
+ pooledExecutor.setMinimumPoolSize(aMinSize);
+ pooledExecutor.setMaximumPoolSize(aMaxSize);
+ pooledExecutor.waitWhenBlocked();
}
/**
- * Creates a pool with the specified minimum and maximum sizes.
+ * Creates a pool with the specified minimum and maximum sizes and using
the
+ * specified Channel to enqueue the submitted Work instances.
*
- * @param Queue to be used on top of the pool.
+ * @param aChannel Queue to be used as the queueing facility of this
pool.
* @param aMinSize Minimum size of the work executor pool.
* @param aMaxSize Maximum size of the work executor pool.
- * @param aRetryDuration Duration (in milliseconds) to wait prior to
retry
- * the execution of a Work.
*/
- public AbstractWorkExecutorPool(Channel aChannel, int aMinSize, int
aMaxSize) {
- m_pooledExecutor = new TimedOutPooledExecutor(aChannel);
- m_pooledExecutor.setMinimumPoolSize(aMinSize);
- m_pooledExecutor.setMaximumPoolSize(aMaxSize);
- m_pooledExecutor.waitWhenBlocked();
+ public AbstractWorkExecutorPool(
+ Channel aChannel,
+ int aMinSize, int aMaxSize) {
+ pooledExecutor = new TimedOutPooledExecutor(aChannel);
+ pooledExecutor.setMinimumPoolSize(aMinSize);
+ pooledExecutor.setMaximumPoolSize(aMaxSize);
+ pooledExecutor.waitWhenBlocked();
}
/**
* Delegates the work execution to the pooled executor.
*
- * @see
EDU.oswego.cs.dl.util.concurrent.PooledExecutor#execute(java.lang.Runnable)
+ * @param aWork Work to be executed.
*/
protected void execute(WorkerContext aWork) throws InterruptedException {
- m_pooledExecutor.execute(aWork);
+ pooledExecutor.execute(aWork);
}
/**
- * @see
org.apache.geronimo.workmanagement.WorkExecutorPool#execute(org.apache.geronimo.workmanagement.WorkWrapper)
+ * Execute the specified Work.
+ *
+ * @param aWork Work to be executed.
+ *
+ * @exception WorkException Indicates that the Work execution has been
+ * unsuccessful.
*/
public void executeWork(WorkerContext aWork) throws WorkException {
aWork.workAccepted(this);
try {
doExecute(aWork);
- WorkException exception = aWork.getWorkException();
- if (null != exception) {
+ WorkException exception = aWork.getWorkException();
+ if ( null != exception ) {
throw exception;
}
} catch (InterruptedException e) {
- WorkCompletedException wcj = new WorkCompletedException("The
execution has been interrupted.", e);
+ WorkCompletedException wcj = new WorkCompletedException(
+ "The execution has been interrupted.", e);
wcj.setErrorCode(WorkException.INTERNAL);
- throw wcj;
+ throw wcj;
}
}
@@ -150,57 +157,62 @@
}
/**
- * @see org.apache.geronimo.work.WorkExecutorPool#getPoolSize()
+ * Gets the size of this pool.
*/
public int getPoolSize() {
- return m_pooledExecutor.getPoolSize();
+ return pooledExecutor.getPoolSize();
}
/**
- * @see org.apache.geronimo.work.WorkExecutorPool#getMinimumPoolSize()
+ * Gets the minimum size of this pool.
*/
public int getMinimumPoolSize() {
- return m_pooledExecutor.getMinimumPoolSize();
+ return pooledExecutor.getMinimumPoolSize();
}
/**
- * @see org.apache.geronimo.work.WorkExecutorPool#setMinimumPoolSize(int)
+ * Sets the minimum size of this pool.
+ * @param aSize New minimum size of the pool.
*/
public void setMinimumPoolSize(int aSize) {
- m_pooledExecutor.setMinimumPoolSize(aSize);
+ pooledExecutor.setMinimumPoolSize(aSize);
}
/**
- * @see org.apache.geronimo.work.WorkExecutorPool#getMaximumPoolSize()
+ * Gets the maximum size of this pool.
*/
public int getMaximumPoolSize() {
- return m_pooledExecutor.getMaximumPoolSize();
+ return pooledExecutor.getMaximumPoolSize();
}
/**
- * @see org.apache.geronimo.work.WorkExecutorPool#setMaximumPoolSize(int)
+ * Sets the maximum size of this pool.
+ * @param aSize New maximum size of this pool.
*/
public void setMaximumPoolSize(int aSize) {
- m_pooledExecutor.setMaximumPoolSize(aSize);
+ pooledExecutor.setMaximumPoolSize(aSize);
}
/**
- * This method must be implemented by sub-classes in order to provide a
- * synchronization policy.
+ * This method must be implemented by sub-classes in order to provide the
+ * relevant synchronization policy. It is called by the executeWork
template
+ * method.
*
* @param aWork Work to be executed.
*
- * @throws WorkException Indicates the work has failed.
+ * @throws WorkException Indicates that the work has failed.
* @throws InterruptedException Indicates that the thread in charge of
the
- * execution of the specified work dies.
+ * execution of the specified work has been interrupted.
*/
- protected abstract void doExecute(WorkerContext aWork) throws
WorkException, InterruptedException;
+ protected abstract void doExecute(WorkerContext aWork)
+ throws WorkException, InterruptedException;
- /* (non-Javadoc)
- * @see org.apache.geronimo.connector.WorkExecutorPool#stop()
+ /**
+ * Stops this pool. Prior to stop this pool, all the enqueued Work
instances
+ * are processed. This is an orderly shutdown.
*/
public void doStop() {
- m_pooledExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
+ pooledExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
}
-
+
}
1.2 +6 -3
incubator-geronimo/modules/core/src/java/org/apache/geronimo/connector/work/pool/ScheduleWorkExecutorPool.java
Index: ScheduleWorkExecutorPool.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/connector/work/pool/ScheduleWorkExecutorPool.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ScheduleWorkExecutorPool.java 16 Nov 2003 22:42:20 -0000 1.1
+++ ScheduleWorkExecutorPool.java 16 Nov 2003 23:12:07 -0000 1.2
@@ -68,7 +68,7 @@
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
/**
- * WorkExecutorPool which treats the submitted Work instances
asynchronously.
+ * WorkExecutorPool handling the submitted Work instances asynchronously.
* More accurately, its execute method returns immediately after the work
* submission.
*
@@ -89,12 +89,15 @@
public ScheduleWorkExecutorPool(int aMinSize, int aMaxSize) {
super(new LinkedQueue(), aMinSize, aMaxSize);
}
-
+
public void setGeronimoWorkManager( GeronimoWorkManager wm ) {
wm.setAsyncExecutor(this);
}
/**
+ * Performs the actual execution of the specified work.
+ *
+ * @param aWork Work to be executed.
*/
public void doExecute(WorkerContext aWork)
throws WorkException, InterruptedException {
1.2 +9 -12
incubator-geronimo/modules/core/src/java/org/apache/geronimo/connector/work/pool/StartWorkExecutorPool.java
Index: StartWorkExecutorPool.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/connector/work/pool/StartWorkExecutorPool.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- StartWorkExecutorPool.java 16 Nov 2003 22:42:20 -0000 1.1
+++ StartWorkExecutorPool.java 16 Nov 2003 23:12:07 -0000 1.2
@@ -69,8 +69,8 @@
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
/**
- * WorkExecutorPool which treats the submitted Work instances synchronously
- * until the work starts. More accurately, its execute method returns when
the
+ * WorkExecutorPool handling the submitted Work instances synchronously
+ * until the work start. More accurately, its execute method returns when the
* work is started.
*
* @jmx:mbean extends="AbstractWorkExecutorPoolMBean"
@@ -96,21 +96,18 @@
}
/**
- * In the case of a synchronous execution, the Work has been executed and
- * one needs to retrieve the WorkException thrown during this execution,
if
- * any.
- *
- * @exception WorkException Not thrown.
- * @exception InterruptedException Indicates that this work execution
- * has been interrupted.
+ * Performs the actual work execution. This execution is synchronous
until
+ * the start of the submitted work.
+ *
+ * @param aWork Work to be executed.
*/
public void doExecute(WorkerContext aWork)
throws WorkException, InterruptedException {
Latch latch = aWork.provideStartLatch();
- super.execute(aWork);
+ execute(aWork);
latch.acquire();
}
-
+
public static GeronimoMBeanInfo getGeronimoMBeanInfo() throws Exception {
try {
GeronimoMBeanInfo rc
=AbstractWorkExecutorPool.getGeronimoMBeanInfo();
1.2 +9 -13
incubator-geronimo/modules/core/src/java/org/apache/geronimo/connector/work/pool/SyncWorkExecutorPool.java
Index: SyncWorkExecutorPool.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/connector/work/pool/SyncWorkExecutorPool.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SyncWorkExecutorPool.java 16 Nov 2003 22:42:20 -0000 1.1
+++ SyncWorkExecutorPool.java 16 Nov 2003 23:12:07 -0000 1.2
@@ -68,9 +68,9 @@
import EDU.oswego.cs.dl.util.concurrent.Latch;
/**
- * WorkExecutorPool which treats the submitted Work instances synchronously.
- * More accurately, its execute method blocks until the work is completed.
- *
+ * WorkExecutorPool handling the submitted Work instances synchronously.
+ * More accurately, its execute method blocks until the work completion.
+ *
* @version $Revision$ $Date$
*/
public class SyncWorkExecutorPool
@@ -86,24 +86,20 @@
public SyncWorkExecutorPool(int aMinSize, int aMaxSize) {
super(aMinSize, aMaxSize);
}
-
+
public void setGeronimoWorkManager( GeronimoWorkManager wm ) {
wm.setSyncExecutor(this);
}
/**
- * In the case of a synchronous execution, the Work has been executed and
- * one needs to retrieve the WorkException thrown during this execution, if
- * any.
- *
- * @exception WorkException Not thrown.
- * @exception InterruptedException Indicates that this work execution
- * has been interrupted.
+ * Performs the actual work execution. This execution is synchronous.
+ *
+ * @param aWork Work to be executed.
*/
public void doExecute(WorkerContext aWork)
throws WorkException, InterruptedException {
Latch latch = aWork.provideEndLatch();
- super.execute(aWork);
+ execute(aWork);
latch.acquire();
}
1.2 +18 -8
incubator-geronimo/modules/core/src/java/org/apache/geronimo/connector/work/pool/TimedOutPooledExecutor.java
Index: TimedOutPooledExecutor.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/connector/work/pool/TimedOutPooledExecutor.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- TimedOutPooledExecutor.java 16 Nov 2003 22:42:20 -0000 1.1
+++ TimedOutPooledExecutor.java 16 Nov 2003 23:12:07 -0000 1.2
@@ -70,10 +70,21 @@
public class TimedOutPooledExecutor extends PooledExecutor
{
+ /**
+ * Creates a pooled executor. The Channel used to enqueue the submitted
+ * Work instance is a queueless synchronous one.
+ */
public TimedOutPooledExecutor() {
setBlockedExecutionHandler(new TimedOutSpinHandler());
}
-
+
+ /**
+ * Creates a pooled executor, which uses the provided Channel as its
+ * queueing mechanism.
+ *
+ * @param aChannel Channel to be used to enqueue the submitted Work
+ * instances.
+ */
public TimedOutPooledExecutor(Channel aChannel) {
super(aChannel);
setBlockedExecutionHandler(new TimedOutSpinHandler());
@@ -83,14 +94,13 @@
* Executes the provided task, which MUST be an instance of WorkWrapper.
*
* @throws IllegalArgumentException Indicates that the provided task is not
- * a WorkWrapper.
+ * a WorkWrapper instance.
*/
public void execute(Runnable aTask) throws InterruptedException {
- if ( aTask instanceof WorkerContext ) {
- super.execute(aTask);
- return;
+ if ( !(aTask instanceof WorkerContext) ) {
+ throw new IllegalArgumentException("Please submit a WorkWrapper.");
}
- throw new IllegalArgumentException("Please submit a WorkWrapper");
+ super.execute(aTask);
}
/**
1.2 +10 -10
incubator-geronimo/modules/core/src/test/org/apache/geronimo/connector/work/PooledWorkManagerTest.java
Index: PooledWorkManagerTest.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/core/src/test/org/apache/geronimo/connector/work/PooledWorkManagerTest.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- PooledWorkManagerTest.java 16 Nov 2003 22:42:20 -0000 1.1
+++ PooledWorkManagerTest.java 16 Nov 2003 23:12:07 -0000 1.2
@@ -75,7 +75,7 @@
import org.apache.geronimo.connector.work.pool.SyncWorkExecutorPool;
/**
- * Timing is crucial for this test case, which focuses on the synchronous
+ * Timing is crucial for this test case, which focuses on the synchronization
* specificities of the doWork, startWork and scheduleWork.
*
* @version $Revision$ $Date$
@@ -87,8 +87,8 @@
private GeronimoWorkManager m_workManager;
private static final int m_nbMin = 1;
private static final int m_nbMax = 1;
- private static final int m_timeout = 3000;
- private static final int m_tempo = 2000;
+ private static final int m_timeout = 300;
+ private static final int m_tempo = 200;
public PooledWorkManagerTest() throws Exception {
super("WorkManager");
@@ -201,7 +201,7 @@
return rarThreads;
}
- private static abstract class AbstractDummyWork extends Thread {
+ public static abstract class AbstractDummyWork extends Thread {
public DummyWorkListener m_listener;
protected WorkManager m_workManager;
protected String m_name;
@@ -223,7 +223,7 @@
WorkListener workListener) throws Exception;
}
- private static class DummyDoWork extends AbstractDummyWork {
+ public static class DummyDoWork extends AbstractDummyWork {
public DummyDoWork(WorkManager aWorkManager, String aName) {
super(aWorkManager, aName);
}
@@ -235,7 +235,7 @@
}
}
- private static class DummyStartWork extends AbstractDummyWork {
+ public static class DummyStartWork extends AbstractDummyWork {
public DummyStartWork(WorkManager aWorkManager, String aName) {
super(aWorkManager, aName);
}
@@ -247,7 +247,7 @@
}
}
- private static class DummyScheduleWork extends AbstractDummyWork {
+ public static class DummyScheduleWork extends AbstractDummyWork {
public DummyScheduleWork(WorkManager aWorkManager, String aName) {
super(aWorkManager, aName);
}
@@ -259,7 +259,7 @@
}
}
- private static class DummyWork implements Work {
+ public static class DummyWork implements Work {
private String m_name;
public DummyWork(String aName) {m_name = aName;}
public void release() {}
@@ -273,7 +273,7 @@
public String toString() {return m_name;}
}
- private static class DummyWorkListener implements WorkListener {
+ public static class DummyWorkListener implements WorkListener {
public WorkEvent m_event;
public void workAccepted(WorkEvent e) {m_event = e;}
public void workRejected(WorkEvent e) {m_event = e;}