Author: chirino
Date: Tue Mar 17 13:27:44 2009
New Revision: 755235
URL: http://svn.apache.org/viewvc?rev=755235&view=rev
Log:
Committing Colin's patch for https://issues.apache.org/activemq/browse/AMQ-2165
Thanks!
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
Tue Mar 17 13:27:44 2009
@@ -30,10 +30,10 @@
private static final Executor DEFAULT_EXECUTOR =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
// Sinks that are blocking us.
- private final HashSet<ISinkController<E>> blockingSinks = new
HashSet<ISinkController<E>>();
+ private final HashSet<ISinkController<?>> blockingSinks = new
HashSet<ISinkController<?>>();
// Holds the sources that this limiter is currently blocking
- private final HashSet<ISourceController<E>> blockedSources = new
HashSet<ISourceController<E>>();
+ private final HashSet<ISourceController<?>> blockedSources = new
HashSet<ISourceController<?>>();
// Holds the sources that this limiter is currently blocking
private final HashSet<FlowUnblockListener<E>> unblockListeners = new
HashSet<FlowUnblockListener<E>>();
@@ -122,7 +122,7 @@
/**
* Should be called by a resource anytime it's limits are exceeded.
*/
- public final void onFlowBlock(ISinkController<E> sinkController) {
+ public final void onFlowBlock(ISinkController<?> sinkController) {
synchronized (mutex) {
if (!blockingSinks.add(sinkController)) {
throw new IllegalStateException(sinkController + " has already
blocked: " + this);
@@ -135,7 +135,7 @@
}
}
- public final void onFlowResume(ISinkController<E> sinkController) {
+ public final void onFlowResume(ISinkController<?> sinkController) {
synchronized (mutex) {
if (!blockingSinks.remove(sinkController)) {
throw new IllegalStateException(sinkController + " can't
resume unblocked " + this);
@@ -217,7 +217,7 @@
* @param controller
* the source flow controller.
*/
- public void add(E elem, ISourceController<E> sourceController) {
+ public void add(E elem, ISourceController<?> sourceController) {
boolean ok = false;
synchronized (mutex) {
// If we don't have an fc sink, then just increment the limiter.
@@ -255,7 +255,7 @@
* @param controller
* the source flow controller.
*/
- public boolean offer(E elem, ISourceController<E> sourceController) {
+ public boolean offer(E elem, ISourceController<?> sourceController) {
boolean ok = false;
synchronized (mutex) {
// If we don't have an fc sink, then just increment the limiter.
@@ -310,7 +310,7 @@
* @param source
* The {...@link ISinkController} of the source to be blocked.
*/
- protected void blockSource(final ISourceController<E> source) {
+ protected void blockSource(final ISourceController<?> source) {
if (source == null) {
return;
}
@@ -389,7 +389,7 @@
}
String was = Thread.currentThread().getName();
try {
- for (ISourceController<E> source : blockedSources) {
+ for (ISourceController<?> source : blockedSources) {
// System.out.println("UNBLOCKING: SINK[" +
FlowController.this + "], SOURCE[" + source + "]");
source.onFlowResume(FlowController.this);
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java
Tue Mar 17 13:27:44 2009
@@ -28,7 +28,7 @@
* @param source
* The source's flow controller.
*/
- public void add(E elem, ISourceController<E> source);
+ public void add(E elem, ISourceController<?> source);
/**
* Offers an element to the sink. If there is no room available the
source's
@@ -40,7 +40,7 @@
* The source's controller.
* @return false if the element wasn't accepted.
*/
- public boolean offer(E elem, ISourceController<E> source);
+ public boolean offer(E elem, ISourceController<?> source);
/**
* Sets the executor to be used by the sink's {...@link IFlowController}s.
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
Tue Mar 17 13:27:44 2009
@@ -51,7 +51,7 @@
* @param controller
* the source flow controller.
*/
- public boolean offer(E elem, ISourceController<E> sourceController);
+ public boolean offer(E elem, ISourceController<?> sourceController);
/**
* Adds an element to the sink associated with this resource if space is
@@ -64,7 +64,7 @@
* @param controller
* the source flow controller.
*/
- public void add(E elem, ISourceController<E> controller);
+ public void add(E elem, ISourceController<?> controller);
/**
* Called to check if this FlowController is currently being blocked
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
Tue Mar 17 13:27:44 2009
@@ -41,19 +41,19 @@
/**
* This is called when a particular flow is blocked for a resource
*
- * @param sink
- * The sink blocking this source
+ * @param sinkController
+ * The sink controller blocking this source
*/
- public void onFlowBlock(ISinkController<E> sink);
+ public void onFlowBlock(ISinkController<?> sinkController);
/**
* Callback used with FlowControllers to get a notification that an
* IFlowController has been resumed.
*
- * @param controller
- * The IFlowController that was unblocked.
+ * @param sinkController
+ * The sink controller that was unblocked.
*/
- public void onFlowResume(ISinkController<E> sink);
+ public void onFlowResume(ISinkController<?> sinkController);
public boolean isSourceBlocked();
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
Tue Mar 17 13:27:44 2009
@@ -63,11 +63,11 @@
return "DISABLED Flow Controller for: " + source;
}
- public boolean offer(E elem, ISourceController<E> sourceController) {
+ public boolean offer(E elem, ISourceController<?> sourceController) {
throw new UnsupportedOperationException();
}
- public void add(E elem, ISourceController<E> controller) {
+ public void add(E elem, ISourceController<?> controller) {
throw new UnsupportedOperationException();
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
Tue Mar 17 13:27:44 2009
@@ -43,12 +43,12 @@
// ISinkController interface impl.
// /////////////////////////////////////////////////////////////////
- public boolean offer(E elem, ISourceController<E> controller) {
+ public boolean offer(E elem, ISourceController<?> controller) {
int prio = limiter.getPriorityMapper().map(elem);
return controllers.get(prio).offer(elem, controller);
}
- public void add(E elem, ISourceController<E> controller) {
+ public void add(E elem, ISourceController<?> controller) {
int prio = limiter.getPriorityMapper().map(elem);
controllers.get(prio).add(elem, controller);
}
@@ -89,13 +89,13 @@
return controllable.getFlowSource();
}
- public void onFlowBlock(ISinkController<E> sink) {
+ public void onFlowBlock(ISinkController<?> sink) {
for (int i = 0; i < controllers.size(); i++) {
controllers.get(i).onFlowBlock(sink);
}
}
- public void onFlowResume(ISinkController<E> sink) {
+ public void onFlowResume(ISinkController<?> sink) {
for (int i = 0; i < controllers.size(); i++) {
controllers.get(i).onFlowBlock(sink);
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
Tue Mar 17 13:27:44 2009
@@ -18,14 +18,14 @@
public class SizeLimiter<E> extends AbstractLimiter<E> {
- protected int capacity;
- protected int resumeThreshold;
+ protected long capacity;
+ protected long resumeThreshold;
- private int size;
+ private long size;
private boolean throttled;
- private int reserved;
+ private long reserved;
- public SizeLimiter(int capacity, int resumeThreshold) {
+ public SizeLimiter(long capacity, long resumeThreshold) {
this.capacity = capacity;
throttled = false;
this.resumeThreshold = resumeThreshold;
@@ -50,13 +50,13 @@
public void releaseReserved() {
if (reserved > 0) {
- int res = reserved;
+ long res = reserved;
reserved = 0;
remove(res);
}
}
- protected void remove(int s) {
+ public void remove(long s) {
this.size -= s;
if (size < 0) {
Exception ie = new IllegalStateException("Size Negative!" + size);
@@ -91,18 +91,41 @@
return !throttled;
}
- public int getCapacity() {
+ public long getCapacity() {
return capacity;
}
- public int getResumeThreshold() {
+ public long getResumeThreshold() {
return resumeThreshold;
}
- public int getSize() {
+ public long getSize() {
return size;
}
+ public void setCapacity(long capacity) {
+ if (capacity < resumeThreshold) {
+ throw new IllegalArgumentException("capacity less than resume
threshold");
+ }
+
+ this.capacity = capacity;
+
+ if (this.size >= capacity) {
+ throttled = true;
+ }
+ }
+
+ public void setResumeThreshold(long size) {
+
+ if (capacity < resumeThreshold) {
+ throw new IllegalArgumentException("capacity less than resume
threshold");
+ }
+ if (throttled && this.size <= resumeThreshold) {
+ throttled = false;
+ notifyUnThrottleListeners();
+ }
+ }
+
public String toString() {
return "SizeLimiter " + capacity + "/" + resumeThreshold + ", s=" +
size + " res=" + reserved + ", thr= " + throttled;
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
Tue Mar 17 13:27:44 2009
@@ -57,14 +57,14 @@
}
- public boolean offer(E elem, ISourceController<E> source) {
+ public boolean offer(E elem, ISourceController<?> source) {
return controller.offer(elem, source);
}
/**
* Performs a limited add to the queue.
*/
- public final void add(E elem, ISourceController<E> source) {
+ public final void add(E elem, ISourceController<?> source) {
controller.add(elem, source);
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
Tue Mar 17 13:27:44 2009
@@ -41,14 +41,14 @@
super.onFlowOpened(controller);
}
- public boolean offer(E elem, ISourceController<E> source) {
+ public boolean offer(E elem, ISourceController<?> source) {
return controller.offer(elem, source);
}
/**
* Performs a limited add to the queue.
*/
- public final void add(E elem, ISourceController<E> source) {
+ public final void add(E elem, ISourceController<?> source) {
controller.add(elem, source);
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
Tue Mar 17 13:27:44 2009
@@ -50,7 +50,7 @@
return LoadBalancedFlowQueue.this;
}
- public void onFlowBlock(ISinkController<E> sink) {
+ public void onFlowBlock(ISinkController<?> sink) {
synchronized (LoadBalancedFlowQueue.this) {
SinkNode node = consumers.get(sink);
if (node != null) {
@@ -61,7 +61,7 @@
}
- public void onFlowResume(ISinkController<E> sink) {
+ public void onFlowResume(ISinkController<?> sink) {
synchronized (LoadBalancedFlowQueue.this) {
SinkNode node = consumers.get(sink);
if (node != null) {
@@ -108,14 +108,14 @@
super.onFlowOpened(sinkController);
}
- public boolean offer(E elem, ISourceController<E> source) {
+ public boolean offer(E elem, ISourceController<?> source) {
return sinkController.offer(elem, source);
}
/**
* Performs a limited add to the queue.
*/
- public final void add(E elem, ISourceController<E> source) {
+ public final void add(E elem, ISourceController<?> source) {
sinkController.add(elem, source);
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
Tue Mar 17 13:27:44 2009
@@ -48,11 +48,11 @@
throw new UnsupportedOperationException();
}
- public boolean offer(E elem, ISourceController<E> source) {
+ public boolean offer(E elem, ISourceController<?> source) {
throw new UnsupportedOperationException("Not yet implemented");
}
- public synchronized void add(E elem, ISourceController<E> source) {
+ public synchronized void add(E elem, ISourceController<?> source) {
SingleFlowQueue queue = flowQueues.get(source.getFlow());
if (queue == null) {
queue = new SingleFlowQueue(source.getFlow(), new
SizeLimiter<E>(perFlowWindow, resumeThreshold));
@@ -139,7 +139,7 @@
this.controller = new FlowController<E>(this, flow, limiter,
MultiFlowQueue.this);
}
- final void enqueue(E elem, ISourceController<E> source) {
+ final void enqueue(E elem, ISourceController<?> source) {
controller.add(elem, source);
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
Tue Mar 17 13:27:44 2009
@@ -94,13 +94,13 @@
return partitionMapper;
}
- public void add(V value, ISourceController<V> source) {
+ public void add(V value, ISourceController<?> source) {
P partitionKey = partitionMapper.map(value);
IQueue<K, V> partition = getPartition(partitionKey);
partition.add(value, source);
}
- public boolean offer(V value, ISourceController<V> source) {
+ public boolean offer(V value, ISourceController<?> source) {
P partitionKey = partitionMapper.map(value);
IQueue<K, V> partition = getPartition(partitionKey);
return partition.offer(value, source);
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
Tue Mar 17 13:27:44 2009
@@ -109,13 +109,13 @@
}
}
- public void add(V value, ISourceController<V> source) {
+ public void add(V value, ISourceController<?> source) {
int prio = priorityMapper.map(value);
IQueue<K, V> partition = getPartition(prio);
partition.add(value, source);
}
- public boolean offer(V value, ISourceController<V> source) {
+ public boolean offer(V value, ISourceController<?> source) {
int prio = priorityMapper.map(value);
IQueue<K, V> partition = getPartition(prio);
return partition.offer(value, source);
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
Tue Mar 17 13:27:44 2009
@@ -49,7 +49,6 @@
private final FlowController<V> sinkController;
private final Object mutex;
- private final AbstractFlowQueue whoToWakeup = this;
protected Mapper<K, V> keyMapper;
private long directs;
@@ -63,11 +62,11 @@
public void elementDispatched(V elem) {
}
- public void onFlowBlock(ISinkController<V> sink) {
+ public void onFlowBlock(ISinkController<?> sink) {
}
- public void onFlowResume(ISinkController<V> sinkController) {
- IFlowSink<V> sink = sinkController.getFlowSink();
+ public void onFlowResume(ISinkController<?> sinkController) {
+ IFlowSink<V> sink = (IFlowSink<V>)sinkController.getFlowSink();
synchronized (mutex) {
SubscriptionNode node = sinks.get(sink);
if (node != null) {
@@ -75,13 +74,13 @@
boolean notify = false;
if (node.cursor == null) {
readyDirectSubs.addLast(node);
- // System.out.println("Subscription state change:
un-ready direct -> ready direct: "+node);
+ //System.out.println("Subscription state change:
un-ready direct -> ready direct: "+node);
} else {
if (readyPollingSubs.isEmpty()) {
notify = !store.isEmpty();
}
readyPollingSubs.addLast(node);
- // System.out.println("Subscription state change:
un-ready polling -> ready polling: "+node);
+ //System.out.println("Subscription state change:
un-ready polling -> ready polling: "+node);
}
if (notify) {
@@ -108,6 +107,7 @@
public SharedQueue(String name, IFlowLimiter<V> limiter) {
this(name, limiter, new Object());
+ autoRelease = true;
}
/**
@@ -126,14 +126,14 @@
super.onFlowOpened(sinkController);
}
- public boolean offer(V elem, ISourceController<V> source) {
+ public boolean offer(V elem, ISourceController<?> source) {
return sinkController.offer(elem, source);
}
/**
* Performs a limited add to the queue.
*/
- public final void add(V value, ISourceController<V> source) {
+ public final void add(V value, ISourceController<?> source) {
sinkController.add(value, source);
}
@@ -170,7 +170,7 @@
sub.resumeAt(node);
unreadyPollingSubs.addLast(sub);
matchCount++;
- // System.out.println("Subscription state change:
un-ready direct -> un-ready polling: "+sub);
+ //System.out.println("Subscription state change:
un-ready direct -> un-ready polling: "+sub);
}
sub = next;
}
@@ -182,7 +182,7 @@
subNode.unlink();
subNode.resumeAt(node);
unreadyPollingSubs.addLast(subNode);
- // System.out.println("Subscription state change: ready
direct -> un-ready polling: "+subNode);
+ //System.out.println("Subscription state change: ready
direct -> un-ready polling: "+subNode);
}
matchCount += matches.size();
@@ -294,7 +294,7 @@
}
return true;
} else {
- // System.out.println("Subscription state change: ready
polling -> un-ready polling: "+subNode);
+ //System.out.println("Subscription state change: ready
polling -> un-ready polling: "+subNode);
// Subscription is no longer ready..
subNode.cursorUnPeek(storeNode);
subNode.unlink();
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java
Tue Mar 17 13:27:44 2009
@@ -53,14 +53,14 @@
super.onFlowOpened(controller);
}
- public boolean offer(E elem, ISourceController<E> source) {
+ public boolean offer(E elem, ISourceController<?> source) {
return controller.offer(elem, source);
}
/**
* Performs a limited add to the queue.
*/
- public final void add(E elem, ISourceController<E> source) {
+ public final void add(E elem, ISourceController<?> source) {
controller.add(elem, source);
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
Tue Mar 17 13:27:44 2009
@@ -23,11 +23,11 @@
super.onFlowOpened(controller);
}
- public void add(E elem, ISourceController<E> source) {
+ public void add(E elem, ISourceController<?> source) {
controller.add(elem, source);
}
- public boolean offer(E elem, ISourceController<E> source) {
+ public boolean offer(E elem, ISourceController<?> source) {
return controller.offer(elem, source);
}