http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
----------------------------------------------------------------------
diff --git
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
new file mode 100644
index 0000000..2facb98
--- /dev/null
+++
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
@@ -0,0 +1,1759 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+
+import junit.framework.Test;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+
+public class BrokerTest extends BrokerTestSupport {
+
+ public ActiveMQDestination destination; // destination under test; assigned by several tests and offered as the "destination" combination
+ public int deliveryMode; // delivery mode handed to createMessage; offered as the "deliveryMode" combination
+ public int prefetch; // not referenced in the visible part of this class
+ public byte destinationType; // type handed to createDestinationInfo; offered as the "destinationType" combination
+ public boolean durableConsumer; // when true, the topic tests set a subscription name on the consumer
+ protected static final int MAX_NULL_WAIT=500; // wait handed to receiveMessage when no message is expected (presumably milliseconds -- confirm)
+
+ public void initCombosForTestQueueOnlyOnceDeliveryWith2Consumers() {
+ addCombinationValues("deliveryMode", new Object[]
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+
Integer.valueOf(DeliveryMode.PERSISTENT)});
+ }
+
+ /**
+  * Two consumers with a prefetch of 1 share the queue "TEST". Four messages
+  * are sent; each consumer is expected to receive two of them, and no
+  * message may be handed to both consumers.
+  *
+  * @throws Exception if any broker interaction fails
+  */
+ public void testQueueOnlyOnceDeliveryWith2Consumers() throws Exception {
+
+    ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+    // Setup a first connection
+    StubConnection connection1 = createConnection();
+    ConnectionInfo connectionInfo1 = createConnectionInfo();
+    SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+    ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+    connection1.send(connectionInfo1);
+    connection1.send(sessionInfo1);
+    connection1.send(producerInfo);
+
+    ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+    consumerInfo1.setPrefetchSize(1);
+    connection1.request(consumerInfo1);
+
+    // Setup a second connection
+    StubConnection connection2 = createConnection();
+    ConnectionInfo connectionInfo2 = createConnectionInfo();
+    SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+    ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+    consumerInfo2.setPrefetchSize(1);
+    connection2.send(connectionInfo2);
+    connection2.send(sessionInfo2);
+    connection2.request(consumerInfo2);
+
+    // Send the messages; the last one goes through request() rather than send()
+    connection1.send(createMessage(producerInfo, destination, deliveryMode));
+    connection1.send(createMessage(producerInfo, destination, deliveryMode));
+    connection1.send(createMessage(producerInfo, destination, deliveryMode));
+    connection1.request(createMessage(producerInfo, destination, deliveryMode));
+
+    for (int i = 0; i < 2; i++) {
+       Message m1 = receiveMessage(connection1);
+       Message m2 = receiveMessage(connection2);
+
+       assertNotNull("m1 is null for index: " + i, m1);
+       assertNotNull("m2 is null for index: " + i, m2);
+
+       // assertNotSame only compares references, so it would pass even if
+       // both consumers had received the same message. Compare the ids by
+       // value instead.
+       assertFalse("same message delivered to both consumers at index: " + i,
+                   m1.getMessageId().equals(m2.getMessageId()));
+       connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+       connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
+    }
+
+    assertNoMessagesLeft(connection1);
+    assertNoMessagesLeft(connection2);
+ }
+
+ public void initCombosForTestQueueBrowserWith2Consumers() {
+ addCombinationValues("deliveryMode", new Object[]
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+
Integer.valueOf(DeliveryMode.PERSISTENT)});
+ }
+
+ public void testQueueBrowserWith2Consumers() throws Exception {
+
+ ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo);
+
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1,
destination);
+ consumerInfo1.setPrefetchSize(10);
+ connection1.request(consumerInfo1);
+
+ // Send the messages
+ connection1.send(createMessage(producerInfo, destination,
deliveryMode));
+ connection1.send(createMessage(producerInfo, destination,
deliveryMode));
+ connection1.send(createMessage(producerInfo, destination,
deliveryMode));
+ //as the messages are sent async - need to synchronize the last
+ //one to ensure they arrive in the order we want
+ connection1.request(createMessage(producerInfo, destination,
deliveryMode));
+
+ // Setup a second connection with a queue browser.
+ StubConnection connection2 = createConnection();
+ ConnectionInfo connectionInfo2 = createConnectionInfo();
+ SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+ ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2,
destination);
+ consumerInfo2.setPrefetchSize(1);
+ consumerInfo2.setBrowser(true);
+ connection2.send(connectionInfo2);
+ connection2.send(sessionInfo2);
+ connection2.request(consumerInfo2);
+
+ List<Message> messages = new ArrayList<Message>();
+
+ for (int i = 0; i < 4; i++) {
+ Message m1 = receiveMessage(connection1);
+ assertNotNull("m1 is null for index: " + i, m1);
+ messages.add(m1);
+ }
+
+ for (int i = 0; i < 4; i++) {
+ Message m1 = messages.get(i);
+ Message m2 = receiveMessage(connection2);
+ assertNotNull("m2 is null for index: " + i, m2);
+ assertEquals(m1.getMessageId(), m2.getMessageId());
+ connection2.send(createAck(consumerInfo2, m2, 1,
MessageAck.DELIVERED_ACK_TYPE));
+ }
+
+ assertNoMessagesLeft(connection1);
+ assertNoMessagesLeft(connection2);
+ }
+
+
+ /**
+  * Attaches a queue browser to queue "TEST" before any message exists, then
+  * attaches a regular consumer and sends four non-persistent messages. The
+  * regular consumer must receive all four; the browser must receive none,
+  * because the queue was empty when the browser was created.
+  *
+  * @throws Exception if any broker interaction fails
+  */
+ public void testQueueBrowserWith2ConsumersBrowseFirst() throws Exception {
+
+    ActiveMQDestination destination = new ActiveMQQueue("TEST");
+    deliveryMode = DeliveryMode.NON_PERSISTENT;
+
+    // Setup a second connection with a queue browser.
+    StubConnection connection2 = createConnection();
+    ConnectionInfo connectionInfo2 = createConnectionInfo();
+    SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+    ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+    consumerInfo2.setPrefetchSize(10);
+    consumerInfo2.setBrowser(true);
+    connection2.send(connectionInfo2);
+    connection2.send(sessionInfo2);
+    connection2.request(consumerInfo2);
+
+    // Setup a first connection
+    StubConnection connection1 = createConnection();
+    ConnectionInfo connectionInfo1 = createConnectionInfo();
+    SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+    ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+    connection1.send(connectionInfo1);
+    connection1.send(sessionInfo1);
+    connection1.send(producerInfo);
+
+    ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+    consumerInfo1.setPrefetchSize(10);
+    connection1.request(consumerInfo1);
+
+    // Send the messages
+    connection1.send(createMessage(producerInfo, destination, deliveryMode));
+    connection1.send(createMessage(producerInfo, destination, deliveryMode));
+    connection1.send(createMessage(producerInfo, destination, deliveryMode));
+    // as the messages are sent async - need to synchronize the last
+    // one to ensure they arrive in the order we want
+    connection1.request(createMessage(producerInfo, destination, deliveryMode));
+
+    // The received messages are only checked for presence; nothing reads
+    // them afterwards, so they are not collected.
+    for (int i = 0; i < 4; i++) {
+       Message m1 = receiveMessage(connection1);
+       assertNotNull("m1 is null for index: " + i, m1);
+    }
+
+    // no messages present in queue browser as there were no messages when
+    // it was created
+    assertNoMessagesLeft(connection1);
+    assertNoMessagesLeft(connection2);
+ }
+
+ public void testQueueBrowserWith2ConsumersInterleaved() throws Exception {
+
+ ActiveMQDestination destination = new ActiveMQQueue("TEST");
+ deliveryMode = DeliveryMode.NON_PERSISTENT;
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo);
+
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1,
destination);
+ consumerInfo1.setPrefetchSize(10);
+ connection1.request(consumerInfo1);
+
+ // Send the messages
+ connection1.request(createMessage(producerInfo, destination,
deliveryMode));
+
+ // Setup a second connection with a queue browser.
+ StubConnection connection2 = createConnection();
+ ConnectionInfo connectionInfo2 = createConnectionInfo();
+ SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+ ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2,
destination);
+ consumerInfo2.setPrefetchSize(1);
+ consumerInfo2.setBrowser(true);
+ connection2.send(connectionInfo2);
+ connection2.send(sessionInfo2);
+ connection2.request(consumerInfo2);
+
+
+ connection1.send(createMessage(producerInfo, destination,
deliveryMode));
+ connection1.send(createMessage(producerInfo, destination,
deliveryMode));
+ //as the messages are sent async - need to synchronize the last
+ //one to ensure they arrive in the order we want
+ connection1.request(createMessage(producerInfo, destination,
deliveryMode));
+
+
+ List<Message> messages = new ArrayList<Message>();
+
+ for (int i = 0; i < 4; i++) {
+ Message m1 = receiveMessage(connection1);
+ assertNotNull("m1 is null for index: " + i, m1);
+ messages.add(m1);
+ }
+
+ for (int i = 0; i < 4; i++) {
+ Message m1 = messages.get(i);
+ Message m2 = receiveMessage(connection2);
+ assertNotNull("m2 is null for index: " + i, m2);
+ assertEquals(m1.getMessageId(), m2.getMessageId());
+ connection2.send(createAck(consumerInfo2, m2, 1,
MessageAck.DELIVERED_ACK_TYPE));
+ }
+
+ assertNoMessagesLeft(connection1);
+ assertNoMessagesLeft(connection2);
+ }
+
+
+ public void initCombosForTestConsumerPrefetchAndStandardAck() {
+ addCombinationValues("deliveryMode", new Object[] {
+ // Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType",
+ new Object[]
{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+
Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+
Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+
Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+ }
+
+ /**
+  * With a prefetch of 1, sends three messages and checks that only one is
+  * dispatched until it is acknowledged with a standard ack, after which the
+  * remaining messages are received and acknowledged one at a time.
+  *
+  * @throws Exception if any broker interaction fails
+  */
+ public void testConsumerPrefetchAndStandardAck() throws Exception {
+    // Producer and consumer share one connection.
+    StubConnection conn = createConnection();
+    ConnectionInfo connInfo = createConnectionInfo();
+    SessionInfo session = createSessionInfo(connInfo);
+    ProducerInfo producer = createProducerInfo(session);
+    conn.send(connInfo);
+    conn.send(session);
+    conn.send(producer);
+
+    destination = createDestinationInfo(conn, connInfo, destinationType);
+
+    ConsumerInfo consumer = createConsumerInfo(session, destination);
+    consumer.setPrefetchSize(1);
+    conn.send(consumer);
+
+    // Three messages: two plain sends, then a request for the last one.
+    for (int sent = 0; sent < 2; sent++) {
+       conn.send(createMessage(producer, destination, deliveryMode));
+    }
+    conn.request(createMessage(producer, destination, deliveryMode));
+
+    // Exactly one message may have been delivered so far.
+    Message first = receiveMessage(conn);
+    assertNotNull(first);
+    assertNoMessagesLeft(conn);
+
+    // Acking the first message should let the next one be dispatched.
+    conn.send(createAck(consumer, first, 1, MessageAck.STANDARD_ACK_TYPE));
+
+    // Second and third message: receive, check, ack.
+    for (int remaining = 0; remaining < 2; remaining++) {
+       Message next = receiveMessage(conn);
+       assertNotNull(next);
+       conn.send(createAck(consumer, next, 1, MessageAck.STANDARD_ACK_TYPE));
+    }
+
+    conn.send(closeConnectionInfo(connInfo));
+ }
+
+ public void initCombosForTestTransactedAckWithPrefetchOfOne() {
+ addCombinationValues("deliveryMode", new Object[]
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+
Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType",
+ new Object[]
{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+
Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+
Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+
Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+ }
+
+ /**
+  * With a prefetch of 1, sends four messages and then consumes each one in
+  * its own local transaction: begin, receive, transacted standard ack,
+  * one-phase commit. No message may be left afterwards.
+  *
+  * @throws Exception if any broker interaction fails
+  */
+ public void testTransactedAckWithPrefetchOfOne() throws Exception {
+    StubConnection conn = createConnection();
+    ConnectionInfo connInfo = createConnectionInfo();
+    SessionInfo session = createSessionInfo(connInfo);
+    ProducerInfo producer = createProducerInfo(session);
+    conn.send(connInfo);
+    conn.send(session);
+    conn.send(producer);
+
+    destination = createDestinationInfo(conn, connInfo, destinationType);
+
+    ConsumerInfo consumer = createConsumerInfo(session, destination);
+    consumer.setPrefetchSize(1);
+    conn.send(consumer);
+
+    // Publish four messages.
+    for (int sent = 0; sent < 4; sent++) {
+       conn.send(createMessage(producer, destination, deliveryMode));
+    }
+
+    // Consume them, one transaction per message.
+    for (int round = 0; round < 4; round++) {
+       LocalTransactionId txid = createLocalTransaction(session);
+       conn.send(createBeginTransaction(connInfo, txid));
+
+       Message received = receiveMessage(conn);
+       assertNotNull(received);
+
+       MessageAck ack = createAck(consumer, received, 1, MessageAck.STANDARD_ACK_TYPE);
+       ack.setTransactionId(txid);
+       conn.send(ack);
+
+       conn.send(createCommitTransaction1Phase(connInfo, txid));
+    }
+    assertNoMessagesLeft(conn);
+ }
+
+ public void initCombosForTestTransactedSend() {
+ addCombinationValues("deliveryMode", new Object[]
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+
Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType",
+ new Object[]
{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+
Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+
Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+
Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+ }
+
+ /**
+  * Sends four messages inside a local transaction and checks that nothing
+  * is delivered to the consumer until the transaction is committed, after
+  * which exactly four messages are received.
+  *
+  * @throws Exception if any broker interaction fails
+  */
+ public void testTransactedSend() throws Exception {
+    StubConnection conn = createConnection();
+    ConnectionInfo connInfo = createConnectionInfo();
+    SessionInfo session = createSessionInfo(connInfo);
+    ProducerInfo producer = createProducerInfo(session);
+    conn.send(connInfo);
+    conn.send(session);
+    conn.send(producer);
+
+    destination = createDestinationInfo(conn, connInfo, destinationType);
+
+    ConsumerInfo consumer = createConsumerInfo(session, destination);
+    consumer.setPrefetchSize(100);
+    conn.send(consumer);
+
+    // Open the transaction that all sends below belong to.
+    LocalTransactionId txid = createLocalTransaction(session);
+    conn.send(createBeginTransaction(connInfo, txid));
+
+    for (int sent = 0; sent < 4; sent++) {
+       Message outgoing = createMessage(producer, destination, deliveryMode);
+       outgoing.setTransactionId(txid);
+       conn.request(outgoing);
+    }
+
+    // Nothing may be delivered while the send is still uncommitted.
+    Message premature = receiveMessage(conn, MAX_NULL_WAIT);
+    assertNull(premature);
+
+    conn.send(createCommitTransaction1Phase(connInfo, txid));
+
+    // After the commit all four messages must show up.
+    for (int received = 0; received < 4; received++) {
+       assertNotNull(receiveMessage(conn));
+    }
+
+    assertNoMessagesLeft(conn);
+ }
+
+ public void initCombosForTestQueueTransactedAck() {
+ addCombinationValues("deliveryMode", new Object[]
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+
Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType",
+ new Object[]
{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+
Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
+ }
+
+ public void testQueueTransactedAck() throws Exception {
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo1);
+
+ destination = createDestinationInfo(connection1, connectionInfo1,
destinationType);
+
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1,
destination);
+ consumerInfo1.setPrefetchSize(100);
+ connection1.send(consumerInfo1);
+
+ // Send the messages
+ for (int i = 0; i < 4; i++) {
+ Message message = createMessage(producerInfo1, destination,
deliveryMode);
+ connection1.send(message);
+ }
+
+ // Begin the transaction.
+ LocalTransactionId txid = createLocalTransaction(sessionInfo1);
+ connection1.send(createBeginTransaction(connectionInfo1, txid));
+
+ // Acknowledge the first 2 messages.
+ for (int i = 0; i < 2; i++) {
+ Message m1 = receiveMessage(connection1);
+ assertNotNull("m1 is null for index: " + i, m1);
+ MessageAck ack = createAck(consumerInfo1, m1, 1,
MessageAck.STANDARD_ACK_TYPE);
+ ack.setTransactionId(txid);
+ connection1.request(ack);
+ }
+
+ // Commit the transaction.
+ connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
+
+ // The queue should now only have the remaining 2 messages
+ assertEquals(2, countMessagesInQueue(connection1, connectionInfo1,
destination));
+ }
+
+ public void initCombosForTestConsumerCloseCausesRedelivery() {
+ addCombinationValues("deliveryMode", new Object[]
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+
Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destination", new Object[] {new
ActiveMQQueue("TEST")});
+ }
+
+ /**
+  * Receives four messages without acknowledging them, removes the consumer,
+  * and checks that a new consumer on the same session receives four
+  * messages that are flagged as redelivered.
+  *
+  * @throws Exception if any broker interaction fails
+  */
+ public void testConsumerCloseCausesRedelivery() throws Exception {
+    StubConnection conn = createConnection();
+    ConnectionInfo connInfo = createConnectionInfo();
+    SessionInfo session = createSessionInfo(connInfo);
+    ProducerInfo producer = createProducerInfo(session);
+    conn.send(connInfo);
+    conn.send(session);
+    conn.send(producer);
+
+    ConsumerInfo firstConsumer = createConsumerInfo(session, destination);
+    firstConsumer.setPrefetchSize(100);
+    conn.request(firstConsumer);
+
+    // Publish four messages.
+    for (int sent = 0; sent < 4; sent++) {
+       conn.send(createMessage(producer, destination, deliveryMode));
+    }
+
+    // First delivery: none of the messages may be marked redelivered.
+    for (int i = 0; i < 4; i++) {
+       Message received = receiveMessage(conn);
+       assertNotNull("m1 is null for index: " + i, received);
+       assertFalse(received.isRedelivered());
+    }
+
+    // Removing the consumer without acking should trigger redelivery.
+    conn.send(firstConsumer.createRemoveCommand());
+
+    // A fresh consumer should be handed the same messages again.
+    ConsumerInfo secondConsumer = createConsumerInfo(session, destination);
+    secondConsumer.setPrefetchSize(100);
+    conn.request(secondConsumer);
+
+    for (int i = 0; i < 4; i++) {
+       Message received = receiveMessage(conn);
+       assertNotNull("m1 is null for index: " + i, received);
+       assertTrue(received.isRedelivered());
+    }
+    assertNoMessagesLeft(conn);
+ }
+
+ /**
+  * Creates a subscription named "test" on topic "TEST" for client id
+  * "clientid1", sends four persistent messages, receives two and acks them,
+  * then closes the connection. A second connection with the same client id
+  * and subscription name must receive the remaining two messages.
+  *
+  * @throws Exception if any broker interaction fails
+  */
+ public void testTopicDurableSubscriptionCanBeRestored() throws Exception {
+    ActiveMQDestination topic = new ActiveMQTopic("TEST");
+
+    // First connection: producer plus the original subscriber.
+    StubConnection firstConnection = createConnection();
+    ConnectionInfo firstConnectionInfo = createConnectionInfo();
+    firstConnectionInfo.setClientId("clientid1");
+    SessionInfo firstSession = createSessionInfo(firstConnectionInfo);
+    ProducerInfo producer = createProducerInfo(firstSession);
+    firstConnection.send(firstConnectionInfo);
+    firstConnection.send(firstSession);
+    firstConnection.send(producer);
+
+    ConsumerInfo firstSubscriber = createConsumerInfo(firstSession, topic);
+    firstSubscriber.setPrefetchSize(100);
+    firstSubscriber.setSubscriptionName("test");
+    firstConnection.send(firstSubscriber);
+
+    // Four persistent messages; the last one is issued as a request.
+    for (int sent = 0; sent < 3; sent++) {
+       firstConnection.send(createMessage(producer, topic, DeliveryMode.PERSISTENT));
+    }
+    firstConnection.request(createMessage(producer, topic, DeliveryMode.PERSISTENT));
+
+    // Receive two of them, keeping hold of the most recent one.
+    Message lastReceived = null;
+    for (int i = 0; i < 2; i++) {
+       lastReceived = receiveMessage(firstConnection);
+       assertNotNull(lastReceived);
+    }
+    // One ack with a count of 2, issued against the last received message.
+    firstConnection.send(createAck(firstSubscriber, lastReceived, 2, MessageAck.STANDARD_ACK_TYPE));
+
+    // Shut the first connection down.
+    firstConnection.request(closeConnectionInfo(firstConnectionInfo));
+    firstConnection.stop();
+
+    // Second connection re-uses the client id and subscription name.
+    StubConnection secondConnection = createConnection();
+    ConnectionInfo secondConnectionInfo = createConnectionInfo();
+    secondConnectionInfo.setClientId("clientid1");
+    SessionInfo secondSession = createSessionInfo(secondConnectionInfo);
+    ConsumerInfo secondSubscriber = createConsumerInfo(secondSession, topic);
+    secondSubscriber.setPrefetchSize(100);
+    secondSubscriber.setSubscriptionName("test");
+
+    secondConnection.send(secondConnectionInfo);
+    secondConnection.send(secondSession);
+    secondConnection.send(secondSubscriber);
+
+    // The two messages that were never acked must arrive here.
+    for (int i = 0; i < 2; i++) {
+       Message restored = receiveMessage(secondConnection);
+       assertNotNull("m1 is null for index: " + i, restored);
+    }
+    assertNoMessagesLeft(secondConnection);
+ }
+
+ public void initCombosForTestGroupedMessagesDeliveredToOnlyOneConsumer() {
+ addCombinationValues("deliveryMode", new Object[]
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+
Integer.valueOf(DeliveryMode.PERSISTENT)});
+ }
+
+ /**
+  * Sends four messages of group "TEST-GROUP" to queue "TEST" while only the
+  * first consumer exists, then adds a second consumer. The first consumer
+  * receives and acks three messages and is closed; the remaining message is
+  * then expected on the second consumer.
+  *
+  * @throws Exception if any broker interaction fails
+  */
+ public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception {
+    ActiveMQDestination queue = new ActiveMQQueue("TEST");
+
+    // First connection: producer plus first consumer.
+    StubConnection firstConnection = createConnection();
+    ConnectionInfo firstConnectionInfo = createConnectionInfo();
+    SessionInfo firstSession = createSessionInfo(firstConnectionInfo);
+    ProducerInfo producer = createProducerInfo(firstSession);
+    firstConnection.send(firstConnectionInfo);
+    firstConnection.send(firstSession);
+    firstConnection.send(producer);
+
+    ConsumerInfo firstConsumer = createConsumerInfo(firstSession, queue);
+    firstConsumer.setPrefetchSize(1);
+    firstConnection.send(firstConsumer);
+
+    // Four grouped messages with group sequence 1..4.
+    for (int sequence = 1; sequence <= 4; sequence++) {
+       Message grouped = createMessage(producer, queue, deliveryMode);
+       grouped.setGroupID("TEST-GROUP");
+       grouped.setGroupSequence(sequence);
+       firstConnection.request(grouped);
+    }
+
+    // Second connection: second consumer only.
+    StubConnection secondConnection = createConnection();
+    ConnectionInfo secondConnectionInfo = createConnectionInfo();
+    SessionInfo secondSession = createSessionInfo(secondConnectionInfo);
+    secondConnection.send(secondConnectionInfo);
+    secondConnection.send(secondSession);
+
+    ConsumerInfo secondConsumer = createConsumerInfo(secondSession, queue);
+    secondConsumer.setPrefetchSize(1);
+    secondConnection.send(secondConsumer);
+
+    // All messages should have gone to the first consumer; take three.
+    for (int i = 0; i < 3; i++) {
+       Message received = receiveMessage(firstConnection);
+       assertNotNull("m1 is null for index: " + i, received);
+       firstConnection.send(createAck(firstConsumer, received, 1, MessageAck.STANDARD_ACK_TYPE));
+    }
+
+    // Close the first consumer.
+    firstConnection.request(closeConsumerInfo(firstConsumer));
+
+    // The last message should now go to the second consumer.
+    Message leftover = receiveMessage(secondConnection);
+    assertNotNull("m1 is null for index: " + 0, leftover);
+    secondConnection.request(createAck(secondConsumer, leftover, 1, MessageAck.STANDARD_ACK_TYPE));
+
+    assertNoMessagesLeft(secondConnection);
+ }
+
+ public void initCombosForTestTopicConsumerOnlySeeMessagesAfterCreation() {
+ addCombinationValues("deliveryMode", new Object[]
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+
Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("durableConsumer", new Object[] {Boolean.TRUE,
Boolean.FALSE});
+ }
+
+ /**
+  * Sends one message to topic "TEST" before the consumer exists and two
+  * afterwards, then checks that the consumer receives exactly the two later
+  * messages, starting with the first one sent after its creation.
+  *
+  * @throws Exception if any broker interaction fails
+  */
+ public void testTopicConsumerOnlySeeMessagesAfterCreation() throws Exception {
+    ActiveMQDestination topic = new ActiveMQTopic("TEST");
+
+    StubConnection conn = createConnection();
+    ConnectionInfo connInfo = createConnectionInfo();
+    connInfo.setClientId("A");
+    SessionInfo session = createSessionInfo(connInfo);
+    ProducerInfo producer = createProducerInfo(session);
+    conn.send(connInfo);
+    conn.send(session);
+    conn.send(producer);
+
+    // This message is published before any subscription exists.
+    conn.send(createMessage(producer, topic, deliveryMode));
+
+    // Create the subscription; it is named only in the durable variant.
+    ConsumerInfo subscriber = createConsumerInfo(session, topic);
+    if (durableConsumer) {
+       subscriber.setSubscriptionName("test");
+    }
+    subscriber.setPrefetchSize(100);
+    conn.send(subscriber);
+
+    Message firstAfterSubscribe = createMessage(producer, topic, deliveryMode);
+    conn.send(firstAfterSubscribe);
+    conn.send(createMessage(producer, topic, deliveryMode));
+
+    // The subscription should skip over the very first message.
+    Message received = receiveMessage(conn);
+    assertNotNull(received);
+    assertEquals(firstAfterSubscribe.getMessageId(), received.getMessageId());
+
+    received = receiveMessage(conn);
+    assertNotNull(received);
+
+    assertNoMessagesLeft(conn);
+ }
+
+ public void
initCombosForTestTopicRetroactiveConsumerSeeMessagesBeforeCreation() {
+ addCombinationValues("deliveryMode", new Object[]
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+
Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("durableConsumer", new Object[] {Boolean.TRUE,
Boolean.FALSE});
+ }
+
+ /**
+  * Sends one message to topic "TEST" before a retroactive consumer exists
+  * and two afterwards, then checks that the consumer receives all three,
+  * starting with the message sent before its creation.
+  *
+  * @throws Exception if any broker interaction fails
+  */
+ public void testTopicRetroactiveConsumerSeeMessagesBeforeCreation() throws Exception {
+    ActiveMQDestination topic = new ActiveMQTopic("TEST");
+
+    StubConnection conn = createConnection();
+    ConnectionInfo connInfo = createConnectionInfo();
+    connInfo.setClientId("A");
+    SessionInfo session = createSessionInfo(connInfo);
+    ProducerInfo producer = createProducerInfo(session);
+    conn.send(connInfo);
+    conn.send(session);
+    conn.send(producer);
+
+    // Published before the subscription exists.
+    Message sentBeforeSubscribe = createMessage(producer, topic, deliveryMode);
+    conn.send(sentBeforeSubscribe);
+
+    // Create the retroactive subscription; named only in the durable variant.
+    ConsumerInfo subscriber = createConsumerInfo(session, topic);
+    if (durableConsumer) {
+       subscriber.setSubscriptionName("test");
+    }
+    subscriber.setPrefetchSize(100);
+    subscriber.setRetroactive(true);
+    conn.send(subscriber);
+
+    conn.send(createMessage(producer, topic, deliveryMode));
+    conn.request(createMessage(producer, topic, deliveryMode));
+
+    // the behavior is VERY dependent on the recovery policy used.
+    // But the default broker settings try to make it as consistent as
+    // possible
+
+    // The subscription should see every message that was sent.
+    Message received = receiveMessage(conn);
+    assertNotNull(received);
+    assertEquals(sentBeforeSubscribe.getMessageId(), received.getMessageId());
+    for (int later = 0; later < 2; later++) {
+       received = receiveMessage(conn);
+       assertNotNull(received);
+    }
+
+    assertNoMessagesLeft(conn);
+ }
+
+ //
+ // TODO: this needs to be reimplemented, since sending to a non-existent
+ // destination no longer fails. If we can access the Region directly, we
+ // should be able to check whether the destination was removed.
+ //
+ // public void initCombosForTestTempDestinationsRemovedOnConnectionClose()
{
+ // addCombinationValues( "deliveryMode", new Object[]{
+ // Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ // Integer.valueOf(DeliveryMode.PERSISTENT)} );
+ // addCombinationValues( "destinationType", new Object[]{
+ // Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ // Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
+ // }
+ //
+ // public void testTempDestinationsRemovedOnConnectionClose() throws
+ // Exception {
+ //
+ // // Setup a first connection
+ // StubConnection connection1 = createConnection();
+ // ConnectionInfo connectionInfo1 = createConnectionInfo();
+ // SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ // ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+ // connection1.send(connectionInfo1);
+ // connection1.send(sessionInfo1);
+ // connection1.send(producerInfo1);
+ //
+ // destination = createDestinationInfo(connection1, connectionInfo1,
+ // destinationType);
+ //
+ // StubConnection connection2 = createConnection();
+ // ConnectionInfo connectionInfo2 = createConnectionInfo();
+ // SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+ // ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+ // connection2.send(connectionInfo2);
+ // connection2.send(sessionInfo2);
+ // connection2.send(producerInfo2);
+ //
+ // // Send from connection2 to connection1's temp destination. Should
+ // succeed.
+ // connection2.send(createMessage(producerInfo2, destination,
+ // deliveryMode));
+ //
+ // // Close connection 1
+ // connection1.request(closeConnectionInfo(connectionInfo1));
+ //
+ // try {
+ // // Send from connection2 to connection1's temp destination. Should not
+ // succeed.
+ // connection2.request(createMessage(producerInfo2, destination,
+ // deliveryMode));
+ // fail("Expected JMSException.");
+ // } catch ( JMSException success ) {
+ // }
+ //
+ // }
+
+ // public void initCombosForTestTempDestinationsAreNotAutoCreated() {
+ // addCombinationValues( "deliveryMode", new Object[]{
+ // Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ // Integer.valueOf(DeliveryMode.PERSISTENT)} );
+ // addCombinationValues( "destinationType", new Object[]{
+ // Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ // Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
+ // }
+ //
+ //
+
+ // We create temp destination on demand now so this test case is no longer
+ // valid.
+ //
+ // public void testTempDestinationsAreNotAutoCreated() throws Exception {
+ //
+ // // Setup a first connection
+ // StubConnection connection1 = createConnection();
+ // ConnectionInfo connectionInfo1 = createConnectionInfo();
+ // SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ // ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+ // connection1.send(connectionInfo1);
+ // connection1.send(sessionInfo1);
+ // connection1.send(producerInfo1);
+ //
+ // destination =
+ // ActiveMQDestination.createDestination(connectionInfo1.getConnectionId()+":1",
+ // destinationType);
+ //
+ // // Should not be able to send to a non-existent temp destination.
+ // try {
+ // connection1.request(createMessage(producerInfo1, destination,
+ // deliveryMode));
+ // fail("Expected JMSException.");
+ // } catch ( JMSException success ) {
+ // }
+ //
+ // }
+
+
+ /**
+  * Registers both JMS delivery modes under the {@code "deliveryMode"} key via
+  * {@code addCombinationValues}.
+  * Presumably the combination harness runs
+  * {@code testExclusiveQueueDeliversToOnlyOneConsumer} once per value -- harness not visible here.
+  */
+ public void initCombosForTestExclusiveQueueDeliversToOnlyOneConsumer() {
+    Object[] deliveryModes = {
+       Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+       Integer.valueOf(DeliveryMode.PERSISTENT)
+    };
+    addCombinationValues("deliveryMode", deliveryModes);
+ }
+
+ /**
+  * Exercises two exclusive consumers on queue {@code TEST}.
+  * While the first exclusive consumer is open the test expects it to receive the messages,
+  * even though a second exclusive consumer is registered; after the first consumer is closed
+  * the remaining messages are expected on the second consumer.
+  */
+ public void testExclusiveQueueDeliversToOnlyOneConsumer() throws Exception {
+
+    ActiveMQDestination queue = new ActiveMQQueue("TEST");
+
+    // First connection: hosts the producer and the first exclusive consumer.
+    StubConnection ownerConnection = createConnection();
+    ConnectionInfo ownerConnectionInfo = createConnectionInfo();
+    SessionInfo ownerSession = createSessionInfo(ownerConnectionInfo);
+    ProducerInfo producer = createProducerInfo(ownerSession);
+    ownerConnection.send(ownerConnectionInfo);
+    ownerConnection.send(ownerSession);
+    ownerConnection.send(producer);
+
+    ConsumerInfo ownerConsumer = createConsumerInfo(ownerSession, queue);
+    ownerConsumer.setPrefetchSize(1);
+    ownerConsumer.setExclusive(true);
+    ownerConnection.send(ownerConsumer);
+
+    // First message: sent before the second consumer exists, so the first
+    // consumer should become the exclusive owner.
+    ownerConnection.request(createMessage(producer, queue, deliveryMode));
+
+    // Second connection: hosts a competing exclusive consumer.
+    StubConnection standbyConnection = createConnection();
+    ConnectionInfo standbyConnectionInfo = createConnectionInfo();
+    SessionInfo standbySession = createSessionInfo(standbyConnectionInfo);
+    ConsumerInfo standbyConsumer = createConsumerInfo(standbySession, queue);
+    standbyConsumer.setPrefetchSize(1);
+    standbyConsumer.setExclusive(true);
+    standbyConnection.send(standbyConnectionInfo);
+    standbyConnection.send(standbySession);
+    standbyConnection.request(standbyConsumer);
+
+    // Two more messages (three in total so far); they should still go to the
+    // first consumer even though the second one is ready for dispatch.
+    int toSend = 2;
+    while (toSend-- > 0) {
+       ownerConnection.send(createMessage(producer, queue, deliveryMode));
+    }
+
+    // Receive and acknowledge only the first two of the three messages.
+    int toAck = 2;
+    while (toAck-- > 0) {
+       Message delivered = receiveMessage(ownerConnection);
+       assertNotNull(delivered);
+       MessageAck ack = createAck(ownerConsumer, delivered, 1, MessageAck.STANDARD_ACK_TYPE);
+       ownerConnection.send(ack);
+    }
+
+    // Close the first consumer.
+    ownerConnection.send(closeConsumerInfo(ownerConsumer));
+
+    // One more message; together with the message that was never acknowledged
+    // above, two messages should now go to the second consumer.
+    ownerConnection.send(createMessage(producer, queue, deliveryMode));
+
+    int expectedOnStandby = 2;
+    while (expectedOnStandby-- > 0) {
+       Message delivered = receiveMessage(standbyConnection);
+       assertNotNull(delivered);
+       MessageAck ack = createAck(standbyConsumer, delivered, 1, MessageAck.STANDARD_ACK_TYPE);
+       standbyConnection.send(ack);
+    }
+
+    assertNoMessagesLeft(standbyConnection);
+ }
+
+ /**
+  * Registers the {@code "deliveryMode"} values (non-persistent, persistent) and the
+  * {@code "destinationType"} values (queue, topic) via {@code addCombinationValues}.
+  * Presumably consumed by the combination harness for {@code testWildcardConsume} --
+  * harness not visible here.
+  */
+ public void initCombosForTestWildcardConsume() {
+    Object[] deliveryModes = {
+       Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+       Integer.valueOf(DeliveryMode.PERSISTENT)
+    };
+    addCombinationValues("deliveryMode", deliveryModes);
+
+    Object[] destinationTypes = {
+       Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+       Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)
+    };
+    addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+  * Subscribes a consumer to the wildcard destination {@code WILD.*.TEST} and checks that
+  * only messages sent to {@code WILD.CARD.TEST} and {@code WILD.FOO.TEST} are received,
+  * while messages sent to {@code WILD.CARD} and {@code WILD.TEST} are not.
+  */
+ public void testWildcardConsume() throws Exception {
+
+    // Single connection carrying both the producer and the wildcard consumer.
+    StubConnection connection = createConnection();
+    ConnectionInfo connectionInfo = createConnectionInfo();
+    SessionInfo session = createSessionInfo(connectionInfo);
+    ProducerInfo producer = createProducerInfo(session);
+    connection.send(connectionInfo);
+    connection.send(session);
+    connection.send(producer);
+
+    // Register the wildcard consumer.
+    ActiveMQDestination wildcard = ActiveMQDestination.createDestination("WILD.*.TEST", destinationType);
+    ConsumerInfo wildcardConsumer = createConsumerInfo(session, wildcard);
+    wildcardConsumer.setPrefetchSize(100);
+    connection.send(wildcardConsumer);
+
+    // Neither of these two destinations should match the wildcard.
+    String[] nonMatchingNames = {"WILD.CARD", "WILD.TEST"};
+    for (String name : nonMatchingNames) {
+       ActiveMQDestination nonMatching = ActiveMQDestination.createDestination(name, destinationType);
+       connection.send(createMessage(producer, nonMatching, deliveryMode));
+    }
+
+    // First matching destination.
+    ActiveMQDestination firstMatch = ActiveMQDestination.createDestination("WILD.CARD.TEST", destinationType);
+    connection.send(createMessage(producer, firstMatch, deliveryMode));
+
+    Message received = receiveMessage(connection);
+    assertNotNull(received);
+    assertEquals(firstMatch, received.getDestination());
+
+    // Second matching destination.
+    ActiveMQDestination secondMatch = ActiveMQDestination.createDestination("WILD.FOO.TEST", destinationType);
+    connection.request(createMessage(producer, secondMatch, deliveryMode));
+    received = receiveMessage(connection);
+    assertNotNull(received);
+    assertEquals(secondMatch, received.getDestination());
+
+    // Nothing else may be pending: the non-matching messages must not arrive.
+    assertNoMessagesLeft(connection);
+    connection.send(closeConnectionInfo(connectionInfo));
+ }
+
+ /**
+  * Registers the {@code "deliveryMode"} values (non-persistent, persistent) and the
+  * {@code "destinationType"} values (queue, topic) via {@code addCombinationValues}.
+  * Presumably consumed by the combination harness for {@code testCompositeConsume} --
+  * harness not visible here.
+  */
+ public void initCombosForTestCompositeConsume() {
+    Object[] deliveryModes = {
+       Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+       Integer.valueOf(DeliveryMode.PERSISTENT)
+    };
+    addCombinationValues("deliveryMode", deliveryModes);
+
+    Object[] destinationTypes = {
+       Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+       Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)
+    };
+    addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+  * Subscribes one consumer to the composite destination {@code "A,B"}, sends one message
+  * to {@code A} and one to {@code B}, and checks that the consumer receives exactly two messages.
+  */
+ public void testCompositeConsume() throws Exception {
+
+    // Single connection carrying both the producer and the composite consumer.
+    StubConnection connection = createConnection();
+    ConnectionInfo connectionInfo = createConnectionInfo();
+    SessionInfo session = createSessionInfo(connectionInfo);
+    ProducerInfo producer = createProducerInfo(session);
+    connection.send(connectionInfo);
+    connection.send(session);
+    connection.send(producer);
+
+    // Register the consumer on the composite destination.
+    ActiveMQDestination composite = ActiveMQDestination.createDestination("A,B", destinationType);
+    ConsumerInfo compositeConsumer = createConsumerInfo(session, composite);
+    compositeConsumer.setRetroactive(true);
+    compositeConsumer.setPrefetchSize(100);
+    connection.send(compositeConsumer);
+
+    // The two individual destinations that make up the composite.
+    ActiveMQDestination targetA = ActiveMQDestination.createDestination("A", destinationType);
+    ActiveMQDestination targetB = ActiveMQDestination.createDestination("B", destinationType);
+
+    // One message per destination, A first.
+    connection.send(createMessage(producer, targetA, deliveryMode));
+    connection.send(createMessage(producer, targetB, deliveryMode));
+
+    // Both messages should reach the composite consumer.
+    int expected = 2;
+    while (expected-- > 0) {
+       assertNotNull(receiveMessage(connection));
+    }
+
+    assertNoMessagesLeft(connection);
+    connection.send(closeConnectionInfo(connectionInfo));
+ }
+
+ /**
+  * Registers the {@code "deliveryMode"} values (non-persistent, persistent) and the
+  * {@code "destinationType"} values (queue, topic) via {@code addCombinationValues}.
+  * Presumably consumed by the combination harness for {@code testCompositeSend} --
+  * harness not visible here.
+  */
+ public void initCombosForTestCompositeSend() {
+    Object[] deliveryModes = {
+       Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+       Integer.valueOf(DeliveryMode.PERSISTENT)
+    };
+    addCombinationValues("deliveryMode", deliveryModes);
+
+    Object[] destinationTypes = {
+       Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+       Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)
+    };
+    addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+  * Sends four messages to the composite destination {@code "A,B"} and checks that a
+  * consumer on {@code A} and a consumer on {@code B} each receive all four, with matching
+  * message ids and the composite as the original destination.
+  */
+ public void testCompositeSend() throws Exception {
+
+    // First connection: producer plus the consumer on destination A.
+    StubConnection connectionA = createConnection();
+    ConnectionInfo connectionInfoA = createConnectionInfo();
+    SessionInfo sessionA = createSessionInfo(connectionInfoA);
+    ProducerInfo producer = createProducerInfo(sessionA);
+    connectionA.send(connectionInfoA);
+    connectionA.send(sessionA);
+    connectionA.send(producer);
+
+    ActiveMQDestination targetA = ActiveMQDestination.createDestination("A", destinationType);
+    ConsumerInfo consumerA = createConsumerInfo(sessionA, targetA);
+    consumerA.setRetroactive(true);
+    consumerA.setPrefetchSize(100);
+    connectionA.request(consumerA);
+
+    // Second connection: only the consumer on destination B.
+    StubConnection connectionB = createConnection();
+    ConnectionInfo connectionInfoB = createConnectionInfo();
+    SessionInfo sessionB = createSessionInfo(connectionInfoB);
+    connectionB.send(connectionInfoB);
+    connectionB.send(sessionB);
+
+    ActiveMQDestination targetB = ActiveMQDestination.createDestination("B", destinationType);
+    ConsumerInfo consumerB = createConsumerInfo(sessionB, targetB);
+    consumerB.setRetroactive(true);
+    consumerB.setPrefetchSize(100);
+    connectionB.request(consumerB);
+
+    // Publish four messages to the composite destination.
+    ActiveMQDestination composite = ActiveMQDestination.createDestination("A,B", destinationType);
+    int toSend = 4;
+    while (toSend-- > 0) {
+       connectionA.request(createMessage(producer, composite, deliveryMode));
+    }
+
+    // Each message should have been delivered to both A and B.
+    int toCheck = 4;
+    while (toCheck-- > 0) {
+       Message fromA = receiveMessage(connectionA);
+       Message fromB = receiveMessage(connectionB);
+
+       assertNotNull(fromA);
+       assertNotNull(fromB);
+
+       assertEquals(fromA.getMessageId(), fromB.getMessageId());
+       assertEquals(composite, fromA.getOriginalDestination());
+       assertEquals(composite, fromB.getOriginalDestination());
+
+       connectionA.request(createAck(consumerA, fromA, 1, MessageAck.STANDARD_ACK_TYPE));
+       connectionB.request(createAck(consumerB, fromB, 1, MessageAck.STANDARD_ACK_TYPE));
+    }
+
+    assertNoMessagesLeft(connectionA);
+    assertNoMessagesLeft(connectionB);
+
+    connectionA.send(closeConnectionInfo(connectionInfoA));
+    connectionB.send(closeConnectionInfo(connectionInfoB));
+ }
+
+ /**
+  * Registers the {@code "deliveryMode"} values (non-persistent, persistent) and the
+  * {@code "destination"} values (topic {@code TEST}, queue {@code TEST}) via
+  * {@code addCombinationValues}.
+  * Presumably consumed by the combination harness for {@code testConnectionCloseCascades} --
+  * harness not visible here.
+  */
+ public void initCombosForTestConnectionCloseCascades() {
+    Object[] deliveryModes = {
+       Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+       Integer.valueOf(DeliveryMode.PERSISTENT)
+    };
+    addCombinationValues("deliveryMode", deliveryModes);
+
+    Object[] destinations = {
+       new ActiveMQTopic("TEST"),
+       new ActiveMQQueue("TEST")
+    };
+    addCombinationValues("destination", destinations);
+ }
+
+ /**
+  * Checks that closing a connection also stops delivery to its consumer: after
+  * the consuming connection is closed, a further message sent by another connection
+  * must not show up on the closed connection.
+  */
+ public void testConnectionCloseCascades() throws Exception {
+
+    // Consuming connection (also registers a producer that is never used for sending).
+    StubConnection consumerConnection = createConnection();
+    ConnectionInfo consumerConnectionInfo = createConnectionInfo();
+    SessionInfo consumerSession = createSessionInfo(consumerConnectionInfo);
+    ProducerInfo unusedProducer = createProducerInfo(consumerSession);
+    consumerConnection.send(consumerConnectionInfo);
+    consumerConnection.send(consumerSession);
+    consumerConnection.send(unusedProducer);
+    ConsumerInfo consumer = createConsumerInfo(consumerSession, destination);
+    consumer.setPrefetchSize(100);
+    consumer.setNoLocal(true);
+    consumerConnection.request(consumer);
+
+    // Producing connection.
+    StubConnection producerConnection = createConnection();
+    ConnectionInfo producerConnectionInfo = createConnectionInfo();
+    SessionInfo producerSession = createSessionInfo(producerConnectionInfo);
+    ProducerInfo producer = createProducerInfo(producerSession);
+    producerConnection.send(producerConnectionInfo);
+    producerConnection.send(producerSession);
+    producerConnection.send(producer);
+
+    // Publish four messages.
+    int toSend = 4;
+    while (toSend-- > 0) {
+       producerConnection.send(createMessage(producer, destination, deliveryMode));
+    }
+
+    // Receive and acknowledge all four on the consuming connection.
+    int toAck = 4;
+    while (toAck-- > 0) {
+       Message delivered = receiveMessage(consumerConnection);
+       assertNotNull(delivered);
+       consumerConnection.send(createAck(consumer, delivered, 1, MessageAck.STANDARD_ACK_TYPE));
+    }
+
+    // Give the asynchronous acks a chance to percolate and verify that nothing
+    // further is pending for the consumer.
+    Message unexpected = receiveMessage(consumerConnection, MAX_NULL_WAIT);
+    assertNull("all messages were received " + unexpected, unexpected);
+
+    // Closing the connection should in turn close the consumer.
+    consumerConnection.request(closeConnectionInfo(consumerConnectionInfo));
+
+    // A message sent afterwards must not reach the closed connection.
+    producerConnection.request(createMessage(producer, destination, deliveryMode));
+
+    Message afterClose = receiveMessage(consumerConnection, MAX_NULL_WAIT);
+    assertNull("no message received", afterClose);
+ }
+
+ /**
+  * Registers the {@code "deliveryMode"} values (non-persistent, persistent) and the
+  * {@code "destination"} values (topic {@code TEST}, queue {@code TEST}) via
+  * {@code addCombinationValues}.
+  * Presumably consumed by the combination harness for {@code testSessionCloseCascades} --
+  * harness not visible here.
+  */
+ public void initCombosForTestSessionCloseCascades() {
+    Object[] deliveryModes = {
+       Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+       Integer.valueOf(DeliveryMode.PERSISTENT)
+    };
+    addCombinationValues("deliveryMode", deliveryModes);
+
+    Object[] destinations = {
+       new ActiveMQTopic("TEST"),
+       new ActiveMQQueue("TEST")
+    };
+    addCombinationValues("destination", destinations);
+ }
+
+ /**
+  * Checks that closing a session also stops delivery to its consumer: after the
+  * consuming session is closed, a further message sent by another connection must not
+  * show up on the first connection.
+  */
+ public void testSessionCloseCascades() throws Exception {
+
+    // Consuming connection (also registers a producer that is never used for sending).
+    StubConnection consumerConnection = createConnection();
+    ConnectionInfo consumerConnectionInfo = createConnectionInfo();
+    SessionInfo consumerSession = createSessionInfo(consumerConnectionInfo);
+    ProducerInfo unusedProducer = createProducerInfo(consumerSession);
+    consumerConnection.send(consumerConnectionInfo);
+    consumerConnection.send(consumerSession);
+    consumerConnection.send(unusedProducer);
+    ConsumerInfo consumer = createConsumerInfo(consumerSession, destination);
+    consumer.setPrefetchSize(100);
+    consumer.setNoLocal(true);
+    consumerConnection.request(consumer);
+
+    // Producing connection.
+    StubConnection producerConnection = createConnection();
+    ConnectionInfo producerConnectionInfo = createConnectionInfo();
+    SessionInfo producerSession = createSessionInfo(producerConnectionInfo);
+    ProducerInfo producer = createProducerInfo(producerSession);
+    producerConnection.send(producerConnectionInfo);
+    producerConnection.send(producerSession);
+    producerConnection.send(producer);
+
+    // Publish four messages.
+    int toSend = 4;
+    while (toSend-- > 0) {
+       producerConnection.send(createMessage(producer, destination, deliveryMode));
+    }
+
+    // Receive and acknowledge all four on the consuming connection.
+    int toAck = 4;
+    while (toAck-- > 0) {
+       Message delivered = receiveMessage(consumerConnection);
+       assertNotNull(delivered);
+       consumerConnection.send(createAck(consumer, delivered, 1, MessageAck.STANDARD_ACK_TYPE));
+    }
+
+    // Closing the session should in turn close the consumer.
+    consumerConnection.request(closeSessionInfo(consumerSession));
+
+    // A message sent afterwards must not reach the first connection.
+    producerConnection.request(createMessage(producer, destination, deliveryMode));
+
+    Message afterClose = receiveMessage(consumerConnection, MAX_NULL_WAIT);
+    assertNull("no message received from connection1 after session close", afterClose);
+ }
+
+ /**
+  * Registers the {@code "deliveryMode"} values (non-persistent, persistent) and the
+  * {@code "destination"} values (topic {@code TEST}, queue {@code TEST}) via
+  * {@code addCombinationValues}.
+  * Presumably consumed by the combination harness for {@code testConsumerClose} --
+  * harness not visible here.
+  */
+ public void initCombosForTestConsumerClose() {
+    Object[] deliveryModes = {
+       Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+       Integer.valueOf(DeliveryMode.PERSISTENT)
+    };
+    addCombinationValues("deliveryMode", deliveryModes);
+
+    Object[] destinations = {
+       new ActiveMQTopic("TEST"),
+       new ActiveMQQueue("TEST")
+    };
+    addCombinationValues("destination", destinations);
+ }
+
+ /**
+  * Checks that a closed consumer no longer receives messages: after the consumer is
+  * closed, a further message sent by another connection must not show up on the
+  * consumer's connection.
+  */
+ public void testConsumerClose() throws Exception {
+
+    // Consuming connection (also registers a producer that is never used for sending).
+    StubConnection consumerConnection = createConnection();
+    ConnectionInfo consumerConnectionInfo = createConnectionInfo();
+    SessionInfo consumerSession = createSessionInfo(consumerConnectionInfo);
+    ProducerInfo unusedProducer = createProducerInfo(consumerSession);
+    consumerConnection.send(consumerConnectionInfo);
+    consumerConnection.send(consumerSession);
+    consumerConnection.send(unusedProducer);
+    ConsumerInfo consumer = createConsumerInfo(consumerSession, destination);
+    consumer.setPrefetchSize(100);
+    consumer.setNoLocal(true);
+    consumerConnection.request(consumer);
+
+    // Producing connection.
+    StubConnection producerConnection = createConnection();
+    ConnectionInfo producerConnectionInfo = createConnectionInfo();
+    SessionInfo producerSession = createSessionInfo(producerConnectionInfo);
+    ProducerInfo producer = createProducerInfo(producerSession);
+    producerConnection.send(producerConnectionInfo);
+    producerConnection.send(producerSession);
+    producerConnection.send(producer);
+
+    // Publish four messages.
+    int toSend = 4;
+    while (toSend-- > 0) {
+       producerConnection.send(createMessage(producer, destination, deliveryMode));
+    }
+
+    // Receive and acknowledge all four on the consuming connection.
+    int toAck = 4;
+    while (toAck-- > 0) {
+       Message delivered = receiveMessage(consumerConnection);
+       assertNotNull(delivered);
+       consumerConnection.send(createAck(consumer, delivered, 1, MessageAck.STANDARD_ACK_TYPE));
+    }
+
+    // Give the asynchronous acks a chance to percolate and verify that nothing
+    // further is pending. A timed receive is used rather than a poll because
+    // broker info is sent asynchronously and may still need to be dequeued.
+    Message pending = receiveMessage(consumerConnection, MAX_NULL_WAIT);
+    assertNull("no more messages " + pending, pending);
+
+    // Close the consumer.
+    consumerConnection.request(closeConsumerInfo(consumer));
+
+    // A message sent afterwards must not reach the consumer's connection.
+    producerConnection.request(createMessage(producer, destination, deliveryMode));
+
+    pending = receiveMessage(consumerConnection, MAX_NULL_WAIT);
+    assertNull("no message received after close " + pending, pending);
+ }
+
+ /**
+  * Registers both JMS delivery modes under the {@code "deliveryMode"} key via
+  * {@code addCombinationValues}.
+  * Presumably consumed by the combination harness for {@code testTopicNoLocal} --
+  * harness not visible here.
+  */
+ public void initCombosForTestTopicNoLocal() {
+    Object[] deliveryModes = {
+       Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+       Integer.valueOf(DeliveryMode.PERSISTENT)
+    };
+    addCombinationValues("deliveryMode", deliveryModes);
+ }
+
+ /**
+  * Checks the {@code noLocal} flag on topic consumers: two connections each have a
+  * producer and a noLocal consumer on topic {@code TEST}, and each consumer is expected
+  * to see only the messages published by the other connection.
+  */
+ public void testTopicNoLocal() throws Exception {
+
+    ActiveMQDestination topic = new ActiveMQTopic("TEST");
+
+    // First connection: producer plus noLocal consumer.
+    StubConnection firstConnection = createConnection();
+    ConnectionInfo firstConnectionInfo = createConnectionInfo();
+    SessionInfo firstSession = createSessionInfo(firstConnectionInfo);
+    ProducerInfo firstProducer = createProducerInfo(firstSession);
+    firstConnection.send(firstConnectionInfo);
+    firstConnection.send(firstSession);
+    firstConnection.send(firstProducer);
+
+    ConsumerInfo firstConsumer = createConsumerInfo(firstSession, topic);
+    firstConsumer.setRetroactive(true);
+    firstConsumer.setPrefetchSize(100);
+    firstConsumer.setNoLocal(true);
+    firstConnection.send(firstConsumer);
+
+    // Second connection: producer plus noLocal consumer.
+    StubConnection secondConnection = createConnection();
+    ConnectionInfo secondConnectionInfo = createConnectionInfo();
+    SessionInfo secondSession = createSessionInfo(secondConnectionInfo);
+    ProducerInfo secondProducer = createProducerInfo(secondSession);
+    secondConnection.send(secondConnectionInfo);
+    secondConnection.send(secondSession);
+    secondConnection.send(secondProducer);
+
+    ConsumerInfo secondConsumer = createConsumerInfo(secondSession, topic);
+    secondConsumer.setRetroactive(true);
+    secondConsumer.setPrefetchSize(100);
+    secondConsumer.setNoLocal(true);
+    secondConnection.send(secondConsumer);
+
+    // Publish four messages from the first connection.
+    int toSend = 4;
+    while (toSend-- > 0) {
+       firstConnection.send(createMessage(firstProducer, topic, deliveryMode));
+    }
+
+    // The second connection should receive all four.
+    int expectedOnSecond = 4;
+    while (expectedOnSecond-- > 0) {
+       assertNotNull(receiveMessage(secondConnection));
+    }
+
+    // Publish one message from the second connection.
+    Message fromSecond = createMessage(secondProducer, topic, deliveryMode);
+    secondConnection.send(fromSecond);
+
+    // The first connection should not see its own four messages, only the
+    // one published by the second connection.
+    Message seenByFirst = receiveMessage(firstConnection);
+    assertNotNull(seenByFirst);
+    assertEquals(fromSecond.getMessageId(), seenByFirst.getMessageId());
+
+    assertNoMessagesLeft(firstConnection);
+    assertNoMessagesLeft(secondConnection);
+ }
+
+ /**
+  * Registers both JMS delivery modes under the {@code "deliveryMode"} key for
+  * {@code testTopicDispatchIsBroadcast}.
+  * <p>
+  * Every other initializer in this class is named {@code initCombosFor} followed by the
+  * capitalized test method name (for example {@code initCombosForTestTopicNoLocal}).
+  * This method provides the name that follows that convention; the original name
+  * omitted the {@code Test} segment.
+  * NOTE(review): presumably the combination harness looks initializers up by that
+  * naming convention, in which case the old name was never picked up -- confirm
+  * against the harness, which is not visible here.
+  */
+ public void initCombosForTestTopicDispatchIsBroadcast() {
+    addCombinationValues("deliveryMode", new Object[] {
+       Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+       Integer.valueOf(DeliveryMode.PERSISTENT)});
+ }
+
+ /**
+  * Original name of the initializer, kept so that any existing caller still compiles.
+  * Delegates to {@link #initCombosForTestTopicDispatchIsBroadcast()}.
+  */
+ public void initCombosForTopicDispatchIsBroadcast() {
+    initCombosForTestTopicDispatchIsBroadcast();
+ }
+
+ /**
+  * Checks that messages published to topic {@code TEST} are delivered to every
+  * subscribed consumer: two consumers on separate connections are each expected to
+  * receive all four published messages.
+  */
+ public void testTopicDispatchIsBroadcast() throws Exception {
+
+    ActiveMQDestination topic = new ActiveMQTopic("TEST");
+
+    // First connection: producer plus first consumer.
+    StubConnection firstConnection = createConnection();
+    ConnectionInfo firstConnectionInfo = createConnectionInfo();
+    SessionInfo firstSession = createSessionInfo(firstConnectionInfo);
+    ProducerInfo producer = createProducerInfo(firstSession);
+    firstConnection.send(firstConnectionInfo);
+    firstConnection.send(firstSession);
+    firstConnection.send(producer);
+
+    ConsumerInfo firstConsumer = createConsumerInfo(firstSession, topic);
+    firstConsumer.setRetroactive(true);
+    firstConsumer.setPrefetchSize(100);
+    firstConnection.send(firstConsumer);
+
+    // Second connection: second consumer only.
+    StubConnection secondConnection = createConnection();
+    ConnectionInfo secondConnectionInfo = createConnectionInfo();
+    SessionInfo secondSession = createSessionInfo(secondConnectionInfo);
+    ConsumerInfo secondConsumer = createConsumerInfo(secondSession, topic);
+    secondConsumer.setRetroactive(true);
+    secondConsumer.setPrefetchSize(100);
+    secondConnection.send(secondConnectionInfo);
+    secondConnection.send(secondSession);
+    secondConnection.send(secondConsumer);
+
+    // Publish four messages.
+    int toSend = 4;
+    while (toSend-- > 0) {
+       firstConnection.send(createMessage(producer, topic, deliveryMode));
+    }
+
+    // Each message should arrive on both connections.
+    int rounds = 4;
+    while (rounds-- > 0) {
+       Message onFirst = receiveMessage(firstConnection);
+       assertNotNull(onFirst);
+       Message onSecond = receiveMessage(secondConnection);
+       assertNotNull(onSecond);
+    }
+ }
+
+ /**
+  * Registers the {@code "deliveryMode"} values (non-persistent, persistent) and the
+  * {@code "destinationType"} values (queue, temporary queue) via
+  * {@code addCombinationValues}.
+  * Presumably consumed by the combination harness for
+  * {@code testQueueDispatchedAreRedeliveredOnConsumerClose} -- harness not visible here.
+  */
+ public void initCombosForTestQueueDispatchedAreRedeliveredOnConsumerClose() {
+    Object[] deliveryModes = {
+       Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+       Integer.valueOf(DeliveryMode.PERSISTENT)
+    };
+    addCombinationValues("deliveryMode", deliveryModes);
+
+    Object[] queueTypes = {
+       Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+       Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)
+    };
+    addCombinationValues("destinationType", queueTypes);
+ }
+
+ /**
+  * Checks that queue messages dispatched to a consumer but never acknowledged are
+  * delivered again, flagged as redelivered, to a new consumer after the first
+  * consumer is closed.
+  */
+ public void testQueueDispatchedAreRedeliveredOnConsumerClose() throws Exception {
+
+    // One connection is used for the producer and for both consumers.
+    StubConnection connection = createConnection();
+    ConnectionInfo connectionInfo = createConnectionInfo();
+    SessionInfo session = createSessionInfo(connectionInfo);
+    ProducerInfo producer = createProducerInfo(session);
+    connection.send(connectionInfo);
+    connection.send(session);
+    connection.send(producer);
+
+    destination = createDestinationInfo(connection, connectionInfo, destinationType);
+
+    ConsumerInfo firstConsumer = createConsumerInfo(session, destination);
+    firstConsumer.setPrefetchSize(100);
+    connection.send(firstConsumer);
+
+    // Publish four messages.
+    int toSend = 4;
+    while (toSend-- > 0) {
+       connection.send(createMessage(producer, destination, deliveryMode));
+    }
+
+    // First delivery: none of the messages may be flagged as redelivered.
+    int firstPass = 4;
+    while (firstPass-- > 0) {
+       Message delivered = receiveMessage(connection);
+       assertNotNull(delivered);
+       assertFalse(delivered.isRedelivered());
+    }
+
+    // Close the consumer without having sent any acknowledgements.
+    connection.send(closeConsumerInfo(firstConsumer));
+
+    // Discard anything still sitting in the dispatch queue.
+    Object drained;
+    do {
+       drained = connection.getDispatchQueue().poll(0, TimeUnit.MILLISECONDS);
+    }
+    while (drained != null);
+
+    // Attach a second consumer on the same session and destination.
+    ConsumerInfo secondConsumer = createConsumerInfo(session, destination);
+    secondConsumer.setPrefetchSize(100);
+    connection.send(secondConsumer);
+
+    // Second delivery: all four messages must be flagged as redelivered.
+    int secondPass = 4;
+    while (secondPass-- > 0) {
+       Message redelivered = receiveMessage(connection);
+       assertNotNull(redelivered);
+       assertTrue(redelivered.isRedelivered());
+    }
+ }
+
+ /**
+  * Registers the {@code "deliveryMode"} values (non-persistent, persistent) and the
+  * {@code "destinationType"} values (queue, temporary queue) via
+  * {@code addCombinationValues}.
+  * Presumably consumed by the combination harness for {@code testQueueBrowseMessages} --
+  * harness not visible here.
+  */
+ public void initCombosForTestQueueBrowseMessages() {
+    Object[] deliveryModes = {
+       Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+       Integer.valueOf(DeliveryMode.PERSISTENT)
+    };
+    addCombinationValues("deliveryMode", deliveryModes);
+
+    Object[] queueTypes = {
+       Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+       Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)
+    };
+    addCombinationValues("destinationType", queueTypes);
+ }
+
+ /**
+  * Sends four messages to a queue, then attaches a consumer flagged as a browser and
+  * checks that it receives exactly those four messages.
+  */
+ public void testQueueBrowseMessages() throws Exception {
+
+    // One connection for both the producer and the browser.
+    StubConnection connection = createConnection();
+    ConnectionInfo connectionInfo = createConnectionInfo();
+    SessionInfo session = createSessionInfo(connectionInfo);
+    ProducerInfo producer = createProducerInfo(session);
+    connection.send(connectionInfo);
+    connection.send(session);
+    connection.send(producer);
+
+    destination = createDestinationInfo(connection, connectionInfo, destinationType);
+
+    // Publish four messages before any consumer exists.
+    int toSend = 4;
+    while (toSend-- > 0) {
+       connection.send(createMessage(producer, destination, deliveryMode));
+    }
+
+    // Attach the consumer in browser mode.
+    ConsumerInfo browser = createConsumerInfo(session, destination);
+    browser.setBrowser(true);
+    connection.send(browser);
+
+    // All four messages should be browsable; each is answered with a delivered ack.
+    int toBrowse = 4;
+    while (toBrowse-- > 0) {
+       Message browsed = receiveMessage(connection);
+       assertNotNull(browsed);
+       connection.send(createAck(browser, browsed, 1, MessageAck.DELIVERED_ACK_TYPE));
+    }
+
+    assertNoMessagesLeft(connection);
+ }
+
+ /**
+  * Registers the {@code "deliveryMode"} values (non-persistent, persistent) and the
+  * {@code "destinationType"} values (queue, temporary queue) via
+  * {@code addCombinationValues}.
+  * Presumably consumed by the combination harness for
+  * {@code testQueueSendThenAddConsumer} -- harness not visible here.
+  */
+ public void initCombosForTestQueueSendThenAddConsumer() {
+    Object[] deliveryModes = {
+       Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+       Integer.valueOf(DeliveryMode.PERSISTENT)
+    };
+    addCombinationValues("deliveryMode", deliveryModes);
+
+    Object[] queueTypes = {
+       Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+       Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)
+    };
+    addCombinationValues("destinationType", queueTypes);
+ }
+
+ /**
+  * Sends a message to a queue before any consumer exists, then adds a consumer and
+  * checks that the message is delivered to it.
+  */
+ public void testQueueSendThenAddConsumer() throws Exception {
+
+    // Producer side.
+    StubConnection connection = createConnection();
+    ConnectionInfo connectionInfo = createConnectionInfo();
+    SessionInfo session = createSessionInfo(connectionInfo);
+    ProducerInfo producer = createProducerInfo(session);
+    connection.send(connectionInfo);
+    connection.send(session);
+    connection.send(producer);
+
+    destination = createDestinationInfo(connection, connectionInfo, destinationType);
+
+    // Publish one message while no consumer is attached.
+    Message outgoing = createMessage(producer, destination, deliveryMode);
+    connection.send(outgoing);
+
+    // Only now attach the consumer.
+    ConsumerInfo lateConsumer = createConsumerInfo(session, destination);
+    connection.send(lateConsumer);
+
+    // The earlier message should be delivered.
+    assertNotNull(receiveMessage(connection));
+
+ }
+
+ /**
+  * Registers the {@code "deliveryMode"} values (non-persistent, persistent) and the
+  * {@code "destinationType"} values (queue, temporary queue) via
+  * {@code addCombinationValues}.
+  * Presumably consumed by the combination harness for
+  * {@code testQueueAckRemovesMessage} -- harness not visible here.
+  */
+ public void initCombosForTestQueueAckRemovesMessage() {
+    Object[] deliveryModes = {
+       Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+       Integer.valueOf(DeliveryMode.PERSISTENT)
+    };
+    addCombinationValues("deliveryMode", deliveryModes);
+
+    Object[] queueTypes = {
+       Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+       Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)
+    };
+    addCombinationValues("destinationType", queueTypes);
+ }
+
+ /**
+  * Checks that only a standard acknowledgement removes a message from a queue.
+  * Two messages are sent; after the first is received, the queue count is expected to be
+  * 2 before any ack, still 2 after a delivered ack, and 1 after a standard ack.
+  */
+ public void testQueueAckRemovesMessage() throws Exception {
+
+    // Start a producer and consumer
+    StubConnection connection = createConnection();
+    ConnectionInfo connectionInfo = createConnectionInfo();
+    SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+    ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+    connection.send(connectionInfo);
+    connection.send(sessionInfo);
+    connection.send(producerInfo);
+
+    destination = createDestinationInfo(connection, connectionInfo, destinationType);
+
+    Message message1 = createMessage(producerInfo, destination, deliveryMode);
+    Message message2 = createMessage(producerInfo, destination, deliveryMode);
+    connection.send(message1);
+    connection.send(message2);
+
+    // Make sure the first message was delivered.
+    ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+    connection.request(consumerInfo);
+    Message m = receiveMessage(connection);
+    assertNotNull(m);
+    // Expected value first, so a failure reports the ids the right way round.
+    assertEquals(message1.getMessageId(), m.getMessageId());
+
+    // Receiving alone must not remove anything from the queue.
+    assertTrue("queue should hold 2 messages before any ack",
+               countMessagesInQueue(connection, connectionInfo, destination) == 2);
+
+    // A delivered ack must not remove the message either.
+    connection.send(createAck(consumerInfo, m, 1, MessageAck.DELIVERED_ACK_TYPE));
+    assertTrue("queue should still hold 2 messages after a delivered ack",
+               countMessagesInQueue(connection, connectionInfo, destination) == 2);
+
+    // A standard ack removes the acknowledged message.
+    connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
+    assertTrue("queue should hold 1 message after a standard ack",
+               countMessagesInQueue(connection, connectionInfo, destination) == 1);
+
+ }
+
+ /**
+  * Registers the {@code "destination"} values (topic {@code TEST_TOPIC}, queue
+  * {@code TEST_QUEUE}) and the {@code "destinationType"} values (queue, topic, temporary
+  * queue, temporary topic) via {@code addCombinationValues}.
+  * <p>
+  * NOTE(review): {@code testSelectorSkipsMessages} assigns the {@code destination} field
+  * itself from {@code createDestinationInfo(...)}, so the "destination" values registered
+  * here look unused, and unlike the sibling initializers no "deliveryMode" values are
+  * registered -- confirm whether "deliveryMode" was intended.
+  */
+ public void initCombosForTestSelectorSkipsMessages() {
+    Object[] destinations = {
+       new ActiveMQTopic("TEST_TOPIC"),
+       new ActiveMQQueue("TEST_QUEUE")
+    };
+    addCombinationValues("destination", destinations);
+
+    Object[] destinationTypes = {
+       Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+       Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+       Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+       Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)
+    };
+    addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+  * Checks that a consumer with the selector {@code JMSType='last'} receives only the
+  * message whose type is {@code "last"} and not the one whose type is {@code "first"}.
+  */
+ public void testSelectorSkipsMessages() throws Exception {
+
+    // Start a producer and consumer
+    StubConnection connection = createConnection();
+    ConnectionInfo connectionInfo = createConnectionInfo();
+    SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+    ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+    connection.send(connectionInfo);
+    connection.send(sessionInfo);
+    connection.send(producerInfo);
+
+    destination = createDestinationInfo(connection, connectionInfo, destinationType);
+
+    // The selector is what should make the consumer skip the first message.
+    ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+    consumerInfo.setSelector("JMSType='last'");
+    connection.send(consumerInfo);
+
+    Message message1 = createMessage(producerInfo, destination, deliveryMode);
+    message1.setType("first");
+    Message message2 = createMessage(producerInfo, destination, deliveryMode);
+    message2.setType("last");
+    connection.send(message1);
+    connection.send(message2);
+
+    // Only the second message matches the selector.
+    Message m = receiveMessage(connection);
+    assertNotNull(m);
+    // Expected value first, so a failure reports the ids the right way round.
+    assertEquals(message2.getMessageId(), m.getMessageId());
+    connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
+    connection.send(closeConsumerInfo(consumerInfo));
+
+    assertNoMessagesLeft(connection);
+ }
+
+ /**
+  * Registers the {@code "deliveryMode"} values (non-persistent, persistent) and the
+  * {@code "destinationType"} values (queue, topic, temporary queue, temporary topic) via
+  * {@code addCombinationValues}.
+  * Presumably consumed by the combination harness for {@code testAddConsumerThenSend} --
+  * harness not visible here.
+  */
+ public void initCombosForTestAddConsumerThenSend() {
+    Object[] deliveryModes = {
+       Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+       Integer.valueOf(DeliveryMode.PERSISTENT)
+    };
+    addCombinationValues("deliveryMode", deliveryModes);
+
+    Object[] destinationTypes = {
+       Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+       Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+       Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+       Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)
+    };
+    addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+  * Adds a consumer first, then sends one message, and checks that the message is
+  * delivered to the consumer.
+  */
+ public void testAddConsumerThenSend() throws Exception {
+
+    // One connection for both the producer and the consumer.
+    StubConnection connection = createConnection();
+    ConnectionInfo connectionInfo = createConnectionInfo();
+    SessionInfo session = createSessionInfo(connectionInfo);
+    ProducerInfo producer = createProducerInfo(session);
+    connection.send(connectionInfo);
+    connection.send(session);
+    connection.send(producer);
+
+    destination = createDestinationInfo(connection, connectionInfo, destinationType);
+
+    // Attach the consumer before anything is published.
+    ConsumerInfo earlyConsumer = createConsumerInfo(session, destination);
+    connection.send(earlyConsumer);
+
+    Message outgoing = createMessage(producer, destination, deliveryMode);
+    connection.send(outgoing);
+
+    // The message should be delivered.
+    assertNotNull(receiveMessage(connection));
+ }
+
+ /**
+  * Registers the {@code "deliveryMode"} values (non-persistent, persistent) and the
+  * {@code "destinationType"} values (queue, topic, temporary queue, temporary topic) via
+  * {@code addCombinationValues}.
+  * Presumably consumed by the combination harness for {@code testConsumerPrefetchAtOne} --
+  * harness not visible here.
+  */
+ public void initCombosForTestConsumerPrefetchAtOne() {
+    Object[] deliveryModes = {
+       Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+       Integer.valueOf(DeliveryMode.PERSISTENT)
+    };
+    addCombinationValues("deliveryMode", deliveryModes);
+
+    Object[] destinationTypes = {
+       Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+       Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+       Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+       Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)
+    };
+    addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+  * Checks that a consumer with a prefetch size of 1 is handed only one message when two
+  * are sent and none is acknowledged.
+  */
+ public void testConsumerPrefetchAtOne() throws Exception {
+
+    // One connection for both the producer and the consumer.
+    StubConnection connection = createConnection();
+    ConnectionInfo connectionInfo = createConnectionInfo();
+    SessionInfo session = createSessionInfo(connectionInfo);
+    ProducerInfo producer = createProducerInfo(session);
+    connection.send(connectionInfo);
+    connection.send(session);
+    connection.send(producer);
+
+    destination = createDestinationInfo(connection, connectionInfo, destinationType);
+
+    // Consumer limited to a single in-flight message.
+    ConsumerInfo limitedConsumer = createConsumerInfo(session, destination);
+    limitedConsumer.setPrefetchSize(1);
+    connection.send(limitedConsumer);
+
+    // Publish two messages.
+    int toSend = 2;
+    while (toSend-- > 0) {
+       connection.send(createMessage(producer, destination, deliveryMode));
+    }
+
+    // Exactly one message should have been dispatched.
+    assertNotNull(receiveMessage(connection));
+    assertNoMessagesLeft(connection);
+
+ }
+
+ public void initCombosForTestConsumerPrefetchAtTwo() { // Supplies both delivery modes and the four destination types to addCombinationValues; presumably paired with testConsumerPrefetchAtTwo by name - confirm against the base test class.
+ addCombinationValues("deliveryMode", new Object[]
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+
Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType",
+ new Object[]
{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+
Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+
Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+
Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+ }
+
+ public void testConsumerPrefetchAtTwo() throws Exception {
+ // With the consumer's prefetch size set to 2, sends 3 messages and asserts that exactly two are received.
+ // Send the connection, session and producer infos (the consumer is added further down).
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+ // The destination is built from the destinationType field and stored in the destination field.
+ destination = createDestinationInfo(connection, connectionInfo,
destinationType);
+ // Set the prefetch size to 2 before the consumer info is sent.
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo,
destination);
+ consumerInfo.setPrefetchSize(2);
+ connection.send(consumerInfo);
+
+ // Send 3 messages to the broker.
+ connection.send(createMessage(producerInfo, destination,
deliveryMode));
+ connection.send(createMessage(producerInfo, destination,
deliveryMode));
+ connection.send(createMessage(producerInfo, destination,
deliveryMode));
+
+ // Make sure only 2 messages were delivered; no ack is sent, so the third must stay undelivered.
+ Message m = receiveMessage(connection);
+ assertNotNull(m);
+ m = receiveMessage(connection);
+ assertNotNull(m);
+ assertNoMessagesLeft(connection);
+
+ }
+
+ public void initCombosForTestConsumerPrefetchAndDeliveredAck() { // Supplies both delivery modes and the four destination types to addCombinationValues; presumably paired with testConsumerPrefetchAndDeliveredAck by name - confirm against the base test class.
+ addCombinationValues("deliveryMode", new Object[]
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+
Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType",
+ new Object[]
{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+
Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+
Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+
Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+ }
+
+ public void testConsumerPrefetchAndDeliveredAck() throws Exception {
+ // With prefetch size 1, sends 3 messages and checks that each DELIVERED_ACK_TYPE ack is followed by receipt of the next message.
+ // Send the connection, session and producer infos (the consumer is added further down).
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+ // The destination is built from the destinationType field and stored in the destination field.
+ destination = createDestinationInfo(connection, connectionInfo,
destinationType);
+ // NOTE(review): request(...) is used below where the sibling tests use send(...); presumably it waits for the broker's response - confirm in StubConnection.
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo,
destination);
+ consumerInfo.setPrefetchSize(1);
+ connection.request(consumerInfo);
+
+ // Send 3 messages to the broker.
+ connection.send(createMessage(producerInfo, destination,
deliveryMode));
+ connection.send(createMessage(producerInfo, destination,
deliveryMode));
+ connection.request(createMessage(producerInfo, destination,
deliveryMode));
+
+ // Make sure only 1 message was delivered.
+ Message m1 = receiveMessage(connection);
+ assertNotNull(m1);
+
+ assertNoMessagesLeft(connection);
+
+ // Acknowledge the first message. This should cause the next message to
+ // get dispatched.
+ connection.request(createAck(consumerInfo, m1, 1,
MessageAck.DELIVERED_ACK_TYPE));
+ // The second message is expected now; acknowledge it the same way.
+ Message m2 = receiveMessage(connection);
+ assertNotNull(m2);
+ connection.request(createAck(consumerInfo, m2, 1,
MessageAck.DELIVERED_ACK_TYPE));
+ // The third and last message is expected now; acknowledge it as well.
+ Message m3 = receiveMessage(connection);
+ assertNotNull(m3);
+ connection.request(createAck(consumerInfo, m3, 1,
MessageAck.DELIVERED_ACK_TYPE));
+ }
+
+ public void testGetServices() throws Exception { // Asserts that broker.getServices() returns a non-empty array.
+ assertTrue(broker.getServices().length != 0);
+ }
+
+ public static Test suite() { // Returns the Test produced by suite(Class) for BrokerTest; also used by main below.
+ return suite(BrokerTest.class);
+ }
+
+ public static void main(String[] args) { // Command-line entry point: runs suite() with JUnit's text-mode TestRunner; args is not used.
+ junit.textui.TestRunner.run(suite());
+ }
+
+}