Author: jstrachan
Date: Fri Aug 25 03:38:47 2006
New Revision: 436748
URL: http://svn.apache.org/viewvc?rev=436748&view=rev
Log:
applied patch from John Heitmann, many thanks - which fixes AMQ-515
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=436748&r1=436747&r2=436748&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Aug 25 03:38:47 2006
@@ -184,6 +184,7 @@
// while
// removing up a subscription.
dispatchValve.turnOff();
+
try {
synchronized (consumers) {
@@ -246,8 +247,13 @@
public void send(final ConnectionContext context, final Message message)
throws Exception {
- if (context.isProducerFlowControl())
+ if (context.isProducerFlowControl()) {
+ if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
+ throw new javax.jms.ResourceAllocationException("Usage Manager
memory limit reached");
+ } else {
usageManager.waitForSpace();
+ }
+ }
message.setRegionDestination(this);
@@ -606,7 +612,5 @@
}
return false;
}
-
-
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=436748&r1=436747&r2=436748&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Fri Aug 25 03:38:47 2006
@@ -232,8 +232,13 @@
public void send(final ConnectionContext context, final Message message)
throws Exception {
- if (context.isProducerFlowControl())
- usageManager.waitForSpace();
+ if (context.isProducerFlowControl()) {
+ if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
+ throw new javax.jms.ResourceAllocationException("Usage Manager
memory limit reached");
+ } else {
+ usageManager.waitForSpace();
+ }
+ }
message.setRegionDestination(this);
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java?rev=436748&r1=436747&r2=436748&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
Fri Aug 25 03:38:47 2006
@@ -50,6 +50,11 @@
private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList();
+ private boolean sendFailIfNoSpace;
+
+ /** True if someone called setSendFailIfNoSpace() on this particular usage
manager */
+ private boolean sendFailIfNoSpaceExplicitySet;
+
public UsageManager() {
this(null);
}
@@ -206,6 +211,22 @@
}
}
+ /**
+ * Sets whether or not a send() should fail if there is no space free. The
default
+ * value is false which means to block the send() method until space
becomes available
+ */
+ public void setSendFailIfNoSpace(boolean failProducerIfNoSpace) {
+ sendFailIfNoSpaceExplicitySet = true;
+ this.sendFailIfNoSpace = failProducerIfNoSpace;
+ }
+
+ public boolean isSendFailIfNoSpace() {
+ if (sendFailIfNoSpaceExplicitySet || parent == null) {
+ return sendFailIfNoSpace;
+ } else {
+ return parent.isSendFailIfNoSpace();
+ }
+ }
private void setPercentUsage(int value) {
int oldValue = percentUsage;