Author: robbie Date: Tue Mar 30 11:50:18 2010 New Revision: 929095 URL: http://svn.apache.org/viewvc?rev=929095&view=rev Log: QPID-2417, QPID-2418, QPID-2449: expand topic testing, specifically around the change and unsubscription of durable subscriptions
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java qpid/trunk/qpid/java/test-profiles/Excludes qpid/trunk/qpid/java/test-profiles/JavaExcludes Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java?rev=929095&r1=929094&r2=929095&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java Tue Mar 30 11:50:18 2010 @@ -19,6 +19,10 @@ package org.apache.qpid.test.unit.ct; import javax.jms.*; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; import org.apache.qpid.test.utils.QpidTestCase; /** @@ -163,5 +167,301 @@ public class DurableSubscriberTest exten durConnection2.close(); } } + + /** + * create and register a durable subscriber without a message selector and then unsubscribe it + * create and register a durable subscriber with a message selector and then close it + * restart the broker + * send matching and non matching messages + * recreate and register the durable subscriber with a message selector + * verify only the matching messages are received + */ + public void testDurSubChangedToHaveSelectorThenRestart() throws Exception + { + if (! 
isBrokerStorePersistent()) + { + _logger.warn("Test skipped due to requirement of a persistent store"); + return; + } + + final String SUB_NAME=getTestQueueName(); + + TopicConnectionFactory factory = getConnectionFactory(); + Topic topic = (Topic) getInitialContext().lookup(_topicName); + + //create and register a durable subscriber then unsubscribe it + TopicConnection durConnection = factory.createTopicConnection("guest", "guest"); + TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, SUB_NAME); + durConnection.start(); + durSub1.close(); + durSession.unsubscribe(SUB_NAME); + durSession.close(); + durConnection.close(); + + //create and register a durable subscriber with a message selector and then close it + TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest"); + TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false); + durConnection2.start(); + durSub2.close(); + durSession2.close(); + durConnection2.close(); + + //now restart the server + try + { + restartBroker(); + } + catch (Exception e) + { + _logger.error("problems restarting broker: " + e); + throw e; + } + + //send messages matching and not matching the selector + TopicConnection pubConnection = factory.createTopicConnection("guest", "guest"); + TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicPublisher publisher = pubSession.createPublisher(topic); + for (int i = 0; i < 5; i++) + { + Message message = pubSession.createMessage(); + message.setStringProperty("testprop", "true"); + publisher.publish(message); + message = pubSession.createMessage(); + message.setStringProperty("testprop", "false"); + publisher.publish(message); + } + publisher.close(); + pubSession.close(); 
+ + //now recreate the durable subscriber with selector to check there are no exceptions generated + //and then verify the messages are received correctly + TopicConnection durConnection3 = (TopicConnection) factory.createConnection("guest", "guest"); + TopicSession durSession3 = (TopicSession) durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false); + durConnection3.start(); + + for (int i = 0; i < 5; i++) + { + Message message = durSub3.receive(2000); + if (message == null) + { + fail("testDurSubChangedToHaveSelectorThenRestart test failed. Expected message " + i + " was not returned"); + } + else + { + assertTrue("testDurSubChangedToHaveSelectorThenRestart test failed. Got message not matching selector", + message.getStringProperty("testprop").equals("true")); + } + } + + durSub3.close(); + durSession3.unsubscribe(SUB_NAME); + durSession3.close(); + durConnection3.close(); + } + + + /** + * create and register a durable subscriber with a message selector and then unsubscribe it + * create and register a durable subscriber without a message selector and then close it + * restart the broker + * send matching and non matching messages + * recreate and register the durable subscriber without a message selector + * verify ALL the sent messages are received + */ + public void testDurSubChangedToNotHaveSelectorThenRestart() throws Exception + { + if (! 
isBrokerStorePersistent()) + { + _logger.warn("Test skipped due to requirement of a persistent store"); + return; + } + + final String SUB_NAME=getTestQueueName(); + + TopicConnectionFactory factory = getConnectionFactory(); + Topic topic = (Topic) getInitialContext().lookup(_topicName); + + //create and register a durable subscriber with selector then unsubscribe it + TopicConnection durConnection = factory.createTopicConnection("guest", "guest"); + TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false); + durConnection.start(); + durSub1.close(); + durSession.unsubscribe(SUB_NAME); + durSession.close(); + durConnection.close(); + + //create and register a durable subscriber without the message selector and then close it + TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest"); + TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, SUB_NAME); + durConnection2.start(); + durSub2.close(); + durSession2.close(); + durConnection2.close(); + + //now restart the server + try + { + restartBroker(); + } + catch (Exception e) + { + _logger.error("problems restarting broker: " + e); + throw e; + } + + //send messages matching and not matching the original used selector + TopicConnection pubConnection = factory.createTopicConnection("guest", "guest"); + TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicPublisher publisher = pubSession.createPublisher(topic); + for (int i = 1; i <= 5; i++) + { + Message message = pubSession.createMessage(); + message.setStringProperty("testprop", "true"); + publisher.publish(message); + message = pubSession.createMessage(); + message.setStringProperty("testprop", "false"); + publisher.publish(message); + } + 
publisher.close(); + pubSession.close(); + + //now recreate the durable subscriber without selector to check there are no exceptions generated + //then verify ALL messages sent are received + TopicConnection durConnection3 = (TopicConnection) factory.createConnection("guest", "guest"); + TopicSession durSession3 = (TopicSession) durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, SUB_NAME); + durConnection3.start(); + + for (int i = 1; i <= 10; i++) + { + Message message = durSub3.receive(2000); + if (message == null) + { + fail("testDurSubChangedToNotHaveSelectorThenRestart test failed. Expected message " + i + " was not received"); + } + } + + durSub3.close(); + durSession3.unsubscribe(SUB_NAME); + durSession3.close(); + durConnection3.close(); + } + + + public void testResubscribeWithChangedSelectorAndRestart() throws Exception + { + if (! isBrokerStorePersistent()) + { + _logger.warn("Test skipped due to requirement of a persistent store"); + return; + } + + Connection conn = getConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelectorAndRestart"); + MessageProducer producer = session.createProducer(topic); + + // Create durable subscriber that matches A + TopicSubscriber subA = session.createDurableSubscriber(topic, + "testResubscribeWithChangedSelector", + "Match = True", false); + + // Send 1 matching message and 1 non-matching message + TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); + msg.setBooleanProperty("Match", true); + producer.send(msg); + msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); + msg.setBooleanProperty("Match", false); + producer.send(msg); + + Message rMsg = subA.receive(1000); + assertNotNull(rMsg); + assertEquals("Content was wrong", + 
"testResubscribeWithChangedSelectorAndRestart1", + ((TextMessage) rMsg).getText()); + + rMsg = subA.receive(1000); + assertNull(rMsg); + + // Send another 1 matching message and 1 non-matching message + msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); + msg.setBooleanProperty("Match", true); + producer.send(msg); + msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); + msg.setBooleanProperty("Match", false); + producer.send(msg); + + // Disconnect subscriber without receiving the message to + //leave it on the underlying queue + subA.close(); + + // Reconnect with new selector that matches B + TopicSubscriber subB = session.createDurableSubscriber(topic, + "testResubscribeWithChangedSelectorAndRestart","Match = False", false); + + //verify no messages are now present on the queue as changing selector should have issued + //an unsubscribe and thus deleted the previous durable backing queue for the subscription. + //check the dur sub's underlying queue now has msg count 1 + AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelector"); + assertEquals("Msg count should be 0", 0, ((AMQSession)session).getQueueDepth(subQueue)); + + + // Check that new messages are received properly + msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); + msg.setBooleanProperty("Match", true); + producer.send(msg); + msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); + msg.setBooleanProperty("Match", false); + producer.send(msg); + + rMsg = subB.receive(1000); + assertNotNull(rMsg); + assertEquals("Content was wrong", + "testResubscribeWithChangedSelectorAndRestart2", + ((TextMessage) rMsg).getText()); + + rMsg = subB.receive(1000); + assertNull(rMsg); + + //now restart the server + try + { + restartBroker(); + } + catch (Exception e) + { + _logger.error("problems restarting broker: " + e); + throw e; + } + + // Check that new 
messages are still received properly + msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); + msg.setBooleanProperty("Match", true); + producer.send(msg); + msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); + msg.setBooleanProperty("Match", false); + producer.send(msg); + + rMsg = subB.receive(1000); + assertNotNull(rMsg); + assertEquals("Content was wrong", + "testResubscribeWithChangedSelectorAndRestart2", + ((TextMessage) rMsg).getText()); + + rMsg = subB.receive(1000); + assertNull(rMsg); + + session.unsubscribe("testResubscribeWithChangedSelectorAndRestart"); + subB.close(); + session.close(); + conn.close(); + } + } Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=929095&r1=929094&r2=929095&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Tue Mar 30 11:50:18 2010 @@ -20,8 +20,13 @@ */ package org.apache.qpid.test.unit.topic; +import java.io.IOException; +import java.util.Set; + +import org.apache.qpid.management.common.JMXConnnectionFactory; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; @@ -39,6 +44,9 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; /** * @todo Code to 
check that a consumer gets only one particular method could be factored into a re-usable method (as @@ -58,6 +66,36 @@ public class DurableSubscriptionTest ext /** Timeout for receive() if we are not expecting a message */ private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000; + private JMXConnector _jmxc; + private MBeanServerConnection _mbsc; + private static final String USER = "admin"; + private static final String PASSWORD = "admin"; + private boolean _jmxConnected; + + public void setUp() throws Exception + { + setConfigurationProperty("management.enabled", "true"); + _jmxConnected=false; + super.setUp(); + } + + public void tearDown() throws Exception + { + if(_jmxConnected) + { + try + { + _jmxc.close(); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + + super.tearDown(); + } + public void testUnsubscribe() throws Exception { AMQConnection con = (AMQConnection) getConnection("guest", "guest"); @@ -79,6 +117,12 @@ public class DurableSubscriptionTest ext _logger.info("Producer sending message A"); producer.send(session1.createTextMessage("A")); + + ((AMQSession)session1).sync(); + + //check the dur sub's underlying queue now has msg count 1 + AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription"); + assertEquals("Msg count should be 1", 1, ((AMQSession)session1).getQueueDepth(subQueue)); Message msg; _logger.info("Receive message on consumer 1:expecting A"); @@ -96,11 +140,46 @@ public class DurableSubscriptionTest ext msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); _logger.info("Receive message on consumer 1 :expecting null"); assertEquals(null, msg); + + ((AMQSession)session2).sync(); + + //check the dur sub's underlying queue now has msg count 0 + assertEquals("Msg count should be 0", 0, ((AMQSession)session2).getQueueDepth(subQueue)); consumer2.close(); _logger.info("Unsubscribe session2/consumer2"); session2.unsubscribe("MySubscription"); - + + ((AMQSession)session2).sync(); + + if(isJavaBroker() && 
isExternalBroker()) + { + //Verify that the queue was deleted by querying for its JMX MBean + _jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1", + getManagementPort(getPort()), USER, PASSWORD); + + _jmxConnected = true; + _mbsc = _jmxc.getMBeanServerConnection(); + + //must replace the occurrence of ':' in queue name with '-' + String queueObjectNameText = "clientid" + "-" + "MySubscription"; + + ObjectName objName = new ObjectName("org.apache.qpid:type=VirtualHost.Queue,name=" + + queueObjectNameText + ",*"); + + Set<ObjectName> objectInstances = _mbsc.queryNames(objName, null); + + if(objectInstances.size() != 0) + { + fail("Queue MBean was found. Expected queue to have been deleted"); + } + else + { + _logger.info("Underlying dueue for the durable subscription was confirmed deleted."); + } + } + + //verify unsubscribing the durable subscriber did not affect the non-durable one _logger.info("Producer sending message B"); producer.send(session1.createTextMessage("B")); @@ -459,6 +538,9 @@ public class DurableSubscriptionTest ext rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull(rMsg); + // Send another 1 matching message and 1 non-matching message + sendMatchingAndNonMatchingMessage(session, producer); + // Disconnect subscriber subA.close(); @@ -466,9 +548,15 @@ public class DurableSubscriptionTest ext TopicSubscriber subB = session.createDurableSubscriber(topic, "testResubscribeWithChangedSelector","Match = False", false); + //verify no messages are now present as changing selector should have issued + //an unsubscribe and thus deleted the previous backing queue for the subscription. 
+ rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); + assertNull("Should not have received message as the queue underlying the " + + "subscription should have been cleared/deleted when the selector was changed", rMsg); - // Check messages are recieved properly + // Check that new messages are received properly sendMatchingAndNonMatchingMessage(session, producer); + rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNotNull(rMsg); assertEquals("Content was wrong", Modified: qpid/trunk/qpid/java/test-profiles/Excludes URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Excludes?rev=929095&r1=929094&r2=929095&view=diff ============================================================================== --- qpid/trunk/qpid/java/test-profiles/Excludes (original) +++ qpid/trunk/qpid/java/test-profiles/Excludes Tue Mar 30 11:50:18 2010 @@ -29,3 +29,5 @@ org.apache.qpid.test.unit.ack.Acknowledg org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#* org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#* +// QPID-2418 : The queue backing the dur sub is not currently deleted at subscription change, so the test will fail. 
+org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart Modified: qpid/trunk/qpid/java/test-profiles/JavaExcludes URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaExcludes?rev=929095&r1=929094&r2=929095&view=diff ============================================================================== --- qpid/trunk/qpid/java/test-profiles/JavaExcludes (original) +++ qpid/trunk/qpid/java/test-profiles/JavaExcludes Tue Mar 30 11:50:18 2010 @@ -16,6 +16,3 @@ org.apache.qpid.client.SessionCreateTest // QPID-2097 exclude it from the InVM test runs until InVM JMX Interface is reliable org.apache.qpid.management.jmx.ManagementActorLoggingTest#* org.apache.qpid.server.queue.ModelTest#* - -//QPID-2422: Derby currently doesnt persist queue arguments and 0-91 support causes exclusivity mismatch after restart -org.apache.qpid.test.unit.ct.DurableSubscriberTest#* --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org