Author: nuttycom
Date: Thu Oct 26 14:47:25 2006
New Revision: 468153
URL: http://svn.apache.org/viewvc?view=rev&rev=468153
Log:
Added functionality to allow inter-branch synchronization using event-based key
passing.
Added:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java
(with props)
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java
(with props)
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java
(with props)
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java
(with props)
Added:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java?view=auto&rev=468153
==============================================================================
---
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java
(added)
+++
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java
Thu Oct 26 14:47:25 2006
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.commons.pipeline.event;
+
+import java.util.EventObject;
+
+/**
+ * This event is used to signal the availability of the specified key. This is
+ * usually used for inter-branch synchronization using the
+ * {@link org.apache.commons.pipeline.stage.RaiseKeyAvailableEventStage RaiseKeyAvailableEventStage}
+ * and/or the {@link org.apache.commons.pipeline.stage.KeyWaitBufferStage KeyWaitBufferStage}.
+ *
+ * @param <T> the type of the key carried by this event
+ */
+public class KeyAvailableEvent<T> extends EventObject {
+    // Assigned once in the constructor and never modified afterwards.
+    private final T key;
+
+    /**
+     * Creates a new instance of KeyAvailableEvent.
+     *
+     * @param source the object on which the event occurred; handed to the
+     *               {@link EventObject} constructor
+     * @param key the key whose availability is being signalled
+     */
+    public KeyAvailableEvent(Object source, T key) {
+        super(source);
+        this.key = key;
+    }
+
+    /**
+     * Returns the key.
+     *
+     * @return the key that was supplied when this event was constructed
+     */
+    public T getKey() {
+        return this.key;
+    }
+}
Propchange:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java?view=auto&rev=468153
==============================================================================
---
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java
(added)
+++
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java
Thu Oct 26 14:47:25 2006
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.util.EventObject;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageEventListener;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.event.KeyAvailableEvent;
+import org.apache.commons.pipeline.util.KeyFactory;
+import org.apache.commons.pipeline.util.QueueFactory;
+
+/**
+ * Stage that holds back processed objects until a {@link KeyAvailableEvent}
+ * carrying the matching key has been received. Objects whose key has already
+ * been received are emitted immediately; all others are stored in a per-key
+ * queue that is flushed when the event for that key arrives.
+ * <p>
+ * Keys are kept in a {@link TreeSet} and a {@link TreeMap} created without a
+ * comparator, so the keys produced by the configured key factory must be
+ * mutually comparable.
+ *
+ * @author kjn
+ */
+public class KeyWaitBufferStage extends BaseStage implements StageEventListener {
+
+    // Keys for which a KeyAvailableEvent has been received. This set also
+    // serves as the monitor guarding both itself and the buffers map, so the
+    // reference must never change.
+    private final Set<Object> receivedKeys = new TreeSet<Object>();
+
+    // Objects waiting for their key, grouped by key. Only accessed while
+    // holding the receivedKeys monitor.
+    private final Map<Object,Queue<Object>> buffers = new TreeMap<Object,Queue<Object>>();
+
+    /** Creates a new instance of KeyWaitBufferStage */
+    public KeyWaitBufferStage() {
+    }
+
+    /**
+     * Records the key of a {@link KeyAvailableEvent} as received and emits
+     * every object that was buffered for that key. Events of any other type
+     * are ignored.
+     *
+     * @param ev the event raised on the stage context
+     */
+    public void notify(EventObject ev) {
+        if (!(ev instanceof KeyAvailableEvent)) {
+            return;
+        }
+
+        Object key = ((KeyAvailableEvent<?>) ev).getKey();
+        Queue<Object> pending;
+        synchronized (receivedKeys) {
+            receivedKeys.add(key);
+            // Detach the pending queue under the same lock process() uses to
+            // fill it: the map is never touched concurrently, and once the
+            // key is marked received no further objects are queued for it.
+            pending = buffers.remove(key);
+        }
+
+        // Emit outside the lock so downstream work does not block process().
+        if (pending != null) {
+            for (Object obj : pending) {
+                this.emit(obj);
+            }
+        }
+    }
+
+    /**
+     * Initializes the stage through the superclass and registers this stage
+     * with the context as an event listener.
+     *
+     * @param context the context this stage runs in
+     */
+    public void init(StageContext context) {
+        super.init(context);
+        context.registerListener(this);
+    }
+
+    /**
+     * Generates the key for the object using the configured key factory. If
+     * that key has already been received the object is emitted at once;
+     * otherwise it is appended to the pending queue for the key, creating the
+     * queue with the configured queue factory on first use.
+     *
+     * @param obj the object to process
+     * @throws StageException declared by the stage contract
+     */
+    public void process(Object obj) throws StageException {
+        Object key = keyFactory.generateKey(obj);
+        synchronized (receivedKeys) {
+            if (!receivedKeys.contains(key)) {
+                //store the object in a pending queue.
+                Queue<Object> pending = buffers.get(key);
+                if (pending == null) {
+                    pending = queueFactory.createQueue();
+                    buffers.put(key, pending);
+                }
+                pending.add(obj);
+                return;
+            }
+        }
+
+        this.emit(obj);
+    }
+
+    /**
+     * Holds value of property keyFactory.
+     */
+    private KeyFactory<Object,? extends Object> keyFactory;
+
+    /**
+     * Getter for property keyFactory.
+     * @return Value of property keyFactory.
+     */
+    public KeyFactory<Object,? extends Object> getKeyFactory() {
+        return this.keyFactory;
+    }
+
+    /**
+     * Setter for property keyFactory.
+     * @param keyFactory New value of property keyFactory.
+     */
+    public void setKeyFactory(KeyFactory<Object,? extends Object> keyFactory) {
+        this.keyFactory = keyFactory;
+    }
+
+    /**
+     * Holds value of property queueFactory.
+     */
+    private QueueFactory<Object> queueFactory;
+
+    /**
+     * Getter for property queueFactory.
+     * @return Value of property queueFactory.
+     */
+    public QueueFactory<Object> getQueueFactory() {
+        return this.queueFactory;
+    }
+
+    /**
+     * Setter for property queueFactory.
+     * @param queueFactory New value of property queueFactory.
+     */
+    public void setQueueFactory(QueueFactory<Object> queueFactory) {
+        this.queueFactory = queueFactory;
+    }
+}
Propchange:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java?view=auto&rev=468153
==============================================================================
---
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java
(added)
+++
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java
Thu Oct 26 14:47:25 2006
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.commons.pipeline.stage;
+
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.event.KeyAvailableEvent;
+import org.apache.commons.pipeline.util.KeyFactory;
+
+/**
+ * Stage that raises a {@link KeyAvailableEvent} on the stage context for each
+ * processed object and then emits the object unchanged.
+ */
+public class RaiseKeyAvailableEventStage extends BaseStage {
+
+    /** Creates a new instance of RaiseKeyAvailableEventStage */
+    public RaiseKeyAvailableEventStage() {
+    }
+
+    /**
+     * This implementation of process() simply generates a key for the
+     * processed object and raises a KeyAvailableEvent with the generated
+     * key, then emits the processed object unchanged.
+     *
+     * @param obj the object to process
+     * @throws StageException declared by the stage contract
+     */
+    public void process(Object obj) throws StageException {
+        Object key = keyFactory.generateKey(obj);
+        // The event is raised before the object is emitted.
+        this.context.raise(new KeyAvailableEvent<Object>(this, key));
+        this.emit(obj);
+    }
+
+    /**
+     * Holds value of property keyFactory.
+     */
+    private KeyFactory<Object,Object> keyFactory;
+
+    /**
+     * Returns the KeyFactory used to create keys for the objects processed
+     * by this stage.
+     *
+     * @return the configured key factory
+     */
+    public KeyFactory<Object,Object> getKeyFactory() {
+        return keyFactory;
+    }
+
+    /**
+     * Sets the KeyFactory used to create keys for the objects processed
+     * by this stage.
+     *
+     * @param keyFactory the key factory to use
+     */
+    public void setKeyFactory(KeyFactory<Object,Object> keyFactory) {
+        this.keyFactory = keyFactory;
+    }
+}
Propchange:
jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java?view=auto&rev=468153
==============================================================================
---
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java
(added)
+++
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java
Thu Oct 26 14:47:25 2006
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import junit.framework.*;
+import java.util.EventObject;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageEventListener;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.event.KeyAvailableEvent;
+import org.apache.commons.pipeline.util.KeyFactory;
+import org.apache.commons.pipeline.util.QueueFactory;
+
+/**
+ * Test case for the KeyWaitBufferStage
+ */
+public class KeyWaitBufferStageTest extends AbstractStageTest {
+
+    public KeyWaitBufferStageTest(String testName) {
+        super(testName);
+    }
+
+    public static Test suite() {
+        TestSuite suite = new TestSuite(KeyWaitBufferStageTest.class);
+
+        return suite;
+    }
+
+    /**
+     * Test of notify and process methods, of class
+     * org.apache.commons.pipeline.stage.KeyWaitBufferStage.
+     * The tests of these methods are coupled since the process method buffers
+     * data waiting for notify() to be called with an appropriate event.
+     */
+    public void testProcessAndNotify() throws Exception {
+        String obj = "Hello, World!";
+        KeyFactory<Object,Integer> keyFactory = new KeyFactory.HashKeyFactory();
+        EventObject ev = new KeyAvailableEvent<Object>(this, keyFactory.generateKey(obj));
+
+        KeyWaitBufferStage instance = new KeyWaitBufferStage();
+        instance.setKeyFactory(keyFactory);
+        instance.setQueueFactory(new QueueFactory.LinkedListFactory());
+
+        this.init(instance);
+
+        // No event has been delivered yet, so the object must be held back.
+        instance.process(obj);
+
+        assertTrue("The process object was not buffered correctly.",
+                this.testFeeder.receivedValues.isEmpty());
+
+        // Delivering the matching event must release the buffered object.
+        instance.notify(ev);
+
+        assertTrue("The buffer was not properly flushed upon receiving the event.",
+                this.testFeeder.receivedValues.contains(obj));
+    }
+
+    /**
+     * Test of the reverse ordering: when the event for a key is delivered
+     * before an object with that key is processed, the object is emitted
+     * without being buffered.
+     */
+    public void testProcessAfterNotify() throws Exception {
+        String obj = "Hello, World!";
+        KeyFactory<Object,Integer> keyFactory = new KeyFactory.HashKeyFactory();
+        EventObject ev = new KeyAvailableEvent<Object>(this, keyFactory.generateKey(obj));
+
+        KeyWaitBufferStage instance = new KeyWaitBufferStage();
+        instance.setKeyFactory(keyFactory);
+        instance.setQueueFactory(new QueueFactory.LinkedListFactory());
+
+        this.init(instance);
+
+        instance.notify(ev);
+        instance.process(obj);
+
+        assertTrue("The object was not emitted although its key had already been received.",
+                this.testFeeder.receivedValues.contains(obj));
+    }
+
+    /**
+     * Test of init method, of class
+     * org.apache.commons.pipeline.stage.KeyWaitBufferStage.
+     */
+    public void testInit() {
+        KeyWaitBufferStage instance = new KeyWaitBufferStage();
+
+        instance.init(this.testContext);
+
+        assertTrue("The automatic registration of the stage as a StageEventListener failed.",
+                this.testContext.listeners.contains(instance));
+    }
+}
Propchange:
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]