Author: chirino
Date: Fri Feb 13 12:19:15 2009
New Revision: 744094
URL: http://svn.apache.org/viewvc?rev=744094&view=rev
Log:
- the ISinkController looks and smells like an IFlowSink, so let's let it be one.
- Can replace the exclusive queue used for connection outbound messages with
just a FlowController
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
Fri Feb 13 12:19:15 2009
@@ -438,4 +438,34 @@
public IFlowSink<E> getFlowSink() {
return controllable.getFlowSink();
}
+
+ public long getResourceId() {
+ IFlowSink<E> flowSink = getFlowSink();
+ if( flowSink!=null ) {
+ return flowSink.getResourceId();
+ }
+ return 0;
+ }
+
+ public String getResourceName() {
+ IFlowSink<E> flowSink = getFlowSink();
+ if( flowSink!=null ) {
+ return flowSink.getResourceName();
+ }
+ return null;
+ }
+
+ public void addFlowLifeCycleListener(FlowLifeCycleListener listener) {
+ IFlowSink<E> flowSink = getFlowSink();
+ if( flowSink!=null ) {
+ flowSink.addFlowLifeCycleListener(listener);
+ }
+ }
+
+ public void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
+ IFlowSink<E> flowSink = getFlowSink();
+ if( flowSink!=null ) {
+ flowSink.removeFlowLifeCycleListener(listener);
+ }
+ }
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
Fri Feb 13 12:19:15 2009
@@ -16,7 +16,7 @@
*/
package org.apache.activemq.flow;
-public interface ISinkController<E> {
+public interface ISinkController<E> extends IFlowSink<E> {
/**
* Defines required attributes for an entity that can be flow controlled.
*
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
Fri Feb 13 12:19:15 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.flow;
+
public class NoOpFlowController<E> implements ISinkController<E> {
private final IFlowSource<E> source;
private final Flow flow;
@@ -89,4 +90,33 @@
return null;
}
+ public long getResourceId() {
+ IFlowSink<E> flowSink = getFlowSink();
+ if( flowSink!=null ) {
+ return flowSink.getResourceId();
+ }
+ return 0;
+ }
+
+ public String getResourceName() {
+ IFlowSink<E> flowSink = getFlowSink();
+ if( flowSink!=null ) {
+ return flowSink.getResourceName();
+ }
+ return null;
+ }
+
+ public void addFlowLifeCycleListener(FlowLifeCycleListener listener) {
+ IFlowSink<E> flowSink = getFlowSink();
+ if( flowSink!=null ) {
+ flowSink.addFlowLifeCycleListener(listener);
+ }
+ }
+
+ public void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
+ IFlowSink<E> flowSink = getFlowSink();
+ if( flowSink!=null ) {
+ flowSink.removeFlowLifeCycleListener(listener);
+ }
+ }
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
Fri Feb 13 12:19:15 2009
@@ -16,48 +16,40 @@
*/
package org.apache.activemq.flow;
-import org.apache.activemq.queue.Mapper;
+import java.util.ArrayList;
public class PriorityFlowController<E> implements ISourceController<E>,
ISinkController<E> {
private final Object mutex;
- private final FlowController<E> controllers[];
+ private final ArrayList<FlowController<E>> controllers;
private final PrioritySizeLimiter<E> limiter;
- private Mapper<Integer, E> priorityMapper;
-
private final Flow flow;
private final FlowControllable<E> controllable;
- public PriorityFlowController(int priorities, FlowControllable<E>
controllable, Flow flow, Object mutex, int capacity, int resume) {
+ public PriorityFlowController(FlowControllable<E> controllable, Flow flow,
PrioritySizeLimiter<E> limiter, Object mutex) {
this.controllable = controllable;
this.flow = flow;
this.mutex = mutex;
- this.limiter = new PrioritySizeLimiter<E>(capacity, resume,
priorities);
- this.limiter.setPriorityMapper(priorityMapper);
- this.controllers = createControlerArray(priorities);
- for (int i = 0; i < priorities; i++) {
- this.controllers[i] = new FlowController<E>(controllable, flow,
limiter.getPriorityLimter(i), mutex);
+ this.limiter = limiter;
+ this.controllers = new
ArrayList<FlowController<E>>(limiter.getPriorities());
+ for (int i = 0; i < limiter.getPriorities(); i++) {
+ controllers.add(new FlowController<E>(controllable, flow,
limiter.getPriorityLimter(i), mutex));
}
}
- @SuppressWarnings("unchecked")
- private FlowController<E>[] createControlerArray(int priorities) {
- return new FlowController[priorities];
- }
-
// /////////////////////////////////////////////////////////////////
// ISinkController interface impl.
// /////////////////////////////////////////////////////////////////
public boolean offer(E elem, ISourceController<E> controller) {
- int prio = priorityMapper.map(elem);
- return controllers[prio].offer(elem, controller);
+ int prio = limiter.getPriorityMapper().map(elem);
+ return controllers.get(prio).offer(elem, controller);
}
public void add(E elem, ISourceController<E> controller) {
- int prio = priorityMapper.map(elem);
- controllers[prio].add(elem, controller);
+ int prio = limiter.getPriorityMapper().map(elem);
+ controllers.get(prio).add(elem, controller);
}
public boolean isSinkBlocked() {
@@ -68,8 +60,8 @@
public boolean
addUnblockListener(org.apache.activemq.flow.ISinkController.FlowUnblockListener<E>
listener) {
boolean rc = false;
- for (int i = 0; i < controllers.length; i++) {
- rc |= this.controllers[i].addUnblockListener(listener);
+ for (int i = 0; i < controllers.size(); i++) {
+ rc |= this.controllers.get(i).addUnblockListener(listener);
}
return rc;
}
@@ -77,13 +69,45 @@
public void waitForFlowUnblock() throws InterruptedException {
throw new UnsupportedOperationException();
}
+
+ public long getResourceId() {
+ IFlowSink<E> flowSink = getFlowSink();
+ if( flowSink!=null ) {
+ return flowSink.getResourceId();
+ }
+ return 0;
+ }
+
+ public String getResourceName() {
+ IFlowSink<E> flowSink = getFlowSink();
+ if( flowSink!=null ) {
+ return flowSink.getResourceName();
+ }
+ return null;
+ }
+
+ public void addFlowLifeCycleListener(FlowLifeCycleListener listener) {
+ IFlowSink<E> flowSink = getFlowSink();
+ if( flowSink!=null ) {
+ flowSink.addFlowLifeCycleListener(listener);
+ }
+ }
+
+ public void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
+ IFlowSink<E> flowSink = getFlowSink();
+ if( flowSink!=null ) {
+ flowSink.removeFlowLifeCycleListener(listener);
+ }
+ }
+
// /////////////////////////////////////////////////////////////////
// ISourceController interface impl.
// /////////////////////////////////////////////////////////////////
public void elementDispatched(E elem) {
- FlowController<E> controler = controllers[priorityMapper.map(elem)];
+ Integer prio = limiter.getPriorityMapper().map(elem);
+ FlowController<E> controler = controllers.get(prio);
controler.elementDispatched(elem);
}
@@ -96,14 +120,14 @@
}
public void onFlowBlock(ISinkController<E> sink) {
- for (int i = 0; i < controllers.length; i++) {
- controllers[i].onFlowBlock(sink);
+ for (int i = 0; i < controllers.size(); i++) {
+ controllers.get(i).onFlowBlock(sink);
}
}
public void onFlowResume(ISinkController<E> sink) {
- for (int i = 0; i < controllers.length; i++) {
- controllers[i].onFlowBlock(sink);
+ for (int i = 0; i < controllers.size(); i++) {
+ controllers.get(i).onFlowBlock(sink);
}
}
@@ -115,15 +139,6 @@
// Getters and Setters
// /////////////////////////////////////////////////////////////////
- public Mapper<Integer, E> getPriorityMapper() {
- return priorityMapper;
- }
-
- public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
- this.priorityMapper = priorityMapper;
- limiter.setPriorityMapper(priorityMapper);
- }
-
public IFlowSink<E> getFlowSink() {
return controllable.getFlowSink();
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
Fri Feb 13 12:19:15 2009
@@ -21,6 +21,7 @@
import org.apache.activemq.flow.FlowController;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.PriorityFlowController;
+import org.apache.activemq.flow.PrioritySizeLimiter;
import org.apache.kahadb.util.LinkedNode;
/**
@@ -28,7 +29,6 @@
public class ExclusivePriorityQueue<E> extends AbstractFlowQueue<E> implements
IFlowQueue<E> {
private final PriorityLinkedList<PriorityNode> queue;
- private Mapper<Integer, E> priorityMapper;
private class PriorityNode extends LinkedNode<PriorityNode> {
E elem;
@@ -36,6 +36,7 @@
}
private final PriorityFlowController<E> controller;
+ private final PrioritySizeLimiter<E> limiter;
/**
* Creates a flow queue that can handle multiple flows.
@@ -48,10 +49,11 @@
* @param controller
* The FlowController if this queue is flow controlled:
*/
- public ExclusivePriorityQueue(int priority, Flow flow, String name, int
capacity, int resume) {
+ public ExclusivePriorityQueue(Flow flow, String name,
PrioritySizeLimiter<E> limiter) {
super(name);
+ this.limiter = limiter;
this.queue = new PriorityLinkedList<PriorityNode>(10);
- this.controller = new PriorityFlowController<E>(priority,
getFlowControllableHook(), flow, this, capacity, resume);
+ this.controller = new
PriorityFlowController<E>(getFlowControllableHook(), flow, limiter, this);
}
@@ -72,7 +74,7 @@
public synchronized void flowElemAccepted(ISourceController<E> controller,
E elem) {
PriorityNode node = new PriorityNode();
node.elem = elem;
- node.prio = priorityMapper.map(elem);
+ node.prio = limiter.getPriorityMapper().map(elem);
queue.add(node, node.prio);
notifyReady();
@@ -107,15 +109,6 @@
}
}
- public Mapper<Integer, E> getPriorityMapper() {
- return priorityMapper;
- }
-
- public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
- this.priorityMapper = priorityMapper;
- controller.setPriorityMapper(priorityMapper);
- }
-
@Override
public String toString() {
return getResourceName();
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
Fri Feb 13 12:19:15 2009
@@ -65,8 +65,9 @@
if (MockBrokerTest.PRIORITY_LEVELS <= 1) {
this.output = TestFlowManager.createFlowQueue(flow, name +
"-OUTPUT", outputQueueSize, resumeThreshold);
} else {
- ExclusivePriorityQueue<Message> t = new
ExclusivePriorityQueue<Message>(MockBrokerTest.PRIORITY_LEVELS, flow, name +
"-OUTPUT", outputQueueSize, resumeThreshold);
- t.setPriorityMapper(Message.PRIORITY_MAPPER);
+ PrioritySizeLimiter<Message> pl = new
PrioritySizeLimiter<Message>(outputQueueSize, resumeThreshold,
MockBrokerTest.PRIORITY_LEVELS);
+ pl.setPriorityMapper(Message.PRIORITY_MAPPER);
+ ExclusivePriorityQueue<Message> t = new
ExclusivePriorityQueue<Message>(flow, name + "-OUTPUT", pl);
this.output = t;
}
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
Fri Feb 13 12:19:15 2009
@@ -9,21 +9,20 @@
import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.flow.ISinkController.FlowControllable;
import org.apache.activemq.flow.MockBroker.DeliveryTarget;
-import org.apache.activemq.queue.ExclusivePriorityQueue;
-import org.apache.activemq.queue.ExclusiveQueue;
-import org.apache.activemq.queue.IFlowQueue;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
public class RemoteConnection implements TransportListener, DeliveryTarget {
- protected final Object mutex = new Object();
protected Transport transport;
protected MockBroker broker;
- protected IFlowQueue<Message> output;
+ protected final Object inboundMutex = new Object();
protected FlowController<Message> inboundController;
+
+ protected final Object outboundMutex = new Object();
+ protected IFlowSink<Message> outboundController;
protected String name;
private int priorityLevels;
@@ -106,26 +105,12 @@
public IFlowSource<Message> getFlowSource() {
return null;
}
- }, flow, limiter, mutex);
+ }, flow, limiter, inboundMutex);
// Setup output processing
- if (priorityLevels <= 1) {
- limiter = new SizeLimiter<Message>(outputWindowSize,
outputResumeThreshold);
- flow = new Flow(name + "-outbound", false);
- ExclusiveQueue<Message> queue = new ExclusiveQueue<Message>(flow,
flow.getFlowName(), limiter);
- this.output = queue;
- } else {
- ExclusivePriorityQueue<Message> t = new
ExclusivePriorityQueue<Message>(priorityLevels, flow, name + "-outbound",
outputWindowSize, outputResumeThreshold);
- t.setPriorityMapper(Message.PRIORITY_MAPPER);
- this.output = t;
- }
-
- // Use an async thread to drain the output queue.
- // Personally I think it would be better if we polled messages out of
the output queue.
writer = Executors.newSingleThreadExecutor();
- output.setDispatcher(dispatcher);
- output.setDrain(new IFlowDrain<Message>() {
- public void drain(final Message elem, final
ISourceController<Message> controller) {
+ FlowControllable<Message> controllable = new
FlowControllable<Message>(){
+ public void flowElemAccepted(final ISourceController<Message>
controller, final Message elem) {
writer.execute(new Runnable() {
public void run() {
if (!stopping.get()) {
@@ -139,7 +124,24 @@
}
});
}
- });
+ public IFlowSink<Message> getFlowSink() {
+ return null;
+ }
+ public IFlowSource<Message> getFlowSource() {
+ return null;
+ }
+ };
+
+ flow = new Flow(name + "-outbound", false);
+ if (priorityLevels <= 1) {
+ limiter = new SizeLimiter<Message>(outputWindowSize,
outputResumeThreshold);
+ outboundController = new FlowController<Message>(controllable,
flow, limiter, outboundMutex);
+ } else {
+ PrioritySizeLimiter<Message> pl = new
PrioritySizeLimiter<Message>(outputWindowSize, outputResumeThreshold,
priorityLevels);
+ pl.setPriorityMapper(Message.PRIORITY_MAPPER);
+ outboundController = new
PriorityFlowController<Message>(controllable, flow, pl, outboundMutex);
+ }
+
}
public void onException(IOException error) {
@@ -200,7 +202,7 @@
}
public IFlowSink<Message> getSink() {
- return output;
+ return outboundController;
}
public boolean match(Message message) {