http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java new file mode 100644 index 0000000..b7c00b8 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.activemq.broker.jmx;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

/**
 * Checks that a JMX operation invoked on a queue MBean fails with a
 * {@link TimeoutException} when the broker is configured with a one second
 * MBean invocation timeout (see {@link #createBroker()}).
 *
 * <p>The test fills a queue with {@link #messageCount} messages and then asks the
 * queue MBean to move all of them to another queue; the move is expected to be
 * interrupted by the timeout. NOTE(review): this presumes moving that many messages
 * takes longer than one second on the test machine.
 */
public class MBeanOperationTimeoutTest {
    private static final Logger LOG = LoggerFactory.getLogger(MBeanOperationTimeoutTest.class);

    private static final String DESTINATION_NAME = "MBeanOperationTimeoutTestQ";
    private static final String MOVE_TO_DESTINATION_NAME = "MBeanOperationTimeoutTestQ.Moved";

    private ActiveMQConnectionFactory connectionFactory;
    private BrokerService broker;
    private String connectionUri;

    protected MBeanServer mbeanServer;
    protected String domain = "org.apache.activemq";

    /** Number of messages placed on the queue before the move is attempted. */
    protected int messageCount = 50000;

    /**
     * Fills the queue, verifies its size through the queue MBean and then triggers a
     * move of every message (null selector), which must end in a {@link TimeoutException}.
     *
     * @throws Exception the expected {@link TimeoutException}, or any unexpected failure
     */
    @Test(expected = TimeoutException.class)
    public void testLongOperationTimesOut() throws Exception {

        sendMessages(messageCount);
        LOG.info("Produced {} messages to the broker.", messageCount);

        // Now get the QueueViewMBean and move the messages
        String objectNameStr = broker.getBrokerObjectName().toString();
        objectNameStr += ",destinationType=Queue,destinationName=" + DESTINATION_NAME;

        ObjectName queueViewMBeanName = assertRegisteredObjectName(objectNameStr);
        QueueViewMBean proxy = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(
            mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);

        long count = proxy.getQueueSize();
        // JUnit's contract is (message, expected, actual); the expected value goes first
        // so that a failure message reports the two numbers the right way round.
        assertEquals("Queue size", messageCount, count);

        LOG.info("Attempting to move one message, TimeoutException expected");
        proxy.moveMatchingMessagesTo(null, MOVE_TO_DESTINATION_NAME);
    }

    /**
     * Sends {@code count} messages to the test queue inside a single transaction.
     * Each message carries an int property {@code "id"} holding its index.
     *
     * @param count number of messages to send
     * @throws Exception if the connection, session or send fails
     */
    private void sendMessages(int count) throws Exception {
        Connection connection = connectionFactory.createConnection();
        try {
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(DESTINATION_NAME);
            MessageProducer producer = session.createProducer(destination);
            // Honour the parameter rather than the messageCount field.
            for (int i = 0; i < count; i++) {
                Message message = session.createMessage();
                message.setIntProperty("id", i);
                producer.send(message);
            }
            session.commit();
        } finally {
            // Closing the connection also releases the session and producer.
            connection.close();
        }
    }

    /**
     * Builds an {@link ObjectName} from {@code name} and fails the test unless an MBean
     * is registered under it.
     *
     * @param name the textual object name to look up
     * @return the parsed object name
     * @throws MalformedObjectNameException if {@code name} is not a valid object name
     * @throws NullPointerException if {@code name} is null
     */
    protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {
        ObjectName objectName = new ObjectName(name);
        if (mbeanServer.isRegistered(objectName)) {
            LOG.info("Bean Registered: {}", objectName);
        } else {
            fail("Could not find MBean!: " + objectName);
        }
        return objectName;
    }

    /**
     * Starts the broker and wires up the connection factory and MBean server used by the test.
     *
     * @throws Exception if the broker cannot be created or started
     */
    @Before
    public void setUp() throws Exception {
        broker = createBroker();
        broker.start();
        broker.waitUntilStarted();

        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
        mbeanServer = broker.getManagementContext().getMBeanServer();
    }

    /**
     * Stops the broker, if one was started, after a short pause.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void tearDown() throws Exception {
        // NOTE(review): the pause presumably lets the timed-out operation settle before
        // shutdown; the reason is not visible here.
        Thread.sleep(500);
        if (broker != null) {
            broker.stop();
            broker.waitUntilStopped();
            broker = null;
        }
    }

    /**
     * Creates a JMX-enabled broker on a VM transport whose MBean invocations time out
     * after one second and which deletes all messages on startup.
     *
     * @return the configured, not yet started broker
     * @throws Exception if the connector cannot be added
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService answer = new BrokerService();
        answer.setMbeanInvocationTimeout(TimeUnit.SECONDS.toMillis(1));
        answer.setUseJmx(true);
        answer.addConnector("vm://localhost");
        answer.setDeleteAllMessagesOnStartup(true);
        return answer;
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java new file mode 100644 index 0000000..e2b0c51 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -0,0 +1,1508 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.jmx; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.URI; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectInstance; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import junit.textui.TestRunner; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.BlobMessage; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.BaseDestination; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.util.JMXSupport; +import org.apache.activemq.util.URISupport; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A test case of the various MBeans in ActiveMQ. 
If you want to look at the + * various MBeans after the test has been run then run this test case as a + * command line application. + */ +public class MBeanTest extends EmbeddedBrokerTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(MBeanTest.class); + + private static boolean waitForKeyPress; + + protected MBeanServer mbeanServer; + protected String domain = "org.apache.activemq"; + protected String clientID = "foo"; + + protected Connection connection; + protected boolean transacted; + protected int authMode = Session.AUTO_ACKNOWLEDGE; + protected static final int MESSAGE_COUNT = 2*BaseDestination.MAX_PAGE_SIZE; + final static String QUEUE_WITH_OPTIONS = "QueueWithOptions"; + + /** + * When you run this test case from the command line it will pause before + * terminating so that you can look at the MBeans state for debugging + * purposes. + */ + public static void main(String[] args) { + waitForKeyPress = true; + TestRunner.run(MBeanTest.class); + } + + public void testConnectors() throws Exception{ + ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost"); + BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + + assertEquals("openwire URL port doesn't equal bind Address", + new URI(broker.getTransportConnectorByType("tcp")).getPort(), + new URI(this.broker.getTransportConnectors().get(0).getPublishableConnectString()).getPort()); + } + + public void testMBeans() throws Exception { + connection = connectionFactory.createConnection(); + useConnection(connection); + + // test all the various MBeans now we have a producer, consumer and + // messages on a queue + assertSendViaMBean(); + assertSendCsnvViaMBean(); + assertQueueBrowseWorks(); + assertCreateAndDestroyDurableSubscriptions(); + assertConsumerCounts(); + assertProducerCounts(); + } + + public void testMoveMessages() throws Exception { + connection = 
connectionFactory.createConnection(); + useConnection(connection); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + CompositeData[] compdatalist = queue.browse(); + int initialQueueSize = compdatalist.length; + if (initialQueueSize == 0) { + fail("There is no message in the queue:"); + } + else { + echo("Current queue size: " + initialQueueSize); + } + int messageCount = initialQueueSize; + String[] messageIDs = new String[messageCount]; + for (int i = 0; i < messageCount; i++) { + CompositeData cdata = compdatalist[i]; + String messageID = (String) cdata.get("JMSMessageID"); + assertNotNull("Should have a message ID for message " + i, messageID); + messageIDs[i] = messageID; + } + + assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); + + echo("About to move " + messageCount + " messages"); + + String newDestination = getSecondDestinationString(); + for (String messageID : messageIDs) { + //echo("Moving message: " + messageID); + queue.moveMessageTo(messageID, newDestination); + } + + echo("Now browsing the queue"); + compdatalist = queue.browse(); + int actualCount = compdatalist.length; + echo("Current queue size: " + actualCount); + assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount); + + echo("Now browsing the second queue"); + + queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination ); + QueueViewMBean queueNew = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + long newQueuesize = queueNew.getQueueSize(); + echo("Second queue size: " + newQueuesize); + assertEquals("Unexpected 
number of messages ",messageCount, newQueuesize); + + // check memory usage migration + assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0); + assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage()); + assertTrue("use cache", queueNew.isUseCache()); + assertTrue("cache enabled", queueNew.isCacheEnabled()); + assertEquals("no forwards", 0, queueNew.getForwardCount()); + } + + public void testRemoveMessages() throws Exception { + ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost"); + BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + broker.addQueue(getDestinationString()); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + String msg1 = queue.sendTextMessage("message 1"); + String msg2 = queue.sendTextMessage("message 2"); + + assertTrue(queue.removeMessage(msg2)); + + connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination dest = createDestination(); + + MessageConsumer consumer = session.createConsumer(dest); + Message message = consumer.receive(1000); + assertNotNull(message); + assertEquals(msg1, message.getJMSMessageID()); + + String msg3 = queue.sendTextMessage("message 3"); + message = consumer.receive(1000); + assertNotNull(message); + assertEquals(msg3, message.getJMSMessageID()); + + message = consumer.receive(1000); + assertNull(message); + + } + + public void testRemoveQueue() throws Exception { + String queueName = "TEST"; + ObjectName brokerName = assertRegisteredObjectName(domain + 
":type=Broker,brokerName=localhost"); + BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + broker.addQueue(queueName); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queueName); + + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + queue.sendTextMessage("message 1"); + queue.sendTextMessage("message 2"); + + assertEquals(2, broker.getTotalMessageCount()); + + broker.removeQueue(queueName); + + assertEquals(0, broker.getTotalMessageCount()); + + } + + public void testRetryMessages() throws Exception { + // lets speed up redelivery + ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory; + factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0); + factory.getRedeliveryPolicy().setMaximumRedeliveries(1); + factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0); + factory.getRedeliveryPolicy().setUseCollisionAvoidance(false); + factory.getRedeliveryPolicy().setUseExponentialBackOff(false); + factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0); + + connection = connectionFactory.createConnection(); + useConnection(connection); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + long initialQueueSize = queue.getQueueSize(); + echo("current queue size: " + initialQueueSize); + assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); + + // lets create a duff consumer which keeps rolling back... 
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString())); + Message message = consumer.receive(5000); + while (message != null) { + echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount")); + session.rollback(); + message = consumer.receive(2000); + } + consumer.close(); + session.close(); + + // now lets get the dead letter queue + Thread.sleep(1000); + + ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME ); + QueueViewMBean dlq = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true); + + long initialDlqSize = dlq.getQueueSize(); + CompositeData[] compdatalist = dlq.browse(); + int dlqQueueSize = compdatalist.length; + if (dlqQueueSize == 0) { + fail("There are no messages in the queue:"); + } + else { + echo("Current DLQ queue size: " + dlqQueueSize); + } + int messageCount = dlqQueueSize; + String[] messageIDs = new String[messageCount]; + for (int i = 0; i < messageCount; i++) { + CompositeData cdata = compdatalist[i]; + String messageID = (String) cdata.get("JMSMessageID"); + assertNotNull("Should have a message ID for message " + i, messageID); + messageIDs[i] = messageID; + } + + int dlqMemUsage = dlq.getMemoryPercentUsage(); + assertTrue("dlq has some memory usage", dlqMemUsage > 0); + assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); + + echo("About to retry " + messageCount + " messages"); + + for (String messageID : messageIDs) { + echo("Retrying message: " + messageID); + dlq.retryMessage(messageID); + } + + long queueSize = queue.getQueueSize(); + compdatalist = queue.browse(); + int actualCount = 
compdatalist.length; + echo("Orginal queue size is now " + queueSize); + echo("Original browse queue size: " + actualCount); + + long dlqSize = dlq.getQueueSize(); + echo("DLQ size: " + dlqSize); + + assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize); + assertEquals("queue size", initialQueueSize, queueSize); + assertEquals("browse queue size", initialQueueSize, actualCount); + + assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage()); + } + + public void testMoveMessagesBySelector() throws Exception { + connection = connectionFactory.createConnection(); + useConnection(connection); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString() ); + + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + String newDestination = getSecondDestinationString(); + queue.moveMatchingMessagesTo("counter > 2", newDestination); + + queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination); + + queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + int movedSize = MESSAGE_COUNT-3; + assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize()); + + // now lets remove them by selector + queue.removeMatchingMessages("counter > 2"); + + assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); + assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); + } + + public void testCopyMessagesBySelector() throws Exception { + connection = connectionFactory.createConnection(); + useConnection(connection); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + 
":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + String newDestination = getSecondDestinationString(); + long queueSize = queue.getQueueSize(); + assertTrue(queueSize > 0); + queue.copyMatchingMessagesTo("counter > 2", newDestination); + + queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination); + + queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)"); + assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize()); + // now lets remove them by selector + queue.removeMatchingMessages("counter > 2"); + + assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); + assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); + } + + public void testCreateDestinationWithSpacesAtEnds() throws Exception { + ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost"); + BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + + assertTrue("broker is not a slave", !broker.isSlave()); + // create 2 topics + broker.addTopic(getDestinationString() + "1 "); + broker.addTopic(" " + getDestinationString() + "2"); + broker.addTopic(" " + getDestinationString() + "3 "); + + + assertNotRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "1 "); + assertNotRegisteredObjectName(domain + 
":type=Broker,brokerName=localhost,destinationType=Topic,destinationName= " + getDestinationString() + "2"); + assertNotRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName= " + getDestinationString() + "3 "); + + ObjectName topicObjName1 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "1"); + ObjectName topicObjName2 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "2"); + ObjectName topicObjName3 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "3"); + + TopicViewMBean topic1 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true); + TopicViewMBean topic2 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, TopicViewMBean.class, true); + TopicViewMBean topic3 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName3, TopicViewMBean.class, true); + + assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount()); + assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount()); + assertEquals("topic3 Durable subscriber count", 0, topic3.getConsumerCount()); + + String topicName = getDestinationString(); + String selector = null; + + // create 1 subscriber for each topic + broker.createDurableSubscriber(clientID, "topic1.subscriber1", topicName + "1", selector); + broker.createDurableSubscriber(clientID, "topic2.subscriber1", topicName + "2", selector); + broker.createDurableSubscriber(clientID, "topic3.subscriber1", topicName + "3", selector); + + assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount()); + assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount()); + assertEquals("topic3 
Durable subscriber count", 1, topic3.getConsumerCount()); + } + + protected void assertSendViaMBean() throws Exception { + String queueName = getDestinationString() + ".SendMBBean"; + + ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost"); + echo("Create QueueView MBean..."); + BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + broker.addQueue(queueName); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queueName); + + echo("Create QueueView MBean..."); + QueueViewMBean proxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + proxy.purge(); + + int count = 5; + for (int i = 0; i < count; i++) { + String body = "message:" + i; + + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put("JMSCorrelationID", "MyCorrId"); + headers.put("JMSDeliveryMode", Boolean.FALSE); + headers.put("JMSXGroupID", "MyGroupID"); + headers.put("JMSXGroupSeq", 1234); + headers.put("JMSPriority", i + 1); + headers.put("JMSType", "MyType"); + headers.put("MyHeader", i); + headers.put("MyStringHeader", "StringHeader" + i); + + proxy.sendTextMessage(headers, body); + } + + browseAndVerify(proxy); + } + + private void browseAndVerify(QueueViewMBean proxy) throws Exception { + browseAndVerifyTypes(proxy, false); + } + + @SuppressWarnings("rawtypes") + private void browseAndVerifyTypes(QueueViewMBean proxy, boolean allStrings) throws Exception { + CompositeData[] compdatalist = proxy.browse(); + if (compdatalist.length == 0) { + fail("There is no message in the queue:"); + } + + for (int i = 0; i < compdatalist.length; i++) { + CompositeData cdata = compdatalist[i]; + + if (i == 0) { + echo("Columns: " + cdata.getCompositeType().keySet()); + } + + assertComplexData(i, cdata, "JMSCorrelationID", 
"MyCorrId"); + assertComplexData(i, cdata, "JMSPriority", i + 1); + assertComplexData(i, cdata, "JMSType", "MyType"); + assertComplexData(i, cdata, "JMSCorrelationID", "MyCorrId"); + assertComplexData(i, cdata, "JMSDeliveryMode", "NON-PERSISTENT"); + String expected = "{MyStringHeader=StringHeader" + i + ", MyHeader=" + i + "}"; + // The order of the properties is different when using the ibm jdk. + if (System.getProperty("java.vendor").equals("IBM Corporation")) { + expected = "{MyHeader=" + i + ", MyStringHeader=StringHeader" + i + "}"; + } + assertComplexData(i, cdata, "PropertiesText", expected); + + if (allStrings) { + Map stringProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.STRING_PROPERTIES); + assertEquals("stringProperties size()", 2, stringProperties.size()); + assertEquals("stringProperties.MyHeader", "StringHeader" + i, stringProperties.get("MyStringHeader")); + assertEquals("stringProperties.MyHeader", "" + i, stringProperties.get("MyHeader")); + + } else { + Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES); + assertEquals("intProperties size()", 1, intProperties.size()); + assertEquals("intProperties.MyHeader", i, intProperties.get("MyHeader")); + + Map stringProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.STRING_PROPERTIES); + assertEquals("stringProperties size()", 1, stringProperties.size()); + assertEquals("stringProperties.MyHeader", "StringHeader" + i, stringProperties.get("MyStringHeader")); + } + + Map properties = CompositeDataHelper.getMessageUserProperties(cdata); + assertEquals("properties size()", 2, properties.size()); + assertEquals("properties.MyHeader", allStrings ? 
"" + i : i, properties.get("MyHeader")); + assertEquals("properties.MyHeader", "StringHeader" + i, properties.get("MyStringHeader")); + + assertComplexData(i, cdata, "JMSXGroupSeq", 1234); + assertComplexData(i, cdata, "JMSXGroupID", "MyGroupID"); + assertComplexData(i, cdata, "Text", "message:" + i); + } + } + + protected void assertSendCsnvViaMBean() throws Exception { + String queueName = getDestinationString() + ".SendMBBean"; + + ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost"); + echo("Create QueueView MBean..."); + BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + broker.addQueue(queueName); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queueName); + + echo("Create QueueView MBean..."); + QueueViewMBean proxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + proxy.purge(); + + int count = 5; + for (int i = 0; i < count; i++) { + String props = "body=message:" + i; + + props += ",JMSCorrelationID=MyCorrId"; + props += ",JMSDeliveryMode=1"; + props += ",JMSXGroupID=MyGroupID"; + props += ",JMSXGroupSeq=1234"; + props += ",JMSPriority=" + (i + 1); + props += ",JMSType=MyType"; + props += ",MyHeader=" + i; + props += ",MyStringHeader=StringHeader" + i; + + proxy.sendTextMessageWithProperties(props); + } + + browseAndVerifyTypes(proxy, true); + } + + protected void assertComplexData(int messageIndex, CompositeData cdata, String name, Object expected) { + Object value = cdata.get(name); + assertEquals("Message " + messageIndex + " CData field: " + name, expected, value); + } + + protected void assertQueueBrowseWorks() throws Exception { + Integer mbeancnt = mbeanServer.getMBeanCount(); + echo("Mbean count :" + mbeancnt); + + ObjectName queueViewMBeanName = 
assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + + echo("Create QueueView MBean..."); + QueueViewMBean proxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + long concount = proxy.getConsumerCount(); + echo("Consumer Count :" + concount); + long messcount = proxy.getQueueSize(); + echo("current number of messages in the queue :" + messcount); + + // lets browse + CompositeData[] compdatalist = proxy.browse(); + if (compdatalist.length == 0) { + fail("There is no message in the queue:"); + } + String[] messageIDs = new String[compdatalist.length]; + + for (int i = 0; i < compdatalist.length; i++) { + CompositeData cdata = compdatalist[i]; + + if (i == 0) { + echo("Columns: " + cdata.getCompositeType().keySet()); + } + messageIDs[i] = (String)cdata.get("JMSMessageID"); + echo("message " + i + " : " + cdata.values()); + } + + TabularData table = proxy.browseAsTable(); + echo("Found tabular data: " + table); + assertTrue("Table should not be empty!", table.size() > 0); + + assertEquals("Queue size", MESSAGE_COUNT, proxy.getQueueSize()); + + String messageID = messageIDs[0]; + String newDestinationName = "queue://dummy.test.cheese"; + echo("Attempting to copy: " + messageID + " to destination: " + newDestinationName); + proxy.copyMessageTo(messageID, newDestinationName); + + assertEquals("Queue size", MESSAGE_COUNT, proxy.getQueueSize()); + + messageID = messageIDs[1]; + echo("Attempting to remove: " + messageID); + proxy.removeMessage(messageID); + + assertEquals("Queue size", MESSAGE_COUNT-1, proxy.getQueueSize()); + + echo("Worked!"); + } + + protected void assertCreateAndDestroyDurableSubscriptions() throws Exception { + // lets create a new topic + ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost"); + echo("Create QueueView MBean..."); + BrokerViewMBean broker = 
MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + + broker.addTopic(getDestinationString()); + + assertEquals("Durable subscriber count", 0, broker.getDurableTopicSubscribers().length); + + String topicName = getDestinationString(); + String selector = null; + ObjectName name1 = broker.createDurableSubscriber(clientID, "subscriber1", topicName, selector); + broker.createDurableSubscriber(clientID, "subscriber2", topicName, selector); + assertEquals("Durable subscriber count", 2, broker.getInactiveDurableTopicSubscribers().length); + + assertNotNull("Should have created an mbean name for the durable subscriber!", name1); + + LOG.info("Created durable subscriber with name: " + name1); + + // now lets try destroy it + broker.destroyDurableSubscriber(clientID, "subscriber1"); + assertEquals("Durable subscriber count", 1, broker.getInactiveDurableTopicSubscribers().length); + } + + /** Creates two topics (test destination name suffixed with 1 and 2) and checks each TopicViewMBean's consumer count after every durable subscriber that is created or destroyed through the broker MBean. */ + protected void assertConsumerCounts() throws Exception { + ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost"); + BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + + assertTrue("broker is not a slave", !broker.isSlave()); + // create 2 topics + broker.addTopic(getDestinationString() + "1"); + broker.addTopic(getDestinationString() + "2"); + + ObjectName topicObjName1 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "1"); + ObjectName topicObjName2 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "2"); + TopicViewMBean topic1 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true); + TopicViewMBean topic2 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, 
TopicViewMBean.class, true); + + assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount()); + assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount()); + + String topicName = getDestinationString(); + String selector = null; + + // create 1 subscriber for each topic + broker.createDurableSubscriber(clientID, "topic1.subscriber1", topicName + "1", selector); + broker.createDurableSubscriber(clientID, "topic2.subscriber1", topicName + "2", selector); + + assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount()); + assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount()); + + // create 1 more subscriber for topic1 + broker.createDurableSubscriber(clientID, "topic1.subscriber2", topicName + "1", selector); + + assertEquals("topic1 Durable subscriber count", 2, topic1.getConsumerCount()); + assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount()); + + // destroy topic1 subscriber + broker.destroyDurableSubscriber(clientID, "topic1.subscriber1"); + + assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount()); + assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount()); + + // destroy topic2 subscriber + broker.destroyDurableSubscriber(clientID, "topic2.subscriber1"); + + assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount()); + assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount()); + + // destroy remaining topic1 subscriber + broker.destroyDurableSubscriber(clientID, "topic1.subscriber2"); + + assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount()); + assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount()); + } + + /** Creates two topics and checks the per-topic producer counts and the length of the broker's topic producer list as JMS producers are created and closed on the shared connection; the method sleeps 500 ms after each change before asserting. */ + protected void assertProducerCounts() throws Exception { + ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost"); + BrokerViewMBean broker = 
MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + + assertTrue("broker is not a slave", !broker.isSlave()); + // create 2 topics + broker.addTopic(getDestinationString() + "1"); + broker.addTopic(getDestinationString() + "2"); + + ObjectName topicObjName1 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "1"); + ObjectName topicObjName2 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=" + getDestinationString() + "2"); + TopicViewMBean topic1 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true); + TopicViewMBean topic2 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, TopicViewMBean.class, true); + + /* baseline: no producers exist yet on either topic or on the broker */ + assertEquals("topic1 Producer count", 0, topic1.getProducerCount()); + assertEquals("topic2 Producer count", 0, topic2.getProducerCount()); + assertEquals("broker Topic Producer count", 0, broker.getTopicProducers().length); + + // create 1 producer for each topic + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination dest1 = session.createTopic(getDestinationString() + "1"); + Destination dest2 = session.createTopic(getDestinationString() + "2"); + MessageProducer producer1 = session.createProducer(dest1); + MessageProducer producer2 = session.createProducer(dest2); + /* NOTE(review): the fixed 500 ms sleeps presumably give the broker time to update its producer MBeans before the counts are asserted - confirm */ + Thread.sleep(500); + + assertEquals("topic1 Producer count", 1, topic1.getProducerCount()); + assertEquals("topic2 Producer count", 1, topic2.getProducerCount()); + + assertEquals("broker Topic Producer count", 2, broker.getTopicProducers().length); + + // create 1 more producer for topic1 + MessageProducer producer3 = session.createProducer(dest1); + Thread.sleep(500); + + assertEquals("topic1 Producer count", 2, topic1.getProducerCount()); + assertEquals("topic2 Producer count", 1, 
topic2.getProducerCount()); + + assertEquals("broker Topic Producer count", 3, broker.getTopicProducers().length); + + // destroy topic1 producer + producer1.close(); + Thread.sleep(500); + + assertEquals("topic1 Producer count", 1, topic1.getProducerCount()); + assertEquals("topic2 Producer count", 1, topic2.getProducerCount()); + + assertEquals("broker Topic Producer count", 2, broker.getTopicProducers().length); + + // destroy topic2 producer + producer2.close(); + Thread.sleep(500); + + assertEquals("topic1 Producer count", 1, topic1.getProducerCount()); + assertEquals("topic2 Producer count", 0, topic2.getProducerCount()); + + assertEquals("broker Topic Producer count", 1, broker.getTopicProducers().length); + + // destroy remaining topic1 producer + producer3.close(); + Thread.sleep(500); + + assertEquals("topic1 Producer count", 0, topic1.getProducerCount()); + assertEquals("topic2 Producer count", 0, topic2.getProducerCount()); + + /* a producer created with a null destination is asserted below to be listed by getDynamicDestinationProducers() */ + MessageProducer producer4 = session.createProducer(null); + Thread.sleep(500); + assertEquals(1, broker.getDynamicDestinationProducers().length); + producer4.close(); + Thread.sleep(500); + + assertEquals("broker Topic Producer count", 0, broker.getTopicProducers().length); + } + + /** Builds an ObjectName from name and asserts, through Wait.waitFor, that mbeanServer reports it as registered; an exception thrown by the isRegistered call is only logged at debug level. Returns the ObjectName that was built. */ + protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, Exception { + final ObjectName objectName = new ObjectName(name); + final AtomicBoolean result = new AtomicBoolean(false); + assertTrue("Bean registered: " + objectName, Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + try { + result.set(mbeanServer.isRegistered(objectName)); + } catch (Exception ignored) { + LOG.debug(ignored.toString()); + } + return result.get(); + } + })); + return objectName; + } + + /** Builds an ObjectName from name and fails the test if mbeanServer currently reports it as registered; otherwise returns the ObjectName. The check is made once, without waiting. */ + protected ObjectName assertNotRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException { + ObjectName objectName = new ObjectName(name); + if 
(mbeanServer.isRegistered(objectName)) { + fail("Found the MBean!: " + objectName); + } else { + echo("Bean not registered Registered: " + objectName); + } + return objectName; + } + + /** Sets bindAddress to tcp://localhost:0 and useTopic to false, runs super.setUp(), then stores the MBeanServer obtained from the broker's ManagementContext in mbeanServer. */ + @Override + protected void setUp() throws Exception { + bindAddress = "tcp://localhost:0"; + useTopic = false; + super.setUp(); + ManagementContext managementContext = broker.getManagementContext(); + mbeanServer = managementContext.getMBeanServer(); + } + + /** Optionally waits for a key press (when waitForKeyPress is set), closes the test connection if one is open and runs super.tearDown(). The connection reference is cleared and super.tearDown() is invoked from a finally block so that they still happen when reading from the console or closing the connection throws. */ + @Override + protected void tearDown() throws Exception { + try { + if (waitForKeyPress) { + // We are running from the command line so let folks browse the + // mbeans... + System.out.println(); + System.out.println("Press enter to terminate the program."); + System.out.println("In the meantime you can use your JMX console to view the current MBeans"); + BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); + reader.readLine(); + } + + if (connection != null) { + connection.close(); + } + } finally { + /* always drop the reference and run the superclass teardown, even if the wait or close() above failed */ + connection = null; + super.tearDown(); + } + } + + /** Returns an ActiveMQConnectionFactory pointed at the publishable connect string of the broker's first transport connector. */ + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); + } + + /** Creates a non-persistent, JMX-enabled BrokerService that deletes all messages on startup, applies a default destination memory limit of 1024*1024*4 bytes, pre-creates the QUEUE_WITH_OPTIONS queue with two destination options and adds a connector on bindAddress. */ + @Override + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setPersistent(false); + answer.setDeleteAllMessagesOnStartup(true); + answer.setUseJmx(true); + + // apply memory limit so that %usage is visible + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setMemoryLimit(1024*1024*4); + policyMap.setDefaultEntry(defaultEntry); + answer.setDestinationPolicy(policyMap); + + // allow options to be visible via jmx + answer.setDestinations(new ActiveMQDestination[]{new ActiveMQQueue(QUEUE_WITH_OPTIONS + "?topQueue=true&hasOptions=2")}); + + answer.addConnector(bindAddress); + return answer; + } + + /** Sets the client ID on connection, starts it and sends MESSAGE_COUNT text messages to a freshly created destination; each message carries an int property named counter equal to its index plus fixed correlation id, reply-to, type and priority headers. Sleeps one second after sending. */ + protected void useConnection(Connection connection) throws Exception { + 
connection.setClientID(clientID); + connection.start(); + Session session = connection.createSession(transacted, authMode); + destination = createDestination(); + MessageProducer producer = session.createProducer(destination); + for (int i = 0; i < MESSAGE_COUNT; i++) { + Message message = session.createTextMessage("Message: " + i); + message.setIntProperty("counter", i); + message.setJMSCorrelationID("MyCorrelationID"); + message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo")); + message.setJMSType("MyType"); + message.setJMSPriority(5); + producer.send(message); + } + Thread.sleep(1000); + } + + /** Sets the client ID on connection, starts it and sends MESSAGE_COUNT BlobMessages (all created from the same fixed URL) to a freshly created destination, each with an int property named counter equal to its index. Sleeps one second after sending. */ + protected void useConnectionWithBlobMessage(Connection connection) throws Exception { + connection.setClientID(clientID); + connection.start(); + ActiveMQSession session = (ActiveMQSession) connection.createSession(transacted, authMode); + destination = createDestination(); + MessageProducer producer = session.createProducer(destination); + for (int i = 0; i < MESSAGE_COUNT; i++) { + BlobMessage message = session.createBlobMessage(new URL("http://foo.bar/test")); + message.setIntProperty("counter", i); + message.setJMSCorrelationID("MyCorrelationID"); + message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo")); + message.setJMSType("MyType"); + message.setJMSPriority(5); + producer.send(message); + } + Thread.sleep(1000); + } + + /** Sets the client ID on connection, starts it and sends MESSAGE_COUNT BytesMessages to a freshly created destination; each body is the bytes of the text "Message: " followed by the index, and each message has an int property named counter equal to its index. Sleeps one second after sending. */ + protected void useConnectionWithByteMessage(Connection connection) throws Exception { + connection.setClientID(clientID); + connection.start(); + ActiveMQSession session = (ActiveMQSession) connection.createSession(transacted, authMode); + destination = createDestination(); + MessageProducer producer = session.createProducer(destination); + for (int i = 0; i < MESSAGE_COUNT; i++) { + BytesMessage message = session.createBytesMessage(); + message.writeBytes(("Message: " + i).getBytes()); + message.setIntProperty("counter", i); + message.setJMSCorrelationID("MyCorrelationID"); + message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo")); + 
message.setJMSType("MyType"); + message.setJMSPriority(5); + producer.send(message); + } + Thread.sleep(1000); + } + + /** Hook for test progress messages; currently does nothing because the LOG.info call is commented out. */ + protected void echo(String text) { + //LOG.info(text); + } + + /** Returns the name of a second destination built from a fixed prefix, the test class and the test name. The Class object itself is concatenated (not getClass().getName()), so the result contains the Class.toString() form of the class. */ + protected String getSecondDestinationString() { + return "test.new.destination." + getClass() + "." + getName(); + } + + /** Checks that a producer created with a null destination is listed under the broker's dynamic destination producers, that its ProducerViewMBean reports NOTSET before any send and the destination of the most recent send afterwards, and that the list is empty again once the producer is closed. */ + public void testDynamicProducerView() throws Exception { + connection = connectionFactory.createConnection(); + + ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost"); + BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + + assertEquals(0, broker.getDynamicDestinationProducers().length); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + Destination dest1 = session.createTopic("DynamicDest-1"); + Destination dest2 = session.createTopic("DynamicDest-2"); + Destination dest3 = session.createQueue("DynamicDest-3"); + + // Wait a bit to let the producer get registered. 
+ Thread.sleep(100); + + assertEquals(1, broker.getDynamicDestinationProducers().length); + + ObjectName viewName = broker.getDynamicDestinationProducers()[0]; + assertNotNull(viewName); + ProducerViewMBean view = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, viewName, ProducerViewMBean.class, true); + assertNotNull(view); + + assertEquals("NOTSET", view.getDestinationName()); + + producer.send(dest1, session.createTextMessage("Test Message 1")); + Thread.sleep(200); + assertEquals(((ActiveMQDestination)dest1).getPhysicalName(), view.getDestinationName()); + assertTrue(view.isDestinationTopic()); + assertFalse(view.isDestinationQueue()); + assertFalse(view.isDestinationTemporary()); + + producer.send(dest2, session.createTextMessage("Test Message 2")); + Thread.sleep(200); + assertEquals(((ActiveMQDestination)dest2).getPhysicalName(), view.getDestinationName()); + assertTrue(view.isDestinationTopic()); + assertFalse(view.isDestinationQueue()); + assertFalse(view.isDestinationTemporary()); + + producer.send(dest3, session.createTextMessage("Test Message 3")); + Thread.sleep(200); + assertEquals(((ActiveMQDestination)dest3).getPhysicalName(), view.getDestinationName()); + assertTrue(view.isDestinationQueue()); + assertFalse(view.isDestinationTopic()); + assertFalse(view.isDestinationTemporary()); + + producer.close(); + Thread.sleep(200); + assertEquals(0, broker.getDynamicDestinationProducers().length); + } + + /** Creates a temporary queue, checks that an MBean is registered under its encoded destination type and physical name, deletes the queue and then expects looking up that MBean to throw. */ + public void testTempQueueJMXDelete() throws Exception { + connection = connectionFactory.createConnection(); + + connection.setClientID(clientID); + connection.start(); + Session session = connection.createSession(transacted, authMode); + ActiveMQTempQueue tQueue = (ActiveMQTempQueue) session.createTemporaryQueue(); + Thread.sleep(1000); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=" + + JMXSupport.encodeObjectNamePart(tQueue.getDestinationTypeAsString()) + + 
",destinationName=" + JMXSupport.encodeObjectNamePart(tQueue.getPhysicalName())); + + // should not throw an exception + + mbeanServer.getObjectInstance(queueViewMBeanName); + + tQueue.delete(); + Thread.sleep(1000); + try { + // should throw an exception + mbeanServer.getObjectInstance(queueViewMBeanName); + + fail("should be deleted already!"); + } catch (Exception e) { + // expected! + } + } + + // Test for AMQ-3029 + /** Sends blob messages, browses the queue through its QueueViewMBean, requires at least one message and a non-null JMSMessageID for every browsed entry, and asserts that the reported memory percent usage is greater than zero. */ + public void testBrowseBlobMessages() throws Exception { + connection = connectionFactory.createConnection(); + useConnectionWithBlobMessage(connection); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + CompositeData[] compdatalist = queue.browse(); + int initialQueueSize = compdatalist.length; + if (initialQueueSize == 0) { + fail("There is no message in the queue:"); + } + else { + echo("Current queue size: " + initialQueueSize); + } + int messageCount = initialQueueSize; + String[] messageIDs = new String[messageCount]; + for (int i = 0; i < messageCount; i++) { + CompositeData cdata = compdatalist[i]; + String messageID = (String) cdata.get("JMSMessageID"); + assertNotNull("Should have a message ID for message " + i, messageID); + + messageIDs[i] = messageID; + } + + assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); + } + + /** Looks up the QueueViewMBean for QUEUE_WITH_OPTIONS and checks that its name matches and that getOptions() parses into exactly two entries with keys hasOptions and topQueue and values true and 2. */ + public void testDestinationOptionsAreVisible() throws Exception { + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + QUEUE_WITH_OPTIONS ); + + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + assertEquals("name match", QUEUE_WITH_OPTIONS, queue.getName()); + + 
String options = queue.getOptions(); + LOG.info("Got options: " + options); + + Map<String, String> optionsMap = URISupport.parseQuery(options); + assertEquals("got a map", 2, optionsMap.size()); + assertTrue("matches our options", optionsMap.containsKey("hasOptions")); + assertTrue("matches our options", optionsMap.containsKey("topQueue")); + + assertTrue("matches our options", optionsMap.containsValue("true")); + assertTrue("matches our options", optionsMap.containsValue("2")); + } + + /** Creates a queue consumer and producer on a connection with client id MBeanTest, follows the single queue subscription MBean to its connection MBean, and checks that connection's client id and its consumer and producer lists before and after the consumer and producer are closed. */ + public void testSubscriptionViewToConnectionMBean() throws Exception { + + connection = connectionFactory.createConnection("admin", "admin"); + connection.setClientID("MBeanTest"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination queue = session.createQueue(getDestinationString() + ".Queue"); + MessageConsumer queueConsumer = session.createConsumer(queue); + MessageProducer producer = session.createProducer(queue); + + ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost"); + BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + + Thread.sleep(100); + + assertTrue(broker.getQueueSubscribers().length == 1); + + ObjectName subscriptionName = broker.getQueueSubscribers()[0]; + LOG.info("Looking for Subscription: " + subscriptionName); + + SubscriptionViewMBean subscriberView = + MBeanServerInvocationHandler.newProxyInstance( + mbeanServer, subscriptionName, SubscriptionViewMBean.class, true); + assertNotNull(subscriberView); + + ObjectName connectionName = subscriberView.getConnection(); + LOG.info("Looking for Connection: " + connectionName); + assertNotNull(connectionName); + ConnectionViewMBean connectionView = + MBeanServerInvocationHandler.newProxyInstance( + mbeanServer, connectionName, ConnectionViewMBean.class, true); + assertNotNull(connectionView); + + // Our consumer plus one advisory consumer. 
+ assertEquals(2, connectionView.getConsumers().length); + + assertEquals("client id match", "MBeanTest", connectionView.getClientId()); + + // Check that the subscription view we found earlier is in this list. + boolean found = false; + for (ObjectName name : connectionView.getConsumers()) { + if (name.equals(subscriptionName)) { + found = true; + } + } + assertTrue("We should have found: " + subscriptionName, found); + + // Our producer and no others. + assertEquals(1, connectionView.getProducers().length); + + // Bean should detect the updates. + queueConsumer.close(); + producer.close(); + + Thread.sleep(200); + + // Only an advisory consumers now. + assertEquals(1, connectionView.getConsumers().length); + assertEquals(0, connectionView.getProducers().length); + } + + /** Checks the broker MBean's active and inactive durable subscriber counts while two durable subscribers on one topic are created (2 active, 0 inactive), closed (0 active, 2 inactive) and then unsubscribed one at a time (1, then 0 inactive). */ + public void testCreateAndUnsubscribeDurableSubscriptions() throws Exception { + + connection = connectionFactory.createConnection("admin", "admin"); + connection.setClientID("MBeanTest"); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String topicName = getDestinationString() + ".DurableTopic"; + Topic topic = session.createTopic(topicName); + + ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost"); + echo("Create QueueView MBean..."); + BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + + assertEquals("Durable subscriber count", 0, broker.getDurableTopicSubscribers().length); + assertEquals("Durable subscriber count", 0, broker.getInactiveDurableTopicSubscribers().length); + + MessageConsumer durableConsumer1 = session.createDurableSubscriber(topic, "subscription1"); + MessageConsumer durableConsumer2 = session.createDurableSubscriber(topic, "subscription2"); + + Thread.sleep(100); + + assertEquals("Durable subscriber count", 2, broker.getDurableTopicSubscribers().length); + assertEquals("Durable subscriber count", 0, 
broker.getInactiveDurableTopicSubscribers().length); + + durableConsumer1.close(); + durableConsumer2.close(); + + Thread.sleep(100); + + assertEquals("Durable subscriber count", 0, broker.getDurableTopicSubscribers().length); + assertEquals("Durable subscriber count", 2, broker.getInactiveDurableTopicSubscribers().length); + + session.unsubscribe("subscription1"); + + Thread.sleep(100); + + assertEquals("Inactive Durable subscriber count", 1, broker.getInactiveDurableTopicSubscribers().length); + + session.unsubscribe("subscription2"); + + assertEquals("Inactive Durable subscriber count", 0, broker.getInactiveDurableTopicSubscribers().length); + } + + /** Runs doTestUserNameInMBeans with user-name population switched on. */ + public void testUserNamePopulated() throws Exception { + doTestUserNameInMBeans(true); + } + + /** Runs doTestUserNameInMBeans with user-name population switched off. */ + public void testUserNameNotPopulated() throws Exception { + doTestUserNameInMBeans(false); + } + + /** Sets populateUserNameInMBeans on the broker to expect, connects as admin with a queue producer, a queue consumer, a topic consumer and a durable subscriber, and checks that the producer, subscription and connection MBeans report user name admin when expect is true and null otherwise. Fails if no connection MBean ending in connectionName=MBeanTest is found. */ + @SuppressWarnings("unused") + private void doTestUserNameInMBeans(boolean expect) throws Exception { + broker.setPopulateUserNameInMBeans(expect); + + connection = connectionFactory.createConnection("admin", "admin"); + connection.setClientID("MBeanTest"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination queue = session.createQueue(getDestinationString() + ".Queue"); + Topic topic = session.createTopic(getDestinationString() + ".Topic"); + MessageProducer producer = session.createProducer(queue); + MessageConsumer queueConsumer = session.createConsumer(queue); + MessageConsumer topicConsumer = session.createConsumer(topic); + MessageConsumer durable = session.createDurableSubscriber(topic, "Durable"); + + ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost"); + BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + + Thread.sleep(100); + + assertTrue(broker.getQueueProducers().length == 1); + assertTrue(broker.getTopicSubscribers().length == 2); + 
assertTrue(broker.getQueueSubscribers().length == 1); + + ObjectName producerName = broker.getQueueProducers()[0]; + ProducerViewMBean producerView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, producerName, ProducerViewMBean.class, true); + assertNotNull(producerView); + + if (expect) { + assertEquals("admin", producerView.getUserName()); + } else { + assertNull(producerView.getUserName()); + } + + for (ObjectName name : broker.getTopicSubscribers()) { + SubscriptionViewMBean subscriberView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true); + if (expect) { + assertEquals("admin", subscriberView.getUserName()); + } else { + assertNull(subscriberView.getUserName()); + } + } + + for (ObjectName name : broker.getQueueSubscribers()) { + SubscriptionViewMBean subscriberView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true); + if (expect) { + assertEquals("admin", subscriberView.getUserName()); + } else { + assertNull(subscriberView.getUserName()); + } + } + ObjectName query = //new ObjectName(domain + ":type=Broker,brokerName=localhost,connector=*," + "connectorName=*,connectionName=MBeanTest"); + BrokerMBeanSupport.createConnectionQuery(domain, "localhost", connection.getClientID()); + + Set<ObjectName> names = mbeanServer.queryNames(query, null); + boolean found = false; + for (ObjectName name : names) { + if (name.toString().endsWith("connectionName=MBeanTest")) { + + ConnectionViewMBean connectionView = + MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, ConnectionViewMBean.class, true); + assertNotNull(connectionView); + + if (expect) { + assertEquals("admin", connectionView.getUserName()); + } else { + assertNull(connectionView.getUserName()); + } + + found = true; + break; + } + } + + assertTrue("Should find the connection's ManagedTransportConnection", found); + } + + /** Sends MESSAGE_COUNT messages, moves them all to a second queue with moveMatchingMessagesTo and an empty selector, checks the size of the new queue, consumes from it asserting that the counter property increases by exactly one per message, then removes all matching messages and expects queue size and memory percent usage to be zero. */ + public void testMoveMessagesToRetainOrder() throws Exception { 
+ connection = connectionFactory.createConnection(); + useConnection(connection); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + String newDestination = getSecondDestinationString(); + queue.moveMatchingMessagesTo("", newDestination); + + queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination); + + queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + int movedSize = MESSAGE_COUNT; + assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize()); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(newDestination); + MessageConsumer consumer = session.createConsumer(destination); + + int last = -1; + int current = -1; + Message message = null; + while ((message = consumer.receive(2000)) != null) { + if (message.propertyExists("counter")) { + current = message.getIntProperty("counter"); + assertEquals(last, current - 1); + last = current; + } + } + + // now lets remove them by selector + queue.removeMatchingMessages(""); + + assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); + assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); + } + + /** Checks the broker MBean's connection counters: zero current connections before connecting, one while the test connection is in use, zero current and one total after the connection is closed. */ + public void testConnectionCounts() throws Exception { + + ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost"); + BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + + assertEquals(0, broker.getCurrentConnectionsCount()); + + 
connection = connectionFactory.createConnection(); + useConnection(connection); + + assertEquals(1, broker.getCurrentConnectionsCount()); + connection.close(); + assertEquals(0, broker.getCurrentConnectionsCount()); + assertEquals(1, broker.getTotalConnectionsCount()); + } + + /** Sends MESSAGE_COUNT messages, copies them all to a second queue with copyMatchingMessagesTo and an empty selector, checks the size of the new queue, consumes from it asserting that the counter property increases by exactly one per message, then removes all matching messages and expects queue size and memory percent usage to be zero. */ + public void testCopyMessagesToRetainOrder() throws Exception { + connection = connectionFactory.createConnection(); + useConnection(connection); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + String newDestination = getSecondDestinationString(); + queue.copyMatchingMessagesTo("", newDestination); + + queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination ); + + queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + int movedSize = MESSAGE_COUNT; + assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize()); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(newDestination); + MessageConsumer consumer = session.createConsumer(destination); + + int last = -1; + int current = -1; + Message message = null; + while ((message = consumer.receive(2000)) != null) { + if (message.propertyExists("counter")) { + current = message.getIntProperty("counter"); + assertEquals(last, current - 1); + last = current; + } + } + + // now lets remove them by selector + queue.removeMatchingMessages(""); + + assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); + assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); + } + + /** Sends MESSAGE_COUNT messages, removes those matching the selector counter < 10, checks that the queue size dropped by ten, then consumes the rest asserting that the counter property continues from 10 in steps of one before removing everything that is left. */ + public void 
testRemoveMatchingMessageRetainOrder() throws Exception { + connection = connectionFactory.createConnection(); + useConnection(connection); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + String queueName = getDestinationString(); + queue.removeMatchingMessages("counter < 10"); + + int newSize = MESSAGE_COUNT - 10; + assertEquals("Unexpected number of messages ", newSize, queue.getQueueSize()); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(queueName); + MessageConsumer consumer = session.createConsumer(destination); + + /* counters 0..9 were removed above, so the first message received is expected to carry counter 10 */ + int last = 9; + int current = 0; + Message message = null; + while ((message = consumer.receive(2000)) != null) { + if (message.propertyExists("counter")) { + current = message.getIntProperty("counter"); + assertEquals(last, current - 1); + last = current; + } + } + + // now lets remove them by selector + queue.removeMatchingMessages(""); + + assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); + assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); + } + + /** Sends bytes messages, browses the queue through its QueueViewMBean requiring a non-null JMSMessageID and a non-empty body preview for every entry, asserts a memory percent usage greater than zero, and finally consumes MESSAGE_COUNT messages asserting each one is a BytesMessage. */ + public void testBrowseBytesMessages() throws Exception { + connection = connectionFactory.createConnection(); + useConnectionWithByteMessage(connection); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + CompositeData[] compdatalist = queue.browse(); + int initialQueueSize = compdatalist.length; + if (initialQueueSize == 
0) { + fail("There is no message in the queue:"); + } + else { + echo("Current queue size: " + initialQueueSize); + } + int messageCount = initialQueueSize; + String[] messageIDs = new String[messageCount]; + for (int i = 0; i < messageCount; i++) { + CompositeData cdata = compdatalist[i]; + String messageID = (String) cdata.get("JMSMessageID"); + assertNotNull("Should have a message ID for message " + i, messageID); + messageIDs[i] = messageID; + + Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW); + assertNotNull("should be a preview", preview); + assertTrue("not empty", preview.length > 0); + } + + assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); + + // consume all the messages + echo("Attempting to consume all bytes messages from: " + destination); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(destination); + for (int i=0; i<MESSAGE_COUNT; i++) { + Message message = consumer.receive(5000); + assertNotNull(message); + assertTrue(message instanceof BytesMessage); + } + consumer.close(); + session.close(); + } + + /** Sends MESSAGE_COUNT messages on a connection whose prefetch policy is set to 20, browses the queue asserting that the counter int property of each entry equals its position, consumes five messages in order, closes the connection, then browses again expecting MESSAGE_COUNT - 5 entries whose counters continue from 5. */ + public void testBrowseOrder() throws Exception { + connection = connectionFactory.createConnection(); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setAll(20); + ((ActiveMQConnection) connection).setPrefetchPolicy(prefetchPolicy); + useConnection(connection); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + CompositeData[] compdatalist = queue.browse(); + int initialQueueSize = compdatalist.length; + assertEquals("expected", MESSAGE_COUNT, initialQueueSize); + + int messageCount = initialQueueSize; + for (int i = 0; i < 
messageCount; i++) { + CompositeData cdata = compdatalist[i]; + String messageID = (String) cdata.get("JMSMessageID"); + assertNotNull("Should have a message ID for message " + i, messageID); + + Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES); + assertTrue("not empty", intProperties.size() > 0); + assertEquals("counter in order", i, intProperties.get("counter")); + } + + echo("Attempting to consume 5 bytes messages from: " + destination); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(destination); + for (int i=0; i<5; i++) { + Message message = consumer.receive(5000); + assertNotNull(message); + assertEquals("ordered", i, message.getIntProperty("counter")); + echo("Consumed: " + message.getIntProperty("counter")); + } + consumer.close(); + session.close(); + connection.close(); + + // browse again and verify order + compdatalist = queue.browse(); + initialQueueSize = compdatalist.length; + assertEquals("5 gone", MESSAGE_COUNT - 5, initialQueueSize); + + messageCount = initialQueueSize; + for (int i = 0; i < messageCount - 4; i++) { + CompositeData cdata = compdatalist[i]; + + Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES); + assertTrue("not empty", intProperties.size() > 0); + assertEquals("counter in order", i + 5, intProperties.get("counter")); + echo("Got: " + intProperties.get("counter")); + } + } + + public void testAddRemoveConnectorBrokerView() throws Exception { + + ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost"); + BrokerViewMBean brokerView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + + Map connectors = brokerView.getTransportConnectors(); + LOG.info("Connectors: " + connectors); + assertEquals("one connector", 1, connectors.size()); + + ConnectorViewMBean 
connector = getProxyToConnectionView("tcp"); + assertNotNull(connector); + + String name = connectors.keySet().iterator().next().toString(); + + brokerView.removeConnector(name); + + connectors = brokerView.getTransportConnectors(); + assertEquals("empty", 0, connectors.size()); + + name = brokerView.addConnector("tcp://0.0.0.0:0"); + + connector = getProxyToConnectionView("tcp"); + assertNotNull(connector); + + connectors = brokerView.getTransportConnectors(); + LOG.info("Connectors: " + connectors); + assertEquals("one connector", 1, connectors.size()); + assertTrue("name is in map: " + connectors.keySet(), connectors.keySet().contains(name)); + } + + public void testConnectorView() throws Exception { + ConnectorViewMBean connector = getProxyToConnectionView("tcp"); + assertNotNull(connector); + + assertFalse(connector.isRebalanceClusterClients()); + assertFalse(connector.isUpdateClusterClientsOnRemove()); + assertFalse(connector.isUpdateClusterClients()); + assertFalse(connector.isAllowLinkStealingEnabled()); + } + + protected ConnectorViewMBean getProxyToConnectionView(String connectionType) throws Exception { + ObjectName connectorQuery = new ObjectName( + "org.apache.activemq:type=Broker,brokerName=localhost,connector=clientConnectors,connectorName="+connectionType+"_//*"); + + Set<ObjectName> results = broker.getManagementContext().queryNames(connectorQuery, null); + + if (results == null || results.isEmpty() || results.size() > 1) { + throw new Exception("Unable to find the exact Connector instance."); + } + + ConnectorViewMBean proxy = (ConnectorViewMBean) broker.getManagementContext() + .newProxyInstance(results.iterator().next(), ConnectorViewMBean.class, true); + return proxy; + } + + public void testDynamicProducers() throws Exception { + connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + ObjectName query = new 
ObjectName(domain + ":type=Broker,brokerName=localhost,endpoint=dynamicProducer,*"); + Set<ObjectInstance> mbeans = mbeanServer.queryMBeans(query, null); + assertEquals(mbeans.size(), 1); + producer.close(); + } + + public void testDurableSubQuery() throws Exception { + connection = connectionFactory.createConnection(); + connection.setClientID("test"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber sub = session.createDurableSubscriber(session.createTopic("test.topic"), "test.consumer"); + + ObjectName query = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test.topic,endpoint=Consumer,consumerId=Durable(*),*"); + Set<ObjectInstance> mbeans = mbeanServer.queryMBeans(query, null); + assertEquals(mbeans.size(), 1); + sub.close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java new file mode 100644 index 0000000..ce8e3ae --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.jmx;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A specific test of Queue.purge() functionality, driven through the JMX
 * {@link QueueViewMBean} and {@link BrokerViewMBean} of an embedded broker.
 */
public class PurgeTest extends EmbeddedBrokerTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(PurgeTest.class);

    protected MBeanServer mbeanServer;
    protected String domain = "org.apache.activemq";
    protected String clientID = "foo";

    protected Connection connection;
    protected boolean transacted;
    protected int authMode = Session.AUTO_ACKNOWLEDGE;
    protected int messageCount = 10;
    // Public so that the combination-test machinery can assign it by name
    // (see initCombosForTestDelete).
    public PersistenceAdapter persistenceAdapter;

    public static void main(String[] args) {
        TestRunner.run(PurgeTest.class);
    }

    public static Test suite() {
        return suite(PurgeTest.class);
    }

    /**
     * Fills the queue, purges it through the queue MBean and checks that it is empty;
     * then repeats the cycle with more than a thousand messages.
     *
     * @throws Exception if any JMS or JMX call fails
     */
    public void testPurge() throws Exception {
        // Send some messages
        connection = connectionFactory.createConnection();
        connection.setClientID(clientID);
        connection.start();
        Session session = connection.createSession(transacted, authMode);
        destination = createDestination();
        MessageProducer producer = session.createProducer(destination);
        try {
            sendMessages(session, producer, messageCount);

            // Now get the QueueViewMBean and purge
            String objectNameStr = broker.getBrokerObjectName().toString();
            objectNameStr += ",destinationType=Queue,destinationName=" + getDestinationString();
            QueueViewMBean proxy = queueView(objectNameStr);

            assertEquals("Queue size", messageCount, proxy.getQueueSize());
            purgeAndAssertEmpty(proxy);

            // Queues have a special case once there are more than a thousand
            // dead messages, make sure we hit that.
            messageCount += 1000;
            sendMessages(session, producer, messageCount);

            assertEquals("Queue size", messageCount, proxy.getQueueSize());
            purgeAndAssertEmpty(proxy);
        } finally {
            // Release the producer even when an assertion above fails.
            producer.close();
        }
    }

    /**
     * Registers the persistence adapters that the combination-test machinery should run
     * {@code testDelete} with.
     */
    public void initCombosForTestDelete() {
        addCombinationValues("persistenceAdapter", new Object[] {new MemoryPersistenceAdapter(), new KahaDBPersistenceAdapter()});
    }

    /**
     * Checks that a producer/consumer pair created before a queue is removed through the
     * broker MBean can still exchange a message afterwards.
     *
     * @throws Exception if any JMS or JMX call fails
     */
    public void testDeleteSameProducer() throws Exception {
        connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = createDestination();

        MessageProducer producer = session.createProducer(destination);
        Message message = session.createTextMessage("Test Message");
        producer.send(message);

        MessageConsumer consumer = session.createConsumer(destination);

        Message received = consumer.receive(1000);
        assertEquals(message, received);

        BrokerViewMBean brokerProxy = brokerView();

        brokerProxy.removeQueue(getDestinationString());
        producer.send(message);

        received = consumer.receive(1000);

        assertNotNull("Message not received", received);
        assertEquals(message, received);
    }

    /**
     * Removes the queue through the broker MBean and checks that sending again yields a
     * queue MBean reporting only the newly sent messages; repeats the cycle after a purge
     * with more than a thousand messages.
     *
     * @throws Exception if any JMS or JMX call fails
     */
    public void testDelete() throws Exception {
        // Send some messages
        connection = connectionFactory.createConnection();
        connection.setClientID(clientID);
        connection.start();
        Session session = connection.createSession(transacted, authMode);
        destination = createDestination();
        sendMessages(session, messageCount);

        // Now get the QueueViewMBean and purge
        QueueViewMBean queueProxy = queueView(queueObjectNameInDomain());
        BrokerViewMBean brokerProxy = brokerView();

        assertEquals("Queue size", messageCount, queueProxy.getQueueSize());

        brokerProxy.removeQueue(getDestinationString());

        sendMessages(session, messageCount);

        // The MBean is looked up again because the queue was removed above.
        queueProxy = queueView(queueObjectNameInDomain());
        assertEquals("Queue size", messageCount, queueProxy.getQueueSize());

        queueProxy.purge();

        // Queue have a special case once there are more than a thousand
        // dead messages, make sure we hit that.
        messageCount += 1000;
        sendMessages(session, messageCount);

        assertEquals("Queue size", messageCount, queueProxy.getQueueSize());

        brokerProxy.removeQueue(getDestinationString());

        sendMessages(session, messageCount);

        queueProxy = queueView(queueObjectNameInDomain());
        assertEquals("Queue size", messageCount, queueProxy.getQueueSize());
    }

    /**
     * Sends {@code count} text messages to {@code destination} using a short-lived producer.
     *
     * @param session session used to create the producer and the messages
     * @param count number of messages to send
     * @throws Exception if a JMS call fails
     */
    private void sendMessages(Session session, int count) throws Exception {
        MessageProducer producer = session.createProducer(destination);
        try {
            sendMessages(session, producer, count);
        } finally {
            // The producer is only needed for this batch; do not leave it open.
            producer.close();
        }
    }

    /** Sends {@code count} text messages named "Message: i" through the given producer. */
    private void sendMessages(Session session, MessageProducer producer, int count) throws Exception {
        for (int i = 0; i < count; i++) {
            Message message = session.createTextMessage("Message: " + i);
            producer.send(message);
        }
    }

    /** Purges the queue and asserts that both its size and its browse result are empty. */
    private void purgeAndAssertEmpty(QueueViewMBean proxy) throws Exception {
        proxy.purge();
        assertEquals("Queue size", 0, proxy.getQueueSize());
        assertEquals("Browse size", 0, proxy.browseMessages().size());
    }

    /** Builds the queue MBean name for this test's destination from {@code domain}. */
    private String queueObjectNameInDomain() {
        return domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString();
    }

    /** Asserts that the named queue MBean is registered and returns a proxy to it. */
    private QueueViewMBean queueView(String objectNameStr) throws MalformedObjectNameException {
        ObjectName queueViewMBeanName = assertRegisteredObjectName(objectNameStr);
        return MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
    }

    /** Asserts that the broker MBean is registered and returns a proxy to it. */
    private BrokerViewMBean brokerView() throws MalformedObjectNameException {
        ObjectName brokerViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
        return MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerViewMBeanName, BrokerViewMBean.class, true);
    }

    /**
     * Parses {@code name} as an {@link ObjectName} and fails the test unless an MBean is
     * registered under it.
     *
     * @param name textual object name to look up
     * @return the parsed object name
     * @throws MalformedObjectNameException if {@code name} is not a valid object name
     * @throws NullPointerException if {@code name} is {@code null}
     */
    protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {
        ObjectName objectName = new ObjectName(name);
        if (mbeanServer.isRegistered(objectName)) {
            echo("Bean Registered: " + objectName);
        } else {
            fail("Could not find MBean!: " + objectName);
        }
        return objectName;
    }

    /** Configures a queue-based test on an ephemeral TCP port and caches the broker's MBean server. */
    @Override
    protected void setUp() throws Exception {
        bindAddress = "tcp://localhost:0";
        useTopic = false;
        super.setUp();
        mbeanServer = broker.getManagementContext().getMBeanServer();
    }

    /** Closes the test connection, if one was opened, before the superclass tear-down runs. */
    @Override
    protected void tearDown() throws Exception {
        if (connection != null) {
            connection.close();
            connection = null;
        }
        super.tearDown();
    }

    /**
     * Creates a JMX-enabled broker bound to {@code bindAddress}, using the configured
     * {@code persistenceAdapter} and with all stored messages deleted.
     *
     * @return the configured broker
     * @throws Exception if the broker cannot be configured
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService answer = new BrokerService();
        answer.setUseJmx(true);
        answer.setEnableStatistics(true);
        answer.addConnector(bindAddress);
        answer.setPersistenceAdapter(persistenceAdapter);
        answer.deleteAllMessages();
        return answer;
    }

    /** Connects to the address the broker's first transport connector actually published. */
    @Override
    protected ConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
    }

    protected void echo(String text) {
        LOG.info(text);
    }

    /**
     * Returns the name of the destination used in this test case
     */
    protected String getDestinationString() {
        return getClass().getName() + "." + getName(true);
    }
}
