Author: nuttycom
Date: Thu Sep 7 14:46:02 2006
New Revision: 441243
URL: http://svn.apache.org/viewvc?view=rev&rev=441243
Log:
Fixed problem where events were not properly propagated to sibling branches
Added:
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java
(with props)
Modified:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java
Modified:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java?view=diff&rev=441243&r1=441242&r2=441243
==============================================================================
---
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java
(original)
+++
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Pipeline.java
Thu Sep 7 14:46:02 2006
@@ -19,10 +19,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.EventObject;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.ListIterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -61,6 +61,9 @@
// Map of pipeline branches where the keys are branch names.
private final Map<String,Pipeline> branches;
+ // Used to store a reference to the parent pipeline, if this is a branch
+ private Pipeline parent;
+
// The list of listeners registered with the pipeline.
private final List<StageEventListener> listeners;
@@ -100,25 +103,35 @@
}
/**
- * Notifies each registered listener of an event and propagates
- * the event to any attached branches
+ * Asynchronously notifies each registered listener of an event and
propagates
+ * the event to any attached branches and the parent pipeline.
+ *
* @param ev The event to be sent to registered listeners
*/
- public void raise(final java.util.EventObject ev) {
+ public void raise(final EventObject ev) {
new Thread() {
public void run() {
- for (StageEventListener listener : listeners) {
- listener.notify(ev);
- }
+ //first, recursively find the root pipeline
+ Pipeline root = Pipeline.this;
+ while (root.parent != null) root = root.parent;
- for (Pipeline branch : branches.values()) {
- if (branch != Pipeline.this) branch.raise(ev);
- }
- }
+ //notify the listeners from the root pipeline
+ root.notifyListeners(ev);
+ }
}.start();
}
/**
+ * Notify all listeners and recursively notify child branches of the
+ * specified event. This method does not propagate events to the
+ * parent pipeline.
+ */
+ private void notifyListeners(EventObject ev) {
+ for (StageEventListener listener : listeners) listener.notify(ev);
+ for (Pipeline branch : branches.values()) branch.notifyListeners(ev);
+ }
+
+ /**
* This method is used by a stage driver to pass data from one stage to
the next.
* @return the feeder for the downstream stage, or null if no downstream
* stage exists.
@@ -135,7 +148,7 @@
for (int i = drivers.size() - 2; i >= 0; i--) {
if (stage == drivers.get(i).getStage()) return
drivers.get(i+1).getFeeder();
}
-
+
throw new IllegalStateException("Unable to find stage " + stage +
" in pipeline.");
}
}
@@ -155,6 +168,7 @@
* be used to validate that the appended Stage can consume the output of
the
* previous stage of the pipeline. It does NOT validate the ability or
availability
* of branches to consume data produced by the appended stage.
+ *
* @param stage the stage to be added to the pipeline
* @param driverFactory the factory that will be used to create a {@link
StageDriver} that will run the stage
* @throws ValidationException if there is a non-null validator set for
this pipeline and an error is
@@ -189,6 +203,7 @@
/**
* Return the StageDriver for the specified Stage.
+ *
* @return the StageDriver for the specified Stage.
*/
public final StageDriver getStageDriver(Stage stage) {
@@ -211,24 +226,25 @@
* @throws org.apache.commons.pipeline.validation.ValidationException if
the pipeline has a non-null {@link PipelineValidator} and the branch
* cannot consume the data produced for the branch by stages in the
pipeline.
*/
- public void addBranch(String key, Pipeline pipeline) throws
ValidationException {
+ public void addBranch(String key, Pipeline branch) throws
ValidationException {
if (key == null)
throw new IllegalArgumentException("Branch key may not be null.");
if (MAIN_BRANCH.equalsIgnoreCase(key))
throw new IllegalArgumentException("Branch key name \"" +
MAIN_BRANCH + "\" is reserved.");
- if (pipeline == null)
+ if (branch == null)
throw new IllegalArgumentException("Illegal attempt to set
reference to null branch.");
- if (pipeline == this || pipeline.hasBranch(this))
+ if (branch == this || branch.hasBranch(this))
throw new IllegalArgumentException("Illegal attempt to set
reference to self as a branch (infinite recursion potential)");
if (validator != null) {
- List<ValidationFailure> errors = validator.validateAddBranch(this,
key, pipeline);
+ List<ValidationFailure> errors = validator.validateAddBranch(this,
key, branch);
if (errors != null && !errors.isEmpty()) {
- throw new ValidationException("An error occurred adding branch
pipeline " + pipeline, errors);
+ throw new ValidationException("An error occurred adding branch
pipeline " + branch, errors);
}
}
- this.branches.put(key, pipeline);
+ branch.parent = this;
+ this.branches.put(key, branch);
}
/**
@@ -250,19 +266,19 @@
if (branches.containsValue(pipeline)) return true;
for (Pipeline branch : branches.values()) {
if (branch.hasBranch(pipeline)) return true;
- }
-
- return false;
}
+ return false;
+ }
+
/**
* Returns a feeder for the first stage if the pipeline is not empty
* @return the feeder to feed the first stage of the pipeline
*/
public Feeder getSourceFeeder() {
- if (drivers.isEmpty()) return null;
+ if (drivers.isEmpty()) return this.terminalFeeder;
return drivers.peek().getFeeder();
- }
+ }
/**
* Gets the feeder that receives output from the final stage.
@@ -286,6 +302,7 @@
* Startups may occur sequentially or in parallel, depending upon the
stage driver
* used. If a the stage has not been configured with a {@link
StageDriver},
* we will use the default, non-threaded {@link
SynchronousStageDriver}.
+ *
* @throws org.apache.commons.pipeline.StageException Thrown if there is
an error during pipeline startup
*/
public void start() throws StageException {
@@ -300,6 +317,7 @@
* method will block until the stage's queue is exhausted, so this method
* may be used to safely finalize all stages without the risk of
* losing data in the queues.
+ *
* @throws org.apache.commons.pipeline.StageException Thrown if there is
an unhandled error during stage shutdown
*/
public void finish() throws StageException {
Added:
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java?view=auto&rev=441243
==============================================================================
---
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java
(added)
+++
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java
Thu Sep 7 14:46:02 2006
@@ -0,0 +1,348 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline;
+
+import junit.framework.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EventObject;
+import org.apache.commons.pipeline.driver.SynchronousStageDriverFactory;
+import org.apache.commons.pipeline.event.ObjectProcessedEvent;
+import org.apache.commons.pipeline.listener.ObjectProcessedEventCounter;
+import org.apache.commons.pipeline.testFramework.TestStage;
+
+/**
+ * Test cases
+ */
+public class PipelineTest extends TestCase {
+
+ public PipelineTest(String testName) {
+ super(testName);
+ }
+
+ public static Test suite() {
+ TestSuite suite = new TestSuite(PipelineTest.class);
+
+ return suite;
+ }
+
+ /**
+ * Test of registerListener method, of class
org.apache.commons.pipeline.Pipeline.
+ */
+ public void testRegisterListener() {
+ StageEventListener listener = new ObjectProcessedEventCounter();
+ Pipeline instance = new Pipeline();
+
+ instance.registerListener(listener);
+
+ assertEquals(1, instance.getRegisteredListeners().size());
+ }
+
+ /**
+ * Test of getRegisteredListeners method, of class
org.apache.commons.pipeline.Pipeline.
+ */
+ public void testGetRegisteredListeners() {
+ Pipeline instance = new Pipeline();
+
+ Collection<StageEventListener> expResult = Collections.EMPTY_LIST;
+ Collection<StageEventListener> result =
instance.getRegisteredListeners();
+ assertEquals(expResult, result);
+ }
+
+ /**
+ * Test of raise method, of class org.apache.commons.pipeline.Pipeline.
+ */
+ public void testRaise() {
+ Stage testStage = new TestStage(0);
+ EventObject ev = new ObjectProcessedEvent(testStage, "Hello, World!");
+ Pipeline instance = new Pipeline();
+ ObjectProcessedEventCounter counter = new
ObjectProcessedEventCounter();
+ instance.registerListener(counter);
+
+ instance.raise(ev);
+
+ Thread.currentThread().yield(); //give the notifier thread created by
raise priority
+
+ assertNotNull(counter.getCounts().get(testStage));
+ assertEquals(1, counter.getCounts().get(testStage).intValue());
+ }
+
+ public void testRaiseOnBranch() throws Exception {
+ Pipeline root = new Pipeline();
+
+ Pipeline branch1 = new Pipeline();
+ root.addBranch("b1", branch1);
+
+ Pipeline branch2 = new Pipeline();
+ root.addBranch("b2", branch2);
+
+ ObjectProcessedEventCounter counter = new
ObjectProcessedEventCounter();
+ branch2.registerListener(counter);
+
+ Stage testStage = new TestStage(0);
+ EventObject ev = new ObjectProcessedEvent(testStage, "Hello, World!");
+ branch1.raise(ev);
+
+ Thread.currentThread().yield(); //give the notifier thread created by
raise priority
+
+ assertNotNull(counter.getCounts().get(testStage));
+ assertEquals(1, counter.getCounts().get(testStage).intValue());
+ }
+
+ /**
+ * Test of getDownstreamFeeder method, of class
org.apache.commons.pipeline.Pipeline.
+ */
+ public void testGetDownstreamFeeder() throws Exception {
+ Stage stage1 = new TestStage(0);
+ Stage stage2 = new TestStage(1);
+ StageDriverFactory sdf = new SynchronousStageDriverFactory();
+
+ Pipeline instance = new Pipeline();
+ instance.addStage(stage1, sdf);
+ instance.addStage(stage2, sdf);
+
+ Feeder expResult = instance.getStageDriver(stage2).getFeeder();
+ Feeder result = instance.getDownstreamFeeder(stage1);
+ assertSame(expResult, result);
+ }
+
+ /**
+ * Test of getBranchFeeder method, of class
org.apache.commons.pipeline.Pipeline.
+ */
+ public void testGetBranchFeeder() throws Exception {
+ String branchKey = "b1";
+ Pipeline root = new Pipeline();
+ Pipeline branch = new Pipeline();
+ root.addBranch(branchKey, branch);
+
+ Feeder expResult = branch.getTerminalFeeder(); //no feeders registered
+ Feeder result = root.getBranchFeeder(branchKey);
+ assertSame(expResult, result);
+
+ StageDriverFactory sdf = new SynchronousStageDriverFactory();
+ Stage testStage = new TestStage(0);
+ branch.addStage(testStage, sdf);
+
+ expResult = branch.getStageDriver(testStage).getFeeder();
+ result = root.getBranchFeeder(branchKey);
+ assertSame(expResult, result);
+ }
+
+// /**
+// * Test of addStage method, of class
org.apache.commons.pipeline.Pipeline.
+// */
+// public void testAddStage() throws Exception {
+// System.out.println("addStage");
+//
+// Stage stage = null;
+// StageDriverFactory driverFactory = null;
+// Pipeline instance = new Pipeline();
+//
+// instance.addStage(stage, driverFactory);
+//
+// fail("The test case is a prototype.");
+// }
+//
+// /**
+// * Test of getStages method, of class
org.apache.commons.pipeline.Pipeline.
+// */
+// public void testGetStages() {
+// System.out.println("getStages");
+//
+// Pipeline instance = new Pipeline();
+//
+// List<Stage> expResult = null;
+// List<Stage> result = instance.getStages();
+// assertEquals(expResult, result);
+//
+// fail("The test case is a prototype.");
+// }
+//
+// /**
+// * Test of getStageDriver method, of class
org.apache.commons.pipeline.Pipeline.
+// */
+// public void testGetStageDriver() {
+// System.out.println("getStageDriver");
+//
+// Stage stage = null;
+// Pipeline instance = new Pipeline();
+//
+// StageDriver expResult = null;
+// StageDriver result = instance.getStageDriver(stage);
+// assertEquals(expResult, result);
+//
+// fail("The test case is a prototype.");
+// }
+//
+// /**
+// * Test of getStageDrivers method, of class
org.apache.commons.pipeline.Pipeline.
+// */
+// public void testGetStageDrivers() {
+// System.out.println("getStageDrivers");
+//
+// Pipeline instance = new Pipeline();
+//
+// List<StageDriver> expResult = null;
+// List<StageDriver> result = instance.getStageDrivers();
+// assertEquals(expResult, result);
+//
+// fail("The test case is a prototype.");
+// }
+//
+// /**
+// * Test of addBranch method, of class
org.apache.commons.pipeline.Pipeline.
+// */
+// public void testAddBranch() throws Exception {
+// System.out.println("addBranch");
+//
+// String key = "";
+// Pipeline branch = null;
+// Pipeline instance = new Pipeline();
+//
+// instance.addBranch(key, branch);
+//
+// fail("The test case is a prototype.");
+// }
+//
+// /**
+// * Test of getBranches method, of class
org.apache.commons.pipeline.Pipeline.
+// */
+// public void testGetBranches() {
+// System.out.println("getBranches");
+//
+// Pipeline instance = new Pipeline();
+//
+// Map<String, Pipeline> expResult = null;
+// Map<String, Pipeline> result = instance.getBranches();
+// assertEquals(expResult, result);
+//
+// fail("The test case is a prototype.");
+// }
+//
+// /**
+// * Test of getSourceFeeder method, of class
org.apache.commons.pipeline.Pipeline.
+// */
+// public void testGetSourceFeeder() {
+// System.out.println("getSourceFeeder");
+//
+// Pipeline instance = new Pipeline();
+//
+// Feeder expResult = null;
+// Feeder result = instance.getSourceFeeder();
+// assertEquals(expResult, result);
+//
+// fail("The test case is a prototype.");
+// }
+//
+// /**
+// * Test of getTerminalFeeder method, of class
org.apache.commons.pipeline.Pipeline.
+// */
+// public void testGetTerminalFeeder() {
+// System.out.println("getTerminalFeeder");
+//
+// Pipeline instance = new Pipeline();
+//
+// Feeder expResult = null;
+// Feeder result = instance.getTerminalFeeder();
+// assertEquals(expResult, result);
+//
+// fail("The test case is a prototype.");
+// }
+//
+// /**
+// * Test of setTerminalFeeder method, of class
org.apache.commons.pipeline.Pipeline.
+// */
+// public void testSetTerminalFeeder() {
+// System.out.println("setTerminalFeeder");
+//
+// Feeder terminalFeeder = null;
+// Pipeline instance = new Pipeline();
+//
+// instance.setTerminalFeeder(terminalFeeder);
+//
+// fail("The test case is a prototype.");
+// }
+//
+// /**
+// * Test of start method, of class org.apache.commons.pipeline.Pipeline.
+// */
+// public void testStart() throws Exception {
+// System.out.println("start");
+//
+// Pipeline instance = new Pipeline();
+//
+// instance.start();
+//
+// fail("The test case is a prototype.");
+// }
+//
+// /**
+// * Test of finish method, of class org.apache.commons.pipeline.Pipeline.
+// */
+// public void testFinish() throws Exception {
+// System.out.println("finish");
+//
+// Pipeline instance = new Pipeline();
+//
+// instance.finish();
+//
+// fail("The test case is a prototype.");
+// }
+//
+// /**
+// * Test of run method, of class org.apache.commons.pipeline.Pipeline.
+// */
+// public void testRun() {
+// System.out.println("run");
+//
+// Pipeline instance = new Pipeline();
+//
+// instance.run();
+//
+// fail("The test case is a prototype.");
+// }
+//
+// /**
+// * Test of getValidator method, of class
org.apache.commons.pipeline.Pipeline.
+// */
+// public void testGetValidator() {
+// System.out.println("getValidator");
+//
+// Pipeline instance = new Pipeline();
+//
+// PipelineValidator expResult = null;
+// PipelineValidator result = instance.getValidator();
+// assertEquals(expResult, result);
+//
+// fail("The test case is a prototype.");
+// }
+//
+// /**
+// * Test of setValidator method, of class
org.apache.commons.pipeline.Pipeline.
+// */
+// public void testSetValidator() {
+// System.out.println("setValidator");
+//
+// PipelineValidator validator = null;
+// Pipeline instance = new Pipeline();
+//
+// instance.setValidator(validator);
+//
+// fail("The test case is a prototype.");
+// }
+
+}
Propchange:
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/PipelineTest.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]