Author: jstrachan
Date: Fri Aug 25 08:39:16 2006
New Revision: 436835

URL: http://svn.apache.org/viewvc?rev=436835&view=rev
Log:
Applied patch from John Heitmann to fix AMQ-889: prevent duplicate consumers
(such as on failover) from leaking resources

Added:
    
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
   (with props)
Modified:
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=436835&r1=436834&r2=436835&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
 Fri Aug 25 08:39:16 2006
@@ -17,6 +17,8 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
 import javax.management.ObjectName;
 
 import org.apache.activemq.broker.Broker;
@@ -33,6 +35,7 @@
     
     final ManagedRegionBroker broker;
        private final BrokerService brokerService;
+    private final AtomicInteger sessionIdCounter = new AtomicInteger(0);
 
     public BrokerView(BrokerService brokerService, ManagedRegionBroker 
managedBroker) throws Exception {
         this.brokerService = brokerService;
@@ -156,7 +159,7 @@
         ConsumerInfo info = new ConsumerInfo();
         ConsumerId consumerId = new ConsumerId();
         consumerId.setConnectionId(clientId);
-        consumerId.setSessionId(0);
+        consumerId.setSessionId(sessionIdCounter.incrementAndGet());
         consumerId.setValue(0);
         info.setConsumerId(consumerId);
         info.setDestination(new ActiveMQTopic(topicName));

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=436835&r1=436834&r2=436835&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
 Fri Aug 25 08:39:16 2006
@@ -43,11 +43,11 @@
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 
 /**
- * 
+ *
  * @version $Revision: 1.14 $
  */
 abstract public class AbstractRegion implements Region {
-    
+
     private static final Log log = LogFactory.getLog(AbstractRegion.class);
 
     protected final ConcurrentHashMap destinations = new ConcurrentHashMap();
@@ -60,7 +60,8 @@
     protected boolean autoCreateDestinations=true;
     protected final TaskRunnerFactory taskRunnerFactory;
     protected final Object destinationsMutex = new Object();
-    
+    protected final Map consumerChangeMutexMap = new HashMap();
+
     public AbstractRegion(RegionBroker broker,DestinationStatistics 
destinationStatistics, UsageManager memoryManager, TaskRunnerFactory 
taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
         this.broker = broker;
         this.destinationStatistics = destinationStatistics;
@@ -111,7 +112,7 @@
 
     public void removeDestination(ConnectionContext 
context,ActiveMQDestination destination,long timeout)
                     throws Exception{
-        
+
         // No timeout.. then try to shut down right way, fails if there are 
current subscribers.
         if( timeout == 0 ) {
             for(Iterator 
iter=subscriptions.values().iterator();iter.hasNext();){
@@ -121,19 +122,19 @@
                 }
             }
         }
-        
+
         if( timeout > 0 ) {
-            // TODO: implement a way to notify the subscribers that we want to 
take the down 
+            // TODO: implement a way to notify the subscribers that we want to 
take the down
             // the destination and that they should un-subscribe..  Then wait 
up to timeout time before
             // dropping the subscription.
-        
+
         }
 
         log.debug("Removing destination: "+destination);
         synchronized(destinationsMutex){
             Destination dest=(Destination) destinations.remove(destination);
             if(dest!=null){
-                
+
                 // timeout<0 or we timed out, we now force any remaining 
subscriptions to un-subscribe.
                 for(Iterator 
iter=subscriptions.values().iterator();iter.hasNext();){
                     Subscription sub=(Subscription) iter.next();
@@ -141,20 +142,20 @@
                         dest.removeSubscription(context, sub);
                     }
                 }
-                
+
                 destinationMap.removeAll(destination);
                 dest.dispose(context);
                 dest.stop();
-                
+
             }else{
                 log.debug("Destination doesn't exist: " + dest);
             }
         }
     }
-    
+
     /**
      * Provide an exact or wildcard lookup of destinations in the region
-     * 
+     *
      * @return a set of matching destination objects.
      */
     public Set getDestinations(ActiveMQDestination destination) {
@@ -162,7 +163,7 @@
             return destinationMap.get(destination);
         }
     }
-    
+
     public Map getDestinationMap() {
         synchronized(destinationsMutex){
             return new HashMap(destinations);
@@ -177,43 +178,66 @@
             // lets auto-create the destination
             lookup(context, destination);
         }
-        
-        Subscription sub = createSubscription(context, info);
 
-        // We may need to add some destinations that are in persistent store 
but not active 
-        // in the broker.
-        //
-        // TODO: think about this a little more.  This is good cause 
destinations are not loaded into 
-        // memory until a client needs to use the queue, but a management 
agent viewing the 
-        // broker will not see a destination that exists in persistent store.  
We may want to
-        // eagerly load all destinations into the broker but have an inactive 
state for the
-        // destination which has reduced memory usage.
-        //
-        if( persistenceAdapter!=null ) {
-            Set inactiveDests = getInactiveDestinations();
-            for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
-                ActiveMQDestination dest = (ActiveMQDestination) iter.next();
-                if( sub.matches(dest) ) {
-                    context.getBroker().addDestination(context, dest);
-                }
+        Object addGuard;
+        synchronized(consumerChangeMutexMap) {
+            addGuard = consumerChangeMutexMap.get(info.getConsumerId());
+            if (addGuard == null) {
+                addGuard = new Object();
+                consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
             }
         }
-                
-        subscriptions.put(info.getConsumerId(), sub);
+        synchronized (addGuard) {
+            Object o = subscriptions.get(info.getConsumerId());
+            if (o != null) {
+                log.warn("A duplicate subscription was detected. Clients may 
be misbehaving. Later warnings you may see about subscription removal are a 
consequence of this.");
+                return (Subscription)o;
+            }
 
-        // Add the subscription to all the matching queues.
-        for (Iterator iter = 
destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
-            Destination dest = (Destination) iter.next();            
-            dest.addSubscription(context, sub);
-        }        
+            Subscription sub = createSubscription(context, info);
 
-        if( info.isBrowser() ) {
-            ((QueueBrowserSubscription)sub).browseDone();
+            // We may need to add some destinations that are in persistent 
store but not active
+            // in the broker.
+            //
+            // TODO: think about this a little more.  This is good cause 
destinations are not loaded into
+            // memory until a client needs to use the queue, but a management 
agent viewing the
+            // broker will not see a destination that exists in persistent 
store.  We may want to
+            // eagerly load all destinations into the broker but have an 
inactive state for the
+            // destination which has reduced memory usage.
+            //
+            if( persistenceAdapter!=null ) {
+                Set inactiveDests = getInactiveDestinations();
+                for (Iterator iter = inactiveDests.iterator(); 
iter.hasNext();) {
+                    ActiveMQDestination dest = (ActiveMQDestination) 
iter.next();
+                    if( sub.matches(dest) ) {
+                        context.getBroker().addDestination(context, dest);
+                    }
+                }
+            }
+
+            subscriptions.put(info.getConsumerId(), sub);
+
+            // At this point we're done directly manipulating subscriptions,
+            // but we still need to stay inside the synchronized block.
+            // Consider what would happen otherwise if, at this point, a
+            // second thread added, then removed, as would be allowed with
+            // no mutex held. Remove essentially runs only once,
+            // so everything after this point would be leaked.
+
+            // Add the subscription to all the matching queues.
+            for (Iterator iter = 
destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
+                Destination dest = (Destination) iter.next();
+                dest.addSubscription(context, sub);
+            }
+
+            if( info.isBrowser() ) {
+                ((QueueBrowserSubscription)sub).browseDone();
+            }
+
+            return sub;
         }
-        
-        return sub;
     }
-    
+
     /**
      * Get all the Destinations that are in storage
      * @return Set of all stored destinations
@@ -230,26 +254,29 @@
         inactiveDests.removeAll( destinations.keySet() );
         return inactiveDests;
     }
-    
+
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) 
throws Exception {
-       
+
         log.debug("Removing consumer: "+info.getConsumerId());
-        
+
         Subscription sub = (Subscription) 
subscriptions.remove(info.getConsumerId());
         if( sub==null )
             throw new IllegalArgumentException("The subscription does not 
exist: "+info.getConsumerId());
-        
+
         // remove the subscription from all the matching queues.
         for (Iterator iter = 
destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
             Destination dest = (Destination) iter.next();
             dest.removeSubscription(context, sub);
         }
-        
+
         destroySubscription(sub);
-        
+
+        synchronized (consumerChangeMutexMap) {
+            consumerChangeMutexMap.remove(info.getConsumerId());
+        }
     }
 
-    protected void destroySubscription(Subscription sub) {        
+    protected void destroySubscription(Subscription sub) {
         sub.destroy();
     }
 
@@ -262,7 +289,7 @@
         Destination dest = lookup(context, messageSend.getDestination());
         dest.send(context, messageSend);
     }
-    
+
     public void acknowledge(ConnectionContext context, MessageAck ack) throws 
Exception {
         Subscription sub = (Subscription) 
subscriptions.get(ack.getConsumerId());
         if( sub==null )
@@ -295,7 +322,7 @@
             return dest;
         }
     }
-    
+
     public void processDispatchNotification(MessageDispatchNotification 
messageDispatchNotification) throws Exception{
         Subscription sub = (Subscription) 
subscriptions.get(messageDispatchNotification.getConsumerId());
         if (sub != null){
@@ -306,23 +333,23 @@
         for (Iterator iter = subscriptions.values().iterator(); 
iter.hasNext();) {
             Subscription sub = (Subscription) iter.next();
             sub.gc();
-        }        
+        }
         for (Iterator iter = destinations.values()  .iterator(); 
iter.hasNext();) {
             Destination dest = (Destination) iter.next();
             dest.gc();
-        }        
+        }
     }
 
     protected abstract Subscription createSubscription(ConnectionContext 
context, ConsumerInfo info) throws Exception;
     abstract protected Destination createDestination(ConnectionContext 
context, ActiveMQDestination destination) throws Exception;
 
-    public boolean isAutoCreateDestinations() { 
+    public boolean isAutoCreateDestinations() {
         return autoCreateDestinations;
     }
 
     public void setAutoCreateDestinations(boolean autoCreateDestinations) {
         this.autoCreateDestinations = autoCreateDestinations;
     }
-    
+
 
 }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=436835&r1=436834&r2=436835&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
 Fri Aug 25 08:39:16 2006
@@ -105,6 +105,10 @@
             else {
                 super.addConsumer(context, info);
                 sub = (DurableTopicSubscription) durableSubscriptions.get(key);
+                if (sub == null) {
+                    throw new JMSException("Cannot use the same consumerId: " 
+ info.getConsumerId() + " for two different durable subscriptions clientID: "
+                            + key.getClientId() + " subscriberName: " + 
key.getSubscriptionName());
+                }
             }
             
             sub.activate(context, info);

Added: 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java?rev=436835&view=auto
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
 Fri Aug 25 08:39:16 2006
@@ -0,0 +1,118 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.DeliveryMode;
+import junit.framework.Test;
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.network.NetworkTestSupport;
+
+/**
+ * Pretend to be an abusive client that sends multiple
+ * identical ConsumerInfo commands and make sure the
+ * broker doesn't stall because of it.
+ */
+
+public class DoubleSubscriptionTest extends NetworkTestSupport {
+
+    public ActiveMQDestination destination;
+    public int deliveryMode;
+
+    private String remoteURI = 
"tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
+
+    public static Test suite() {
+        return suite(DoubleSubscriptionTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    public void initCombosForTestDoubleSubscription() {
+        addCombinationValues("destination", new Object[] { new 
ActiveMQQueue("TEST"), new ActiveMQQueue("TEST"), });
+    }
+    public void testDoubleSubscription() throws Exception {
+
+        // Start a normal consumer on the remote broker
+        StubConnection connection1 = createRemoteConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, 
destination);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.request(consumerInfo1);
+
+        // Start a normal producer on a remote broker
+        StubConnection connection2 = createRemoteConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        connection2.request(producerInfo2);
+
+        // Send a message to make sure the basics are working
+        connection2.request(createMessage(producerInfo2, destination, 
DeliveryMode.PERSISTENT));
+
+        Message m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNoMessagesLeft(connection1);
+
+        connection1.send(createAck(consumerInfo1, m1, 1, 
MessageAck.STANDARD_ACK_TYPE));
+
+        // Send a message to sit on the broker while we mess with it
+        connection2.request(createMessage(producerInfo2, destination, 
DeliveryMode.PERSISTENT));
+
+        // Now we're going to resend the same consumer commands again and see 
if the broker
+        // can handle it.
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.request(consumerInfo1);
+
+        // After this there should be 2 messages on the broker...
+        connection2.request(createMessage(producerInfo2, destination, 
DeliveryMode.PERSISTENT));
+
+        // ... let's start a fresh consumer...
+        connection1.stop();
+        StubConnection connection3 = createRemoteConnection();
+        ConnectionInfo connectionInfo3 = createConnectionInfo();
+        SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3);
+        ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo3, 
destination);
+        connection3.send(connectionInfo3);
+        connection3.send(sessionInfo3);
+        connection3.request(consumerInfo3);
+
+        // ... and then grab the 2 that should be there.
+        assertNotNull(receiveMessage(connection3));
+        assertNotNull(receiveMessage(connection3));
+        assertNoMessagesLeft(connection3);
+    }
+
+    protected String getRemoteURI() {
+        return remoteURI;
+    }
+
+}

Propchange: 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL


Reply via email to