http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 9c9e6f5..ebea686 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -19,6 +19,8 @@ package org.apache.activemq.artemis.tests.integration.management; import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY; import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; @@ -27,6 +29,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import javax.json.JsonArray; import javax.json.JsonObject; @@ -60,27 +63,46 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.junit.Wait; import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(value = Parameterized.class) public class QueueControlTest extends ManagementTestBase { private ActiveMQServer server; private ClientSession session; private ServerLocator locator; + private final boolean durable; + + @Parameterized.Parameters(name = "durable={0}") + public static Collection<Object[]> getParams() { + return Arrays.asList(new Object[][] {{true}, {false}}); + } + + + /** + * @param durable + */ + public QueueControlTest(boolean durable) { + super(); + this.durable = durable; + } + @Test public void testAttributes() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); SimpleString filter = new SimpleString("color = 'blue'"); - boolean durable = RandomUtil.randomBoolean(); - session.createQueue(address, queue, filter, durable); + session.createQueue(address, RoutingType.MULTICAST, queue, filter, durable); QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(queue.toString(), queueControl.getName()); @@ -97,7 +119,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(queue.toString(), queueControl.getName()); @@ -112,7 +134,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString queue = RandomUtil.randomSimpleString(); final SimpleString deadLetterAddress = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); Assert.assertNull(queueControl.getDeadLetterAddress()); @@ -137,7 +159,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString queue = RandomUtil.randomSimpleString(); String deadLetterAddress = RandomUtil.randomString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); @@ -155,7 +177,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString queue = RandomUtil.randomSimpleString(); final SimpleString expiryAddress = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); Assert.assertNull(queueControl.getExpiryAddress()); @@ -180,7 +202,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString queue = RandomUtil.randomSimpleString(); String expiryAddress = RandomUtil.randomString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); @@ -200,7 +222,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); @@ -220,7 +242,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); @@ -250,18 +272,18 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(0, getMessageCount(queueControl)); ClientProducer producer = session.createProducer(address); - producer.send(session.createMessage(false)); - Assert.assertEquals(1, getMessageCount(queueControl)); + producer.send(session.createMessage(durable)); + assertMessageMetrics(queueControl, 1, durable); consumeMessages(1, session, queue); - Assert.assertEquals(0, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 0, durable); session.deleteQueue(queue); } @@ -271,7 +293,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(0, getMessageCount(queueControl)); @@ -302,15 +324,15 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(0, getMessagesAdded(queueControl)); ClientProducer producer = session.createProducer(address); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); Assert.assertEquals(1, getMessagesAdded(queueControl)); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); Assert.assertEquals(2, getMessagesAdded(queueControl)); consumeMessages(2, session, queue); @@ -325,7 +347,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, false); QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(0, queueControl.getMessagesAcknowledged()); @@ -351,13 +373,13 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(0, queueControl.getScheduledCount()); ClientProducer producer = session.createProducer(address); - ClientMessage message = session.createMessage(false); + ClientMessage message = session.createMessage(durable); message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + delay); producer.send(message); @@ -366,13 +388,17 @@ public class QueueControlTest extends ManagementTestBase { Thread.sleep(100); } - Assert.assertEquals(1, queueControl.getScheduledCount()); + assertScheduledMetrics(queueControl, 1, durable); + assertMessageMetrics(queueControl, 1, durable); + consumeMessages(0, session, queue); Thread.sleep(delay * 2); Assert.assertEquals(0, queueControl.getScheduledCount()); consumeMessages(1, session, queue); + assertMessageMetrics(queueControl, 0, durable); + assertScheduledMetrics(queueControl, 0, durable); session.deleteQueue(queue); } @@ -395,14 +421,14 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); queue = RandomUtil.randomSimpleString(); - transSession.createQueue(address, queue, null, false); + transSession.createQueue(address, RoutingType.MULTICAST, queue, null, durable); final QueueControl queueControl = createManagementControl(address, queue); ClientProducer producer = transSession.createProducer(address); for (int i = 0; i < numMsg; i++) { - ClientMessage message = transSession.createMessage(false); + ClientMessage message = transSession.createMessage(durable); message.putIntProperty(new SimpleString("seqno"), i); producer.send(message); } @@ -480,17 +506,17 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); int intValue = RandomUtil.randomInt(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); Queue srvqueue = server.locateQueue(queue); QueueControl queueControl = createManagementControl(address, queue); ClientProducer producer = session.createProducer(address); - ClientMessage message = session.createMessage(false); + ClientMessage message = session.createMessage(durable); message.putIntProperty(new SimpleString("key"), intValue); producer.send(message); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); ClientConsumer consumer = session.createConsumer(queue); session.start(); @@ -525,20 +551,22 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); int intValue = RandomUtil.randomInt(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); ClientProducer producer = session.createProducer(address); - ClientMessage message = session.createMessage(false); + ClientMessage message = session.createMessage(durable); message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + delay); message.putIntProperty(new SimpleString("key"), intValue); producer.send(message); // unscheduled message - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); Map<String, Object>[] messages = queueControl.listScheduledMessages(); Assert.assertEquals(1, messages.length); + assertScheduledMetrics(queueControl, 1, durable); + Assert.assertEquals(intValue, Integer.parseInt((messages[0].get("key")).toString())); Thread.sleep(delay + 500); @@ -557,7 +585,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); int intValue = RandomUtil.randomInt(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); @@ -567,7 +595,7 @@ public class QueueControlTest extends ManagementTestBase { message.putIntProperty(new SimpleString("key"), intValue); producer.send(message); // unscheduled message - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); String jsonString = queueControl.listScheduledMessagesAsJSON(); Assert.assertNotNull(jsonString); @@ -593,10 +621,10 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); ClientProducer producer = session.createProducer(address); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(0, queueControl.getDeliveringCount()); @@ -604,11 +632,11 @@ public class QueueControlTest extends ManagementTestBase { ClientConsumer consumer = session.createConsumer(queue); ClientMessage message = consumer.receive(500); Assert.assertNotNull(message); - Assert.assertEquals(1, queueControl.getDeliveringCount()); + assertDeliveringMetrics(queueControl, 1, durable); message.acknowledge(); session.commit(); - Assert.assertEquals(0, queueControl.getDeliveringCount()); + assertDeliveringMetrics(queueControl, 0, durable); consumer.close(); session.deleteQueue(queue); @@ -629,7 +657,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString queue = RandomUtil.randomSimpleString(); try { - session.createQueue(address, RoutingType.ANYCAST, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); for (int i = 0; i < THREAD_COUNT; i++) { producerExecutor.submit(() -> { @@ -665,7 +693,8 @@ public class QueueControlTest extends ManagementTestBase { producerCountDown.await(30, TimeUnit.SECONDS); consumerCountDown.await(30, TimeUnit.SECONDS); - QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST); + QueueControl queueControl = createManagementControl(address, queue, RoutingType.MULTICAST); + Thread.sleep(200); Assert.assertEquals(0, queueControl.getMessageCount()); Assert.assertEquals(0, queueControl.getConsumerCount()); Assert.assertEquals(0, queueControl.getDeliveringCount()); @@ -694,12 +723,12 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); int intValue = RandomUtil.randomInt(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); ClientProducer producer = session.createProducer(address); - ClientMessage message = session.createMessage(false); + ClientMessage message = session.createMessage(durable); message.putIntProperty(new SimpleString("key"), intValue); producer.send(message); @@ -731,17 +760,18 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); ClientProducer producer = session.createProducer(address); - ClientMessage matchingMessage = session.createMessage(false); + ClientMessage matchingMessage = session.createMessage(durable); matchingMessage.putLongProperty(key, matchingValue); producer.send(matchingMessage); - ClientMessage unmatchingMessage = session.createMessage(false); + ClientMessage unmatchingMessage = session.createMessage(durable); unmatchingMessage.putLongProperty(key, unmatchingValue); producer.send(unmatchingMessage); + assertMessageMetrics(queueControl, 2, durable); Map<String, Object>[] messages = queueControl.listMessages(filter); Assert.assertEquals(1, messages.length); Assert.assertEquals(matchingValue, Long.parseLong(messages[0].get("key").toString())); @@ -750,6 +780,7 @@ public class QueueControlTest extends ManagementTestBase { messages = queueControl.listMessages(filter); Assert.assertEquals(0, messages.length); + assertMessageMetrics(queueControl, 0, durable); session.deleteQueue(queue); } @@ -759,12 +790,12 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); ClientProducer producer = session.createProducer(address); - producer.send(session.createMessage(false)); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); + producer.send(session.createMessage(durable)); Map<String, Object>[] messages = queueControl.listMessages(null); Assert.assertEquals(2, messages.length); @@ -782,12 +813,12 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); ClientProducer producer = session.createProducer(address); - producer.send(session.createMessage(false)); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); + producer.send(session.createMessage(durable)); Map<String, Object>[] messages = queueControl.listMessages(""); Assert.assertEquals(2, messages.length); @@ -810,14 +841,14 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); ClientProducer producer = session.createProducer(address); - ClientMessage matchingMessage = session.createMessage(false); + ClientMessage matchingMessage = session.createMessage(durable); matchingMessage.putLongProperty(key, matchingValue); producer.send(matchingMessage); - ClientMessage unmatchingMessage = session.createMessage(false); + ClientMessage unmatchingMessage = session.createMessage(durable); unmatchingMessage.putLongProperty(key, unmatchingValue); producer.send(unmatchingMessage); @@ -853,8 +884,8 @@ public class QueueControlTest extends ManagementTestBase { AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla); server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings); - session.createQueue(dla, dlq, null, false); - session.createQueue(adName, qName, null, false); + session.createQueue(dla, RoutingType.MULTICAST, dlq, null, durable); + session.createQueue(adName, RoutingType.MULTICAST, qName, null, durable); // Send message to queue. ClientProducer producer = session.createProducer(adName); @@ -874,7 +905,7 @@ public class QueueControlTest extends ManagementTestBase { Assert.assertNull(clientMessage); QueueControl queueControl = createManagementControl(dla, dlq); - Assert.assertEquals(1, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 1, durable); final long messageID = getFirstMessageId(queueControl); // Retry the message - i.e. it should go from DLQ to original Queue. @@ -882,6 +913,7 @@ public class QueueControlTest extends ManagementTestBase { // Assert DLQ is empty... Assert.assertEquals(0, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 0, durable); // .. and that the message is now on the original queue once more. clientMessage = clientConsumer.receive(500); @@ -909,8 +941,8 @@ public class QueueControlTest extends ManagementTestBase { server.getAddressSettingsRepository().addMatch(forwardingAddress.toString(), addressSettings); // create target queue, DLQ and source topic - session.createQueue(dla, RoutingType.ANYCAST, dlq, null, false); - session.createQueue(forwardingAddress, RoutingType.ANYCAST, forwardingQueue, null, false); + session.createQueue(dla, RoutingType.MULTICAST, dlq, null, durable); + session.createQueue(forwardingAddress, RoutingType.MULTICAST, forwardingQueue, null, durable); session.createAddress(myTopic, RoutingType.MULTICAST, false); DivertConfiguration divert = new DivertConfiguration().setName("local-divert") @@ -935,15 +967,16 @@ public class QueueControlTest extends ManagementTestBase { clientMessage = clientConsumer.receiveImmediate(); Assert.assertNull(clientMessage); - QueueControl queueControl = createManagementControl(dla, dlq, RoutingType.ANYCAST); - Assert.assertEquals(1, getMessageCount(queueControl)); + QueueControl queueControl = createManagementControl(dla, dlq, RoutingType.MULTICAST); + assertMessageMetrics(queueControl, 1, durable); + final long messageID = getFirstMessageId(queueControl); // Retry the message - i.e. it should go from DLQ to original Queue. Assert.assertTrue(queueControl.retryMessage(messageID)); // Assert DLQ is empty... - Assert.assertEquals(0, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 0, durable); // .. and that the message is now on the original queue once more. clientMessage = clientConsumer.receive(500); @@ -970,8 +1003,8 @@ public class QueueControlTest extends ManagementTestBase { AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla); server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings); - session.createQueue(dla, dlq, null, false); - session.createQueue(adName, qName, null, false); + session.createQueue(dla, RoutingType.MULTICAST, dlq, null, durable); + session.createQueue(adName, RoutingType.MULTICAST, qName, null, durable); // Send message to queue. ClientProducer producer = session.createProducer(adName); @@ -1014,13 +1047,13 @@ public class QueueControlTest extends ManagementTestBase { assertTrue(queueMemorySize2.get() > 0); QueueControl dlqQueueControl = createManagementControl(dla, dlq); - Assert.assertEquals(numMessagesToTest, getMessageCount(dlqQueueControl)); + assertMessageMetrics(dlqQueueControl, numMessagesToTest, durable); // Retry all messages - i.e. they should go from DLQ to original Queue. Assert.assertEquals(numMessagesToTest, dlqQueueControl.retryMessages()); // Assert DLQ is empty... - Assert.assertEquals(0, getMessageCount(dlqQueueControl)); + assertMessageMetrics(dlqQueueControl, 0, durable); //Verify that original queue has a memory size of greater than 0 and DLQ is 0 after move assertTrue(queueMemorySize1.get() > 0); @@ -1056,12 +1089,12 @@ public class QueueControlTest extends ManagementTestBase { SimpleString otherAddress = RandomUtil.randomSimpleString(); SimpleString otherQueue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); - session.createQueue(otherAddress, otherQueue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); + session.createQueue(otherAddress, RoutingType.MULTICAST, otherQueue, null, durable); ClientProducer producer = session.createProducer(address); // send on queue - ClientMessage message = session.createMessage(false); + ClientMessage message = session.createMessage(durable); SimpleString key = RandomUtil.randomSimpleString(); long value = RandomUtil.randomLong(); message.putLongProperty(key, value); @@ -1076,7 +1109,7 @@ public class QueueControlTest extends ManagementTestBase { AtomicInteger queueMemorySize = (AtomicInteger) queueMemorySizeField.get(q); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(1, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 1, durable); //verify memory usage is greater than 0 Assert.assertTrue(queueMemorySize.get() > 0); @@ -1084,7 +1117,7 @@ public class QueueControlTest extends ManagementTestBase { // moved all messages to otherQueue int movedMessagesCount = queueControl.moveMessages(null, otherQueue.toString()); Assert.assertEquals(1, movedMessagesCount); - Assert.assertEquals(0, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 0, durable); //verify memory usage is 0 after move Assert.assertEquals(0, queueMemorySize.get()); @@ -1111,9 +1144,9 @@ public class QueueControlTest extends ManagementTestBase { SimpleString queueB = new SimpleString("B"); SimpleString queueC = new SimpleString("C"); - server.createQueue(address, RoutingType.MULTICAST, queueA, null, true, false); - server.createQueue(address, RoutingType.MULTICAST, queueB, null, true, false); - server.createQueue(address, RoutingType.MULTICAST, queueC, null, true, false); + server.createQueue(address, RoutingType.MULTICAST, queueA, null, durable, false); + server.createQueue(address, RoutingType.MULTICAST, queueB, null, durable, false); + server.createQueue(address, RoutingType.MULTICAST, queueC, null, durable, false); QueueControl queueControlA = createManagementControl(address, queueA); @@ -1174,18 +1207,18 @@ public class QueueControlTest extends ManagementTestBase { SimpleString queue = RandomUtil.randomSimpleString(); SimpleString unknownQueue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); ClientProducer producer = session.createProducer(address); // send on queue - ClientMessage message = session.createMessage(false); + ClientMessage message = session.createMessage(durable); SimpleString key = RandomUtil.randomSimpleString(); long value = RandomUtil.randomLong(); message.putLongProperty(key, value); producer.send(message); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(1, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 1, durable); // moved all messages to unknown queue try { @@ -1194,6 +1227,7 @@ public class QueueControlTest extends ManagementTestBase { } catch (Exception e) { } Assert.assertEquals(1, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 1, durable); consumeMessages(1, session, queue); @@ -1208,7 +1242,6 @@ public class QueueControlTest extends ManagementTestBase { * <li>consume the message which <strong>did</strong> matches the filter from otherQueue</li> * </ol> */ - @Test public void testMoveMessagesWithFilter() throws Exception { SimpleString key = new SimpleString("key"); @@ -1220,20 +1253,20 @@ public class QueueControlTest extends ManagementTestBase { SimpleString otherAddress = RandomUtil.randomSimpleString(); SimpleString otherQueue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); - session.createQueue(otherAddress, otherQueue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); + session.createQueue(otherAddress, RoutingType.MULTICAST,otherQueue, null, durable); ClientProducer producer = session.createProducer(address); // send on queue - ClientMessage matchingMessage = session.createMessage(false); + ClientMessage matchingMessage = session.createMessage(durable); matchingMessage.putLongProperty(key, matchingValue); producer.send(matchingMessage); - ClientMessage unmatchingMessage = session.createMessage(false); + ClientMessage unmatchingMessage = session.createMessage(durable); unmatchingMessage.putLongProperty(key, unmatchingValue); producer.send(unmatchingMessage); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(2, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 2, durable); // moved matching messages to otherQueue int movedMatchedMessagesCount = queueControl.moveMessages(key + " =" + matchingValue, otherQueue.toString()); @@ -1267,18 +1300,18 @@ public class QueueControlTest extends ManagementTestBase { SimpleString otherAddress = RandomUtil.randomSimpleString(); SimpleString otherQueue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); - session.createQueue(otherAddress, otherQueue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); + session.createQueue(otherAddress, RoutingType.MULTICAST, otherQueue, null, durable); ClientProducer producer = session.createProducer(address); // send 2 messages on queue - producer.send(session.createMessage(false)); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); + producer.send(session.createMessage(durable)); QueueControl queueControl = createManagementControl(address, queue); QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue); - Assert.assertEquals(2, getMessageCount(queueControl)); - Assert.assertEquals(0, getMessageCount(otherQueueControl)); + assertMessageMetrics(queueControl, 2, durable); + assertMessageMetrics(otherQueueControl, 0, durable); // the message IDs are set on the server Map<String, Object>[] messages = queueControl.listMessages(null); @@ -1287,8 +1320,8 @@ public class QueueControlTest extends ManagementTestBase { boolean moved = queueControl.moveMessage(messageID, otherQueue.toString()); Assert.assertTrue(moved); - Assert.assertEquals(1, getMessageCount(queueControl)); - Assert.assertEquals(1, getMessageCount(otherQueueControl)); + assertMessageMetrics(queueControl, 1, durable); + assertMessageMetrics(otherQueueControl, 1, durable); consumeMessages(1, session, queue); consumeMessages(1, session, otherQueue); @@ -1303,14 +1336,14 @@ public class QueueControlTest extends ManagementTestBase { SimpleString queue = RandomUtil.randomSimpleString(); SimpleString unknownQueue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); ClientProducer producer = session.createProducer(address); // send 2 messages on queue - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(1, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 1, durable); // the message IDs are set on the server Map<String, Object>[] messages = queueControl.listMessages(null); @@ -1347,19 +1380,19 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); ClientProducer producer = session.createProducer(address); // send on queue - ClientMessage matchingMessage = session.createMessage(false); + ClientMessage matchingMessage = session.createMessage(durable); matchingMessage.putLongProperty(key, matchingValue); producer.send(matchingMessage); - ClientMessage unmatchingMessage = session.createMessage(false); + ClientMessage unmatchingMessage = session.createMessage(durable); unmatchingMessage.putLongProperty(key, unmatchingValue); producer.send(unmatchingMessage); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(2, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 2, durable); // removed matching messages to otherQueue int removedMatchedMessagesCount = queueControl.removeMessages(key + " =" + matchingValue); @@ -1391,24 +1424,24 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); ClientProducer producer = session.createProducer(address); // send on queue - ClientMessage matchingMessage = session.createMessage(false); + ClientMessage matchingMessage = session.createMessage(durable); matchingMessage.putLongProperty(key, matchingValue); producer.send(matchingMessage); - ClientMessage unmatchingMessage = session.createMessage(false); + ClientMessage unmatchingMessage = session.createMessage(durable); unmatchingMessage.putLongProperty(key, unmatchingValue); producer.send(unmatchingMessage); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(2, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 2, durable); // removed matching messages to otherQueue int removedMatchedMessagesCount = queueControl.removeMessages(5, key + " =" + matchingValue); Assert.assertEquals(1, removedMatchedMessagesCount); - Assert.assertEquals(1, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 1, durable); // consume the unmatched message from queue ClientConsumer consumer = session.createConsumer(queue); @@ -1431,20 +1464,20 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); ClientProducer producer = session.createProducer(address); // send on queue - producer.send(session.createMessage(false)); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); + producer.send(session.createMessage(durable)); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(2, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 2, durable); // removed matching messages to otherQueue int removedMatchedMessagesCount = queueControl.removeMessages(null); Assert.assertEquals(2, removedMatchedMessagesCount); - Assert.assertEquals(0, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 0, durable); session.deleteQueue(queue); } @@ -1454,20 +1487,20 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); ClientProducer producer = session.createProducer(address); // send on queue - producer.send(session.createMessage(false)); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); + producer.send(session.createMessage(durable)); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(2, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 2, durable); // removed matching messages to otherQueue int removedMatchedMessagesCount = queueControl.removeAllMessages(); Assert.assertEquals(2, removedMatchedMessagesCount); - Assert.assertEquals(0, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 0, durable); session.deleteQueue(queue); } @@ -1477,20 +1510,20 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); ClientProducer producer = session.createProducer(address); // send on queue - producer.send(session.createMessage(false)); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); + producer.send(session.createMessage(durable)); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(2, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 2, durable); // removed matching messages to otherQueue int removedMatchedMessagesCount = queueControl.removeMessages(""); Assert.assertEquals(2, removedMatchedMessagesCount); - Assert.assertEquals(0, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 0, durable); session.deleteQueue(queue); } @@ -1500,15 +1533,15 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); ClientProducer producer = session.createProducer(address); // send 2 messages on queue - producer.send(session.createMessage(false)); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); + producer.send(session.createMessage(durable)); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(2, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 2, durable); // the message IDs are set on the server Map<String, Object>[] messages = queueControl.listMessages(null); @@ -1518,7 +1551,7 @@ public class QueueControlTest extends ManagementTestBase { // delete 1st message boolean deleted = queueControl.removeMessage(messageID); Assert.assertTrue(deleted); - Assert.assertEquals(1, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 1, durable); // check there is a single message to consume from queue consumeMessages(1, session, queue); @@ -1531,15 +1564,15 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); ClientProducer producer = session.createProducer(address); // send 2 messages on queue, both scheduled long timeout = System.currentTimeMillis() + 5000; - ClientMessage m1 = session.createMessage(true); + ClientMessage m1 = session.createMessage(durable); m1.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, timeout); producer.send(m1); - ClientMessage m2 = session.createMessage(true); + ClientMessage m2 = session.createMessage(durable); m2.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, timeout); producer.send(m2); @@ -1554,7 +1587,7 @@ public class QueueControlTest extends ManagementTestBase { // delete 1st message boolean deleted = queueControl.removeMessage(messageID); Assert.assertTrue(deleted); - Assert.assertEquals(1, queueControl.getScheduledCount()); + assertScheduledMetrics(queueControl, 1, durable); // check there is a single message to consume from queue while (timeout > System.currentTimeMillis() && queueControl.getScheduledCount() == 1) { @@ -1571,14 +1604,14 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); ClientProducer producer = session.createProducer(address); // send messages on queue for (int i = 0; i < 100; i++) { - ClientMessage msg = session.createMessage(false); + ClientMessage msg = session.createMessage(durable); msg.putIntProperty("count", i); producer.send(msg); } @@ -1592,7 +1625,7 @@ public class QueueControlTest extends ManagementTestBase { } QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(100, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 100, durable); // the message IDs are set on the server Map<String, Object>[] messages = queueControl.listMessages(null); @@ -1604,7 +1637,7 @@ public class QueueControlTest extends ManagementTestBase { // delete 1st message boolean deleted = queueControl.removeMessage(messageID); Assert.assertTrue(deleted); - Assert.assertEquals(99, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 99, durable); cons.close(); @@ -1623,13 +1656,13 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); ClientProducer producer = session.createProducer(address); // send on queue - ClientMessage matchingMessage = session.createMessage(false); + ClientMessage matchingMessage = session.createMessage(durable); matchingMessage.putLongProperty(key, matchingValue); - ClientMessage unmatchingMessage = session.createMessage(false); + ClientMessage unmatchingMessage = session.createMessage(durable); unmatchingMessage.putLongProperty(key, unmatchingValue); producer.send(matchingMessage); producer.send(unmatchingMessage); @@ -1637,6 +1670,7 @@ public class QueueControlTest extends ManagementTestBase { QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(3, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 3, durable); Assert.assertEquals(2, queueControl.countMessages(key + " =" + matchingValue)); Assert.assertEquals(1, queueControl.countMessages(key + " =" + unmatchingValue)); @@ -1653,18 +1687,18 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); ClientProducer producer = session.createProducer(address); // send on queue for (int i = 0; i < 100; i++) { - ClientMessage msg = session.createMessage(false); + ClientMessage msg = session.createMessage(durable); msg.putStringProperty(key, SimpleString.toSimpleString(matchingValue)); producer.send(msg); } for (int i = 0; i < 10; i++) { - ClientMessage msg = session.createMessage(false); + ClientMessage msg = session.createMessage(durable); msg.putStringProperty(key, SimpleString.toSimpleString(nonMatchingValue)); producer.send(msg); } @@ -1679,7 +1713,7 @@ public class QueueControlTest extends ManagementTestBase { assertNull(consumer.receiveImmediate()); QueueControl queueControl = createManagementControl(address, queue); - Assert.assertEquals(110, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 110, durable); Assert.assertEquals(0, queueControl.countMessages("nonExistentProperty like \'%Temp/88\'")); @@ -1700,14 +1734,14 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); ClientProducer producer = session.createProducer(address); // send on queue - ClientMessage matchingMessage = session.createMessage(false); + ClientMessage matchingMessage = session.createMessage(durable); matchingMessage.putLongProperty(key, matchingValue); producer.send(matchingMessage); - ClientMessage unmatchingMessage = session.createMessage(false); + ClientMessage unmatchingMessage = session.createMessage(durable); unmatchingMessage.putLongProperty(key, unmatchingValue); producer.send(unmatchingMessage); @@ -1716,7 +1750,7 @@ public class QueueControlTest extends ManagementTestBase { int expiredMessagesCount = queueControl.expireMessages(key + " =" + matchingValue); Assert.assertEquals(1, expiredMessagesCount); - Assert.assertEquals(1, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 1, durable); // consume the unmatched message from queue ClientConsumer consumer = session.createConsumer(queue); @@ -1742,17 +1776,17 @@ public class QueueControlTest extends ManagementTestBase { SimpleString expiryAddress = RandomUtil.randomSimpleString(); SimpleString expiryQueue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); - session.createQueue(expiryAddress, expiryQueue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); + session.createQueue(expiryAddress, RoutingType.MULTICAST, expiryQueue, null, durable); ClientProducer producer = session.createProducer(address); // send on queue - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); QueueControl queueControl = createManagementControl(address, queue); QueueControl expiryQueueControl = createManagementControl(expiryAddress, expiryQueue); - Assert.assertEquals(1, getMessageCount(queueControl)); - Assert.assertEquals(0, getMessageCount(expiryQueueControl)); + assertMessageMetrics(queueControl, 1, durable); + assertMessageMetrics(expiryQueueControl, 0, durable); // the message IDs are set on the server Map<String, Object>[] messages = queueControl.listMessages(null); @@ -1764,8 +1798,9 @@ public class QueueControlTest extends ManagementTestBase { boolean expired = queueControl.expireMessage(messageID); Assert.assertTrue(expired); - Assert.assertEquals(0, getMessageCount(queueControl)); - Assert.assertEquals(1, getMessageCount(expiryQueueControl)); + Thread.sleep(200); + assertMessageMetrics(queueControl, 0, durable); + assertMessageMetrics(expiryQueueControl, 1, durable); consumeMessages(0, session, queue); consumeMessages(1, session, expiryQueue); @@ -1782,17 +1817,17 @@ public class QueueControlTest extends ManagementTestBase { SimpleString deadLetterAddress = RandomUtil.randomSimpleString(); SimpleString deadLetterQueue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); - session.createQueue(deadLetterAddress, deadLetterQueue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); + session.createQueue(deadLetterAddress, RoutingType.MULTICAST, deadLetterQueue, null, durable); ClientProducer producer = session.createProducer(address); // send 2 messages on queue - producer.send(session.createMessage(false)); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); + producer.send(session.createMessage(durable)); QueueControl queueControl = createManagementControl(address, queue); QueueControl deadLetterQueueControl = createManagementControl(deadLetterAddress, deadLetterQueue); - Assert.assertEquals(2, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 2, durable); // the message IDs are set on the server Map<String, Object>[] messages = queueControl.listMessages(null); @@ -1805,8 +1840,9 @@ public class QueueControlTest extends ManagementTestBase { Assert.assertEquals(0, getMessageCount(deadLetterQueueControl)); boolean movedToDeadLetterAddress = queueControl.sendMessageToDeadLetterAddress(messageID); Assert.assertTrue(movedToDeadLetterAddress); - Assert.assertEquals(1, getMessageCount(queueControl)); - Assert.assertEquals(1, getMessageCount(deadLetterQueueControl)); + assertMessageMetrics(queueControl, 1, durable); + Thread.sleep(200); + assertMessageMetrics(deadLetterQueueControl, 1, durable); // check there is a single message to consume from queue consumeMessages(1, session, queue); @@ -1826,10 +1862,10 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); ClientProducer producer = session.createProducer(address); - ClientMessage message = session.createMessage(false); + ClientMessage message = session.createMessage(durable); message.setPriority(originalPriority); producer.send(message); @@ -1860,10 +1896,10 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); ClientProducer producer = session.createProducer(address); - ClientMessage message = session.createMessage(false); + ClientMessage message = session.createMessage(durable); producer.send(message); QueueControl queueControl = createManagementControl(address, queue); @@ -1894,7 +1930,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer); @@ -1908,7 +1944,7 @@ public class QueueControlTest extends ManagementTestBase { Assert.assertEquals(0, info.getCount()); ClientProducer producer = session.createProducer(address); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); Thread.sleep(200); jsonString = queueControl.listMessageCounter(); @@ -1918,7 +1954,7 @@ public class QueueControlTest extends ManagementTestBase { Assert.assertEquals(1, info.getCount()); Assert.assertEquals(1, info.getCountDelta()); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); Thread.sleep(200); jsonString = queueControl.listMessageCounter(); @@ -1946,7 +1982,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer); @@ -1960,7 +1996,7 @@ public class QueueControlTest extends ManagementTestBase { Assert.assertEquals(0, info.getCount()); ClientProducer producer = session.createProducer(address); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2); jsonString = queueControl.listMessageCounter(); @@ -1998,7 +2034,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); String history = queueControl.listMessageCounterAsHTML(); @@ -2013,7 +2049,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer); @@ -2033,7 +2069,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer); @@ -2049,8 +2085,8 @@ public class QueueControlTest extends ManagementTestBase { @Test public void testMoveMessagesBack() throws Exception { - server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new SimpleString("q1"), null, true, false); - server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new SimpleString("q2"), null, true, false); + server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new SimpleString("q1"), null, durable, false); + server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new SimpleString("q2"), null, durable, false); ServerLocator locator = createInVMNonHALocator(); @@ -2061,7 +2097,7 @@ public class QueueControlTest extends ManagementTestBase { ClientProducer prod1 = session.createProducer("q1"); for (int i = 0; i < 10; i++) { - ClientMessage msg = session.createMessage(true); + ClientMessage msg = session.createMessage(durable); msg.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("dupl-" + i)); @@ -2113,8 +2149,8 @@ public class QueueControlTest extends ManagementTestBase { @Test public void testMoveMessagesBack2() throws Exception { - server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new SimpleString("q1"), null, true, false); - server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new SimpleString("q2"), null, true, false); + server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new SimpleString("q1"), null, durable, false); + server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new SimpleString("q2"), null, durable, false); ServerLocator locator = createInVMNonHALocator(); @@ -2127,7 +2163,7 @@ public class QueueControlTest extends ManagementTestBase { int NUMBER_OF_MSGS = 10; for (int i = 0; i < NUMBER_OF_MSGS; i++) { - ClientMessage msg = session.createMessage(true); + ClientMessage msg = session.createMessage(durable); msg.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("dupl-" + i)); @@ -2191,7 +2227,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString queue = RandomUtil.randomSimpleString(); try { - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer); @@ -2203,7 +2239,6 @@ public class QueueControlTest extends ManagementTestBase { queueControl.resume(); Assert.assertFalse(queueControl.isPaused()); } catch (Exception e) { - // TODO Auto-generated catch block e.printStackTrace(); } } @@ -2213,15 +2248,15 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(0, getMessagesAdded(queueControl)); ClientProducer producer = session.createProducer(address); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); Assert.assertEquals(1, getMessagesAdded(queueControl)); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); Assert.assertEquals(2, getMessagesAdded(queueControl)); consumeMessages(2, session, queue); @@ -2240,16 +2275,16 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(0, queueControl.getMessagesAcknowledged()); ClientProducer producer = session.createProducer(address); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); consumeMessages(1, session, queue); Assert.assertEquals(1, queueControl.getMessagesAcknowledged()); - producer.send(session.createMessage(false)); + producer.send(session.createMessage(durable)); consumeMessages(1, session, queue); Assert.assertEquals(2, queueControl.getMessagesAcknowledged()); @@ -2265,13 +2300,13 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(0, queueControl.getMessagesExpired()); ClientProducer producer = session.createProducer(address); - ClientMessage message = session.createMessage(false); + ClientMessage message = session.createMessage(durable); producer.send(message); // the message IDs are set on the server @@ -2282,7 +2317,7 @@ public class QueueControlTest extends ManagementTestBase { queueControl.expireMessage(messageID); Assert.assertEquals(1, queueControl.getMessagesExpired()); - message = session.createMessage(false); + message = session.createMessage(durable); producer.send(message); // the message IDs are set on the server @@ -2305,13 +2340,13 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(0, queueControl.getMessagesExpired()); ClientProducer producer = session.createProducer(address); - ClientMessage message = session.createMessage(false); + ClientMessage message = session.createMessage(durable); producer.send(message); // the message IDs are set on the server @@ -2321,6 +2356,7 @@ public class QueueControlTest extends ManagementTestBase { queueControl.sendMessageToDeadLetterAddress(messageID); Assert.assertEquals(1, queueControl.getMessagesKilled()); + assertMessageMetrics(queueControl, 0, durable); message = session.createMessage(false); producer.send(message); @@ -2354,7 +2390,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString testQueueName = new SimpleString("newQueue"); String testQueueName2 = "newQueue2"; - this.server.createQueue(testQueueName, RoutingType.ANYCAST, testQueueName, null, false, false); + this.server.createQueue(testQueueName, RoutingType.MULTICAST, testQueueName, null, durable, false); Notification notif = listener.getNotification(); @@ -2369,7 +2405,7 @@ public class QueueControlTest extends ManagementTestBase { ActiveMQServerControl control = ManagementControlHelper.createActiveMQServerControl(mbeanServer); - control.createQueue(testQueueName2, testQueueName2); + control.createQueue(testQueueName2, testQueueName2, RoutingType.MULTICAST.toString()); notif = listener.getNotification(); System.out.println("got notif: " + notif); @@ -2387,7 +2423,7 @@ public class QueueControlTest extends ManagementTestBase { SimpleString address = RandomUtil.randomSimpleString(); SimpleString queue = RandomUtil.randomSimpleString(); - session.createQueue(address, queue, null, false); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); QueueControl queueControl = createManagementControl(address, queue); @@ -2438,4 +2474,46 @@ public class QueueControlTest extends ManagementTestBase { JsonObject object = (JsonObject) array.get(0); return object.getJsonNumber("messageID").longValue(); } + + protected void assertMessageMetrics(final QueueControl queueControl, long messageCount, boolean durable) throws Exception { + assertMetrics(queueControl, messageCount, durable, queueControl::getMessageCount, + queueControl::getPersistentSize, queueControl::getDurableMessageCount, queueControl::getDurablePersistentSize); + } + + protected void assertScheduledMetrics(final QueueControl queueControl, long messageCount, boolean durable) throws Exception { + assertMetrics(queueControl, messageCount, durable, queueControl::getScheduledCount, + queueControl::getScheduledSize, queueControl::getDurableScheduledCount, queueControl::getDurableScheduledSize); + } + + protected void assertDeliveringMetrics(final QueueControl queueControl, long messageCount, boolean durable) throws Exception { + assertMetrics(queueControl, messageCount, durable, queueControl::getDeliveringCount, + queueControl::getDeliveringSize, queueControl::getDurableDeliveringCount, queueControl::getDurableDeliveringSize); + } + + protected void assertMetrics(final QueueControl queueControl, long messageCount, boolean durable, + Supplier<Number> count, Supplier<Number> size, + Supplier<Number>durableCount, Supplier<Number> durableSize) throws Exception { + + //make sure count stat equals message count + Assert.assertTrue(Wait.waitFor(() -> count.get().longValue() == messageCount, 3, 100)); + + if (messageCount > 0) { + //verify size stat greater than 0 + Assert.assertTrue(Wait.waitFor(() -> size.get().longValue() > 0, 3, 100)); + + //If durable then make sure durable count and size are correct + if (durable) { + Assert.assertTrue(Wait.waitFor(() -> durableCount.get().longValue() == messageCount, 3, 100)); + Assert.assertTrue(Wait.waitFor(() -> durableSize.get().longValue() > 0, 3, 100)); + } else { + Assert.assertTrue(Wait.waitFor(() -> durableCount.get().longValue() == 0, 3, 100)); + Assert.assertTrue(Wait.waitFor(() -> durableSize.get().longValue() == 0, 3, 100)); + } + } else { + Assert.assertTrue(Wait.waitFor(() -> count.get().longValue() == 0, 3, 100)); + Assert.assertTrue(Wait.waitFor(() -> durableCount.get().longValue() == 0, 3, 100)); + Assert.assertTrue(Wait.waitFor(() -> size.get().longValue() == 0, 3, 100)); + Assert.assertTrue(Wait.waitFor(() -> durableSize.get().longValue() == 0, 3, 100)); + } + } }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index 09621af..aafbb5b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -16,16 +16,24 @@ */ package org.apache.activemq.artemis.tests.integration.management; -import javax.management.openmbean.CompositeData; import java.util.HashMap; import java.util.Map; +import javax.management.openmbean.CompositeData; + import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(value = Parameterized.class) public class QueueControlUsingCoreTest extends QueueControlTest { + public QueueControlUsingCoreTest(boolean durable) { + super(durable); + } + @Override protected QueueControl createManagementControl(final SimpleString address, final SimpleString queue) throws Exception { @@ -117,6 +125,21 @@ public class QueueControlUsingCoreTest extends QueueControlTest { } @Override + public long getDeliveringSize() { + return (Long) proxy.retrieveAttributeValue("deliveringSize", Long.class); + } + + @Override + public int getDurableDeliveringCount() { + return (Integer) proxy.retrieveAttributeValue("durableDeliveringCount", Integer.class); + } + + @Override + public long getDurableDeliveringSize() { + return (Long) proxy.retrieveAttributeValue("durableDeliveringSize", Long.class); + } + + @Override public String getExpiryAddress() { return (String) proxy.retrieveAttributeValue("expiryAddress"); } @@ -187,6 +210,21 @@ public class QueueControlUsingCoreTest extends QueueControlTest { } @Override + public long getScheduledSize() { + return (Long) proxy.retrieveAttributeValue("scheduledSize", Long.class); + } + + @Override + public long getDurableScheduledCount() { + return (Long) proxy.retrieveAttributeValue("durableScheduledCount", Long.class); + } + + @Override + public long getDurableScheduledSize() { + return (Long) proxy.retrieveAttributeValue("durableScheduledSize", Long.class); + } + + @Override public boolean isDurable() { return (Boolean) proxy.retrieveAttributeValue("durable"); } @@ -455,6 +493,21 @@ public class QueueControlUsingCoreTest extends QueueControlTest { public String listDeliveringMessagesAsJSON() throws Exception { return (String) proxy.invokeOperation("listDeliveringMessagesAsJSON"); } + + @Override + public long getPersistentSize() { + return (Long) proxy.retrieveAttributeValue("persistentSize", Long.class); + } + + @Override + public long getDurableMessageCount() { + return (Long) proxy.retrieveAttributeValue("durableMessageCount", Long.class); + } + + @Override + public long getDurablePersistentSize() { + return (Long) proxy.retrieveAttributeValue("durablePersistentSize", Long.class); + } }; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java index 6f47ba7..4714580 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java @@ -89,15 +89,17 @@ public class PagingCounterTest extends ActiveMQTestBase { Transaction tx = new TransactionImpl(server.getStorageManager()); - counter.increment(tx, 1); + counter.increment(tx, 1, 1000); assertEquals(0, counter.getValue()); + assertEquals(0, counter.getPersistentSize()); tx.commit(); storage.waitOnOperations(); assertEquals(1, counter.getValue()); + assertEquals(1000, counter.getPersistentSize()); } finally { sf.close(); session.close(); @@ -121,7 +123,7 @@ public class PagingCounterTest extends ActiveMQTestBase { for (int i = 0; i < 2100; i++) { - counter.increment(tx, 1); + counter.increment(tx, 1, 1000); if (i % 200 == 0) { tx.commit(); @@ -129,6 +131,7 @@ public class PagingCounterTest extends ActiveMQTestBase { storage.waitOnOperations(); assertEquals(i + 1, counter.getValue()); + assertEquals((i + 1) * 1000, counter.getPersistentSize()); tx = new TransactionImpl(server.getStorageManager()); } @@ -139,6 +142,7 @@ public class PagingCounterTest extends ActiveMQTestBase { storage.waitOnOperations(); assertEquals(2100, counter.getValue()); + assertEquals(2100 * 1000, counter.getPersistentSize()); server.stop(); @@ -153,6 +157,7 @@ public class PagingCounterTest extends ActiveMQTestBase { counter = locateCounter(queue); assertEquals(2100, counter.getValue()); + assertEquals(2100 * 1000, counter.getPersistentSize()); } finally { sf.close(); @@ -180,7 +185,7 @@ public class PagingCounterTest extends ActiveMQTestBase { for (int i = 0; i < 2100; i++) { - counter.increment(tx, 1); + counter.increment(tx, 1, 1000); if (i % 200 == 0) { tx.commit(); @@ -188,6 +193,7 @@ public class PagingCounterTest extends ActiveMQTestBase { storage.waitOnOperations(); assertEquals(i + 1, counter.getValue()); + assertEquals((i + 1) * 1000, counter.getPersistentSize()); tx = new TransactionImpl(server.getStorageManager()); } @@ -198,6 +204,7 @@ public class PagingCounterTest extends ActiveMQTestBase { storage.waitOnOperations(); assertEquals(2100, counter.getValue()); + assertEquals(2100 * 1000, counter.getPersistentSize()); server.stop(); @@ -212,6 +219,7 @@ public class PagingCounterTest extends ActiveMQTestBase { counter = locateCounter(queue); assertEquals(0, counter.getValue()); + assertEquals(0, counter.getPersistentSize()); } finally { sf.close(); @@ -230,15 +238,17 @@ public class PagingCounterTest extends ActiveMQTestBase { Transaction tx = new TransactionImpl(server.getStorageManager()); - counter.increment(tx, 1); + counter.increment(tx, 1, 1000); assertEquals(0, counter.getValue()); + assertEquals(0, counter.getPersistentSize()); tx.commit(); storage.waitOnOperations(); assertEquals(1, counter.getValue()); + assertEquals(1000, counter.getPersistentSize()); sl.close(); @@ -255,6 +265,7 @@ public class PagingCounterTest extends ActiveMQTestBase { counter = locateCounter(queue); assertEquals(1, counter.getValue()); + assertEquals(1000, counter.getPersistentSize()); } @@ -283,7 +294,7 @@ public class PagingCounterTest extends ActiveMQTestBase { Transaction tx = new TransactionImpl(xid, server.getStorageManager(), 300); for (int i = 0; i < 2000; i++) { - counter.increment(tx, 1); + counter.increment(tx, 1, 1000); } assertEquals(0, counter.getValue()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/AbstractPersistentStatTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/AbstractPersistentStatTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/AbstractPersistentStatTestSupport.java new file mode 100644 index 0000000..2247111 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/AbstractPersistentStatTestSupport.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.persistence.metrics; + +import java.util.Enumeration; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQMessage; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * + */ +public abstract class AbstractPersistentStatTestSupport extends JMSTestBase { + + protected static final Logger LOG = LoggerFactory.getLogger(AbstractPersistentStatTestSupport.class); + + protected static int defaultMessageSize = 1000; + + @Override + protected boolean usePersistence() { + return true; + } + + protected void consumeTestQueueMessages(String queueName, int num) throws Exception { + + // Start the connection + Connection connection = cf.createConnection(); + connection.setClientID("clientId2" + queueName); + connection.start(); + Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageConsumer consumer; + try { + consumer = session.createConsumer(queue); + for (int i = 0; i < num; i++) { + consumer.receive(); + } + consumer.close(); + } finally { + // consumer.close(); + connection.close(); + } + + } + + protected void browseTestQueueMessages(String queueName) throws Exception { + // Start the connection + Connection connection = cf.createConnection(); + connection.setClientID("clientId2" + queueName); + connection.start(); + Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + + try { + QueueBrowser queueBrowser = session.createBrowser(queue); + @SuppressWarnings("unchecked") + Enumeration<Message> messages = queueBrowser.getEnumeration(); + while (messages.hasMoreElements()) { + messages.nextElement(); + } + + } finally { + connection.close(); + } + + } + + protected void consumeDurableTestMessages(Connection connection, String sub, int size, String topicName, + AtomicLong publishedMessageSize) throws Exception { + + + Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(topicName); + + try { + TopicSubscriber consumer = session.createDurableSubscriber(topic, sub); + for (int i = 0; i < size; i++) { + ActiveMQMessage message = (ActiveMQMessage) consumer.receive(); + if (publishedMessageSize != null) { + publishedMessageSize.addAndGet(-message.getCoreMessage().getEncodeSize()); + } + } + + } finally { + session.close(); + } + + } + + protected void publishTestQueueMessages(int count, String queueName, int deliveryMode, int messageSize, + AtomicLong publishedMessageSize, boolean transacted) throws Exception { + + // Start the connection + Connection connection = cf.createConnection(); + connection.setClientID("clientId" + queueName); + connection.start(); + Session session = transacted ? connection.createSession(transacted, QueueSession.SESSION_TRANSACTED) : + connection.createSession(transacted, QueueSession.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + + try { + MessageProducer prod = session.createProducer(queue); + prod.setDeliveryMode(deliveryMode); + for (int i = 0; i < count; i++) { + prod.send(createMessage(i, session, messageSize, publishedMessageSize)); + } + + if (transacted) { + session.commit(); + } + } finally { + connection.close(); + } + } + + protected void publishTestMessagesDurable(Connection connection, String[] subNames, String topicName, + int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, boolean verifyBrowsing, + boolean shared) + throws Exception { + this.publishTestMessagesDurable(connection, subNames, topicName, publishSize, expectedSize, messageSize, + publishedMessageSize, verifyBrowsing, DeliveryMode.PERSISTENT, shared); + } + + protected void publishTestMessagesDurable(Connection connection, String[] subNames, String topicName, + int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, boolean verifyBrowsing, + int deliveryMode, boolean shared) throws Exception { + + Session session = connection.createSession(false, TopicSession.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(topicName); + for (String subName : subNames) { + if (shared) { + session.createSharedDurableConsumer(topic, subName); + } else { + session.createDurableSubscriber(topic, subName); + } + } + + try { + // publish a bunch of non-persistent messages to fill up the temp + // store + MessageProducer prod = session.createProducer(topic); + prod.setDeliveryMode(deliveryMode); + for (int i = 0; i < publishSize; i++) { + prod.send(createMessage(i, session, messageSize, publishedMessageSize)); + } + + } finally { + session.close(); + } + + } + + /** + * Generate random messages between 100 bytes and maxMessageSize + * + * @param session + * @return + * @throws JMSException + * @throws ActiveMQException + */ + protected BytesMessage createMessage(int count, Session session, int maxMessageSize, AtomicLong publishedMessageSize) + throws JMSException, ActiveMQException { + final ActiveMQBytesMessage message = (ActiveMQBytesMessage) session.createBytesMessage(); + + final Random randomSize = new Random(); + int size = randomSize.nextInt((maxMessageSize - 100) + 1) + 100; + final byte[] data = new byte[size]; + final Random rng = new Random(); + rng.nextBytes(data); + message.writeBytes(data); + if (publishedMessageSize != null) { + publishedMessageSize.addAndGet(message.getCoreMessage().getPersistentSize()); + } + + return message; + } +}
