Author: nuttycom
Date: Fri Aug 3 21:31:35 2007
New Revision: 562653
URL: http://svn.apache.org/viewvc?view=rev&rev=562653
Log:
Refactored functionality duplicated across the pooling and dedicated thread
drivers into the abstract base class. Retrofitted synchronous drivers with
fault tolerance.
Modified:
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriverFactory.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java
Modified:
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java
URL:
http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java?view=diff&rev=562653&r1=562652&r2=562653
==============================================================================
---
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java
(original)
+++
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java
Fri Aug 3 21:31:35 2007
@@ -37,7 +37,18 @@
/**
* The context for the stage being run
*/
- protected StageContext context;
+ protected StageContext context;
+
+ /**
+ * The current state of processing. In most drivers, this is used for
+ * thread control.
+ */
+ protected volatile State currentState = State.STOPPED;
+
+ /**
+ * Enumerated value indicating the fault tolerance level of the
StageDriver.
+ */
+ protected FaultTolerance faultTolerance = FaultTolerance.NONE;
/**
* List of processing failures that have occurred.
@@ -56,10 +67,21 @@
* @param context The context in which to run the stage
*/
public AbstractStageDriver(Stage stage, StageContext context) {
+ this(stage, context, FaultTolerance.NONE);
+ }
+
+ /**
+ * Creates a StageDriver for the specified stage.
+ *
+ * @param stage The stage for which the driver will be created
+ * @param context The context in which to run the stage
+ */
+ public AbstractStageDriver(Stage stage, StageContext context,
FaultTolerance faultTolerance) {
if (stage == null) throw new IllegalArgumentException("Stage may not
be null.");
if (context == null) throw new IllegalArgumentException("Context may
not be null.");
this.stage = stage;
this.context = context;
+ this.faultTolerance = faultTolerance;
}
/**
@@ -76,14 +98,48 @@
* in which the driver is being run and the managed stage.
* @return the Feeder used to feed objects to the managed stage for
processing.
*/
- public abstract Feeder getFeeder();
+ public abstract Feeder getFeeder();
+
+ /**
+ * Return the current state of stage processing.
+ * @return the current state of processing
+ */
+ public State getState() {
+ return this.currentState;
+ }
/**
- * Returns the current state of stage processing.
- * @return The current state
+ * Atomically tests to determine whether or not the driver is in the one of
+ * the specified states.
*/
- public abstract State getState();
-
+ protected synchronized boolean isInState(State... states) {
+ for (State state : states) if (state == currentState) return true;
+ return false;
+ }
+
+ /**
+ * Set the current state of stage processing and notify any listeners
+ * that may be waiting on a state change.
+ */
+ protected synchronized void setState(State nextState) {
+ this.currentState = nextState;
+ this.notifyAll();
+ }
+
+ /**
+ * This method performs an atomic conditional state transition change
+ * to the value specified by the nextState parameter if and only if the
+ * current state is equal to the test state.
+ */
+ protected synchronized boolean testAndSetState(State testState, State
nextState) {
+ if (currentState == testState) {
+ setState(nextState);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
/**
* This method is used to start the driver, run the
* {@link Stage#preprocess() preprocess()} method of the
attached stage
@@ -103,30 +159,41 @@
public abstract void finish() throws StageException;
/**
- * Returns a list of unrecoverable errors that occurred during stage
- * processing.
- * @return A list of unrecoverable errors that occurred during stage
processing.
+ * Sets the failure tolerance flag for the worker thread. If faultTolerance
+ * is set to CHECKED, {@link StageException StageException}s
thrown by
+ * the {@link Stage#process(Object)} method will not interrupt
queue
+ * processing, but will simply be logged with a severity of ERROR.
+ * If faultTolerance is set to ALL, runtime exceptions will also be
+ * logged and otherwise ignored.
+ * @param faultTolerance the flag value
*/
- public List<Throwable> getFatalErrors() {
- return this.errors;
+ public final void setFaultTolerance(FaultTolerance faultTolerance) {
+ this.faultTolerance = faultTolerance;
}
/**
+ * Getter for property faultTolerant.
+ * @return Value of property faultTolerant.
+ */
+ public final FaultTolerance getFaultTolerance() {
+ return this.faultTolerance;
+ }
+
+ /**
* Store a fatal error.
* @param error The error to be stored for later analysis
*/
protected void recordFatalError(Throwable error) {
this.errors.add(error);
}
-
+
/**
- * Returns a list of errors that occurred while processing data objects,
- * along with the objects that were being processed when the errors
- * were generated.
- * @return The list of non-fatal processing errors.
+ * Returns a list of unrecoverable errors that occurred during stage
+ * processing.
+ * @return A list of unrecoverable errors that occurred during stage
processing.
*/
- public List<ProcessingException> getProcessingExceptions() {
- return this.processingExceptions;
+ public List<Throwable> getFatalErrors() {
+ return this.errors;
}
/**
@@ -138,4 +205,14 @@
ProcessingException ex = new ProcessingException(this.stage, error,
data, this.getState());
this.processingExceptions.add(ex);
}
+
+ /**
+ * Returns a list of errors that occurred while processing data objects,
+ * along with the objects that were being processed when the errors
+ * were generated.
+ * @return The list of non-fatal processing errors.
+ */
+ public List<ProcessingException> getProcessingExceptions() {
+ return this.processingExceptions;
+ }
}
Modified:
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java
URL:
http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java?view=diff&rev=562653&r1=562652&r2=562653
==============================================================================
---
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java
(original)
+++
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java
Fri Aug 3 21:31:35 2007
@@ -45,17 +45,11 @@
//poll timeout to ensure deadlock cannot occur on thread termination
private long timeout;
- //flag describing whether or not the driver is fault tolerant
- private FaultTolerance faultTolerance = FaultTolerance.NONE;
-
//thread responsible for stage processing
private Thread workerThread;
//queue to hold data to be processed
- private BlockingQueue queue;
-
- //current state of thread processing
- private volatile State currentState = State.STOPPED;
+ private BlockingQueue queue;
//feeder used to feed data to this stage's queue
private final Feeder feeder = new Feeder() {
@@ -92,10 +86,9 @@
* ({@link Stage#release()} will be called.)
*/
public DedicatedThreadStageDriver(Stage stage, StageContext context,
BlockingQueue queue, long timeout, FaultTolerance faultTolerance) {
- super(stage, context);
+ super(stage, context, faultTolerance);
this.queue = queue;
this.timeout = timeout;
- this.faultTolerance = faultTolerance;
}
/**
@@ -156,81 +149,6 @@
setState(STOPPED);
}
- /**
- * Return the current state of stage processing.
- * @return the current state of processing
- */
- public StageDriver.State getState() {
- return this.currentState;
- }
-
- /**
- * Atomically tests to determine whether or not the driver is in the one of
- * the specified states.
- */
- private synchronized boolean isInState(State... states) {
- for (State state : states) if (state == currentState) return true;
- return false;
- }
-
- /**
- * Set the current state of stage processing and notify any listeners
- * that may be waiting on a state change.
- */
- private synchronized void setState(State nextState) {
- if (log.isDebugEnabled()) log.debug("State change for " + stage + ": "
+ this.currentState + " -> " + nextState);
- this.currentState = nextState;
- this.notifyAll();
- }
-
- /**
- * This method performs an atomic conditional state transition change
- * to the value specified by the nextState parameter if and only if the
- * current state is equal to the test state.
- */
- private synchronized boolean testAndSetState(State testState, State
nextState) {
- if (currentState == testState) {
- setState(nextState);
- return true;
- } else {
- return false;
- }
- }
-
- /**
- * Sets the failure tolerance flag for the worker thread. If faultTolerance
- * is set to CHECKED, {@link StageException StageException}s
thrown by
- * the {@link Stage#process(Object)} method will not interrupt
queue
- * processing, but will simply be logged with a severity of ERROR.
- * If faultTolerance is set to ALL, runtime exceptions will also be
- * logged and otherwise ignored.
- * @param faultTolerance the flag value
- */
- public final void setFaultTolerance(String faultTolerance) {
- this.faultTolerance = FaultTolerance.valueOf(faultTolerance);
- }
-
- /**
- * Sets the failure tolerance flag for the worker thread. If faultTolerance
- * is set to CHECKED, {@link StageException StageException}s
thrown by
- * the {@link Stage#process(Object)} method will not interrupt
queue
- * processing, but will simply be logged with a severity of ERROR.
- * If faultTolerance is set to ALL, runtime exceptions will also be
- * logged and otherwise ignored.
- * @param faultTolerance the flag value
- */
- public final void setFaultTolerance(FaultTolerance faultTolerance) {
- this.faultTolerance = faultTolerance;
- }
-
- /**
- * Getter for property faultTolerant.
- * @return Value of property faultTolerant.
- */
- public FaultTolerance getFaultTolerance() {
- return this.faultTolerance;
- }
-
/*********************************
* WORKER THREAD IMPLEMENTATIONS *
*********************************/
@@ -279,6 +197,7 @@
Object obj = queue.poll(timeout,
TimeUnit.MILLISECONDS);
if (obj == null) {
if (currentState == STOP_REQUESTED) break running;
+ //else continue running;
} else {
try {
stage.process(obj);
Modified:
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java
URL:
http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java?view=diff&rev=562653&r1=562652&r2=562653
==============================================================================
---
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java
(original)
+++
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java
Fri Aug 3 21:31:35 2007
@@ -23,10 +23,11 @@
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pipeline.Feeder;
import org.apache.commons.pipeline.Stage;
-import org.apache.commons.pipeline.driver.AbstractStageDriver;
import org.apache.commons.pipeline.StageException;
import org.apache.commons.pipeline.StageContext;
import static org.apache.commons.pipeline.StageDriver.State.*;
+import org.apache.commons.pipeline.StageDriver.State;
+import static org.apache.commons.pipeline.driver.FaultTolerance.*;
/**
* This is a non-threaded version of the AbstractStageDriver.
@@ -34,12 +35,6 @@
public class SynchronousStageDriver extends AbstractStageDriver {
private final Log log = LogFactory.getLog(SynchronousStageDriver.class);
- //flag describing whether or not the driver is fault tolerant
- private boolean faultTolerant = false;
-
- //current state of thread processing
- private State currentState = State.STOPPED;
-
//queue of objects to be processed that are fed to the driver
//when it is not in a running state
private Queue<Object> queue = new LinkedList<Object>();
@@ -48,18 +43,19 @@
private final Feeder feeder = new Feeder() {
public void feed(Object obj) {
synchronized (SynchronousStageDriver.this) {
+ if (currentState == ERROR) throw new
IllegalStateException("Unable to process data: driver in fatal error state.");
if (currentState != RUNNING) { //enqueue objects if stage has
not been started
- if (currentState == ERROR) throw new
IllegalStateException("Unable to process data: driver in fatal error state.");
queue.add(obj);
- } else {
- try {
- stage.process(obj);
- } catch (StageException e) {
- recordProcessingException(obj, e);
- if (!faultTolerant) throw fatalError(e);
- }
+ return;
}
}
+
+ try {
+ stage.process(obj);
+ } catch (StageException e) {
+ recordProcessingException(obj, e);
+ if (faultTolerance == NONE) throw fatalError(e);
+ }
}
};
@@ -68,8 +64,8 @@
* @param stage The stage to be run
* @param context The context in which the stage will be run
*/
- public SynchronousStageDriver(Stage stage, StageContext context) {
- super(stage, context);
+ public SynchronousStageDriver(Stage stage, StageContext context,
FaultTolerance faultTolerance) {
+ super(stage, context, faultTolerance);
}
/**
@@ -92,13 +88,11 @@
if (this.currentState == STOPPED) {
try {
stage.preprocess();
+ setState(RUNNING);
} catch (StageException e) {
throw fatalError(e);
}
- this.currentState = RUNNING;
- this.notifyAll();
-
// feed any queued values before returning control
while (!queue.isEmpty()) this.getFeeder().feed(queue.remove());
} else {
@@ -112,33 +106,29 @@
* @throws org.apache.commons.pipeline.StageException Thrown if an error
occurs during postprocessing
*/
public synchronized void finish() throws StageException {
- if (this.currentState == RUNNING) {
+ if (this.currentState == RUNNING) {
try {
- stage.postprocess();
+ testAndSetState(RUNNING, STOP_REQUESTED);
+ if (this.currentState == STOP_REQUESTED) stage.postprocess();
} catch (StageException e) {
throw fatalError(e);
- }
-
- stage.release();
-
- this.currentState = STOPPED;
- this.notifyAll();
+ } finally {
+ stage.release();
+ testAndSetState(STOP_REQUESTED, STOPPED);
+ }
} else {
throw new IllegalStateException("Driver is not running (current
state: " + this.currentState + ")");
}
}
/**
- * Accessor for the current state of the stage driver
- * @return the current driver state
+ * This method obtains a lock to set the current state of processing
+ * to error, records the error and returns a RuntimeException encapsulating
+ * the specified throwable.
*/
- public synchronized State getState() {
- return this.currentState;
- }
-
- private synchronized RuntimeException fatalError(Throwable t) {
+ private RuntimeException fatalError(Throwable t) {
try {
- this.currentState = ERROR;
+ setState(ERROR);
this.recordFatalError(t);
stage.release();
this.notifyAll();
Modified:
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriverFactory.java
URL:
http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriverFactory.java?view=diff&rev=562653&r1=562652&r2=562653
==============================================================================
---
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriverFactory.java
(original)
+++
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriverFactory.java
Fri Aug 3 21:31:35 2007
@@ -40,6 +40,27 @@
* @return the newly created and configured driver
*/
public StageDriver createStageDriver(Stage stage, StageContext context) {
- return new SynchronousStageDriver(stage, context);
+ return new SynchronousStageDriver(stage, context, this.faultTolerance);
}
+
+ /**
+ * Holds value of property faultTolerance. Default value is {@link FaultTolerance.NONE}.
+ */
+ private FaultTolerance faultTolerance = FaultTolerance.NONE;
+
+ /**
+ * Getter for property faultTolerance.
+ * @return Value of property faultTolerance.
+ */
+ public FaultTolerance getFaultTolerance() {
+ return this.faultTolerance;
+ }
+
+ /**
+ * Setter for property faultTolerance.
+ * @param faultTolerance New value of property faultTolerance.
+ */
+ public void setFaultTolerance(FaultTolerance faultTolerance) {
+ this.faultTolerance = faultTolerance;
+ }
}
Modified:
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java
URL:
http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java?view=diff&rev=562653&r1=562652&r2=562653
==============================================================================
---
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java
(original)
+++
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java
Fri Aug 3 21:31:35 2007
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- */
+ * under the License.
+ */
package org.apache.commons.pipeline.driver;
@@ -41,40 +41,37 @@
* to process objects from an input queue.
*/
public class ThreadPoolStageDriver extends AbstractStageDriver {
+ // logger for the class
private final Log log = LogFactory.getLog(ThreadPoolStageDriver.class);
- //wait timeout to ensure deadlock cannot occur on thread termination
+ // wait timeout to ensure deadlock cannot occur on thread termination
private long timeout;
- //flag describing whether or not the driver is fault tolerant
- private FaultTolerance faultTolerance = FaultTolerance.NONE;
-
- //signal telling threads to start polling queue
- final private CountDownLatch startSignal;
+ // signal telling threads to start polling queue
+ private final CountDownLatch startSignal;
- //signal threads use to tell driver they have finished
- final private CountDownLatch doneSignal;
+ // signal threads use to tell driver they have finished
+ private final CountDownLatch doneSignal;
// number of threads polling queue
- private int numThreads = 1;
-
- //queue to hold data to be processed
- private BlockingQueue queue;
+ private final int numThreads;
- //current state of thread processing
- private volatile State currentState = State.STOPPED;
+ // queue to hold data to be processed
+ private final BlockingQueue queue;
//feeder used to feed data to this stage's queue
private final Feeder feeder = new Feeder() {
public void feed(Object obj) {
if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage
" + stage
+ " (" +
ThreadPoolStageDriver.this.queue.remainingCapacity() + " available slots in
queue)");
+
try {
ThreadPoolStageDriver.this.queue.put(obj);
} catch (InterruptedException e) {
throw new IllegalStateException("Unexpected interrupt while
waiting for space to become available for object "
+ obj + " in queue for stage " + stage, e);
}
+
synchronized(ThreadPoolStageDriver.this) {
ThreadPoolStageDriver.this.notifyAll();
}
@@ -103,7 +100,7 @@
long timeout,
FaultTolerance faultTolerance,
int numThreads) {
- super(stage, context);
+ super(stage, context, faultTolerance);
this.numThreads = numThreads;
this.startSignal = new CountDownLatch(1);
@@ -111,7 +108,6 @@
this.queue = queue;
this.timeout = timeout;
- this.faultTolerance = faultTolerance;
}
/**
@@ -146,13 +142,6 @@
startSignal.countDown();
log.debug("Worker threads for stage " + stage + " started.");
-
- //wait to ensure that the stage starts up correctly
- try {
- while ( !(this.currentState == RUNNING || this.currentState ==
ERROR) ) this.wait();
- } catch (InterruptedException e) {
- throw new StageException(this.getStage(), "Worker thread
unexpectedly interrupted while waiting for thread startup.", e);
- }
} else {
throw new IllegalStateException("Attempt to start driver in state
" + this.currentState);
}
@@ -160,7 +149,11 @@
/**
* Causes processing to shut down gracefully. Waits until all worker
threads
- * have completed.
+ * have completed. It is important that this method be called only after
+ * the completion of execution of finish() in the driver for the prior
+ * stage; parallel finish calls can cause the stage to shut down before
+ * all prior stages have finished processing.
+ *
* @throws org.apache.commons.pipeline.StageException Thrown if the driver
is in an illegal state for shutdown.
*/
public synchronized void finish() throws StageException {
@@ -169,6 +162,9 @@
}
try {
+ //it may be the case that finish() is called when the driver is
still in the process
+ //of starting up, so it is necessary to wait to enter the running
state before
+ //a stop can be requested
while ( !(this.currentState == RUNNING || this.currentState ==
ERROR) ) this.wait();
//ask the worker threads to shut down
@@ -176,19 +172,27 @@
if (log.isDebugEnabled()) log.debug("Waiting for worker threads to
stop for stage " + stage + ".");
doneSignal.await();
+ if (log.isDebugEnabled()) log.debug("Worker threads for stage " +
stage + " halted");
- if (log.isDebugEnabled()) log.debug("Postprocessing stage " +
stage + "...");
- ThreadPoolStageDriver.this.stage.postprocess();
- if (log.isDebugEnabled()) log.debug("Postprocessing for stage " +
stage + " complete.");
-
- //do not transition into finished state if an error has occurred
+ //transition into finished state (not used internally?)
testAndSetState(STOP_REQUESTED, FINISHED);
- while ( !(this.currentState == FINISHED || this.currentState ==
ERROR) ) this.wait();
+ //do not run postprocessing if the driver is in an error state
+ if (this.currentState != ERROR) {
+ if (log.isDebugEnabled()) log.debug("Postprocessing stage " +
stage + "...");
+ this.stage.postprocess();
+ if (log.isDebugEnabled()) log.debug("Postprocessing for stage
" + stage + " complete.");
+ }
+
+ //the following lines appear to be artifacts of copy-and-paste from
+ //DedicatedThreadStageDriver.
+// //do not transition into finished state if an error has occurred
+// testAndSetState(STOP_REQUESTED, FINISHED);
+//
+// while ( !(this.currentState == FINISHED || this.currentState ==
ERROR) ) this.wait();
- log.debug("Worker threads for stage " + stage + " halted");
} catch (StageException e) {
- log.error("An error occurred during postprocess for stage " +
stage , e);
+ log.error("An error occurred during postprocessing of stage " +
stage , e);
recordFatalError(e);
setState(ERROR);
} catch (InterruptedException e) {
@@ -199,82 +203,31 @@
if (log.isDebugEnabled()) log.debug("Stage " + stage + "
released.");
}
- setState(STOPPED);
+ testAndSetState(FINISHED, STOPPED);
}
/**
- * Return the current state of stage processing.
- * @return the current state of processing
- */
- public StageDriver.State getState() {
- return this.currentState;
- }
-
- /**
- * Atomically tests to determine whether or not the driver is in the one of
- * the specified states.
- */
- private synchronized boolean isInState(State... states) {
- for (State state : states) if (state == currentState) return true;
- return false;
- }
-
- /**
- * Set the current state of stage processing and notify any listeners
- * that may be waiting on a state change.
- */
- private synchronized void setState(State nextState) {
- if (log.isDebugEnabled()) log.debug("State change for " + stage + ": "
+ this.currentState + " -> " + nextState);
- this.currentState = nextState;
- this.notifyAll();
- }
-
- /**
- * This method performs an atomic conditional state transition change
- * to the value specified by the nextState parameter if and only if the
- * current state is equal to the test state.
- */
- private synchronized boolean testAndSetState(State testState, State
nextState) {
- if (currentState == testState) {
- setState(nextState);
- return true;
- } else {
- return false;
- }
- }
-
- /**
- * Sets the failure tolerance flag for the worker thread. If faultTolerance
- * is set to CHECKED, {@link StageException StageException}s
thrown by
- * the {@link Stage#process(Object)} method will not interrupt
queue
- * processing, but will simply be logged with a severity of ERROR.
- * If faultTolerance is set to ALL, runtime exceptions will also be
- * logged and otherwise ignored.
- * @param faultTolerance the flag value
+ * Get the size of the queue used by this StageDriver.
+ * @return the queue capacity
*/
- public final void setFaultTolerance(String faultTolerance) {
- this.faultTolerance = FaultTolerance.valueOf(faultTolerance);
+ public int getQueueSize() {
+ return this.queue.size() + this.queue.remainingCapacity();
}
/**
- * Sets the failure tolerance flag for the worker thread. If faultTolerance
- * is set to CHECKED, {@link StageException StageException}s
thrown by
- * the {@link Stage#process(Object)} method will not interrupt
queue
- * processing, but will simply be logged with a severity of ERROR.
- * If faultTolerance is set to ALL, runtime exceptions will also be
- * logged and otherwise ignored.
- * @param faultTolerance the flag value
+ * Get the timeout value (in milliseconds) used by this StageDriver on
+ * thread termination.
+ * @return the timeout setting in milliseconds
*/
- public final void setFaultTolerance(FaultTolerance faultTolerance) {
- this.faultTolerance = faultTolerance;
+ public long getTimeout() {
+ return this.timeout;
}
/**
- * Getter for property faultTolerant.
- * @return Value of property faultTolerant.
+ * Returns the number of threads allocated to the thread pool.
*/
- public FaultTolerance getFaultTolerance() {
- return this.faultTolerance;
+ public int getNumThreads() {
+ return numThreads;
}
/*********************************
@@ -304,7 +257,7 @@
LatchWorkerThread(int threadID) {
this.setUncaughtExceptionHandler(workerThreadExceptionHandler);
this.threadID = threadID;
- }
+ }
public final void run() {
try {
@@ -337,7 +290,7 @@
recordFatalError(e);
setState(ERROR);
} catch (InterruptedException e) {
- log.error("Stage " + stage + " (threadID: " + threadID + ")
interrupted while waiting for barrier",e);
+ log.error("Stage " + stage + " (threadID: " + threadID + ")
interrupted while waiting for barrier", e);
recordFatalError(e);
setState(ERROR);
} finally {
@@ -347,29 +300,5 @@
}
}
}
- }
-
- /**
- * Get the size of the queue used by this StageDriver.
- * @return the queue capacity
- */
- public int getQueueSize() {
- return this.queue.size() + this.queue.remainingCapacity();
- }
-
- /**
- * Get the timeout value (in milliseconds) used by this StageDriver on
- * thread termination.
- * @return the timeout setting in milliseconds
- */
- public long getTimeout() {
- return this.timeout;
- }
-
- /**
- * Returns the number of threads allocated to the thread pool.
- */
- public int getNumThreads() {
- return numThreads;
- }
+ }
}