Author: robbie
Date: Tue Mar 30 11:50:18 2010
New Revision: 929095

URL: http://svn.apache.org/viewvc?rev=929095&view=rev
Log:
QPID-2417 , QPID-2418 , QPID-2449 : expand topic testing, specifically around 
the change and unsubscription of durable subscriptions

Modified:
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
    qpid/trunk/qpid/java/test-profiles/Excludes
    qpid/trunk/qpid/java/test-profiles/JavaExcludes

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java?rev=929095&r1=929094&r2=929095&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
 Tue Mar 30 11:50:18 2010
@@ -19,6 +19,10 @@ package org.apache.qpid.test.unit.ct;
 
 import javax.jms.*;
 
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 /**
@@ -163,5 +167,301 @@ public class DurableSubscriberTest exten
             durConnection2.close();
         }
     }
+    
+    /**
+     * create and register a durable subscriber without a message selector and 
then unsubscribe it
+     * create and register a durable subscriber with a message selector and 
then close it
+     * restart the broker
+     * send matching and non matching messages
+     * recreate and register the durable subscriber with a message selector
+     * verify only the matching messages are received
+     */
+    public void testDurSubChangedToHaveSelectorThenRestart() throws Exception
+    {
+        if (! isBrokerStorePersistent())
+        {
+            _logger.warn("Test skipped due to requirement of a persistent 
store");
+            return;
+        }
+        
+        final String SUB_NAME=getTestQueueName();
+        
+        TopicConnectionFactory factory = getConnectionFactory();
+        Topic topic = (Topic) getInitialContext().lookup(_topicName);
+        
+        //create and register a durable subscriber then unsubscribe it
+        TopicConnection durConnection = factory.createTopicConnection("guest", 
"guest");
+        TopicSession durSession = durConnection.createTopicSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, 
SUB_NAME);
+        durConnection.start();
+        durSub1.close();
+        durSession.unsubscribe(SUB_NAME);
+        durSession.close();
+        durConnection.close();
+
+        //create and register a durable subscriber with a message selector and 
then close it
+        TopicConnection durConnection2 = 
factory.createTopicConnection("guest", "guest");
+        TopicSession durSession2 = durConnection2.createTopicSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, 
SUB_NAME, "testprop='true'", false);
+        durConnection2.start();
+        durSub2.close();
+        durSession2.close();
+        durConnection2.close();
+        
+        //now restart the server
+        try
+        {
+            restartBroker();
+        }
+        catch (Exception e)
+        {
+            _logger.error("problems restarting broker: " + e);
+            throw e;
+        }
+        
+        //send messages matching and not matching the selector
+        TopicConnection pubConnection = factory.createTopicConnection("guest", 
"guest");
+        TopicSession pubSession = pubConnection.createTopicSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        TopicPublisher publisher = pubSession.createPublisher(topic);
+        for (int i = 0; i < 5; i++)
+        {
+            Message message = pubSession.createMessage();
+            message.setStringProperty("testprop", "true");
+            publisher.publish(message);
+            message = pubSession.createMessage();
+            message.setStringProperty("testprop", "false");
+            publisher.publish(message);
+        }
+        publisher.close();
+        pubSession.close();
+
+        //now recreate the durable subscriber with selector to check there are 
no exceptions generated
+        //and then verify the messages are received correctly
+        TopicConnection durConnection3 = (TopicConnection) 
factory.createConnection("guest", "guest");
+        TopicSession durSession3 = (TopicSession) 
durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, 
SUB_NAME, "testprop='true'", false);
+        durConnection3.start();
+        
+        for (int i = 0; i < 5; i++)
+        {
+            Message message = durSub3.receive(2000);
+            if (message == null)
+            {
+                fail("testDurSubChangedToHaveSelectorThenRestart test failed. 
Expected message " + i + " was not returned");
+            }
+            else
+            {
+                assertTrue("testDurSubChangedToHaveSelectorThenRestart test 
failed. Got message not matching selector",
+                           
message.getStringProperty("testprop").equals("true"));
+            }
+        }
+
+        durSub3.close();
+        durSession3.unsubscribe(SUB_NAME);
+        durSession3.close();
+        durConnection3.close();
+    }
+
+    
+    /**
+     * create and register a durable subscriber with a message selector and 
then unsubscribe it
+     * create and register a durable subscriber without a message selector and 
then close it
+     * restart the broker
+     * send matching and non matching messages
+     * recreate and register the durable subscriber without a message selector
+     * verify ALL the sent messages are received
+     */
+    public void testDurSubChangedToNotHaveSelectorThenRestart() throws 
Exception
+    {
+        if (! isBrokerStorePersistent())
+        {
+            _logger.warn("Test skipped due to requirement of a persistent 
store");
+            return;
+        }
+        
+        final String SUB_NAME=getTestQueueName();
+        
+        TopicConnectionFactory factory = getConnectionFactory();
+        Topic topic = (Topic) getInitialContext().lookup(_topicName);
+        
+        //create and register a durable subscriber with selector then 
unsubscribe it
+        TopicConnection durConnection = factory.createTopicConnection("guest", 
"guest");
+        TopicSession durSession = durConnection.createTopicSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, 
SUB_NAME, "testprop='true'", false);
+        durConnection.start();
+        durSub1.close();
+        durSession.unsubscribe(SUB_NAME);
+        durSession.close();
+        durConnection.close();
+
+        //create and register a durable subscriber without the message 
selector and then close it
+        TopicConnection durConnection2 = 
factory.createTopicConnection("guest", "guest");
+        TopicSession durSession2 = durConnection2.createTopicSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, 
SUB_NAME);
+        durConnection2.start();
+        durSub2.close();
+        durSession2.close();
+        durConnection2.close();
+        
+        //now restart the server
+        try
+        {
+            restartBroker();
+        }
+        catch (Exception e)
+        {
+            _logger.error("problems restarting broker: " + e);
+            throw e;
+        }
+        
+        //send messages matching and not matching the original used selector
+        TopicConnection pubConnection = factory.createTopicConnection("guest", 
"guest");
+        TopicSession pubSession = pubConnection.createTopicSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        TopicPublisher publisher = pubSession.createPublisher(topic);
+        for (int i = 1; i <= 5; i++)
+        {
+            Message message = pubSession.createMessage();
+            message.setStringProperty("testprop", "true");
+            publisher.publish(message);
+            message = pubSession.createMessage();
+            message.setStringProperty("testprop", "false");
+            publisher.publish(message);
+        }
+        publisher.close();
+        pubSession.close();
+
+        //now recreate the durable subscriber without selector to check there 
are no exceptions generated
+        //then verify ALL messages sent are received
+        TopicConnection durConnection3 = (TopicConnection) 
factory.createConnection("guest", "guest");
+        TopicSession durSession3 = (TopicSession) 
durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, 
SUB_NAME);
+        durConnection3.start();
+        
+        for (int i = 1; i <= 10; i++)
+        {
+            Message message = durSub3.receive(2000);
+            if (message == null)
+            {
+                fail("testDurSubChangedToNotHaveSelectorThenRestart test 
failed. Expected message " + i + " was not received");
+            }
+        }
+        
+        durSub3.close();
+        durSession3.unsubscribe(SUB_NAME);
+        durSession3.close();
+        durConnection3.close();
+    }
+    
+    
+    public void testResubscribeWithChangedSelectorAndRestart() throws Exception
+    {
+        if (! isBrokerStorePersistent())
+        {
+            _logger.warn("Test skipped due to requirement of a persistent 
store");
+            return;
+        }
+        
+        Connection conn = getConnection();
+        conn.start();
+        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        AMQTopic topic = new AMQTopic((AMQConnection) conn, 
"testResubscribeWithChangedSelectorAndRestart");
+        MessageProducer producer = session.createProducer(topic);
+        
+        // Create durable subscriber that matches A
+        TopicSubscriber subA = session.createDurableSubscriber(topic, 
+                "testResubscribeWithChangedSelector",
+                "Match = True", false);
+
+        // Send 1 matching message and 1 non-matching message
+        TextMessage msg = 
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+        msg.setBooleanProperty("Match", true);
+        producer.send(msg);
+        msg = 
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+        msg.setBooleanProperty("Match", false);
+        producer.send(msg);
+
+        Message rMsg = subA.receive(1000);
+        assertNotNull(rMsg);
+        assertEquals("Content was wrong", 
+                     "testResubscribeWithChangedSelectorAndRestart1",
+                     ((TextMessage) rMsg).getText());
+        
+        rMsg = subA.receive(1000);
+        assertNull(rMsg);
+        
+        // Send another 1 matching message and 1 non-matching message
+        msg = 
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+        msg.setBooleanProperty("Match", true);
+        producer.send(msg);
+        msg = 
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+        msg.setBooleanProperty("Match", false);
+        producer.send(msg);
+        
+        // Disconnect subscriber without receiving the message to 
+        //leave it on the underlying queue
+        subA.close();
+        
+        // Reconnect with new selector that matches B
+        TopicSubscriber subB = session.createDurableSubscriber(topic, 
+                "testResubscribeWithChangedSelectorAndRestart","Match = 
False", false);
+        
+        //verify no messages are now present on the queue as changing selector 
should have issued
+        //an unsubscribe and thus deleted the previous durable backing queue 
for the subscription.
+        //check the dur sub's underlying queue now has msg count 1
+        AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + 
"testResubscribeWithChangedSelector");
+        assertEquals("Msg count should be 0", 0, 
((AMQSession)session).getQueueDepth(subQueue));
+        
+        
+        // Check that new messages are received properly
+        msg = 
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+        msg.setBooleanProperty("Match", true);
+        producer.send(msg);
+        msg = 
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+        msg.setBooleanProperty("Match", false);
+        producer.send(msg);
+        
+        rMsg = subB.receive(1000);
+        assertNotNull(rMsg);
+        assertEquals("Content was wrong", 
+                     "testResubscribeWithChangedSelectorAndRestart2",
+                     ((TextMessage) rMsg).getText());
+        
+        rMsg = subB.receive(1000);
+        assertNull(rMsg);
+
+        //now restart the server
+        try
+        {
+            restartBroker();
+        }
+        catch (Exception e)
+        {
+            _logger.error("problems restarting broker: " + e);
+            throw e;
+        }
+        
+        // Check that new messages are still received properly
+        msg = 
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+        msg.setBooleanProperty("Match", true);
+        producer.send(msg);
+        msg = 
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+        msg.setBooleanProperty("Match", false);
+        producer.send(msg);
+        
+        rMsg = subB.receive(1000);
+        assertNotNull(rMsg);
+        assertEquals("Content was wrong", 
+                     "testResubscribeWithChangedSelectorAndRestart2",
+                     ((TextMessage) rMsg).getText());
+        
+        rMsg = subB.receive(1000);
+        assertNull(rMsg);
+        
+        session.unsubscribe("testResubscribeWithChangedSelectorAndRestart");
+        subB.close();
+        session.close();
+        conn.close();
+    }
+
 }
 

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=929095&r1=929094&r2=929095&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
 Tue Mar 30 11:50:18 2010
@@ -20,8 +20,13 @@
  */
 package org.apache.qpid.test.unit.topic;
 
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.qpid.management.common.JMXConnnectionFactory;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
 
@@ -39,6 +44,9 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
 
 /**
  * @todo Code to check that a consumer gets only one particular method could 
be factored into a re-usable method (as
@@ -58,6 +66,36 @@ public class DurableSubscriptionTest ext
     /** Timeout for receive() if we are not expecting a message */
     private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000;
     
+    private JMXConnector _jmxc;
+    private MBeanServerConnection _mbsc;
+    private static final String USER = "admin";
+    private static final String PASSWORD = "admin";
+    private boolean _jmxConnected;
+
+    public void setUp() throws Exception
+    {
+        setConfigurationProperty("management.enabled", "true");     
+        _jmxConnected=false;
+        super.setUp();
+    }
+
+    public void tearDown() throws Exception
+    {
+        if(_jmxConnected)
+        {
+            try
+            {
+                _jmxc.close();
+            }
+            catch (IOException e)
+            {
+                e.printStackTrace();
+            }
+        }
+        
+        super.tearDown();
+    }
+    
     public void testUnsubscribe() throws Exception
     {
         AMQConnection con = (AMQConnection) getConnection("guest", "guest");
@@ -79,6 +117,12 @@ public class DurableSubscriptionTest ext
 
         _logger.info("Producer sending message A");
         producer.send(session1.createTextMessage("A"));
+        
+        ((AMQSession)session1).sync();
+        
+        //check the dur sub's underlying queue now has msg count 1
+        AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + 
"MySubscription");
+        assertEquals("Msg count should be 1", 1, 
((AMQSession)session1).getQueueDepth(subQueue));
 
         Message msg;
         _logger.info("Receive message on consumer 1:expecting A");
@@ -96,11 +140,46 @@ public class DurableSubscriptionTest ext
         msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
         _logger.info("Receive message on consumer 1 :expecting null");
         assertEquals(null, msg);
+        
+        ((AMQSession)session2).sync();
+        
+        //check the dur sub's underlying queue now has msg count 0
+        assertEquals("Msg count should be 0", 0, 
((AMQSession)session2).getQueueDepth(subQueue));
 
         consumer2.close();
         _logger.info("Unsubscribe session2/consumer2");
         session2.unsubscribe("MySubscription");
-
+        
+        ((AMQSession)session2).sync();
+        
+        if(isJavaBroker() && isExternalBroker())
+        {
+            //Verify that the queue was deleted by querying for its JMX MBean
+            _jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1",
+                    getManagementPort(getPort()), USER, PASSWORD);
+
+            _jmxConnected = true;
+            _mbsc = _jmxc.getMBeanServerConnection();
+            
+            //must replace the occurrence of ':' in queue name with '-'
+            String queueObjectNameText = "clientid" + "-" + "MySubscription";
+            
+            ObjectName objName = new 
ObjectName("org.apache.qpid:type=VirtualHost.Queue,name=" 
+                                                + queueObjectNameText + ",*");
+            
+            Set<ObjectName> objectInstances = _mbsc.queryNames(objName, null);
+            
+            if(objectInstances.size() != 0)
+            {
+                fail("Queue MBean was found. Expected queue to have been 
deleted");
+            }
+            else
+            {
+                _logger.info("Underlying dueue for the durable subscription 
was confirmed deleted.");
+            }
+        }
+        
+        //verify unsubscribing the durable subscriber did not affect the 
non-durable one
         _logger.info("Producer sending message B");
         producer.send(session1.createTextMessage("B"));
 
@@ -459,6 +538,9 @@ public class DurableSubscriptionTest ext
         rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNull(rMsg);
         
+        // Send another 1 matching message and 1 non-matching message
+        sendMatchingAndNonMatchingMessage(session, producer);
+        
         // Disconnect subscriber
         subA.close();
         
@@ -466,9 +548,15 @@ public class DurableSubscriptionTest ext
         TopicSubscriber subB = session.createDurableSubscriber(topic, 
                 "testResubscribeWithChangedSelector","Match = False", false);
         
+        //verify no messages are now present as changing selector should have 
issued
+        //an unsubscribe and thus deleted the previous backing queue for the 
subscription.
+        rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
+        assertNull("Should not have received message as the queue underlying 
the " +
+                       "subscription should have been cleared/deleted when the 
selector was changed", rMsg);
         
-        // Check messages are recieved properly
+        // Check that new messages are received properly
         sendMatchingAndNonMatchingMessage(session, producer);
+        
         rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNotNull(rMsg);
         assertEquals("Content was wrong", 

Modified: qpid/trunk/qpid/java/test-profiles/Excludes
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Excludes?rev=929095&r1=929094&r2=929095&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Excludes Tue Mar 30 11:50:18 2010
@@ -29,3 +29,5 @@ org.apache.qpid.test.unit.ack.Acknowledg
 org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#*
 org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#*
 
+// QPID-2418 : The queue backing the dur sub is not currently deleted at 
subscription change, so the test will fail.
+org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart

Modified: qpid/trunk/qpid/java/test-profiles/JavaExcludes
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaExcludes?rev=929095&r1=929094&r2=929095&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/JavaExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/JavaExcludes Tue Mar 30 11:50:18 2010
@@ -16,6 +16,3 @@ org.apache.qpid.client.SessionCreateTest
 // QPID-2097 exclude it from the InVM test runs until InVM JMX Interface is 
reliable
 org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
 org.apache.qpid.server.queue.ModelTest#*
-
-//QPID-2422: Derby currently doesnt persist queue arguments and 0-91 support 
causes exclusivity mismatch after restart
-org.apache.qpid.test.unit.ct.DurableSubscriberTest#*



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to