Author: chirino
Date: Tue Mar 25 09:45:00 2008
New Revision: 640890
URL: http://svn.apache.org/viewvc?rev=640890&view=rev
Log:
In the queue case, when a consumer was closed it was not properly re-delivering
messages to other available consumers. This was causing message to look like
they got dropped.
- When we shut a queue sub down we now get it's pending+dispatched list and
re-dispatch that to the other available subscriptions.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
Tue Mar 25 09:45:00 2008
@@ -17,6 +17,9 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.InvalidSelectorException;
@@ -104,8 +107,9 @@
destinations.add(destination);
}
- public void remove(ConnectionContext context, Destination destination)
throws Exception {
+ public List<MessageReference> remove(ConnectionContext context,
Destination destination) throws Exception {
destinations.remove(destination);
+ return Collections.EMPTY_LIST;
}
public ConsumerInfo getConsumerInfo() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Tue Mar 25 09:45:00 2008
@@ -436,11 +436,18 @@
}
}
- public void remove(ConnectionContext context, Destination destination)
throws Exception {
+ public List<MessageReference> remove(ConnectionContext context,
Destination destination) throws Exception {
+ List<MessageReference> rc = new ArrayList<MessageReference>();
synchronized(pendingLock) {
super.remove(context, destination);
- pending.remove(context, destination);
+ for (MessageReference r : dispatched) {
+ if( r.getRegionDestination() == destination ) {
+ rc.add((QueueMessageReference)r);
+ }
+ }
+ rc.addAll(pending.remove(context, destination));
}
+ return rc;
}
protected void dispatchPending() throws IOException {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Mar 25 09:45:00 2008
@@ -292,25 +292,20 @@
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
MessageGroupSet ownedGroups = getMessageGroupOwners()
.removeConsumer(consumerId);
+
// redeliver inflight messages
- sub.remove(context, this);
-
List<QueueMessageReference> list = new
ArrayList<QueueMessageReference>();
- List<QueueMessageReference> inFlight = null;
- synchronized(pagedInMessages) {
- inFlight = new
ArrayList<QueueMessageReference>(pagedInMessages.values());
- }
-
- for (QueueMessageReference node:inFlight){
- if (!node.isDropped() && !node.isAcked()
- && node.getLockOwner() == sub) {
- if (node.unlock()) {
- node.incrementRedeliveryCounter();
- list.add(node);
- }
+ for (MessageReference ref : sub.remove(context, this)) {
+ QueueMessageReference qmr = (QueueMessageReference)ref;
+ qmr.incrementRedeliveryCounter();
+ if( qmr.getLockOwner()==sub ) {
+ qmr.unlock();
+ qmr.incrementRedeliveryCounter();
}
+ list.add(qmr);
}
- if (list != null && !consumers.isEmpty()) {
+
+ if (!list.isEmpty() && !consumers.isEmpty()) {
doDispatch(list);
}
}
@@ -938,6 +933,7 @@
if( rd.subscription instanceof QueueBrowserSubscription ) {
((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
}
+
} catch (Exception e) {
e.printStackTrace();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
Tue Mar 25 09:45:00 2008
@@ -87,8 +87,9 @@
* The subscription will be no longer be receiving messages from the
destination.
* @param context
* @param destination
+ * @return a list of un-acked messages that were added to the subscription.
*/
- void remove(ConnectionContext context, Destination destination) throws
Exception;
+ List<MessageReference> remove(ConnectionContext context, Destination
destination) throws Exception;
/**
* The ConsumerInfo object that created the subscription.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Tue Mar 25 09:45:00 2008
@@ -16,12 +16,15 @@
*/
package org.apache.activemq.broker.region.cursors;
+import java.util.Collections;
import java.util.LinkedList;
+import java.util.List;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.SystemUsage;
@@ -59,7 +62,9 @@
public void add(ConnectionContext context, Destination destination) throws
Exception {
}
- public void remove(ConnectionContext context, Destination destination)
throws Exception {
+ @SuppressWarnings("unchecked")
+ public List<MessageReference> remove(ConnectionContext context,
Destination destination) throws Exception {
+ return Collections.EMPTY_LIST;
}
public boolean isRecoveryRequired() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Tue Mar 25 09:45:00 2008
@@ -110,6 +110,8 @@
}
return isDiskListEmpty();
}
+
+
/**
* reset the cursor
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
Tue Mar 25 09:45:00 2008
@@ -18,12 +18,14 @@
import java.io.IOException;
import java.util.LinkedList;
+import java.util.List;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.SystemUsage;
@@ -51,7 +53,7 @@
* @param destination
* @throws Exception
*/
- void remove(ConnectionContext context, Destination destination) throws
Exception;
+ List<MessageReference> remove(ConnectionContext context, Destination
destination) throws Exception;
/**
* @return true if there are no pending messages
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Tue Mar 25 09:45:00 2008
@@ -17,9 +17,11 @@
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import org.apache.activemq.advisory.AdvisorySupport;
@@ -27,6 +29,7 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
@@ -128,11 +131,12 @@
* @param destination
* @throws Exception
*/
- public synchronized void remove(ConnectionContext context, Destination
destination) throws Exception {
+ public synchronized List<MessageReference> remove(ConnectionContext
context, Destination destination) throws Exception {
Object tsp = topics.remove(destination);
if (tsp != null) {
storePrefetches.remove(tsp);
}
+ return Collections.EMPTY_LIST;
}
/**
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=640890&r1=640889&r2=640890&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
Tue Mar 25 09:45:00 2008
@@ -16,8 +16,13 @@
*/
package org.apache.activemq.broker.region.cursors;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference;
@@ -32,6 +37,18 @@
private Iterator<MessageReference> iter;
private MessageReference last;
+
+ @Override
+ public List<MessageReference> remove(ConnectionContext context,
Destination destination) throws Exception {
+ List<MessageReference> rc = new ArrayList<MessageReference>();
+ for (MessageReference r : list) {
+ if( r.getRegionDestination()==destination ) {
+ rc.add(r);
+ }
+ }
+ return rc ;
+ }
+
/**
* @return true if there are no pending messages
*/