Author: nuttycom
Date: Tue Oct 10 14:21:02 2006
New Revision: 462575
URL: http://svn.apache.org/viewvc?view=rev&rev=462575
Log:
Initial import. Stage implementations contributed by Steve Christensen.
Added:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java
(with props)
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverFactory.java
(with props)
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverTest.java
(with props)
Added:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java?view=auto&rev=462575
==============================================================================
---
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java
(added)
+++
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java
Tue Oct 10 14:21:02 2006
@@ -0,0 +1,372 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Feeder;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.driver.AbstractStageDriver;
+import org.apache.commons.pipeline.driver.FaultTolerance;
+
+import static org.apache.commons.pipeline.StageDriver.State.*;
+import static org.apache.commons.pipeline.driver.FaultTolerance.*;
+
+/**
+ * This is a very simple implementation of a AbstractStageDriver which spawns
+ * a single thread to process a stage.
+ */
+public class ThreadPoolStageDriver extends AbstractStageDriver {
+ private final Log log = LogFactory.getLog(ThreadPoolStageDriver.class);
+
+ //wait timeout to ensure deadlock cannot occur on thread termination
+ private long timeout;
+
+ //flag describing whether or not the driver is fault tolerant
+ private FaultTolerance faultTolerance = FaultTolerance.NONE;
+
+ //signal telling threads to start polling queue
+ final private CountDownLatch startSignal;
+
+ //signal threads use to tell driver they have finished
+ final private CountDownLatch doneSignal;
+
+ // number of threads polling queue
+ private int numThreads = 1;
+
+ //queue to hold data to be processed
+ private BlockingQueue queue;
+
+ //current state of thread processing
+ private volatile State currentState = State.STOPPED;
+
+ //feeder used to feed data to this stage's queue
+ private final Feeder feeder = new Feeder() {
+ public void feed(Object obj) {
+ if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage
" + stage
+ + " (" +
ThreadPoolStageDriver.this.queue.remainingCapacity() + " available slots in
queue)");
+ try {
+ ThreadPoolStageDriver.this.queue.put(obj);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Unexpected interrupt while
waiting for space to become available for object "
+ + obj + " in queue for stage " + stage, e);
+ }
+ synchronized(ThreadPoolStageDriver.this) {
+ ThreadPoolStageDriver.this.notifyAll();
+ }
+ }
+ };
+
+ /**
+ * Creates a new ThreadPoolStageDriver with the specified thread wait
+ * timeout and fault tolerance values.
+ * @param stage The stage that the driver will run
+ * @param context the context in which to run the stage
+ * @param queue The object queue to use for storing objects prior to
processing. The
+ * default is [EMAIL PROTECTED] LinkedBlockingQueue}
+ * @param timeout The amount of time, in milliseconds, that the worker
thread
+ * will wait before checking the processing state if no objects are
available
+ * in the thread's queue.
+ * @param faultTolerance Flag determining the behavior of the driver when
+ * an error is encountered in execution of [EMAIL PROTECTED]
Stage#process(Object)}.
+ * If this is set to false, any exception thrown during [EMAIL PROTECTED]
Stage#process(Object)}
+ * will cause the worker thread to halt without executing [EMAIL
PROTECTED] Stage#postprocess()}
+ * ([EMAIL PROTECTED] Stage#release()} will be called.)
+ * @param numThreads Number of threads that will be simultaneously reading
from queue
+ */
+ public ThreadPoolStageDriver(Stage stage, StageContext context,
+ BlockingQueue queue,
+ long timeout,
+ FaultTolerance faultTolerance,
+ int numThreads) {
+ super(stage, context);
+ this.numThreads = numThreads;
+
+ this.startSignal = new CountDownLatch(1);
+ this.doneSignal = new CountDownLatch(this.numThreads);
+
+ this.queue = queue;
+ this.timeout = timeout;
+ this.faultTolerance = faultTolerance;
+ }
+
+ /**
+ * Return the Feeder used to feed data to the queue of objects to be
processed.
+ * @return The feeder for objects processed by this driver's stage.
+ */
+ public Feeder getFeeder() {
+ return this.feeder;
+ }
+
+ /**
+ * Start the processing of the stage. Creates threads to poll items
+ * from queue.
+ * @throws org.apache.commons.pipeline.StageException Thrown if the driver
is in an illegal state during startup
+ */
+ public synchronized void start() throws StageException {
+ if (this.currentState == STOPPED) {
+ setState(STARTED);
+
+ if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage
+ "...");
+ stage.preprocess();
+ if (log.isDebugEnabled()) log.debug("Preprocessing for stage " +
stage + " complete.");
+
+ log.debug("Starting worker threads for stage " + stage + ".");
+
+ for (int i=0;i<this.numThreads;i++) {
+ new LatchWorkerThread(i).start();
+ }
+
+ // let threads know they can start
+ testAndSetState(STARTED, RUNNING);
+ startSignal.countDown();
+
+ log.debug("Worker threads for stage " + stage + " started.");
+
+ //wait to ensure that the stage starts up correctly
+ try {
+ while ( !(this.currentState == RUNNING || this.currentState ==
ERROR) ) this.wait();
+ } catch (InterruptedException e) {
+ throw new StageException(this.getStage(), "Worker thread
unexpectedly interrupted while waiting for thread startup.", e);
+ }
+ } else {
+ throw new IllegalStateException("Attempt to start driver in state
" + this.currentState);
+ }
+ }
+
+ /**
+ * Causes processing to shut down gracefully. Waits until all worker
threads
+ * have completed.
+ * @throws org.apache.commons.pipeline.StageException Thrown if the driver
is in an illegal state for shutdown.
+ */
+ public synchronized void finish() throws StageException {
+ if (currentState == STOPPED) {
+ throw new IllegalStateException("The driver is not currently
running.");
+ }
+
+ try {
+ while ( !(this.currentState == RUNNING || this.currentState ==
ERROR) ) this.wait();
+
+ //ask the worker threads to shut down
+ testAndSetState(RUNNING, STOP_REQUESTED);
+
+ if (log.isDebugEnabled()) log.debug("Waiting for worker threads to
stop for stage " + stage + ".");
+ doneSignal.await();
+
+ if (log.isDebugEnabled()) log.debug("Postprocessing stage " +
stage + "...");
+ ThreadPoolStageDriver.this.stage.postprocess();
+ if (log.isDebugEnabled()) log.debug("Postprocessing for stage " +
stage + " complete.");
+
+ //do not transition into finished state if an error has occurred
+ testAndSetState(STOP_REQUESTED, FINISHED);
+
+ while ( !(this.currentState == FINISHED || this.currentState ==
ERROR) ) this.wait();
+
+ log.debug("Worker threads for stage " + stage + " halted");
+ } catch (StageException e) {
+ log.error("An error occurred during postprocess for stage " +
stage , e);
+ recordFatalError(e);
+ setState(ERROR);
+ } catch (InterruptedException e) {
+ throw new StageException(this.getStage(), "StageDriver
unexpectedly interrupted while waiting for shutdown of worker threads.", e);
+ } finally {
+ if (log.isDebugEnabled()) log.debug("Releasing resources for stage
" + stage + "...");
+ stage.release();
+ if (log.isDebugEnabled()) log.debug("Stage " + stage + "
released.");
+ }
+
+ setState(STOPPED);
+ }
+
+ /**
+ * Return the current state of stage processing.
+ * @return the current state of processing
+ */
+ public StageDriver.State getState() {
+ return this.currentState;
+ }
+
+ /**
+ * Atomically tests to determine whether or not the driver is in the one of
+ * the specified states.
+ */
+ private synchronized boolean isInState(State... states) {
+ for (State state : states) if (state == currentState) return true;
+ return false;
+ }
+
+ /**
+ * Set the current state of stage processing and notify any listeners
+ * that may be waiting on a state change.
+ */
+ private synchronized void setState(State nextState) {
+ if (log.isDebugEnabled()) log.debug("State change for " + stage + ": "
+ this.currentState + " -> " + nextState);
+ this.currentState = nextState;
+ this.notifyAll();
+ }
+
+ /**
+ * This method performs an atomic conditional state transition change
+ * to the value specified by the nextState parameter if and only if the
+ * current state is equal to the test state.
+ */
+ private synchronized boolean testAndSetState(State testState, State
nextState) {
+ if (currentState == testState) {
+ setState(nextState);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Sets the failure tolerance flag for the worker thread. If faultTolerance
+ * is set to CHECKED, [EMAIL PROTECTED] StageException StageException}s
thrown by
+ * the [EMAIL PROTECTED] Stage#process(Object)} method will not interrupt
queue
+ * processing, but will simply be logged with a severity of ERROR.
+ * If faultTolerance is set to ALL, runtime exceptions will also be
+ * logged and otherwise ignored.
+ * @param faultTolerance the flag value
+ */
+ public final void setFaultTolerance(String faultTolerance) {
+ this.faultTolerance = FaultTolerance.valueOf(faultTolerance);
+ }
+
+ /**
+ * Sets the failure tolerance flag for the worker thread. If faultTolerance
+ * is set to CHECKED, [EMAIL PROTECTED] StageException StageException}s
thrown by
+ * the [EMAIL PROTECTED] Stage#process(Object)} method will not interrupt
queue
+ * processing, but will simply be logged with a severity of ERROR.
+ * If faultTolerance is set to ALL, runtime exceptions will also be
+ * logged and otherwise ignored.
+ * @param faultTolerance the flag value
+ */
+ public final void setFaultTolerance(FaultTolerance faultTolerance) {
+ this.faultTolerance = faultTolerance;
+ }
+
+ /**
+ * Getter for property faultTolerant.
+ * @return Value of property faultTolerant.
+ */
+ public FaultTolerance getFaultTolerance() {
+ return this.faultTolerance;
+ }
+
+ /*********************************
+ * WORKER THREAD IMPLEMENTATIONS *
+ *********************************/
+ private UncaughtExceptionHandler workerThreadExceptionHandler = new
UncaughtExceptionHandler() {
+ public void uncaughtException(Thread t, Throwable e) {
+ setState(ERROR);
+ recordFatalError(e);
+ log.error("Uncaught exception in stage " + stage, e);
+ }
+ };
+
+ /**
+ * This worker thread removes and processes data objects from the incoming
+ * synchronize queue. It calls the Stage's process() method to process data
+ * from the queue. This loop runs until State has changed to
+ * STOP_REQUESTED. To break the loop the calling code must run the writer's
+ * finish() method to set the running property to false.
+ *
+ * @throws StageException if an error is encountered during data processing
+ * and faultTolerant is set to false.
+ */
+ private class LatchWorkerThread extends Thread {
+ final int threadID;
+
+ LatchWorkerThread(int threadID) {
+ this.setUncaughtExceptionHandler(workerThreadExceptionHandler);
+ this.threadID = threadID;
+ }
+
+ public final void run() {
+ try {
+ ThreadPoolStageDriver.this.startSignal.await();
+ //do not transition into running state if an error has
occurred or a stop requested
+ running: while (currentState != ERROR) {
+ try {
+ Object obj = queue.poll(timeout,
TimeUnit.MILLISECONDS);
+ if (obj == null) {
+ if (currentState == STOP_REQUESTED) break running;
+ } else {
+ try {
+ stage.process(obj);
+ } catch (StageException e) {
+ recordProcessingException(obj, e);
+ if (faultTolerance == NONE) throw e;
+ } catch (RuntimeException e) {
+ recordProcessingException(obj, e);
+ if (faultTolerance == CHECKED ||
faultTolerance == NONE) throw e;
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Worker thread " +
this.threadID + " unexpectedly interrupted while waiting on data for stage " +
stage, e);
+ }
+ }
+ if (log.isDebugEnabled()) log.debug("Stage " + stage + "
(threadID: " + this.threadID + ") exited running state.");
+
+ } catch (StageException e) {
+ log.error("An error occurred in the stage " + stage + "
(threadID: " + this.threadID + ")", e);
+ recordFatalError(e);
+ setState(ERROR);
+ } catch (InterruptedException e) {
+ log.error("Stage " + stage + " (threadID: " + threadID + ")
interrupted while waiting for barrier",e);
+ recordFatalError(e);
+ setState(ERROR);
+ } finally {
+ doneSignal.countDown();
+ synchronized (ThreadPoolStageDriver.this) {
+ ThreadPoolStageDriver.this.notifyAll();
+ }
+ }
+ }
+ }
+
+ /**
+ * Get the size of the queue used by this StageDriver.
+ * @return the queue capacity
+ */
+ public int getQueueSize() {
+ return this.queue.size() + this.queue.remainingCapacity();
+ }
+
+ /**
+ * Get the timeout value (in milliseconds) used by this StageDriver on
+ * thread termination.
+ * @return the timeout setting in milliseconds
+ */
+ public long getTimeout() {
+ return this.timeout;
+ }
+
+ /**
+ * Returns the number of threads allocated to the thread pool.
+ */
+ public int getNumThreads() {
+ return numThreads;
+ }
+}
Propchange:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverFactory.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverFactory.java?view=auto&rev=462575
==============================================================================
---
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverFactory.java
(added)
+++
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverFactory.java
Tue Oct 10 14:21:02 2006
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.StageDriverFactory;
+
+/**
+ * This factory is used to create ThreadPoolStageDriver instances configured
+ * to run specific stages.
+ */
+public class ThreadPoolStageDriverFactory implements StageDriverFactory {
+
+ private int numThreads = 1;
+
+ /** Creates a new instance of ThreadPoolStageDriverFactory */
+ public ThreadPoolStageDriverFactory() {
+ }
+
+ /**
+ * Creates the new [EMAIL PROTECTED] ThreadPoolStageDriver} based upon the
configuration
+ * of this factory instance
+ * @param stage The stage to be run by the newly created driver
+ * @param context The context in which the stage will be run
+ * @return the newly created driver
+ */
+ public StageDriver createStageDriver(Stage stage, StageContext context) {
+ try {
+ return new ThreadPoolStageDriver(stage, context,
queueClass.newInstance(), timeout, faultTolerance, numThreads);
+ } catch (Exception e) {
+ throw new IllegalStateException("Instantiation of driver failed
due to illegal factory state.", e);
+ }
+ }
+
+ /**
+ * Holds value of property queueClass.
+ */
+ private Class<? extends BlockingQueue> queueClass =
LinkedBlockingQueue.class;
+
+ /**
+ * Getter for property queueClass.
+ * @return Value of property queueClass.
+ */
+ public Class<? extends BlockingQueue> getQueueClass() {
+ return this.queueClass;
+ }
+
+ /**
+ * Setter for property queueClass.
+ * @param queueClass New value of property queueClass.
+ */
+ public void setQueueClass(Class<? extends BlockingQueue> queueClass) {
+ if (queueClass == null) throw new IllegalArgumentException("Queue
class may not be null.");
+ this.queueClass = queueClass;
+ }
+
+ /**
+ * Holds value of property timeout.
+ */
+ private long timeout = 500;
+
+ /**
+ * Timeout for wait to ensure deadlock cannot occur on thread termination.
+ * Default is 500
+ * @return Value of property timeout.
+ */
+ public long getTimeout() {
+ return this.timeout;
+ }
+
+ /**
+ * Setter for property timeout.
+ * @param timeout New value of property timeout.
+ */
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ /**
+ * Holds value of property faultTolerance.
+ */
+ private FaultTolerance faultTolerance = FaultTolerance.NONE;
+
+ /**
+ * Getter for property faultTolerance. See [EMAIL PROTECTED]
FaultTolerance} for valid values
+ * and enumation meanings.
+ * @return Value of property faultTolerance.
+ */
+ public FaultTolerance getFaultTolerance() {
+ return this.faultTolerance;
+ }
+
+ /**
+ * Setter for property faultTolerance.
+ *
+ * @param faultTolerance New value of property faultTolerance.
+ */
+ public void setFaultTolerance(FaultTolerance faultTolerance) {
+ this.faultTolerance = faultTolerance;
+ }
+
+ /**
+ * Convenience setter for property faultTolerance for use by Digester.
+ *
+ * @param level New value of property level ("ALL","CHECKED", or "NONE").
+ */
+ public void setFaultToleranceLevel(String level) {
+ this.faultTolerance = FaultTolerance.valueOf(level);
+ }
+
+ /**
+ * Returns the number of threads that will be allocated to the thread
+ * pool of a driver created by this factory.
+ */
+ public int getNumThreads() {
+ return numThreads;
+ }
+
+ /**
+ * Sets the number of threads that will be allocated to the thread
+ * pool of a driver created by this factory.
+ */
+ public void setNumThreads(int numThreads) {
+ this.numThreads = numThreads;
+ }
+
+}
Propchange:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverTest.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverTest.java?view=auto&rev=462575
==============================================================================
---
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverTest.java
(added)
+++
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverTest.java
Tue Oct 10 14:21:02 2006
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver;
+
+import junit.framework.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Feeder;
+import org.apache.commons.pipeline.StageDriver.State;
+import static org.apache.commons.pipeline.StageDriver.State.*;
+
+/**
+ *
+ *
+ */
+public class ThreadPoolStageDriverTest extends AbstractStageDriverTest {
+ private Log log;
+
+ public ThreadPoolStageDriverTest(String testName) {
+ super(testName);
+ this.log = LogFactory.getLog(ThreadPoolStageDriverTest.class);
+ }
+
+ public static Test suite() {
+ TestSuite suite = new TestSuite(ThreadPoolStageDriverTest.class);
+
+ return suite;
+ }
+ /**
+ * Test of getFeeder method, of class
org.apache.commons.pipeline.driver.SynchronousStageDriver.
+ */
+ public void testGetFeeder() {
+ log.debug("testGetFeeder
---------------------------------------------");
+ ThreadPoolStageDriver instance = new ThreadPoolStageDriver(stage,
context, new LinkedBlockingQueue(), 500, FaultTolerance.NONE, 5);
+
+ Feeder feeder = instance.getFeeder();
+ assertNotNull(feeder);
+ }
+
+ /**
+ * Due to the design of the ThreadPoolStageDriver, it is meaningless
+ * to independently test the start or finish methods; however, testing
+ * both together is meaningful. This test also provides verification of
+ * proper behavior of the getState() method.
+ */
+ public void testStartFinish() throws Exception {
+ log.debug("testStartFinish
-------------------------------------------");
+ ThreadPoolStageDriver instance = new ThreadPoolStageDriver(stage,
context, new LinkedBlockingQueue(), 500, FaultTolerance.NONE, 5);
+
+ assertEquals(State.STOPPED, instance.getState());
+
+ instance.start();
+
+ assertTrue(instance.getState() == State.STARTED || instance.getState()
== State.RUNNING);
+
+ instance.finish();
+
+ assertEquals(State.STOPPED, instance.getState());
+ }
+
+
+ /*********************
+ * INTEGRATION TESTS *
+ *********************/
+
+ public void testSingleStage() throws Exception {
+ log.debug("testSingleStage
-------------------------------------------");
+ StageDriverTestUtils.testSingleStage(this, new
ThreadPoolStageDriverFactory());
+ }
+
+ public void testMultiStage() throws Exception {
+ log.debug("testMultiStage
--------------------------------------------");
+ StageDriverTestUtils.testMultiStage(this, new
ThreadPoolStageDriverFactory());
+ }
+
+ public void testMultiFaultingStage() throws Exception {
+ log.debug("testMultiFaultingStage
------------------------------------");
+ ThreadPoolStageDriverFactory factory = new
ThreadPoolStageDriverFactory();
+ factory.setFaultTolerance(FaultTolerance.CHECKED);
+
+ StageDriverTestUtils.testMultiFaultingStage(this, factory);
+ }
+}
Propchange:
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriverTest.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]