Author: rajdavies
Date: Tue Mar 18 07:32:41 2008
New Revision: 638385
URL: http://svn.apache.org/viewvc?rev=638385&view=rev
Log:
change reference count boundaries around messages - so they
are around acks
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=638385&r1=638384&r2=638385&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Mar 18 07:32:41 2008
@@ -610,15 +610,4 @@
public void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
}
-
-
- public List<MessageReference> getInFlightMessages(){
- List<MessageReference> result = new ArrayList<MessageReference>();
- synchronized(pendingLock) {
- result.addAll(dispatched);
- result.addAll(pending.pageInList(1000));
- }
- return result;
- }
-
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=638385&r1=638384&r2=638385&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Mar 18 07:32:41 2008
@@ -944,6 +944,7 @@
reference.drop();
acknowledge(context, sub, ack, reference);
destinationStatistics.getMessages().decrement();
+ reference.decrementReferenceCount();
synchronized(pagedInMessages) {
pagedInMessages.remove(reference.getMessageId());
}
@@ -1034,6 +1035,7 @@
if (dispatchSelector.canSelect(s, node)) {
if (!s.isFull()) {
s.add(node);
+ node.incrementReferenceCount();
target = s;
break;
} else {
@@ -1055,6 +1057,7 @@
}
if (target != null) {
target.add(node);
+ node.incrementReferenceCount();
}
}
if (target != null && !strictOrderDispatch &&
consumers.size() > 1 &&
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=638385&r1=638384&r2=638385&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Tue Mar 18 07:32:41 2008
@@ -108,59 +108,6 @@
}
/**
- * Override so that the message ref count is > 0 only when the message is
- * being dispatched to a client. Keeping it at 0 when it is in the pending
- * list allows the message to be swapped out to disk.
- *
- * @return true if the message was dispatched.
- */
- protected boolean dispatch(MessageReference node) throws IOException {
- boolean rc = false;
- // This brings the message into memory if it was swapped out.
- node.incrementReferenceCount();
- try {
- rc = super.dispatch(node);
- } finally {
- // If the message was dispatched, it could be getting dispatched
- // async, so we
- // can only drop the reference count when that completes @see
- // onDispatch
- if (!rc) {
- node.decrementReferenceCount();
- }
- }
- return rc;
- }
-
- /**
- * OK Message was transmitted, we can now drop the reference count.
- *
- * @see org.apache.activemq.broker.region.PrefetchSubscription#onDispatch(org.apache.activemq.broker.region.MessageReference,
- * org.apache.activemq.command.Message)
- */
- protected void onDispatch(MessageReference node, Message message) {
- // Now that the message has been sent over the wire to the client,
- // we can let it get swapped out.
- node.decrementReferenceCount();
- super.onDispatch(node, message);
- }
-
- /**
- * Sending a message to the DQL will require us to increment the ref count
- * so we can get at the content.
- */
- protected void sendToDLQ(ConnectionContext context, MessageReference node) throws IOException, Exception {
- // This brings the message into memory if it was swapped out.
- node.incrementReferenceCount();
- try {
- super.sendToDLQ(context, node);
- } finally {
- // This let's the message be swapped out of needed.
- node.decrementReferenceCount();
- }
- }
-
- /**
*/
public void destroy() {
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=638385&r1=638384&r2=638385&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Tue Mar 18 07:32:41 2008
@@ -215,11 +215,4 @@
* @return true if a browser
*/
boolean isBrowser();
-
- /**
- * Get the list of in flight messages
- * @return list
- */
- List<MessageReference> getInFlightMessages();
-
}