Author: dejanb
Date: Thu Dec 16 17:07:24 2010
New Revision: 1050059

URL: http://svn.apache.org/viewvc?rev=1050059&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3092 - Deleting a Queue from the console results in lost messages

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1050059&r1=1050058&r2=1050059&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 Thu Dec 16 17:07:24 2010
@@ -492,7 +492,8 @@ public class RegionBroker extends EmptyB
     @Override
     public void send(ProducerBrokerExchange producerExchange, Message message) 
throws Exception {
         message.setBrokerInTime(System.currentTimeMillis());
-        if (producerExchange.isMutable() || producerExchange.getRegion() == 
null) {
+        if (producerExchange.isMutable() || producerExchange.getRegion() == 
null
+                || (producerExchange.getRegion() != null && 
producerExchange.getRegion().getDestinationMap().get(message.getDestination()) 
== null)) {
             ActiveMQDestination destination = message.getDestination();
             // ensure the destination is registered with the RegionBroker
             
producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(),
 destination,false);
@@ -514,6 +515,7 @@ public class RegionBroker extends EmptyB
                 throw createUnknownDestinationTypeException(destination);
             }
             producerExchange.setRegion(region);
+            producerExchange.setRegionDestination(null);
         }
         producerExchange.getRegion().send(producerExchange, message);
     }

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java?rev=1050059&r1=1050058&r2=1050059&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
 Thu Dec 16 17:07:24 2010
@@ -16,10 +16,7 @@
  */
 package org.apache.activemq.broker.jmx;
 
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
+import javax.jms.*;
 import javax.management.MBeanServer;
 import javax.management.MBeanServerInvocationHandler;
 import javax.management.MalformedObjectNameException;
@@ -111,6 +108,38 @@ public class PurgeTest extends EmbeddedB
         addCombinationValues("persistenceAdapter", new Object[] {new 
MemoryPersistenceAdapter(), new AMQPersistenceAdapter(), new 
JDBCPersistenceAdapter()});
     }
 
+    public void testDeleteSameProducer() throws Exception {
+        connection = connectionFactory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination();
+
+        MessageProducer producer = session.createProducer(destination);
+        Message message = session.createTextMessage("Test Message");
+        producer.send(message);
+
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        Message received = consumer.receive(1000);
+        assertEquals(message, received);
+
+        ObjectName brokerViewMBeanName = assertRegisteredObjectName(domain + 
":Type=Broker,BrokerName=localhost");
+        BrokerViewMBean brokerProxy = 
(BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, 
brokerViewMBeanName, BrokerViewMBean.class, true);
+
+        brokerProxy.removeQueue(getDestinationString());
+
+
+        producer.send(message);
+
+        received = consumer.receive(1000);
+
+        assertNotNull("Message not received", received);
+        assertEquals(message, received);
+
+
+    }
+
     public void testDelete() throws Exception {
         // Send some messages
         connection = connectionFactory.createConnection();


Reply via email to