Author: jstrachan
Date: Wed Aug 9 03:28:38 2006
New Revision: 430023
URL: http://svn.apache.org/viewvc?rev=430023&view=rev
Log:
added patch submitted by Mathew Kuppe for AMQ-871 to allow the eviction
strategies to decide to evict multiple messages
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageEvictionStrategy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageWithLowestPriorityEvictionStrategy.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=430023&r1=430022&r2=430023&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Wed Aug 9 03:28:38 2006
@@ -94,11 +94,21 @@
// lets discard old messages as we are a slow consumer
while (!matched.isEmpty() && matched.size() >
maximumPendingMessages) {
- MessageReference oldMessage = messageEvictionStrategy.evictMessage(matched);
- oldMessage.decrementReferenceCount();
- discarded++;
- if (log.isDebugEnabled()) {
- log.debug("Discarding message " + oldMessage);
+ MessageReference[] oldMessages = messageEvictionStrategy.evictMessages(matched);
+ int messagesToEvict = oldMessages.length;
+ for(int i = 0; i < messagesToEvict; i++) {
+ oldMessages[i].decrementReferenceCount();
+ discarded++;
+ if (log.isDebugEnabled()) {
+ log.debug("Discarding message " + oldMessages[i]);
+ }
+ }
+
+ // lets avoid an infinite loop if we are given a bad eviction strategy
+ // for a bad strategy lets just not evict
+ if (messagesToEvict == 0) {
+ log.warn("No messages to evict returned from eviction strategy: " + messageEvictionStrategy);
+ break;
}
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java?rev=430023&r1=430022&r2=430023&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java
Wed Aug 9 03:28:38 2006
@@ -36,7 +36,7 @@
* @throws IOException if an exception occurs such as reading a message
content (but should not ever happen
* as usually all the messages will be in RAM when this method is called).
*/
- MessageReference evictMessage(LinkedList messages) throws IOException;
+ MessageReference[] evictMessages(LinkedList messages) throws IOException;
/**
* REturns the high water mark on which we will eagerly evict expired
messages from RAM
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageEvictionStrategy.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageEvictionStrategy.java?rev=430023&r1=430022&r2=430023&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageEvictionStrategy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageEvictionStrategy.java
Wed Aug 9 03:28:38 2006
@@ -31,7 +31,7 @@
*/
public class OldestMessageEvictionStrategy extends
MessageEvictionStrategySupport {
- public MessageReference evictMessage(LinkedList messages) {
- return (MessageReference) messages.removeFirst();
+ public MessageReference[] evictMessages(LinkedList messages) {
+ return new MessageReference[] {(MessageReference) messages.removeFirst()};
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageWithLowestPriorityEvictionStrategy.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageWithLowestPriorityEvictionStrategy.java?rev=430023&r1=430022&r2=430023&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageWithLowestPriorityEvictionStrategy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageWithLowestPriorityEvictionStrategy.java
Wed Aug 9 03:28:38 2006
@@ -32,7 +32,7 @@
*/
public class OldestMessageWithLowestPriorityEvictionStrategy extends
MessageEvictionStrategySupport {
- public MessageReference evictMessage(LinkedList messages) throws IOException {
+ public MessageReference[] evictMessages(LinkedList messages) throws IOException {
byte lowestPriority = Byte.MAX_VALUE;
int pivot = 0;
Iterator iter = messages.iterator();
@@ -44,6 +44,6 @@
pivot = i;
}
}
- return (MessageReference) messages.remove(pivot);
+ return new MessageReference[] {(MessageReference) messages.remove(pivot)};
}
}