Author: nuttycom
Date: Thu Sep 7 15:39:56 2006
New Revision: 441279
URL: http://svn.apache.org/viewvc?view=rev&rev=441279
Log:
Refactored StageDriver to make it an interface; this will allow for
StageDrivers to be created that decorate other drivers with alternative
feeder behavior.
Added:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/ProcessingException.java
(with props)
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java
(with props)
Modified:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriver.java
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SynchronousStageDriverTest.java
Added:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/ProcessingException.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/ProcessingException.java?view=auto&rev=441279
==============================================================================
---
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/ProcessingException.java
(added)
+++
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/ProcessingException.java
Thu Sep 7 15:39:56 2006
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline;
+
+import org.apache.commons.pipeline.StageDriver.State;
+
+/**
+ * This exception class is used to store detailed information about
+ * a failure in the processing step of a stage including the failing data,
+ * the driver state at the time of failure, and any exceptions encountered.
+ */
+public class ProcessingException extends StageException {
+ private final Object data;
+ private final State driverState;
+
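+ /**
+ * Creates a new instance of ProcessingException
+ *
+ * @param stage The stage in which the processing failure occurred.
+ * @param cause The exception that occurred.
+ * @param data The object which was not able to be processed.
+ * @param driverState The state of the driver at the time of failure.
+ */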
+ public ProcessingException(Stage stage, Throwable cause, Object data,
State driverState) {
+ super(stage, cause);
+ this.data = data;
+ this.driverState = driverState;
+ }
+
+ /** Returns the data
+ * @return The object which was not able to be processed.
+ */
+ public Object getData(){
+ return this.data;
+ }
+
+ public State getDriverState() {
+ return this.driverState;
+ }
+}
\ No newline at end of file
Propchange:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/ProcessingException.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriver.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriver.java?view=diff&rev=441279&r1=441278&r2=441279
==============================================================================
---
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriver.java
(original)
+++
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/StageDriver.java
Thu Sep 7 15:39:56 2006
@@ -16,194 +16,97 @@
package org.apache.commons.pipeline;
-import java.util.ArrayList;
import java.util.List;
/**
* This interface is used to define how processing for a stage is started,
* stopped, and run. StageDriver implementations may run stages in one or
- * more threads, and use the {@link StageMonitor} interface to provide communication
- * between the stage, the driver, and the enclosing pipeline.
+ * more threads, and use the {@link StageContext} interface to provide
+ * communication between the stage and the context in which it is run, usually
+ * a pipeline.
+ *
+ *
*/
-public abstract class StageDriver {
+public interface StageDriver {
/**
* This enumeration represents possible states of a stage driver during
* processing.
*/
- public enum State {
+ public enum State {
+ /** Resources have been released and all stage activity has stopped, or
+ * the stage has never been started. This is the default state. */
+ STOPPED,
/** The stage driver has started and the preprocess() method is being
run. */
- STARTED,
+ STARTED,
/** Preprocessing is complete and objects are being processed.*/
- RUNNING,
- /** A stop has been requested - the stage will finish processing,
+ RUNNING,
+ /** A stop has been requested - the stage will finish processing,
* then postprocess and shut down. */
- STOP_REQUESTED,
+ STOP_REQUESTED,
/** Postprocessing tasks are complete; the stage is shutting down. */
- FINISHED,
- /** Resources have been released and all stage activity has stopped.
- * This is the default state. */
- STOPPED,
- /** A fatal error has occurred that has caused the driver to stop in
an
- * inconsistent state. The driver cannot be restarted from the error
state.
+ FINISHED,
+ /** A fatal error has occurred that has caused the driver to stop in an
+ * inconsistent state. The driver cannot be restarted from the error
state.
* The error(s) can be obtained using the getFatalErrors method. */
- ERROR
- }
-
- /**
- * The stage to run.
- */
- protected Stage stage;
+ ERROR
+ }
/**
- * The context for the stage being run
+ * This method is used to start the driver, run the
+ * {@link Stage#preprocess() preprocess()} method of the attached stage
+ * and to then begin processing any objects fed to this driver's Feeder.
+ *
+ * @throws org.apache.commons.pipeline.StageException Thrown if there is
an error during stage startup. In most cases, such errors
+ * will be handled internally by the driver.
*/
- protected StageContext context;
+ public void start() throws StageException;
/**
- * List of processing failures that have occurred.
- */
- protected List<ProcessingFailure> processingFailures = new
ArrayList<ProcessingFailure>();
-
- /**
- * List of errors that have occurred.
+ * This method waits for the stage(s) queue(s) to empty and any processor
thread(s) to exit
+ * cleanly and then calls release() to release any resources acquired
during processing, if possible.
+ *
+ * @throws org.apache.commons.pipeline.StageException Thrown if there is
an error during driver shutdown. Ordinarily such
+ * exceptions will be handled internally.
*/
- protected List<Throwable> errors = new ArrayList<Throwable>();
+ public void finish() throws StageException;
/**
- * Creates a StageDriver for the specified stage.
- * @param stage The stage for which the driver will be created
- * @param context The context in which to run the stage
+ * This method is used to provide a communication channel between the
context
+ * in which the driver is being run and the managed stage.
+ *
+ * @return the Feeder used to feed objects to the managed stage for
processing.
*/
- public StageDriver(Stage stage, StageContext context) {
- if (stage == null) throw new IllegalArgumentException("Stage may not
be null.");
- if (context == null) throw new IllegalArgumentException("Context may
not be null.");
- this.stage = stage;
- this.context = context;
- }
+ public Feeder getFeeder();
/**
* Returns the Stage being run by this StageDriver.
+ *
* @return The stage being run by this StageDriver instance
*/
- public Stage getStage() {
- return this.stage;
- }
-
- /**
- * This method is used to provide a communication channel between the
context
- * in which the driver is being run and the managed stage.
- * @return the Feeder used to feed objects to the managed stage for
processing.
- */
- public abstract Feeder getFeeder();
+ public Stage getStage();
/**
* Returns the current state of stage processing.
- * @return The current state
- */
- public abstract State getState();
-
- /**
- * This method is used to start the driver, run the
- * {@link Stage#preprocess() preprocess()} method of the attached stage
- * and to then begin processing any objects fed to this driver's Feeder.
*
- * @throws org.apache.commons.pipeline.StageException Thrown if there is
an error during stage startup. In most cases, such errors
- * will be handled internally by the driver.
+ * @return The current state
*/
- public abstract void start() throws StageException;
+ public State getState();
/**
- * This method waits for the stage(s) queue(s) to empty and any processor
thread(s) to exit
- * cleanly and then calls release() to release any resources acquired
during processing, if possible.
- * @throws org.apache.commons.pipeline.StageException Thrown if there is
an error during driver shutdown. Ordinarily such
- * exceptions will be handled internally.
- */
- public abstract void finish() throws StageException;
-
- /**
* Returns a list of unrecoverable errors that occurred during stage
* processing.
+ *
* @return A list of unrecoverable errors that occurred during stage
processing.
*/
- public List<Throwable> getFatalErrors() {
- return this.errors;
- }
-
- /**
- * Store a fatal error.
- * @param error The error to be stored for later analysis
- */
- protected void recordFatalError(Throwable error) {
- this.errors.add(error);
- }
+ public List<Throwable> getFatalErrors();
/**
* Returns a list of errors that occurred while processing data objects,
* along with the objects that were being processed when the errors
* were generated.
+ *
* @return The list of non-fatal processing errors.
*/
- public List<ProcessingFailure> getProcessingFailures() {
- return this.processingFailures;
- }
-
- /**
- * Store processing failure information for the specified data object.
- * @param data The data being processed at the time of the error
- * @param error The error encountered
- */
- protected void recordProcessingFailure(Object data, Throwable error) {
- this.processingFailures.add(new ProcessingFailure(data, error));
- }
-
- /**
- * FailedProcess objects are used to store detailed information about
- * processing failures including the failing data, the driver state
- * at the time of failure
- */
- public class ProcessingFailure {
- private Object data;
- private Throwable throwable;
- private State driverState;
-
- /** Creates a new instance of FailedProcess
- * @param data The object which was not able to be processed.
- * @param throwable The exception that occurred.
- */
- protected ProcessingFailure(Object data, Throwable throwable){
- this.data = data;
- this.throwable = throwable;
- this.driverState = StageDriver.this.getState();
- }
-
- /** Returns the data
- * @return The object which was not able to be processed.
- */
- public Object getData(){
- return this.data;
- }
-
- /** Returns the exception
- * @return The throwable
- */
- public Throwable getThrowable(){
- return this.throwable;
- }
-
- /** Returns the stage which threw the exception.
- * @return Stage
- */
- public Stage getStage() {
- return StageDriver.this.getStage();
- }
-
- /**
- * This method is used to determine what stage driver handled a
particular error.
- * @return A reference to the stage driver that encountered the error.
- */
- public StageDriver getStageDriver() {
- return StageDriver.this;
- }
- }
+ public List<ProcessingException> getProcessingExceptions();
}
Added:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java?view=auto&rev=441279
==============================================================================
---
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java
(added)
+++
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java
Thu Sep 7 15:39:56 2006
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.pipeline.*;
+
+/**
+ * This abstract class is a base for {@link StageDriver} implementations, which
+ * define how processing for a stage is started, stopped, and run. It stores the
+ * stage, its context, and the errors recorded during processing.
+ * AbstractStageDriver implementations may run stages in one or
+ * more threads, and use the {@link StageContext} interface to provide communication
+ * between the stage, the driver, and the enclosing pipeline.
+ */
+public abstract class AbstractStageDriver implements StageDriver {
+
+ /**
+ * The stage to run.
+ */
+ protected Stage stage;
+
+ /**
+ * The context for the stage being run
+ */
+ protected StageContext context;
+
+ /**
+ * List of processing failures that have occurred.
+ */
+ protected List<ProcessingException> processingExceptions = new
ArrayList<ProcessingException>();
+
+ /**
+ * List of errors that have occurred.
+ */
+ protected List<Throwable> errors = new ArrayList<Throwable>();
+
+ /**
+ * Creates a StageDriver for the specified stage.
+ *
+ * @param stage The stage for which the driver will be created
+ * @param context The context in which to run the stage
+ */
+ public AbstractStageDriver(Stage stage, StageContext context) {
+ if (stage == null) throw new IllegalArgumentException("Stage may not
be null.");
+ if (context == null) throw new IllegalArgumentException("Context may
not be null.");
+ this.stage = stage;
+ this.context = context;
+ }
+
+ /**
+ * Returns the Stage being run by this StageDriver.
+ *
+ * @return The stage being run by this StageDriver instance
+ */
+ public Stage getStage() {
+ return this.stage;
+ }
+
+ /**
+ * This method is used to provide a communication channel between the
context
+ * in which the driver is being run and the managed stage.
+ * @return the Feeder used to feed objects to the managed stage for
processing.
+ */
+ public abstract Feeder getFeeder();
+
+ /**
+ * Returns the current state of stage processing.
+ * @return The current state
+ */
+ public abstract State getState();
+
+ /**
+ * This method is used to start the driver, run the
+ * {@link Stage#preprocess() preprocess()} method of the attached stage
+ * and to then begin processing any objects fed to this driver's Feeder.
+ *
+ * @throws org.apache.commons.pipeline.StageException Thrown if there is
an error during stage startup. In most cases, such errors
+ * will be handled internally by the driver.
+ */
+ public abstract void start() throws StageException;
+
+ /**
+ * This method waits for the stage(s) queue(s) to empty and any processor
thread(s) to exit
+ * cleanly and then calls release() to release any resources acquired
during processing, if possible.
+ * @throws org.apache.commons.pipeline.StageException Thrown if there is
an error during driver shutdown. Ordinarily such
+ * exceptions will be handled internally.
+ */
+ public abstract void finish() throws StageException;
+
+ /**
+ * Returns a list of unrecoverable errors that occurred during stage
+ * processing.
+ * @return A list of unrecoverable errors that occurred during stage
processing.
+ */
+ public List<Throwable> getFatalErrors() {
+ return this.errors;
+ }
+
+ /**
+ * Store a fatal error.
+ * @param error The error to be stored for later analysis
+ */
+ protected void recordFatalError(Throwable error) {
+ this.errors.add(error);
+ }
+
+ /**
+ * Returns a list of errors that occurred while processing data objects,
+ * along with the objects that were being processed when the errors
+ * were generated.
+ * @return The list of non-fatal processing errors.
+ */
+ public List<ProcessingException> getProcessingExceptions() {
+ return this.processingExceptions;
+ }
+
+ /**
+ * Store processing failure information for the specified data object.
+ * @param data The data being processed at the time of the error
+ * @param error The error encountered
+ */
+ protected void recordProcessingException(Object data, Throwable error) {
+ ProcessingException ex = new ProcessingException(this.stage, error,
data, this.getState());
+ this.processingExceptions.add(ex);
+ }
+}
Propchange:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/AbstractStageDriver.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java?view=diff&rev=441279&r1=441278&r2=441279
==============================================================================
---
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java
(original)
+++
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/DedicatedThreadStageDriver.java
Thu Sep 7 15:39:56 2006
@@ -21,20 +21,21 @@
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.driver.AbstractStageDriver;
import org.apache.commons.pipeline.Feeder;
import org.apache.commons.pipeline.StageDriver;
import org.apache.commons.pipeline.Stage;
import org.apache.commons.pipeline.StageContext;
import org.apache.commons.pipeline.StageException;
-import org.apache.commons.pipeline.StageDriver.State;
import static org.apache.commons.pipeline.StageDriver.State.*;
+import org.apache.commons.pipeline.StageDriver.State;
import static org.apache.commons.pipeline.driver.FaultTolerance.*;
/**
- * This is a very simple implementation of a StageDriver which spawns
+ * This is a very simple implementation of an AbstractStageDriver which spawns
* a single thread to process a stage.
*/
-public class DedicatedThreadStageDriver extends StageDriver {
+public class DedicatedThreadStageDriver extends AbstractStageDriver {
private final Log log =
LogFactory.getLog(DedicatedThreadStageDriver.class);
//wait timeout to ensure deadlock cannot occur on thread termination
@@ -52,6 +53,24 @@
//current state of thread processing
private volatile State currentState = State.STOPPED;
+ //feeder used to feed data to this stage's queue
+ private final Feeder feeder = new Feeder() {
+ public void feed(Object obj) {
+ if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage
" + stage
+ + " (" +
DedicatedThreadStageDriver.this.queue.remainingCapacity() + " available slots
in queue)");
+ try {
+ DedicatedThreadStageDriver.this.queue.put(obj);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Unexpected interrupt while
waiting for space to become available for object "
+ + obj + " in queue for stage " + stage, e);
+ }
+
+ synchronized(DedicatedThreadStageDriver.this) {
+ DedicatedThreadStageDriver.this.notifyAll();
+ }
+ }
+ };
+
/**
* Creates a new DedicatedThreadStageDriver with the specified thread wait
* timeout and fault tolerance values.
@@ -62,7 +81,7 @@
* @param timeout The amount of time, in milliseconds, that the worker
thread
* will wait before checking the processing state if no objects are
available
* in the thread's queue.
- * @param faultTolerant Flag determining the behavior of the driver when
+ * @param faultTolerance Flag determining the behavior of the driver when
* an error is encountered in execution of {@link Stage#process(Object)}.
* If this is set to false, any exception thrown during {@link Stage#process(Object)}
* will cause the worker thread to halt without executing {@link Stage#postprocess()}
@@ -80,15 +99,7 @@
* @return The feeder for objects processed by this driver's stage.
*/
public Feeder getFeeder() {
- return new Feeder() {
- public void feed(Object obj) {
- if (log.isDebugEnabled()) log.debug(obj + " is being fed to
stage " + stage);
- DedicatedThreadStageDriver.this.queue.add(obj);
- synchronized(DedicatedThreadStageDriver.this) {
- DedicatedThreadStageDriver.this.notifyAll();
- }
- }
- };
+ return this.feeder;
}
/**
@@ -106,7 +117,7 @@
try {
while ( !(this.currentState == RUNNING || this.currentState ==
ERROR) ) this.wait();
} catch (InterruptedException e) {
- throw new StageException("Worker thread unexpectedly
interrupted while waiting for thread startup.", e);
+ throw new StageException(this.getStage(), "Worker thread
unexpectedly interrupted while waiting for thread startup.", e);
}
} else {
throw new IllegalStateException("Attempt to start driver in state
" + this.currentState);
@@ -135,7 +146,7 @@
log.debug("Worker thread for stage " + stage + " halted");
} catch (InterruptedException e) {
- throw new StageException("Worker thread unexpectedly interrupted
while waiting for graceful shutdown.", e);
+ throw new StageException(this.getStage(), "Worker thread
unexpectedly interrupted while waiting for graceful shutdown.", e);
}
setState(STOPPED);
@@ -268,11 +279,11 @@
try {
stage.process(obj);
} catch (StageException e) {
- recordProcessingFailure(obj, e);
+ recordProcessingException(obj, e);
if (faultTolerance == NONE) throw e;
} catch (RuntimeException e) {
- recordProcessingFailure(obj, e);
- if (faultTolerance == CHECKED ||
faultTolerance == NONE) throw e;
+ recordProcessingException(obj, e);
+ if (faultTolerance == CHECKED ||
faultTolerance == NONE) throw e;
}
}
} catch (InterruptedException e) {
@@ -298,5 +309,22 @@
//do not transition into finished state if an error has occurred
testAndSetState(STOP_REQUESTED, FINISHED);
}
+ }
+
+ /**
+ * Get the size of the queue used by this StageDriver.
+ * @return the queue capacity
+ */
+ public int getQueueSize() {
+ return this.queue.size() + this.queue.remainingCapacity();
+ }
+
+ /**
+ * Get the timeout value (in milliseconds) used by this StageDriver on
+ * thread termination.
+ * @return the timeout setting in milliseconds
+ */
+ public long getTimeout() {
+ return this.timeout;
}
}
Modified:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java?view=diff&rev=441279&r1=441278&r2=441279
==============================================================================
---
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java
(original)
+++
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/SynchronousStageDriver.java
Thu Sep 7 15:39:56 2006
@@ -22,16 +22,15 @@
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pipeline.Feeder;
import org.apache.commons.pipeline.Stage;
-import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.driver.AbstractStageDriver;
import org.apache.commons.pipeline.StageException;
import org.apache.commons.pipeline.StageContext;
-import org.apache.commons.pipeline.StageDriver.State;
import static org.apache.commons.pipeline.StageDriver.State.*;
/**
- * This is a non-threaded version of the StageDriver.
+ * This is a non-threaded version of the AbstractStageDriver.
*/
-public class SynchronousStageDriver extends StageDriver {
+public class SynchronousStageDriver extends AbstractStageDriver {
private final Log log = LogFactory.getLog(SynchronousStageDriver.class);
//flag describing whether or not the driver is fault tolerant
@@ -44,6 +43,25 @@
//when it is not in a running state
private Queue<Object> queue = new LinkedList<Object>();
+ //Feeder used to feed objects to this stage
+ private final Feeder feeder = new Feeder() {
+ public void feed(Object obj) {
+ synchronized (SynchronousStageDriver.this) {
+ if (currentState != RUNNING) { //enqueue objects if stage has
not been started
+ if (currentState == ERROR) throw new
IllegalStateException("Unable to process data: driver in fatal error state.");
+ queue.add(obj);
+ } else {
+ try {
+ stage.process(obj);
+ } catch (StageException e) {
+ recordProcessingException(obj, e);
+ if (!faultTolerant) throw fatalError(e);
+ }
+ }
+ }
+ }
+ };
+
/**
* Creates a new instance of SynchronousStageDriver
* @param stage The stage to be run
@@ -61,23 +79,7 @@
* @return The Feeder instance for the stage.
*/
public Feeder getFeeder() {
- return new Feeder() {
- public void feed(Object obj) {
- synchronized (SynchronousStageDriver.this) {
- if (currentState != RUNNING) { //enqueue objects if stage
has not been started
- if (currentState == ERROR) throw new
IllegalStateException("Unable to process data: driver in fatal error state.");
- queue.add(obj);
- } else {
- try {
- stage.process(obj);
- } catch (StageException e) {
- recordProcessingFailure(obj, e);
- if (!faultTolerant) throw fatalError(e);
- }
- }
- }
- }
- };
+ return this.feeder;
}
/**
@@ -95,7 +97,7 @@
this.currentState = RUNNING;
this.notifyAll();
-
+
// feed any queued values before returning control
while (!queue.isEmpty()) this.getFeeder().feed(queue.remove());
} else {
Modified:
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java?view=diff&rev=441279&r1=441278&r2=441279
==============================================================================
---
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java
(original)
+++
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java
Thu Sep 7 15:39:56 2006
@@ -17,8 +17,8 @@
package org.apache.commons.pipeline.driver;
import junit.framework.TestCase;
-import org.apache.commons.pipeline.testFramework.FaultingTestStage;
import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.testFramework.FaultingTestStage;
import org.apache.commons.pipeline.StageDriverFactory;
import org.apache.commons.pipeline.testFramework.TestFeeder;
import org.apache.commons.pipeline.testFramework.TestStage;
@@ -139,7 +139,7 @@
test.assertEquals("Incorrect processed object count from stage 0.", 3,
stage0.processedObjects.size());
test.assertEquals("Incorrect processed object count from stage 1.", 2,
stage1.processedObjects.size());
- test.assertEquals("Incorrect number of processing failures recorded by
driver 2.", 1, d1.getProcessingFailures().size());
+ test.assertEquals("Incorrect number of processing failures recorded by
driver 2.", 1, d1.getProcessingExceptions().size());
test.assertEquals("Incorrect processed object count from stage 2.", 2,
stage2.processedObjects.size());
test.assertEquals("Incorrect final processed object count.", 2,
terminalFeeder.receivedValues.size());
Modified:
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SynchronousStageDriverTest.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SynchronousStageDriverTest.java?view=diff&rev=441279&r1=441278&r2=441279
==============================================================================
---
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SynchronousStageDriverTest.java
(original)
+++
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SynchronousStageDriverTest.java
Thu Sep 7 15:39:56 2006
@@ -16,13 +16,11 @@
package org.apache.commons.pipeline.driver;
-import junit.framework.*;
+import junit.framework.Test;
+import junit.framework.TestSuite;
import org.apache.commons.pipeline.Feeder;
import org.apache.commons.pipeline.StageDriver.State;
-import static org.apache.commons.pipeline.StageDriver.State.*;
-import org.apache.commons.pipeline.testFramework.TestFeeder;
-import org.apache.commons.pipeline.testFramework.TestStage;
-import org.apache.commons.pipeline.testFramework.TestStageContext;
+
/**
*
@@ -32,7 +30,7 @@
public SynchronousStageDriverTest(String testName) {
super(testName);
- }
+ }
public static Test suite() {
TestSuite suite = new TestSuite(SynchronousStageDriverTest.class);
@@ -47,12 +45,12 @@
SynchronousStageDriver instance = new SynchronousStageDriver(stage,
context);
Feeder feeder = instance.getFeeder();
- assertNotNull(feeder);
+ assertNotNull(feeder);
}
/**
* Due to the design of the SynchronousStageDriver, it is meaningless
- * to independently test the start or finish methods; however, testing
+ * to independently test the start or finish methods; however, testing
* both together is meaningful. This test also provides verification of
* proper behavior of the getState() method.
*/
@@ -69,14 +67,14 @@
assertEquals(instance.getState(), State.STOPPED);
}
-
+
/*********************
* INTEGRATION TESTS *
*********************/
- public void testSingleStage() throws Exception {
+ public void testSingleStage() throws Exception {
StageDriverTestUtils.testSingleStage(this, new
SynchronousStageDriverFactory());
- }
+ }
public void testMultiStage() throws Exception {
StageDriverTestUtils.testMultiStage(this, new
SynchronousStageDriverFactory());
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]