Author: rajdavies
Date: Wed Sep 23 15:54:37 2009
New Revision: 818147
URL: http://svn.apache.org/viewvc?rev=818147&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2403
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
Wed Sep 23 15:54:37 2009
@@ -47,6 +47,7 @@
protected final CopyOnWriteArrayList<Destination> destinations = new
CopyOnWriteArrayList<Destination>();
private BooleanExpression selectorExpression;
private ObjectName objectName;
+ private int cursorMemoryHighWaterMark = 70;
public AbstractSubscription(Broker broker,ConnectionContext context,
ConsumerInfo info) throws InvalidSelectorException {
@@ -211,6 +212,14 @@
}
+ public int getCursorMemoryHighWaterMark(){
+ return this.cursorMemoryHighWaterMark;
+ }
+
+ public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){
+ this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark;
+ }
+
public int countBeforeFull() {
return getDispatchedQueueSize() - info.getPrefetchSize();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Wed Sep 23 15:54:37 2009
@@ -50,7 +50,7 @@
protected final MessageStore store;
protected SystemUsage systemUsage;
protected MemoryUsage memoryUsage;
- private boolean producerFlowControl = false;
+ private boolean producerFlowControl = true;
protected boolean warnOnProducerFlowControl = true;
private int maxProducersToAudit = 1024;
private int maxAuditDepth = 2048;
@@ -73,6 +73,7 @@
protected DeadLetterStrategy deadLetterStrategy =
DEFAULT_DEAD_LETTER_STRATEGY;
protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
+ protected int cursorMemoryHighWaterMark = 70;
/**
* @param broker
@@ -375,6 +376,14 @@
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
this.deadLetterStrategy = deadLetterStrategy;
}
+
+ public int getCursorMemoryHighWaterMark() {
+ return this.cursorMemoryHighWaterMark;
+ }
+
+ public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+ this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
+ }
/**
* called when message is consumed
@@ -511,5 +520,4 @@
public void processDispatchNotification(
MessageDispatchNotification messageDispatchNotification) throws
Exception {
}
-
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Wed Sep 23 15:54:37 2009
@@ -105,6 +105,10 @@
public void setMinimumMessageSize(int minimumMessageSize);
+ public int getCursorMemoryHighWaterMark();
+
+ public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
+
/**
* optionally called by a Subscriber - to inform the Destination its
* ready for more messages
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Wed Sep 23 15:54:37 2009
@@ -265,4 +265,12 @@
MessageDispatchNotification messageDispatchNotification) throws
Exception {
next.processDispatchNotification(messageDispatchNotification);
}
+
+ public int getCursorMemoryHighWaterMark() {
+ return next.getCursorMemoryHighWaterMark();
+ }
+
+ public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+ next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Wed Sep 23 15:54:37 2009
@@ -54,6 +54,7 @@
super(broker,usageManager, context, info);
this.pending = new
StoreDurableSubscriberCursor(broker,context.getClientId(),
info.getSubscriptionName(), info.getPrefetchSize(), this);
this.pending.setSystemUsage(usageManager);
+ this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(),
info.getSubscriptionName());
@@ -115,6 +116,7 @@
}
synchronized (pending) {
pending.setSystemUsage(memoryManager);
+ pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
pending.start();
// If nothing was in the persistent store, then try to use the
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Wed Sep 23 15:54:37 2009
@@ -525,6 +525,7 @@
this.pending = pending;
if (this.pending!=null) {
this.pending.setSystemUsage(usageManager);
+ this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed Sep 23 15:54:37 2009
@@ -191,6 +191,7 @@
messages.setMaxAuditDepth(getMaxAuditDepth());
messages.setMaxProducersToAudit(getMaxProducersToAudit());
messages.setUseCache(isUseCache());
+ messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
if (messages.isRecoveryRequired()) {
store.recover(new MessageRecoveryListener() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
Wed Sep 23 15:54:37 2009
@@ -223,4 +223,8 @@
int countBeforeFull();
ConnectionContext getContext();
+
+ public int getCursorMemoryHighWaterMark();
+
+ public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
Wed Sep 23 15:54:37 2009
@@ -52,6 +52,7 @@
public void initialize() throws Exception {
this.messages=new VMPendingMessageCursor();
+ this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
this.systemUsage = brokerService.getSystemUsage();
memoryUsage.setParent(systemUsage.getMemoryUsage());
this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue: " +
destination.getPhysicalName());
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Wed Sep 23 15:54:37 2009
@@ -76,6 +76,7 @@
public void init() throws Exception {
this.matched.setSystemUsage(usageManager);
+ this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
this.matched.start();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
Wed Sep 23 15:54:37 2009
@@ -249,6 +249,16 @@
nonPersistent.setUseCache(useCache);
}
}
+
+ public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
+ super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
+ if (persistent != null) {
+ persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
+ }
+ if (nonPersistent != null) {
+ nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
+ }
+ }
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Wed Sep 23 15:54:37 2009
@@ -33,6 +33,7 @@
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.derby.impl.sql.compile.GetCurrentConnectionNode;
/**
* Represents an entry in a {@link PolicyMap} for assigning policies to a
@@ -59,7 +60,7 @@
private int maxAuditDepth=2048;
private int maxQueueAuditDepth=2048;
private boolean enableAudit=true;
- private boolean producerFlowControl = false;
+ private boolean producerFlowControl = true;
private boolean optimizedDispatch=false;
private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE;
@@ -82,6 +83,7 @@
private int
queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH;
private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
private int
durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
+ private int cursorMemoryHighWaterMark=70;
public void configure(Broker broker,Queue queue) {
@@ -140,6 +142,7 @@
destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
destination.setMaxExpirePageSize(getMaxExpirePageSize());
+ destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
}
public void configure(Broker broker, SystemUsage memoryManager,
TopicSubscription subscription) {
@@ -177,8 +180,8 @@
String clientId = sub.getSubscriptionKey().getClientId();
String subName = sub.getSubscriptionKey().getSubscriptionName();
int prefetch = sub.getPrefetchSize();
+ sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
//override prefetch size if not set by the Consumer
-
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH){
sub.setPrefetchSize(getDurableTopicPrefetch());
}
@@ -189,6 +192,7 @@
}
sub.setMaxAuditDepth(getMaxAuditDepth());
sub.setMaxProducersToAudit(getMaxProducersToAudit());
+
}
public void configure(Broker broker, SystemUsage memoryManager,
QueueBrowserSubscription sub) {
@@ -199,6 +203,7 @@
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH){
sub.setPrefetchSize(getQueueBrowserPrefetch());
}
+ sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
}
public void configure(Broker broker, SystemUsage memoryManager,
QueueSubscription sub) {
@@ -209,6 +214,7 @@
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH){
sub.setPrefetchSize(getQueuePrefetch());
}
+ sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
}
// Properties
@@ -661,6 +667,14 @@
public void setDurableTopicPrefetch(int durableTopicPrefetch) {
this.durableTopicPrefetch = durableTopicPrefetch;
}
+
+ public int getCursorMemoryHighWaterMark() {
+ return this.cursorMemoryHighWaterMark;
+ }
+
+ public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+ this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
+ }
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java?rev=818147&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java
Wed Sep 23 15:54:37 2009
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.policy;
+
+import junit.framework.TestCase;
+
+public class PolicyConfigTest extends TestCase{
+
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
Wed Sep 23 15:54:37 2009
@@ -285,6 +285,14 @@
public void acknowledge(ConnectionContext context, MessageAck ack)
throws Exception {
}
+
+ public int getCursorMemoryHighWaterMark(){
+ return 0;
+ }
+
+ public void setCursorMemoryHighWaterMark(
+ int cursorMemoryHighWaterMark) {
+ }
};
queue.addSubscription(contextNotInTx, subscription);