Author: jstrachan
Date: Fri Apr 7 10:58:17 2006
New Revision: 392349
URL: http://svn.apache.org/viewcvs?rev=392349&view=rev
Log:
Added a hook to eagerly evict expired messages on non-durable topics first,
before we apply other eviction policies (such as discarding old messages, etc.).
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java
(with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageEvictionStrategy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageWithLowestPriorityEvictionStrategy.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=392349&r1=392348&r2=392349&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
Fri Apr 7 10:58:17 2006
@@ -59,6 +59,8 @@
private int referenceCount;
/** the size of the message **/
private int cachedSize = 0;
+ /** the expiration time of the message */
+ private long expiration;
/**
* Only used by the END_OF_BROWSE_MARKER singleton
@@ -71,6 +73,7 @@
this.groupID = null;
this.groupSequence = 0;
this.targetConsumerId=null;
+ this.expiration = message.getExpiration();
this.cachedSize = message != null ? message.getSize() : 0;
}
@@ -82,6 +85,7 @@
this.groupID = message.getGroupID();
this.groupSequence = message.getGroupSequence();
this.targetConsumerId=message.getTargetConsumerId();
+ this.expiration = message.getExpiration();
this.referenceCount=1;
message.incrementReferenceCount();
@@ -205,6 +209,18 @@
public ConsumerId getTargetConsumerId() {
return targetConsumerId;
+ }
+
+ public long getExpiration() {
+ return expiration;
+ }
+
+ public boolean isExpired() {
+ long expireTime = getExpiration();
+ if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
+ return true;
+ }
+ return false;
}
public int getSize(){
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java?rev=392349&r1=392348&r2=392349&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
Fri Apr 7 10:58:17 2006
@@ -47,5 +47,11 @@
public int decrementReferenceCount();
public ConsumerId getTargetConsumerId();
public int getSize();
+ public long getExpiration();
+
+ /**
+ * Returns true if this message is expired
+ */
+ public boolean isExpired();
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=392349&r1=392348&r2=392349&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Fri Apr 7 10:58:17 2006
@@ -72,18 +72,44 @@
synchronized(matchedListMutex){
matched.addLast(node);
// NOTE - be careful about the slaveBroker!
- if(maximumPendingMessages>0){
+ if (maximumPendingMessages > 0) {
+
+ // calculate the high water mark from which point we
will eagerly evict expired messages
+ int max =
messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
+ if (maximumPendingMessages > 0 &&
maximumPendingMessages < max) {
+ max = maximumPendingMessages;
+ }
+ if (!matched.isEmpty() && matched.size() > max) {
+ removeExpiredMessages(matched);
+ }
+
// lets discard old messages as we are a slow consumer
-
while(!matched.isEmpty()&&matched.size()>maximumPendingMessages){
- MessageReference
oldMessage=messageEvictionStrategy.evictMessage(matched);
+ while (!matched.isEmpty() && matched.size() >
maximumPendingMessages) {
+ MessageReference oldMessage =
messageEvictionStrategy.evictMessage(matched);
oldMessage.decrementReferenceCount();
discarded++;
- if (log.isDebugEnabled()){
+ if (log.isDebugEnabled()) {
log.debug("Discarding message " + oldMessage);
}
}
}
}
+ }
+ }
+ }
+
+ /**
+ * Discard any expired messages from the matched list. Called from a
synchronized block.
+ * @throws IOException
+ */
+ protected void removeExpiredMessages(LinkedList messages) throws
IOException {
+ for(Iterator i=matched.iterator();i.hasNext();){
+ MessageReference node=(MessageReference) i.next();
+ if (node.isExpired()) {
+ i.remove();
+ dispatched.incrementAndGet();
+ node.decrementReferenceCount();
+ break;
}
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java?rev=392349&r1=392348&r2=392349&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java
Fri Apr 7 10:58:17 2006
@@ -37,4 +37,9 @@
*/
MessageReference evictMessage(LinkedList messages) throws IOException;
+ /**
+ * Returns the high water mark on which we will eagerly evict expired
messages from RAM
+ */
+ int getEvictExpiredMessagesHighWatermark();
+
}
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java?rev=392349&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java
Fri Apr 7 10:58:17 2006
@@ -0,0 +1,40 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+/**
+ * A useful base class for implementation inheritance.
+ *
+ * @version $Revision$
+ */
+public abstract class MessageEvictionStrategySupport implements
MessageEvictionStrategy {
+
+ private int evictExpiredMessagesHighWatermark = 1000;
+
+ public int getEvictExpiredMessagesHighWatermark() {
+ return evictExpiredMessagesHighWatermark;
+ }
+
+ /**
+ * Sets the high water mark on which we will eagerly evict expired
messages from RAM
+ */
+ public void setEvictExpiredMessagesHighWatermark(int
evictExpiredMessagesHighWaterMark) {
+ this.evictExpiredMessagesHighWatermark =
evictExpiredMessagesHighWaterMark;
+ }
+
+
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageEvictionStrategy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageEvictionStrategy.java?rev=392349&r1=392348&r2=392349&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageEvictionStrategy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageEvictionStrategy.java
Fri Apr 7 10:58:17 2006
@@ -28,7 +28,7 @@
*
* @version $Revision$
*/
-public class OldestMessageEvictionStrategy implements MessageEvictionStrategy {
+public class OldestMessageEvictionStrategy extends
MessageEvictionStrategySupport {
public MessageReference evictMessage(LinkedList messages) {
return (MessageReference) messages.removeFirst();
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageWithLowestPriorityEvictionStrategy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageWithLowestPriorityEvictionStrategy.java?rev=392349&r1=392348&r2=392349&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageWithLowestPriorityEvictionStrategy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageWithLowestPriorityEvictionStrategy.java
Fri Apr 7 10:58:17 2006
@@ -29,7 +29,7 @@
*
* @version $Revision$messageEvictionStrategy
*/
-public class OldestMessageWithLowestPriorityEvictionStrategy implements
MessageEvictionStrategy {
+public class OldestMessageWithLowestPriorityEvictionStrategy extends
MessageEvictionStrategySupport {
public MessageReference evictMessage(LinkedList messages) throws
IOException {
byte lowestPriority = Byte.MAX_VALUE;