Author: jstrachan
Date: Fri Aug 25 08:39:16 2006
New Revision: 436835
URL: http://svn.apache.org/viewvc?rev=436835&view=rev
Log:
applied patch from John Heitmann to fix AMQ-889 to avoid duplicate consumers
(such as on failover) leaking resources
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
(with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=436835&r1=436834&r2=436835&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
Fri Aug 25 08:39:16 2006
@@ -17,6 +17,8 @@
*/
package org.apache.activemq.broker.jmx;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
import javax.management.ObjectName;
import org.apache.activemq.broker.Broker;
@@ -33,6 +35,7 @@
final ManagedRegionBroker broker;
private final BrokerService brokerService;
+ private final AtomicInteger sessionIdCounter = new AtomicInteger(0);
public BrokerView(BrokerService brokerService, ManagedRegionBroker
managedBroker) throws Exception {
this.brokerService = brokerService;
@@ -156,7 +159,7 @@
ConsumerInfo info = new ConsumerInfo();
ConsumerId consumerId = new ConsumerId();
consumerId.setConnectionId(clientId);
- consumerId.setSessionId(0);
+ consumerId.setSessionId(sessionIdCounter.incrementAndGet());
consumerId.setValue(0);
info.setConsumerId(consumerId);
info.setDestination(new ActiveMQTopic(topicName));
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=436835&r1=436834&r2=436835&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Fri Aug 25 08:39:16 2006
@@ -43,11 +43,11 @@
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
- *
+ *
* @version $Revision: 1.14 $
*/
abstract public class AbstractRegion implements Region {
-
+
private static final Log log = LogFactory.getLog(AbstractRegion.class);
protected final ConcurrentHashMap destinations = new ConcurrentHashMap();
@@ -60,7 +60,8 @@
protected boolean autoCreateDestinations=true;
protected final TaskRunnerFactory taskRunnerFactory;
protected final Object destinationsMutex = new Object();
-
+ protected final Map consumerChangeMutexMap = new HashMap();
+
public AbstractRegion(RegionBroker broker,DestinationStatistics
destinationStatistics, UsageManager memoryManager, TaskRunnerFactory
taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
this.broker = broker;
this.destinationStatistics = destinationStatistics;
@@ -111,7 +112,7 @@
public void removeDestination(ConnectionContext
context,ActiveMQDestination destination,long timeout)
throws Exception{
-
+
// No timeout.. then try to shut down right way, fails if there are
current subscribers.
if( timeout == 0 ) {
for(Iterator
iter=subscriptions.values().iterator();iter.hasNext();){
@@ -121,19 +122,19 @@
}
}
}
-
+
if( timeout > 0 ) {
- // TODO: implement a way to notify the subscribers that we want to
take the down
+ // TODO: implement a way to notify the subscribers that we want to
take the down
// the destination and that they should un-subscribe.. Then wait
up to timeout time before
// dropping the subscription.
-
+
}
log.debug("Removing destination: "+destination);
synchronized(destinationsMutex){
Destination dest=(Destination) destinations.remove(destination);
if(dest!=null){
-
+
// timeout<0 or we timed out, we now force any remaining
subscriptions to un-subscribe.
for(Iterator
iter=subscriptions.values().iterator();iter.hasNext();){
Subscription sub=(Subscription) iter.next();
@@ -141,20 +142,20 @@
dest.removeSubscription(context, sub);
}
}
-
+
destinationMap.removeAll(destination);
dest.dispose(context);
dest.stop();
-
+
}else{
log.debug("Destination doesn't exist: " + dest);
}
}
}
-
+
/**
* Provide an exact or wildcard lookup of destinations in the region
- *
+ *
* @return a set of matching destination objects.
*/
public Set getDestinations(ActiveMQDestination destination) {
@@ -162,7 +163,7 @@
return destinationMap.get(destination);
}
}
-
+
public Map getDestinationMap() {
synchronized(destinationsMutex){
return new HashMap(destinations);
@@ -177,43 +178,66 @@
// lets auto-create the destination
lookup(context, destination);
}
-
- Subscription sub = createSubscription(context, info);
- // We may need to add some destinations that are in persistent store
but not active
- // in the broker.
- //
- // TODO: think about this a little more. This is good cause
destinations are not loaded into
- // memory until a client needs to use the queue, but a management
agent viewing the
- // broker will not see a destination that exists in persistent store.
We may want to
- // eagerly load all destinations into the broker but have an inactive
state for the
- // destination which has reduced memory usage.
- //
- if( persistenceAdapter!=null ) {
- Set inactiveDests = getInactiveDestinations();
- for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
- ActiveMQDestination dest = (ActiveMQDestination) iter.next();
- if( sub.matches(dest) ) {
- context.getBroker().addDestination(context, dest);
- }
+ Object addGuard;
+ synchronized(consumerChangeMutexMap) {
+ addGuard = consumerChangeMutexMap.get(info.getConsumerId());
+ if (addGuard == null) {
+ addGuard = new Object();
+ consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
}
}
-
- subscriptions.put(info.getConsumerId(), sub);
+ synchronized (addGuard) {
+ Object o = subscriptions.get(info.getConsumerId());
+ if (o != null) {
+ log.warn("A duplicate subscription was detected. Clients may
be misbehaving. Later warnings you may see about subscription removal are a
consequence of this.");
+ return (Subscription)o;
+ }
- // Add the subscription to all the matching queues.
- for (Iterator iter =
destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
- Destination dest = (Destination) iter.next();
- dest.addSubscription(context, sub);
- }
+ Subscription sub = createSubscription(context, info);
- if( info.isBrowser() ) {
- ((QueueBrowserSubscription)sub).browseDone();
+ // We may need to add some destinations that are in persistent
store but not active
+ // in the broker.
+ //
+ // TODO: think about this a little more. This is good cause
destinations are not loaded into
+ // memory until a client needs to use the queue, but a management
agent viewing the
+ // broker will not see a destination that exists in persistent
store. We may want to
+ // eagerly load all destinations into the broker but have an
inactive state for the
+ // destination which has reduced memory usage.
+ //
+ if( persistenceAdapter!=null ) {
+ Set inactiveDests = getInactiveDestinations();
+ for (Iterator iter = inactiveDests.iterator();
iter.hasNext();) {
+ ActiveMQDestination dest = (ActiveMQDestination)
iter.next();
+ if( sub.matches(dest) ) {
+ context.getBroker().addDestination(context, dest);
+ }
+ }
+ }
+
+ subscriptions.put(info.getConsumerId(), sub);
+
+ // At this point we're done directly manipulating subscriptions,
+ // but we need to retain the synchronized block here. Consider
+ // otherwise what would happen if at this point a second
+ // thread added, then removed, as would be allowed with
+ // no mutex held. Remove is only essentially run once
+ // so everything after this point would be leaked.
+
+ // Add the subscription to all the matching queues.
+ for (Iterator iter =
destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
+ Destination dest = (Destination) iter.next();
+ dest.addSubscription(context, sub);
+ }
+
+ if( info.isBrowser() ) {
+ ((QueueBrowserSubscription)sub).browseDone();
+ }
+
+ return sub;
}
-
- return sub;
}
-
+
/**
* Get all the Destinations that are in storage
* @return Set of all stored destinations
@@ -230,26 +254,29 @@
inactiveDests.removeAll( destinations.keySet() );
return inactiveDests;
}
-
+
public void removeConsumer(ConnectionContext context, ConsumerInfo info)
throws Exception {
-
+
log.debug("Removing consumer: "+info.getConsumerId());
-
+
Subscription sub = (Subscription)
subscriptions.remove(info.getConsumerId());
if( sub==null )
throw new IllegalArgumentException("The subscription does not
exist: "+info.getConsumerId());
-
+
// remove the subscription from all the matching queues.
for (Iterator iter =
destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
Destination dest = (Destination) iter.next();
dest.removeSubscription(context, sub);
}
-
+
destroySubscription(sub);
-
+
+ synchronized (consumerChangeMutexMap) {
+ consumerChangeMutexMap.remove(info.getConsumerId());
+ }
}
- protected void destroySubscription(Subscription sub) {
+ protected void destroySubscription(Subscription sub) {
sub.destroy();
}
@@ -262,7 +289,7 @@
Destination dest = lookup(context, messageSend.getDestination());
dest.send(context, messageSend);
}
-
+
public void acknowledge(ConnectionContext context, MessageAck ack) throws
Exception {
Subscription sub = (Subscription)
subscriptions.get(ack.getConsumerId());
if( sub==null )
@@ -295,7 +322,7 @@
return dest;
}
}
-
+
public void processDispatchNotification(MessageDispatchNotification
messageDispatchNotification) throws Exception{
Subscription sub = (Subscription)
subscriptions.get(messageDispatchNotification.getConsumerId());
if (sub != null){
@@ -306,23 +333,23 @@
for (Iterator iter = subscriptions.values().iterator();
iter.hasNext();) {
Subscription sub = (Subscription) iter.next();
sub.gc();
- }
+ }
for (Iterator iter = destinations.values() .iterator();
iter.hasNext();) {
Destination dest = (Destination) iter.next();
dest.gc();
- }
+ }
}
protected abstract Subscription createSubscription(ConnectionContext
context, ConsumerInfo info) throws Exception;
abstract protected Destination createDestination(ConnectionContext
context, ActiveMQDestination destination) throws Exception;
- public boolean isAutoCreateDestinations() {
+ public boolean isAutoCreateDestinations() {
return autoCreateDestinations;
}
public void setAutoCreateDestinations(boolean autoCreateDestinations) {
this.autoCreateDestinations = autoCreateDestinations;
}
-
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=436835&r1=436834&r2=436835&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
Fri Aug 25 08:39:16 2006
@@ -105,6 +105,10 @@
else {
super.addConsumer(context, info);
sub = (DurableTopicSubscription) durableSubscriptions.get(key);
+ if (sub == null) {
+ throw new JMSException("Cannot use the same consumerId: "
+ info.getConsumerId() + " for two different durable subscriptions clientID: "
+ + key.getClientId() + " subscriberName: " +
key.getSubscriptionName());
+ }
}
sub.activate(context, info);
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java?rev=436835&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
Fri Aug 25 08:39:16 2006
@@ -0,0 +1,118 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.DeliveryMode;
+import junit.framework.Test;
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.network.NetworkTestSupport;
+
+/**
+ * Pretend to be an abusive client that sends multiple
+ * identical ConsumerInfo commands and make sure the
+ * broker doesn't stall because of it.
+ */
+
+public class DoubleSubscriptionTest extends NetworkTestSupport {
+
+ public ActiveMQDestination destination;
+ public int deliveryMode;
+
+ private String remoteURI =
"tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
+
+ public static Test suite() {
+ return suite(DoubleSubscriptionTest.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+ public void initCombosForTestDoubleSubscription() {
+ addCombinationValues("destination", new Object[] { new
ActiveMQQueue("TEST"), new ActiveMQQueue("TEST"), });
+ }
+ public void testDoubleSubscription() throws Exception {
+
+ // Start a normal consumer on the remote broker
+ StubConnection connection1 = createRemoteConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1,
destination);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.request(consumerInfo1);
+
+ // Start a normal producer on a remote broker
+ StubConnection connection2 = createRemoteConnection();
+ ConnectionInfo connectionInfo2 = createConnectionInfo();
+ SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+ ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+ connection2.send(connectionInfo2);
+ connection2.send(sessionInfo2);
+ connection2.request(producerInfo2);
+
+ // Send a message to make sure the basics are working
+ connection2.request(createMessage(producerInfo2, destination,
DeliveryMode.PERSISTENT));
+
+ Message m1 = receiveMessage(connection1);
+ assertNotNull(m1);
+ assertNoMessagesLeft(connection1);
+
+ connection1.send(createAck(consumerInfo1, m1, 1,
MessageAck.STANDARD_ACK_TYPE));
+
+ // Send a message to sit on the broker while we mess with it
+ connection2.request(createMessage(producerInfo2, destination,
DeliveryMode.PERSISTENT));
+
+ // Now we're going to resend the same consumer commands again and see
if the broker
+ // can handle it.
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.request(consumerInfo1);
+
+ // After this there should be 2 messages on the broker...
+ connection2.request(createMessage(producerInfo2, destination,
DeliveryMode.PERSISTENT));
+
+ // ... let's start a fresh consumer...
+ connection1.stop();
+ StubConnection connection3 = createRemoteConnection();
+ ConnectionInfo connectionInfo3 = createConnectionInfo();
+ SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3);
+ ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo3,
destination);
+ connection3.send(connectionInfo3);
+ connection3.send(sessionInfo3);
+ connection3.request(consumerInfo3);
+
+ // ... and then grab the 2 that should be there.
+ assertNotNull(receiveMessage(connection3));
+ assertNotNull(receiveMessage(connection3));
+ assertNoMessagesLeft(connection3);
+ }
+
+ protected String getRemoteURI() {
+ return remoteURI;
+ }
+
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL